Security Boundaries
Remote MCP deployments introduce security considerations that don't exist with local servers. This lesson covers the security architecture of cloud deployments and how to protect your MCP servers and the data they access.
The Security Landscape
When you deploy an MCP server remotely, you're exposing functionality over the internet:
┌─────────────────────────────────────────────────────────────────┐
│ SECURITY BOUNDARIES │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Internet (Untrusted) │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ PUBLIC BOUNDARY │ │
│ │ - TLS termination │ │
│ │ - Authentication (OAuth, API keys) │ │
│ │ - Rate limiting │ │
│ │ - Request validation │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ MCP SERVER │ │
│ │ - Tool authorization │ │
│ │ - Input validation │ │
│ │ - Output sanitization │ │
│ │ - Audit logging │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ PRIVATE BOUNDARY │ │
│ │ - VPC isolation │ │
│ │ - Database credentials │ │
│ │ - Internal API access │ │
│ │ - Secrets management │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Authentication
OAuth 2.0 with Cognito (AWS)
PMCP supports OAuth 2.0 authentication via AWS Cognito:
# Initialize with OAuth support
cargo pmcp deploy init --target aws-lambda --oauth-provider cognito
# This creates:
# - Cognito User Pool for user management
# - Lambda authorizer for token validation
# - OAuth endpoints (/oauth2/authorize, /oauth2/token)
OAuth flow:
┌─────────────────────────────────────────────────────────────────┐
│ OAUTH 2.0 FLOW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Client Registration (one-time): │
│ POST /oauth2/register │
│ → Returns client_id, client_secret │
│ │
│ 2. Authorization Request: │
│ GET /oauth2/authorize?client_id=...&redirect_uri=... │
│ → User logs in, grants permission │
│ → Redirects with authorization code │
│ │
│ 3. Token Exchange: │
│ POST /oauth2/token │
│ grant_type=authorization_code&code=... │
│ → Returns access_token, refresh_token │
│ │
│ 4. API Access: │
│ POST /mcp │
│ Authorization: Bearer <access_token> │
│ → MCP request processed │
│ │
└─────────────────────────────────────────────────────────────────┘
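Step 3 of the flow sends a form-encoded body to the token endpoint. The following is a minimal sketch of building that body using only the standard library. The helper names `percent_encode` and `token_request_body` are hypothetical, and including `redirect_uri` and `client_id` follows RFC 6749 section 4.1.3 rather than anything PMCP-specific.

```rust
/// Percent-encode every byte except RFC 3986 unreserved characters.
fn percent_encode(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

/// Build the form body for the token exchange (hypothetical helper).
/// Parameter names follow RFC 6749 section 4.1.3.
fn token_request_body(code: &str, redirect_uri: &str, client_id: &str) -> String {
    let params = [
        ("grant_type", "authorization_code"),
        ("code", code),
        ("redirect_uri", redirect_uri),
        ("client_id", client_id),
    ];
    params
        .iter()
        .map(|(k, v)| format!("{}={}", k, percent_encode(v)))
        .collect::<Vec<_>>()
        .join("&")
}
```

The resulting string would be sent as the body of `POST /oauth2/token` with `Content-Type: application/x-www-form-urlencoded`. In practice an HTTP client library usually handles this encoding for you.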
Token Validation
The Lambda authorizer validates tokens before requests reach your MCP server:
// Simplified authorizer logic (generated by cargo pmcp deploy)
async fn validate_token(token: &str) -> Result<AuthContext> {
    // Decode JWT header to get key ID
    let header = decode_header(token)?;
    let kid = header.kid.ok_or(AuthError::MissingKeyId)?;

    // Fetch Cognito public keys (cached)
    let jwks = get_cognito_jwks().await?;
    let key = jwks.find(&kid).ok_or(AuthError::UnknownKey)?;

    // Verify signature and claims
    let claims = decode::<Claims>(token, &key, &validation)?;

    // Check expiration
    if claims.exp < current_timestamp() {
        return Err(AuthError::TokenExpired);
    }

    // Check issuer
    if claims.iss != expected_issuer() {
        return Err(AuthError::InvalidIssuer);
    }

    Ok(AuthContext {
        user_id: claims.sub,
        scopes: claims.scope.split(' ').collect(),
        email: claims.email,
    })
}
API Key Authentication
For simpler use cases, API keys can be used:
// In your MCP server
use pmcp::middleware::ApiKeyAuth;

let server = Server::builder()
    .name("my-server")
    // The closure returns a future, so the lookup can be awaited
    .middleware(ApiKeyAuth::new(|api_key| async move {
        // Validate API key against your store
        validate_api_key(api_key).await
    }))
    .tool("query_data", ...)
    .build()?;
API keys should be:
- Generated with sufficient entropy (256+ bits)
- Stored hashed (bcrypt/argon2)
- Transmitted only over HTTPS
- Rotatable without downtime
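One way to read "rotatable without downtime" is that the server accepts more than one key during a changeover. The sketch below illustrates that idea with a hypothetical `ApiKeyStore` that holds digests of every active key. It assumes the digests are computed elsewhere with a vetted hashing crate, and its comparison loop is only a best-effort illustration of constant-time checking; production code would normally rely on an audited crate such as `subtle`.

```rust
/// Compare two byte strings without returning early on the first mismatch.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Holds the digests of all currently accepted keys (hypothetical type).
struct ApiKeyStore {
    active_digests: Vec<Vec<u8>>,
}

impl ApiKeyStore {
    fn new(initial_digest: Vec<u8>) -> Self {
        Self { active_digests: vec![initial_digest] }
    }

    /// Start accepting a new key while the old one keeps working.
    fn add_key(&mut self, digest: Vec<u8>) {
        self.active_digests.push(digest);
    }

    /// Stop accepting an old key once clients have switched over.
    fn retire_key(&mut self, digest: &[u8]) {
        self.active_digests.retain(|d| !constant_time_eq(d, digest));
    }

    /// Check a presented key digest against every active digest.
    fn is_valid(&self, presented_digest: &[u8]) -> bool {
        let mut ok = false;
        for d in &self.active_digests {
            ok |= constant_time_eq(d, presented_digest);
        }
        ok
    }
}
```

A rotation then becomes three steps: call `add_key` with the new digest, move clients to the new key, and call `retire_key` for the old one.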
Authorization
Authentication tells you who is making a request. Authorization determines what they can do.
Scope-Based Access Control
Define scopes for different access levels:
#[derive(Debug, Clone)]
enum Scope {
    Read,  // Read-only access to data
    Write, // Modify data
    Admin, // Administrative operations
}

async fn check_authorization(
    auth_context: &AuthContext,
    tool: &str,
    required_scope: Scope,
) -> Result<()> {
    // Check if user has required scope
    let has_scope = match required_scope {
        Scope::Read => auth_context.scopes.contains(&"mcp:read"),
        Scope::Write => auth_context.scopes.contains(&"mcp:write"),
        Scope::Admin => auth_context.scopes.contains(&"mcp:admin"),
    };

    if !has_scope {
        return Err(AuthError::InsufficientPermissions {
            user: auth_context.user_id.clone(),
            tool: tool.to_string(),
            required: format!("{:?}", required_scope),
        });
    }

    // Log access for audit
    tracing::info!(
        user = %auth_context.user_id,
        tool = %tool,
        scope = ?required_scope,
        "Authorization granted"
    );

    Ok(())
}
Tool-Level Authorization
Annotate tools with required permissions:
use pmcp::server::TypedTool;

// Read-only tool - anyone with 'read' scope can use
let read_tool = TypedTool::new("list_items", |input: ListInput| async move {
    // Tool implementation
})
.read_only()                       // Hint for clients
.with_required_scope("mcp:read");  // Actual enforcement

// Destructive tool - requires 'write' scope
let write_tool = TypedTool::new("delete_item", |input: DeleteInput| async move {
    // Tool implementation
})
.destructive()                     // Hint: this modifies data
.with_required_scope("mcp:write");

// Admin tool - requires elevated permissions
let admin_tool = TypedTool::new("purge_all", |input: PurgeInput| async move {
    // Tool implementation
})
.destructive()
.with_required_scope("mcp:admin");
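The enforcement behind a per-tool scope requirement can be pictured as a lookup that denies by default. The following standard-library sketch uses a hypothetical `ToolPolicy` type (not part of PMCP) with the tool names and scopes from the example above. It is meant to show the decision rule, not how PMCP implements it.

```rust
use std::collections::HashMap;

/// Maps tool names to the scope they require (hypothetical helper, not part of PMCP).
struct ToolPolicy {
    required: HashMap<&'static str, &'static str>,
}

impl ToolPolicy {
    fn new() -> Self {
        let mut required = HashMap::new();
        required.insert("list_items", "mcp:read");
        required.insert("delete_item", "mcp:write");
        required.insert("purge_all", "mcp:admin");
        Self { required }
    }

    /// Deny by default: unknown tools and missing scopes are both rejected.
    fn is_allowed(&self, tool: &str, granted_scopes: &[&str]) -> bool {
        match self.required.get(tool) {
            Some(scope) => granted_scopes.contains(scope),
            None => false,
        }
    }
}
```

Rejecting tools that have no policy entry means a newly added tool stays inaccessible until someone assigns it a scope, which is usually the safer failure mode.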
Network Security
VPC Isolation (AWS)
Place your Lambda in a VPC to access private resources:
┌─────────────────────────────────────────────────────────────────┐
│ AWS VPC │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ PUBLIC SUBNET │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────────────────────┐ │ │
│ │ │ NAT Gateway │ │ API Gateway │ │ │
│ │ │ (for egress)│ │ (HTTPS termination) │ │ │
│ │ └─────────────┘ └─────────────────────────────┘ │ │
│ │ │ │ │
│ └────────────────────────────────┼──────────────────────────┘ │
│ │ │
│ ┌────────────────────────────────┼──────────────────────────┐ │
│ │ PRIVATE SUBNET │ │
│ │ │ │ │
│ │ ┌─────────────────────────────▼────────────────────────┐ │ │
│ │ │ Lambda Function │ │ │
│ │ │ (your MCP server) │ │ │
│ │ └─────────────────────────────┬────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────────────────▼────────────────────────┐ │ │
│ │ │ RDS PostgreSQL │ │ │
│ │ │ (private, no public access) │ │ │
│ │ └──────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
CDK configuration:
// In deploy/lib/stack.ts
const vpc = new ec2.Vpc(this, 'Vpc', {
  maxAzs: 2,
  natGateways: 1, // For Lambda to reach internet
});

const mcpFunction = new lambda.Function(this, 'McpFunction', {
  // ... other config
  vpc,
  vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS },
  securityGroups: [mcpSecurityGroup],
});

// Database in same VPC
const database = new rds.DatabaseInstance(this, 'Database', {
  // ... engine and other config
  vpc,
  vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_ISOLATED },
  publiclyAccessible: false, // No internet access
});

// Allow Lambda to connect to database
database.connections.allowFrom(mcpFunction, ec2.Port.tcp(5432));
Security Groups
Restrict network access with security groups:
// Create both groups first so each rule can reference the other.
// Lambda security group - outbound only to database and internet
const lambdaSg = new ec2.SecurityGroup(this, 'LambdaSg', {
  vpc,
  description: 'MCP Lambda security group',
  allowAllOutbound: false,
});

// Database security group - inbound only from Lambda
const databaseSg = new ec2.SecurityGroup(this, 'DatabaseSg', {
  vpc,
  description: 'Database security group',
  allowAllOutbound: false,
});

// Allow HTTPS to internet (for external APIs)
lambdaSg.addEgressRule(ec2.Peer.anyIpv4(), ec2.Port.tcp(443), 'HTTPS');

// Allow connection to database
lambdaSg.addEgressRule(databaseSg, ec2.Port.tcp(5432), 'PostgreSQL');

// Allow inbound connections from Lambda only
databaseSg.addIngressRule(lambdaSg, ec2.Port.tcp(5432), 'From Lambda');
Secrets Management
Never hardcode secrets in your code or deployment configuration.
AWS Secrets Manager
Store and retrieve secrets securely:
use aws_sdk_secretsmanager::Client;
use tokio::sync::OnceCell; // Assumed: the async get_or_init below matches tokio's OnceCell

async fn get_database_credentials() -> Result<DbCredentials> {
    let config = aws_config::load_from_env().await;
    let client = Client::new(&config);

    let response = client
        .get_secret_value()
        .secret_id("mcp-server/database")
        .send()
        .await?;

    let secret_string = response.secret_string().ok_or(Error::NoSecret)?;
    let credentials: DbCredentials = serde_json::from_str(secret_string)?;
    Ok(credentials)
}

// Use in Lambda initialization (cached across warm invocations).
// A static needs a const constructor, hence const_new() rather than new().
static DB_CREDENTIALS: OnceCell<DbCredentials> = OnceCell::const_new();

async fn get_credentials() -> &'static DbCredentials {
    DB_CREDENTIALS
        .get_or_init(|| async {
            get_database_credentials().await.expect("Failed to get credentials")
        })
        .await
}
Environment Variables (Limited Use)
For non-sensitive configuration, environment variables are fine:
# .pmcp/deploy.toml
[lambda.environment]
RUST_LOG = "info"
DATABASE_HOST = "db.internal.example.com" # Not a secret
# DATABASE_PASSWORD = "..." # NEVER DO THIS
# Instead, use:
DATABASE_SECRET_ARN = "arn:aws:secretsmanager:us-east-1:123456789:secret:db-creds"
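On the server side, this configuration can be read with `std::env`. The sketch below is one possible shape, assuming the two variable names from the example above. `AppConfig`, `load_config`, and `load_config_from_env` are hypothetical names, and the ARN prefix check is an illustrative guard rather than a complete validation.

```rust
use std::env;

/// Non-sensitive settings read from the environment (hypothetical struct).
#[derive(Debug, PartialEq)]
struct AppConfig {
    database_host: String,
    database_secret_arn: String,
}

/// Build the config from any lookup function so it can be tested
/// without touching the real process environment.
fn load_config<F>(lookup: F) -> Result<AppConfig, String>
where
    F: Fn(&str) -> Option<String>,
{
    let database_host = lookup("DATABASE_HOST")
        .ok_or_else(|| "DATABASE_HOST is not set".to_string())?;
    let database_secret_arn = lookup("DATABASE_SECRET_ARN")
        .ok_or_else(|| "DATABASE_SECRET_ARN is not set".to_string())?;

    // Only a reference to the secret is accepted here, never the secret itself.
    if !database_secret_arn.starts_with("arn:aws:secretsmanager:") {
        return Err("DATABASE_SECRET_ARN must be a Secrets Manager ARN".to_string());
    }

    Ok(AppConfig { database_host, database_secret_arn })
}

/// Production entry point: read from the process environment.
fn load_config_from_env() -> Result<AppConfig, String> {
    load_config(|name| env::var(name).ok())
}
```

Failing fast at startup when a variable is missing or malformed tends to be easier to diagnose than a connection error on the first request.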
Cloudflare Workers Secrets
# Set secrets via wrangler
wrangler secret put DATABASE_PASSWORD
# Enter secret value interactively (not stored in shell history)
Access the secret in code:
async fn handler(req: Request, env: Env, ctx: Context) -> Result<Response> {
    let db_password = env.secret("DATABASE_PASSWORD")?.to_string();
    // ...
}
Input Validation
All input from MCP clients must be validated. Assume all input is malicious.
Schema Validation
PMCP's TypedTool validates input against JSON Schema:
use schemars::JsonSchema;
use serde::Deserialize;

#[derive(Deserialize, JsonSchema)]
pub struct QueryInput {
    /// Table name (alphanumeric only)
    #[schemars(regex(pattern = r"^[a-zA-Z][a-zA-Z0-9_]*$"))]
    table: String,

    /// Maximum rows to return (1-1000)
    #[serde(default = "default_limit")]
    #[schemars(range(min = 1, max = 1000))]
    limit: u32,

    /// Filter conditions
    #[serde(default)]
    filters: Vec<Filter>,
}

fn default_limit() -> u32 { 100 }

#[derive(Deserialize, JsonSchema)]
pub struct Filter {
    /// Column name (alphanumeric only)
    #[schemars(regex(pattern = r"^[a-zA-Z][a-zA-Z0-9_]*$"))]
    column: String,

    /// Comparison operator
    operator: Operator,

    /// Value to compare (sanitized)
    value: serde_json::Value,
}

#[derive(Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum Operator {
    Eq,   // =
    Ne,   // !=
    Lt,   // <
    Gt,   // >
    Like, // LIKE (with escaping)
    In,   // IN (parameterized)
}
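The `schemars` attributes describe constraints in the generated JSON Schema; `serde` deserialization on its own does not enforce them. As a defense-in-depth measure, a handler can repeat the most important checks in plain code. The sketch below does this with the standard library only. `is_safe_identifier` and `check_limit` are hypothetical helpers that mirror the pattern and range from the schema above.

```rust
/// Returns true if `name` matches ^[a-zA-Z][a-zA-Z0-9_]*$.
fn is_safe_identifier(name: &str) -> bool {
    let mut chars = name.chars();
    match chars.next() {
        // First character must be an ASCII letter; the rest may add digits and '_'.
        Some(first) if first.is_ascii_alphabetic() => {
            chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        }
        _ => false,
    }
}

/// Returns the limit if it is within the 1..=1000 range from the schema.
fn check_limit(limit: u32) -> Result<u32, String> {
    if (1..=1000).contains(&limit) {
        Ok(limit)
    } else {
        Err(format!("limit must be between 1 and 1000, got {}", limit))
    }
}
```

Calling these at the top of a tool handler keeps the guarantee local to the code that builds the query, regardless of how the request reached it.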
SQL Injection Prevention
Always use parameterized queries:
use sqlx::{PgPool, Postgres, QueryBuilder};

// DANGEROUS: String interpolation
let query = format!(
    "SELECT * FROM {} WHERE name = '{}'",
    input.table, input.name  // SQL INJECTION VULNERABILITY
);

// SAFE: Parameterized query with allowlist
async fn query_table(pool: &PgPool, input: QueryInput) -> Result<Vec<Row>> {
    // Allowlist tables
    const ALLOWED_TABLES: &[&str] = &["users", "orders", "products"];
    if !ALLOWED_TABLES.contains(&input.table.as_str()) {
        return Err(Error::InvalidTable(input.table));
    }

    // Table name validated above, so it is safe to interpolate
    let mut query = QueryBuilder::<Postgres>::new(
        format!("SELECT * FROM {} WHERE 1=1", input.table)
    );

    // Add parameterized filters
    for filter in &input.filters {
        // Column name validated by regex in schema; the value is always bound
        query.push(" AND ").push(&filter.column);
        match filter.operator {
            Operator::Eq => {
                query.push(" = ").push_bind(&filter.value);
            }
            Operator::Like => {
                // Escape LIKE wildcards (as_str avoids the quotes that
                // to_string() would add around a JSON string)
                let escaped = escape_like(filter.value.as_str().unwrap_or_default());
                query.push(" LIKE ").push_bind(format!("%{}%", escaped));
            }
            // ...
        }
    }

    Ok(query.build_query_as::<Row>().fetch_all(pool).await?)
}

fn escape_like(s: &str) -> String {
    s.replace('\\', "\\\\")
        .replace('%', "\\%")
        .replace('_', "\\_")
}
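To see why parameterization helps, it can be useful to look at what the database actually receives: SQL text containing only placeholders, and a separate list of values. The standard-library sketch below builds both. `Op`, `op_sql`, and `build_query` are hypothetical names, the `$1`, `$2` placeholder style assumes PostgreSQL, and the table and column names are assumed to have passed an allowlist already.

```rust
/// Comparison operators a client may request (mirrors a subset of `Operator`).
#[derive(Clone, Copy)]
enum Op {
    Eq,
    Ne,
    Lt,
    Gt,
}

/// Map each operator to a fixed SQL fragment; nothing from the client is copied into SQL.
fn op_sql(op: Op) -> &'static str {
    match op {
        Op::Eq => "=",
        Op::Ne => "<>",
        Op::Lt => "<",
        Op::Gt => ">",
    }
}

/// Build SQL text with $1, $2, ... placeholders plus the values to bind, in order.
/// Assumes `table` and every column were already checked against an allowlist.
fn build_query(table: &str, filters: &[(&str, Op, &str)]) -> (String, Vec<String>) {
    let mut sql = format!("SELECT * FROM {} WHERE 1=1", table);
    let mut params = Vec::new();
    for (column, op, value) in filters {
        params.push(value.to_string());
        sql.push_str(&format!(" AND {} {} ${}", column, op_sql(*op), params.len()));
    }
    (sql, params)
}
```

A value such as `x' OR '1'='1` ends up in the parameter list and never in the SQL text, so the query structure stays the same whatever the client sends.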
Audit Logging
Track all access for security and compliance:
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::time::Duration;
use tracing::{info, warn};

#[derive(Debug, Serialize)]
struct AuditEvent {
    timestamp: DateTime<Utc>,
    event_type: &'static str,
    user_id: String,
    tool: String,
    input_hash: String, // Hash of input, not raw data
    result: AuditResult,
    duration_ms: u64,
    source_ip: Option<String>,
}

#[derive(Debug, Serialize)]
enum AuditResult {
    Success,
    AuthFailure { reason: String },
    ValidationError { field: String },
    ExecutionError { error_type: String },
}

async fn audit_tool_call(
    auth: &AuthContext,
    tool: &str,
    input: &serde_json::Value,
    result: &Result<serde_json::Value, Error>,
    duration: Duration,
) {
    let event = AuditEvent {
        timestamp: Utc::now(),
        event_type: "tool_call",
        user_id: auth.user_id.clone(),
        tool: tool.to_string(),
        input_hash: sha256_hex(&input.to_string()),
        result: match result {
            Ok(_) => AuditResult::Success,
            Err(e) => AuditResult::ExecutionError {
                error_type: e.to_string(),
            },
        },
        duration_ms: duration.as_millis() as u64,
        source_ip: auth.source_ip.clone(),
    };

    // Log structured audit event
    info!(audit = ?event, "Tool call audit");

    // Optionally send to dedicated audit service
    if let Err(e) = send_to_audit_service(&event).await {
        warn!(error = %e, "Failed to send audit event");
    }
}
Rate Limiting
Protect against abuse with rate limiting:
use governor::{DefaultKeyedRateLimiter, Quota, RateLimiter};
use once_cell::sync::Lazy; // Assumed source of `Lazy`
use std::num::NonZeroU32;
use std::time::Duration;

// How long clients are told to wait after hitting the limit
const RETRY_AFTER: Duration = Duration::from_secs(60);

// Per-user rate limiter: 100 requests per minute.
// A static needs a fully spelled-out type, so use governor's keyed alias.
static RATE_LIMITER: Lazy<DefaultKeyedRateLimiter<String>> = Lazy::new(|| {
    RateLimiter::keyed(Quota::per_minute(NonZeroU32::new(100).unwrap()))
});

async fn check_rate_limit(user_id: &str) -> Result<()> {
    match RATE_LIMITER.check_key(&user_id.to_string()) {
        Ok(_) => Ok(()),
        Err(_) => Err(Error::RateLimitExceeded {
            retry_after: RETRY_AFTER,
        }),
    }
}

// In your handler
async fn handle_mcp_request(auth: AuthContext, request: Request) -> Response {
    // Check rate limit first
    if check_rate_limit(&auth.user_id).await.is_err() {
        return Response::json(&json!({
            "jsonrpc": "2.0",
            "error": {
                "code": -32000,
                "message": "Rate limit exceeded",
                "data": {
                    "retry_after": RETRY_AFTER.as_secs()
                }
            }
        })).status(429);
    }

    // Process request...
}
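To make the idea of a per-user limit concrete, here is a small fixed-window counter written with the standard library only. It is a teaching sketch, not a replacement for `governor` (which uses a different algorithm, GCRA). `FixedWindowLimiter` is a hypothetical type, and the current time is passed in explicitly so the behavior is easy to test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Fixed-window limiter keyed by user ID (illustrative only).
struct FixedWindowLimiter {
    max_requests: u32,
    window: Duration,
    // user ID -> (window start, requests seen in that window)
    counters: HashMap<String, (Instant, u32)>,
}

impl FixedWindowLimiter {
    fn new(max_requests: u32, window: Duration) -> Self {
        Self { max_requests, window, counters: HashMap::new() }
    }

    /// Returns Ok(()) if allowed, or Err(time until the window resets).
    /// `now` is a parameter so callers and tests control the clock.
    fn check(&mut self, user_id: &str, now: Instant) -> Result<(), Duration> {
        let entry = self
            .counters
            .entry(user_id.to_string())
            .or_insert((now, 0));

        // Start a fresh window once the old one has elapsed.
        if now.duration_since(entry.0) >= self.window {
            *entry = (now, 0);
        }

        if entry.1 < self.max_requests {
            entry.1 += 1;
            Ok(())
        } else {
            Err(self.window - now.duration_since(entry.0))
        }
    }
}
```

The `Err` value gives a natural source for the `retry_after` field in the error response. Note that on Lambda each execution environment has its own memory, so any in-process limiter only sees the traffic routed to that instance.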
Security Checklist
Before deploying to production, verify:
Authentication & Authorization
- OAuth or API key authentication enabled
- Token validation includes signature, expiration, and issuer checks
- Scopes defined for different access levels
- Tool authorization enforced server-side
Network Security
- TLS enforced (HTTPS only)
- Database in private subnet (no public access)
- Security groups restrict traffic to necessary ports
- VPC endpoints for AWS services (avoid internet)
Secrets Management
- No secrets in code or config files
- Secrets stored in Secrets Manager/Vault
- Secrets rotated regularly
- Least-privilege IAM roles
Input Validation
- All input validated against schema
- SQL injection prevented (parameterized queries)
- Table/column names allowlisted
- File paths validated (no traversal)
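For the file path item, one common approach is to reject any client-supplied path that contains `..` or an absolute root before joining it to a base directory. The sketch below shows that check with `std::path`. `safe_join` is a hypothetical helper, and it does not resolve symlinks, so it should be treated as a first line of defense rather than a complete sandbox.

```rust
use std::path::{Component, Path, PathBuf};

/// Join a client-supplied relative path onto `base`,
/// rejecting anything that could escape it.
fn safe_join(base: &Path, user_path: &str) -> Option<PathBuf> {
    let mut result = base.to_path_buf();
    for component in Path::new(user_path).components() {
        match component {
            // Plain file or directory names are allowed.
            Component::Normal(part) => result.push(part),
            // "." is harmless and can be skipped.
            Component::CurDir => {}
            // "..", absolute roots, and Windows prefixes are rejected.
            Component::ParentDir | Component::RootDir | Component::Prefix(_) => return None,
        }
    }
    Some(result)
}
```

With a base of `/srv/data`, a request for `reports/2024.csv` resolves inside the base directory, while `../etc/passwd` and `/etc/passwd` are both refused.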
Monitoring & Response
- Audit logging enabled
- Rate limiting configured
- Alerts for suspicious activity
- Incident response plan documented
Summary
Security for remote MCP deployments requires defense in depth:
┌─────────────────────────────────────────────────────────────────┐
│ SECURITY LAYERS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Layer 1: Network Perimeter │
│ - TLS/HTTPS only │
│ - DDoS protection (Cloudflare, AWS Shield) │
│ - Rate limiting │
│ │
│ Layer 2: Authentication │
│ - OAuth 2.0 / API keys │
│ - Token validation │
│ - Session management │
│ │
│ Layer 3: Authorization │
│ - Scope-based access control │
│ - Tool-level permissions │
│ - Data-level filtering │
│ │
│ Layer 4: Input Validation │
│ - Schema validation │
│ - Parameterized queries │
│ - Output sanitization │
│ │
│ Layer 5: Infrastructure │
│ - VPC isolation │
│ - Secrets management │
│ - Least-privilege IAM │
│ │
│ Layer 6: Detection & Response │
│ - Audit logging │
│ - Anomaly detection │
│ - Incident response │
│ │
└─────────────────────────────────────────────────────────────────┘
The goal is to ensure that even if one layer is compromised, other layers prevent full system compromise.