Iceberg Adapters
ZooPipe provides high-performance Iceberg adapters for working with data lakes. It supports parallel writing to Iceberg tables and high-speed parallel reading with automatic file discovery.
IcebergOutputAdapter
Write data to an Iceberg table using a high-performance Rust-powered Parquet writer and atomic commits.
Basic Usage
from pydantic import BaseModel
from zoopipe import CSVInputAdapter, IcebergOutputAdapter, Pipe
class UserSchema(BaseModel):
user_id: str
username: str
pipe = Pipe(
input_adapter=CSVInputAdapter("users.csv"),
output_adapter=IcebergOutputAdapter(
table_location="examples/output_data/iceberg_table",
catalog_properties={"type": "hadoop"}
),
schema_model=UserSchema,
)
pipe.run()
Parameters
- table_location (
str): The root directory of the Iceberg table. - catalog_properties (
dict, optional): Properties for the Iceberg catalog. Defaults to{"type": "hadoop"}.
Features
- High-Speed Ingestion: Utilizes native Rust Parquet writers (~450k+ rows/s).
- Atomic Commits: Uses the
IcebergCoordinatorto ensure that results from all parallel workers are committed atomically in a single transaction. - Automatic Schema Inference: Infers the Iceberg schema from the first batch of data.
- Metadata Management: Automatically generates
metadata.jsonandversion-hint.textin themetadata/directory.
IcebergInputAdapter
Read data from an Iceberg table with efficient parallel sharding and file discovery.
Basic Usage
from zoopipe import IcebergInputAdapter, JSONOutputAdapter, Pipe, PipeManager
from zoopipe.engines.zoosync import ZoosyncPoolEngine
pipe = Pipe(
input_adapter=IcebergInputAdapter("examples/output_data/iceberg_table"),
output_adapter=JSONOutputAdapter("users.jsonl", format="jsonl"),
)
# Parallel reading across 4 workers
with PipeManager.parallelize_pipe(
pipe, workers=4, engine=ZoosyncPoolEngine()
) as manager:
manager.run()
Parameters
- table_location (
str): The root directory of the Iceberg table. - generate_ids (
bool, default=True): Whether to generate unique IDs for each record. - batch_size (
int, default=1024): Number of rows to read per batch.
Features
- Automatic File Discovery: Scans the
data/directory of the Iceberg table for Parquet files using a fast Rust implementation. - Intelligent Sharding: Automatically distributes files among available workers for maximum parallelism.
- Multi-File Sequential Reading: Each worker uses the
MultiParquetReader(Rust) to stream through its assigned files without overhead. - Extreme Throughput: Capable of scanning Iceberg tables at over 1,000,000 rows/s.
Performance Comparison
| Format | Write Speed (rows/s) | Read Speed (rows/s) |
|---|---|---|
| CSV | ~350,000 | ~400,000 |
| JSONL | ~250,000 | ~300,000 |
| Parquet | ~400,000 | ~800,000 |
| Iceberg | ~450,000 | ~1,000,000+ |
Benchmarks conducted on local M1 Max with 4 parallel workers.
Best Practices
- Clean Runs: When running examples or tests, ensure you clear the
table_locationif you want a fresh start. - Parallelize: Always use
PipeManager.parallelize_pipewithIcebergInputAdapterto leverage multi-core performance. - Zoosync Engine: Recommended for the best inter-process communication performance.