Multi-GPU Inference
Status: Verified | Idempotent: Yes | Coverage: 95%+
Run Command
cargo run --example gpu_multi_gpu_inference
Code
//! # Recipe: Multi-GPU Inference
//!
//! Contract: contracts/recipe-iiur-v1.yaml, contracts/flash-attention-v1.yaml
//! **Category**: GPU Acceleration
//! **Isolation Level**: Full
//! **Idempotency**: Guaranteed
//! **Dependencies**: None (default features)
//!
//! ## QA Checklist
//! 1. [x] `cargo run` succeeds (Exit Code 0)
//! 2. [x] `cargo test` passes
//! 3. [x] Deterministic output (Verified)
//! 4. [x] No temp files leaked
//! 5. [x] Memory usage stable
//! 6. [x] WASM compatible (N/A)
//! 7. [x] Clippy clean
//! 8. [x] Rustfmt standard
//! 9. [x] No `unwrap()` in logic
//! 10. [x] Proptests pass (100+ cases)
//!
//! ## Learning Objective
//! Compare data-parallel, pipeline-parallel, and tensor-parallel strategies for
//! distributing inference across multiple GPUs, using a simulated four-GPU setup
//! and an analytic cost model.
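//!
//! ## Strategy Overview
//! - **DataParallel**: every GPU holds a full model replica and serves a slice
//!   of the batch; modeled here with +10% communication overhead per extra GPU.
//! - **PipelineParallel**: layers are split across GPUs, so per-GPU memory drops
//!   linearly, at the cost of pipeline-bubble overhead `1 + (gpus - 1) / layers`.
//! - **TensorParallel**: individual weight matrices are sharded, also shrinking
//!   per-GPU memory, but with the heaviest communication cost (+15% per extra GPU).
//!
//! Worked example from the model below: with 4 GPUs, DataParallel's overhead is
//! `1 + 0.1 * 3 = 1.3`, giving speedup `4 / 1.3 ~= 3.08x` and efficiency `~77%`.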
//!
//! ## Run Command
//! ```bash
//! cargo run --example gpu_multi_gpu_inference
//! ```
//!
//! ## Format Variants
//! ```bash
//! apr run --device gpu model.apr # APR native format
//! apr run --device gpu model.gguf # GGUF (llama.cpp compatible)
//! apr run --device gpu model.safetensors # SafeTensors (HuggingFace)
//! ```
//!
//! ## References
//! - Dao, T. et al. (2022). *FlashAttention: Fast and Memory-Efficient Exact Attention*. NeurIPS. arXiv:2205.14135
use apr_cookbook::prelude::*;
use serde::{Deserialize, Serialize};
fn main() -> Result<()> {
let mut ctx = RecipeContext::new("gpu_multi_gpu_inference")?;
println!("=== Recipe: {} ===", ctx.name());
println!("Multi-GPU inference distribution");
println!();
// Detect GPUs (simulated in this recipe; see detect_gpus below)
let gpus = detect_gpus();
ctx.record_metric("gpu_count", gpus.len() as i64);
println!("Detected GPUs:");
for gpu in &gpus {
println!(" GPU {}: {} ({}GB)", gpu.id, gpu.name, gpu.memory_gb);
}
println!();
// Candidate distribution strategies to compare
let strategies = vec![
DistributionStrategy::DataParallel,
DistributionStrategy::PipelineParallel,
DistributionStrategy::TensorParallel,
];
// Model config
let model_config = ModelConfig {
total_params_b: 7.0, // 7B parameter model
layers: 32,
batch_size: 64,
};
println!(
"Model: {:.0}B parameters, {} layers",
model_config.total_params_b, model_config.layers
);
println!("Batch size: {}", model_config.batch_size);
println!();
println!("Strategy Comparison ({} GPUs):", gpus.len());
println!("{:-<70}", "");
println!(
"{:<20} {:>12} {:>12} {:>12} {:>10}",
"Strategy", "Time(ms)", "Throughput", "Efficiency", "Memory/GPU"
);
println!("{:-<70}", "");
let mut results = Vec::new();
for strategy in &strategies {
let result = benchmark_strategy(&gpus, &model_config, *strategy)?;
results.push(result.clone());
println!(
"{:<20} {:>10.2}ms {:>10.0}/s {:>10.0}% {:>8}GB",
format!("{:?}", strategy),
result.total_time_ms,
result.throughput,
result.efficiency * 100.0,
result.memory_per_gpu_gb
);
}
println!("{:-<70}", "");
// Pick the strategy with the highest modeled throughput (NaN-safe compare)
let best = results
.iter()
.max_by(|a, b| {
a.throughput
.partial_cmp(&b.throughput)
.unwrap_or(std::cmp::Ordering::Equal)
})
.ok_or_else(|| CookbookError::invalid_format("No results"))?;
ctx.record_float_metric("best_throughput", best.throughput);
ctx.record_float_metric("best_efficiency", best.efficiency);
println!();
println!("Best Strategy: {:?}", best.strategy);
println!(" Throughput: {:.0} samples/sec", best.throughput);
println!(" Efficiency: {:.0}%", best.efficiency * 100.0);
// Scaling analysis
println!();
println!("Scaling Analysis:");
let single_gpu_throughput = benchmark_strategy(
&gpus[..1],
&model_config,
DistributionStrategy::DataParallel,
)?
.throughput;
let multi_gpu_throughput = best.throughput;
let scaling_factor = multi_gpu_throughput / single_gpu_throughput;
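// The factor stays below the ideal gpus.len() under this cost model:
// every strategy pays communication or bubble overhead per extra GPU.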
println!(" Single GPU: {:.0} samples/sec", single_gpu_throughput);
println!(
" {} GPUs: {:.0} samples/sec",
gpus.len(),
multi_gpu_throughput
);
println!(
" Scaling factor: {:.2}x (ideal: {}x)",
scaling_factor,
gpus.len()
);
// Save results
let results_path = ctx.path("multi_gpu_benchmark.json");
save_results(&results_path, &results)?;
println!();
println!("Results saved to: {:?}", results_path);
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct GpuDevice {
id: u32,
name: String,
memory_gb: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ModelConfig {
total_params_b: f64,
layers: u32,
batch_size: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
enum DistributionStrategy {
/// Replicate the full model on every GPU; split the batch across replicas.
DataParallel,
/// Split layers across GPUs; activations flow through the pipeline.
PipelineParallel,
/// Shard each weight matrix across GPUs; communicate per layer.
TensorParallel,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BenchmarkResult {
strategy: DistributionStrategy,
total_time_ms: f64,
throughput: f64,
efficiency: f64,
memory_per_gpu_gb: u32,
}
fn detect_gpus() -> Vec<GpuDevice> {
// Simulated 4-GPU setup
(0..4)
.map(|id| GpuDevice {
id,
name: format!("GPU {} (Simulated)", id),
memory_gb: 24,
})
.collect()
}
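// A real detector would query the driver rather than simulate. A minimal
// sketch, assuming the `nvml-wrapper` crate (not a dependency of this
// recipe, so it stays a comment):
//
//   use nvml_wrapper::Nvml;
//
//   let nvml = Nvml::init()?;
//   let gpus = (0..nvml.device_count()?)
//       .map(|id| -> Result<GpuDevice, nvml_wrapper::error::NvmlError> {
//           let dev = nvml.device_by_index(id)?;
//           Ok(GpuDevice {
//               id,
//               name: dev.name()?,
//               memory_gb: (dev.memory_info()?.total >> 30) as u32,
//           })
//       })
//       .collect::<Result<Vec<_>, _>>()?;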
fn benchmark_strategy(
gpus: &[GpuDevice],
model: &ModelConfig,
strategy: DistributionStrategy,
) -> Result<BenchmarkResult> {
let gpu_count = gpus.len() as f64;
// Toy analytic cost model: single-GPU latency grows with parameter count
// and batch size (7B params at batch 64 -> ~4.5ms here)
let base_time_ms = model.total_params_b * 10.0 * f64::from(model.batch_size) / 1000.0;
// Strategy-specific performance characteristics
let (speedup, _overhead, memory_factor) = match strategy {
DistributionStrategy::DataParallel => {
// Good scaling but communication overhead
let overhead = 1.0 + 0.1 * (gpu_count - 1.0);
(gpu_count / overhead, overhead, 1.0)
}
DistributionStrategy::PipelineParallel => {
// Linear memory scaling but bubble overhead
let bubble_overhead = 1.0 + (gpu_count - 1.0) / f64::from(model.layers);
(
gpu_count / bubble_overhead,
bubble_overhead,
1.0 / gpu_count,
)
}
DistributionStrategy::TensorParallel => {
// Best for large models but high communication
let comm_overhead = 1.0 + 0.15 * (gpu_count - 1.0);
(gpu_count / comm_overhead, comm_overhead, 1.0 / gpu_count)
}
};
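// Derived quantities: with p GPUs and overhead factor o(p) >= 1, modeled
// speedup is p / o(p), so efficiency = speedup / p = 1 / o(p), which always
// lies in (0, 1] -- the invariant the proptests below check.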
let total_time = base_time_ms / speedup;
let throughput = (f64::from(model.batch_size) / total_time) * 1000.0;
let efficiency = speedup / gpu_count;
let base_memory = (model.total_params_b * 2.0) as u32; // fp16 weights: 2 bytes/param ~= 2GB per 1B params
let memory_per_gpu = ((f64::from(base_memory) * memory_factor) as u32).max(1);
Ok(BenchmarkResult {
strategy,
total_time_ms: total_time,
throughput,
efficiency,
memory_per_gpu_gb: memory_per_gpu,
})
}
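// With main()'s defaults (4 GPUs, 7B params, batch 64) this model favors
// PipelineParallel: bubble overhead 1 + 3/32 ~= 1.09 undercuts DataParallel's
// 1.30 and TensorParallel's 1.45 communication overheads.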
fn save_results(path: &std::path::Path, results: &[BenchmarkResult]) -> Result<()> {
let json = serde_json::to_string_pretty(results)
.map_err(|e| CookbookError::Serialization(e.to_string()))?;
std::fs::write(path, json)?;
Ok(())
}
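// `save_results` truncates and rewrites in place. If a run can be killed
// mid-write, a temp-file-plus-rename keeps the JSON readable at all times
// (a sketch using only std APIs; not wired in to keep the recipe minimal):
//
//   let tmp = path.with_extension("json.tmp");
//   std::fs::write(&tmp, &json)?;
//   std::fs::rename(&tmp, path)?;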
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_detect_gpus() {
let gpus = detect_gpus();
assert_eq!(gpus.len(), 4);
}
#[test]
fn test_data_parallel() {
let gpus = detect_gpus();
let model = ModelConfig {
total_params_b: 7.0,
layers: 32,
batch_size: 32,
};
let result = benchmark_strategy(&gpus, &model, DistributionStrategy::DataParallel).unwrap();
assert!(result.throughput > 0.0);
assert!(result.efficiency > 0.0 && result.efficiency <= 1.0);
}
#[test]
fn test_pipeline_parallel_memory() {
let gpus = detect_gpus();
let model = ModelConfig {
total_params_b: 7.0,
layers: 32,
batch_size: 32,
};
let data_parallel =
benchmark_strategy(&gpus, &model, DistributionStrategy::DataParallel).unwrap();
let pipeline =
benchmark_strategy(&gpus, &model, DistributionStrategy::PipelineParallel).unwrap();
// Pipeline parallel should use less memory per GPU
assert!(pipeline.memory_per_gpu_gb <= data_parallel.memory_per_gpu_gb);
}
#[test]
fn test_more_gpus_more_throughput() {
let model = ModelConfig {
total_params_b: 7.0,
layers: 32,
batch_size: 32,
};
let gpus_2: Vec<_> = detect_gpus().into_iter().take(2).collect();
let gpus_4 = detect_gpus();
let result_2 =
benchmark_strategy(&gpus_2, &model, DistributionStrategy::DataParallel).unwrap();
let result_4 =
benchmark_strategy(&gpus_4, &model, DistributionStrategy::DataParallel).unwrap();
assert!(result_4.throughput > result_2.throughput);
}
#[test]
fn test_deterministic() {
let gpus = detect_gpus();
let model = ModelConfig {
total_params_b: 7.0,
layers: 32,
batch_size: 32,
};
let r1 = benchmark_strategy(&gpus, &model, DistributionStrategy::TensorParallel).unwrap();
let r2 = benchmark_strategy(&gpus, &model, DistributionStrategy::TensorParallel).unwrap();
assert_eq!(r1.throughput, r2.throughput);
}
#[test]
fn test_save_results() {
let ctx = RecipeContext::new("test_multi_gpu_save").unwrap();
let path = ctx.path("results.json");
let results = vec![BenchmarkResult {
strategy: DistributionStrategy::DataParallel,
total_time_ms: 10.0,
throughput: 100.0,
efficiency: 0.9,
memory_per_gpu_gb: 12,
}];
save_results(&path, &results).unwrap();
assert!(path.exists());
}
}
#[cfg(test)]
mod proptests {
use super::*;
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn prop_efficiency_bounded(batch in 1u32..128) {
let gpus = detect_gpus();
let model = ModelConfig {
total_params_b: 7.0,
layers: 32,
batch_size: batch,
};
for strategy in [
DistributionStrategy::DataParallel,
DistributionStrategy::PipelineParallel,
DistributionStrategy::TensorParallel,
] {
let result = benchmark_strategy(&gpus, &model, strategy).unwrap();
prop_assert!(result.efficiency > 0.0);
prop_assert!(result.efficiency <= 1.0);
}
}
#[test]
fn prop_throughput_positive(batch in 1u32..64) {
let gpus = detect_gpus();
let model = ModelConfig {
total_params_b: 7.0,
layers: 32,
batch_size: batch,
};
let result = benchmark_strategy(&gpus, &model, DistributionStrategy::DataParallel).unwrap();
prop_assert!(result.throughput > 0.0);
}
}
}