JSON Adapters
ZooPipe provides Rust-based JSON readers and writers optimized for both JSONL (newline-delimited JSON) and JSON array formats.
JSONInputAdapter
Read JSONL files with automatic line-by-line streaming.
Basic Usage
from zoopipe import CSVOutputAdapter, JSONInputAdapter, Pipe
pipe = Pipe(
    input_adapter=JSONInputAdapter("data.jsonl"),
    output_adapter=CSVOutputAdapter("output.csv"),
)
Parameters
- source (str | pathlib.Path): Path to the JSONL file to read
  - Each line must contain a valid JSON object
  - Blank lines are skipped automatically
JSONL Format
The adapter expects newline-delimited JSON (JSONL) format:
{"user_id": "1", "name": "Alice", "email": "alice@example.com"}
{"user_id": "2", "name": "Bob", "email": "bob@example.com"}
{"user_id": "3", "name": "Charlie", "email": "charlie@example.com"}
Each line is parsed independently, enabling streaming of arbitrarily large files.
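The streaming behaviour described above can be illustrated with a minimal standard-library sketch. This is not ZooPipe's Rust reader; `iter_jsonl` is a hypothetical helper that only shows why per-line parsing keeps memory use flat and how blank lines can be skipped.

```python
import io
import json
from typing import Any, Iterator, TextIO


def iter_jsonl(stream: TextIO) -> Iterator[Any]:
    """Yield one parsed value per non-blank line.

    Only the current line is held in memory, so the size of the
    input does not affect memory use.
    """
    for line in stream:
        line = line.strip()
        if not line:
            continue  # blank lines are skipped
        yield json.loads(line)


# In-memory stand-in for a JSONL file, including one blank line.
sample = io.StringIO(
    '{"user_id": "1", "name": "Alice", "email": "alice@example.com"}\n'
    "\n"
    '{"user_id": "2", "name": "Bob", "email": "bob@example.com"}\n'
)
records = list(iter_jsonl(sample))
print([r["name"] for r in records])
```

Because the function is a generator, a caller can process each record as it arrives instead of collecting them into a list as the demo does.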
Performance Characteristics
- Streaming: Constant memory usage regardless of file size
- Line-by-line Parsing: Each JSON object is parsed independently
- Error Handling: Invalid JSON lines trigger clear error messages
- Throughput: Similar to CSV (200k+ rows/s)
- Hybrid I/O Strategy: Local files are read synchronously for best performance, while S3 streams are read on background threads
JSONOutputAdapter
Write data to JSON files in either JSONL or pretty-printed array format.
Basic Usage (JSONL)
from zoopipe import CSVInputAdapter, JSONOutputAdapter, Pipe
pipe = Pipe(
    input_adapter=CSVInputAdapter("data.csv"),
    output_adapter=JSONOutputAdapter("output.jsonl", format="jsonl"),
)
Parameters
- output (str | pathlib.Path): Path to the JSON file to write
  - Parent directories are automatically created if they don't exist
- format (str, default="array"): Output format
  - "jsonl": Newline-delimited JSON (one object per line)
  - "array": JSON array with all objects in a single array
- indent (int | None, default=None): Indentation level for pretty-printing
  - None: Compact, single-line output
  - 2, 4, etc.: Pretty-printed with specified indent
JSONL Format (Recommended)
Output:
{"user_id":"1","name":"Alice","email":"alice@example.com"}
{"user_id":"2","name":"Bob","email":"bob@example.com"}
JSONL is ideal for:
- Large datasets (streaming-friendly)
- Log files and data pipelines
- Line-by-line processing tools
Array Format
Output:
[{"user_id":"1","name":"Alice","email":"alice@example.com"},{"user_id":"2","name":"Bob","email":"bob@example.com"}]
Pretty-Printed Array
Output:
[
  {
    "user_id": "1",
    "name": "Alice",
    "email": "alice@example.com"
  },
  {
    "user_id": "2",
    "name": "Bob",
    "email": "bob@example.com"
  }
]
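To make the difference between the three output shapes concrete, here is a small sketch using Python's standard `json` module. The `render` helper and its `fmt` argument are hypothetical stand-ins for the `format` and `indent` parameters described above; ZooPipe's writer is implemented in Rust and may differ in details.

```python
import json

rows = [
    {"user_id": "1", "name": "Alice", "email": "alice@example.com"},
    {"user_id": "2", "name": "Bob", "email": "bob@example.com"},
]


def render(rows, fmt="array", indent=None):
    """Serialize rows as JSONL, a compact array, or a pretty-printed array."""
    if fmt == "jsonl":
        # One compact object per line, each terminated by a newline.
        return "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in rows)
    if fmt == "array":
        # Compact separators only when no indent is requested.
        separators = (",", ":") if indent is None else None
        return json.dumps(rows, indent=indent, separators=separators)
    raise ValueError(f"unknown format: {fmt!r}")


print(render(rows, "jsonl"), end="")
print(render(rows, "array"))
print(render(rows, "array", indent=2))
```

Note that the JSONL branch can emit each row as soon as it is available, while the array branches serialize the whole list in one call.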
Complete Examples
CSV to JSONL Conversion
import time
from pydantic import BaseModel, ConfigDict
from zoopipe import CSVInputAdapter, JSONOutputAdapter, MultiThreadExecutor, Pipe
class UserSchema(BaseModel):
    model_config = ConfigDict(extra="ignore")
    user_id: str
    username: str
    email: str
pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=JSONOutputAdapter("users.jsonl", format="jsonl"),
    schema_model=UserSchema,
    executor=MultiThreadExecutor(max_workers=4),
)
pipe.start()
# Poll the progress report until the pipe has finished
while not pipe.report.is_finished:
    print(f"Processed: {pipe.report.total_processed} rows")
    time.sleep(0.5)
JSONL to Pretty JSON
from zoopipe import JSONInputAdapter, JSONOutputAdapter, Pipe
pipe = Pipe(
    input_adapter=JSONInputAdapter("data.jsonl"),
    output_adapter=JSONOutputAdapter("data.json", format="array", indent=2),
)
with pipe:
    pipe.wait()
JSONL Validation and Filtering
from pydantic import BaseModel, field_validator
from zoopipe import JSONInputAdapter, JSONOutputAdapter, Pipe
class ValidatedRecord(BaseModel):
    id: str
    value: float
    # Reject records whose value is zero or negative
    @field_validator('value')
    @classmethod
    def positive_value(cls, v):
        if v <= 0:
            raise ValueError("value must be positive")
        return v
pipe = Pipe(
    input_adapter=JSONInputAdapter("raw_data.jsonl"),
    output_adapter=JSONOutputAdapter("valid_data.jsonl", format="jsonl"),
    error_output_adapter=JSONOutputAdapter("errors.jsonl", format="jsonl"),
    schema_model=ValidatedRecord,
)
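The idea behind the example above, valid records going to one destination and failures to another, can be sketched in plain Python. This is an illustration only: `validate` and `route` are hypothetical helpers that stand in for the schema and the two output adapters, and the shape of the error entries is an assumption, not ZooPipe's actual error format.

```python
import json


def validate(record):
    """Hand-written stand-in for ValidatedRecord: string id, positive float value."""
    if not isinstance(record, dict):
        raise ValueError("record must be a JSON object")
    if not isinstance(record.get("id"), str):
        raise ValueError("id must be a string")
    value = float(record["value"])  # KeyError/TypeError/ValueError on bad input
    if value <= 0:
        raise ValueError("value must be positive")
    return {"id": record["id"], "value": value}


def route(lines):
    """Split raw JSONL lines into validated records and error entries."""
    valid, errors = [], []
    for line_no, line in enumerate(lines, start=1):
        try:
            valid.append(validate(json.loads(line)))
        except (ValueError, KeyError, TypeError) as exc:
            # json.JSONDecodeError is a ValueError, so malformed lines land here too.
            errors.append({"line": line_no, "error": str(exc), "raw": line})
    return valid, errors


valid, errors = route([
    '{"id": "a", "value": 1.5}',
    '{"id": "b", "value": -2}',
    "not json",
])
print(len(valid), "valid,", len(errors), "rejected")
```

Keeping the raw line and its line number alongside the message makes rejected records easy to trace back to the input file.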
Format Comparison
| Feature | JSONL | Array | Pretty Array |
|---|---|---|---|
| File Size | Smallest | Medium | Largest |
| Streamable | ✅ Yes | ❌ No | ❌ No |
| Human-Readable | ⚠️ Moderate | ✅ Yes | ✅✅ Very |
| Memory Usage | Constant | High* | High* |
| Line-by-Line Tools | ✅ Yes | ❌ No | ❌ No |
| API Response | ❌ No | ✅ Yes | ✅ Yes |
* Array formats load the entire file into memory
Best Practices
For Reading
- Validate JSONL Format: Ensure each line contains a complete JSON object
- Handle Encoding: Files should be UTF-8 encoded
- Use Error Output: Route malformed JSON to error output for debugging
- Large Files: JSONL format enables streaming of arbitrarily large datasets
For Writing
- Choose JSONL for Data Pipelines: Streaming-friendly and most efficient
- Choose Array for APIs: Better for small datasets and web services
- Use Indent for Debugging: Pretty-print during development, compact in production
- Field Ordering: JSON objects have consistent field ordering (sorted alphabetically)
Common Patterns
API Data Export
from zoopipe import JSONOutputAdapter, Pipe, SQLInputAdapter
pipe = Pipe(
    input_adapter=SQLInputAdapter(
        "postgresql://user:pass@localhost/db",
        query="SELECT id, name, email FROM users LIMIT 1000",
    ),
    output_adapter=JSONOutputAdapter("api_export.json", format="array", indent=2),
)
Log File Processing
from zoopipe import CSVOutputAdapter, JSONInputAdapter, Pipe
pipe = Pipe(
    input_adapter=JSONInputAdapter("application.log.jsonl"),
    output_adapter=CSVOutputAdapter("log_summary.csv"),
)
Data Migration
from zoopipe import JSONInputAdapter, MultiThreadExecutor, Pipe, SQLOutputAdapter
pipe = Pipe(
    input_adapter=JSONInputAdapter("legacy_data.jsonl"),
    output_adapter=SQLOutputAdapter(
        "sqlite:///new_database.db",
        table_name="migrated_data",
        mode="replace",
    ),
    executor=MultiThreadExecutor(max_workers=4),
)
Error Handling
from zoopipe import JSONInputAdapter, JSONOutputAdapter, Pipe
try:
    pipe = Pipe(
        input_adapter=JSONInputAdapter("data.jsonl"),
        output_adapter=JSONOutputAdapter("output.jsonl", format="jsonl"),
    )
    pipe.start()
except Exception as e:
    print(f"Error: {e}")
Common errors:
- Invalid JSON: Malformed JSON object on a specific line
- File Not Found: Input file doesn't exist
- Permission Denied: Can't read input or write output
- Encoding Error: Non-UTF-8 characters in file
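For a sense of how these four error categories arise, the following standard-library sketch pre-checks a JSONL file and maps Python's own exceptions onto them. `check_jsonl` is a hypothetical helper; which exception types ZooPipe itself raises is not stated here, so the mapping is an assumption for illustration.

```python
import json
import tempfile
from pathlib import Path


def check_jsonl(path):
    """Return None if every non-blank line parses, else describe the first problem."""
    try:
        with open(path, encoding="utf-8") as fh:
            for line_no, line in enumerate(fh, start=1):
                if not line.strip():
                    continue
                try:
                    json.loads(line)
                except json.JSONDecodeError as exc:
                    return f"Invalid JSON on line {line_no}: {exc.msg}"
    except FileNotFoundError:
        return "File Not Found"
    except PermissionError:
        return "Permission Denied"
    except UnicodeDecodeError:
        return "Encoding Error"
    return None


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "good.jsonl"
    good.write_text('{"a": 1}\n\n{"b": 2}\n', encoding="utf-8")
    bad = Path(tmp) / "bad.jsonl"
    bad.write_text('{"ok": true}\n{"broken": \n', encoding="utf-8")
    binary = Path(tmp) / "binary.jsonl"
    binary.write_bytes(b"\xff\xfe\n")  # not valid UTF-8
    results = [
        check_jsonl(good),
        check_jsonl(bad),
        check_jsonl(Path(tmp) / "missing.jsonl"),
        check_jsonl(binary),
    ]

for result in results:
    print(result)
```

Running a check like this before a long pipeline can surface a malformed line early, with its line number.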
Compression
Similar to CSV, JSON adapters support transparent compression and decompression for JSONL files based on file extension:
- .jsonl.gz: Gzip compression
- .jsonl.zst: Zstandard compression
from zoopipe import JSONInputAdapter, JSONOutputAdapter, Pipe
pipe = Pipe(
    input_adapter=JSONInputAdapter("logs.jsonl.zst"),
    output_adapter=JSONOutputAdapter("archived.jsonl.gz", format="jsonl"),
)
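Extension-based compression can be sketched with Python's standard `gzip` module. `open_text` is a hypothetical helper, not part of ZooPipe, and it covers only the `.gz` case; Zstandard is left out because it typically needs a third-party package.

```python
import gzip
import json
import os
import tempfile


def open_text(path, mode="rt"):
    """Choose an opener from the file extension (gzip only in this sketch)."""
    if str(path).endswith(".gz"):
        return gzip.open(path, mode, encoding="utf-8")
    return open(path, mode, encoding="utf-8")


rows = [{"user_id": "1"}, {"user_id": "2"}]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "archived.jsonl.gz")
    # Write compressed JSONL, one object per line.
    with open_text(path, "wt") as fh:
        for row in rows:
            fh.write(json.dumps(row) + "\n")
    # Gzip files begin with the magic bytes 1f 8b.
    with open(path, "rb") as raw:
        magic = raw.read(2)
    # Read the records back, decompressing transparently.
    with open_text(path) as fh:
        restored = [json.loads(line) for line in fh if line.strip()]

print(restored == rows)
```

The caller never mentions compression explicitly; the file name alone decides how the stream is opened.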
Performance Tips
- JSONL vs Array: JSONL is 2-3x faster for large datasets
- Compact Output: Avoid indent in production for better performance
- Batch Size: Default 2000 rows works well for most JSONL files
- Multi-Threading: Use MultiThreadExecutor for files > 50MB
- Memory: JSONL maintains constant memory usage; Array format loads entire output into memory
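As a rough picture of what a batch size of 2000 means, the sketch below groups any row iterator into fixed-size lists. How ZooPipe batches rows internally is not described here; `batched` is a hypothetical helper that only illustrates the concept.

```python
from itertools import islice


def batched(iterable, size=2000):
    """Yield lists of up to `size` items; the last batch may be shorter."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


# 4500 rows split into two full batches and one partial batch.
sizes = [len(batch) for batch in batched(range(4500))]
print(sizes)
```

Larger batches mean fewer hand-offs between stages at the cost of holding more rows in memory at once.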
Integration Examples
With Pandas
import pandas as pd
from zoopipe import JSONInputAdapter, JSONOutputAdapter, Pipe
pipe = Pipe(
    input_adapter=JSONInputAdapter("input.jsonl"),
    output_adapter=JSONOutputAdapter("filtered.jsonl", format="jsonl"),
)
with pipe:
    pipe.wait()
df = pd.read_json("filtered.jsonl", lines=True)
print(df.head())
With jq (Command Line)
# Process JSONL output with jq
jq '.email' output.jsonl
# Filter and transform
jq 'select(.age > 18) | {id, name}' output.jsonl