ZooPipe Hooks Guide
Why Hooks? (vs df.apply)
In vectorized libraries like Pandas/Polars, using `.apply()` calls a Python function once per row, paying interpreter and data-conversion overhead on every call. ZooPipe Hooks instead execute inside a Rust-managed parallel thread pool and receive whole batches of records, allowing complex Python logic (API calls, DB queries) to run in efficient parallel streams without the "DataFrame tax".
Hooks are a powerful feature in ZooPipe that allow you to inject custom logic into the pipeline's lifecycle. They enable you to transform data, manage resources, and perform complex enrichments without modifying the core adapter logic.
The BaseHook Class
All hooks must inherit from `BaseHook`.
```python
from zoopipe import BaseHook, EntryTypedDict, HookStore

class MyHook(BaseHook):
    def setup(self, store: HookStore) -> None:
        pass  # runs once, before any batch

    def execute(self, entries: list[EntryTypedDict], store: HookStore) -> list[EntryTypedDict]:
        return entries  # runs for every batch

    def teardown(self, store: HookStore) -> None:
        pass  # runs once, after all batches
```
Lifecycle Phases
1. Setup (`setup`)
- When: Called exactly ONCE (globally) before the pipeline begins processing any data.
- Context: Even with `MultiThreadExecutor`, `setup` runs on the main thread.
- Purpose: Initialize global resources or configuration.
- Store: The `store` passed here is the global pipeline store. It is NOT passed to `execute`.
2. Execute (`execute`)
- When: Called for every batch of data that passes through the pipeline.
- Purpose: Transform, validate, or enrich the data.
- Store: The `store` passed here is fresh and empty for each batch, isolated from other batches and threads. Use it for temporary state shared by the hooks in the chain while they process that specific batch.
- Thread Safety: Because `store` is local to the batch and thread, it is safe to write to. However, do NOT rely on values written to the `setup` store here; they are not visible in `execute`.
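To make the per-batch store concrete, here is a framework-free sketch. `CountIdsHook`, `AnnotateHook` and the `run_batch` driver are hypothetical stand-ins (plain dicts play the roles of `EntryTypedDict` and `HookStore`), not ZooPipe's actual runner; the sketch assumes only the behavior described above: each batch gets a fresh, empty store that every hook in the chain shares while processing that batch.

```python
from typing import Any

# Hypothetical stand-ins for illustration; these are NOT ZooPipe classes.
Entry = dict[str, Any]  # plays the role of EntryTypedDict
Store = dict[str, Any]  # plays the role of HookStore


class CountIdsHook:
    """First hook in the chain: writes a per-batch statistic into the store."""

    def execute(self, entries: list[Entry], store: Store) -> list[Entry]:
        store["id_count"] = sum(1 for e in entries if "id" in e["raw_data"])
        return entries


class AnnotateHook:
    """Second hook: reads what the previous hook stored for this same batch."""

    def execute(self, entries: list[Entry], store: Store) -> list[Entry]:
        for e in entries:
            e["raw_data"]["batch_id_count"] = store["id_count"]
        return entries


def run_batch(hooks: list[Any], entries: list[Entry]) -> list[Entry]:
    store: Store = {}  # fresh and empty for this batch only
    for hook in hooks:
        entries = hook.execute(entries, store)
    return entries


hooks = [CountIdsHook(), AnnotateHook()]
out_a = run_batch(hooks, [{"raw_data": {"id": 1}}, {"raw_data": {}}])
out_b = run_batch(hooks, [{"raw_data": {"id": 2}}, {"raw_data": {"id": 3}}])
print([e["raw_data"]["batch_id_count"] for e in out_a])  # [1, 1]
print([e["raw_data"]["batch_id_count"] for e in out_b])  # [2, 2]
```

The second batch never sees the first batch's count, because each call to `run_batch` starts from a new dict.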
3. Teardown (`teardown`)
- When: Called once after all data has been processed, or if the pipeline crashes.
- Purpose: Clean up global resources.
- Store: Receives the global pipeline store (the same one passed to `setup`).
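The three phases can be pictured with a toy runner. This is a minimal sketch under stated assumptions, not ZooPipe's implementation: `RecordingHook`, `FailingHook` and `run_pipeline` are hypothetical, and a standard-library `ThreadPoolExecutor` stands in for the Rust-managed pool. It shows `setup` running once on the calling thread with the global store, each `execute` call getting its own empty store, and `teardown` receiving the global store even when a batch raises.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Optional

# Hypothetical toy runner mirroring the documented lifecycle; NOT ZooPipe internals.


class RecordingHook:
    """Records which lifecycle methods ran, and on which thread setup ran."""

    def __init__(self) -> None:
        self.calls: list[str] = []
        self.setup_thread: Optional[str] = None
        self._lock = threading.Lock()  # execute runs concurrently, so guard shared state

    def setup(self, store: dict[str, Any]) -> None:
        self.setup_thread = threading.current_thread().name
        store["config"] = "loaded"
        self.calls.append("setup")

    def execute(self, entries: list[Any], store: dict[str, Any]) -> list[Any]:
        # The per-batch store starts empty: values written in setup are not here.
        if "config" in store:
            raise RuntimeError("setup store leaked into execute")
        with self._lock:
            self.calls.append("execute")
        return entries

    def teardown(self, store: dict[str, Any]) -> None:
        # Same global store object that setup received.
        self.calls.append(f"teardown:{store.get('config')}")


def run_pipeline(hook: Any, batches: list[list[Any]], workers: int = 4) -> list[list[Any]]:
    global_store: dict[str, Any] = {}
    hook.setup(global_store)  # once, on the calling thread, before any batch
    try:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # Each batch gets its own fresh, empty store.
            futures = [pool.submit(hook.execute, batch, {}) for batch in batches]
            return [f.result() for f in futures]
    finally:
        hook.teardown(global_store)  # runs after success and after a failed batch


# Demo: a normal run, then a run whose batch raises.
hook = RecordingHook()
results = run_pipeline(hook, [[1, 2], [3], [4, 5, 6]])
print(hook.calls)  # ['setup', 'execute', 'execute', 'execute', 'teardown:loaded']


class FailingHook(RecordingHook):
    def execute(self, entries: list[Any], store: dict[str, Any]) -> list[Any]:
        raise ValueError("bad batch")


failing = FailingHook()
try:
    run_pipeline(failing, [[1]])
except ValueError as exc:
    print(f"batch failed: {exc}")
print(failing.calls)  # ['setup', 'teardown:loaded']
```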
Example 1: Simple Enrichment Hook
This hook adds a processing timestamp to every record (one timestamp per batch) and reports the total pipeline duration at teardown.
```python
import time
from datetime import datetime

from zoopipe import BaseHook, EntryTypedDict, HookStore


class TimestampHook(BaseHook):
    def setup(self, store: HookStore) -> None:
        # Store the pipeline start time in the global store (teardown receives the same store).
        # perf_counter() is monotonic, so it suits duration measurements better than time.time().
        store["start_time"] = time.perf_counter()
        print(f"Pipeline started at: {datetime.now()}")

    def execute(self, entries: list[EntryTypedDict], store: HookStore) -> list[EntryTypedDict]:
        # One timestamp per batch; every record in the batch shares it.
        current_time = datetime.now().isoformat()
        for entry in entries:
            # Modify the raw data dictionary in-place
            entry["raw_data"]["processed_at"] = current_time
        return entries

    def teardown(self, store: HookStore) -> None:
        duration = time.perf_counter() - store["start_time"]
        print(f"Pipeline finished in {duration:.2f} seconds")
```
Example 2: Advanced SQL Expansion (Pagination Pattern)
This example demonstrates how the SQLExpansionHook works conceptually. It is used in cursor pagination to "hydrate" full records from a batch of IDs.
Scenario: You have a list of user IDs, and you need to fetch their full profiles from a database.
```python
import sqlite3

from zoopipe import BaseHook, EntryTypedDict, HookStore


class UserHydrationHook(BaseHook):
    def __init__(self, db_path: str):
        super().__init__()
        self.db_path = db_path

    def setup(self, store: HookStore) -> None:
        # 1. In this pattern, setup doesn't open connections because we want
        #    each batch to be self-contained and thread-safe.
        pass

    def execute(self, entries: list[EntryTypedDict], store: HookStore) -> list[EntryTypedDict]:
        # 2. Extract IDs from the incoming batch; skip the database if the batch is empty
        user_ids = [entry["raw_data"]["id"] for entry in entries]
        if not user_ids:
            return entries

        # 3. Open a connection locally per batch (or use a thread-safe pool)
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # 4. Perform a bulk fetch. Only the "?" placeholders are interpolated
            #    into the SQL; the IDs themselves are bound as parameters.
            placeholders = ",".join("?" * len(user_ids))
            query = f"SELECT id, email, status FROM users WHERE id IN ({placeholders})"
            cursor.execute(query, user_ids)
            results = {row[0]: row for row in cursor.fetchall()}
            cursor.close()
        finally:
            # 5. Ensure the connection is closed even if errors occur
            conn.close()

        # 6. Merge data back into the entries
        for entry in entries:
            uid = entry["raw_data"]["id"]
            if uid in results:
                _, email, status = results[uid]
                entry["raw_data"]["email"] = email
                entry["raw_data"]["status"] = status
            else:
                entry["raw_data"]["error"] = "User not found"

        return entries

    def teardown(self, store: HookStore) -> None:
        pass
```
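One caveat with the single `IN (...)` query: SQLite caps the number of bound parameters per statement (`SQLITE_MAX_VARIABLE_NUMBER`, 999 by default before SQLite 3.32.0 and 32766 since), so a very large batch can fail with "too many SQL variables". The sketch below applies the same hydration logic in chunks. It is written as a plain function so it runs without ZooPipe; `hydrate`, `chunks`, `CHUNK_SIZE` and the demo `users` table are illustrative assumptions, not part of the library.

```python
import os
import sqlite3
import tempfile
from contextlib import closing
from typing import Any, Iterator

CHUNK_SIZE = 500  # assumption: comfortably below SQLite's default parameter limit


def chunks(items: list, size: int) -> Iterator[list]:
    for start in range(0, len(items), size):
        yield items[start:start + size]


def hydrate(entries: list[dict[str, Any]], db_path: str) -> list[dict[str, Any]]:
    user_ids = [e["raw_data"]["id"] for e in entries]
    if not user_ids:
        return entries
    results: dict[Any, tuple] = {}
    # closing() guarantees conn.close(); sqlite3's own context manager only commits/rolls back.
    with closing(sqlite3.connect(db_path)) as conn:
        for chunk in chunks(user_ids, CHUNK_SIZE):
            placeholders = ",".join("?" * len(chunk))
            query = f"SELECT id, email, status FROM users WHERE id IN ({placeholders})"
            for row in conn.execute(query, chunk):
                results[row[0]] = row
    for e in entries:
        row = results.get(e["raw_data"]["id"])
        if row is None:
            e["raw_data"]["error"] = "User not found"
        else:
            e["raw_data"]["email"], e["raw_data"]["status"] = row[1], row[2]
    return entries


# Demo: 1,200 users (more than 999 parameters in one query) plus one unknown ID.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "users.db")
    with closing(sqlite3.connect(path)) as setup_conn:
        setup_conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, status TEXT)")
        setup_conn.executemany(
            "INSERT INTO users VALUES (?, ?, ?)",
            [(i, f"user{i}@example.com", "active") for i in range(1, 1201)],
        )
        setup_conn.commit()
    batch = [{"raw_data": {"id": i}} for i in range(1, 1201)] + [{"raw_data": {"id": 9999}}]
    hydrated = hydrate(batch, path)
```

Inside a hook, the body of `execute` would simply delegate to this logic; the chunk size is a tunable trade-off between round trips and statement size.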
Best Practices
- Thread Safety: The `store` passed to `execute` is unique per batch and safe to modify. However, if you access global resources initialized in `setup` (like a shared API client), you MUST ensure they are thread-safe or protect them with locks.
- Performance vs Safety: Creating resources (like DB connections) inside `execute` ensures safety but adds per-batch overhead. Use connection pooling (initialized in `setup`) for the best of both worlds.
- Error Handling: If an exception is raised in `execute`, that batch may fail. Handle expected errors (like missing keys) gracefully if you want the pipeline to continue.
- Priorities: You can set `priority` in `__init__` to control the order in which hooks run (lower numbers run earlier).
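The connection-pool advice can be sketched with the standard library alone. `SQLitePool` and `PooledLookupHook` are hypothetical names, and the hook is a duck-typed stand-in rather than a real `BaseHook` subclass so the snippet runs without ZooPipe. The pool lives on `self` rather than in the `setup` store, because `execute` never sees that store; `queue.Queue` makes borrowing and returning connections thread-safe.

```python
import os
import queue
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Any, Iterator, Optional


class SQLitePool:
    """Fixed-size connection pool; queue.Queue makes borrow/return thread-safe."""

    def __init__(self, db_path: str, size: int = 4) -> None:
        self._conns: "queue.Queue[sqlite3.Connection]" = queue.Queue()
        for _ in range(size):
            # check_same_thread=False lets whichever worker borrows a connection use it;
            # the pool guarantees only one thread holds a given connection at a time.
            self._conns.put(sqlite3.connect(db_path, check_same_thread=False))

    @contextmanager
    def connection(self) -> Iterator[sqlite3.Connection]:
        conn = self._conns.get()  # blocks until a connection is free
        try:
            yield conn
        finally:
            self._conns.put(conn)

    def close_all(self) -> None:
        while True:
            try:
                self._conns.get_nowait().close()
            except queue.Empty:
                break


class PooledLookupHook:
    """Duck-typed stand-in for a BaseHook subclass (hypothetical)."""

    def __init__(self, db_path: str, pool_size: int = 4) -> None:
        self.db_path = db_path
        self.pool_size = pool_size
        self.pool: Optional[SQLitePool] = None

    def setup(self, store: dict[str, Any]) -> None:
        # Created once; kept on the instance because execute gets a fresh store.
        self.pool = SQLitePool(self.db_path, self.pool_size)

    def execute(self, entries: list[dict[str, Any]], store: dict[str, Any]) -> list[dict[str, Any]]:
        if self.pool is None:
            raise RuntimeError("setup() must run before execute()")
        with self.pool.connection() as conn:
            for entry in entries:
                row = conn.execute(
                    "SELECT email FROM users WHERE id = ?", (entry["raw_data"]["id"],)
                ).fetchone()
                entry["raw_data"]["email"] = row[0] if row else None
        return entries

    def teardown(self, store: dict[str, Any]) -> None:
        if self.pool is not None:
            self.pool.close_all()


# Demo: four worker threads sharing a two-connection pool.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "users.db")
    init = sqlite3.connect(path)
    init.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
    init.executemany("INSERT INTO users VALUES (?, ?)", [(i, f"u{i}@example.com") for i in range(1, 21)])
    init.commit()
    init.close()
    hook = PooledLookupHook(path, pool_size=2)
    hook.setup({})
    batches = [[{"raw_data": {"id": i}} for i in range(s, s + 5)] for s in (1, 6, 11, 16, 99)]
    try:
        with ThreadPoolExecutor(max_workers=4) as workers:
            out = list(workers.map(lambda batch: hook.execute(batch, {}), batches))
    finally:
        hook.teardown({})
```

`close_all` assumes every connection has been returned to the pool, which holds once all batches have finished; with more workers than connections, extra threads simply wait in `connection()`.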