Ch23 Custom Transports

Introduction

MCP is transport-agnostic: it can run over any bidirectional communication channel that supports JSON-RPC 2.0 message exchange. While the PMCP SDK provides built-in transports (stdio, HTTP, WebSocket, SSE), you can build custom transports for specialized use cases.

When to Build Custom Transports

Consider custom transports when:

✅ Good Reasons:

  • Async messaging systems: SQS, SNS, Kafka, RabbitMQ for decoupled architectures
  • Custom protocols: Domain-specific protocols in your infrastructure
  • Performance optimization: Specialized binary protocols, compression, batching
  • Testing: In-memory or mock transports for unit/integration tests
  • Legacy integration: Wrapping existing communication channels
  • Security requirements: Custom encryption, authentication flows

❌ Avoid Custom Transports For:

  • Standard HTTP/HTTPS servers → Use built-in HttpTransport
  • Local processes → Use built-in StdioTransport
  • Real-time WebSocket → Use built-in WebSocketTransport

Client vs Server Reality

Critical Insight: Like web browsers vs websites, there will be far fewer MCP clients than MCP servers.

  • Few clients: Claude Desktop, IDEs, agent frameworks (like web browsers)
  • Many servers: Every tool, API, database, service (like websites)

Implication: Custom transports require both sides to implement the same protocol. Unless you control both client and server (e.g., internal infrastructure), stick to standard transports for maximum compatibility.

The Transport Trait

All transports in PMCP implement the Transport trait:

#![allow(unused)]
fn main() {
use pmcp::shared::{Transport, TransportMessage};
use async_trait::async_trait;

#[async_trait]
pub trait Transport: Send + Sync + Debug {
    /// Send a message (request, response, or notification)
    async fn send(&mut self, message: TransportMessage) -> Result<()>;

    /// Receive a message (blocks until message arrives)
    async fn receive(&mut self) -> Result<TransportMessage>;

    /// Close the transport gracefully
    async fn close(&mut self) -> Result<()>;

    /// Check if transport is still connected (optional)
    fn is_connected(&self) -> bool {
        true
    }

    /// Transport type name for debugging (optional)
    fn transport_type(&self) -> &'static str {
        "unknown"
    }
}
}

Message Types

TransportMessage represents all MCP communication:

#![allow(unused)]
fn main() {
pub enum TransportMessage {
    /// Request with ID (expects response)
    Request {
        id: RequestId,
        request: Request,  // ClientRequest or ServerRequest
    },

    /// Response to a request
    Response(JSONRPCResponse),

    /// Notification (no response expected)
    Notification(Notification),
}
}

Key Design Points:

  • Framing: Your transport handles message boundaries
  • Serialization: PMCP handles JSON-RPC serialization
  • Bidirectional: Both send and receive must work concurrently
  • Error handling: Use pmcp::error::TransportError for transport-specific errors

Example 1: In-Memory Transport (Testing)

Use case: Unit tests, benchmarks, integration tests without network

#![allow(unused)]
fn main() {
use pmcp::shared::{Transport, TransportMessage};
use pmcp::error::Result;
use async_trait::async_trait;
use tokio::sync::mpsc;
use std::sync::Arc;
use tokio::sync::Mutex;

/// In-memory transport using channels
#[derive(Debug)]
pub struct InMemoryTransport {
    /// Channel for sending messages
    tx: mpsc::Sender<TransportMessage>,
    /// Channel for receiving messages
    rx: Arc<Mutex<mpsc::Receiver<TransportMessage>>>,
    /// Connected state
    connected: Arc<std::sync::atomic::AtomicBool>,
}

impl InMemoryTransport {
    /// Create a pair of connected transports (client <-> server)
    pub fn create_pair() -> (Self, Self) {
        let (tx1, rx1) = mpsc::channel(100);
        let (tx2, rx2) = mpsc::channel(100);

        let transport1 = Self {
            tx: tx1,
            rx: Arc::new(Mutex::new(rx2)),
            connected: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        };

        let transport2 = Self {
            tx: tx2,
            rx: Arc::new(Mutex::new(rx1)),
            connected: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        };

        (transport1, transport2)
    }
}

#[async_trait]
impl Transport for InMemoryTransport {
    async fn send(&mut self, message: TransportMessage) -> Result<()> {
        use std::sync::atomic::Ordering;

        if !self.connected.load(Ordering::Relaxed) {
            return Err(pmcp::error::Error::Transport(
                pmcp::error::TransportError::ConnectionClosed
            ));
        }

        self.tx
            .send(message)
            .await
            .map_err(|_| pmcp::error::Error::Transport(
                pmcp::error::TransportError::ConnectionClosed
            ))
    }

    async fn receive(&mut self) -> Result<TransportMessage> {
        let mut rx = self.rx.lock().await;
        rx.recv()
            .await
            .ok_or_else(|| pmcp::error::Error::Transport(
                pmcp::error::TransportError::ConnectionClosed
            ))
    }

    async fn close(&mut self) -> Result<()> {
        use std::sync::atomic::Ordering;
        self.connected.store(false, Ordering::Relaxed);
        Ok(())
    }

    fn is_connected(&self) -> bool {
        use std::sync::atomic::Ordering;
        self.connected.load(Ordering::Relaxed)
    }

    fn transport_type(&self) -> &'static str {
        "in-memory"
    }
}

// Usage in tests
#[cfg(test)]
mod tests {
    use super::*;
    use pmcp::{Client, Server, ClientCapabilities};

    #[tokio::test]
    async fn test_in_memory_transport() {
        // Create connected pair
        let (client_transport, server_transport) = InMemoryTransport::create_pair();

        // Create client and server
        let mut client = Client::new(client_transport);
        let server = Server::builder()
            .name("test-server")
            .version("1.0.0")
            .build()
            .unwrap();

        // Run server in background
        tokio::spawn(async move {
            server.run(server_transport).await
        });

        // Client can now communicate with server
        let result = client.initialize(ClientCapabilities::minimal()).await;
        assert!(result.is_ok());
    }
}
}

Benefits:

  • ✅ Zero network overhead
  • ✅ Deterministic testing
  • ✅ Fast benchmarks
  • ✅ Isolated test environments

Example 2: Async Queue Transport (Production)

Use case: Decoupled architectures with message queues (SQS, Kafka, RabbitMQ)

This example shows a conceptual async transport using AWS SQS:

