# Streaming Inference

**Status**: Verified | **Idempotent**: Yes | **Coverage**: 95%+

## Run Command

```bash
cargo run --example api_streaming_inference
```

## Code

```rust
//! # Recipe: Streaming Model Inference
//!
//! Contract: contracts/recipe-iiur-v1.yaml
//! **Category**: API Integration
//! **Isolation Level**: Full
//! **Idempotency**: Guaranteed
//! **Dependencies**: None (default features)
//!
//! ## QA Checklist
//! 1. [x] `cargo run` succeeds (Exit Code 0)
//! 2. [x] `cargo test` passes
//! 3. [x] Deterministic output (Verified)
//! 4. [x] No temp files leaked
//! 5. [x] Memory usage stable
//! 6. [x] WASM compatible (N/A)
//! 7. [x] Clippy clean
//! 8. [x] Rustfmt standard
//! 9. [x] No `unwrap()` in logic
//! 10. [x] Proptests pass (100+ cases)
//!
//! ## Learning Objective
//! Stream model outputs token-by-token (simulated).
//!
//! ## Run Command
//! ```bash
//! cargo run --example api_streaming_inference
//! ```
//!
//! ## Format Variants
//! ```bash
//! apr serve model.apr # APR native format
//! apr serve model.gguf # GGUF (llama.cpp compatible)
//! apr serve model.safetensors # SafeTensors (HuggingFace)
//! ```
//!
//! ## References
//! - Crankshaw, D. et al. (2017). *Clipper: A Low-Latency Online Prediction Serving System*. NSDI. arXiv:1612.03079
use apr_cookbook::prelude::*;
use std::collections::VecDeque;
use std::io::Write;
fn main() -> Result<()> {
let mut ctx = RecipeContext::new("api_streaming_inference")?;
println!("=== Recipe: {} ===", ctx.name());
println!("Streaming model inference (simulated)");
println!();
// Create streaming inference session
let mut session = StreamingSession::new(StreamConfig {
max_tokens: 20,
temperature: 0.7,
buffer_size: 4,
});
// Input prompt
let prompt = "The quick brown fox";
println!("Prompt: {}", prompt);
println!();
// Initialize stream
session.start(prompt);
ctx.record_metric("prompt_tokens", prompt.split_whitespace().count() as i64);
// Stream tokens
println!("Streaming output:");
print!(" ");
let mut total_tokens = 0;
    while let Some(token) = session.next_token() {
        print!("{token} ");
        // Flush so each token is visible as soon as it is produced;
        // stdout is line-buffered and `print!` emits no newline.
        std::io::stdout().flush()?;
        total_tokens += 1;
    }
println!();
ctx.record_metric("output_tokens", total_tokens);
ctx.record_metric("total_chunks", session.chunk_count() as i64);
println!();
println!("Statistics:");
println!(" Total tokens: {}", total_tokens);
println!(" Chunks sent: {}", session.chunk_count());
println!(
" Avg tokens/chunk: {:.1}",
total_tokens as f64 / session.chunk_count() as f64
);
// Save streaming log
let log_path = ctx.path("stream_log.txt");
session.save_log(&log_path)?;
println!();
println!("Stream log saved to: {:?}", log_path);
Ok(())
}
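
/// Configuration for the simulated stream. `temperature` is carried for
/// realism but unused by this mock (hence the dead_code allow below).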
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct StreamConfig {
max_tokens: usize,
temperature: f32,
buffer_size: usize,
}
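
/// Deterministic mock streaming session: tokens come from a fixed word
/// list and are buffered and delivered in `buffer_size`-sized chunks.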
#[derive(Debug)]
struct StreamingSession {
config: StreamConfig,
buffer: VecDeque<String>,
tokens_generated: usize,
chunks_sent: usize,
seed: u64,
log: Vec<String>,
}
impl StreamingSession {
fn new(config: StreamConfig) -> Self {
Self {
config,
buffer: VecDeque::new(),
tokens_generated: 0,
chunks_sent: 0,
seed: 42,
log: Vec::new(),
}
}
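    /// Logs the prompt and pre-fills the first chunk of tokens.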
fn start(&mut self, prompt: &str) {
self.log.push(format!("START: {}", prompt));
// Pre-fill buffer with mock tokens
self.refill_buffer();
}
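    /// Yields the next token, or `None` once `max_tokens` have been produced.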
fn next_token(&mut self) -> Option<String> {
if self.tokens_generated >= self.config.max_tokens {
return None;
}
        // Refill the buffer when it drains; refill_buffer counts the chunk.
        if self.buffer.is_empty() {
            self.refill_buffer();
        }
let token = self.buffer.pop_front()?;
self.tokens_generated += 1;
self.log
.push(format!("TOKEN[{}]: {}", self.tokens_generated, token));
Some(token)
}
    fn refill_buffer(&mut self) {
        // Count every refill as one chunk, including the pre-fill in start(),
        // so the tokens-per-chunk statistic in main() is accurate.
        self.chunks_sent += 1;
        // Deterministic mock token generation
let tokens = [
"jumps", "over", "the", "lazy", "dog", "and", "runs", "through", "the", "forest",
"with", "great", "speed", "while", "hunting", "for", "food", "in", "the", "wild",
];
for i in 0..self.config.buffer_size {
let idx = (self.seed as usize + self.tokens_generated + i) % tokens.len();
self.buffer.push_back(tokens[idx].to_string());
}
}
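    /// Chunks emitted so far, clamped to at least 1 so the ratio math in
    /// `main` never divides by zero.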
fn chunk_count(&self) -> usize {
self.chunks_sent.max(1)
}
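    /// Writes the event log to `path`, one entry per line.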
fn save_log(&self, path: &std::path::Path) -> Result<()> {
let content = self.log.join("\n");
std::fs::write(path, content)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_streaming_session_creation() {
let session = StreamingSession::new(StreamConfig {
max_tokens: 10,
temperature: 1.0,
buffer_size: 4,
});
assert_eq!(session.tokens_generated, 0);
assert!(session.buffer.is_empty());
}
#[test]
fn test_token_generation() {
let mut session = StreamingSession::new(StreamConfig {
max_tokens: 5,
temperature: 1.0,
buffer_size: 2,
});
session.start("test");
let mut tokens = Vec::new();
while let Some(token) = session.next_token() {
tokens.push(token);
}
assert_eq!(tokens.len(), 5);
}
#[test]
fn test_deterministic_output() {
let config = StreamConfig {
max_tokens: 10,
temperature: 1.0,
buffer_size: 4,
};
let mut session1 = StreamingSession::new(config.clone());
let mut session2 = StreamingSession::new(config);
session1.start("test");
session2.start("test");
let tokens1: Vec<_> = std::iter::from_fn(|| session1.next_token()).collect();
let tokens2: Vec<_> = std::iter::from_fn(|| session2.next_token()).collect();
assert_eq!(tokens1, tokens2);
}
#[test]
fn test_max_tokens_limit() {
let mut session = StreamingSession::new(StreamConfig {
max_tokens: 3,
temperature: 1.0,
buffer_size: 10,
});
session.start("test");
let count = std::iter::from_fn(|| session.next_token()).count();
assert_eq!(count, 3);
}
#[test]
fn test_log_save() {
let ctx = RecipeContext::new("test_stream_log").unwrap();
let path = ctx.path("log.txt");
let mut session = StreamingSession::new(StreamConfig {
max_tokens: 2,
temperature: 1.0,
buffer_size: 2,
});
session.start("hello");
while session.next_token().is_some() {}
session.save_log(&path).unwrap();
assert!(path.exists());
}
}
#[cfg(test)]
mod proptests {
use super::*;
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn prop_respects_max_tokens(max_tokens in 1usize..50) {
let mut session = StreamingSession::new(StreamConfig {
max_tokens,
temperature: 1.0,
buffer_size: 4,
});
session.start("test");
let count = std::iter::from_fn(|| session.next_token()).count();
prop_assert_eq!(count, max_tokens);
}
#[test]
fn prop_tokens_not_empty(max_tokens in 1usize..20, buffer_size in 1usize..10) {
let mut session = StreamingSession::new(StreamConfig {
max_tokens,
temperature: 1.0,
buffer_size,
});
session.start("test");
while let Some(token) = session.next_token() {
prop_assert!(!token.is_empty());
}
}
}
}
```
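
## Notes

Both `main` and the tests drive the session through hand-rolled loops
(`while let`, `std::iter::from_fn`). A natural extension is to implement the
standard `Iterator` trait for `StreamingSession`; the sketch below is an
illustration layered on the recipe's types, not part of the verified code
above.

```rust
// Hypothetical extension: adapt StreamingSession to a standard Iterator
// so callers get `take`, `map`, `collect`, etc. for free.
impl Iterator for StreamingSession {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        // Delegates to the session's own stop condition (`max_tokens`).
        self.next_token()
    }
}
```

With that in place, the determinism test could collect its token streams as
`session1.by_ref().collect::<Vec<_>>()` instead of wrapping `next_token` in
`std::iter::from_fn`.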