Chapter 11: Middleware

Middleware in PMCP provides a powerful way to intercept, modify, and extend request/response processing. This chapter covers both the basic Middleware trait and the AdvancedMiddleware system, which adds priority ordering, context propagation, conditional execution, and lifecycle hooks.

Understanding Middleware

Middleware operates as a chain of interceptors that process messages bidirectionally:

Client                        Middleware Chain                      Server
   |                                                                    |
   |---- Request ---> [MW1] -> [MW2] -> [MW3] -> [Transport] --------->|
   |                    ↓        ↓        ↓                             |
   |<--- Response --- [MW1] <- [MW2] <- [MW3] <- [Transport] ----------|
   |                                                                    |

When to Use Middleware

  • Cross-cutting concerns: Logging, metrics, tracing
  • Request modification: Authentication, compression, validation
  • Error handling: Retry logic, circuit breakers
  • Performance optimization: Caching, rate limiting
  • Observability: Request tracking, performance monitoring

Basic Middleware

The Middleware trait provides the foundation for request/response interception.

Trait Definition

#![allow(unused)]
fn main() {
use pmcp::shared::{Middleware, TransportMessage};
use pmcp::types::{JSONRPCRequest, JSONRPCResponse};
use pmcp::Result;
use async_trait::async_trait;

#[async_trait]
pub trait Middleware: Send + Sync {
    /// Called before a request is sent
    async fn on_request(&self, request: &mut JSONRPCRequest) -> Result<()>;

    /// Called after a response is received
    async fn on_response(&self, response: &mut JSONRPCResponse) -> Result<()>;

    /// Called when a message is sent (any type)
    async fn on_send(&self, message: &TransportMessage) -> Result<()>;

    /// Called when a message is received (any type)
    async fn on_receive(&self, message: &TransportMessage) -> Result<()>;
}
}

Basic Example

#![allow(unused)]
fn main() {
use pmcp::shared::{Middleware, TransportMessage};
use pmcp::types::{JSONRPCRequest, JSONRPCResponse};
use async_trait::async_trait;
use std::time::Instant;

/// Custom middleware that tracks request timing
struct TimingMiddleware {
    start_times: dashmap::DashMap<String, Instant>,
}

impl TimingMiddleware {
    fn new() -> Self {
        Self {
            start_times: dashmap::DashMap::new(),
        }
    }
}

#[async_trait]
impl Middleware for TimingMiddleware {
    async fn on_request(&self, request: &mut JSONRPCRequest) -> pmcp::Result<()> {
        // Track start time
        self.start_times.insert(
            request.id.to_string(),
            Instant::now()
        );

        tracing::info!("Request started: {}", request.method);
        Ok(())
    }

    async fn on_response(&self, response: &mut JSONRPCResponse) -> pmcp::Result<()> {
        // Calculate elapsed time
        if let Some((_, start)) = self.start_times.remove(&response.id.to_string()) {
            let elapsed = start.elapsed();
            tracing::info!("Response for {} took {:?}", response.id, elapsed);
        }
        Ok(())
    }
}
}

MiddlewareChain

Chain multiple middleware together for sequential processing:

#![allow(unused)]
fn main() {
use pmcp::shared::{MiddlewareChain, LoggingMiddleware};
use std::sync::Arc;
use tracing::Level;

// Create middleware chain
let mut chain = MiddlewareChain::new();

// Add middleware in order
chain.add(Arc::new(LoggingMiddleware::new(Level::INFO)));
chain.add(Arc::new(TimingMiddleware::new()));
chain.add(Arc::new(CustomMiddleware));  // any type implementing Middleware

// Process request through all middleware
chain.process_request(&mut request).await?;

// Process response through all middleware
chain.process_response(&mut response).await?;
}

Advanced Middleware

The AdvancedMiddleware trait adds priority ordering, context propagation, conditional execution, and lifecycle hooks.

Trait Definition

#![allow(unused)]
fn main() {
use pmcp::shared::{AdvancedMiddleware, MiddlewareContext, MiddlewarePriority};
use pmcp::types::{JSONRPCRequest, JSONRPCResponse};
use pmcp::{Error, Result};
use async_trait::async_trait;

#[async_trait]
pub trait AdvancedMiddleware: Send + Sync {
    /// Get middleware priority for execution ordering
    fn priority(&self) -> MiddlewarePriority {
        MiddlewarePriority::Normal
    }

    /// Get middleware name for identification
    fn name(&self) -> &'static str;

    /// Check if middleware should execute for this context
    async fn should_execute(&self, context: &MiddlewareContext) -> bool {
        true
    }

    /// Called before a request is sent with context
    async fn on_request_with_context(
        &self,
        request: &mut JSONRPCRequest,
        context: &MiddlewareContext,
    ) -> Result<()>;

    /// Called after a response is received with context
    async fn on_response_with_context(
        &self,
        response: &mut JSONRPCResponse,
        context: &MiddlewareContext,
    ) -> Result<()>;

    /// Lifecycle hooks
    async fn on_chain_start(&self, context: &MiddlewareContext) -> Result<()>;
    async fn on_chain_complete(&self, context: &MiddlewareContext) -> Result<()>;
    async fn on_error(&self, error: &Error, context: &MiddlewareContext) -> Result<()>;
}
}

MiddlewarePriority

Control execution order with priority levels:

#![allow(unused)]
fn main() {
use pmcp::shared::MiddlewarePriority;

pub enum MiddlewarePriority {
    Critical = 0,  // Validation, security - executed first
    High = 1,      // Authentication, rate limiting
    Normal = 2,    // Business logic, transformation
    Low = 3,       // Logging, metrics
    Lowest = 4,    // Cleanup, finalization
}
}

Execution order: higher-priority middleware (lower numeric value) runs first on requests and last on responses, wrapping lower-priority middleware like the layers of an onion.
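This onion ordering can be illustrated with a standalone sketch (not the crate's internals): sort ascending by priority for the request pass, then walk the same list in reverse for the response pass.

```rust
/// Illustrative priority values mirroring MiddlewarePriority's discriminants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Critical = 0,
    High = 1,
    Normal = 2,
    Low = 3,
    Lowest = 4,
}

/// Returns middleware names in request order (ascending priority),
/// then in response order (the reverse of the request order).
pub fn execution_orders(
    mut mw: Vec<(Priority, &'static str)>,
) -> (Vec<&'static str>, Vec<&'static str>) {
    mw.sort_by_key(|(p, _)| *p); // stable sort: equal priorities keep insertion order
    let request_order: Vec<_> = mw.iter().map(|(_, n)| *n).collect();
    let response_order: Vec<_> = mw.iter().rev().map(|(_, n)| *n).collect();
    (request_order, response_order)
}
```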

MiddlewareContext

Share data and metrics across middleware layers:

#![allow(unused)]
fn main() {
use pmcp::shared::MiddlewareContext;

let context = MiddlewareContext::with_request_id("req-123".to_string());

// Set metadata
context.set_metadata("user_id".to_string(), "user-456".to_string());

// Get metadata
if let Some(user_id) = context.get_metadata("user_id") {
    tracing::info!("User ID: {}", user_id);
}

// Record metrics
context.record_metric("processing_time_ms".to_string(), 123.45);

// Get elapsed time
let elapsed = context.elapsed();
}

EnhancedMiddlewareChain

Automatic priority ordering and context support:

#![allow(unused)]
fn main() {
use pmcp::shared::{
    EnhancedMiddlewareChain, MetricsMiddleware, MiddlewareContext, RateLimitMiddleware,
};
use std::sync::Arc;
use std::time::Duration;

// Create enhanced chain with auto-sorting
let mut chain = EnhancedMiddlewareChain::new();

// Add middleware (auto-sorted by priority)
chain.add(Arc::new(ValidationMiddleware));      // Critical
chain.add(Arc::new(RateLimitMiddleware::new(10, 20, Duration::from_secs(1))));  // High
chain.add(Arc::new(MetricsMiddleware::new("my-service".to_string())));  // Low

// Create context
let context = MiddlewareContext::with_request_id("req-001".to_string());

// Process with context
chain.process_request_with_context(&mut request, &context).await?;
chain.process_response_with_context(&mut response, &context).await?;
}

Built-in Middleware

PMCP provides several production-ready middleware implementations.

LoggingMiddleware

Logs all requests and responses at configurable levels:

#![allow(unused)]
fn main() {
use pmcp::shared::LoggingMiddleware;
use tracing::Level;

// Create logging middleware
let logger = LoggingMiddleware::new(Level::INFO);

// Or use default (DEBUG level)
let default_logger = LoggingMiddleware::default();
}

Use cases: Request/response visibility, debugging, audit trails.

AuthMiddleware

Adds authentication to requests:

#![allow(unused)]
fn main() {
use pmcp::shared::AuthMiddleware;

let auth = AuthMiddleware::new("Bearer api-token-12345".to_string());
}

Note: This is a basic implementation. For production, implement custom auth middleware with your authentication scheme.

RetryMiddleware

Configures retry behavior for failed requests:

#![allow(unused)]
fn main() {
use pmcp::shared::RetryMiddleware;

// Custom retry settings
let retry = RetryMiddleware::new(
    5,      // max_retries
    1000,   // initial_delay_ms
    30000   // max_delay_ms (exponential backoff cap)
);

// Or use defaults (3 retries, 1s initial, 30s max)
let default_retry = RetryMiddleware::default();
}

Use cases: Network resilience, transient failure handling.
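The retry schedule these parameters imply can be sketched as capped exponential backoff (an illustration of the likely policy, not the crate's exact code):

```rust
/// Exponential backoff: initial * 2^attempt, capped at max (all in milliseconds).
/// Saturates instead of overflowing for very large attempt counts.
pub fn backoff_delay_ms(initial_ms: u64, max_ms: u64, attempt: u32) -> u64 {
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX); // 2^attempt
    initial_ms.saturating_mul(factor).min(max_ms)
}
```

With the defaults (1s initial, 30s cap), the delays run 1s, 2s, 4s, 8s, 16s, then stay at 30s.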

RateLimitMiddleware (Advanced)

Token bucket rate limiting with automatic refill:

#![allow(unused)]
fn main() {
use pmcp::shared::RateLimitMiddleware;
use std::time::Duration;

// 10 requests per second, burst of 20
let rate_limiter = RateLimitMiddleware::new(
    10,                        // max_requests per refill_duration
    20,                        // bucket_size (burst capacity)
    Duration::from_secs(1)     // refill_duration
);
}

Features:

  • High priority (MiddlewarePriority::High)
  • Automatic token refill based on time
  • Thread-safe with atomic operations
  • Records rate limit metrics in context

Use cases: API rate limiting, resource protection, QoS enforcement.
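The token-bucket mechanics can be sketched in plain Rust. This is a simplified single-threaded model; the real middleware uses atomic operations:

```rust
use std::time::{Duration, Instant};

/// Simplified token bucket: `capacity` burst tokens, refilled at
/// `refill_amount` tokens per `refill_period`.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    pub fn new(refill_amount: u32, capacity: u32, refill_period: Duration) -> Self {
        Self {
            capacity: capacity as f64,
            tokens: capacity as f64, // start full: burst capacity available
            refill_per_sec: refill_amount as f64 / refill_period.as_secs_f64(),
            last_refill: Instant::now(),
        }
    }

    /// Try to take one token; returns false when rate-limited.
    pub fn try_acquire(&mut self) -> bool {
        // Refill based on elapsed time, clamped to the bucket's capacity.
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = now;

        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```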

CircuitBreakerMiddleware (Advanced)

Fault tolerance with automatic failure detection:

#![allow(unused)]
fn main() {
use pmcp::shared::CircuitBreakerMiddleware;
use std::time::Duration;

// Open circuit after 5 failures in 60s window, timeout for 30s
let circuit_breaker = CircuitBreakerMiddleware::new(
    5,                         // failure_threshold
    Duration::from_secs(60),   // time_window
    Duration::from_secs(30),   // timeout_duration
);
}

States:

  • Closed: Normal operation, requests pass through
  • Open: Too many failures, requests fail fast
  • Half-Open: Testing if service recovered, limited requests allowed

Features:

  • High priority (MiddlewarePriority::High)
  • Automatic state transitions
  • Records circuit breaker state in metrics

Use cases: Cascading failure prevention, service degradation, fault isolation.
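The three states and their transitions can be modeled as a small state machine. This sketch counts consecutive failures rather than using a sliding time window, so it approximates rather than reproduces the middleware's behavior:

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    Closed,
    Open,
    HalfOpen,
}

/// Simplified circuit breaker (consecutive-failure model).
pub struct CircuitBreaker {
    state: CircuitState,
    failures: u32,
    failure_threshold: u32,
    timeout: Duration,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, timeout: Duration) -> Self {
        Self {
            state: CircuitState::Closed,
            failures: 0,
            failure_threshold,
            timeout,
            opened_at: None,
        }
    }

    /// Should a request be allowed through right now?
    pub fn allow_request(&mut self) -> bool {
        if self.state == CircuitState::Open {
            // After the timeout, let a probe request through (Half-Open).
            if self.opened_at.map_or(false, |t| t.elapsed() >= self.timeout) {
                self.state = CircuitState::HalfOpen;
                return true;
            }
            return false; // fail fast
        }
        true
    }

    pub fn record_success(&mut self) {
        self.failures = 0;
        self.state = CircuitState::Closed;
        self.opened_at = None;
    }

    pub fn record_failure(&mut self) {
        self.failures += 1;
        if self.failures >= self.failure_threshold || self.state == CircuitState::HalfOpen {
            self.state = CircuitState::Open;
            self.opened_at = Some(Instant::now());
        }
    }

    pub fn state(&self) -> CircuitState {
        self.state
    }
}
```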

MetricsMiddleware (Advanced)

Collects performance and usage metrics:

#![allow(unused)]
fn main() {
use pmcp::shared::MetricsMiddleware;

let metrics = MetricsMiddleware::new("my-service".to_string());

// Query metrics
let request_count = metrics.get_request_count("tools/call");
let error_count = metrics.get_error_count("tools/call");
let avg_duration = metrics.get_average_duration("tools/call");  // in microseconds

tracing::info!(
    "Method: tools/call, Requests: {}, Errors: {}, Avg: {}μs",
    request_count,
    error_count,
    avg_duration
);
}

Collected metrics:

  • Request counts per method
  • Error counts per method
  • Average processing time per method
  • Total processing time

Use cases: Observability, performance monitoring, capacity planning.

CompressionMiddleware (Advanced)

Compresses large messages to reduce network usage:

#![allow(unused)]
fn main() {
use pmcp::shared::{CompressionMiddleware, CompressionType};

// Gzip compression for messages larger than 1KB
let compression = CompressionMiddleware::new(
    CompressionType::Gzip,
    1024  // min_size in bytes
);

// Compression types
pub enum CompressionType {
    None,
    Gzip,
    Deflate,
}
}

Features:

  • Normal priority (MiddlewarePriority::Normal)
  • Size threshold to avoid compressing small messages
  • Records compression metrics (original size, compression type)

Use cases: Large payload optimization, bandwidth reduction.


Custom Middleware

Basic Custom Middleware

#![allow(unused)]
fn main() {
use pmcp::shared::Middleware;
use pmcp::types::{JSONRPCRequest, JSONRPCResponse};
use async_trait::async_trait;

struct MetadataMiddleware {
    client_id: String,
}

#[async_trait]
impl Middleware for MetadataMiddleware {
    async fn on_request(&self, request: &mut JSONRPCRequest) -> pmcp::Result<()> {
        tracing::info!("Client {} sending request: {}", self.client_id, request.method);
        // Could add client_id to request params here
        Ok(())
    }

    async fn on_response(&self, response: &mut JSONRPCResponse) -> pmcp::Result<()> {
        tracing::info!("Client {} received response for: {:?}", self.client_id, response.id);
        Ok(())
    }
}
}

Advanced Custom Middleware

#![allow(unused)]
fn main() {
use pmcp::shared::{AdvancedMiddleware, MiddlewareContext, MiddlewarePriority};
use pmcp::types::JSONRPCRequest;
use async_trait::async_trait;

struct ValidationMiddleware {
    strict_mode: bool,
}

#[async_trait]
impl AdvancedMiddleware for ValidationMiddleware {
    fn name(&self) -> &'static str {
        "validation"
    }

    fn priority(&self) -> MiddlewarePriority {
        MiddlewarePriority::Critical  // Run first
    }

    async fn should_execute(&self, context: &MiddlewareContext) -> bool {
        // Only execute for high-priority requests in strict mode
        if self.strict_mode {
            matches!(
                context.priority,
                Some(pmcp::shared::transport::MessagePriority::High)
            )
        } else {
            true
        }
    }

    async fn on_request_with_context(
        &self,
        request: &mut JSONRPCRequest,
        context: &MiddlewareContext,
    ) -> pmcp::Result<()> {
        // Validate request
        if request.method.is_empty() {
            context.record_metric("validation_failures".to_string(), 1.0);
            return Err(pmcp::Error::Validation("Empty method name".to_string()));
        }

        if request.jsonrpc != "2.0" {
            context.record_metric("validation_failures".to_string(), 1.0);
            return Err(pmcp::Error::Validation("Invalid JSON-RPC version".to_string()));
        }

        context.set_metadata("method".to_string(), request.method.clone());
        context.record_metric("validation_passed".to_string(), 1.0);
        Ok(())
    }
}
}

Middleware Ordering

#![allow(unused)]
fn main() {
use pmcp::shared::{
    CircuitBreakerMiddleware, CompressionMiddleware, CompressionType,
    EnhancedMiddlewareChain, LoggingMiddleware, MetricsMiddleware, RateLimitMiddleware,
};
use std::sync::Arc;
use std::time::Duration;
use tracing::Level;

let mut chain = EnhancedMiddlewareChain::new();

// 1. Critical: Validation, security (first in, last out)
chain.add(Arc::new(ValidationMiddleware::new()));

// 2. High: Rate limiting, circuit breaker (protect downstream)
chain.add(Arc::new(RateLimitMiddleware::new(10, 20, Duration::from_secs(1))));
chain.add(Arc::new(CircuitBreakerMiddleware::new(
    5,
    Duration::from_secs(60),
    Duration::from_secs(30)
)));

// 3. Normal: Business logic, compression, transformation
chain.add(Arc::new(CompressionMiddleware::new(CompressionType::Gzip, 1024)));
chain.add(Arc::new(CustomBusinessLogic));

// 4. Low: Metrics, logging (observe everything)
chain.add(Arc::new(MetricsMiddleware::new("my-service".to_string())));
chain.add(Arc::new(LoggingMiddleware::new(Level::INFO)));
}

Ordering Principles

  1. Validation First: Reject invalid requests before doing expensive work
  2. Protection Before Processing: Rate limit and circuit break early
  3. Transform in the Middle: Business logic and compression
  4. Observe Everything: Logging and metrics wrap all operations

Manual Ordering (No Auto-Sort)

#![allow(unused)]
fn main() {
// Disable automatic priority sorting
let mut chain = EnhancedMiddlewareChain::new_no_sort();

// Add in explicit order
chain.add(Arc::new(FirstMiddleware));
chain.add(Arc::new(SecondMiddleware));
chain.add(Arc::new(ThirdMiddleware));

// Manual sort by priority if needed
chain.sort_by_priority();
}

Performance Considerations

Minimizing Overhead

#![allow(unused)]
fn main() {
// ✅ Good: Lightweight check
async fn on_request_with_context(
    &self,
    request: &mut JSONRPCRequest,
    context: &MiddlewareContext,
) -> pmcp::Result<()> {
    // Quick validation
    if !request.method.starts_with("tools/") {
        return Ok(());  // Skip early
    }

    // Expensive operation only when needed
    self.expensive_validation(request).await
}

// ❌ Bad: Always does expensive work
async fn on_request_with_context(
    &self,
    request: &mut JSONRPCRequest,
    context: &MiddlewareContext,
) -> pmcp::Result<()> {
    // Always expensive, even when unnecessary
    self.expensive_validation(request).await
}
}

Async Best Practices

#![allow(unused)]
fn main() {
// ✅ Good: Non-blocking
async fn on_request_with_context(
    &self,
    request: &mut JSONRPCRequest,
    context: &MiddlewareContext,
) -> pmcp::Result<()> {
    // Async I/O is fine; look up the user id stashed in context metadata
    let user_id = context.get_metadata("user_id").unwrap_or_default();
    let user = self.user_service.get_user(&user_id).await?;
    context.set_metadata("user_name".to_string(), user.name);
    Ok(())
}

// ❌ Bad: Blocking in async
async fn on_request_with_context(
    &self,
    request: &mut JSONRPCRequest,
    context: &MiddlewareContext,
) -> pmcp::Result<()> {
    // Blocks the executor!
    let data = std::fs::read_to_string("config.json")?;
    Ok(())
}
}

Conditional Execution

#![allow(unused)]
fn main() {
use pmcp::shared::{AdvancedMiddleware, MiddlewareContext};
use pmcp::types::JSONRPCRequest;
use async_trait::async_trait;

struct ExpensiveMiddleware;

#[async_trait]
impl AdvancedMiddleware for ExpensiveMiddleware {
    fn name(&self) -> &'static str {
        "expensive"
    }

    async fn should_execute(&self, context: &MiddlewareContext) -> bool {
        // Only run for specific methods
        context.get_metadata("method")
            .map(|m| m.starts_with("tools/"))
            .unwrap_or(false)
    }

    async fn on_request_with_context(
        &self,
        request: &mut JSONRPCRequest,
        context: &MiddlewareContext,
    ) -> pmcp::Result<()> {
        // This only runs if should_execute returned true
        self.expensive_operation(request).await
    }
}
}

Performance Monitoring

#![allow(unused)]
fn main() {
use pmcp::shared::{MiddlewareContext, PerformanceMetrics};

let context = MiddlewareContext::default();

// Metrics are automatically collected
chain.process_request_with_context(&mut request, &context).await?;

// Access metrics
let metrics = context.metrics;
tracing::info!(
    "Requests: {}, Errors: {}, Avg time: {:?}",
    metrics.request_count(),
    metrics.error_count(),
    metrics.average_time()
);
}

Examples

Example 1: Basic Middleware Chain

See examples/15_middleware.rs:

use pmcp::shared::{MiddlewareChain, LoggingMiddleware};
use std::sync::Arc;
use tracing::Level;

#[tokio::main]
async fn main() -> pmcp::Result<()> {
    tracing_subscriber::fmt::init();

    // Create middleware chain
    let mut middleware = MiddlewareChain::new();
    middleware.add(Arc::new(LoggingMiddleware::new(Level::DEBUG)));
    middleware.add(Arc::new(TimingMiddleware::new()));

    // Use with transport/client
    // (middleware integration is transport-specific)

    Ok(())
}

Example 2: Enhanced Middleware System

See examples/30_enhanced_middleware.rs:

use pmcp::shared::{
    EnhancedMiddlewareChain,
    MiddlewareContext,
    RateLimitMiddleware,
    CircuitBreakerMiddleware,
    MetricsMiddleware,
    CompressionMiddleware,
    CompressionType,
};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> pmcp::Result<()> {
    tracing_subscriber::fmt().init();

    // Create enhanced chain
    let mut chain = EnhancedMiddlewareChain::new();

    // Add middleware (auto-sorted by priority)
    chain.add(Arc::new(ValidationMiddleware::new(false)));
    chain.add(Arc::new(RateLimitMiddleware::new(5, 10, Duration::from_secs(1))));
    chain.add(Arc::new(CircuitBreakerMiddleware::new(
        3,
        Duration::from_secs(10),
        Duration::from_secs(5)
    )));
    chain.add(Arc::new(MetricsMiddleware::new("my-service".to_string())));
    chain.add(Arc::new(CompressionMiddleware::new(CompressionType::Gzip, 1024)));

    tracing::info!("Middleware chain configured with {} middleware", chain.len());

    // Create context
    let context = MiddlewareContext::with_request_id("req-001".to_string());

    // Process requests
    let mut request = create_test_request();
    chain.process_request_with_context(&mut request, &context).await?;

    Ok(())
}

Example 3: Custom Validation Middleware

#![allow(unused)]
fn main() {
use pmcp::shared::{AdvancedMiddleware, MiddlewareContext, MiddlewarePriority};
use pmcp::types::JSONRPCRequest;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;

// Uses your preferred JSON Schema library (e.g., jsonschema)
struct SchemaValidationMiddleware {
    schemas: Arc<HashMap<String, JsonSchema>>,
}

#[async_trait]
impl AdvancedMiddleware for SchemaValidationMiddleware {
    fn name(&self) -> &'static str {
        "schema_validation"
    }

    fn priority(&self) -> MiddlewarePriority {
        MiddlewarePriority::Critical
    }

    async fn on_request_with_context(
        &self,
        request: &mut JSONRPCRequest,
        context: &MiddlewareContext,
    ) -> pmcp::Result<()> {
        // Get schema for this method
        let schema = self.schemas.get(&request.method)
            .ok_or_else(|| pmcp::Error::Validation(
                format!("No schema for method: {}", request.method)
            ))?;

        // Validate params against schema
        if let Some(ref params) = request.params {
            schema.validate(params).map_err(|e| {
                context.record_metric("schema_validation_failed".to_string(), 1.0);
                pmcp::Error::Validation(format!("Schema validation failed: {}", e))
            })?;
        }

        context.record_metric("schema_validation_passed".to_string(), 1.0);
        Ok(())
    }
}
}

Summary

Key Takeaways

  1. Two Middleware Systems: Basic Middleware for simple cases, AdvancedMiddleware for production
  2. Priority Ordering: Control execution order with MiddlewarePriority
  3. Context Propagation: Share data and metrics with MiddlewareContext
  4. Built-in Patterns: Rate limiting, circuit breakers, metrics, compression
  5. Conditional Execution: should_execute() for selective middleware
  6. Performance: Use should_execute(), async operations, and metrics tracking

When to Use Each System

Basic Middleware (MiddlewareChain):

  • Simple logging or tracing
  • Development and debugging
  • Lightweight request modification

Advanced Middleware (EnhancedMiddlewareChain):

  • Production deployments
  • Complex ordering requirements
  • Performance monitoring
  • Fault tolerance patterns (rate limiting, circuit breakers)
  • Context-dependent behavior

Best Practices

  1. Keep Middleware Focused: Single responsibility per middleware
  2. Order Matters: Validation → Protection → Logic → Observation
  3. Use Priorities: Let EnhancedMiddlewareChain auto-sort
  4. Conditional Execution: Skip expensive operations when possible
  5. Monitor Performance: Use PerformanceMetrics and context
  6. Handle Errors Gracefully: Implement on_error() for cleanup
  7. Test in Isolation: Unit test middleware independently

Examples Reference

  • examples/15_middleware.rs: Basic middleware chain
  • examples/30_enhanced_middleware.rs: Advanced patterns with built-in middleware
  • Inline doctests in src/shared/middleware.rs demonstrate each middleware

HTTP-Level Middleware

HTTP-level middleware operates at the HTTP transport layer, before MCP protocol processing. This is useful for header injection, authentication, compression, and other HTTP-specific concerns.

Architecture: Two-Layer Middleware System

PMCP has two distinct middleware layers:

┌─────────────────────────────────────────────────────────────┐
│  Client Application                                         │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  Protocol-Level Middleware (AdvancedMiddleware)             │
│  - Operates on JSONRPCRequest/JSONRPCResponse               │
│  - LoggingMiddleware, MetricsMiddleware, ValidationMiddleware│
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  HTTP-Level Middleware (HttpMiddleware)                     │
│  - Operates on HTTP request/response                        │
│  - OAuthClientMiddleware, header injection, compression     │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  HTTP Transport (StreamableHttpTransport)                   │
└─────────────────────────────────────────────────────────────┘

Key principle: middleware doesn't run twice. Protocol-level middleware operates on JSON-RPC messages; HTTP-level middleware operates on HTTP requests.

HttpMiddleware Trait

#![allow(unused)]
fn main() {
use pmcp::client::http_middleware::{HttpMiddleware, HttpRequest, HttpResponse, HttpMiddlewareContext};
use async_trait::async_trait;

#[async_trait]
pub trait HttpMiddleware: Send + Sync {
    /// Called before HTTP request is sent
    async fn on_request(
        &self,
        request: &mut HttpRequest,
        context: &HttpMiddlewareContext,
    ) -> pmcp::Result<()> {
        Ok(())
    }

    /// Called after HTTP response is received
    async fn on_response(
        &self,
        response: &mut HttpResponse,
        context: &HttpMiddlewareContext,
    ) -> pmcp::Result<()> {
        Ok(())
    }

    /// Called when an error occurs
    async fn on_error(
        &self,
        error: &pmcp::Error,
        context: &HttpMiddlewareContext,
    ) -> pmcp::Result<()> {
        Ok(())
    }

    /// Priority for ordering (lower runs first)
    fn priority(&self) -> i32 {
        50 // Default priority
    }

    /// Should this middleware execute for this context?
    async fn should_execute(&self, _context: &HttpMiddlewareContext) -> bool {
        true
    }
}
}

HttpRequest and HttpResponse

Simplified HTTP representations for middleware:

#![allow(unused)]
fn main() {
pub struct HttpRequest {
    pub method: String,           // "GET", "POST", etc.
    pub url: String,
    pub headers: HashMap<String, String>,
    pub body: Vec<u8>,
}

pub struct HttpResponse {
    pub status: u16,
    pub headers: HashMap<String, String>,
    pub body: Vec<u8>,
}
}

HttpMiddlewareContext

Context for HTTP middleware execution:

#![allow(unused)]
fn main() {
pub struct HttpMiddlewareContext {
    pub request_id: Option<String>,
    pub url: String,
    pub method: String,
    pub attempt: u32,
    pub metadata: Arc<RwLock<HashMap<String, String>>>,
}

// Usage
let context = HttpMiddlewareContext::new(url.to_string(), "POST".to_string());
context.set_metadata("user_id".to_string(), "user-123".to_string());
let user_id = context.get_metadata("user_id");
}

OAuthClientMiddleware

Built-in OAuth middleware for automatic token injection:

#![allow(unused)]
fn main() {
use pmcp::client::http_middleware::HttpMiddlewareChain;
use pmcp::client::oauth_middleware::{BearerToken, OAuthClientMiddleware};
use std::sync::Arc;
use std::time::Duration;

// Create bearer token
let token = BearerToken::new("my-api-token".to_string());

// Or with expiration
let token = BearerToken::with_expiry(
    "my-api-token".to_string(),
    Duration::from_secs(3600) // 1 hour
);

// Create OAuth middleware
let oauth_middleware = OAuthClientMiddleware::new(token);

// Add to HttpMiddlewareChain
let mut http_chain = HttpMiddlewareChain::new();
http_chain.add(Arc::new(oauth_middleware));
}

Features:

  • Automatic token injection into Authorization header
  • Token expiry checking before each request
  • 401/403 detection for token refresh triggers
  • OAuth precedence policy (respects transport auth_provider)
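Expiry checking can be modeled with a standalone sketch; the `Token` type and its methods below are illustrative assumptions, not the actual BearerToken API:

```rust
use std::time::{Duration, Instant};

/// Simplified bearer token with optional expiry (illustrative).
pub struct Token {
    value: String,
    expires_at: Option<Instant>,
}

impl Token {
    pub fn new(value: String) -> Self {
        Self { value, expires_at: None }
    }

    pub fn with_expiry(value: String, ttl: Duration) -> Self {
        Self { value, expires_at: Some(Instant::now() + ttl) }
    }

    /// Tokens without an expiry never expire.
    pub fn is_expired(&self) -> bool {
        self.expires_at.map_or(false, |t| Instant::now() >= t)
    }

    /// Header value to inject while the token is still valid.
    pub fn authorization_header(&self) -> Option<String> {
        if self.is_expired() {
            None // caller should refresh before sending
        } else {
            Some(format!("Bearer {}", self.value))
        }
    }
}
```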

OAuth Precedence Policy

To avoid duplicate authentication, OAuth middleware follows this precedence:

1. Transport auth_provider (highest priority)
   ↓
2. HttpMiddleware OAuth (OAuthClientMiddleware)
   ↓
3. Extra headers from config (lowest priority)

The middleware checks auth_already_set metadata to skip injection when transport auth is configured:

#![allow(unused)]
fn main() {
// OAuth middleware checks metadata
if context.get_metadata("auth_already_set").is_some() {
    // Skip - transport auth_provider takes precedence
    return Ok(());
}

// Also skips if Authorization header already present
if request.has_header("Authorization") {
    // Warn about duplicate auth configuration
    return Ok(());
}
}

Example: Custom HTTP Middleware

#![allow(unused)]
fn main() {
use pmcp::client::http_middleware::{HttpMiddleware, HttpRequest, HttpMiddlewareContext};
use async_trait::async_trait;

/// Adds custom correlation headers
struct CorrelationHeaderMiddleware {
    service_name: String,
}

#[async_trait]
impl HttpMiddleware for CorrelationHeaderMiddleware {
    async fn on_request(
        &self,
        request: &mut HttpRequest,
        context: &HttpMiddlewareContext,
    ) -> pmcp::Result<()> {
        // Add service name header
        request.add_header("X-Service-Name".to_string(), self.service_name.clone());

        // Add timestamp header
        request.add_header(
            "X-Request-Timestamp".to_string(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs()
                .to_string(),
        );

        // Add request ID if available
        if let Some(req_id) = &context.request_id {
            request.add_header("X-Request-ID".to_string(), req_id.clone());
        }

        Ok(())
    }

    fn priority(&self) -> i32 {
        20 // Run after OAuth (priority 10)
    }
}
}

Integration with ClientBuilder

Use ClientBuilder::with_middleware() for protocol-level middleware:

#![allow(unused)]
fn main() {
use pmcp::ClientBuilder;
use pmcp::shared::{MetricsMiddleware, LoggingMiddleware};
use std::sync::Arc;

let transport = /* your transport */;

let client = ClientBuilder::new(transport)
    .with_middleware(Arc::new(MetricsMiddleware::new("my-client".to_string())))
    .with_middleware(Arc::new(LoggingMiddleware::default()))
    .build();
}

Note: HTTP middleware is configured separately on the transport (StreamableHttpTransport), not via ClientBuilder.

Integration with StreamableHttpTransport

Configure HTTP middleware when creating the transport:

#![allow(unused)]
fn main() {
use pmcp::shared::{StreamableHttpTransport, StreamableHttpConfig};
use pmcp::client::http_middleware::HttpMiddlewareChain;
use pmcp::client::oauth_middleware::{OAuthClientMiddleware, BearerToken};
use std::sync::Arc;
use std::time::Duration;

// Create HTTP middleware chain
let mut http_chain = HttpMiddlewareChain::new();

// Add OAuth middleware
let token = BearerToken::with_expiry(
    "api-token-12345".to_string(),
    Duration::from_secs(3600)
);
http_chain.add(Arc::new(OAuthClientMiddleware::new(token)));

// Add custom middleware
http_chain.add(Arc::new(CorrelationHeaderMiddleware {
    service_name: "my-client".to_string(),
}));

// Create transport config with HTTP middleware
let config = StreamableHttpConfig::new("https://api.example.com".to_string())
    .with_http_middleware(Arc::new(http_chain));

let transport = StreamableHttpTransport::with_config(config).await?;
}

Complete Example: OAuth + Protocol Middleware

use pmcp::{ClientBuilder, ClientCapabilities};
use pmcp::shared::{StreamableHttpTransport, StreamableHttpConfig, MetricsMiddleware};
use pmcp::client::http_middleware::HttpMiddlewareChain;
use pmcp::client::oauth_middleware::{OAuthClientMiddleware, BearerToken};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() -> pmcp::Result<()> {
    // 1. Create HTTP middleware chain
    let mut http_chain = HttpMiddlewareChain::new();

    // Add OAuth (priority 10 - runs first)
    let token = BearerToken::with_expiry(
        std::env::var("API_TOKEN")?,
        Duration::from_secs(3600)
    );
    http_chain.add(Arc::new(OAuthClientMiddleware::new(token)));

    // Add correlation headers (priority 20 - runs after OAuth)
    http_chain.add(Arc::new(CorrelationHeaderMiddleware {
        service_name: "my-service".to_string(),
    }));

    // 2. Create transport with HTTP middleware
    let config = StreamableHttpConfig::new("https://api.example.com".to_string())
        .with_http_middleware(Arc::new(http_chain));

    let transport = StreamableHttpTransport::with_config(config).await?;

    // 3. Create client with protocol middleware
    let mut client = ClientBuilder::new(transport)
        .with_middleware(Arc::new(MetricsMiddleware::new("my-client".to_string())))
        .build();

    // 4. Use client - both middleware layers automatically apply
    let init_result = client.initialize(ClientCapabilities::minimal()).await?;

    println!("Connected: {}", init_result.server_info.name);

    Ok(())
}

Middleware Execution Flow

For a typical HTTP POST request:

1. Client.call_tool(name, args)
   ↓
2. Protocol Middleware (Request):
   - MetricsMiddleware::on_request()
   - LoggingMiddleware::on_request()
   ↓
3. Transport serialization (JSON-RPC → bytes)
   ↓
4. HTTP Middleware (Request):
   - OAuthClientMiddleware::on_request() → Add Authorization header
   - CorrelationHeaderMiddleware::on_request() → Add X-Service-Name, X-Request-ID
   ↓
5. HTTP Transport sends POST request
   ↓
6. HTTP Transport receives response
   ↓
7. HTTP Middleware (Response) [reverse order]:
   - CorrelationHeaderMiddleware::on_response()
   - OAuthClientMiddleware::on_response() → Check for 401
   ↓
8. Transport deserialization (bytes → JSON-RPC)
   ↓
9. Protocol Middleware (Response) [reverse order]:
   - LoggingMiddleware::on_response()
   - MetricsMiddleware::on_response()
   ↓
10. Client receives result

Error Handling

Both middleware layers support error hooks:

#![allow(unused)]
fn main() {
#[async_trait]
impl HttpMiddleware for MyMiddleware {
    async fn on_error(
        &self,
        error: &pmcp::Error,
        context: &HttpMiddlewareContext,
    ) -> pmcp::Result<()> {
        // Log error with context
        tracing::error!(
            "HTTP error for {} {}: {}",
            context.method,
            context.url,
            error
        );

        // Clean up resources if needed
        self.cleanup().await;

        Ok(())
    }
}
}

Short-circuit behavior: If middleware returns an error:

  1. Processing stops immediately
  2. on_error() is called for ALL middleware in the chain
  3. Original error is returned to caller
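The short-circuit rule can be sketched in plain Rust. The `Step` enum and `process` function below are illustrative stand-ins, not pmcp types:

```rust
// Illustrative sketch of chain short-circuiting: on the first failure,
// later middleware never runs, but every error hook still fires.
#[derive(Debug, PartialEq)]
enum Step {
    Ran(&'static str),
    ErrorHook(&'static str),
}

fn process(chain: &[(&'static str, bool)]) -> Vec<Step> {
    let mut log = Vec::new();
    let mut failed = false;
    for &(name, ok) in chain {
        log.push(Step::Ran(name));
        if !ok {
            failed = true;
            break; // 1. processing stops immediately
        }
    }
    if failed {
        // 2. on_error() is called for ALL middleware in the chain
        for &(name, _) in chain {
            log.push(Step::ErrorHook(name));
        }
    }
    log
}

fn main() {
    // "metrics" fails, so "logging" never runs, yet all three error hooks fire.
    let log = process(&[("oauth", true), ("metrics", false), ("logging", true)]);
    println!("{log:?}");
}
```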

Middleware Priority Reference

HTTP Middleware:

  • 0-9: Reserved for critical security middleware
  • 10: OAuthClientMiddleware (default)
  • 20-49: Custom authentication/authorization
  • 50: Default priority
  • 51-99: Logging, metrics, headers

Protocol Middleware:

  • Critical (0): Validation, security
  • High (1): Rate limiting, circuit breakers
  • Normal (2): Business logic, compression
  • Low (3): Metrics, logging
  • Lowest (4): Cleanup

Server Middleware

Server middleware provides the same two-layer architecture for MCP servers: protocol-level (JSON-RPC) and HTTP-level (transport).

Architecture: Server Two-Layer Middleware

┌─────────────────────────────────────────────────────────────┐
│  Client HTTP Request                                        │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  Server HTTP Middleware (ServerHttpMiddleware)              │
│  - Operates on HTTP request/response                        │
│  - ServerHttpLoggingMiddleware, auth verification, CORS     │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  Protocol-Level Middleware (AdvancedMiddleware)             │
│  - Operates on JSONRPCRequest/JSONRPCResponse               │
│  - MetricsMiddleware, validation, business logic            │
└────────────────────────┬────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────┐
│  Server Request Handler                                     │
└─────────────────────────────────────────────────────────────┘

Execution order:

  • Request: HTTP middleware → Protocol middleware → Handler
  • Response: Handler → Protocol middleware → HTTP middleware
  • Notification: Protocol middleware only (SSE streaming)

ServerHttpMiddleware Trait

HTTP-level middleware for server transport:

#![allow(unused)]
fn main() {
use pmcp::server::http_middleware::{
    ServerHttpMiddleware, ServerHttpRequest, ServerHttpResponse, ServerHttpContext
};
use async_trait::async_trait;

#[async_trait]
pub trait ServerHttpMiddleware: Send + Sync {
    /// Called before processing HTTP request
    async fn on_request(
        &self,
        request: &mut ServerHttpRequest,
        context: &ServerHttpContext,
    ) -> pmcp::Result<()> {
        Ok(())
    }

    /// Called after generating HTTP response
    async fn on_response(
        &self,
        response: &mut ServerHttpResponse,
        context: &ServerHttpContext,
    ) -> pmcp::Result<()> {
        Ok(())
    }

    /// Called when an error occurs
    async fn on_error(
        &self,
        error: &pmcp::Error,
        context: &ServerHttpContext,
    ) -> pmcp::Result<()> {
        Ok(())
    }

    /// Priority for ordering (lower runs first)
    fn priority(&self) -> i32 {
        50 // Default priority
    }

    /// Should this middleware execute?
    async fn should_execute(&self, _context: &ServerHttpContext) -> bool {
        true
    }
}
}

ServerHttpRequest and ServerHttpResponse

Simplified HTTP representations with HeaderMap support:

#![allow(unused)]
fn main() {
use hyper::http::{HeaderMap, HeaderValue, Method, StatusCode, Uri};

pub struct ServerHttpRequest {
    pub method: Method,
    pub uri: Uri,
    pub headers: HeaderMap<HeaderValue>,
    pub body: Vec<u8>,
}

pub struct ServerHttpResponse {
    pub status: StatusCode,
    pub headers: HeaderMap<HeaderValue>,
    pub body: Vec<u8>,
}

// Helper methods (signatures only; bodies omitted)
impl ServerHttpRequest {
    pub fn get_header(&self, name: &str) -> Option<&str>;
    pub fn add_header(&mut self, name: &str, value: &str);
}

impl ServerHttpResponse {
    pub fn new(status: StatusCode, headers: HeaderMap<HeaderValue>, body: Vec<u8>) -> Self;
    pub fn get_header(&self, name: &str) -> Option<&str>;
    pub fn add_header(&mut self, name: &str, value: &str);
}
}

ServerHttpContext

Context for server HTTP middleware:

#![allow(unused)]
fn main() {
pub struct ServerHttpContext {
    pub request_id: String,
    pub session_id: Option<String>,
    pub start_time: std::time::Instant,
}

impl ServerHttpContext {
    /// Get elapsed time since request started
    pub fn elapsed(&self) -> std::time::Duration;
}
}
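The context's `elapsed()` accessor is what makes latency logging from `on_response()` a one-liner. A minimal sketch, using a hypothetical `TimingContext` stand-in rather than the real `ServerHttpContext`:

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-in for ServerHttpContext, showing how elapsed()
// supports per-request latency measurement.
struct TimingContext {
    start_time: Instant,
}

impl TimingContext {
    fn elapsed(&self) -> Duration {
        self.start_time.elapsed()
    }
}

fn main() {
    let ctx = TimingContext { start_time: Instant::now() };
    std::thread::sleep(Duration::from_millis(5));
    // In on_response(), log this value to record request latency.
    println!("handled in {:?}", ctx.elapsed());
}
```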

ServerHttpLoggingMiddleware

Built-in logging middleware with sensitive data redaction:

#![allow(unused)]
fn main() {
use pmcp::server::http_middleware::ServerHttpLoggingMiddleware;

// Create with secure defaults
let logging = ServerHttpLoggingMiddleware::new()
    .with_level(tracing::Level::INFO)       // Log level
    .with_redact_query(true)                // Strip query params
    .with_max_body_bytes(1024);             // Log first 1KB of body

// Default sensitive headers are redacted:
// - authorization, cookie, x-api-key, x-amz-security-token, x-goog-api-key

// Add custom redacted header
let logging = logging.redact_header("x-internal-token");

// Allow specific header (use with caution!)
let logging = logging.allow_header("x-debug-header");
}

Features:

  • Secure by default: Redacts authorization, cookies, API keys
  • Query stripping: Optionally redact query parameters
  • Body gating: Only log safe content types (JSON, text)
  • SSE detection: Skips body logging for text/event-stream
  • Multi-value headers: Preserves all header values

Use cases: Request/response visibility, debugging, audit trails, compliance.
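The redaction idea can be shown in isolation. This is a self-contained sketch in the spirit of ServerHttpLoggingMiddleware, not its actual implementation; `redact` and `SENSITIVE_HEADERS` are hypothetical names:

```rust
// Illustrative redaction: sensitive header values are replaced before
// anything reaches the logs. Header names compare case-insensitively.
const SENSITIVE_HEADERS: &[&str] = &["authorization", "cookie", "x-api-key"];

fn redact(name: &str, value: &str) -> String {
    if SENSITIVE_HEADERS.contains(&name.to_ascii_lowercase().as_str()) {
        "[REDACTED]".to_string()
    } else {
        value.to_string()
    }
}

fn main() {
    println!("{}", redact("Authorization", "Bearer secret-token"));
    println!("{}", redact("content-type", "application/json"));
}
```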

ServerPreset: Default Middleware Bundles

ServerPreset provides pre-configured middleware for common server scenarios:

#![allow(unused)]
fn main() {
use pmcp::server::preset::ServerPreset;
use pmcp::server::builder::ServerCoreBuilder;
use pmcp::server::streamable_http_server::{StreamableHttpServer, StreamableHttpServerConfig};
use std::sync::Arc;
use tokio::sync::Mutex;

// Create preset with defaults
let preset = ServerPreset::default();

// Build server with protocol middleware
let server = ServerCoreBuilder::new()
    .name("my-server")
    .version("1.0.0")
    .protocol_middleware(preset.protocol_middleware())
    .build().expect("valid server configuration");

// Create HTTP server with HTTP middleware
let config = StreamableHttpServerConfig {
    http_middleware: preset.http_middleware(),
    ..Default::default()
};

let http_server = StreamableHttpServer::with_config(
    "127.0.0.1:3000".parse().unwrap(),
    Arc::new(Mutex::new(server)),
    config,
);
}

Defaults:

  • Protocol: MetricsMiddleware (tracks requests, durations, errors)
  • HTTP: ServerHttpLoggingMiddleware (INFO level, redaction enabled)

Opt-in customization:

#![allow(unused)]
fn main() {
use pmcp::shared::middleware::RateLimitMiddleware;
use std::time::Duration;

// Add rate limiting (protocol layer)
let preset = ServerPreset::new("my-service")
    .with_rate_limit(RateLimitMiddleware::new(100, 100, Duration::from_secs(60)));

// Add custom HTTP middleware (transport layer)
let preset = preset.with_http_middleware_item(MyCustomMiddleware);
}

Example: Custom Server HTTP Middleware

#![allow(unused)]
fn main() {
use pmcp::server::http_middleware::{ServerHttpMiddleware, ServerHttpResponse, ServerHttpContext};
use async_trait::async_trait;

/// Adds CORS headers for browser clients
struct CorsMiddleware {
    allowed_origins: Vec<String>,
}

#[async_trait]
impl ServerHttpMiddleware for CorsMiddleware {
    async fn on_response(
        &self,
        response: &mut ServerHttpResponse,
        context: &ServerHttpContext,
    ) -> pmcp::Result<()> {
        // Add CORS headers
        response.add_header(
            "Access-Control-Allow-Origin",
            &self.allowed_origins.join(", ")
        );
        response.add_header(
            "Access-Control-Allow-Methods",
            "GET, POST, OPTIONS"
        );
        response.add_header(
            "Access-Control-Allow-Headers",
            "Content-Type, Authorization"
        );

        Ok(())
    }

    fn priority(&self) -> i32 {
        90 // Run late (after logging)
    }
}
}

Integration with StreamableHttpServer

Configure middleware when creating the server:

#![allow(unused)]
fn main() {
use pmcp::server::streamable_http_server::{StreamableHttpServer, StreamableHttpServerConfig};
use pmcp::server::http_middleware::{ServerHttpMiddlewareChain, ServerHttpLoggingMiddleware};
use pmcp::server::builder::ServerCoreBuilder;
use pmcp::server::preset::ServerPreset;
use std::sync::Arc;
use tokio::sync::Mutex;

// Option 1: Use ServerPreset (recommended)
let preset = ServerPreset::new("my-service");

let server = ServerCoreBuilder::new()
    .name("my-server")
    .version("1.0.0")
    .protocol_middleware(preset.protocol_middleware())
    .build().expect("valid server configuration");

let config = StreamableHttpServerConfig {
    http_middleware: preset.http_middleware(),
    ..Default::default()
};

let http_server = StreamableHttpServer::with_config(
    "127.0.0.1:3000".parse().unwrap(),
    Arc::new(Mutex::new(server)),
    config,
);

// Option 2: Custom HTTP middleware chain
let mut http_chain = ServerHttpMiddlewareChain::new();
http_chain.add(Arc::new(ServerHttpLoggingMiddleware::new()));
http_chain.add(Arc::new(CorsMiddleware {
    allowed_origins: vec!["https://example.com".to_string()],
}));

let config = StreamableHttpServerConfig {
    http_middleware: Some(Arc::new(http_chain)),
    ..Default::default()
};
}

Server Middleware Execution Flow

For a typical HTTP POST request:

1. HTTP POST arrives at StreamableHttpServer
   ↓
2. Server HTTP Middleware (Request):
   - ServerHttpLoggingMiddleware::on_request() → Log request
   - CorsMiddleware::on_request() → Check origin
   ↓
3. Deserialize JSON-RPC from body
   ↓
4. Protocol Middleware (Request):
   - MetricsMiddleware::on_request() → Increment counter
   - ValidationMiddleware::on_request() → Validate method
   ↓
5. Server Request Handler (tool/prompt/resource)
   ↓
6. Protocol Middleware (Response) [reverse order]:
   - ValidationMiddleware::on_response()
   - MetricsMiddleware::on_response() → Record duration
   ↓
7. Serialize JSON-RPC to body
   ↓
8. Server HTTP Middleware (Response) [reverse order]:
   - CorsMiddleware::on_response() → Add CORS headers
   - ServerHttpLoggingMiddleware::on_response() → Log response
   ↓
9. HTTP response sent to client

Protocol Middleware Ordering

Protocol middleware execution order is determined by priority:

#![allow(unused)]
fn main() {
use pmcp::shared::middleware::MiddlewarePriority;

pub enum MiddlewarePriority {
    Critical = 0,  // Validation, security - executed first
    High = 1,      // Rate limiting, circuit breakers
    Normal = 2,    // Business logic, transformation
    Low = 3,       // Metrics
    Lowest = 4,    // Logging, cleanup
}
}

Request order: Lower priority value executes first (Critical → High → Normal → Low → Lowest)

Response order: Reverse (Lowest → Low → Normal → High → Critical)

Notification order: Same as requests (Critical → High → Normal → Low → Lowest)
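These ordering rules can be illustrated with plain integers in place of pmcp's MiddlewarePriority enum. A self-contained sketch, not pmcp code:

```rust
// Illustrative ordering: sort by priority for requests, reverse for responses.
fn request_order(mut mw: Vec<(u8, &'static str)>) -> Vec<&'static str> {
    mw.sort_by_key(|&(priority, _)| priority); // lower value runs first
    mw.into_iter().map(|(_, name)| name).collect()
}

fn response_order(mw: Vec<(u8, &'static str)>) -> Vec<&'static str> {
    let mut order = request_order(mw);
    order.reverse(); // responses unwind the chain
    order
}

fn main() {
    let mw = vec![(3, "metrics"), (0, "validation"), (1, "rate-limit")];
    println!("request:  {:?}", request_order(mw.clone()));
    println!("response: {:?}", response_order(mw));
}
```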

SSE Notification Routing

For SSE streaming, notifications go through protocol middleware only:

1. Server generates notification (e.g., progress update)
   ↓
2. Protocol Middleware (Notification):
   - ValidationMiddleware::on_notification()
   - MetricsMiddleware::on_notification()
   ↓
3. Serialize as SSE event
   ↓
4. Stream to client (no HTTP middleware - already connected)

Note: HTTP middleware does NOT run for SSE notifications, only for the initial connection.
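The routing rule reduces to a simple dispatch on message kind. `Message` and `layers` below are illustrative, not pmcp types:

```rust
// Illustrative routing: the initial HTTP request traverses both layers;
// SSE notifications ride an already-open connection, so only the
// protocol layer sees them.
enum Message {
    Request,
    Notification,
}

fn layers(msg: &Message) -> Vec<&'static str> {
    match msg {
        Message::Request => vec!["http", "protocol"],
        Message::Notification => vec!["protocol"],
    }
}

fn main() {
    println!("request:      {:?}", layers(&Message::Request));
    println!("notification: {:?}", layers(&Message::Notification));
}
```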

Fast-Path Optimization

StreamableHttpServer uses a fast path when no HTTP middleware is configured:

#![allow(unused)]
fn main() {
// Fast path: No HTTP middleware
if config.http_middleware.is_none() {
    // Direct JSON-RPC parsing, zero copies
    return handle_post_fast_path(state, request).await;
}

// Middleware path: Full chain processing
handle_post_with_middleware(state, request).await
}

Performance: Fast path skips HTTP request/response conversions entirely.

Example: Complete Server Setup

use pmcp::server::preset::ServerPreset;
use pmcp::server::builder::ServerCoreBuilder;
use pmcp::server::streamable_http_server::{StreamableHttpServer, StreamableHttpServerConfig};
use pmcp::server::ToolHandler;
use pmcp::shared::middleware::RateLimitMiddleware;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() -> pmcp::Result<()> {
    // 1. Create preset with defaults + rate limiting
    let preset = ServerPreset::new("my-api-server")
        .with_rate_limit(RateLimitMiddleware::new(100, 100, Duration::from_secs(60)));

    // 2. Build server with protocol middleware
    let server = ServerCoreBuilder::new()
        .name("my-api-server")
        .version("1.0.0")
        .tool("echo", EchoTool)           // EchoTool and CalculateTool implement
        .tool("calculate", CalculateTool) // ToolHandler; definitions omitted
        .protocol_middleware(preset.protocol_middleware())
        .build()?;

    // 3. Create HTTP server with HTTP middleware
    let config = StreamableHttpServerConfig {
        http_middleware: preset.http_middleware(),
        session_id_generator: Some(Box::new(|| {
            format!("session-{}", uuid::Uuid::new_v4())
        })),
        ..Default::default()
    };

    let http_server = StreamableHttpServer::with_config(
        "127.0.0.1:3000".parse().unwrap(),
        Arc::new(Mutex::new(server)),
        config,
    );

    // 4. Start server
    let (addr, handle) = http_server.start().await?;
    println!("Server listening on: {}", addr);

    // Server now has:
    // - HTTP logging with redaction
    // - Protocol metrics collection
    // - Rate limiting (100 req/min)
    // - Session management

    handle.await?;
    Ok(())
}

Best Practices

  1. Use ServerPreset for common cases: Defaults cover most production needs
  2. HTTP middleware for transport concerns: Auth, CORS, logging, compression
  3. Protocol middleware for business logic: Validation, metrics, rate limiting
  4. Order matters:
    • HTTP: Auth → CORS → Logging
    • Protocol: Validation → Rate Limit → Metrics
  5. Redaction is critical: Never log sensitive headers/query params
  6. Fast path for performance: Omit HTTP middleware if not needed
  7. SSE optimization: Don’t buffer bodies for text/event-stream

Server Middleware Priority Reference

HTTP Middleware:

  • 0-9: Reserved for critical security middleware
  • 10-29: Authentication, authorization
  • 30-49: CORS, security headers
  • 50: Default priority (ServerHttpLoggingMiddleware)
  • 51-99: Metrics, custom headers

Protocol Middleware (same as client):

  • Critical (0): Validation, security
  • High (1): Rate limiting, circuit breakers
  • Normal (2): Business logic
  • Low (3): Metrics
  • Lowest (4): Logging, cleanup

Further Reading

  • Repository docs: docs/advanced/middleware-composition.md
  • Advanced Middleware API: https://docs.rs/pmcp/latest/pmcp/shared/middleware/
  • Performance Metrics API: https://docs.rs/pmcp/latest/pmcp/shared/middleware/struct.PerformanceMetrics.html
  • Example: examples/40_middleware_demo.rs - Complete two-layer middleware demonstration