PipeManager
PipeManager orchestrates multiple Pipe instances to run in parallel. It uses an Engine abstraction to handle the distribution of work across processes or nodes, making it independent of the underlying execution strategy.
Execution Engines
ZooPipe separates orchestration from execution. PipeManager delegates the lifecycle of pipes to an engine:
- `MultiProcessEngine` (default): Runs each pipe in a separate Python process. Ideal for bypassing the GIL on a single machine.
- `ZoosyncPoolEngine`: Specialized engine using shared memory (mmap) for zero-latency status reporting. Best for "Heavy ETL" and granular monitoring (see ZooSync Guide).
- `RayEngine`: Distributed execution across a cluster using Ray. Zero-config dependency management for pip, uv, and poetry (see Ray Guide).
- `DaskEngine`: Distributed execution using Dask. Zero-config dependency management for pip, uv, and poetry (see Dask Guide).
The Two-Tier Parallel Model
ZooPipe uses a two-tier approach to maximize processing throughput:
- Tier 1: Horizontal Scaling (Engines): `PipeManager` uses an `Engine` to split work across multiple processes or nodes. This is "Parallelism at the Pipe level".
- Tier 2: Vertical Scaling (BatchExecutors): Each individual `Pipe` uses a `BatchExecutor` (single- or multi-threaded) to process chunks of data. This is "Parallelism at the Record level".
Combining both tiers, you can, for example, run 4 processes on one node, each processing data with 4 threads, for 16 concurrent workers in total.
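To make the two tiers concrete, here is a minimal pure-Python sketch that only models how work slots multiply; it does not start any processes or threads and does not use ZooPipe. The names `Slot`, `two_tier_slots`, and `assign_batches` are hypothetical, and the round-robin batch assignment is an assumption made for illustration, not a description of how a `BatchExecutor` actually schedules chunks.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Slot:
    process: int  # Tier 1: index of the engine worker (process or node)
    thread: int   # Tier 2: index of the executor thread inside that worker


def two_tier_slots(processes: int, threads_per_process: int) -> list[Slot]:
    """Enumerate every (process, thread) pair available to the pipeline."""
    return [
        Slot(process=p, thread=t)
        for p in range(processes)
        for t in range(threads_per_process)
    ]


def assign_batches(batch_count: int, threads_per_process: int) -> dict[int, list[int]]:
    """Round-robin the batches of one shard over that worker's threads (assumed policy)."""
    assignment: dict[int, list[int]] = {t: [] for t in range(threads_per_process)}
    for batch in range(batch_count):
        assignment[batch % threads_per_process].append(batch)
    return assignment


slots = two_tier_slots(processes=4, threads_per_process=4)
print(len(slots))  # 16
print(assign_batches(batch_count=6, threads_per_process=4))
```

The first tier decides which worker owns a shard; the second tier decides which thread inside that worker handles each batch of records.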
Parallelizing a Single Pipe
The fastest way to scale a single input source is to use PipeManager.parallelize_pipe. It shards the adapters and creates a distributed execution automatically:
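```python
from zoopipe import MultiThreadExecutor, PipeManager
from zoopipe.engines.ray import RayEngine

# source_pipe is an existing Pipe instance, configured elsewhere
manager = PipeManager.parallelize_pipe(
    source_pipe,
    workers=4,                                    # Tier 1: number of shards
    engine=RayEngine(),                           # Tier 1: where the shards run
    executor=MultiThreadExecutor(max_workers=4),  # Tier 2: threads per shard
)
```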
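This page does not spell out how the adapters divide their input, so the following is only a sketch of one common scheme: splitting a known number of rows into contiguous, balanced ranges. The function `shard_bounds` is hypothetical and is not part of ZooPipe; real adapters may shard by byte offsets, files, or partitions instead.

```python
def shard_bounds(total_rows: int, workers: int) -> list[tuple[int, int]]:
    """Split range(total_rows) into `workers` contiguous half-open ranges."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    base, extra = divmod(total_rows, workers)
    bounds = []
    start = 0
    for index in range(workers):
        # The first `extra` shards take one additional row each,
        # so shard sizes never differ by more than one.
        size = base + (1 if index < extra else 0)
        bounds.append((start, start + size))
        start += size
    return bounds


print(shard_bounds(total_rows=10, workers=4))  # [(0, 3), (3, 6), (6, 8), (8, 10)]
```

Balanced shards matter because the whole run takes as long as its slowest worker.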
When to Use PipeManager
Use PipeManager when you need to:
- Process multiple independent data sources simultaneously
- Run the same pipeline on different data partitions in parallel
- Maximize CPU utilization across multiple cores
- Orchestrate complex multi-stage workflows
Basic Usage
```python
from zoopipe import Pipe, PipeManager, CSVInputAdapter, JSONOutputAdapter
from pydantic import BaseModel


class UserSchema(BaseModel):
    user_id: str
    username: str
    email: str


manager = PipeManager(
    pipes=[
        Pipe(
            input_adapter=CSVInputAdapter("data_part_1.csv"),
            output_adapter=JSONOutputAdapter("output_1.jsonl"),
            schema_model=UserSchema,
        ),
        Pipe(
            input_adapter=CSVInputAdapter("data_part_2.csv"),
            output_adapter=JSONOutputAdapter("output_2.jsonl"),
            schema_model=UserSchema,
        ),
        Pipe(
            input_adapter=CSVInputAdapter("data_part_3.csv"),
            output_adapter=JSONOutputAdapter("output_3.jsonl"),
            schema_model=UserSchema,
        ),
    ]
)
```
### Running a Pipeline
The simplest way to execute a managed pipeline is using the `run()` method, which handles the entire lifecycle:
```python
# Start, wait, and coordinate (including merging if applicable)
manager.run(wait=True, merge=True)
print(f"Total processed: {manager.report.total_processed}")
```
## Monitoring Progress
PipeManager provides both aggregated and per-pipe reporting:
```python
import time

manager.start()
while not manager.report.is_finished:
    print(f"Total: {manager.report.total_processed} | "
          f"Speed: {manager.report.items_per_second:.2f} rows/s | "
          f"RAM: {manager.report.ram_bytes / 1024 / 1024:.2f} MB")
    for i, pipe_report in enumerate(manager.pipe_reports):
        print(f"  Pipe {i}: {pipe_report.total_processed} processed, "
              f"finished: {pipe_report.is_finished}")
    time.sleep(1)
print(manager.report)
```
Context Manager Support
PipeManager can be used as a context manager for automatic start and cleanup:
```python
with PipeManager(pipes=[pipe1, pipe2, pipe3]) as manager:
    # Manager starts automatically in __enter__
    manager.wait()
    print(f"Progress: {manager.report.total_processed}")
```
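The start-on-enter, cleanup-on-exit pattern can be sketched without ZooPipe. `ManagedRunner` below is a hypothetical stand-in that only records lifecycle calls; whether `PipeManager.__exit__` calls `shutdown()` specifically is an assumption, since this page only promises "automatic start and cleanup".

```python
class ManagedRunner:
    """Hypothetical stand-in for a manager; records lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def start(self) -> None:
        self.events.append("start")

    def shutdown(self) -> None:
        self.events.append("shutdown")

    def __enter__(self) -> "ManagedRunner":
        self.start()  # begin work as soon as the block is entered
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.shutdown()  # runs on normal exit and when an exception propagates
        return False     # do not swallow exceptions raised in the with-body


runner = ManagedRunner()
try:
    with runner:
        raise RuntimeError("simulated failure inside the block")
except RuntimeError:
    pass

print(runner.events)  # ['start', 'shutdown']
```

The point of the pattern is that cleanup still runs when the body raises, which is hard to guarantee with manual `start()` and `shutdown()` calls.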
API Reference
Constructor
Parameters:
- pipes: List of Pipe instances to run.
- engine: The orchestration engine to use. Defaults to MultiProcessEngine().
Class Methods
parallelize_pipe(...)
```python
@classmethod
def parallelize_pipe(
    cls,
    pipe: Pipe,
    workers: int,
    merge: bool = True,
    executor: BatchExecutor | None = None,
    engine: BaseEngine | None = None,
) -> PipeManager: ...
```
Creates a managed parallel pipeline by sharding the adapters of the source pipe.
Parameters:
- pipe: The source Pipe whose adapters are sharded.
- workers: Number of parallel shards.
- merge: If True, automatically injects a FileMergeCoordinator for file-based outputs.
- executor: The Rust executor (Single/MultiThread) to use within each worker.
- engine: The engine to handle the worker distribution.
run(...) -> bool
The recommended way to execute the pipeline. It orchestrates the full lifecycle including pre-start and post-finish coordination hooks.
Parameters:
- wait: Whether to wait for execution to finish.
- merge: Legacy compatibility flag (still used to trigger on_finish hooks).
- timeout: Maximum time to wait if wait=True.
start() -> None
Starts all pipes through the configured engine (in separate processes with the default MultiProcessEngine). Raises RuntimeError if already running.
wait(timeout: float | None = None) -> bool
Waits for all pipes to complete.
Parameters:
- timeout: Maximum time to wait in seconds (optional)
Returns:
- True if all pipes finished, False if timeout occurred
shutdown(timeout: float = 5.0) -> None
Gracefully shuts down all running pipes.
Parameters:
- timeout: Maximum time to wait for graceful shutdown before forcing termination
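Since `wait()` returns False on timeout and `shutdown()` accepts its own timeout, the two can be combined into a simple deadline guard. The helper `wait_or_shutdown` and the `StubManager` test double are hypothetical; the sketch relies only on the `wait(timeout=...)` and `shutdown(timeout=...)` signatures documented above.

```python
def wait_or_shutdown(manager, wait_timeout: float, shutdown_timeout: float = 5.0) -> bool:
    """Wait for completion; on timeout, shut the manager down and return False."""
    finished = manager.wait(timeout=wait_timeout)
    if not finished:
        manager.shutdown(timeout=shutdown_timeout)
    return finished


class StubManager:
    """Hypothetical test double exposing only wait() and shutdown()."""

    def __init__(self, finishes: bool) -> None:
        self.finishes = finishes
        self.shutdown_calls: list[float] = []

    def wait(self, timeout=None) -> bool:
        return self.finishes

    def shutdown(self, timeout: float = 5.0) -> None:
        self.shutdown_calls.append(timeout)


slow = StubManager(finishes=False)
print(wait_or_shutdown(slow, wait_timeout=0.1))  # False
print(slow.shutdown_calls)                       # [5.0]
```

With a real manager, the same helper would be called as `wait_or_shutdown(manager, wait_timeout=600)`.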
get_pipe_report(index: int) -> PipeReport
Gets the report for a specific pipe.
Parameters:
- index: Zero-based index of the pipe
Returns:
- PipeReport with metrics for the specified pipe
Properties
pipes -> list[Pipe]
Returns the list of pipes managed by this instance.
pipe_count -> int
Returns the number of pipes being managed.
is_running -> bool
Returns True if the engine reports that execution is still active.
pipe_reports -> list[PipeReport]
Returns a list of PipeReport objects, one for each pipe.
report -> PipeReport
Returns an aggregated PipeReport combining metrics from all pipes.
PipeReport
Individual pipe report with the following fields:
- pipe_index: Zero-based index of the pipe
- total_processed: Total records processed by this pipe
- success_count: Number of successfully processed records
- error_count: Number of failed records
- ram_bytes: Current RAM usage in bytes
- is_finished: Whether the pipe has completed
- has_error: Whether the pipe encountered an error
- is_alive: Whether the pipe process is still alive
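How the aggregated `report` combines per-pipe values is not specified here. A plausible reading, sketched below under that assumption, is that counters are summed, `is_finished` requires every pipe to be finished, and `has_error` is set if any pipe failed. `ReportSketch` and `aggregate` are hypothetical names that mirror only a subset of the fields listed above.

```python
from dataclasses import dataclass


@dataclass
class ReportSketch:
    """Hypothetical stand-in mirroring a subset of the PipeReport fields."""
    total_processed: int = 0
    success_count: int = 0
    error_count: int = 0
    ram_bytes: int = 0
    is_finished: bool = False
    has_error: bool = False


def aggregate(reports: list[ReportSketch]) -> ReportSketch:
    """Combine per-pipe reports: sum the counters, AND the finish flags, OR the errors."""
    return ReportSketch(
        total_processed=sum(r.total_processed for r in reports),
        success_count=sum(r.success_count for r in reports),
        error_count=sum(r.error_count for r in reports),
        ram_bytes=sum(r.ram_bytes for r in reports),
        # An empty list is treated as "not finished" rather than vacuously done.
        is_finished=bool(reports) and all(r.is_finished for r in reports),
        has_error=any(r.has_error for r in reports),
    )


combined = aggregate([
    ReportSketch(100, 98, 2, 1000, True, False),
    ReportSketch(50, 50, 0, 2000, False, False),
])
print(combined.total_processed, combined.is_finished)  # 150 False
```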
The MultiProcessEngine
The default engine uses Python's multiprocessing module.
- True parallelism: Each pipe runs in its own process, so pipes can execute on separate CPU cores at the same time.
- Memory isolation: Each pipe has its own memory space.
- Fault isolation: If one pipe crashes, others continue running.
Performance Considerations
When Parallel Execution Helps
- I/O-bound workloads: Reading from or writing to multiple files/databases simultaneously
- Multiple data sources: Processing partitioned data in parallel
- Independent pipelines: Running completely separate data transformations
When Parallel Execution May Not Help
- Single large file: Use a single pipe with `MultiThreadExecutor` instead
- Shared resources: Multiple pipes writing to the same database may cause contention
- Memory-constrained systems: Each process has its own memory overhead
Best Practices
- Partition your data appropriately: Split large datasets into balanced chunks for better load distribution
- Monitor individual pipes: Use `pipe_reports` to identify bottlenecks or failed pipes
- Handle errors gracefully: Check the `has_error` flag in individual pipe reports to detect failures
- Use context managers: Ensure proper cleanup with the `with` statement
- Consider memory usage: Each pipe process duplicates the Python interpreter and loaded modules
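The monitoring and error-handling advice above can be folded into one small triage helper. This is a sketch, not ZooPipe code: `triage` is a hypothetical function that reads only the documented `has_error`, `is_finished`, and `is_alive` fields, and treating "not finished and not alive" as "stopped early" is an interpretation of those flags rather than documented behavior.

```python
from types import SimpleNamespace


def triage(pipe_reports) -> dict[str, list[int]]:
    """Group pipe indices by status using the has_error, is_finished, is_alive flags."""
    groups: dict[str, list[int]] = {
        "failed": [], "done": [], "running": [], "stopped_early": [],
    }
    for index, report in enumerate(pipe_reports):
        if report.has_error:
            groups["failed"].append(index)
        elif report.is_finished:
            groups["done"].append(index)
        elif report.is_alive:
            groups["running"].append(index)
        else:
            # Neither finished nor alive: assumed to have stopped unexpectedly.
            groups["stopped_early"].append(index)
    return groups


# Stand-in objects; with a real manager, pass manager.pipe_reports instead.
sample = [
    SimpleNamespace(has_error=False, is_finished=True, is_alive=False),
    SimpleNamespace(has_error=True, is_finished=True, is_alive=False),
    SimpleNamespace(has_error=False, is_finished=False, is_alive=True),
    SimpleNamespace(has_error=False, is_finished=False, is_alive=False),
]
print(triage(sample))
```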
Examples
Processing Partitioned Data
```python
import glob

from zoopipe import CSVInputAdapter, JSONOutputAdapter, Pipe, PipeManager

# UserSchema is the Pydantic model defined in Basic Usage
pipes = []
for csv_file in glob.glob("data_parts/*.csv"):
    output_file = csv_file.replace("data_parts", "output").replace(".csv", ".jsonl")
    pipes.append(
        Pipe(
            input_adapter=CSVInputAdapter(csv_file),
            output_adapter=JSONOutputAdapter(output_file),
            schema_model=UserSchema,
        )
    )

with PipeManager(pipes=pipes) as manager:
    manager.wait()
```
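Deriving the output path with `str.replace` rewrites every occurrence of the substring, so a file such as `data_parts/data_parts_2024.csv` would also have its file name altered. A sturdier alternative, sketched here with the standard library's `pathlib`, changes only the directory and the suffix. The helper `output_path_for` is hypothetical and not part of ZooPipe.

```python
from pathlib import Path


def output_path_for(csv_path: str, output_dir: str = "output") -> str:
    """Map e.g. data_parts/part_1.csv to output/part_1.jsonl."""
    source = Path(csv_path)
    # Keep the file name, swap the suffix, and place it under output_dir.
    return (Path(output_dir) / source.with_suffix(".jsonl").name).as_posix()


print(output_path_for("data_parts/part_1.csv"))           # output/part_1.jsonl
print(output_path_for("data_parts/data_parts_2024.csv"))  # output/data_parts_2024.jsonl
```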
Different Pipelines in Parallel
```python
# OrderSchema and ProductSchema are assumed to be Pydantic models
# defined in the same way as UserSchema.
manager = PipeManager(
    pipes=[
        Pipe(
            input_adapter=CSVInputAdapter("users.csv"),
            output_adapter=CSVOutputAdapter("users_clean.csv"),
            schema_model=UserSchema,
        ),
        Pipe(
            input_adapter=CSVInputAdapter("orders.csv"),
            output_adapter=CSVOutputAdapter("orders_clean.csv"),
            schema_model=OrderSchema,
        ),
        Pipe(
            input_adapter=CSVInputAdapter("products.csv"),
            output_adapter=CSVOutputAdapter("products_clean.csv"),
            schema_model=ProductSchema,
        ),
    ]
)
manager.start()
manager.wait()
```
Related Documentation
- Executors Guide - For parallelizing a single pipe
- Ray Guide - Distributed execution with Ray
- Dask Guide - Distributed execution with Dask
- CSV Adapters - Common use case for parallel processing