SQL Adapters
ZooPipe provides high-performance SQL database adapters built on top of SQLx, a pure Rust SQL toolkit. These adapters enable efficient reading from and writing to SQL databases with optimized batch operations.
Supported Databases
Through SQLx's Any driver, ZooPipe supports:
- SQLite (most commonly used)
- PostgreSQL
- MySQL
- MariaDB
SQLInputAdapter
Read data from SQL databases using either table names or custom queries.
Basic Usage
from zoopipe import JSONOutputAdapter, Pipe, SQLInputAdapter

pipe = Pipe(
    input_adapter=SQLInputAdapter(
        uri="sqlite:///path/to/database.db",
        table_name="users"
    ),
    output_adapter=JSONOutputAdapter("output.jsonl", format="jsonl"),
)
Custom Queries
from zoopipe import SQLInputAdapter

input_adapter = SQLInputAdapter(
    uri="postgresql://user:password@localhost/mydb",
    query="SELECT id, name, email FROM users WHERE active = true"
)
Parameters
- uri (str): Database connection string
  - SQLite: sqlite:///path/to/db.db or sqlite::memory: for in-memory
  - PostgreSQL: postgresql://user:password@host:port/database
  - MySQL: mysql://user:password@host:port/database
- query (str | None): Custom SQL query to execute
  - Mutually exclusive with table_name
  - Allows filtering, joins, and complex queries
- table_name (str | None): Name of the table to read from
  - Mutually exclusive with query
  - Creates a simple SELECT * FROM table_name query
- generate_ids (bool, default=True): Whether to generate UUIDs for each record
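The mutual exclusivity of query and table_name can be pictured with a minimal sketch. The helper resolve_query below is hypothetical and not part of ZooPipe's API; it only illustrates the rule described above, assuming that exactly one of the two arguments must be supplied and that a table name expands to a plain SELECT *.

```python
from typing import Optional


def resolve_query(table_name: Optional[str] = None, query: Optional[str] = None) -> str:
    """Hypothetical helper: return the SQL text a reader would execute."""
    # Exactly one of the two arguments may be provided.
    if (table_name is None) == (query is None):
        raise ValueError("Provide exactly one of 'table_name' or 'query'")
    if query is not None:
        return query
    # A bare table name expands to a simple full-table read.
    return f"SELECT * FROM {table_name}"


print(resolve_query(table_name="users"))
print(resolve_query(query="SELECT id FROM users WHERE active = true"))
```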
Connection URI Features
SQLite URIs support additional parameters:
- mode=rwc: Read-write-create mode (creates database if it doesn't exist)
- Parent directories are automatically created if needed
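To make the two behaviors above concrete, here is a rough equivalent using Python's standard sqlite3 module rather than ZooPipe itself. The function open_sqlite_rwc is a hypothetical name, and the file: URI form is the one Python's sqlite3 expects; it differs from the sqlite:// form shown in this document.

```python
import os
import sqlite3
import tempfile


def open_sqlite_rwc(path: str) -> sqlite3.Connection:
    """Open a SQLite file read-write, creating it and its parent directories if needed."""
    absolute = os.path.abspath(path)
    # Create missing parent directories before connecting.
    os.makedirs(os.path.dirname(absolute), exist_ok=True)
    # mode=rwc: read-write, and create the database file if it does not exist.
    return sqlite3.connect(f"file:{absolute}?mode=rwc", uri=True)


base_dir = tempfile.mkdtemp()
db_path = os.path.join(base_dir, "nested", "dir", "example.db")

conn = open_sqlite_rwc(db_path)
conn.execute("CREATE TABLE t (x TEXT)")
conn.commit()
conn.close()

print(os.path.exists(db_path))
```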
Performance Characteristics
- Streaming row-by-row processing (low memory footprint)
- Asynchronous data fetching using Tokio
- Single database connection per reader
- Type conversion from SQL to Python types (String, Int, Float, Bool)
- NULL values are properly handled and mapped to Python None
- Support for SQL pagination via a specialized adapter
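The streaming and NULL-handling behavior listed above can be sketched with Python's standard sqlite3 module. This is only an analogy for what the reader does, not ZooPipe's Rust implementation; stream_rows is a hypothetical helper name.

```python
import sqlite3
from typing import Any, Dict, Iterator


def stream_rows(conn: sqlite3.Connection, query: str) -> Iterator[Dict[str, Any]]:
    """Yield one row at a time as a dict, without materializing the full result set."""
    cursor = conn.execute(query)
    columns = [col[0] for col in cursor.description]
    for row in cursor:  # rows are fetched lazily as the loop advances
        yield dict(zip(columns, row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, score REAL)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "ada", 9.5), (2, None, None)],  # the second row holds SQL NULLs
)

rows = list(stream_rows(conn, "SELECT id, name, score FROM users ORDER BY id"))
print(rows)
```

SQL NULL values arrive as None, and integer, float, and text columns arrive as the corresponding Python types.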
SQLPaginationInputAdapter
The SQLPaginationInputAdapter is a specialized adapter designed for high-performance, memory-efficient data ingestion from large SQL tables using cursor-style pagination.
How it Works
Unlike standard SQL readers that fetch all rows in a single stream, the pagination adapter:
1. Chunks Data: Iterates through the table in configurable chunk sizes using a primary key or indexed column (e.g., id).
2. Anchor-Based Fetching: For each chunk, it first fetches the "anchors" (the IDs) and then uses an SQLExpansionHook to retrieve the full record data.
3. Resilient Execution: Smaller batches reduce the risk of long-running transaction timeouts and allow for better progress tracking.
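The chunking step can be illustrated with a keyset-style loop over the ID column. This is a sketch under assumptions: it uses Python's sqlite3 module, the function name iter_id_chunks is hypothetical, and the exact SQL that ZooPipe issues is not documented here.

```python
import sqlite3
from typing import Iterator, List


def iter_id_chunks(
    conn: sqlite3.Connection, table: str, id_column: str, chunk_size: int
) -> Iterator[List[int]]:
    """Yield lists of IDs ("anchors") in ascending order, one chunk at a time.

    table and id_column are interpolated into SQL, so they must be trusted identifiers.
    """
    last_id = None
    while True:
        if last_id is None:
            sql = f"SELECT {id_column} FROM {table} ORDER BY {id_column} LIMIT ?"
            params = (chunk_size,)
        else:
            # Resume strictly after the last ID seen, which keeps each query index-friendly.
            sql = (
                f"SELECT {id_column} FROM {table} "
                f"WHERE {id_column} > ? ORDER BY {id_column} LIMIT ?"
            )
            params = (last_id, chunk_size)
        ids = [row[0] for row in conn.execute(sql, params)]
        if not ids:
            return
        yield ids
        last_id = ids[-1]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE large_events (event_id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO large_events (event_id, payload) VALUES (?, ?)",
    [(i, f"event-{i}") for i in range(1, 26)],
)

chunks = list(iter_id_chunks(conn, "large_events", "event_id", chunk_size=10))
print([len(chunk) for chunk in chunks])
```

Because each query filters on the ID column instead of using OFFSET, the cost of fetching a chunk does not grow with how far into the table the reader has progressed.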
Basic Usage
from zoopipe import Pipe, SQLPaginationInputAdapter, JSONOutputAdapter

pipe = Pipe(
    input_adapter=SQLPaginationInputAdapter(
        table_name="large_events",
        id_column="event_id",
        chunk_size=10000,
        # my_db_connection is a placeholder for your own function
        # that returns a new database connection.
        connection_factory=lambda: my_db_connection()
    ),
    output_adapter=JSONOutputAdapter("events.jsonl")
)
Parameters
- table_name (str): Name of the table to process.
- id_column (str): The primary key or indexed column to use for pagination.
- chunk_size (int): Number of records to process per iteration.
- connection_factory (Callable): A function that returns a new database connection for the expansion hook.
SQLExpansionHook
This hook is automatically integrated by the SQLPaginationInputAdapter. It receives the batch of IDs and executes an optimized SELECT * FROM table WHERE id IN (...) query to hydrate the records before they reach your validation schema or next processing stage.
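The hydration step described above amounts to a parameterized IN query over one batch of IDs. The following is a minimal sketch using Python's sqlite3 module; expand_ids is a hypothetical name and is not the hook's real interface.

```python
import sqlite3
from typing import Any, Dict, List, Sequence


def expand_ids(
    conn: sqlite3.Connection, table: str, id_column: str, ids: Sequence[int]
) -> List[Dict[str, Any]]:
    """Fetch the full records for one batch of IDs with a single IN (...) query."""
    if not ids:
        return []
    # One placeholder per ID keeps the values out of the SQL text.
    placeholders = ", ".join("?" for _ in ids)
    sql = (
        f"SELECT * FROM {table} "
        f"WHERE {id_column} IN ({placeholders}) ORDER BY {id_column}"
    )
    cursor = conn.execute(sql, list(ids))
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE large_events (event_id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO large_events VALUES (?, ?)",
    [(1, "login"), (2, "click"), (3, "logout")],
)

records = expand_ids(conn, "large_events", "event_id", [2, 3])
print(records)
```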
SQLOutputAdapter
Write data to SQL databases with optimized batch insert operations.
Basic Usage
from zoopipe import CSVInputAdapter, Pipe, SQLOutputAdapter

pipe = Pipe(
    input_adapter=CSVInputAdapter("input.csv"),
    output_adapter=SQLOutputAdapter(
        uri="sqlite:///output.db",
        table_name="processed_data",
        mode="replace"
    ),
)
Parameters
- uri (str): Database connection string (same format as SQLInputAdapter)
- table_name (str): Name of the table to write to
- mode (str, default="replace"): Write mode behavior
  - "replace": Drop existing table and create new one
  - "append": Append to existing table (create if doesn't exist)
  - "fail": Raise error if table already exists
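The three write modes can be read as different table-preparation steps before any rows are inserted. The sketch below models that with Python's sqlite3 module; prepare_table is a hypothetical helper, and the exception type raised for "fail" is an assumption rather than ZooPipe's documented behavior.

```python
import sqlite3
from typing import Sequence


def prepare_table(
    conn: sqlite3.Connection, table: str, columns: Sequence[str], mode: str = "replace"
) -> None:
    """Prepare the target table according to the write mode (illustrative only)."""
    if mode not in ("replace", "append", "fail"):
        raise ValueError(f"Unknown mode: {mode!r}")
    exists = bool(
        conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)
        ).fetchall()
    )
    if mode == "fail" and exists:
        raise RuntimeError(f"Table '{table}' already exists")
    if mode == "replace":
        # Drop any existing table so the write starts from an empty one.
        conn.execute(f"DROP TABLE IF EXISTS {table}")
    column_defs = ", ".join(f"{name} TEXT" for name in columns)
    # For "append", an existing table is kept; a missing one is created.
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({column_defs})")


conn = sqlite3.connect(":memory:")
prepare_table(conn, "processed_data", ["a", "b"], mode="replace")
conn.execute("INSERT INTO processed_data VALUES ('1', '2')")
conn.commit()

prepare_table(conn, "processed_data", ["a", "b"], mode="append")
rows_after_append = len(conn.execute("SELECT * FROM processed_data").fetchall())

prepare_table(conn, "processed_data", ["a", "b"], mode="replace")
rows_after_replace = len(conn.execute("SELECT * FROM processed_data").fetchall())

print(rows_after_append, rows_after_replace)
```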
Batch Insert Optimization
The SQLWriter implements high-performance batch operations:
- PostgreSQL: Automatically uses the native binary COPY protocol, providing 5-10x faster writes compared to INSERT.
- Other Databases: Uses optimized chunked INSERT statements.
- Batch Size: Default 500 rows per batch (can be increased for higher throughput).
- Transaction: All operations are wrapped in a single transaction.
- Automatic Chunking: Large datasets are automatically split into optimal chunks
- Optimized Design: Minimizes data copying overhead between Python and Rust
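For databases other than PostgreSQL, the chunked INSERT strategy can be approximated as follows. This is a sketch with Python's sqlite3 module, not the SQLWriter's actual Rust code; insert_in_batches and chunked are hypothetical names, and the demo uses a batch size of 200 only to keep the bound-parameter count per statement small.

```python
import sqlite3
from typing import Dict, Iterator, List


def chunked(rows: List[Dict[str, str]], size: int) -> Iterator[List[Dict[str, str]]]:
    """Split rows into consecutive slices of at most `size` items."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def insert_in_batches(
    conn: sqlite3.Connection, table: str, rows: List[Dict[str, str]], batch_size: int = 500
) -> int:
    """Insert rows with one multi-row INSERT per batch, all in one transaction.

    Returns the number of INSERT statements executed.
    """
    if not rows:
        return 0
    columns = sorted(rows[0])
    row_placeholder = "(" + ", ".join("?" for _ in columns) + ")"
    statements = 0
    with conn:  # commits on success, rolls back if any batch raises
        for batch in chunked(rows, batch_size):
            values = ", ".join(row_placeholder for _ in batch)
            sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES {values}"
            params = [row[col] for row in batch for col in columns]
            conn.execute(sql, params)
            statements += 1
    return statements


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (name TEXT, value TEXT)")
data = [{"name": f"n{i}", "value": str(i)} for i in range(450)]

statement_count = insert_in_batches(conn, "records", data, batch_size=200)
total_rows = len(conn.execute("SELECT * FROM records").fetchall())
print(statement_count, total_rows)
```

Sending many rows per statement reduces round trips and per-statement overhead, which is the main reason batching outperforms row-by-row inserts.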
Performance Example
from zoopipe import CSVInputAdapter, MultiThreadExecutor, Pipe, SQLOutputAdapter

pipe = Pipe(
    input_adapter=CSVInputAdapter("large_dataset.csv"),
    output_adapter=SQLOutputAdapter(
        uri="sqlite:///output.db?mode=rwc",
        table_name="records",
        mode="replace"
    ),
    executor=MultiThreadExecutor(max_workers=4, batch_size=2000),
)
This will:
1. Read CSV in parallel batches of 2000 rows
2. Process through Pydantic validation
3. Write to SQLite in optimized batches of 500 rows per INSERT
4. All within a single database transaction
Schema Inference
Table schemas are automatically inferred from the first record:
- All columns are created as TEXT type
- Column names are sorted alphabetically
- Schema is locked after first write
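The inference rules above are simple enough to express in a few lines. The helper infer_create_table is hypothetical, and the exact DDL ZooPipe generates (quoting, IF NOT EXISTS) is an assumption; only the TEXT typing and alphabetical ordering come from the rules listed.

```python
from typing import Any, Dict


def infer_create_table(table: str, first_record: Dict[str, Any]) -> str:
    """Build a CREATE TABLE statement from the keys of the first record."""
    # Column names are sorted alphabetically and every column is typed TEXT.
    columns = sorted(first_record)
    column_defs = ", ".join(f'"{name}" TEXT' for name in columns)
    return f'CREATE TABLE IF NOT EXISTS "{table}" ({column_defs})'


ddl = infer_create_table(
    "users",
    {"username": "ada", "email": "ada@example.com", "user_id": "1"},
)
print(ddl)
```

Since every column is TEXT and the schema is fixed by the first record, later records with extra or differently named fields would not match the table, so it helps to normalize records before they reach the writer.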
Transaction Behavior
- Each call to write_batch() uses a single transaction
- If any batch fails, the entire transaction is rolled back
- Ensures data consistency and atomicity
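The rollback behavior can be demonstrated with Python's sqlite3 module. The write_batch function below is a stand-in written for this example and only shares its name with the method mentioned above; it assumes a batch is applied atomically, so a failure partway through leaves no rows from that batch behind.

```python
import sqlite3
from typing import List, Tuple


def write_batch(conn: sqlite3.Connection, rows: List[Tuple[str, str]]) -> None:
    """Insert one batch atomically: either every row is stored or none is."""
    with conn:  # commits on success, rolls back on any exception
        conn.executemany("INSERT INTO users (user_id, email) VALUES (?, ?)", rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id TEXT PRIMARY KEY, email TEXT)")

write_batch(conn, [("1", "a@example.com"), ("2", "b@example.com")])

try:
    # The second row violates the primary key, so the whole batch is rolled back,
    # including the otherwise valid row with user_id "3".
    write_batch(conn, [("3", "c@example.com"), ("1", "dup@example.com")])
except sqlite3.IntegrityError as exc:
    print(f"Batch rejected: {exc}")

ids = [row[0] for row in conn.execute("SELECT user_id FROM users ORDER BY user_id")]
print(ids)
```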
Complete Example
CSV to SQL with Validation
import os
import time

from pydantic import BaseModel, ConfigDict
from zoopipe import CSVInputAdapter, MultiThreadExecutor, Pipe, SQLOutputAdapter


class UserSchema(BaseModel):
    model_config = ConfigDict(extra="ignore")
    user_id: str
    username: str
    email: str


db_path = os.path.abspath("users.db")

pipe = Pipe(
    input_adapter=CSVInputAdapter("users.csv"),
    output_adapter=SQLOutputAdapter(
        f"sqlite:{db_path}?mode=rwc",
        table_name="users",
        mode="replace",
    ),
    schema_model=UserSchema,
    executor=MultiThreadExecutor(max_workers=4),
)

pipe.start()

# Poll the report until the pipe finishes, printing progress twice per second.
while not pipe.report.is_finished:
    print(
        f"Processed: {pipe.report.total_processed} | "
        f"Speed: {pipe.report.items_per_second:.2f} rows/s"
    )
    time.sleep(0.5)

print(f"Finished! Wrote {pipe.report.total_processed} records to database")
SQL to JSONL Export
from zoopipe import JSONOutputAdapter, Pipe, SQLInputAdapter

pipe = Pipe(
    input_adapter=SQLInputAdapter(
        "postgresql://user:pass@localhost/mydb",
        query="SELECT * FROM users WHERE created_at > NOW() - INTERVAL '7 days'"
    ),
    output_adapter=JSONOutputAdapter("recent_users.jsonl", format="jsonl"),
)

with pipe:
    pipe.wait()
Best Practices
For Reading
- Use specific queries instead of SELECT * when possible
- Add indexes on frequently queried columns
- Consider pagination for very large datasets
- Use read-only database connections when appropriate
For Writing
- Use MultiThreadExecutor for large datasets to maximize throughput
- Choose appropriate batch sizes based on your data size
- Use mode="replace" for complete data refreshes
- Use mode="append" for incremental updates
- Ensure database has sufficient disk space for write operations
Error Handling
SQL adapters provide clear error messages for common issues:
from zoopipe import JSONOutputAdapter, Pipe, SQLInputAdapter

try:
    pipe = Pipe(
        input_adapter=SQLInputAdapter(
            uri="sqlite:///nonexistent.db",
            table_name="users"
        ),
        output_adapter=JSONOutputAdapter("output.jsonl"),
    )
    pipe.start()
except RuntimeError as e:
    print(f"Database error: {e}")
Common errors:
- Connection failed: Invalid URI or database not accessible
- Query failed: SQL syntax error or table doesn't exist
- Batch insert failed: Constraint violation or disk full
- Failed to commit transaction: Transaction conflict or lock timeout
Performance Tips
- Connections: Each reader/writer uses a dedicated connection
- Batch Size: Default 500 rows per INSERT is optimized for most use cases
- Transactions: All writes in a single transaction for consistency and speed
- Type Conversion: Minimal overhead with direct Rust-to-Python type mapping
- Memory: Streaming architecture keeps memory usage constant regardless of dataset size