Arrow Adapters
ZooPipe provides Apache Arrow adapters for working with high-performance columnar data in the Arrow IPC (Feather) format.
What is Apache Arrow?
Apache Arrow is a cross-language development platform for in-memory columnar data. It provides:
- Fast Reads: Contiguous, fixed-layout column buffers that can be memory-mapped instead of parsed
- Columnar Format: Optimized for analytical operations
- Interoperability: Share data between Python, Rust, R, Java, etc. without copying
- Efficient Compression: Optional built-in LZ4 and Zstandard (ZSTD) buffer compression
- Type Safety: Rich type system with nested and complex types
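The sketch below makes the row-versus-column distinction concrete. It pivots row-oriented records into one list per column using plain Python. It is only an illustration: real Arrow columns are typed, contiguous buffers with a separate validity bitmap for nulls, not Python lists. `rows_to_columns` is a hypothetical helper and is not part of ZooPipe or pyarrow.

```python
from typing import Any


def rows_to_columns(rows: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Pivot row-oriented records into one list per column.

    A key missing from a row becomes None, loosely mirroring an Arrow null,
    so every column keeps the same length.
    """
    names: list[str] = []
    for row in rows:
        for name in row:
            if name not in names:
                names.append(name)
    return {name: [row.get(name) for row in rows] for name in names}


rows = [
    {"user_id": 1, "name": "Alice", "age": 25},
    {"user_id": 2, "name": "Bob", "age": 30},
    {"user_id": 3, "name": "Charlie"},  # no "age" value
]
columns = rows_to_columns(rows)

# An analytical query scans only the column it needs, skipping nulls.
ages = [age for age in columns["age"] if age is not None]
print(columns["age"])         # [25, 30, None]
print(sum(ages) / len(ages))  # 27.5
```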
ArrowInputAdapter
Read data from Arrow IPC files (also known as Feather v2 files).
Basic Usage
```python
from zoopipe import ArrowInputAdapter, JSONOutputAdapter, Pipe

pipe = Pipe(
    input_adapter=ArrowInputAdapter("data.arrow"),
    output_adapter=JSONOutputAdapter("output.jsonl", format="jsonl"),
)

# Run the pipe to completion
with pipe:
    pipe.wait()
```
Parameters
- `source` (`str | pathlib.Path`): Path to the Arrow IPC file to read
  - Supports `.arrow`, `.feather`, or `.ipc` extensions
  - Accepts files created by Pandas, Polars, or other Arrow-compatible tools
- `generate_ids` (`bool`, default `True`): Whether to generate a UUID for each record
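The sketch below illustrates what `generate_ids=True` implies by attaching a random (version 4) UUID to each record. It assumes records are dicts and stores the ID under a hypothetical `id` key. This document does not specify ZooPipe's actual field name or UUID version, and `with_generated_ids` is not a ZooPipe function.

```python
import uuid
from typing import Any, Iterable, Iterator


def with_generated_ids(records: Iterable[dict[str, Any]]) -> Iterator[dict[str, Any]]:
    """Yield a copy of each record with a fresh UUID4 string under "id".

    The "id" key is a hypothetical choice for this sketch. Because the record
    is unpacked after it, an existing "id" in the record would take precedence.
    """
    for record in records:
        yield {"id": str(uuid.uuid4()), **record}


records = [{"user_id": 1, "name": "Alice"}, {"user_id": 2, "name": "Bob"}]
tagged = list(with_generated_ids(records))
for rec in tagged:
    print(rec["id"], rec["name"])
```

The input dicts are left unmodified, so the same source records can be tagged again without collisions.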
Reading Pandas-Generated Arrow Files
```python
import pandas as pd

from zoopipe import ArrowInputAdapter, CSVOutputAdapter, Pipe

# Create a small Arrow (Feather v2) file with pandas (requires pyarrow)
df = pd.DataFrame({
    'user_id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
})
df.to_feather('users.arrow')

# Convert it to CSV with ZooPipe
pipe = Pipe(
    input_adapter=ArrowInputAdapter("users.arrow"),
    output_adapter=CSVOutputAdapter("users.csv"),
)
with pipe:
    pipe.wait()
```
Performance Characteristics
- Memory Efficient: Uses memory mapping when possible
- Columnar Reading: Efficient batch processing
- Compression: Automatic decompression of LZ4- and ZSTD-compressed files
- Type Preservation: Arrow's rich type system is converted to Python types
- Throughput: Very high (~1M+ rows/s), though actual speed depends on hardware, schema, and the output adapter
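Memory mapping is an operating-system feature, and Python's standard `mmap` module shows the idea. The file is mapped into the process's address space, and data is paged in on demand as it is touched, rather than the whole file being read up front. This is a generic sketch of the mechanism, not ZooPipe's reader. `read_region` is a hypothetical helper.

```python
import mmap
import os
import tempfile


def read_region(path: str, offset: int, length: int) -> bytes:
    """Return `length` bytes starting at `offset` through a read-only memory map.

    The OS pages in file data on demand as the slice touches it; slicing
    copies just those bytes into a new bytes object.
    """
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            return mm[offset:offset + length]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.bin")
    with open(path, "wb") as f:
        # 6 magic bytes + 2 padding bytes, a 4 KiB body, then a 4-byte tail
        f.write(b"ARROW1\x00\x00" + b"x" * 4096 + b"tail")
    head = read_region(path, 0, 6)
    tail = read_region(path, 8 + 4096, 4)

print(head, tail)  # b'ARROW1' b'tail'
```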
ArrowOutputAdapter
Write data to Arrow IPC files for efficient storage and interoperability.
Basic Usage
```python
from zoopipe import ArrowOutputAdapter, CSVInputAdapter, Pipe

pipe = Pipe(
    input_adapter=CSVInputAdapter("data.csv"),
    output_adapter=ArrowOutputAdapter("data.arrow"),
)

# Run the pipe to completion
with pipe:
    pipe.wait()
```
Parameters
- `output` (`str | pathlib.Path`): Path to the Arrow IPC file to write
  - Parent directories are created automatically if they don't exist
  - Output is compatible with Pandas, Polars, and other Arrow tools
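The automatic parent-directory creation described above can be reproduced with `pathlib`'s `mkdir(parents=True, exist_ok=True)`. This is useful if you write your own files alongside ZooPipe's output. The sketch below shows the equivalent step. `prepare_output_path` is a hypothetical helper, not a ZooPipe function.

```python
import tempfile
from pathlib import Path
from typing import Union


def prepare_output_path(output: Union[str, Path]) -> Path:
    """Create any missing parent directories for `output` and return it as a Path.

    exist_ok=True makes the call safe to repeat when the directories already exist.
    """
    path = Path(output)
    path.parent.mkdir(parents=True, exist_ok=True)
    return path


with tempfile.TemporaryDirectory() as tmp:
    target = prepare_output_path(Path(tmp) / "data_lake" / "exports" / "users.arrow")
    parent_exists = target.parent.is_dir()
    file_exists = target.exists()  # only the directories are created
    prepare_output_path(target)  # calling again is harmless

print(parent_exists, file_exists)  # True False
```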
Writing for Pandas Consumption
```python
import pandas as pd

from zoopipe import ArrowOutputAdapter, JSONInputAdapter, Pipe

pipe = Pipe(
    input_adapter=JSONInputAdapter("data.jsonl"),
    output_adapter=ArrowOutputAdapter("data.arrow"),
)
with pipe:
    pipe.wait()

# Load the result with pandas
df = pd.read_feather("data.arrow")
print(df.head())
```
Complete Examples
CSV to Arrow Conversion
```python
import time

from pydantic import BaseModel, ConfigDict

from zoopipe import ArrowOutputAdapter, CSVInputAdapter, MultiThreadExecutor, Pipe


class UserSchema(BaseModel):
    # Ignore CSV columns that are not declared below
    model_config = ConfigDict(extra="ignore")

    user_id: str
    username: str
    email: str


pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=ArrowOutputAdapter("users.arrow"),
    schema_model=UserSchema,
    executor=MultiThreadExecutor(max_workers=4),
)

# Start without blocking, then poll the report for progress
pipe.start()
while not pipe.report.is_finished:
    print(
        f"Processed: {pipe.report.total_processed} | "
        f"Speed: {pipe.report.items_per_second:.2f} rows/s"
    )
    time.sleep(0.5)

print(f"Wrote {pipe.report.total_processed} records to Arrow format")
```
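The polling loop can be tested without running a real pipeline. To do this, factor it into a function that accepts any object with `is_finished`, `total_processed`, and `items_per_second`, the three attributes the example reads from `pipe.report`. In the sketch below, `poll_progress` and `FakeReport` are hypothetical names. The fake report is advanced by the callback instead of by a worker thread.

```python
import time
from dataclasses import dataclass
from typing import Callable


@dataclass
class FakeReport:
    """Hypothetical stand-in for pipe.report, exposing only the attributes the loop reads."""
    total_processed: int = 0
    items_per_second: float = 0.0
    is_finished: bool = False


def poll_progress(report, interval: float = 0.5,
                  on_tick: Callable[[str], None] = print) -> int:
    """Emit a progress line every `interval` seconds until report.is_finished.

    Returns the final report.total_processed.
    """
    while not report.is_finished:
        on_tick(
            f"Processed: {report.total_processed} | "
            f"Speed: {report.items_per_second:.2f} rows/s"
        )
        time.sleep(interval)
    return report.total_processed


# Simulate a run: each tick advances the fake report by 1,000 rows.
report = FakeReport()
lines: list[str] = []


def tick(line: str) -> None:
    lines.append(line)
    report.total_processed += 1000
    report.items_per_second = 2000.0
    report.is_finished = report.total_processed >= 3000


total = poll_progress(report, interval=0.0, on_tick=tick)
print(total, lines[0])  # 3000 Processed: 0 | Speed: 0.00 rows/s
```

With a real pipe, the same function would be called as `poll_progress(pipe.report)` after `pipe.start()`.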
Arrow to JSONL Export
```python
from zoopipe import ArrowInputAdapter, JSONOutputAdapter, Pipe

pipe = Pipe(
    input_adapter=ArrowInputAdapter("processed_data.arrow"),
    output_adapter=JSONOutputAdapter("export.jsonl", format="jsonl"),
)
with pipe:
    pipe.wait()
```
Multi-Stage Pipeline with Arrow Intermediate
```python
from zoopipe import (
    ArrowInputAdapter,
    ArrowOutputAdapter,
    CSVInputAdapter,
    JSONOutputAdapter,
    MultiThreadExecutor,
    Pipe,
)

# Stage 1: CSV -> Arrow intermediate file
stage1 = Pipe(
    input_adapter=CSVInputAdapter("raw_data.csv"),
    output_adapter=ArrowOutputAdapter("intermediate.arrow"),
    executor=MultiThreadExecutor(max_workers=4),
)
with stage1:
    stage1.wait()

# Stage 2: Arrow intermediate file -> JSONL
stage2 = Pipe(
    input_adapter=ArrowInputAdapter("intermediate.arrow"),
    output_adapter=JSONOutputAdapter("final.jsonl", format="jsonl"),
)
with stage2:
    stage2.wait()
```
Arrow Format Benefits
When to Use Arrow
- Interoperability: Share data between Python, Rust, R, etc.
- Performance: Fast reads for analytical workloads
- Type Safety: Rich type system preserves schema
- Compression: Efficient storage with built-in compression
- Analytics: Optimized for columnar operations
Format Comparison
| Feature | Arrow | Parquet | CSV | JSONL |
|---|---|---|---|---|
| Read Speed | ✅✅ Fastest | ✅ Fast | ⚠️ Moderate | ⚠️ Moderate |
| Write Speed | ✅✅ Fastest | ⚠️ Slow | ✅ Fast | ✅ Fast |
| Compression | ✅ Good | ✅✅ Best | ❌ No | ❌ No |
| Schema | ✅ Rich | ✅ Rich | ❌ No | ⚠️ Inferred |
| Streaming | ✅ Yes | ⚠️ Limited | ✅ Yes | ✅ Yes |
| Low Overhead | ✅ Yes | ❌ No | ❌ No | ❌ No |
| Type Safety | ✅✅ Full | ✅✅ Full | ❌ No | ⚠️ Basic |
Use Arrow when:
- You need maximum read/write performance
- Sharing data between different languages/tools
- Working with analytical libraries (Pandas, Polars, Dask)
- Type preservation is important

Use Parquet when:
- Long-term archival storage (better compression)
- Sharing data across organizations (more portable)
- Need predicate pushdown for large files

Use CSV/JSONL when:
- Human readability is required
- Working with external systems that don't support Arrow
- Simple data structures without nested types
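The guidance above can be encoded as a small decision function, for example to pick an output adapter in a config-driven pipeline. The document does not rank its criteria. The precedence used here is an assumption: readability and compatibility first, then archival concerns, and Arrow as the default. `Requirements` and `choose_format` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Requirements:
    """Hypothetical summary of the criteria listed above."""
    human_readable: bool = False
    consumer_supports_arrow: bool = True
    long_term_archive: bool = False
    shared_across_organizations: bool = False
    needs_predicate_pushdown: bool = False


def choose_format(req: Requirements) -> str:
    """Apply the rules in an assumed order: readability/compatibility, archival, else Arrow."""
    if req.human_readable or not req.consumer_supports_arrow:
        return "csv/jsonl"
    if (req.long_term_archive or req.shared_across_organizations
            or req.needs_predicate_pushdown):
        return "parquet"
    return "arrow"


print(choose_format(Requirements()))                        # arrow
print(choose_format(Requirements(long_term_archive=True)))  # parquet
print(choose_format(Requirements(human_readable=True)))     # csv/jsonl
```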
Integration Examples
With Pandas
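```python
import pandas as pd

from zoopipe import (
    ArrowInputAdapter,
    ArrowOutputAdapter,
    CSVInputAdapter,
    CSVOutputAdapter,
    Pipe,
)

# Stage 1: CSV -> Arrow with ZooPipe
pipe = Pipe(
    input_adapter=CSVInputAdapter("data.csv"),
    output_adapter=ArrowOutputAdapter("data.arrow"),
)
with pipe:
    pipe.wait()

# Stage 2: transform in pandas (assumes data.csv has a "value" column);
# to_numeric guards against the column arriving as strings from CSV
df = pd.read_feather("data.arrow")
df['processed'] = pd.to_numeric(df['value']) * 2
df.to_feather("processed.arrow")

# Stage 3: Arrow -> CSV with ZooPipe
pipe2 = Pipe(
    input_adapter=ArrowInputAdapter("processed.arrow"),
    output_adapter=CSVOutputAdapter("result.csv"),
)
with pipe2:
    pipe2.wait()
```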
With Polars
```python
import polars as pl

from zoopipe import ArrowOutputAdapter, JSONInputAdapter, Pipe

pipe = Pipe(
    input_adapter=JSONInputAdapter("data.jsonl"),
    output_adapter=ArrowOutputAdapter("data.arrow"),
)
with pipe:
    pipe.wait()

# Read the Arrow IPC file with Polars, filter it, and write it back out
df = pl.read_ipc("data.arrow")
result = df.filter(pl.col("age") > 18)
result.write_ipc("filtered.arrow")
```
Best Practices
For Reading
- Leverage Memory Mapping: Arrow files can be memory-mapped, so reads avoid parsing the whole file up front
- Type Awareness: Arrow preserves complex types (lists, structs, dates)
- Batch Processing: Use with `MultiThreadExecutor` for parallel processing
- Memory Efficient: Streaming reads keep memory usage roughly constant
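Streaming in batches is what keeps memory bounded: only one batch is held at a time, whatever the file size. The generic sketch below groups any record iterator into fixed-size lists with `itertools.islice`. It illustrates the technique and is not ZooPipe's internal batching code. `iter_batches` is a hypothetical name.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def iter_batches(records: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `batch_size` records.

    Only the current batch is materialized, so memory use depends on
    batch_size rather than on the total number of records.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(records)
    while batch := list(islice(it, batch_size)):
        yield batch


# A generator stands in for a large file: records are produced lazily.
records = ({"row": i} for i in range(12_345))
sizes = [len(batch) for batch in iter_batches(records, batch_size=5000)]
print(sizes)  # [5000, 5000, 2345]
```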
For Writing
- Use for Intermediate Storage: Arrow is well suited to passing data between pipeline stages
- Compression: Arrow IPC supports built-in LZ4 and ZSTD buffer compression
- Interop: Output can be read by any Arrow-based tool that supports the IPC file format
- Performance: Fastest write format available in ZooPipe
Advanced Patterns
Data Lake Export
```python
from zoopipe import Pipe, MultiThreadExecutor, SQLInputAdapter, ArrowOutputAdapter

tables = ['users', 'orders', 'products']

for table in tables:
    # The data_lake/ directory is created automatically if it doesn't exist
    pipe = Pipe(
        input_adapter=SQLInputAdapter(
            "postgresql://user:pass@localhost/db",
            table_name=table,
        ),
        output_adapter=ArrowOutputAdapter(f"data_lake/{table}.arrow"),
        executor=MultiThreadExecutor(max_workers=8),
    )
    with pipe:
        pipe.wait()
    print(f"Exported {table} to Arrow")
```
High-Performance ETL
```python
from zoopipe import Pipe, MultiThreadExecutor, CSVInputAdapter, ArrowInputAdapter, ArrowOutputAdapter

# Extract: land raw CSV data in an Arrow staging file using larger batches
extract_pipe = Pipe(
    input_adapter=CSVInputAdapter("raw_data.csv"),
    output_adapter=ArrowOutputAdapter("staging.arrow"),
    executor=MultiThreadExecutor(max_workers=8, batch_size=5000),
)
with extract_pipe:
    extract_pipe.wait()
```
Cross-Language Workflow
```python
from zoopipe import ArrowOutputAdapter, CSVInputAdapter, Pipe

pipe = Pipe(
    input_adapter=CSVInputAdapter("python_data.csv"),
    output_adapter=ArrowOutputAdapter("shared_data.arrow"),
)
with pipe:
    pipe.wait()
```
Then in R:
```r
library(arrow)
library(dplyr)  # provides %>% and filter() for data frames

data <- read_feather("shared_data.arrow")
processed <- data %>% filter(age > 18)
write_feather(processed, "r_processed.arrow")
```
Back in Python:
```python
from zoopipe import ArrowInputAdapter, JSONOutputAdapter, Pipe

pipe2 = Pipe(
    input_adapter=ArrowInputAdapter("r_processed.arrow"),
    output_adapter=JSONOutputAdapter("final.jsonl", format="jsonl"),
)
with pipe2:
    pipe2.wait()
```
Error Handling
```python
from zoopipe import ArrowInputAdapter, JSONOutputAdapter, Pipe

try:
    pipe = Pipe(
        input_adapter=ArrowInputAdapter("data.arrow"),
        output_adapter=JSONOutputAdapter("output.jsonl", format="jsonl"),
    )
    pipe.start()
except Exception as e:
    print(f"Error: {e}")
```
Common errors:
- Invalid Arrow File: Corrupted or non-Arrow file
- Schema Incompatibility: Type conversion issues
- Memory Limits: File too large for available memory
- Permission Denied: Can't read input or write output
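For the "Invalid Arrow File" case, a cheap pre-flight check can catch non-Arrow inputs before a pipe is built. Per the Arrow columnar format specification, files in the IPC *file* format (Feather v2) start with the magic string `ARROW1`, padded to 8 bytes, and end with it. The sketch below checks only those bytes. Files written in the IPC *streaming* format carry no magic string and would be rejected, as would legacy Feather v1 files, which use a different layout. `looks_like_arrow_ipc_file` is a hypothetical helper, not part of ZooPipe.

```python
import os
import tempfile
from pathlib import Path
from typing import Union

# Arrow IPC file format (Feather v2) begins and ends with these 6 bytes.
ARROW_MAGIC = b"ARROW1"


def looks_like_arrow_ipc_file(path: Union[str, os.PathLike]) -> bool:
    """Cheap pre-flight check of the leading and trailing magic bytes.

    Passing does not prove the file is intact; it only rules out obviously
    wrong inputs such as CSV files or downloads missing their footer.
    """
    p = Path(path)
    # Too small to hold both a leading and a trailing magic string.
    if p.stat().st_size < 2 * len(ARROW_MAGIC):
        return False
    with p.open("rb") as f:
        head = f.read(len(ARROW_MAGIC))
        f.seek(-len(ARROW_MAGIC), os.SEEK_END)
        tail = f.read(len(ARROW_MAGIC))
    return head == ARROW_MAGIC and tail == ARROW_MAGIC


with tempfile.TemporaryDirectory() as tmp:
    # Fake files for the demo: only the magic bytes are realistic.
    fake_arrow = Path(tmp, "data.arrow")
    fake_arrow.write_bytes(ARROW_MAGIC + b"\x00\x00" + b"body" + ARROW_MAGIC)
    truncated = Path(tmp, "partial.arrow")
    truncated.write_bytes(ARROW_MAGIC + b"\x00\x00" + b"body")
    csv_file = Path(tmp, "data.csv")
    csv_file.write_text("user_id,name\n1,Alice\n")
    results = [looks_like_arrow_ipc_file(p) for p in (fake_arrow, truncated, csv_file)]

print(results)  # [True, False, False]
```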
Native Compression
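Arrow IPC files are designed for zero-copy access and can optionally compress their buffers internally with LZ4 or Zstandard (ZSTD). ZooPipe leverages these native format features. Compressed buffers must be decompressed when read, so they cannot be accessed zero-copy. User-level transparent compression of the whole file (e.g., .arrow.gz) is not recommended and not explicitly supported for the IPC format. It prevents memory mapping and random access, and so gives up the format's performance advantages.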
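The reason to prefer the format's internal compression over wrapping the file is granularity. Arrow IPC compresses each buffer independently, so a reader can decompress just the columns it needs. A gzip-wrapped file must instead be decompressed as one stream before any column can be read. The sketch below mimics per-column compression with the standard library. zlib and JSON stand in for Arrow's LZ4/ZSTD codecs and binary buffers, and both helpers are hypothetical.

```python
import json
import zlib
from typing import Any


def compress_columns(columns: dict[str, list[Any]]) -> dict[str, bytes]:
    """Compress each column on its own (zlib stands in for Arrow's LZ4/ZSTD codecs)."""
    return {
        name: zlib.compress(json.dumps(values).encode("utf-8"))
        for name, values in columns.items()
    }


def read_column(compressed: dict[str, bytes], name: str) -> list[Any]:
    """Decompress only the requested column; the others are never touched."""
    return json.loads(zlib.decompress(compressed[name]).decode("utf-8"))


columns = {
    "user_id": list(range(1000)),
    "name": [f"user{i}" for i in range(1000)],
    "age": [20 + i % 50 for i in range(1000)],
}
compressed = compress_columns(columns)
ages = read_column(compressed, "age")
print(len(ages), ages[:3])  # 1000 [20, 21, 22]
```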
Performance Tips
- Format Advantage: Arrow is the fastest format for read operations
- Batch Size: Larger batches (5000-10000) work well with Arrow
- Multi-Threading: Use `MultiThreadExecutor` for large Arrow files
- Compression: LZ4 favors speed, while ZSTD usually produces smaller files at a higher CPU cost
- Memory Mapping: Arrow reader uses memory mapping for efficient access
- Type Conversion: Minimal overhead converting Arrow types to Python
Schema Preservation
Arrow preserves complex schemas that other formats lose:
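```python
from zoopipe import ArrowInputAdapter, ArrowOutputAdapter, Pipe

# Arrow in, Arrow out: column types are not flattened to text
pipe = Pipe(
    input_adapter=ArrowInputAdapter("complex_data.arrow"),
    output_adapter=ArrowOutputAdapter("validated_data.arrow"),
)
with pipe:
    pipe.wait()
```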
Arrow preserves:
- Integer types (int8, int16, int32, int64, uint)
- Floating point (float32, float64)
- Temporal types (date, time, timestamp with timezone)
- Nested types (lists, structs)
- Nullable vs non-nullable columns
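The sketch below shows what these preserved types look like once records reach Python. It maps the Arrow type names listed above to the Python types that pyarrow's `Scalar.as_py()` returns for them. This mapping is an assumption about ZooPipe: the document does not say how its reader converts values. `python_type_for` is a hypothetical helper.

```python
import datetime
import re

# Python types that pyarrow's Scalar.as_py() produces for these Arrow types.
# Assumption: ZooPipe's Arrow-to-Python conversion follows the same conventions.
PYTHON_TYPE_FOR_ARROW = {
    **{f"int{bits}": int for bits in (8, 16, 32, 64)},
    **{f"uint{bits}": int for bits in (8, 16, 32, 64)},
    "float32": float,
    "float64": float,
    "bool": bool,
    "string": str,
    "date32": datetime.date,
    "date64": datetime.date,
    "time32": datetime.time,
    "time64": datetime.time,
    "timestamp": datetime.datetime,  # timezone-aware when the type has a tz
    "list": list,
    "struct": dict,
}


def python_type_for(arrow_type: str) -> type:
    """Map an Arrow type name such as "timestamp[us, tz=UTC]" to its Python type.

    Parameters in brackets or angle brackets are ignored. A null value of
    any type converts to None regardless of this mapping.
    """
    match = re.match(r"[a-z]+\d*", arrow_type)
    if match is None or match.group() not in PYTHON_TYPE_FOR_ARROW:
        raise ValueError(f"unmapped Arrow type: {arrow_type!r}")
    return PYTHON_TYPE_FOR_ARROW[match.group()]


print(python_type_for("int8"))                   # <class 'int'>
print(python_type_for("timestamp[us, tz=UTC]"))  # <class 'datetime.datetime'>
print(python_type_for("list<item: int64>"))      # <class 'list'>
```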