Getting Started¶
Installation¶
Install the core package:
pip install rivers
Optional extras:
pip install rivers[delta] # Delta Lake IO handler
pip install rivers[pyarrow] # PyArrow Tables / RecordBatchReaders for Delta
pip install rivers[polars] # Polars DataFrames / LazyFrames for Delta
pip install rivers[otel] # OpenTelemetry instrumentation
Your first asset¶
An asset is a function that produces a data artifact. Decorate it with @Asset:
import rivers as rs

@rs.Asset
def users():
    return [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]
Adding dependencies¶
Assets declare dependencies through their function parameters. rivers resolves the DAG automatically — the parameter name must match an upstream asset's name:
@rs.Asset
def active_users(users: list):
    return [u for u in users if u["name"] != "Bob"]

@rs.Asset
def user_count(active_users: list):
    return len(active_users)
Running assets¶
A CodeRepository registers your assets, resolves the dependency graph, and lets you materialize everything:
repo = rs.CodeRepository(assets=[users, active_users, user_count])
result = repo.materialize()
assert result.success
print(repo.load_node("user_count")) # 1
materialize() returns a RunResult, which exposes result.success, result.run_id, result.materialized_assets, and result.failed_assets. Asset values are read back with repo.load_node(name), which goes through the asset's IO handler.
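A quick sketch of inspecting those fields on the result from the run above (assuming materialized_assets and failed_assets hold asset names):
print(result.run_id)               # identifier for this run
print(result.materialized_assets)  # assets that were written: users, active_users, user_count
print(result.failed_assets)        # assets that raised errors (none in this run)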
For named subsets, use a Job:
job = rs.Job(
    name="user_pipeline",
    assets=[users, active_users, user_count],
    executor=rs.Executor.in_process(),
)
repo = rs.CodeRepository(assets=[users, active_users, user_count], jobs=[job])
result = repo.get_job("user_pipeline").execute() # returns RunResult; read values via load_node
Adding an IO handler¶
By default, asset outputs are passed through a shared in-memory handler. To persist outputs across runs, attach an IO handler:
from obstore.store import LocalStore

io = rs.PickleIOHandler(store=LocalStore(prefix="/tmp/rivers"))

@rs.Asset(io_handler=io)
def users():
    return [{"id": 1, "name": "Alice"}]
The PickleIOHandler works with any obstore-compatible backend (local filesystem, S3, GCS, Azure). For typed Arrow data, use the Delta Lake handler.
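For example, switching to S3 only changes the store. A minimal sketch, where the bucket name and prefix are placeholders and credentials/region are assumed to come from the environment or additional obstore configuration:
from obstore.store import S3Store

io = rs.PickleIOHandler(store=S3Store("my-bucket", prefix="rivers"))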
Parallel execution¶
By default, CodeRepository uses Executor.parallel() (subprocess pool via loky) to run independent assets concurrently. Configure it explicitly:
repo = rs.CodeRepository(
    assets=[users, active_users, user_count],
    default_executor=rs.Executor.parallel(max_workers=4),
)
For sequential execution (best for debugging), use Executor.in_process():
repo = rs.CodeRepository(
    assets=[users, active_users, user_count],
    default_executor=rs.Executor.in_process(),
)
For Kubernetes step pods, use Executor.kubernetes() — see the Executors API.
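A minimal sketch, assuming Executor.kubernetes() can be constructed without arguments (pod settings and other options are covered in the Executors API):
repo = rs.CodeRepository(
    assets=[users, active_users, user_count],
    default_executor=rs.Executor.kubernetes(),
)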
Cleanup with the context manager¶
CodeRepository is a context manager. Exiting the with block calls shutdown(), which closes storage and runs teardown() on every registered resource:
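A sketch of the same pipeline run inside a with block, assuming the context manager yields the repository itself:
with rs.CodeRepository(assets=[users, active_users, user_count]) as repo:
    result = repo.materialize()
    print(repo.load_node("user_count"))
# shutdown() has run here: storage is closed and teardown() was called on every registered resource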
Next steps¶
- Assets — single, multi, graph, and external assets
- Jobs & Executors — bundle assets into jobs and pick an execution strategy
- Partitions — partition assets by time, category, or dimension product
- IO Handlers — persist data with Delta Lake, pickle, or custom handlers
- Schedules & Sensors — drive runs from cron and external events