CSV Adapters
ZooPipe provides ultra-fast CSV readers and writers built entirely in Rust. These adapters are optimized for maximum throughput and minimal memory overhead.
CSVInputAdapter
Read CSV files with configurable delimiters, quotes, and field handling.
Basic Usage
from zoopipe import CSVInputAdapter, JSONOutputAdapter, Pipe
pipe = Pipe(
input_adapter=CSVInputAdapter("data.csv"),
output_adapter=JSONOutputAdapter("output.jsonl", format="jsonl"),
)
Parameters
- source (str | pathlib.Path): Path to the CSV file to read
- delimiter (str, default=","): Field delimiter character
  - Common values: , (comma), \t (tab), ; (semicolon), | (pipe)
- quotechar (str, default="\""): Quote character for escaping fields
  - Used when fields contain the delimiter or newlines
- skip_rows (int, default=0): Number of rows to skip before reading headers
  - Useful for skipping metadata or comment lines at the top of the file
- fieldnames (list[str] | None, default=None): Custom field names
  - If None, uses the first row as headers
  - If provided, treats the first row as data
- generate_ids (bool, default=True): Whether to generate UUIDs for each record
Custom Delimiters
tab_adapter = CSVInputAdapter(
"data.tsv",
delimiter="\t"
)
semicolon_adapter = CSVInputAdapter(
"data.csv",
delimiter=";"
)
Skip Header Rows
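A minimal sketch of skipping leading metadata lines, assuming a hypothetical report file whose first two lines are comments. `build_adapter` shows the adapter call with the documented `skip_rows` parameter. `preview` is a hypothetical helper that emulates the described behaviour with Python's standard `csv` module, so the effect can be inspected without running a pipeline; it is not ZooPipe's implementation.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical sample: two metadata lines precede the header row.
sample = Path(tempfile.mkdtemp()) / "report.csv"
sample.write_text(
    "# exported by billing system\n"
    "# rows: 2\n"
    "id,name\n"
    "1,Ada\n"
    "2,Grace\n",
    encoding="utf-8",
)


def build_adapter(path):
    """ZooPipe reader that skips the two metadata lines (requires zoopipe)."""
    from zoopipe import CSVInputAdapter  # lazy import so preview() runs without it

    return CSVInputAdapter(path, skip_rows=2)


def preview(path, skip_rows):
    """Stdlib emulation: drop `skip_rows` lines, then read the next row as headers."""
    with open(path, newline="", encoding="utf-8") as fh:
        for _ in range(skip_rows):
            next(fh)
        return list(csv.DictReader(fh))


rows = preview(sample, skip_rows=2)
print(rows)
```

The adapter returned by `build_adapter(sample)` can be passed to `Pipe` as `input_adapter` in the same way as in Basic Usage.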
Custom Field Names
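A minimal sketch for a headerless file, assuming hypothetical column names. With `fieldnames` supplied, the first line is treated as data, as described under Parameters. `preview` emulates that behaviour with the standard `csv` module for illustration only.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical headerless file: the first line is already a data row.
sample = Path(tempfile.mkdtemp()) / "no_header.csv"
sample.write_text(
    "1,Ada,ada@example.com\n2,Grace,grace@example.com\n",
    encoding="utf-8",
)

FIELDNAMES = ["user_id", "username", "email"]


def build_adapter(path):
    """ZooPipe reader with explicit column names (requires zoopipe)."""
    from zoopipe import CSVInputAdapter  # lazy import so preview() runs without it

    return CSVInputAdapter(path, fieldnames=FIELDNAMES)


def preview(path, fieldnames):
    """Stdlib emulation: supplied names become the keys, every line is data."""
    with open(path, newline="", encoding="utf-8") as fh:
        return list(csv.DictReader(fh, fieldnames=fieldnames))


rows = preview(sample, FIELDNAMES)
print(rows[0])
```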
CSVOutputAdapter
Write data to CSV files with high-performance batch operations.
Basic Usage
from zoopipe import CSVOutputAdapter, JSONInputAdapter, Pipe
pipe = Pipe(
input_adapter=JSONInputAdapter("data.jsonl"),
output_adapter=CSVOutputAdapter("output.csv"),
)
Parameters
- output (str | pathlib.Path): Path to the CSV file to write
  - Parent directories are automatically created if they don't exist
- delimiter (str, default=","): Field delimiter character
- quotechar (str, default="\""): Quote character for escaping fields
- fieldnames (list[str] | None, default=None): Explicit field ordering
  - If None, field names are inferred from the first record and sorted alphabetically
  - If provided, only these fields are written in the specified order
Custom Field Order
adapter = CSVOutputAdapter(
"output.csv",
fieldnames=["user_id", "username", "email", "created_at"]
)
This ensures the CSV columns appear in the exact order specified, regardless of the order in the input data.
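The two ordering modes can be illustrated with the standard library. This is a sketch that emulates the documented rules with `csv.DictWriter`, not ZooPipe's writer; the records and the `to_csv` helper are hypothetical.

```python
import csv
import io

records = [
    {"username": "ada", "user_id": "1", "email": "ada@example.com", "debug": "x"},
    {"username": "grace", "user_id": "2", "email": "grace@example.com", "debug": "y"},
]


def to_csv(rows, fieldnames=None):
    """Emulate the documented ordering rules with the stdlib writer."""
    if fieldnames is None:
        # Inferred mode: keys of the first record, sorted alphabetically.
        fieldnames = sorted(rows[0])
    buf = io.StringIO()
    # extrasaction="ignore" drops keys that are not listed in fieldnames.
    writer = csv.DictWriter(
        buf, fieldnames=fieldnames, extrasaction="ignore", lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


inferred = to_csv(records)
explicit = to_csv(records, fieldnames=["user_id", "username", "email"])
print(inferred.splitlines()[0])  # header in inferred (sorted) mode
print(explicit.splitlines()[0])  # header in explicit mode
```

In explicit mode the `debug` key is omitted because it is not listed, which matches the "only these fields are written" rule above.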
Complete Example
CSV Processing with Validation
import time
from pydantic import BaseModel, ConfigDict, EmailStr
from zoopipe import CSVInputAdapter, CSVOutputAdapter, MultiThreadExecutor, Pipe
class UserSchema(BaseModel):
    model_config = ConfigDict(extra="ignore")
    user_id: str
    username: str
    email: EmailStr
    age: int
pipe = Pipe(
input_adapter=CSVInputAdapter("users.csv"),
output_adapter=CSVOutputAdapter(
"validated_users.csv",
fieldnames=["user_id", "username", "email", "age"]
),
error_output_adapter=CSVOutputAdapter("errors.csv"),
schema_model=UserSchema,
executor=MultiThreadExecutor(max_workers=8, batch_size=2000),
)
pipe.start()
while not pipe.report.is_finished:
    print(
        f"Processed: {pipe.report.total_processed} | "
        f"Speed: {pipe.report.items_per_second:.2f} rows/s | "
        f"Errors: {pipe.report.total_errors}"
    )
    time.sleep(0.5)
print(f"Finished! Processed {pipe.report.total_processed} records")
TSV to CSV Conversion
from zoopipe import CSVInputAdapter, CSVOutputAdapter, Pipe
pipe = Pipe(
input_adapter=CSVInputAdapter("data.tsv", delimiter="\t"),
output_adapter=CSVOutputAdapter("data.csv", delimiter=","),
)
with pipe:
    pipe.wait()
Performance Characteristics
Reading
- 100% Rust Implementation: Zero Python overhead during parsing
- Streaming: Constant memory usage regardless of file size
- Type Handling: All fields are read as strings (type conversion handled by Pydantic)
- Quote Handling: Proper RFC 4180 CSV escaping and unescaping
- Hybrid I/O Strategy: Uses synchronous reads for local files, where they give the highest throughput, and background-thread streaming for remote S3 files to avoid blocking the GIL.
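The streaming and string-typing points can be seen in a small standard-library sketch. It is only an analogy for the behaviour listed above: `stream_rows` is a hypothetical helper, and the explicit `int()` call stands in for the conversion that a Pydantic schema would perform in a ZooPipe pipeline.

```python
import csv
import io
from typing import Iterator


def stream_rows(fh) -> Iterator[dict]:
    """Yield one record at a time so memory use does not grow with file size."""
    yield from csv.DictReader(fh)


source = io.StringIO("user_id,age\n1,36\n2,41\n")
rows = stream_rows(source)

first = next(rows)  # only the first data row has been parsed at this point
age_is_text = isinstance(first["age"], str)  # every field arrives as a string

# Explicit conversion step (done by the schema in a real pipeline).
ages = [int(first["age"])] + [int(row["age"]) for row in rows]
print(first, age_is_text, ages)
```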
Writing
- Batch Operations: Efficient buffered writes
- Automatic Quoting: Fields containing delimiters or newlines are automatically quoted
- Directory Creation: Parent directories are created automatically
- Field Ordering: Consistent column ordering via sorted or explicit fieldnames
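For readers unfamiliar with RFC 4180 style quoting, the sketch below uses Python's standard `csv` module, not ZooPipe, to show how fields containing delimiters, quotes, or newlines are quoted on write and recovered on read. The sample row is hypothetical.

```python
import csv
import io

row = ["plain", "has,comma", 'has "quotes"', "two\nlines"]

# Write one record; only the fields that need it are wrapped in quotes,
# and embedded quote characters are doubled.
buf = io.StringIO()
csv.writer(buf).writerow(row)
encoded = buf.getvalue()

# Reading the text back recovers the original values exactly.
decoded = next(csv.reader(io.StringIO(encoded, newline="")))
print(repr(encoded))
print(decoded == row)
```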
Best Practices
For Reading
- Use skip_rows to ignore metadata lines at the top of files
- Specify fieldnames explicitly if your CSV doesn't have headers
- Use the default delimiter (,) when possible for maximum performance
- Let Pydantic handle type conversion instead of pre-processing
For Writing
- Specify fieldnames explicitly for consistent column ordering
- Use MultiThreadExecutor for large datasets
- Choose an appropriate delimiter based on your data (avoid delimiters that appear in values)
- Use the error output to route invalid records for later review
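One way to apply the delimiter advice is to scan a sample of records for a candidate that never appears inside a value. `pick_delimiter` below is a hypothetical helper, not part of ZooPipe; it is a sketch that assumes the records are dictionaries.

```python
CANDIDATES = [",", "\t", ";", "|"]


def pick_delimiter(rows, candidates=CANDIDATES):
    """Return the first candidate that never occurs inside a field value, or None."""
    for delim in candidates:
        if not any(delim in str(value) for row in rows for value in row.values()):
            return delim
    return None  # every candidate appears somewhere; rely on quoting instead


records = [
    {"name": "Lovelace, Ada", "note": "a;b"},
    {"name": "Hopper, Grace", "note": "c|d"},
]
chosen = pick_delimiter(records)
print(repr(chosen))
```

The result could then be passed as the `delimiter` argument of `CSVOutputAdapter`. Because fields are quoted automatically, this is an optimization for readability rather than a correctness requirement.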
Common Patterns
Data Cleaning Pipeline
from pydantic import BaseModel, field_validator
from zoopipe import CSVInputAdapter, CSVOutputAdapter, Pipe

class CleanedData(BaseModel):
    name: str
    email: str

    # Normalize email addresses to lowercase during validation
    @field_validator("email")
    @classmethod
    def lowercase_email(cls, v: str) -> str:
        return v.lower()

pipe = Pipe(
    input_adapter=CSVInputAdapter("raw_data.csv"),
    output_adapter=CSVOutputAdapter("cleaned_data.csv"),
    error_output_adapter=CSVOutputAdapter("rejected_data.csv"),
    schema_model=CleanedData,
)
Merging CSV Files
from pathlib import Path
from zoopipe import CSVInputAdapter, CSVOutputAdapter, Pipe
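# Note: this loop assumes the output adapter appends to an existing
# merged_output.csv; if it overwrites, only the last input file is kept.
for csv_file in Path("input_dir").glob("*.csv"):
    pipe = Pipe(
        input_adapter=CSVInputAdapter(csv_file),
        output_adapter=CSVOutputAdapter(
            "merged_output.csv",
            fieldnames=["id", "name", "value"]
        ),
    )
    with pipe:
        pipe.wait()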
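Whether successive pipes append to or overwrite the same output file is not stated in this section. For comparison, the sketch below merges files with the standard `csv` module using a single writer and a single header row. It is a plain-Python alternative under that assumption, not ZooPipe's behaviour; `merge_csv` and the demo files are hypothetical.

```python
import csv
import tempfile
from pathlib import Path

FIELDNAMES = ["id", "name", "value"]


def merge_csv(input_dir, output_path, fieldnames=FIELDNAMES):
    """Concatenate every *.csv in input_dir into one file with a single header."""
    written = 0
    with open(output_path, "w", newline="", encoding="utf-8") as out:
        writer = csv.DictWriter(out, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        # sorted() gives a stable, reproducible file order
        for csv_file in sorted(Path(input_dir).glob("*.csv")):
            with open(csv_file, newline="", encoding="utf-8") as src:
                for row in csv.DictReader(src):
                    writer.writerow(row)
                    written += 1
    return written


# Demo with two hypothetical input files in a temporary directory.
work = Path(tempfile.mkdtemp())
(work / "in").mkdir()
(work / "in" / "a.csv").write_text("id,name,value\n1,x,10\n", encoding="utf-8")
(work / "in" / "b.csv").write_text("id,name,value\n2,y,20\n", encoding="utf-8")

count = merge_csv(work / "in", work / "merged.csv")
merged_lines = (work / "merged.csv").read_text(encoding="utf-8").splitlines()
print(count, merged_lines)
```

The output file is written outside the input directory so that it is never picked up by the glob.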
Format Standardization
pipe = Pipe(
input_adapter=CSVInputAdapter(
"messy_data.csv",
delimiter=";",
quotechar="'",
skip_rows=2
),
output_adapter=CSVOutputAdapter(
"standard_data.csv",
delimiter=",",
quotechar='"'
),
)
Error Handling
CSV adapters provide clear error messages for common issues:
try:
    pipe = Pipe(
        input_adapter=CSVInputAdapter("nonexistent.csv"),
        output_adapter=CSVOutputAdapter("output.csv"),
    )
    pipe.start()
except Exception as e:
    print(f"Error: {e}")
Common errors:
- File not found
- Permission denied
- Invalid UTF-8 encoding
- Malformed CSV (unclosed quotes, inconsistent columns)
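Some of these problems can be caught before a pipeline starts. The sketch below is a hypothetical `preflight` helper built on the standard library. It checks for a missing file, unreadable permissions, invalid UTF-8, and inconsistent column counts, and it says nothing about which exception types ZooPipe itself raises.

```python
import csv
import tempfile
from pathlib import Path


def preflight(path, delimiter=","):
    """Return a list of problems found in a CSV file (empty list means none)."""
    path = Path(path)
    if not path.is_file():
        return [f"file not found: {path}"]
    problems = []
    try:
        with open(path, newline="", encoding="utf-8") as fh:
            reader = csv.reader(fh, delimiter=delimiter)
            header = next(reader, None)
            if header is None:
                return ["file is empty"]
            for row in reader:
                if len(row) != len(header):
                    problems.append(
                        f"line {reader.line_num}: expected {len(header)} "
                        f"columns, found {len(row)}"
                    )
    except PermissionError:
        problems.append(f"permission denied: {path}")
    except UnicodeDecodeError as exc:
        problems.append(f"invalid UTF-8: {exc.reason}")
    except csv.Error as exc:
        problems.append(f"malformed CSV: {exc}")
    return problems


# Demo files (hypothetical): one clean, one ragged, one with a bad byte.
tmp = Path(tempfile.mkdtemp())
(tmp / "good.csv").write_text("id,name\n1,a\n", encoding="utf-8")
(tmp / "ragged.csv").write_text("id,name\n1,a\n2\n", encoding="utf-8")
(tmp / "bad.csv").write_bytes(b"id,name\n1,\xff\n")

good = preflight(tmp / "good.csv")
ragged = preflight(tmp / "ragged.csv")
bad = preflight(tmp / "bad.csv")
missing = preflight(tmp / "nonexistent.csv")
print(good, ragged, bad, missing, sep="\n")
```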
Compression
ZooPipe supports transparent compression and decompression for CSV files. The compression format is automatically inferred from the file extension:
- filename.csv.gz: Gzip compression
- filename.csv.zst: Zstandard compression (recommended for speed)
# Automatic decompression for input and compression for output
pipe = Pipe(
input_adapter=CSVInputAdapter("large_data.csv.zst"),
output_adapter=CSVOutputAdapter("output.csv.gz"),
)
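Extension-based inference can be pictured with a small lookup. The sketch below is not ZooPipe's code: `detect_compression` and `open_text` are hypothetical helpers, and only gzip is opened because Zstandard is not available in the standard library on most Python versions.

```python
import gzip
import tempfile
from pathlib import Path

# Extension-to-format table mirroring the two suffixes listed above.
COMPRESSION_BY_SUFFIX = {".gz": "gzip", ".zst": "zstd"}


def detect_compression(path):
    """Return "gzip", "zstd", or None based on the final file extension."""
    return COMPRESSION_BY_SUFFIX.get(Path(path).suffix.lower())


def open_text(path):
    """Open a plain or gzip-compressed CSV for reading as UTF-8 text."""
    kind = detect_compression(path)
    if kind == "gzip":
        return gzip.open(path, "rt", encoding="utf-8", newline="")
    if kind == "zstd":
        # Left out on purpose: this sketch is limited to the standard library.
        raise NotImplementedError("zstd is not covered by this stdlib-only sketch")
    return open(path, "r", encoding="utf-8", newline="")


# Round trip through a hypothetical gzip-compressed file.
demo = Path(tempfile.mkdtemp()) / "data.csv.gz"
with gzip.open(demo, "wt", encoding="utf-8", newline="") as fh:
    fh.write("id,name\n1,Ada\n")

with open_text(demo) as fh:
    content = fh.read()
print(detect_compression(demo), repr(content))
```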
Performance Tips
- Use MultiThreadExecutor: For files > 10MB, multi-threading provides significant speedup
- Batch Size: Default 2000 is optimal for most use cases
- Memory Usage: Constant ~50-100MB regardless of file size due to streaming
- SSD vs HDD: CSV reading is I/O bound, so an SSD provides 3-5x better performance than an HDD
- Compression: Use .zst (Zstandard) for a good balance of speed and size. ZooPipe handles it natively.
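To make the batch size tip concrete, the sketch below shows what grouping a stream into batches of 2000 looks like in plain Python. It is a conceptual illustration only: `batched` is a hypothetical helper, and how `MultiThreadExecutor` forms and dispatches its batches internally is an assumption not covered by this page.

```python
from itertools import islice


def batched(iterable, batch_size):
    """Yield lists of at most batch_size items without materializing the input."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 4500 records split with the default batch size mentioned above.
sizes = [len(batch) for batch in batched(range(4500), 2000)]
print(sizes)
```

Larger batches reduce per-batch overhead, while smaller ones lower the number of records held in memory at once.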