# IO Handlers

## BaseIOHandler

Abstract base class for IO handlers. Extends `pydantic_settings.BaseSettings`, so handler configuration can be resolved from environment variables, `.env` files, or explicit kwargs.

All IO handlers must inherit from `BaseIOHandler`. Duck typing is not supported.
```python
from typing import Any

from rivers import BaseIOHandler, InputContext, OutputContext

class MyHandler(BaseIOHandler):
    def handle_output(self, context: OutputContext, obj: Any) -> None:
        ...

    def load_input(self, context: InputContext) -> Any:
        ...
```
Abstract methods:

| Method | Signature |
|---|---|
| `handle_output` | `(context: OutputContext, obj: Any) -> None` |
| `load_input` | `(context: InputContext) -> Any` |
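As a concrete sketch, the handler below persists each output as a JSON file under a local directory. The class name, directory layout, and JSON encoding are illustrative choices, not part of the rivers API; partition handling is omitted for brevity.

```python
import json
from pathlib import Path
from typing import Any

from rivers import BaseIOHandler, InputContext, OutputContext

class JsonFileIOHandler(BaseIOHandler):
    # BaseSettings field: resolvable from env vars, .env files, or kwargs.
    base_dir: str = "/tmp/assets"

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        # One file per asset: <base_dir>/<asset_name>.json
        path = Path(self.base_dir) / f"{context.asset_name}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(obj))

    def load_input(self, context: InputContext) -> Any:
        # asset_name on InputContext refers to the upstream asset.
        path = Path(self.base_dir) / f"{context.asset_name}.json"
        return json.loads(path.read_text())
```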
## OutputContext

Passed to `handle_output()` with information about the asset being written.
Attributes:

| Attribute | Type | Description |
|---|---|---|
| `asset_name` | `str` | Name of the asset. |
| `asset_metadata` | `dict[str, str] \| None` | Static metadata from the asset definition. |
| `partition` | `PartitionContext \| None` | Partition info, if partitioned. |
| `type_hint` | `type \| None` | Return type annotation. |
| `output_metadata` | `dict[str, MetadataValue] \| None` | Metadata attached during execution. |
Methods:

```python
def add_output_metadata(
    self,
    metadata: dict[str, str | int | float | bool | None | MetadataValue],
) -> None
```

Attach runtime metadata to the output. Values are automatically converted to `MetadataValue` instances.

Record a content-addressable data-version string for this output. Used by automation conditions like `data_version_changed()`.
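For example, a `handle_output` implementation can record where and how much it wrote. A minimal sketch building on the `JsonFileIOHandler` above; the metadata keys mirror the built-in handlers but are otherwise illustrative:

```python
import json
import time
from pathlib import Path
from typing import Any

from rivers import OutputContext

class AuditedJsonHandler(JsonFileIOHandler):
    def handle_output(self, context: OutputContext, obj: Any) -> None:
        path = Path(self.base_dir) / f"{context.asset_name}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        start = time.monotonic()
        payload = json.dumps(obj)
        path.write_text(payload)
        # Plain values are converted to MetadataValue instances automatically.
        context.add_output_metadata({
            "path": str(path),
            "size_bytes": len(payload.encode()),
            "write_duration_s": time.monotonic() - start,
        })
```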
## InputContext

Passed to `load_input()` with information about the asset being loaded.
Attributes:

| Attribute | Type | Description |
|---|---|---|
| `asset_name` | `str` | Name of the upstream asset. |
| `downstream_asset` | `str` | Name of the asset requesting the load. |
| `asset_metadata` | `dict[str, str] \| None` | Static metadata from the upstream asset. |
| `partition` | `PartitionContext \| None` | Partition info, if partitioned. |
| `type_hint` | `type \| None` | Type annotation on the downstream parameter. |
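`type_hint` lets `load_input()` adapt its return value to the downstream parameter's annotation. A sketch building on the `JsonFileIOHandler` above; the pandas dispatch is an illustrative choice, not documented rivers behavior:

```python
import json
from pathlib import Path
from typing import Any

import pandas as pd

from rivers import InputContext

class DataFrameAwareHandler(JsonFileIOHandler):
    def load_input(self, context: InputContext) -> Any:
        path = Path(self.base_dir) / f"{context.asset_name}.json"
        raw = json.loads(path.read_text())
        # If the downstream parameter is annotated as a DataFrame, honor it.
        if context.type_hint is pd.DataFrame:
            return pd.DataFrame(raw)
        return raw
```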
## InMemoryIOHandler

Stores outputs in a Python dictionary; nothing persists across runs. This is the default IO handler, assigned automatically to all assets and tasks that don't have an explicit handler.

No constructor parameters. Cannot survive a process boundary: switch to `PickleIOHandler` or `DeltaIOHandler` if you use `Executor.parallel()` or `Executor.kubernetes()`.

Output metadata: `{"storage": "memory", "size_bytes": <int>}`
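Because outputs live in an in-process dictionary, a store-then-load round trip only works inside a single process. A sketch under two assumptions: that `InMemoryIOHandler` is importable from the top-level `rivers` package, and that the context objects can be constructed by keyword with the fields listed above, unset optional fields defaulting to None:

```python
from rivers import InMemoryIOHandler, InputContext, OutputContext

handler = InMemoryIOHandler()  # no constructor parameters

# Store an output under its asset name...
handler.handle_output(OutputContext(asset_name="numbers"), [1, 2, 3])

# ...and load it back, but only from the same process.
values = handler.load_input(
    InputContext(asset_name="numbers", downstream_asset="report")
)
assert values == [1, 2, 3]
```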
## PickleIOHandler

Persists outputs as pickle files via any obstore-compatible backend.

```python
import rivers as rs
from obstore.store import LocalStore

io = rs.PickleIOHandler(
    store=LocalStore(prefix="/data/assets"),
    prefix="v1",
)
```
Constructor:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `store` | `ObjectStore` | required | Any obstore backend (Local, S3, GCS, Azure, Memory). |
| `prefix` | `str` | `""` | Path prefix for all keys. |

Output metadata: `{"path": <str>, "serializer": "pickle", "size_bytes": <int>, "write_duration_s": <float>}`
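Because any obstore backend works, moving from local disk to object storage is a constructor change. A sketch assuming obstore's `S3Store`; the bucket name and region are placeholders, and credentials are resolved from the environment:

```python
import rivers as rs
from obstore.store import S3Store

io = rs.PickleIOHandler(
    store=S3Store("my-pipeline-bucket", region="us-east-1"),  # hypothetical bucket
    prefix="prod/v1",
)
```

Since the pickle files live outside the worker process, a handler configured this way survives the process boundaries introduced by `Executor.parallel()` and `Executor.kubernetes()`.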
## rivers.testing

Test-only `Storage` factories that wrap each instance in a dedicated tokio runtime, so dropping the `Storage` synchronously drains its router task, releasing the RocksDB file lock (for embedded backends) and tearing down in-memory state before control returns.

Use these in pytest fixtures; production code should keep using `Storage.memory()` / `Storage.embedded()` / `Storage.connect()`, which share the global IO runtime.
```python
import pytest

from rivers.testing import memory_storage, embedded_storage

@pytest.fixture
def storage():
    return memory_storage()

@pytest.fixture
def embedded(tmp_path):
    return embedded_storage(str(tmp_path / "db"))
```
| Function | Returns |
|---|---|
| `memory_storage()` | In-memory `Storage` with sync shutdown on drop. |
| `embedded_storage(path)` | Embedded RocksDB `Storage` at `path` with sync shutdown on drop. |
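The synchronous shutdown is what makes patterns like reopening the same embedded database within a single test reliable. A sketch using only the factories above:

```python
from rivers.testing import embedded_storage

def test_reopen_same_path(tmp_path):
    db_path = str(tmp_path / "db")

    s1 = embedded_storage(db_path)
    del s1  # drop synchronously drains the router task, releasing the RocksDB file lock

    # With Storage.embedded(), which shares the global IO runtime, this reopen
    # could race the asynchronous teardown and fail to take the lock.
    s2 = embedded_storage(db_path)  # acquires the lock immediately
```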