Parallel Development

This chapter covers strategies for parallel development when working with the Sovereign AI Stack, including distributed computing patterns with repartir.

Overview

Parallel development in the stack operates at multiple levels:

  1. Code-level parallelism: Rayon, SIMD, GPU compute
  2. Task-level parallelism: repartir work-stealing scheduler
  3. Machine-level parallelism: Distributed execution across nodes
  4. Team-level parallelism: Concurrent development workflows

Code-Level Parallelism

SIMD with Trueno

use trueno::Vector;

// Automatic SIMD (AVX2/AVX-512/NEON)
let a = Vector::from_slice(&[1.0, 2.0, 3.0, 4.0]);
let b = Vector::from_slice(&[5.0, 6.0, 7.0, 8.0]);
let result = a.add(&b)?;  // SIMD-accelerated

GPU with wgpu

use repartir::executor::gpu::GpuExecutor;

let gpu = GpuExecutor::new().await?;
println!("Using: {} ({} compute units)",
    gpu.device_name(),
    gpu.capacity()
);

Task-Level Parallelism

Work-Stealing with repartir

The Blumofe & Leiserson work-stealing algorithm provides efficient load balancing:

use repartir::{Pool, task::{Task, Backend}};

let pool = Pool::builder()
    .cpu_workers(num_cpus::get())
    .build()?;

// Tasks automatically distributed across workers
for chunk in data.chunks(1000) {
    let task = Task::builder()
        .binary("./process")
        .arg(format!("--data={:?}", chunk))
        .backend(Backend::Cpu)
        .build()?;

    pool.submit(task).await?;
}

Backend Selection Strategy

Workload Size   | Complexity     | Recommended Backend
----------------|----------------|------------------------------
< 1K elements   | Any            | Scalar (no overhead)
1K - 100K       | Low/Medium     | SIMD (trueno)
> 100K          | High (O(n²)+)  | GPU (wgpu)
> 10M           | Any            | Distributed (repartir remote)
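
These thresholds can be encoded as a simple dispatch helper. A minimal sketch with its own illustrative enum (not repartir's API); map the variants onto the trueno, wgpu, or repartir entry points as appropriate:

// Illustrative only -- these variants stand in for the real entry points.
enum ComputeBackend {
    Scalar,      // < 1K elements: parallelism overhead dominates
    Simd,        // 1K - 100K: trueno vectorization
    Gpu,         // > 100K with O(n²)+ work: wgpu compute
    Distributed, // > 10M: repartir remote workers
}

/// Pick a backend from workload size and a rough complexity estimate,
/// following the thresholds in the table above.
fn select_backend(elements: usize, quadratic_or_worse: bool) -> ComputeBackend {
    match elements {
        n if n > 10_000_000 => ComputeBackend::Distributed,
        n if n > 100_000 && quadratic_or_worse => ComputeBackend::Gpu,
        n if n >= 1_000 => ComputeBackend::Simd,
        _ => ComputeBackend::Scalar,
    }
}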

Machine-Level Parallelism

Multi-Node Deployment

┌─────────────────────────────────────────────────────────────┐
│                    Coordinator Node                         │
│                    (batuta orchestration)                   │
├─────────────────────────────────────────────────────────────┤
│                    repartir RemoteExecutor                  │
├───────────────┬───────────────┬───────────────┬─────────────┤
│   Worker 1    │   Worker 2    │   Worker 3    │   Worker N  │
│   GPU + CPU   │   GPU + CPU   │   GPU + CPU   │   GPU + CPU │
└───────────────┴───────────────┴───────────────┴─────────────┘

Setting Up Workers

# On each worker node
cargo install repartir --features remote

# Start worker daemon
repartir-worker --bind 0.0.0.0:9000

# With TLS (production)
repartir-worker --bind 0.0.0.0:9443 \
    --cert ./certs/server.pem \
    --key ./certs/server.key

Coordinator Code

use repartir::executor::remote::RemoteExecutor;

let workers = vec![
    "10.0.0.1:9000",
    "10.0.0.2:9000",
    "10.0.0.3:9000",
];

let executor = RemoteExecutor::builder()
    .add_workers(&workers)
    .build()
    .await?;

// Tasks distributed automatically
for task in tasks {
    let result = executor.execute(task).await?;
}

Team-Level Parallelism

Git Workflow for Parallel Development

main ─────────────────────────────────────────────────►
       │                    │                    │
       ▼                    ▼                    ▼
   feature/ml-model    feature/api-v2    feature/gpu-opt
       │                    │                    │
       └────────────────────┴────────────────────┘
                            │
                            ▼
                    Integration Branch
                            │
                            ▼
                      CI/CD Pipeline
                            │
                            ▼
                          main

Module Boundaries

Structure code for parallel development:

src/
├── core/           # Stable, shared code
│   ├── types.rs
│   └── traits.rs
├── ml/             # Team A: ML features
│   ├── training.rs
│   └── inference.rs
├── api/            # Team B: API features
│   ├── handlers.rs
│   └── routes.rs
└── compute/        # Team C: Compute optimization
    ├── simd.rs
    └── gpu.rs
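
The core/ module anchors the contract that the three teams build against, so interface changes are visible to everyone. A minimal sketch of what core/traits.rs might contain (the trait and its names are illustrative, not part of any stack crate):

// core/traits.rs -- shared contract; changes here warrant cross-team review.

/// A unit of work that any team's module (ml/, api/, compute/) can run.
pub trait Job {
    type Output;
    type Error;

    /// Stable identifier used in logs and metrics.
    fn name(&self) -> &str;

    /// Execute the job to completion.
    fn run(&self) -> Result<Self::Output, Self::Error>;
}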

Batuta Stack Workflow

# Check component health (parallel-safe)
batuta stack check

# Quality gate before merge
batuta stack gate

# Version status
batuta stack versions

Performance Patterns

Amdahl’s Law Considerations

Speedup = 1 / ((1 - P) + P/N)

Where:
  P = Parallel fraction of code
  N = Number of processors

Algorithm          | Parallel Fraction | 8-Node Speedup
-------------------|-------------------|---------------
Random Forest      | 0.95              | 5.9x
K-Means            | 0.85              | 3.9x
Linear Regression  | 0.90              | 4.7x
Neural Network     | 0.92              | 5.1x
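
To sanity-check the table, a minimal sketch (plain Rust, no stack dependencies) that evaluates the formula for each row:

/// Amdahl's law: ideal speedup for parallel fraction `p` on `n` processors.
fn amdahl_speedup(p: f64, n: f64) -> f64 {
    1.0 / ((1.0 - p) + p / n)
}

fn main() {
    // Rows of the table above, N = 8 nodes.
    for (name, p) in [
        ("Random Forest", 0.95),
        ("K-Means", 0.85),
        ("Linear Regression", 0.90),
        ("Neural Network", 0.92),
    ] {
        println!("{name}: {:.1}x", amdahl_speedup(p, 8.0));
    }
}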

Communication Overhead

Minimize cross-node communication:

// BAD: Fine-grained tasks (high overhead)
for item in items {
    executor.execute(process_one(item)).await?;
}

// GOOD: Coarse-grained tasks (batch processing)
for chunk in items.chunks(10_000) {
    executor.execute(process_batch(chunk)).await?;
}

Monitoring & Debugging

TUI Dashboard

# Monitor distributed job flow
cargo run --bin job-flow --features tui,remote

Logging

use tracing::{info, debug, span, Level};

let span = span!(Level::INFO, "distributed_task", node = %node_id);
let _guard = span.enter();

info!("Submitting task to {}", node_id);
debug!("Task payload: {:?}", task);

Metrics Collection

use std::time::Instant;

let start = Instant::now();
let result = executor.execute(task).await?;
let duration = start.elapsed();

metrics::histogram!("task_duration_ms", duration.as_millis() as f64);
metrics::counter!("tasks_completed", 1);

Best Practices

1. Profile Before Parallelizing

# Use pmat for analysis
pmat check . --analyze-complexity

# Identify hot paths
cargo flamegraph --root

2. Start with Coarse Granularity

Begin with large tasks, then refine if needed.
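
One way to keep granularity adjustable is to make the batch size a single named constant, so refining it later is a one-line change. A sketch in the snippet style used above (make_task is a hypothetical helper standing in for the Task::builder calls shown earlier):

// Tunable granularity: start coarse, shrink only if workers sit idle.
const CHUNK_SIZE: usize = 10_000;

for chunk in items.chunks(CHUNK_SIZE) {
    // One coarse task per chunk, as in the repartir examples above.
    pool.submit(make_task(chunk)?).await?;
}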

3. Handle Failures Gracefully

match executor.execute(task).await {
    Ok(result) if result.is_success() => {
        // Process result
    }
    Ok(result) => {
        // Task failed, retry or skip
        log::warn!("Task failed: {:?}", result.stderr_str());
    }
    Err(e) => {
        // Network/system error, may retry
        log::error!("Execution error: {}", e);
    }
}
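
For the "may retry" branch, a bounded retry with backoff is a common pattern. A minimal sketch, assuming tasks are cloneable and a tokio runtime is available (the policy values are illustrative):

use std::time::Duration;

let mut attempts: u64 = 0;
let result = loop {
    match executor.execute(task.clone()).await {
        Ok(r) => break Ok(r),
        Err(e) if attempts < 3 => {
            // Transient network/system error: back off and retry.
            attempts += 1;
            log::warn!("attempt {} failed: {}; retrying", attempts, e);
            tokio::time::sleep(Duration::from_millis(500 * attempts)).await;
        }
        Err(e) => break Err(e), // retries exhausted
    }
};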

4. Use Checkpointing for Long Jobs

use repartir::checkpoint::CheckpointManager;

let checkpoint = CheckpointManager::new("./checkpoints")?;

for epoch in start_epoch..total_epochs {
    // Train epoch
    train_epoch(epoch).await?;

    // Checkpoint after each epoch
    checkpoint.save(&format!("epoch_{}", epoch), &state).await?;
}
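
The loop above assumes start_epoch was recovered from an earlier run. A resume sketch follows; note that the load call and the shape of the saved state are assumptions about the CheckpointManager API, shown only to illustrate the pattern:

// ASSUMPTION: a `load` method returning the saved state, and a state
// struct carrying the epoch number -- check the actual repartir API.
let start_epoch = match checkpoint.load("latest").await {
    Ok(Some(state)) => state.epoch + 1, // resume after last completed epoch
    _ => 0,                             // no checkpoint: start from scratch
};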
