Multithread Executor Usage Examples
This document demonstrates how to use the Single Thread and MultiThread executors in ZooPipe.
Basic Usage
SingleThreadExecutor (Default)
from zoopipe import CSVInputAdapter, CSVOutputAdapter, Pipe, SingleThreadExecutor
pipe = Pipe(
input_adapter=CSVInputAdapter("input.csv"),
output_adapter=CSVOutputAdapter("output.csv"),
executor=SingleThreadExecutor(batch_size=1000), # Default batch_size
)
with pipe:
pipe.wait()
MultiThreadExecutor
from zoopipe import CSVInputAdapter, JSONOutputAdapter, MultiThreadExecutor, Pipe
pipe = Pipe(
input_adapter=CSVInputAdapter("users.csv"),
output_adapter=JSONOutputAdapter("users.jsonl", format="jsonl"),
executor=MultiThreadExecutor(
max_workers=8, # Number of threads (default: CPU count)
batch_size=2000, # Batch size per thread (default: 1000)
),
)
with pipe:
pipe.wait()
print(f"Processed {pipe.report.total_processed} records")
Two-Tier Parallel Model
ZooPipe uses a dual-layer approach to maximize performance:
- Orchestyration Tier (Engines): Used to scale out across multiple processes or eventually nodes (Ray, Dask). Managed via
PipeManager. - Execution Tier (Executors): Used to scale up within a single node by utilizing multiple Rust threads. Managed via the
executorparameter inPipe.
⚡ Performance Context: ZooPipe vs DataFrames
ZooPipe's executors are designed for "Row-Complex" workloads (hashing, API calls, Pydantic validation), where vectorized engines struggle.
- Vectorized Engines (Polars/Pandas): Excellent for bulk math (SUM, AVG) but incur huge serialization overhead when you need a custom Python function (
map_elements). - ZooPipe Executors: Run Python logic in parallel native streaming batches. This makes them significantly faster (and lighter on RAM) for "chaotic" ETL tasks that require custom logic per row.
Comparison at a Glance
| Level | Component | Scaling Type | Parallelism |
|---|---|---|---|
| Cluster/Node | Engine |
Scaling Out | Python Processes / Distributed |
| Process | Executor |
Scaling Up | Rust Native Threads |
For most high-throughput local workloads, the most powerful pattern is combining both: A PipeManager with 4 workers, where each worker uses a MultiThreadExecutor.
When to Use Each Executor
SingleThreadExecutor
Use when: - Data processing is I/O-bound - Processing simple transformations - Debugging or development - Order preservation is naturally maintained (single stream)
[!NOTE] Even with
SingleThreadExecutor, ZooPipe utilizes background threads for S3 sources via its Hybrid I/O Strategy to prevent GIL blocking during network I/O, while keeping the processing logic on the main thread.
MultiThreadExecutor
Use when: - Validation/transformation is CPU-intensive (e.g., complex Pydantic models) - Maximum throughput is needed across multiple CPU cores - Order of records in the output is not a critical requirement
[!IMPORTANT] The
MultiThreadExecutorprocesses batches in parallel. While throughput is significantly higher, the order of records in the destination may differ from the source.
Performance Tuning
Batch Size
Larger batch sizes reduce overhead but increase memory usage:
# For small records, larger batches
executor = MultiThreadExecutor(max_workers=4, batch_size=5000)
# For large records, smaller batches
executor = MultiThreadExecutor(max_workers=4, batch_size=500)
Thread Count
Match thread count to your workload:
import os
# Use all available cores
executor = MultiThreadExecutor(max_workers=None) # Auto-detect
# Conservative (50% of cores)
executor = MultiThreadExecutor(max_workers=os.cpu_count() // 2)
# Aggressive (2x cores for I/O-bound work)
executor = MultiThreadExecutor(max_workers=os.cpu_count() * 2)
Complete Example with Schema
from pydantic import BaseModel, ConfigDict
from zoopipe import CSVInputAdapter, CSVOutputAdapter, MultiThreadExecutor, Pipe
class UserSchema(BaseModel):
model_config = ConfigDict(extra="ignore")
user_id: str
username: str
age: int
email: str
pipe = Pipe(
input_adapter=CSVInputAdapter("users.csv"),
output_adapter=CSVOutputAdapter("validated_users.csv"),
error_output_adapter=CSVOutputAdapter("errors.csv"),
schema_model=UserSchema,
executor=MultiThreadExecutor(
max_workers=8,
batch_size=2000,
),
)
with pipe:
pipe.wait()
report = pipe.report
print(f"✅ Success: {report.success_count}")
print(f"❌ Errors: {report.error_count}")
print(f"⚡ Speed: {report.items_per_second:.0f} items/sec")