ZooParallel Engine
The ZooParallelPoolEngine is a specialized execution engine designed for "Heavy ETL" workloads where keeping track of granular progress (rows/sec, memory usage) across multiple processes is critical but typically expensive.
Unlike the default MultiProcessEngine, which relies on the standard Python multiprocessing.Queue (and thus pickle serialization) for status reporting, ZooParallel uses shared memory (mmap) to provide near-zero-latency, zero-serialization observability.
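To make the serialization difference concrete, the sketch below compares a pickled status dictionary with the same four counters packed as fixed-width integers. It is only an illustration: the dictionary shape and the "<4q" format string are assumptions chosen for this example, not the engine's actual wire format.

```python
import pickle
import struct

# Hypothetical status snapshot; field names follow the counters listed
# in the architecture section, the values are made up.
stats = {
    "total_processed": 1000,
    "success_count": 990,
    "error_count": 10,
    "ram_bytes": 52_428_800,
}

# Queue-style reporting: the whole object is pickled for every update.
pickled = pickle.dumps(stats)

# Fixed-layout reporting: four signed 64-bit integers, always 32 bytes.
# "<4q" (little-endian, no padding) is an assumed format for this sketch.
packed = struct.pack("<4q", *stats.values())

print(f"pickle: {len(pickled)} bytes, struct: {len(packed)} bytes")
```

Payload size is only part of the cost. A queue also involves a pipe write, a feeder thread, and unpickling on the receiving side, while a fixed layout can be read in place.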
When to use ZooParallel?
- High-Frequency Reporting: You want real-time progress bars without slowing down the workers.
- Chaos Engineering: You are running complex "chaotic" pipelines where you expect frequent failures and need immediate visibility.
- Maximum Performance: You want to squeeze out every bit of CPU performance by removing IPC (Inter-Process Communication) overhead from status reporting.
Installation
ZooParallel is an optional dependency. Install it with:
Usage
from zoopipe import CSVInputAdapter, JSONOutputAdapter, Pipe, PipeManager
from zoopipe.engines.zooparallel import ZooParallelPoolEngine

pipe = Pipe(
    input_adapter=CSVInputAdapter("large_dataset.csv"),
    output_adapter=JSONOutputAdapter("output.jsonl"),
)

# Run with 4 workers using ZooParallel
with PipeManager.parallelize_pipe(
    pipe,
    workers=4,
    engine=ZooParallelPoolEngine(),
) as manager:
    manager.run()
How it Works (Architecture)
- Shared Memory Layout: The engine creates a small, fixed-size binary file in a temporary directory for each worker. This file is memory-mapped (mmap) into both the Coordinator and the Worker process.
- Binary Protocol: Instead of pickling Python objects, workers write raw C-structs (long long integers) directly to the memory map. The fields are:
  - total_processed
  - success_count
  - error_count
  - ram_bytes
- Lock-Free Monitoring: The Coordinator reads these memory maps periodically. Since the layout is fixed and writes are atomic-aligned, no locks are required. This means the Coordinator can poll status 100 times a second without impacting the Worker's performance.
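The three points above can be sketched with the standard library alone. This is a minimal illustration, not the engine's real implementation: the file name pattern, the "<4q" field order, and the helper names create_stats_file, write_stats, and read_stats are all hypothetical. Two mappings of the same file in a single process stand in for the Worker and the Coordinator.

```python
import mmap
import os
import struct
import tempfile

# Assumed layout for this sketch: four little-endian signed 64-bit integers.
STATS_FORMAT = "<4q"
STATS_SIZE = struct.calcsize(STATS_FORMAT)  # 4 fields * 8 bytes
FIELDS = ("total_processed", "success_count", "error_count", "ram_bytes")


def create_stats_file(directory: str, worker_id: int) -> str:
    """Create a zero-filled, fixed-size stats file for one worker."""
    path = os.path.join(directory, f"worker_{worker_id}.stats")  # hypothetical name
    with open(path, "wb") as f:
        f.write(b"\x00" * STATS_SIZE)
    return path


def write_stats(mm: mmap.mmap, total: int, ok: int, errors: int, ram: int) -> None:
    """Worker side: overwrite the counters in place, with no pickling."""
    struct.pack_into(STATS_FORMAT, mm, 0, total, ok, errors, ram)


def read_stats(mm: mmap.mmap) -> dict:
    """Coordinator side: decode the same bytes without taking a lock."""
    return dict(zip(FIELDS, struct.unpack_from(STATS_FORMAT, mm, 0)))


with tempfile.TemporaryDirectory() as tmp:
    path = create_stats_file(tmp, worker_id=0)
    # Two independent mappings of one file stand in for the two processes.
    with open(path, "r+b") as worker_file, open(path, "r+b") as coord_file:
        worker_map = mmap.mmap(worker_file.fileno(), STATS_SIZE)
        coord_map = mmap.mmap(coord_file.fileno(), STATS_SIZE, access=mmap.ACCESS_READ)

        write_stats(worker_map, total=1000, ok=990, errors=10, ram=52_428_800)
        snapshot = read_stats(coord_map)

        # Close the maps before the temporary directory is removed.
        coord_map.close()
        worker_map.close()

print(snapshot)
```

One caveat with this simplified version: a reader may observe counters from two different updates in a single snapshot, because the four fields are not written as one atomic unit. For progress reporting that is usually acceptable, since the next poll corrects it.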
API Reference
ZooParallelPoolEngine
Parameters:
- n_workers (int, optional): Number of worker processes in the pool. If not provided, it defaults to the number of pipes passed to start().
Methods:
- start(pipes): Spawns the worker pool and initializes shared memory files.
- wait(timeout): Waits for completion.
- shutdown(): Cleans up processes and temporary mmap files.