Data Pipeline: Pipeline Handler Overview

Pipeline handlers compose multiple tools into workflows. This chapter demonstrates building data processing pipelines with conditional execution and state management.

Why Pipeline Handlers?

Use pipeline handlers when:

  • Multiple tools must run in sequence
  • The output of one tool feeds the input of the next
  • Later steps run conditionally, based on earlier results
  • A multi-step workflow shares state across its steps

Don’t use pipeline handlers when:

  • A single tool suffices
  • The workflow needs complex branching logic (use a native handler instead)
  • Real-time streaming is required
  • The tools are independent (call them separately)

Example: Data Processing Pipeline

forge:
  name: data-pipeline
  version: 0.1.0
  transport: stdio

tools:
  - type: pipeline
    name: process_user_data
    description: "Fetch, validate, transform, and store user data"
    steps:
      - tool: fetch_user
        input:
          user_id: "{{user_id}}"
        output_var: user_data

      - tool: validate_user
        input:
          data: "{{user_data}}"
        output_var: validated
        condition: "user_data"

      - tool: transform_data
        input:
          raw: "{{validated}}"
        output_var: transformed
        condition: "validated"

      - tool: store_data
        input:
          data: "{{transformed}}"
        error_policy: fail_fast
    params:
      user_id:
        type: string
        required: true

Pipeline Anatomy

Steps

steps:
  - tool: step_name        # Tool to execute
    input: {...}           # Input template
    output_var: result     # Store output in variable
    condition: "var_name"  # Execute if variable exists
    error_policy: continue # Or fail_fast

Variable Interpolation

steps:
  - tool: get_data
    input:
      id: "{{request_id}}"
    output_var: data

  - tool: process
    input:
      payload: "{{data}}"  # Uses output from previous step
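
The substitution step can be sketched in a few lines of Rust. This is a minimal illustration, not pforge's actual implementation: the function name `interpolate` is hypothetical, values are simplified to plain strings (real step outputs may be structured JSON), and leaving unknown placeholders untouched is an assumption.

```rust
use std::collections::HashMap;

/// Replace every `{{name}}` placeholder in `template` with the matching
/// value from `vars`. Unknown names are left untouched (an assumption;
/// a real engine may report an error instead).
fn interpolate(template: &str, vars: &HashMap<String, String>) -> String {
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("{{") {
        // Copy the literal text that precedes the placeholder.
        out.push_str(&rest[..start]);
        let after_open = &rest[start + 2..];
        match after_open.find("}}") {
            Some(end) => {
                let name = after_open[..end].trim();
                match vars.get(name) {
                    Some(value) => out.push_str(value),
                    None => {
                        // Keep the placeholder verbatim when the variable is missing.
                        out.push_str("{{");
                        out.push_str(&after_open[..end]);
                        out.push_str("}}");
                    }
                }
                rest = &after_open[end + 2..];
            }
            None => {
                // Unterminated "{{": emit the remainder as-is and stop scanning.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```

With `data` bound by an earlier step's `output_var`, a template such as `"{{data}}"` would resolve to that step's output under this sketch.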

Error Policies

fail_fast (default): Stop on first error

error_policy: fail_fast

continue: Ignore the failure and run the remaining steps

error_policy: continue
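
To make the difference between the two policies concrete, here is a rough Rust sketch of a step loop. All names (`ErrorPolicy`, `Step`, `run_steps`, `ok_tool`, `failing_tool`) are hypothetical stand-ins rather than pforge's real types, and each tool call is reduced to a plain function pointer.

```rust
/// How a step's failure affects the rest of the pipeline.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ErrorPolicy {
    FailFast,
    Continue,
}

/// A hypothetical step: a name, a policy, and a function standing in for the tool call.
struct Step {
    name: &'static str,
    policy: ErrorPolicy,
    run: fn() -> Result<String, String>,
}

/// Stand-in tool that always succeeds.
fn ok_tool() -> Result<String, String> {
    Ok("done".to_string())
}

/// Stand-in tool that always fails.
fn failing_tool() -> Result<String, String> {
    Err("boom".to_string())
}

/// Run steps in order. Returns each attempted step's name and whether it succeeded.
fn run_steps(steps: &[Step]) -> Vec<(&'static str, bool)> {
    let mut log = Vec::new();
    for step in steps {
        match (step.run)() {
            Ok(_) => log.push((step.name, true)),
            Err(_) => {
                log.push((step.name, false));
                if step.policy == ErrorPolicy::FailFast {
                    break; // stop at the first failure
                }
                // ErrorPolicy::Continue: the failure is logged and the loop moves on.
            }
        }
    }
    log
}
```

In this sketch a failing `Continue` step is still logged as failed, while a failing `FailFast` step prevents every later step from being attempted.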

Complete Pipeline Example

tools:
  # Individual tools
  - type: http
    name: fetch_weather
    endpoint: "https://api.weather.com/{{city}}"
    method: GET
    params:
      city: { type: string, required: true }

  - type: native
    name: parse_weather
    handler:
      path: handlers::parse_weather
    params:
      raw_data: { type: object, required: true }

  - type: http
    name: send_notification
    endpoint: "https://notify.example.com/send"
    method: POST
    body:
      message: "{{message}}"
    params:
      message: { type: string, required: true }

  # Pipeline composing them
  - type: pipeline
    name: weather_alert
    description: "Fetch weather and send alerts if needed"
    steps:
      - tool: fetch_weather
        input:
          city: "{{city}}"
        output_var: raw_weather

      - tool: parse_weather
        input:
          raw_data: "{{raw_weather}}"
        output_var: weather
        condition: "raw_weather"

      - tool: send_notification
        input:
          message: "Alert: {{weather.condition}} in {{city}}"
        condition: "weather.is_alert"
        error_policy: continue

    params:
      city: { type: string, required: true }

Pipeline Execution Flow

Input: { "city": "Boston" }
  ↓
Step 1: fetch_weather(city="Boston")
  → Output: { "temp": 32, "condition": "snow" }
  → Store in: raw_weather
  ↓
Step 2: parse_weather(raw_data=raw_weather)
  → Condition: raw_weather exists ✓
  → Output: { "is_alert": true, "condition": "Heavy Snow" }
  → Store in: weather
  ↓
Step 3: send_notification(message="Alert: Heavy Snow in Boston")
  → Condition: weather.is_alert=true ✓
  → Output: { "sent": true }
  ↓
Pipeline Result: { "results": [...], "variables": {...} }
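
The two condition checks in the flow above (`raw_weather` exists, `weather.is_alert` is true) can be sketched as a dotted-path lookup. The `Value` enum, `lookup`, and `condition_holds` are hypothetical, and the rule used here (a path must resolve, and a boolean must be `true`) is an assumption inferred from the flow, not a documented pforge rule.

```rust
use std::collections::HashMap;

/// A tiny stand-in for a JSON value (hypothetical; a real engine would
/// more likely use a JSON library type).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Bool(bool),
    Text(String),
    Object(HashMap<String, Value>),
}

/// Walk a dotted path such as "weather.is_alert" through nested objects.
fn lookup<'a>(vars: &'a HashMap<String, Value>, path: &str) -> Option<&'a Value> {
    let mut parts = path.split('.');
    let mut current = vars.get(parts.next()?)?;
    for key in parts {
        current = match current {
            Value::Object(fields) => fields.get(key)?,
            _ => return None, // cannot descend into a non-object value
        };
    }
    Some(current)
}

/// Assumed rule: the condition passes when the path resolves and,
/// if the value is a boolean, that boolean is `true`.
fn condition_holds(vars: &HashMap<String, Value>, path: &str) -> bool {
    match lookup(vars, path) {
        Some(Value::Bool(flag)) => *flag,
        Some(_) => true,
        None => false,
    }
}
```

Under these assumptions, a step whose condition names a variable that no earlier step produced is simply skipped.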

Input/Output Structure

Pipeline Input

{
  "variables": {
    "city": "Boston",
    "user_id": "123"
  }
}

Pipeline Output

{
  "results": [
    {
      "tool": "fetch_weather",
      "success": true,
      "output": { "temp": 32, "condition": "snow" },
      "error": null
    },
    {
      "tool": "parse_weather",
      "success": true,
      "output": { "is_alert": true, "condition": "Heavy Snow" },
      "error": null
    },
    {
      "tool": "send_notification",
      "success": true,
      "output": { "sent": true },
      "error": null
    }
  ],
  "variables": {
    "city": "Boston",
    "raw_weather": {...},
    "weather": {...}
  }
}
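
A caller that consumes this output might model each entry of `results` roughly as follows. This is a sketch only: `StepResult`, `first_failure`, and `summarize` are hypothetical names, and `output` is kept as raw JSON text to avoid assuming a particular JSON library.

```rust
/// Mirrors one entry of the `results` array. `output` holds raw JSON
/// text here, which is a simplification for the sake of the example.
#[derive(Debug, Clone, PartialEq)]
struct StepResult {
    tool: String,
    success: bool,
    output: Option<String>,
    error: Option<String>,
}

/// Return the first failed step, if any, so the caller can report it.
fn first_failure(results: &[StepResult]) -> Option<&StepResult> {
    results.iter().find(|r| !r.success)
}

/// Render a one-line summary such as "2/3 steps succeeded".
fn summarize(results: &[StepResult]) -> String {
    let ok = results.iter().filter(|r| r.success).count();
    format!("{}/{} steps succeeded", ok, results.len())
}
```

Because every attempted step reports its own `success` and `error`, a caller can tell a partially successful run from a clean one without parsing log output.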

Error Handling

Fail Fast (Default)

steps:
  - tool: critical_step
    input: {...}
    # Implicit: error_policy: fail_fast

  - tool: next_step
    input: {...}
    # Won't execute if critical_step fails

Continue on Error

steps:
  - tool: optional_step
    input: {...}
    error_policy: continue  # Pipeline continues even if this fails

  - tool: final_step
    input: {...}
    # Executes regardless of optional_step outcome

Real-World Example: ETL Pipeline

tools:
  - type: pipeline
    name: etl_pipeline
    description: "Extract, Transform, Load data pipeline"
    steps:
      # Extract
      - tool: extract_from_api
        input:
          endpoint: "{{source_url}}"
          api_key: "{{api_key}}"
        output_var: raw_data
        error_policy: fail_fast

      # Transform
      - tool: clean_data
        input:
          data: "{{raw_data}}"
        output_var: cleaned
        condition: "raw_data"

      - tool: enrich_data
        input:
          data: "{{cleaned}}"
        output_var: enriched
        condition: "cleaned"

      - tool: aggregate_data
        input:
          data: "{{enriched}}"
        output_var: aggregated
        condition: "enriched"

      # Load
      - tool: validate_schema
        input:
          data: "{{aggregated}}"
        output_var: validated
        error_policy: fail_fast

      - tool: load_to_database
        input:
          data: "{{validated}}"
          table: "{{target_table}}"
        error_policy: fail_fast

      # Notify
      - tool: send_success_notification
        input:
          message: "ETL completed: {{aggregated.count}} records"
        error_policy: continue

    params:
      source_url: { type: string, required: true }
      api_key: { type: string, required: true }
      target_table: { type: string, required: true }
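
In a pipeline this long it is easy to reference a variable that no param or earlier step defines. The following Rust sketch checks that data flow by hand. `StepSpec` and `undefined_reads` are hypothetical helpers written for this illustration, not part of pforge, and the variables each step reads are listed manually rather than parsed from the templates.

```rust
use std::collections::HashSet;

/// Hand-written description of one step's data flow (hypothetical type).
struct StepSpec {
    tool: &'static str,
    /// Root variable names referenced by the step's `input` templates.
    reads: Vec<&'static str>,
    output_var: Option<&'static str>,
}

/// Return (tool, variable) pairs where a step reads a variable that is
/// neither a pipeline param nor the `output_var` of an earlier step.
fn undefined_reads(
    params: &[&'static str],
    steps: &[StepSpec],
) -> Vec<(&'static str, &'static str)> {
    let mut defined: HashSet<&str> = params.iter().copied().collect();
    let mut problems = Vec::new();
    for step in steps {
        for var in &step.reads {
            if !defined.contains(var) {
                problems.push((step.tool, *var));
            }
        }
        // A step's output only becomes visible to the steps after it.
        if let Some(out) = step.output_var {
            defined.insert(out);
        }
    }
    problems
}
```

Applied to the ETL pipeline above with all seven steps listed, this check would report nothing, since every template reads either a param or an earlier `output_var`.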

Performance Characteristics

Metric                 Value
Dispatch overhead      50-100 μs per step
Variable lookup        O(1) (HashMap)
Condition evaluation   < 1 μs
State memory           ~100 B per variable
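
As a rough worked example, the per-step and per-variable figures above can be scaled to a whole pipeline. The helper names are hypothetical, and the estimate assumes the figures add up linearly and exclude the tools' own run time and the size of the payloads.

```rust
/// Estimated dispatch overhead range (low, high) in microseconds for a
/// pipeline of `steps` steps, using the 50-100 μs per-step figure.
fn dispatch_overhead_us(steps: u64) -> (u64, u64) {
    (steps * 50, steps * 100)
}

/// Estimated state bookkeeping in bytes, using the ~100 B per-variable figure.
fn state_memory_bytes(variables: u64) -> u64 {
    variables * 100
}
```

For the seven-step ETL pipeline with its three params and five output variables, this gives roughly 350-700 μs of dispatch overhead and about 800 B of state, which is likely negligible next to the HTTP and database calls the steps themselves make.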

When to Use Native vs Pipeline

Pipeline Handler - Linear workflows:

type: pipeline
steps:
  - tool: fetch
  - tool: process
  - tool: store

Native Handler - Complex logic:

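async fn handle(&self, input: Input) -> Result<Output> {
    // Fetch first; `?` returns early if the call fails.
    let data = fetch().await?;

    // If/else branching like this is awkward to express as linear steps.
    if data.requires_processing() {
        let processed = complex_transform(data)?;
        store(processed).await?;
    } else {
        quick_store(data).await?;
    }

    Ok(Output { ... })
}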

Next Steps

Chapter 6.1 covers tool composition patterns, including parallel execution and error propagation.


“Pipelines compose tools. Tools compose behavior.” - pforge composition principle