Streaming & Memory (Examples 21-30)

This section covers constant-memory streaming for large datasets.

Example 21: Streaming Constant Memory

use alimentar::streaming::{StreamingDataset, MemorySource};

let batches = /* source batches */;
let source = MemorySource::new(batches)?;
let streaming = StreamingDataset::new(Box::new(source), 16);

for batch in streaming {
    process_batch(batch);
}

Examples 22-23: Chained Sources and Memory Source

use alimentar::streaming::{StreamingDataset, ChainedSource, MemorySource};

// Chain multiple sources
let source1 = MemorySource::new(batches1)?;
let source2 = MemorySource::new(batches2)?;
let chained = ChainedSource::new(vec![
    Box::new(source1),
    Box::new(source2),
]);
let streaming = StreamingDataset::new(Box::new(chained), 16);

Examples 24-25: Parquet Streaming and Buffer Tuning

use alimentar::streaming::{StreamingDataset, ParquetSource};

// Stream parquet row groups
let source = ParquetSource::new("large.parquet")?
    .row_group_size(1024);
let streaming = StreamingDataset::new(Box::new(source), 8);

// Buffer size tuning
let streaming = StreamingDataset::builder()
    .source(source)
    .buffer_size(32)
    .build()?;

Examples 26-27: Async Prefetch and Backpressure

use alimentar::async_prefetch::AsyncPrefetchBuilder;

let prefetch = AsyncPrefetchBuilder::new(batches)
    .prefetch_size(4)
    .build()?;

// With backpressure
let streaming = StreamingDataset::new(Box::new(source), 16)
    .with_backpressure(8);

Examples 28-29: Iterator Reset and Memory Profile

use alimentar::streaming::StreamingDataset;

// Deterministic reset
let mut streaming = StreamingDataset::new(source, 16);
let first_pass: Vec<_> = streaming.by_ref().take(10).collect();
streaming.reset();
let second_pass: Vec<_> = streaming.by_ref().take(10).collect();

// Memory profiling
let streaming = StreamingDataset::new(source, 16)
    .with_memory_tracking(true);
println!("Peak memory: {} bytes", streaming.peak_memory());

Example 30: 10GB Dataset Test

use alimentar::streaming::{StreamingDataset, ParquetSource};

// Stream without loading entire dataset
let source = ParquetSource::new("10gb.parquet")?;
let streaming = StreamingDataset::new(Box::new(source), 16);

let mut total_rows = 0;
for batch in streaming {
    total_rows += batch.num_rows();
}
println!("Processed {} rows with constant memory", total_rows);

Key Concepts

  • Constant memory: The full dataset is never loaded into memory at once
  • Buffer size: Controls the memory/throughput tradeoff
  • Backpressure: Prevents the producer from outrunning the consumer
  • Row groups: The Parquet-native unit of streaming