Resources¶
Resources are shared, injectable dependencies (database connections, API clients, etc.) available across assets, tasks, schedules, and sensors.
Defining a resource¶
Extend rivers.Resource (a pydantic_settings.BaseSettings subclass) with optional lifecycle hooks:
```python
import rivers as rs

class DatabaseResource(rs.Resource):
    connection_string: str  # explicit or from CONNECTION_STRING env var
    pool_size: int = 5

    def setup(self):
        self._pool = create_pool(self.connection_string, self.pool_size)

    def teardown(self):
        self._pool.close()
```
setup() and teardown() are optional — only override them if your resource needs initialization or cleanup.
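The field/hook mechanics can be sketched with a stdlib-only stand-in. Everything below (`SimpleResource`, its `_fields` helper) is hypothetical and much simpler than the real base class; in rivers, environment-variable loading comes from `pydantic_settings`, which this sketch only imitates:

```python
import os

class SimpleResource:
    """Hypothetical stand-in for rs.Resource: fields fall back to
    UPPER_CASE environment variables, and setup()/teardown() are
    optional no-op hooks."""

    def __init__(self, **kwargs):
        for name, default in self._fields().items():
            if name in kwargs:
                value = kwargs[name]           # explicit value wins
            elif name.upper() in os.environ:
                value = os.environ[name.upper()]
            elif default is not ...:
                value = default
            else:
                raise ValueError(f"missing field: {name}")
            setattr(self, name, value)

    @classmethod
    def _fields(cls):
        # annotated class attributes; ... marks "required, no default"
        return {
            name: getattr(cls, name, ...)
            for name in getattr(cls, "__annotations__", {})
        }

    def setup(self):      # optional: acquire pools, clients, ...
        pass

    def teardown(self):   # optional: release them
        pass

class DatabaseResource(SimpleResource):
    connection_string: str
    pool_size: int = 5

os.environ["CONNECTION_STRING"] = "sqlite:///:memory:"
db = DatabaseResource()   # connection_string picked up from the env var
```

Explicit constructor arguments take precedence over environment variables, mirroring how pydantic-settings resolves values.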
Registering resources¶
Pass resources to CodeRepository as a dict:
```python
repo = rs.CodeRepository(
    assets=[my_asset],
    resources={
        "db": DatabaseResource(connection_string="postgresql://..."),
        "api": APIClient(base_url="https://api.example.com"),
    },
)
```
Injection¶
Resources are injected by matching parameter names to resource keys:
```python
@rs.Asset
def my_asset(context: rs.AssetExecutionContext, db: DatabaseResource, api: APIClient):
    data = api.fetch("/data")
    db._pool.execute("INSERT ...", data)
    return data
```
The injection resolution order is:

1. Context (first parameter, if it matches a context type)
2. `self` (self-dependency)
3. Upstream asset name match (in-memory result)
4. Upstream asset name match (via io_handler)
5. Resource key match
6. Error: unknown parameter
Asset names take precedence over resource keys. rivers warns at resolve time if a resource key shadows an asset name.
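The precedence rules can be sketched with a small resolver built on `inspect.signature`. The function below (`resolve_args`) is a hypothetical simplification, not the rivers implementation; it covers the context, upstream-name, and resource-key steps and shows asset names shadowing resource keys:

```python
import inspect

def resolve_args(fn, *, upstream_results, resources, context=None):
    """Hypothetical sketch of the documented resolution order:
    context -> upstream asset name -> resource key -> error."""
    kwargs = {}
    for i, name in enumerate(inspect.signature(fn).parameters):
        if i == 0 and name == "context" and context is not None:
            kwargs[name] = context
        elif name in upstream_results:      # asset name wins...
            kwargs[name] = upstream_results[name]
        elif name in resources:             # ...over a resource key
            kwargs[name] = resources[name]
        else:
            raise TypeError(f"unknown parameter: {name!r}")
    return kwargs

def my_asset(upstream, db):
    return (upstream, db)

# "upstream" exists as both an asset result and a resource key;
# the asset result is injected, matching the documented precedence.
args = resolve_args(
    my_asset,
    upstream_results={"upstream": [1, 2, 3]},
    resources={"db": "fake-db", "upstream": "shadowed-resource"},
)
```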
Schedule and sensor injection¶
Schedules and sensors also support resource injection:
```python
@rs.Schedule(cron_schedule="0 * * * *", job_name="hourly_job")
def my_schedule(context: rs.ScheduleEvaluationContext, db: DatabaseResource):
    if db.check_condition():
        return rs.RunRequest()
    return rs.SkipReason("No new data")
```
Lifecycle¶
```text
CodeRepository.__init__()
 └─ stores raw resource instances
CodeRepository.resolve()
 └─ calls resource.setup() for each resource
materialize() / evaluate_schedule() / evaluate_sensor()
 └─ injects resource instances into function calls
CodeRepository shutdown (context manager __exit__ or shutdown())
 └─ calls resource.teardown() for each resource
```
Resources are initialized once at resolve time and shared across all executions. They are not re-created per run.
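The once-per-resolve lifecycle can be demonstrated with a toy repository. `CountingResource` and `MiniRepository` are hypothetical illustrations of the documented behavior, not the rivers classes:

```python
class CountingResource:
    """Hypothetical resource that records its lifecycle calls."""
    def __init__(self):
        self.setup_calls = 0
        self.teardown_calls = 0
    def setup(self):
        self.setup_calls += 1
    def teardown(self):
        self.teardown_calls += 1

class MiniRepository:
    """Sketch of the lifecycle: setup at resolve, shared across runs,
    teardown at shutdown."""
    def __init__(self, resources):
        self.resources = resources       # stored raw, not set up yet
        self._resolved = False
    def resolve(self):
        for r in self.resources.values():
            r.setup()                    # once, at resolve time
        self._resolved = True
    def materialize(self, fn):
        assert self._resolved
        return fn(**self.resources)      # same instances every run
    def shutdown(self):
        for r in self.resources.values():
            r.teardown()

db = CountingResource()
repo = MiniRepository({"db": db})
repo.resolve()
seen = [repo.materialize(lambda db: db) for _ in range(3)]
repo.shutdown()
```

Three materializations reuse the single instance; `setup()` and `teardown()` each run exactly once.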
IOHandler as resource reference¶
io_handler on an asset can reference a resource by string key:
```python
class S3Handler(rs.BaseIOHandler):
    bucket: str
    prefix: str

    def handle_output(self, context, obj): ...
    def load_input(self, context): ...

@rs.Asset(io_handler="s3")  # references resources["s3"]
def my_asset():
    return data

repo = rs.CodeRepository(
    assets=[my_asset],
    resources={"s3": S3Handler(bucket="my-bucket", prefix="assets/")},
)
```
String references are resolved to the actual handler at resolve() time.
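That resolve-time swap can be sketched as a single pass over the assets. `MiniAsset` and `resolve_io_handlers` are hypothetical names for illustration only:

```python
class MiniAsset:
    """Hypothetical asset record whose io_handler may be a string key."""
    def __init__(self, name, io_handler=None):
        self.name = name
        self.io_handler = io_handler

def resolve_io_handlers(assets, resources):
    """Sketch of the documented behavior: a string io_handler is
    replaced by the matching resource instance at resolve() time."""
    for asset in assets:
        if isinstance(asset.io_handler, str):
            key = asset.io_handler
            if key not in resources:
                raise KeyError(f"io_handler {key!r} has no matching resource")
            asset.io_handler = resources[key]

s3 = object()  # stand-in for an S3Handler instance
asset = MiniAsset("my_asset", io_handler="s3")
resolve_io_handlers([asset], {"s3": s3})
```

After resolution the asset holds the handler instance itself, so execution never needs the string key again.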
Testing¶
Override resources in tests by passing a different dict:
```python
def test_my_pipeline():
    mock_db = DatabaseResource(connection_string="sqlite:///:memory:")
    repo = rs.CodeRepository(
        assets=[my_asset],
        resources={"db": mock_db},
    )
    result = repo.materialize()
    assert result.success
```
Validation¶
rivers validates all resource references at resolve time:
- Asset/task function parameters that don't match an upstream asset or resource key raise `ConfigurationError`
- Schedule/sensor evaluation function parameters that don't match a resource key raise `ConfigurationError`
- IOHandler string references that don't match a resource key raise `ConfigurationError`
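The second check can be sketched as a signature scan. `ConfigurationError` takes its name from the list above; `validate_schedule` and both schedule functions are hypothetical:

```python
import inspect

class ConfigurationError(Exception):
    """Stand-in named after the error in the list above."""

def validate_schedule(fn, resources):
    """Sketch: every parameter past the context must match a resource key."""
    params = list(inspect.signature(fn).parameters)
    for name in params[1:]:              # skip the context parameter
        if name not in resources:
            raise ConfigurationError(f"unknown resource key: {name!r}")

def good_schedule(context, db): ...
def bad_schedule(context, missing): ...

validate_schedule(good_schedule, {"db": object()})   # passes silently
try:
    validate_schedule(bad_schedule, {"db": object()})
    failed = False
except ConfigurationError:
    failed = True
```

Running this at resolve time surfaces misconfigured references before any run is scheduled.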