HTTP Error Handling

HTTP handlers must gracefully handle network failures, timeouts, and API errors. This chapter covers retry strategies, circuit breakers, and graceful degradation.

Error Types

Network Errors

{
  "error": "Http: Connection refused"
}

HTTP Status Errors

HTTP handlers return status codes, not errors:

{
  "status": 404,
  "body": { "message": "Not Found" },
  "headers": {...}
}

Client handles status:

if (result.status >= 400) {
  throw new APIError(result.status, result.body);
}
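
A native Rust handler can apply the same rule with reqwest's error_for_status, which converts any 4xx/5xx response into an Err (a sketch; the ? conversions assume the handler's error type can absorb reqwest::Error):

// error_for_status turns a 4xx/5xx response into Err(reqwest::Error)
let resp = self.client.get(&input.url).send().await?.error_for_status()?;
let data: serde_json::Value = resp.json().await?;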

Timeout Errors

{
  "error": "Timeout: Request exceeded 30000ms"
}
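
The snippets in this chapter assume an error type roughly like the following sketch. The thiserror derive is an assumption; the variant names match the ones the examples use (Error::Http, Error::Timeout, Error::CircuitOpen, Error::Unavailable):

use thiserror::Error as ThisError;

#[derive(Debug, ThisError)]
enum Error {
    #[error("Http: {0}")]
    Http(String),
    #[error("Timeout: {0}")]
    Timeout(String),
    #[error("circuit breaker is open")]
    CircuitOpen,
    #[error("service unavailable")]
    Unavailable,
    // Lets Error::from(e) and the ? operator absorb reqwest errors
    #[error(transparent)]
    Reqwest(#[from] reqwest::Error),
}

type Result<T> = std::result::Result<T, Error>;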

Retry Strategies

Exponential Backoff (Native Handler)

use backoff::{ExponentialBackoff, Error as BackoffError};
use std::time::Duration;

async fn handle_with_retry(&self, input: Input) -> Result<Output> {
    let backoff = ExponentialBackoff {
        initial_interval: Duration::from_millis(100),
        multiplier: 2.0,
        max_interval: Duration::from_secs(30),
        max_elapsed_time: Some(Duration::from_secs(300)),  // 5 minutes
        ..Default::default()
    };

    // backoff::future::retry (the async variant, behind the crate's tokio
    // feature) sleeps between attempts according to the policy above
    let resp = backoff::future::retry(backoff, || async {
        match self.client.get(&input.url).send().await {
            Ok(resp) if resp.status().is_success() => Ok(resp),
            Ok(resp) if resp.status().is_server_error() => {
                // Retry 5xx errors: the server may recover
                Err(BackoffError::transient(Error::Http(...)))
            },
            Ok(resp) => {
                // Don't retry 4xx errors: the request itself is wrong
                Err(BackoffError::permanent(Error::Http(...)))
            },
            Err(e) if e.is_timeout() => {
                // Retry timeouts
                Err(BackoffError::transient(Error::from(e)))
            },
            Err(e) => Err(BackoffError::permanent(Error::from(e))),
        }
    }).await?;

    Ok(resp.json().await?)
}
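
Note that max_elapsed_time caps the total time spent retrying: once five minutes have elapsed, the next transient error is handed back to the caller instead of being retried again.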

Retry with Jitter

use rand::Rng;
use std::time::Duration;

async fn retry_with_jitter<F, Fut, T>(
    max_attempts: u32,
    base_delay_ms: u64,
    operation: F,
) -> Result<T>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    let mut attempt = 0;

    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if attempt >= max_attempts - 1 => return Err(e),
            Err(_) => {
                // Random jitter spreads retries from concurrent clients apart
                // (thread_rng is created per attempt so the future stays Send)
                let jitter = rand::thread_rng().gen_range(0..=base_delay_ms / 2);
                let delay = (base_delay_ms * 2_u64.pow(attempt)) + jitter;
                tokio::time::sleep(Duration::from_millis(delay)).await;
                attempt += 1;
            }
        }
    }
}
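
A call site might look like this (the client and input.url fields are assumptions matching the other snippets in this chapter):

let resp = retry_with_jitter(5, 100, || async {
    self.client.get(&input.url).send().await.map_err(Error::from)
}).await?;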

Circuit Breaker Pattern

Implementation

use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::{Instant, Duration};

#[derive(Clone)]
enum CircuitState {
    Closed,
    Open { opened_at: Instant },
    HalfOpen,
}

struct CircuitBreaker {
    state: Arc<RwLock<CircuitState>>,
    failure_threshold: u32,
    timeout: Duration,
    failures: Arc<RwLock<u32>>,
}

impl CircuitBreaker {
    fn new(failure_threshold: u32, timeout: Duration) -> Self {
        Self {
            state: Arc::new(RwLock::new(CircuitState::Closed)),
            failure_threshold,
            timeout,
            failures: Arc::new(RwLock::new(0)),
        }
    }

    async fn call<F, Fut, T>(&self, operation: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        // Check state
        let state = self.state.read().await.clone();

        match state {
            CircuitState::Open { opened_at } => {
                if opened_at.elapsed() > self.timeout {
                    // Transition to HalfOpen
                    *self.state.write().await = CircuitState::HalfOpen;
                } else {
                    return Err(Error::CircuitOpen);
                }
            }
            CircuitState::HalfOpen | CircuitState::Closed => {}
        }

        // Execute operation
        match operation().await {
            Ok(result) => {
                // Success - close circuit
                *self.state.write().await = CircuitState::Closed;
                *self.failures.write().await = 0;
                Ok(result)
            }
            Err(e) => {
                // Failure - increment counter
                let mut failures = self.failures.write().await;
                *failures += 1;

                if *failures >= self.failure_threshold {
                    // Open circuit
                    *self.state.write().await = CircuitState::Open {
                        opened_at: Instant::now(),
                    };
                }

                Err(e)
            }
        }
    }
}

Usage

let breaker = CircuitBreaker::new(
    5,  // failure_threshold
    Duration::from_secs(60),  // timeout
);

let result = breaker.call(|| async {
    self.client.get(&url).send().await.map_err(Error::from)
}).await?;

Fallback Patterns

Primary/Secondary Endpoints

async fn handle_with_fallback(&self, input: Input) -> Result<Output> {
    // Try primary endpoint
    match self.client.get(&self.primary_url).send().await {
        Ok(resp) if resp.status().is_success() => {
            return Ok(resp.json().await?);
        }
        Err(e) => {
            tracing::warn!("Primary endpoint failed: {}", e);
        }
        Ok(resp) => {
            tracing::warn!("Primary endpoint returned {}", resp.status());
        }
    }

    // Fallback to secondary
    tracing::info!("Using fallback endpoint");
    let resp = self.client.get(&self.fallback_url).send().await?;
    Ok(resp.json().await?)
}

Cached Response Fallback

use lru::LruCache;
use std::sync::Arc;
use tokio::sync::Mutex;

struct CachedHandler {
    client: Client,
    cache: Arc<Mutex<LruCache<String, serde_json::Value>>>,
}

impl CachedHandler {
    async fn handle(&self, input: Input) -> Result<Output> {
        let cache_key = format!("{}-{}", input.resource, input.id);

        // Try API
        match self.client.get(&input.url).send().await {
            Ok(resp) if resp.status().is_success() => {
                let data: serde_json::Value = resp.json().await?;

                // Update cache
                self.cache.lock().await.put(cache_key.clone(), data.clone());

                Ok(Output { data })
            }
            _ => {
                // Fallback to cache
                if let Some(cached) = self.cache.lock().await.get(&cache_key) {
                    tracing::warn!("Using cached response");
                    return Ok(Output { data: cached.clone() });
                }

                Err(Error::Unavailable)
            }
        }
    }
}
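
One construction detail: in recent versions of the lru crate, LruCache::new takes a NonZeroUsize capacity, so a constructor might look like this sketch:

use std::num::NonZeroUsize;

impl CachedHandler {
    fn new(client: Client, capacity: usize) -> Self {
        Self {
            client,
            // Fall back to a capacity of 1 rather than panicking on 0
            cache: Arc::new(Mutex::new(LruCache::new(
                NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::MIN),
            ))),
        }
    }
}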

Rate Limiting

Token Bucket Implementation

use std::time::{Duration, Instant};

struct TokenBucket {
    tokens: f64,
    capacity: f64,
    rate: f64,  // tokens per second
    last_refill: Instant,
}

impl TokenBucket {
    async fn acquire(&mut self) -> Result<()> {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();

        // Refill tokens
        self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
        self.last_refill = now;

        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            let wait_time = ((1.0 - self.tokens) / self.rate) * 1000.0;
            tokio::time::sleep(Duration::from_millis(wait_time as u64)).await;
            self.tokens = 0.0;
            Ok(())
        }
    }
}

// Usage
async fn handle(&self, input: Input) -> Result<Output> {
    self.rate_limiter.lock().await.acquire().await?;
    let resp = self.client.get(&input.url).send().await?;
    Ok(resp.json().await?)
}
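
Note that the call site holds the rate limiter's mutex across the sleep inside acquire, so concurrent requests queue up behind it. That queueing is what actually enforces the rate, at the cost of serializing all requests through a single lock.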

Timeout Management

Adaptive Timeouts

use std::collections::VecDeque;
use std::time::{Duration, Instant};

struct AdaptiveTimeout {
    latencies: VecDeque<Duration>,
    window_size: usize,
}

impl AdaptiveTimeout {
    fn get_timeout(&self) -> Duration {
        if self.latencies.is_empty() {
            return Duration::from_secs(30);  // Default
        }

        let avg: Duration = self.latencies.iter().sum::<Duration>() / self.latencies.len() as u32;
        avg * 3  // 3x average latency
    }

    fn record(&mut self, latency: Duration) {
        self.latencies.push_back(latency);
        if self.latencies.len() > self.window_size {
            self.latencies.pop_front();
        }
    }
}

async fn handle(&self, input: Input) -> Result<Output> {
    let timeout_duration = self.adaptive_timeout.lock().await.get_timeout();
    let start = Instant::now();

    let result = tokio::time::timeout(
        timeout_duration,
        self.client.get(&input.url).send()
    ).await??;

    self.adaptive_timeout.lock().await.record(start.elapsed());
    Ok(result.json().await?)
}
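
Because the early return on timeout skips the record call, only successful requests feed the latency window; timed-out calls never drag the average (and hence the timeout budget) upward.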

Error Recovery Patterns

Pattern 1: Retry-Then-Circuit

async fn robust_call(&self, input: Input) -> Result<Output> {
    // Try with retries (retry_with_backoff stands in for any bounded retry
    // helper, such as retry_with_jitter from earlier in this chapter)
    let result = retry_with_backoff(3, || async {
        self.client.get(&input.url).send().await.map_err(Error::from)
    }).await;

    // If retries are exhausted, open the circuit
    match result {
        Ok(resp) => Ok(resp.json().await?),
        Err(_) => {
            self.circuit_breaker.open().await;
            Err(Error::Unavailable)
        }
    }
}
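
The open method used here is not part of the CircuitBreaker shown earlier; a minimal addition consistent with that implementation might look like:

impl CircuitBreaker {
    // Force the breaker into the Open state immediately
    async fn open(&self) {
        *self.state.write().await = CircuitState::Open {
            opened_at: Instant::now(),
        };
    }
}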

Pattern 2: Parallel Requests

async fn parallel_fallback(&self, input: Input) -> Result<Output> {
    let primary = self.client.get(&self.primary_url).send();
    let secondary = self.client.get(&self.secondary_url).send();

    // Use first successful response
    tokio::select! {
        Ok(resp) = primary => Ok(resp.json().await?),
        Ok(resp) = secondary => {
            tracing::info!("Used secondary endpoint");
            Ok(resp.json().await?)
        },
        else => Err(Error::Unavailable),
    }
}
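
The pattern-matched branches are what make this work: if one request completes with Err, its branch is disabled and select! keeps waiting on the other; the else arm runs only once both have failed. Status checks are elided here, so the first transport-level success wins even if it carries an error status.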

Testing Error Scenarios

Mock Network Failures

use serde_json::json;
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::test]
async fn test_retry_on_failure() {
    let mock_server = MockServer::start().await;

    // Fail twice, succeed afterwards. Mocks match in mount order, and
    // up_to_n_times(2) exhausts the 500 mock after the first two requests.
    Mock::given(method("GET"))
        .respond_with(ResponseTemplate::new(500))
        .up_to_n_times(2)
        .mount(&mock_server)
        .await;
    Mock::given(method("GET"))
        .respond_with(ResponseTemplate::new(200)
            .set_body_json(json!({"success": true})))
        .mount(&mock_server)
        .await;

    let handler = RetryHandler::new(mock_server.uri(), 3);
    let result = handler.handle(Input {}).await.unwrap();

    assert_eq!(result.data["success"], true);
}
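
A similar unit test can pin down the breaker's fail-fast behavior (a sketch against the CircuitBreaker implementation above):

#[tokio::test]
async fn test_circuit_opens_after_threshold() {
    let breaker = CircuitBreaker::new(2, Duration::from_secs(60));

    // Two consecutive failures reach the threshold and open the circuit
    for _ in 0..2 {
        let _ = breaker.call(|| async { Err::<(), Error>(Error::Unavailable) }).await;
    }

    // The next call fails fast without running the operation at all
    let result = breaker.call(|| async { Ok(()) }).await;
    assert!(matches!(result, Err(Error::CircuitOpen)));
}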

Next Steps

Chapter 6.0 introduces Pipeline handlers for composing multiple tools into workflows.


“Errors are inevitable. Recovery is engineering.” - pforge resilience principle