Home
ZooPipe is a lean, ultra-high-performance data processing engine for Python. It leverages a 100% Rust core to handle I/O and orchestration, while keeping the flexibility of Python for schema validation (via Pydantic) and custom data enrichment (via Hooks).
Read the docs for more information.
✨ Key Features
- 🚀 100% Native Rust Engine: The core execution loop, including CSV and JSON parsing/writing, is implemented in Rust for maximum throughput.
- 🔍 Declarative Validation: Use Pydantic models to define and validate your data structures naturally.
- 🪝 Python Hooks: Transform and enrich data at any stage using standard Python functions or classes.
- 🚨 Automated Error Routing: Native support for routing failed records to a dedicated error output.
- 📊 Multiple Format Support: Optimized readers/writers for CSV, JSONL, Parquet, and Iceberg.
- 🔧 Two-Tier Parallelism: Orchestrate across processes or clusters with Engines (Local, Ray, Dask), and scale throughput at the node level with Rust Executors.
- ☁️ Cloud Native: Native S3, GCS, and Azure support, plus native Iceberg Data Lake integration.
⚡ Performance & Benchmarks
Why ZooPipe? Because vectorization isn't always the answer.
Tools like Pandas and Polars are incredible for analytical workloads (groupby, sum, joins) where operations can be vectorized in C/Rust. However, real-world Data Engineering often involves "chaotic ETL": messy custom rules, API calls per row, hashing, conditional cleanup, and complex normalization that inevitably fall back to Python loops.
In these "heavy ETL" scenarios, the project's benchmarks show ZooPipe outperforming vectorized DataFrames by 3x to 8x.
Key Takeaway: ZooPipe's "Python-First Architecture" with parallel streaming (PipeManager) avoids the serialization overhead that cripples Polars/Pandas when using Python UDFs (map_elements/apply), and uses 97% less RAM.
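To make "chaotic ETL" concrete, here is a minimal sketch of the kind of per-row logic described above. It uses only the standard library; the field names (user_id, email, country, phone) and the rules themselves are invented for illustration and are not taken from ZooPipe's benchmarks.

```python
import hashlib


def normalize_row(row: dict) -> dict:
    """Apply row-specific rules that have no simple column-wise equivalent."""
    email = row.get("email", "").strip().lower()
    out = {"user_id": row["user_id"].strip()}

    # Conditional cleanup: the rule depends on the value of another field.
    if row.get("country") == "US":
        out["phone"] = "".join(ch for ch in row.get("phone", "") if ch.isdigit())
    else:
        out["phone"] = row.get("phone", "").strip()

    # Hashing: pseudonymise the e-mail, but only when it looks usable.
    if "@" in email:
        out["email_hash"] = hashlib.sha256(email.encode("utf-8")).hexdigest()
    else:
        out["email_hash"] = None
    return out


# Hypothetical sample rows, purely for illustration.
rows = [
    {"user_id": " 1 ", "email": "Ada@Example.com ", "country": "US", "phone": "(555) 010-9999"},
    {"user_id": "2", "email": "not-an-email", "country": "DE", "phone": " +49 30 1234 "},
]
cleaned = [normalize_row(r) for r in rows]
```

In a DataFrame library, a function like this typically ends up behind a Python UDF call, which is the situation the comparison above is about.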
⚖️ Is this unfair to Pandas/Polars?
Yes and No.
- Unfair: If your workload is purely analytical (e.g., GROUP BY, SUM, JOIN), Polars and Pandas will likely destroy ZooPipe because they can use vectorized C/Rust operations on whole columns at once.
- Fair: In real-world Data Engineering, many pipelines are "chaotic". They require custom hashing, API calls per row, conditional normalization, or complex Pydantic validation. In these "Python-UDF heavy" scenarios, vectorization breaks down, and ZooPipe shines by orchestrating parallel Python execution efficiently without the DataFrame overhead.
❓ When to use what?
| Use ZooPipe When... | Use Pandas / Polars When... |
|---|---|
| 🏗️ You have complex, custom Python logic per row (hash, clean, validate). | 🧮 You are doing aggregations (SUM, AVG) or Relational Algebra (JOIN, GROUP BY). |
| 🔄 You are processing streaming data or files larger than RAM. | 💾 Your dataset fits comfortably in RAM (or use LazyFrames). |
| 🛡️ You need strict schema validation (Pydantic) and error handling. | 🔬 You are doing data exploration or statistical analysis. |
| 🚀 You want to mix Rust I/O performance with Python flexibility. | ⚡ Your entire pipeline can be expressed in vectorized expressions. |
🚀 Quick Start
Installation
Using uv (recommended):
Or using pip:
From source:
Simple Example
from pydantic import BaseModel, ConfigDict
from zoopipe import CSVInputAdapter, CSVOutputAdapter, Pipe


class UserSchema(BaseModel):
    model_config = ConfigDict(extra="ignore")
    user_id: str
    username: str
    email: str


pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=CSVOutputAdapter("processed_users.csv"),
    error_output_adapter=CSVOutputAdapter("errors.csv"),
    schema_model=UserSchema,
)

# Run the pipe (streaming processing)
pipe.run()
print(f"Finished! Processed {pipe.report.total_processed} items.")
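For readers who want a mental model of what this example does, the following is a plain-Python sketch of the same flow: stream rows, validate each one, write valid rows to the output and failed rows to a separate error output. It is not ZooPipe's implementation. The helpers validate_user and run_pipe, the error-file columns, and the report keys other than total_processed are assumptions made for illustration, and a hand-written check stands in for the Pydantic model.

```python
import csv
import io


def validate_user(row: dict) -> dict:
    """Stand-in for Pydantic validation: require three non-empty fields, drop extras."""
    required = ("user_id", "username", "email")
    missing = [name for name in required if not row.get(name)]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    return {name: row[name] for name in required}


def run_pipe(source, output, errors) -> dict:
    """Stream rows from source; valid rows go to output, failures go to errors."""
    out_writer = csv.DictWriter(output, fieldnames=["user_id", "username", "email"])
    err_writer = csv.DictWriter(errors, fieldnames=["row", "error"])
    out_writer.writeheader()
    err_writer.writeheader()
    # Hypothetical report shape; only total_processed appears in the example above.
    report = {"total_processed": 0, "success": 0, "errors": 0}
    for row in csv.DictReader(source):
        report["total_processed"] += 1
        try:
            out_writer.writerow(validate_user(row))
            report["success"] += 1
        except ValueError as exc:
            err_writer.writerow({"row": dict(row), "error": str(exc)})
            report["errors"] += 1
    return report


# In-memory files keep the sketch self-contained; the second row lacks a username.
source = io.StringIO(
    "user_id,username,email,plan\n"
    "1,ada,ada@example.com,pro\n"
    "2,,bob@example.com,free\n"
)
output, errors = io.StringIO(), io.StringIO()
report = run_pipe(source, output, errors)
```

The extra plan column is dropped from the output, mirroring extra="ignore" in the schema above.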
Automatically split large files or manage multiple independent workflows:
from zoopipe import MultiProcessEngine, Pipe, PipeManager

# Create your pipe as usual (Pipe is purely declarative)
pipe = Pipe(...)

# Automatically parallelize across 4 workers
# MultiProcessEngine() for local, RayEngine() or DaskEngine() for clusters
manager = PipeManager.parallelize_pipe(
    pipe,
    workers=4,
    engine=MultiProcessEngine(),
)

# Start, wait, and coordinate (e.g. merge files) automatically
manager.run()
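The two jobs mentioned here, splitting the input across workers and combining their results, can be sketched in a few lines of plain Python. How ZooPipe actually shards files is not described in this page, so the row-index ranges below, and the helper names shard_ranges and merge_reports, are assumptions chosen only to illustrate the idea.

```python
def shard_ranges(total_rows: int, workers: int) -> list[range]:
    """Split row indices 0..total_rows into contiguous, near-equal ranges."""
    if workers < 1:
        raise ValueError("workers must be >= 1")
    base, extra = divmod(total_rows, workers)
    ranges, start = [], 0
    for i in range(workers):
        # The first `extra` workers take one additional row each.
        size = base + (1 if i < extra else 0)
        ranges.append(range(start, start + size))
        start += size
    return ranges


def merge_reports(reports: list[dict]) -> dict:
    """Sum per-worker counters into a single report."""
    merged: dict = {}
    for report in reports:
        for key, value in report.items():
            merged[key] = merged.get(key, 0) + value
    return merged


shards = shard_ranges(total_rows=10, workers=4)
# Pretend each worker processed its shard without errors.
reports = [{"total_processed": len(s), "errors": 0} for s in shards]
total = merge_reports(reports)
```

With 10 rows and 4 workers the shards hold 3, 3, 2 and 2 rows, and the merged report counts all 10.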
📚 Documentation
Core Concepts
Hooks
Hooks are Python classes that allow you to intercept, transform, and enrich data at different stages of the pipeline.
📘 Read the full Hooks Guide to learn about lifecycle methods (setup, execute, teardown), state management, and advanced patterns like cursor pagination.
Quick Example
from zoopipe import BaseHook


class MyHook(BaseHook):
    def execute(self, entries, store):
        for entry in entries:
            entry["raw_data"]["checked"] = True
        return entries
[!IMPORTANT] If you are using a schema_model, the pipeline will output the contents of validated_data for successful records.
- To modify data before validation, use pre_validation_hooks and modify entry["raw_data"].
- To modify data after validation (and ensure it reaches the output), use post_validation_hooks and modify entry["validated_data"].
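The difference between the two hook stages is easier to see when the whole sequence is written out. The sketch below runs without ZooPipe installed: BaseHook here is a minimal stand-in for zoopipe.BaseHook, and run_batch and validate are hypothetical helpers that only mimic the order described above (pre-validation hooks, validation, post-validation hooks, output of validated_data).

```python
class BaseHook:
    """Minimal stand-in for zoopipe.BaseHook so this sketch runs on its own."""

    def execute(self, entries, store):
        return entries


class StripWhitespace(BaseHook):
    """Pre-validation: clean raw_data before the schema sees it."""

    def execute(self, entries, store):
        for entry in entries:
            entry["raw_data"] = {k: v.strip() for k, v in entry["raw_data"].items()}
        return entries


class AddDomain(BaseHook):
    """Post-validation: enrich validated_data, which is what reaches the output."""

    def execute(self, entries, store):
        for entry in entries:
            email = entry["validated_data"]["email"]
            entry["validated_data"]["domain"] = email.split("@", 1)[1]
        return entries


def validate(raw: dict) -> dict:
    # Stand-in for a Pydantic schema_model that keeps only the email field.
    return {"email": raw["email"]}


def run_batch(entries, pre_hooks, validate_fn, post_hooks):
    """Hypothetical runner: pre hooks -> validation -> post hooks -> output rows."""
    store = {}
    for hook in pre_hooks:
        entries = hook.execute(entries, store)
    for entry in entries:
        entry["validated_data"] = validate_fn(entry["raw_data"])
    for hook in post_hooks:
        entries = hook.execute(entries, store)
    return [entry["validated_data"] for entry in entries]


entries = [{"raw_data": {"email": "  ada@example.com ", "note": " x "}}]
rows = run_batch(entries, [StripWhitespace()], validate, [AddDomain()])
```

The note field is cleaned in raw_data but never reaches the output, because only validated_data is emitted; the domain field does, because it was added after validation.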
Executors
Executors control how ZooPipe scales up within a single node using Rust-managed threads. They are the engine under the hood that drives high throughput.
📘 Read the full Executors Guide to understand the difference between SingleThreadExecutor (debug/ordered) and MultiThreadExecutor (high-throughput).
Input/Output Adapters
File Formats
- CSV Adapters - High-performance CSV reading and writing
- JSON Adapters - JSONL and JSON array format support
- Excel Adapters - Read and write Excel (.xlsx) files
- Parquet Adapters - Columnar storage for analytics and data lakes
- Iceberg Adapters - High-performance Iceberg table reading and writing
- Arrow Adapters - Apache Arrow IPC format for high-throughput interoperability
Databases
- SQL Adapters - Read from and write to SQL databases with batch optimization
- SQL Pagination - High-performance cursor-style pagination for large tables
Messaging Systems
- Kafka Adapters - High-throughput messaging
Advanced
- Python Generator Adapters - In-memory streaming and testing
- Cloud Storage (S3) - Read and write data from Amazon S3 and compatible services
- PipeManager - Run multiple pipes in parallel for distributed processing
- Ray Guide - Zero-config distributed execution on Ray clusters
- Dask Guide - Zero-config distributed execution on Dask clusters
🛠 Architecture
ZooPipe is designed as a thin Python wrapper around a powerful Rust core, featuring a two-tier parallel architecture:
- Orchestration Tier (Python Engines):
  - Manages distribution across processes or nodes (e.g., MultiProcessEngine).
  - Handles data sharding, process lifecycle, and metrics aggregation.
- Execution Tier (Rust BatchExecutors):
  - Internal Throughput: High-speed processing within a single process.
  - Adapters: Native CSV/JSON/SQL Readers and Writers.
  - NativePipe: Orchestrates the loop, fetching chunks and routing result batches.
  - Executors: Multi-threaded Rust strategies to bypass the GIL within a node.
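The execution tier's loop (fetch a chunk, hand it to an executor, route the resulting batches) can be approximated in pure Python as a rough sketch. This is only an analogy: the real loop lives in Rust, and Python threads, unlike the Rust executors described above, do not bypass the GIL. The function names read_chunks, process_chunk and run_loop are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def read_chunks(records, chunk_size):
    """Yield lists of at most chunk_size records from any iterable."""
    iterator = iter(records)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


def process_chunk(chunk):
    """Split one chunk into (ok, failed) batches; non-integers count as failures."""
    ok, failed = [], []
    for value in chunk:
        try:
            ok.append(int(value) * 2)
        except ValueError:
            failed.append(value)
    return ok, failed


def run_loop(records, chunk_size=3, threads=2):
    """Fetch chunks, process them on a thread pool, and route the result batches."""
    output, errors = [], []
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # pool.map yields results in submission order, so output order is stable.
        # Note: it also submits every chunk up front; a real streaming loop would
        # bound the number of chunks in flight.
        for ok, failed in pool.map(process_chunk, read_chunks(records, chunk_size)):
            output.extend(ok)
            errors.extend(failed)
    return output, errors


output, errors = run_loop(["1", "2", "x", "4", "5", "6", "y"])
```

Replacing the pool with a plain for loop over read_chunks gives a single-threaded variant that is easier to step through when debugging.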
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.