#![allow(unused)]
fn main() {
use pmcp::shared::{Transport, TransportMessage};
use pmcp::error::Result;
use async_trait::async_trait;
use aws_sdk_sqs::Client as SqsClient;
use aws_config;  // For load_from_env
use tokio::sync::mpsc;
use std::sync::Arc;
use tokio::sync::Mutex;

/// SQS-based async transport
#[derive(Debug)]
pub struct SqsTransport {
    /// SQS client
    sqs: SqsClient,
    /// Request queue URL (for sending)
    request_queue_url: String,
    /// Response queue URL (for receiving)
    response_queue_url: String,
    /// Local message buffer
    message_rx: Arc<Mutex<mpsc::Receiver<TransportMessage>>>,
    message_tx: mpsc::Sender<TransportMessage>,
    /// Background poller handle
    poller_handle: Option<tokio::task::JoinHandle<()>>,
}

impl SqsTransport {
    pub async fn new(
        request_queue_url: String,
        response_queue_url: String,
    ) -> Result<Self> {
        let config = aws_config::load_from_env().await;
        let sqs = SqsClient::new(&config);

        let (tx, rx) = mpsc::channel(100);

        let mut transport = Self {
            sqs: sqs.clone(),
            request_queue_url,
            response_queue_url: response_queue_url.clone(),
            message_rx: Arc::new(Mutex::new(rx)),
            message_tx: tx,
            poller_handle: None,
        };

        // Start background poller for incoming messages
        transport.start_poller().await?;

        Ok(transport)
    }

    async fn start_poller(&mut self) -> Result<()> {
        let sqs = self.sqs.clone();
        let queue_url = self.response_queue_url.clone();
        let tx = self.message_tx.clone();

        let handle = tokio::spawn(async move {
            loop {
                // Long-poll SQS for messages (20 seconds)
                match sqs
                    .receive_message()
                    .queue_url(&queue_url)
                    .max_number_of_messages(10)
                    .wait_time_seconds(20)
                    .send()
                    .await
                {
                    Ok(output) => {
                        if let Some(messages) = output.messages {
                            for msg in messages {
                                if let Some(body) = msg.body {
                                    // Parse JSON-RPC message
                                    if let Ok(transport_msg) =
                                        serde_json::from_str::<TransportMessage>(&body)
                                    {
                                        let _ = tx.send(transport_msg).await;
                                    }

                                    // Delete message from queue
                                    if let Some(receipt) = msg.receipt_handle {
                                        let _ = sqs
                                            .delete_message()
                                            .queue_url(&queue_url)
                                            .receipt_handle(receipt)
                                            .send()
                                            .await;
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::error!("SQS polling error: {}", e);
                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
                    }
                }
            }
        });

        self.poller_handle = Some(handle);
        Ok(())
    }
}

#[async_trait]
impl Transport for SqsTransport {
    async fn send(&mut self, message: TransportMessage) -> Result<()> {
        // Serialize message to JSON
        let json = serde_json::to_string(&message)
            .map_err(|e| pmcp::error::Error::Transport(
                pmcp::error::TransportError::InvalidMessage(e.to_string())
            ))?;

        // Send to SQS request queue
        self.sqs
            .send_message()
            .queue_url(&self.request_queue_url)
            .message_body(json)
            .send()
            .await
            .map_err(|e| pmcp::error::Error::Transport(
                pmcp::error::TransportError::InvalidMessage(e.to_string())
            ))?;

        Ok(())
    }

    async fn receive(&mut self) -> Result<TransportMessage> {
        let mut rx = self.message_rx.lock().await;
        rx.recv()
            .await
            .ok_or_else(|| pmcp::error::Error::Transport(
                pmcp::error::TransportError::ConnectionClosed
            ))
    }

    async fn close(&mut self) -> Result<()> {
        if let Some(handle) = self.poller_handle.take() {
            handle.abort();
        }
        Ok(())
    }

    fn transport_type(&self) -> &'static str {
        "sqs-async"
    }
}
}

Architecture Benefits:

  • Decoupled: Client/server don’t need to be online simultaneously
  • Scalable: Multiple servers can consume from same queue
  • Reliable: Message persistence and retry mechanisms
  • Async workflows: Long-running operations without blocking

Trade-offs:

  • Latency: Higher than direct connections (100ms+)
  • Cost: Message queue service fees
  • Complexity: Requires queue infrastructure setup

Example 3: WebSocket Transport (Built-in)

The SDK includes a production-ready WebSocket transport. Here’s how it works internally:

#![allow(unused)]
fn main() {
// From src/shared/websocket.rs (simplified)
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use futures::{SinkExt, StreamExt};

pub struct WebSocketTransport {
    config: WebSocketConfig,
    state: Arc<RwLock<ConnectionState>>,
    message_tx: mpsc::Sender<TransportMessage>,
    message_rx: Arc<AsyncMutex<mpsc::Receiver<TransportMessage>>>,
}

#[async_trait]
impl Transport for WebSocketTransport {
    async fn send(&mut self, message: TransportMessage) -> Result<()> {
        // Serialize to JSON
        let json = serde_json::to_vec(&message)?;

        // Send as WebSocket text frame
        let ws_msg = Message::Text(String::from_utf8(json)?);
        self.sink.send(ws_msg).await?;

        Ok(())
    }

    async fn receive(&mut self) -> Result<TransportMessage> {
        // Wait for incoming WebSocket frame
        let mut rx = self.message_rx.lock().await;
        rx.recv().await.ok_or(TransportError::ConnectionClosed)
    }

    // ... reconnection logic, ping/pong handling, etc.
}
}

Key WebSocket Features:

  • Bidirectional: True full-duplex communication
  • Low latency: Direct TCP connection
  • Server push: Notifications without polling
  • Auto-reconnect: Built-in resilience

Example 4: Kafka Transport with Topic Design

Use case: Event-driven, scalable, multi-client/server architectures

Kafka provides a robust foundation for MCP when you need decoupled, event-driven communication. The key challenge is topic design for request-response patterns in a pub/sub system.

Topic Architecture

┌─────────────────────────────────────────────────────────────┐
│                     Kafka Cluster                           │
│                                                             │
│  mcp.requests.*                  mcp.responses.*           │
│  ├─ mcp.requests.global          ├─ mcp.responses.client-A │
│  ├─ mcp.requests.tool-analysis   ├─ mcp.responses.client-B │
│  └─ mcp.requests.data-proc       └─ mcp.responses.client-C │
│                                                             │
│  mcp.server.discovery                                       │
│  └─ Heartbeats from servers with capabilities              │
└─────────────────────────────────────────────────────────────┘

     ▲                │                    ▲                │
     │ Produce        │ Consume            │ Consume        │ Produce
     │ requests       │ requests           │ responses      │ responses
     │                ▼                    │                ▼

┌─────────┐                          ┌──────────────┐
│ Client  │                          │ MCP Server   │
│ (Agent) │                          │ Pool         │
└─────────┘                          └──────────────┘

Key Design Patterns

1. Request Routing:

  • Global topic: mcp.requests.global - Any server can handle
  • Capability-based: mcp.requests.tool-analysis - Only servers with specific tools
  • Dedicated: mcp.requests.server-id-123 - Target specific server instance

2. Response Routing:

  • Client-specific topics: Each client subscribes to mcp.responses.{client-id}
  • Correlation IDs: Message headers contain request-id + client-id
  • TTL: Messages expire after configurable timeout

3. Server Discovery:

  • Servers publish heartbeats to mcp.server.discovery with:
    • Server ID
    • Capabilities (tools, resources, prompts)
    • Load metrics
    • Status (online/busy/draining)

Implementation

#![allow(unused)]
fn main() {
use pmcp::shared::{Transport, TransportMessage};
use pmcp::error::Result;
use async_trait::async_trait;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::message::Message;
use std::time::Duration;
use tokio::sync::mpsc;

/// Kafka-based MCP transport
#[derive(Debug)]
pub struct KafkaTransport {
    /// Kafka producer for sending
    producer: FutureProducer,
    /// Kafka consumer for receiving
    consumer: StreamConsumer,
    /// Client/Server ID for routing
    instance_id: String,
    /// Request topic name
    request_topic: String,
    /// Response topic name (client-specific)
    response_topic: String,
    /// Local message buffer
    message_rx: Arc<Mutex<mpsc::Receiver<TransportMessage>>>,
    message_tx: mpsc::Sender<TransportMessage>,
}

impl KafkaTransport {
    pub async fn new_client(
        brokers: &str,
        client_id: String,
    ) -> Result<Self> {
        use rdkafka::config::ClientConfig;

        // Create producer
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create()?;

        // Create consumer
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", &client_id)
            .set("bootstrap.servers", brokers)
            .set("enable.auto.commit", "true")
            .create()?;

        // Subscribe to client-specific response topic
        let response_topic = format!("mcp.responses.{}", client_id);
        consumer.subscribe(&[&response_topic])?;

        let (tx, rx) = mpsc::channel(100);

        let mut transport = Self {
            producer,
            consumer,
            instance_id: client_id,
            request_topic: "mcp.requests.global".to_string(),
            response_topic,
            message_rx: Arc::new(Mutex::new(rx)),
            message_tx: tx,
        };

        // Start background consumer
        transport.start_consumer().await?;

        Ok(transport)
    }

    async fn start_consumer(&mut self) -> Result<()> {
        let consumer = self.consumer.clone();
        let tx = self.message_tx.clone();

        tokio::spawn(async move {
            loop {
                match consumer.recv().await {
                    Ok(msg) => {
                        if let Some(payload) = msg.payload() {
                            // Parse JSON-RPC message
                            if let Ok(transport_msg) =
                                serde_json::from_slice::<TransportMessage>(payload)
                            {
                                let _ = tx.send(transport_msg).await;
                            }
                        }
                    }
                    Err(e) => {
                        tracing::error!("Kafka consumer error: {}", e);
                        tokio::time::sleep(Duration::from_secs(5)).await;
                    }
                }
            }
        });

        Ok(())
    }
}

#[async_trait]
impl Transport for KafkaTransport {
    async fn send(&mut self, message: TransportMessage) -> Result<()> {
        // Serialize message
        let json = serde_json::to_vec(&message)?;

        // Extract request ID for correlation
        let correlation_id = match &message {
            TransportMessage::Request { id, .. } => format!("{:?}", id),
            _ => uuid::Uuid::new_v4().to_string(),
        };

        // Build Kafka record with headers
        let record = FutureRecord::to(&self.request_topic)
            .payload(&json)
            .key(&correlation_id)
            .headers(rdkafka::message::OwnedHeaders::new()
                .insert(rdkafka::message::Header {
                    key: "client-id",
                    value: Some(self.instance_id.as_bytes()),
                })
                .insert(rdkafka::message::Header {
                    key: "response-topic",
                    value: Some(self.response_topic.as_bytes()),
                }));

        // Send to Kafka
        self.producer
            .send(record, Duration::from_secs(5))
            .await
            .map_err(|(e, _)| pmcp::error::Error::Transport(
                pmcp::error::TransportError::InvalidMessage(e.to_string())
            ))?;

        Ok(())
    }

    async fn receive(&mut self) -> Result<TransportMessage> {
        let mut rx = self.message_rx.lock().await;
        rx.recv()
            .await
            .ok_or_else(|| pmcp::error::Error::Transport(
                pmcp::error::TransportError::ConnectionClosed
            ))
    }

    async fn close(&mut self) -> Result<()> {
        // Graceful shutdown
        Ok(())
    }

    fn transport_type(&self) -> &'static str {
        "kafka"
    }
}
}

Server-Side Topic Patterns

#![allow(unused)]
fn main() {
impl KafkaTransport {
    /// Create server transport with capability-based subscription
    pub async fn new_server(
        brokers: &str,
        server_id: String,
        capabilities: Vec<String>,  // ["tool-analysis", "data-proc"]
    ) -> Result<Self> {
        use rdkafka::config::ClientConfig;

        // Create producer for server discovery
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("message.timeout.ms", "5000")
            .create()?;

        // Create consumer
        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", &format!("mcp-server-{}", server_id))
            .set("bootstrap.servers", brokers)
            .create()?;

        // Subscribe to relevant request topics
        let mut topics = vec!["mcp.requests.global".to_string()];
        for cap in &capabilities {
            topics.push(format!("mcp.requests.{}", cap));
        }

        consumer.subscribe(&topics.iter().map(|s| s.as_str()).collect::<Vec<_>>())?;

        // Publish server discovery
        Self::publish_discovery(
            &producer,
            &server_id,
            &capabilities,
        ).await?;

        // ... rest of initialization (similar to new_client)
        let (tx, rx) = mpsc::channel(100);

        let mut transport = Self {
            producer,
            consumer,
            instance_id: server_id,
            request_topic: "mcp.requests.global".to_string(),
            response_topic: String::new(), // Server doesn't have response topic
            message_rx: Arc::new(Mutex::new(rx)),
            message_tx: tx,
        };

        transport.start_consumer().await?;
        Ok(transport)
    }

    async fn publish_discovery(
        producer: &FutureProducer,
        server_id: &str,
        capabilities: &[String],
    ) -> Result<()> {
        let discovery_msg = serde_json::json!({
            "server_id": server_id,
            "capabilities": capabilities,
            "status": "online",
            "timestamp": SystemTime::now(),
        });

        let record = FutureRecord::to("mcp.server.discovery")
            .payload(&serde_json::to_vec(&discovery_msg)?);

        producer.send(record, Duration::from_secs(1)).await?;
        Ok(())
    }

    /// Send response back to specific client
    async fn send_response(
        &self,
        response: TransportMessage,
        client_id: &str,
        correlation_id: &str,
    ) -> Result<()> {
        let json = serde_json::to_vec(&response)?;

        let response_topic = format!("mcp.responses.{}", client_id);

        let record = FutureRecord::to(&response_topic)
            .payload(&json)
            .key(correlation_id);

        self.producer
            .send(record, Duration::from_secs(5))
            .await?;

        Ok(())
    }
}
}

Kafka Benefits for MCP

✅ Decoupled Communication:

  • Clients/servers operate independently
  • No direct connections required
  • Timeouts managed at application layer

✅ Event-Driven Architecture:

  • React to MCP requests as events
  • Multiple consumers can process same request stream
  • Event sourcing: Full message history

✅ Scalability:

  • Horizontal scaling: Add more consumer groups
  • Partitioning: Distribute load across brokers
  • Retention: Replay historical requests

✅ Multi-Tenant Support:

  • Topic isolation per client/tenant
  • ACLs for security boundaries
  • Quota management per client

❌ Trade-offs:

  • Higher latency (50-200ms vs 1-5ms for WebSocket)
  • Complex topic design required
  • Kafka infrastructure overhead
  • Request-response correlation complexity

Note on GraphQL: GraphQL APIs belong at the MCP server layer (as tools exposing queries/mutations), not as a custom transport. Use standard transports (HTTP, WebSocket) and build MCP servers that wrap GraphQL backends.

Best Practices

1. Preserve JSON-RPC Message Format

Your transport must not modify MCP messages:

#![allow(unused)]
fn main() {
// ✅ Correct: Transport as dumb pipe
async fn send(&mut self, message: TransportMessage) -> Result<()> {
    let bytes = serde_json::to_vec(&message)?;
    self.underlying_channel.write(&bytes).await?;
    Ok(())
}

// ❌ Wrong: Modifying message structure
async fn send(&mut self, message: TransportMessage) -> Result<()> {
    // Don't do this!
    let mut custom_msg = CustomMessage::from(message);
    custom_msg.add_custom_field("timestamp", SystemTime::now());
    self.underlying_channel.write(&custom_msg).await?;
    Ok(())
}
}

2. Handle Framing Correctly

Ensure message boundaries are preserved:

#![allow(unused)]
fn main() {
// ✅ Correct: Length-prefixed framing
async fn send(&mut self, message: TransportMessage) -> Result<()> {
    let json = serde_json::to_vec(&message)?;

    // Send length prefix (4 bytes, big-endian)
    let len = (json.len() as u32).to_be_bytes();
    self.writer.write_all(&len).await?;

    // Send message
    self.writer.write_all(&json).await?;
    Ok(())
}

async fn receive(&mut self) -> Result<TransportMessage> {
    // Read length prefix
    let mut len_buf = [0u8; 4];
    self.reader.read_exact(&mut len_buf).await?;
    let len = u32::from_be_bytes(len_buf) as usize;

    // Read exact message
    let mut buf = vec![0u8; len];
    self.reader.read_exact(&mut buf).await?;

    serde_json::from_slice(&buf).map_err(Into::into)
}
}

3. Implement Proper Error Handling

Map transport-specific errors to TransportError:

#![allow(unused)]
fn main() {
use pmcp::error::{Error, TransportError};

async fn send(&mut self, message: TransportMessage) -> Result<()> {
    match self.underlying_send(&message).await {
        Ok(()) => Ok(()),
        Err(e) if e.is_connection_error() => {
            Err(Error::Transport(TransportError::ConnectionClosed))
        }
        Err(e) if e.is_timeout() => {
            Err(Error::Timeout(5000))  // 5 seconds
        }
        Err(e) => {
            Err(Error::Transport(TransportError::InvalidMessage(
                format!("Send failed: {}", e)
            )))
        }
    }
}
}

4. Support Concurrent send/receive

Transports must handle concurrent operations:

#![allow(unused)]
fn main() {
// ✅ Correct: Separate channels for send/receive
pub struct MyTransport {
    send_tx: mpsc::Sender<TransportMessage>,
    recv_rx: Arc<Mutex<mpsc::Receiver<TransportMessage>>>,
}

// ❌ Wrong: Single shared state without synchronization
pub struct BadTransport {
    connection: TcpStream,  // Can't safely share between send/receive
}
}

5. Security Considerations

Even for internal transports:

#![allow(unused)]
fn main() {
// ✅ Bind to localhost only
let listener = TcpListener::bind("127.0.0.1:8080").await?;

// ❌ Binding to all interfaces exposes to network
let listener = TcpListener::bind("0.0.0.0:8080").await?;

// ✅ Validate message sizes to prevent DoS
const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;  // 10 MB

async fn receive(&mut self) -> Result<TransportMessage> {
    let len = self.read_length().await?;

    if len > MAX_MESSAGE_SIZE {
        return Err(Error::Transport(TransportError::InvalidMessage(
            format!("Message too large: {} bytes", len)
        )));
    }

    // ... continue reading
}
}

6. Message Encryption (End-to-End Security)

For high-security environments (defense, healthcare, finance), encrypt messages at the transport layer:

⚠️ Important Caveat: This example wraps messages in a custom Notification envelope. Both client and server MUST use EncryptedTransport - this is not interoperable with standard MCP clients/servers. For transparent encryption, use TLS/mTLS at the transport layer instead (e.g., wss:// for WebSocket, HTTPS for HTTP).

This pattern is appropriate when you control both ends and need application-layer encryption with custom key management.

#![allow(unused)]
fn main() {
use aes_gcm::{
    aead::{Aead, KeyInit},
    Aes256Gcm, Nonce,
};
use rand::RngCore;

/// Encrypted transport wrapper
pub struct EncryptedTransport<T: Transport> {
    inner: T,
    cipher: Aes256Gcm,
}

impl<T: Transport> EncryptedTransport<T> {
    /// Create encrypted transport with 256-bit AES-GCM
    pub fn new(inner: T, key: &[u8; 32]) -> Result<Self> {
        let cipher = Aes256Gcm::new(key.into());
        Ok(Self { inner, cipher })
    }

    /// Encrypt message payload
    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>> {
        // Generate random nonce (12 bytes for AES-GCM)
        let mut nonce_bytes = [0u8; 12];
        rand::rng().fill_bytes(&mut nonce_bytes);
        let nonce = Nonce::from_slice(&nonce_bytes);

        // Encrypt
        let ciphertext = self
            .cipher
            .encrypt(nonce, plaintext)
            .map_err(|e| Error::Transport(TransportError::InvalidMessage(
                format!("Encryption failed: {}", e)
            )))?;

        // Prepend nonce to ciphertext for decryption
        let mut result = nonce_bytes.to_vec();
        result.extend_from_slice(&ciphertext);

        Ok(result)
    }

    /// Decrypt message payload
    fn decrypt(&self, encrypted: &[u8]) -> Result<Vec<u8>> {
        if encrypted.len() < 12 {
            return Err(Error::Transport(TransportError::InvalidMessage(
                "Message too short for nonce".to_string()
            )));
        }

        // Extract nonce and ciphertext
        let (nonce_bytes, ciphertext) = encrypted.split_at(12);
        let nonce = Nonce::from_slice(nonce_bytes);

        // Decrypt
        let plaintext = self
            .cipher
            .decrypt(nonce, ciphertext)
            .map_err(|e| Error::Transport(TransportError::InvalidMessage(
                format!("Decryption failed: {}", e)
            )))?;

        Ok(plaintext)
    }
}

#[async_trait]
impl<T: Transport> Transport for EncryptedTransport<T> {
    async fn send(&mut self, message: TransportMessage) -> Result<()> {
        // Serialize message
        let plaintext = serde_json::to_vec(&message)?;

        // Encrypt
        let encrypted = self.encrypt(&plaintext)?;

        // Wrap in envelope
        let envelope = TransportMessage::Notification(Notification::Custom {
            method: "encrypted".to_string(),
            params: serde_json::json!({
                "data": base64::encode(&encrypted),
            }),
        });

        // Send via underlying transport
        self.inner.send(envelope).await
    }

    async fn receive(&mut self) -> Result<TransportMessage> {
        // Receive encrypted envelope
        let envelope = self.inner.receive().await?;

        // Extract encrypted data
        let encrypted_b64 = match envelope {
            TransportMessage::Notification(Notification::Custom { params, .. }) => {
                params["data"].as_str().ok_or_else(|| {
                    Error::Transport(TransportError::InvalidMessage(
                        "Missing encrypted data".to_string()
                    ))
                })?
            }
            _ => return Err(Error::Transport(TransportError::InvalidMessage(
                "Expected encrypted envelope".to_string()
            ))),
        };

        // Decode and decrypt
        let encrypted = base64::decode(encrypted_b64)
            .map_err(|e| Error::Transport(TransportError::InvalidMessage(e.to_string())))?;

        let plaintext = self.decrypt(&encrypted)?;

        // Deserialize message
        serde_json::from_slice(&plaintext).map_err(Into::into)
    }

    async fn close(&mut self) -> Result<()> {
        self.inner.close().await
    }

    fn is_connected(&self) -> bool {
        self.inner.is_connected()
    }

    fn transport_type(&self) -> &'static str {
        "encrypted"
    }
}

// Usage
let base_transport = HttpTransport::new(/* ... */);
let encryption_key: [u8; 32] = derive_key_from_password("secure-password");
let encrypted_transport = EncryptedTransport::new(base_transport, &encryption_key)?;

let client = Client::new(encrypted_transport);
}

Security Benefits:

  • End-to-end encryption: Only sender/receiver can decrypt
  • Authenticated encryption: AES-GCM provides integrity checks
  • Nonce-based: Each message has unique nonce (prevents replay)
  • Tamper-evident: Modified ciphertext fails decryption

Use Cases:

  • Defense contractor systems (provably encrypted)
  • Healthcare data (HIPAA compliance)
  • Financial transactions (PCI-DSS requirements)
  • Cross-border data transfers (GDPR encryption at rest/in transit)

7. Performance Optimization

For high-throughput scenarios:

#![allow(unused)]
fn main() {
// Message batching
pub struct BatchingTransport {
    pending: Vec<TransportMessage>,
    flush_interval: Duration,
}

impl BatchingTransport {
    async fn send(&mut self, message: TransportMessage) -> Result<()> {
        self.pending.push(message);

        if self.pending.len() >= 100 {  // Batch size threshold
            self.flush().await?;
        }

        Ok(())
    }

    async fn flush(&mut self) -> Result<()> {
        if self.pending.is_empty() {
            return Ok(());
        }

        // Send all pending messages in one operation
        let batch = std::mem::take(&mut self.pending);
        self.underlying_send_batch(batch).await?;
        Ok(())
    }
}

// Compression for large messages
async fn send_with_compression(
    &mut self,
    message: TransportMessage,
) -> Result<()> {
    let json = serde_json::to_vec(&message)?;

    if json.len() > 1024 {  // Compress large messages
        let compressed = compress(&json)?;
        self.send_compressed(compressed).await?;
    } else {
        self.send_raw(json).await?;
    }

    Ok(())
}
}

Testing Your Transport

Unit Tests

#![allow(unused)]
fn main() {
#[cfg(test)]
mod tests {
    use super::*;
    use pmcp::types::*;

    #[tokio::test]
    async fn test_send_receive() {
        let mut transport = MyTransport::new().await.unwrap();

        // Create test message
        let request = TransportMessage::Request {
            id: RequestId::from(1),
            request: Request::Client(Box::new(ClientRequest::Ping)),
        };

        // Send and receive
        transport.send(request.clone()).await.unwrap();
        let received = transport.receive().await.unwrap();

        // Verify round-trip
        assert!(matches!(received, TransportMessage::Request { .. }));
    }

    #[tokio::test]
    async fn test_error_handling() {
        let mut transport = MyTransport::new().await.unwrap();

        // Close transport
        transport.close().await.unwrap();

        // Verify sends fail after close
        let result = transport.send(/* ... */).await;
        assert!(result.is_err());
    }
}
}

Integration Tests with mcp-tester

Use mcp-tester to validate transport behavior:

# Test custom transport server
cargo build --release --example my_custom_transport_server

# Run integration tests
mcp-tester test <CUSTOM_TRANSPORT_URL> \
  --with-tools \
  --with-resources \
  --format json > results.json

Property-Based Testing

#![allow(unused)]
fn main() {
use proptest::prelude::*;

proptest! {
    #[test]
    fn test_transport_roundtrip(
        id in any::<i64>(),
        method in "[a-z]{1,20}",
    ) {
        tokio_test::block_on(async {
            let mut transport = MyTransport::new().await.unwrap();

            let message = TransportMessage::Request {
                id: RequestId::from(id),
                request: Request::Client(Box::new(
                    ClientRequest::ListTools(ListToolsParams { cursor: None })
                )),
            };

            transport.send(message.clone()).await.unwrap();
            let received = transport.receive().await.unwrap();

            // Verify message integrity
            prop_assert!(matches!(received, TransportMessage::Request { .. }));
        });
    }
}
}

Business Use Cases for Custom Transports

Custom transports require significant development effort (both client and server implementations). Here are 8 business scenarios where the investment pays off:

Business Problem: A risk modeling platform runs MCP tools that perform complex calculations taking 5-30 minutes (credit risk analysis, compliance checks, multi-agent simulations).

Why Async Transport:

┌──────────┐     Request      ┌──────────┐     Job Queue    ┌──────────┐
│  Client  │─────────────────►│   SQS    │─────────────────►│  Lambda  │
│  (Agent) │                  │  Queue   │                  │  Worker  │
└──────────┘                  └──────────┘                  └──────────┘
     │                              ▲                             │
     │ Job ID                       │ Completed                   │ Process
     │ returned                     │ notification                │ 5-30 min
     │ immediately                  └─────────────────────────────┘
     │
     │ Poll/Subscribe
     │ for results
     ▼
┌──────────┐
│ Results  │
│ Queue    │
└──────────┘

Benefits:

  • No timeouts: Client doesn’t wait; gets job ID immediately
  • Job orchestration: Multiple workers process queue in parallel
  • UX improvement: Frontend shows “Processing…” with progress updates
  • Cost optimization: Workers scale based on queue depth

Transport Choice: SQS (AWS), Azure Service Bus, Google Cloud Tasks


2. Regulated/Air-Gapped Environments (Healthcare, Government)

Business Problem: Healthcare organization needs MCP tools to access sensitive patient data inside secure network, but analysts/AI agents run outside that zone (HIPAA compliance).

Why Async Transport:

Public Zone          Security Boundary      Secure Zone
┌──────────┐             │            ┌──────────┐
│ AI Agent │             │            │  MCP     │
│ (Client) │             │            │  Server  │
└──────────┘             │            │  (PHI    │
     │                   │            │  Access) │
     │ Encrypted         │            └──────────┘
     │ request           │                  ▲
     ▼                   │                  │
┌──────────┐             │            ┌──────────┐
│  Kafka   │◄────────────┼───────────►│  Kafka   │
│  Public  │   Firewall  │  Consumer  │  Secure  │
│  Broker  │   Rules     │  Group     │  Broker  │
└──────────┘             │            └──────────┘

Benefits:

  • Security isolation: No direct socket connections across zones
  • Encryption at rest: Kafka stores encrypted messages
  • Audit trail: All requests/responses logged for compliance
  • Policy enforcement: Messages routed based on classification level

Transport Choice: Kafka (with encryption), AWS SQS (cross-account), Azure Event Hubs


3. Burst Load & Elastic Scaling (SaaS)

Business Problem: SaaS provider offers MCP servers for text-to-structured-data conversion. During peak hours (9-11am), thousands of clients issue requests simultaneously.

Why Async Transport:

                    ┌──────────┐
Peak: 10k req/min───►│  Kafka   │
                    │  Topic   │
                    │  (Buffer)│
                    └──────────┘
                          │
              ┌───────────┼───────────┐
              ▼           ▼           ▼
        ┌─────────┐ ┌─────────┐ ┌─────────┐
        │  MCP    │ │  MCP    │ │  MCP    │
        │ Server  │ │ Server  │ │ Server  │
        │ Pod 1   │ │ Pod 2   │ │ Pod N   │
        └─────────┘ └─────────┘ └─────────┘

        Auto-scales based on queue depth (lag)

Benefits:

  • Elastic scaling: Workers scale up/down based on queue lag
  • Cost optimization: Turn off servers when idle (queue empty)
  • Smooth bursts: Queue absorbs spikes; no client rate limit errors
  • Priority queues: High-priority clients use separate topic

Transport Choice: Kafka (for throughput), RabbitMQ (for priority queues), Redis Streams


4. Multi-Tenant, Multi-Region (Enterprise)

Business Problem: Global enterprise has data centers in EU, US, Asia. MCP clients and servers may reside in different regions. Data sovereignty requires EU data stays in EU.

Why Async Transport:

EU Region                  US Region                 Asia Region
┌──────────┐             ┌──────────┐            ┌──────────┐
│  Client  │             │  Client  │            │  Client  │
│ (Germany)│             │  (Ohio)  │            │ (Tokyo)  │
└──────────┘             └──────────┘            └──────────┘
     │                         │                       │
     ▼                         ▼                       ▼
┌──────────┐             ┌──────────┐            ┌──────────┐
│  Kafka   │             │  Kafka   │            │  Kafka   │
│  EU      │             │  US      │            │  Asia    │
└──────────┘             └──────────┘            └──────────┘
     │                         │                       │
     ▼                         ▼                       ▼
┌──────────┐             ┌──────────┐            ┌──────────┐
│  MCP     │             │  MCP     │            │  MCP     │
│  Servers │             │  Servers │            │  Servers │
│  (EU)    │             │  (US)    │            │  (Asia)  │
└──────────┘             └──────────┘            └──────────┘

Benefits:

  • Latency optimization: Process requests close to data source
  • Regulatory compliance: EU data never leaves EU (GDPR)
  • Failover: Route EU requests to US if EU region down
  • Topic-based routing: Geo-tagged messages route automatically

Transport Choice: Kafka (multi-region replication), AWS SQS (cross-region)


5. Cross-System Integration (AI + Legacy)

Business Problem: MCP client acts as AI orchestrator needing to trigger workflows on legacy ERP/CRM systems that cannot maintain live socket connections (batch-oriented, mainframe integration).

Why Async Transport:

┌──────────┐     Modern     ┌──────────┐    Bridge    ┌──────────┐
│  AI      │     MCP        │  Message │    Adapter   │  Legacy  │
│  Agent   │───────────────►│  Queue   │─────────────►│  ERP     │
│ (Claude) │   JSON-RPC     │  (SQS)   │   XML/SOAP   │ (SAP)    │
└──────────┘                └──────────┘              └──────────┘
     ▲                            │                         │
     │                            │                         │
     │ Result                     │ Poll every              │ Batch
     │ notification               │ 30 seconds              │ process
     └────────────────────────────┴─────────────────────────┘

Benefits:

  • Bridge async systems: AI doesn’t need to know if ERP is online
  • Decouple failure domains: AI continues working if ERP down
  • Extend MCP reach: Wrap legacy services with MCP-compatible async interface
  • Protocol translation: Queue adapter converts JSON-RPC ↔ XML/SOAP

Transport Choice: SQS (simple integration), Apache Camel + Kafka (complex routing)


6. High-Security Messaging (Defense, Blockchain)

Business Problem: Defense contractor uses MCP tools to perform sensitive computations that must be provably encrypted, tamper-evident, and non-repudiable (zero-trust architecture).

Why Custom Transport:

┌──────────┐   Encrypted    ┌──────────┐   Encrypted    ┌──────────┐
│ Client   │   MCP          │  Kafka   │   MCP          │  MCP     │
│ (Secret) │───────────────►│  (TLS +  │───────────────►│  Server  │
│  Agent   │   AES-256-GCM  │  ACLs)   │   AES-256-GCM  │ (Secure  │
└──────────┘                └──────────┘                │  Enclave)│
                                   │                    └──────────┘
                                   │
                                   ▼
                            ┌──────────┐
                            │ Immutable│
                            │ Audit    │
                            │ Log      │
                            └──────────┘

Benefits:

  • End-to-end encryption: Payloads encrypted by client, decrypted by server only
  • Non-repudiation: Kafka’s immutable log proves message history
  • Policy enforcement: Custom headers enforce classification levels (TOP SECRET, etc.)
  • Compliance: FIPS 140-2 validated encryption modules

Transport Choice: Encrypted Kafka transport (Example 6 above), AWS KMS + SQS


7. Event-Driven Workflows (Analytics)

Business Problem: Analytics platform uses MCP clients embedded in microservices that react to real-time business events (user signup, transaction alert, IoT sensor data).

Why Async Transport:

Event Sources              Kafka Streams           MCP Processors
┌──────────┐                                      ┌──────────┐
│ User     │─┐                                   ┌►│  MCP     │
│ Signups  │ │            ┌──────────┐          │ │  Tool:   │
└──────────┘ ├───────────►│  Kafka   │──────────┤ │  Enrich  │
             │            │  Topic:  │          │ │  Profile │
┌──────────┐ │            │  events  │          │ └──────────┘
│ Trans-   │─┤            └──────────┘          │
│ actions  │ │                  │               │ ┌──────────┐
└──────────┘ │                  │               └►│  MCP     │
             │                  │                 │  Tool:   │
┌──────────┐ │                  ▼                 │  Score   │
│ IoT      │─┘            ┌──────────┐            │  Risk    │
│ Sensors  │              │  MCP     │            └──────────┘
└──────────┘              │  Clients │
                          │(Consumers)│            ┌──────────┐
                          └──────────┘            ┌►│  MCP     │
                                                  │ │  Tool:   │
                           Process each event     │ │  Alert   │
                           through MCP tools      │ │  Webhook │
                                                  │ └──────────┘
                                                  │
                                                  │ ┌──────────┐
                                                  └►│  MCP     │
                                                    │  Tool:   │
                                                    │  ML      │
                                                    │  Predict │
                                                    └──────────┘

Benefits:

  • Reactive design: Kafka events trigger MCP workflows automatically
  • Backpressure control: MCP servers process events at their own pace
  • Composability: Multiple event-driven MCP clients coordinate on shared bus
  • Stream processing: Kafka Streams integrates with MCP tools for windowing, joins

Transport Choice: Kafka (for event streaming), AWS Kinesis, Apache Pulsar


8. Offline/Intermittent Connectivity (Edge Devices)

Business Problem: Ships, drones, field sensors act as MCP clients but can’t maintain stable network links. They collect data offline and sync when connectivity returns.

Why Async Transport:

Edge Device (Ship)         Satellite Link       Cloud
┌──────────┐                                    ┌──────────┐
│  MCP     │   Offline:                        │  Kafka   │
│  Client  │   Queue locally                   │  Broker  │
│          │        │                           │          │
│  Local   │        ▼                           └──────────┘
│  Queue   │◄──┐ ┌──────┐                            │
│  (SQLite)│   └─│ Net  │                            ▼
└──────────┘     │ Down │                      ┌──────────┐
     │           └──────┘                      │  MCP     │
     │                                         │  Servers │
     │  Online:                                │  (Cloud) │
     │  Sync queue                             └──────────┘
     ▼
┌──────────┐     Network Up     ┌──────────┐
│  Sync    │────────────────────►│  Kafka   │
│  Process │◄────────────────────│  Topic   │
└──────────┘      ACKs           └──────────┘

Benefits:

  • Offline queuing: Requests queued locally (SQLite, LevelDB)
  • Resilience: Automatic retry when connection restored
  • Simplified sync: Kafka acts as durable buffer for unreliable endpoints
  • Conflict resolution: Last-write-wins or custom merge strategies

Transport Choice: Local queue + Kafka sync, MQTT (IoT-optimized), AWS IoT Core


Summary: When to Build Custom Transports

Use CaseLatency ToleranceComplexityROI
Long-running opsHigh (minutes)Medium✅ High
Air-gapped securityMedium (seconds)High✅ High
Burst scalingMedium (100-500ms)Medium✅ High
Multi-regionMedium (100-500ms)High✅ Medium
Legacy integrationHigh (minutes)Medium✅ High
High-securityLow (milliseconds)High✅ Medium
Event-drivenMedium (100-500ms)Medium✅ High
Offline/edgeHigh (minutes-hours)Medium✅ High

Decision Criteria:

  • ✅ Build custom transport if: Regulatory requirements, legacy constraints, or operational patterns prevent standard transports
  • ❌ Avoid if: Standard HTTP/WebSocket/stdio meets needs (99% of cases)

The Right Way to Use Custom Transports: Instead of requiring all clients to implement custom transports, use a gateway that translates between standard and custom transports.

Architecture

Standard MCP Clients          Gateway              Custom Backend
┌──────────┐                                       ┌──────────┐
│ Claude   │    WebSocket     ┌──────────┐  Kafka │  MCP     │
│ Desktop  │◄────────────────►│          │◄──────►│  Server  │
└──────────┘                  │          │        │  Pool    │
                              │  MCP     │        └──────────┘
┌──────────┐                  │ Gateway  │        ┌──────────┐
│ Custom   │     HTTP         │          │  SQS   │  Lambda  │
│ Client   │◄────────────────►│  Bridge  │◄──────►│  Workers │
└──────────┘                  │          │        └──────────┘
                              │  Policy  │        ┌──────────┐
┌──────────┐                  │  Layer   │  HTTP  │  Legacy  │
│  IDE     │    stdio/HTTP    │          │◄──────►│  Backend │
│  Plugin  │◄────────────────►│          │        └──────────┘
└──────────┘                  └──────────┘

Benefits

✅ Client Compatibility:

  • Standard MCP clients work unchanged (Claude Desktop, IDEs)
  • No client-side custom transport implementation needed
  • One gateway serves all clients

✅ Control Point:

  • Authorization: Check permissions before routing
  • DLP: Redact PII, filter sensitive data
  • Rate Limiting: Per-client quotas
  • Schema Validation: Reject malformed requests
  • Audit: Centralized logging of all MCP traffic

✅ Backend Flexibility:

  • Route to different backends based on capability
  • Load balancing across server pool
  • Circuit breakers for failing backends
  • Automatic retries with backoff
  • Protocol translation (JSON-RPC ↔ XML/SOAP)

✅ Observability:

  • Centralized metrics (latency, throughput, errors)
  • Distributed tracing across transports
  • Real-time monitoring dashboards

Implementation

Minimal Bridge (Bidirectional Forwarder):

#![allow(unused)]
fn main() {
use pmcp::shared::{Transport, TransportMessage};
use pmcp::error::Result;
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::Mutex;

/// Bridge that forwards messages between two transports
pub async fn bridge_transports(
    mut client_side: Box<dyn Transport + Send>,
    mut backend_side: Box<dyn Transport + Send>,
) -> Result<()> {
    // Client -> Backend
    let c2b = tokio::spawn(async move {
        loop {
            match client_side.receive().await {
                Ok(msg) => {
                    if let Err(e) = backend_side.send(msg).await {
                        tracing::error!("Failed to forward to backend: {}", e);
                        break;
                    }
                }
                Err(e) => {
                    tracing::info!("Client disconnected: {}", e);
                    break;
                }
            }
        }
        Result::<()>::Ok(())
    });

    // Backend -> Client
    let b2c = tokio::spawn(async move {
        loop {
            match backend_side.receive().await {
                Ok(msg) => {
                    if let Err(e) = client_side.send(msg).await {
                        tracing::error!("Failed to forward to client: {}", e);
                        break;
                    }
                }
                Err(e) => {
                    tracing::info!("Backend disconnected: {}", e);
                    break;
                }
            }
        }
        Result::<()>::Ok(())
    });

    // If either side exits, close the other
    tokio::select! {
        r = c2b => { let _ = r?; }
        r = b2c => { let _ = r?; }
    }

    Ok(())
}

// Usage
let client_transport = WebSocketTransport::new(config)?;
let backend_transport = KafkaTransport::new_client(brokers, client_id).await?;

tokio::spawn(bridge_transports(
    Box::new(client_transport),
    Box::new(backend_transport),
));
}

Policy-Enforcing Gateway:

#![allow(unused)]
fn main() {
/// Transport wrapper that enforces policies
pub struct PolicyTransport<T: Transport> {
    inner: T,
    policy: Arc<PolicyEngine>,
}

#[async_trait]
impl<T: Transport + Send + Sync> Transport for PolicyTransport<T> {
    async fn send(&mut self, msg: TransportMessage) -> Result<()> {
        // Enforce outbound policies (rate limits, quotas)
        self.policy.check_send(&msg).await?;

        // Log for audit
        tracing::info!("Sending: {:?}", msg);

        self.inner.send(msg).await
    }

    async fn receive(&mut self) -> Result<TransportMessage> {
        let msg = self.inner.receive().await?;

        // Enforce inbound policies (schema validation, PII redaction)
        let sanitized = self.policy.sanitize(msg).await?;

        // Log for audit
        tracing::info!("Received: {:?}", sanitized);

        Ok(sanitized)
    }

    async fn close(&mut self) -> Result<()> {
        self.inner.close().await
    }

    fn is_connected(&self) -> bool {
        self.inner.is_connected()
    }

    fn transport_type(&self) -> &'static str {
        "policy"
    }
}

// Usage
let base = KafkaTransport::new_client(brokers, client_id).await?;
let policy_engine = Arc::new(PolicyEngine::new());
let policy_transport = PolicyTransport {
    inner: base,
    policy: policy_engine,
};
}

Full Gateway Service:

use pmcp::shared::{Transport, WebSocketTransport, WebSocketConfig};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<()> {
    // Listen for WebSocket connections from clients
    let listener = TcpListener::bind("0.0.0.0:8080").await?;
    tracing::info!("Gateway listening on port 8080");

    loop {
        let (stream, addr) = listener.accept().await?;
        tracing::info!("New client connection from {}", addr);

        tokio::spawn(async move {
            // Create client-facing transport (WebSocket)
            let ws_config = WebSocketConfig { /* ... */ };
            let client_transport = WebSocketTransport::from_stream(stream, ws_config);

            // Create backend transport (Kafka/SQS based on routing logic)
            let backend_transport = select_backend_transport(addr).await?;

            // Wrap in policy layer
            let policy_transport = PolicyTransport {
                inner: backend_transport,
                policy: Arc::new(PolicyEngine::new()),
            };

            // Bridge the two transports
            bridge_transports(
                Box::new(client_transport),
                Box::new(policy_transport),
            ).await
        });
    }
}

async fn select_backend_transport(client_addr: SocketAddr) -> Result<Box<dyn Transport>> {
    // Route based on client, capability, tenant, etc.
    if client_addr.ip().is_loopback() {
        // Local clients -> direct HTTP
        Ok(Box::new(HttpTransport::new(/* ... */)))
    } else {
        // External clients -> Kafka
        let client_id = format!("gateway-{}", uuid::Uuid::new_v4());
        Ok(Box::new(KafkaTransport::new_client(KAFKA_BROKERS, client_id).await?))
    }
}

Routing Strategies

1. Capability-Based:

#![allow(unused)]
fn main() {
async fn route_by_capability(request: &TransportMessage) -> String {
    match request {
        TransportMessage::Request { request, .. } => {
            match request {
                Request::Client(ClientRequest::CallTool(params)) => {
                    // Route to Kafka topic based on tool name
                    if params.name.starts_with("ml_") {
                        "mcp.requests.ml-tools".to_string()
                    } else if params.name.starts_with("data_") {
                        "mcp.requests.data-tools".to_string()
                    } else {
                        "mcp.requests.global".to_string()
                    }
                }
                _ => "mcp.requests.global".to_string(),
            }
        }
        _ => "mcp.requests.global".to_string(),
    }
}
}

2. Load Balancing:

#![allow(unused)]
fn main() {
struct BackendPool {
    backends: Vec<Box<dyn Transport>>,
    current_index: AtomicUsize,
}

impl BackendPool {
    fn get_next(&self) -> &Box<dyn Transport> {
        let idx = self.current_index.fetch_add(1, Ordering::Relaxed);
        &self.backends[idx % self.backends.len()]
    }
}
}

3. Circuit Breaker:

#![allow(unused)]
fn main() {
struct CircuitBreakerTransport<T: Transport> {
    inner: T,
    state: Arc<Mutex<CircuitState>>,
}

enum CircuitState {
    Closed { failures: u32 },
    Open { until: Instant },
    HalfOpen,
}

impl<T: Transport> Transport for CircuitBreakerTransport<T> {
    async fn send(&mut self, msg: TransportMessage) -> Result<()> {
        let state = self.state.lock().await;
        match *state {
            CircuitState::Open { until } if Instant::now() < until => {
                return Err(Error::Transport(TransportError::ConnectionClosed));
            }
            _ => {}
        }
        drop(state);

        match self.inner.send(msg).await {
            Ok(()) => {
                // Success - reset failures
                let mut state = self.state.lock().await;
                *state = CircuitState::Closed { failures: 0 };
                Ok(())
            }
            Err(e) => {
                // Failure - increment counter
                let mut state = self.state.lock().await;
                if let CircuitState::Closed { failures } = *state {
                    if failures + 1 >= 5 {
                        // Trip circuit breaker
                        *state = CircuitState::Open {
                            until: Instant::now() + Duration::from_secs(30),
                        };
                    } else {
                        *state = CircuitState::Closed { failures: failures + 1 };
                    }
                }
                Err(e)
            }
        }
    }
    // ... rest of implementation
}
}

Testing with Gateway

Integration Testing:

# Start gateway
cargo run --bin mcp-gateway

# Test with mcp-tester against gateway (WebSocket)
mcp-tester test ws://localhost:8080 \
  --with-tools \
  --format json > results.json

# Gateway exercises custom backend transport (Kafka/SQS)
# under realistic conditions

Key Advantages:

  • Client compatibility: Standard clients work without modification
  • Flexibility: Change backend transport without client changes
  • Testability: mcp-tester validates end-to-end flow
  • Incremental migration: Gradually move backends to custom transports

Advanced Topics

Connection Pooling

For HTTP-like transports, implement pooling:

#![allow(unused)]
fn main() {
use deadpool::managed::{Manager, Pool, RecycleResult};

struct MyTransportManager;

#[async_trait]
impl Manager for MyTransportManager {
    type Type = MyTransport;
    type Error = Error;

    async fn create(&self) -> Result<MyTransport, Error> {
        MyTransport::connect().await
    }

    async fn recycle(&self, conn: &mut MyTransport) -> RecycleResult<Error> {
        if conn.is_connected() {
            Ok(())
        } else {
            Err(RecycleResult::StaticMessage("Connection lost"))
        }
    }
}

// Usage
let pool: Pool<MyTransportManager> = Pool::builder(MyTransportManager)
    .max_size(10)
    .build()
    .unwrap();

let transport = pool.get().await?;
}

Middleware Support

Wrap transports with cross-cutting concerns:

#![allow(unused)]
fn main() {
pub struct LoggingTransport<T: Transport> {
    inner: T,
}

#[async_trait]
impl<T: Transport> Transport for LoggingTransport<T> {
    async fn send(&mut self, message: TransportMessage) -> Result<()> {
        tracing::info!("Sending message: {:?}", message);
        let start = std::time::Instant::now();

        let result = self.inner.send(message).await;

        tracing::info!("Send completed in {:?}", start.elapsed());
        result
    }

    async fn receive(&mut self) -> Result<TransportMessage> {
        tracing::debug!("Waiting for message...");
        let message = self.inner.receive().await?;
        tracing::info!("Received message: {:?}", message);
        Ok(message)
    }

    async fn close(&mut self) -> Result<()> {
        self.inner.close().await
    }
}
}

Conclusion

Custom transports unlock MCP’s full potential for specialized environments:

Simple Use Cases:

  • ✅ Use in-memory transports for testing
  • ✅ Use built-in transports (HTTP, WebSocket, stdio) for standard cases

Advanced Use Cases:

  • Async messaging (SQS, Kafka) for decoupled architectures
  • Custom protocols when integrating with legacy systems
  • Performance optimizations for high-throughput scenarios

Remember:

  • GraphQL → Build MCP servers with GraphQL tools (not transport)
  • Custom transports require both client and server support
  • Stick to standard transports unless you have specific infrastructure needs
  • Test thoroughly with unit tests, integration tests, and mcp-tester

Additional Resources: