Python Generator Adapters
ZooPipe provides Python Generator adapters for in-memory streaming and bidirectional data flow. These adapters enable you to read from Python iterables and write to Python generators, perfect for testing, streaming APIs, and in-memory processing.
Overview
Python Generator adapters provide a bridge between ZooPipe's high-performance pipeline and native Python iterables:
- PyGeneratorInputAdapter: Read data from any Python iterable (lists, generators, iterators)
- PyGeneratorOutputAdapter: Write data to a Python generator that you can iterate over
This enables powerful patterns like: - Testing: Create test data on-the-fly without files - Streaming APIs: Process data from API responses in memory - Real-time Processing: Stream data between pipeline stages - In-Memory ETL: Transform data without I/O overhead
PyGeneratorInputAdapter
Read data from any Python iterable as input to your pipeline.
Basic Usage
from zoopipe import CSVOutputAdapter, Pipe, PyGeneratorInputAdapter
data = [
{"user_id": "1", "name": "Alice", "email": "alice@example.com"},
{"user_id": "2", "name": "Bob", "email": "bob@example.com"},
{"user_id": "3", "name": "Charlie", "email": "charlie@example.com"},
]
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(data),
output_adapter=CSVOutputAdapter("output.csv"),
)
with pipe:
pipe.wait()
Parameters
- iterable (
typing.Iterable[typing.Any]): Any Python iterable - Lists:
[{...}, {...}] - Generators:
(item for item in source) - Iterators:
iter(collection) -
Custom iterables: Any object implementing
__iter__ -
generate_ids (
bool, default=True): Whether to generate UUIDs for each record
Reading from Generators
def data_generator():
for i in range(1000):
yield {
"id": str(i),
"value": i * 2,
"category": "even" if i % 2 == 0 else "odd"
}
from zoopipe import JSONOutputAdapter, PyGeneratorInputAdapter, Pipe
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(data_generator()),
output_adapter=JSONOutputAdapter("output.jsonl", format="jsonl"),
)
with pipe:
pipe.wait()
Reading from API Responses
import requests
def fetch_api_data():
response = requests.get("https://api.example.com/users")
for user in response.json():
yield user
from zoopipe import CSVOutputAdapter, PyGeneratorInputAdapter, Pipe
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(fetch_api_data()),
output_adapter=CSVOutputAdapter("api_users.csv"),
)
with pipe:
pipe.wait()
PyGeneratorOutputAdapter
Write pipeline results to a Python generator that you can iterate over.
Basic Usage
from zoopipe import CSVInputAdapter, Pipe, PyGeneratorOutputAdapter
output_adapter = PyGeneratorOutputAdapter(queue_size=100)
pipe = Pipe(
input_adapter=CSVInputAdapter("input.csv"),
output_adapter=output_adapter,
)
pipe.start()
for record in output_adapter:
print(record)
Parameters
- queue_size (
int, default=1000): Internal queue size for buffering - Larger values: Better throughput, higher memory usage
- Smaller values: Lower memory usage, potential backpressure
- Default 1000 is optimal for most use cases
Consuming Results in Real-Time
from pydantic import BaseModel, ConfigDict
from zoopipe import CSVInputAdapter, Pipe, PyGeneratorOutputAdapter
class UserSchema(BaseModel):
model_config = ConfigDict(extra="ignore")
user_id: str
username: str
email: str
output_adapter = PyGeneratorOutputAdapter(queue_size=50)
pipe = Pipe(
input_adapter=CSVInputAdapter("users.csv"),
output_adapter=output_adapter,
schema_model=UserSchema,
)
pipe.start()
for validated_user in output_adapter:
print(f"Processing: {validated_user['username']}")
Iteration Pattern
The PyGeneratorOutputAdapter is both an adapter and an iterator:
output_adapter = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=CSVInputAdapter("data.csv"),
output_adapter=output_adapter,
)
pipe.start()
for item in output_adapter:
if item['value'] > 100:
print(f"High value item: {item}")
Complete Examples
In-Memory Testing
from pydantic import BaseModel, EmailStr
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
class UserSchema(BaseModel):
user_id: str
email: EmailStr
test_data = [
{"user_id": "1", "email": "valid@example.com"},
{"user_id": "2", "email": "invalid-email"},
{"user_id": "3", "email": "another@example.com"},
]
input_adapter = PyGeneratorInputAdapter(test_data)
output_adapter = PyGeneratorOutputAdapter(queue_size=10)
error_adapter = PyGeneratorOutputAdapter(queue_size=10)
pipe = Pipe(
input_adapter=input_adapter,
output_adapter=output_adapter,
error_output_adapter=error_adapter,
schema_model=UserSchema,
)
pipe.start()
valid_records = list(output_adapter)
error_records = list(error_adapter)
print(f"Valid: {len(valid_records)}")
print(f"Errors: {len(error_records)}")
assert len(valid_records) == 2
assert len(error_records) == 1
Streaming API Data
import time
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
def simulated_stream():
for i in range(100):
yield {"id": i, "timestamp": time.time(), "value": i * 10}
time.sleep(0.1)
output_adapter = PyGeneratorOutputAdapter(queue_size=20)
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(simulated_stream()),
output_adapter=output_adapter,
)
pipe.start()
for event in output_adapter:
print(f"Event {event['id']}: {event['value']}")
Multi-Stage In-Memory Pipeline
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
raw_data = [{"id": i, "value": i} for i in range(1000)]
stage1_output = PyGeneratorOutputAdapter(queue_size=100)
stage1 = Pipe(
input_adapter=PyGeneratorInputAdapter(raw_data),
output_adapter=stage1_output,
)
stage1.start()
stage1_results = list(stage1_output)
stage2_output = PyGeneratorOutputAdapter(queue_size=100)
stage2 = Pipe(
input_adapter=PyGeneratorInputAdapter(stage1_results),
output_adapter=stage2_output,
)
stage2.start()
final_results = list(stage2_output)
print(f"Processed {len(final_results)} records through 2 stages")
Use Cases
1. Unit Testing
Create test data without creating files:
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
def test_pipeline_validation():
test_data = [
{"id": "1", "value": "10"},
{"id": "2", "value": "invalid"},
]
output = PyGeneratorOutputAdapter()
errors = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(test_data),
output_adapter=output,
error_output_adapter=errors,
)
pipe.start()
results = list(output)
error_list = list(errors)
assert len(results) == 1
assert len(error_list) == 1
2. Real-Time Streaming
Process streaming data as it arrives:
def websocket_stream():
while True:
message = websocket.receive()
if message is None:
break
yield message
output = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(websocket_stream()),
output_adapter=output,
)
pipe.start()
for processed_message in output:
send_to_downstream(processed_message)
3. Data Transformation
Transform data between Python objects:
source_data = database.query("SELECT * FROM users")
output = PyGeneratorOutputAdapter(queue_size=500)
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(source_data),
output_adapter=output,
schema_model=UserSchema,
)
pipe.start()
for validated_user in output:
cache.set(validated_user['user_id'], validated_user)
4. API Response Processing
def paginated_api_call():
page = 1
while True:
response = requests.get(f"https://api.example.com/data?page={page}")
data = response.json()
if not data:
break
for item in data:
yield item
page += 1
output = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(paginated_api_call()),
output_adapter=output,
)
pipe.start()
all_items = list(output)
Best Practices
For PyGeneratorInputAdapter
- Memory Awareness: Large lists consume memory; use generators for large datasets
- Lazy Evaluation: Prefer generators over lists for memory efficiency
- Error Handling: Ensure your iterable doesn't raise exceptions unexpectedly
- Type Consistency: All yielded items should have consistent structure
For PyGeneratorOutputAdapter
- Queue Size: Adjust
queue_sizebased on your memory constraints - Consumption Pattern: Start consuming immediately after
pipe.start() - Backpressure: Smaller queue sizes create backpressure, controlling memory
- Complete Iteration: Always consume the entire generator or pipeline may hang
Performance Characteristics
PyGeneratorInputAdapter
- Zero I/O: No file system overhead
- Memory Bound: Limited by Python object creation
- Throughput: ~100k-500k items/s depending on object complexity
- Lazy: Only materializes data as needed (if using generators)
PyGeneratorOutputAdapter
- Buffered: Internal queue prevents pipeline stalls
- Blocking: Iteration blocks until data is available
- Complete: Iteration ends when pipeline finishes
- Thread-Safe: Can be consumed from same thread as pipeline
Common Patterns
Testing with Fixtures
import pytest
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
@pytest.fixture
def sample_data():
return [
{"id": "1", "value": 100},
{"id": "2", "value": 200},
]
def test_processing(sample_data):
output = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(sample_data),
output_adapter=output,
)
pipe.start()
results = list(output)
assert len(results) == 2
Infinite Streams
import time
def infinite_sensor_data():
while True:
yield {"timestamp": time.time(), "temperature": random.uniform(20, 30)}
time.sleep(1)
output = PyGeneratorOutputAdapter(queue_size=10)
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(infinite_sensor_data()),
output_adapter=output,
)
pipe.start()
for reading in output:
print(f"Temperature: {reading['temperature']:.2f}°C")
if reading['temperature'] > 28:
print("Warning: High temperature!")
Batch Processing in Memory
def process_in_batches(data, batch_size=1000):
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
output = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(batch),
output_adapter=output,
)
pipe.start()
yield from output
large_dataset = [{"id": i} for i in range(10000)]
for processed_item in process_in_batches(large_dataset):
print(processed_item)
Error Handling
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
output = PyGeneratorOutputAdapter()
errors = PyGeneratorOutputAdapter()
try:
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(data_source),
output_adapter=output,
error_output_adapter=errors,
)
pipe.start()
for item in output:
process_item(item)
error_list = list(errors)
if error_list:
print(f"Encountered {len(error_list)} errors")
except Exception as e:
print(f"Pipeline error: {e}")
Common errors:
- Generator Already Exhausted: Can only iterate once over the output
- Pipeline Still Running: Don't consume before calling pipe.start()
- Queue Full: Increase queue_size if pipeline stalls
- Infinite Iteration: Pipeline won't finish if input is infinite
Integration Examples
With Pandas
import pandas as pd
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
df = pd.read_csv("data.csv")
output = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(df.to_dict('records')),
output_adapter=output,
)
pipe.start()
processed_records = list(output)
processed_df = pd.DataFrame(processed_records)
With asyncio
import asyncio
from zoopipe import Pipe, PyGeneratorInputAdapter, PyGeneratorOutputAdapter
async def async_data_source():
for i in range(100):
await asyncio.sleep(0.1)
yield {"id": i, "data": f"item_{i}"}
async def process_async():
data = [item async for item in async_data_source()]
output = PyGeneratorOutputAdapter()
pipe = Pipe(
input_adapter=PyGeneratorInputAdapter(data),
output_adapter=output,
)
pipe.start()
return list(output)
results = asyncio.run(process_async())
Performance Tips
- Use Generators: Prefer generators over lists for large datasets
- Queue Size: Tune
queue_sizebased on your throughput needs - Batch Processing: Process large datasets in smaller batches
- Memory Management: Monitor memory usage with infinite streams
- Early Consumption: Start consuming output immediately after
pipe.start()