Configuration¶
Config gives each asset or task structured, validated settings via Pydantic models.
BaseModel (static config)¶
Use pydantic.BaseModel when values are known at definition time:
from pydantic import BaseModel
import rivers as rs
class ThresholdConfig(BaseModel):
min_value: float = 0.0
max_value: float = 1.0
@rs.Asset
def filtered_data(context: rs.AssetExecutionContext[ThresholdConfig]):
config = context.config
return [x for x in range(100) if config.min_value <= x <= config.max_value]
BaseSettings (env-aware config)¶
Use pydantic_settings.BaseSettings when values come from environment variables:
from pydantic_settings import BaseSettings
import rivers as rs
class PipelineConfig(BaseSettings):
api_key: str # resolved from API_KEY env var
batch_size: int = 100
@rs.Asset
def api_data(context: rs.AssetExecutionContext[PipelineConfig]):
config = context.config
return fetch_data(config.api_key, batch_size=config.batch_size)
Materialize-time overrides¶
Override config values when calling materialize():
repo.materialize(
selection=["filtered_data"],
config={"filtered_data": {"min_value": 10, "max_value": 50}},
)
Overrides are merged with defaults at instantiation time. For BaseSettings, env vars are resolved first, then overrides take precedence.
Tasks¶
Tasks support config the same way as assets:
@rs.Task
def my_task(context: rs.TaskExecutionContext[ThresholdConfig]):
config = context.config
...
Schedules and sensors¶
Schedule and Sensor evaluation functions also accept a generic config type. The same BaseModel / BaseSettings pattern works on ScheduleEvaluationContext[ConfigT] and SensorEvaluationContext[ConfigT]:
class CronConfig(BaseModel):
selection: list[str] = ["nightly_etl"]
@rs.Schedule(cron_schedule="0 2 * * *", job_name="nightly")
def nightly(context: rs.ScheduleEvaluationContext[CronConfig]):
return rs.RunRequest(tags={"selection": ",".join(context.config.selection)})
class SensorConfig(BaseSettings):
inbox_url: str # resolved from INBOX_URL env var
poll_batch_size: int = 50
@rs.Sensor(job_name="ingest", minimum_interval="30s")
def inbox_sensor(context: rs.SensorEvaluationContext[SensorConfig]):
config = context.config
new_files = list_new_files(config.inbox_url, limit=config.poll_batch_size)
if not new_files:
return rs.SkipReason("inbox empty")
return rs.SensorResult(
run_requests=[rs.RunRequest(tags={"file": f}) for f in new_files],
cursor=new_files[-1],
)
Typed context generics¶
AssetExecutionContext[ConfigT] and TaskExecutionContext[ConfigT] accept a generic type parameter that serves two purposes: it gives IDE auto-completion on context.config, and it tells rivers which config class to instantiate at runtime. The config type is derived from the annotation on the context parameter — no separate config= argument is needed.
When to use which¶
BaseModel |
BaseSettings |
|
|---|---|---|
| Values from | Explicit overrides + defaults only | Env vars, .env files, secrets, overrides, defaults |
| Use when | Config is static/known at definition time | Config varies by environment (dev/staging/prod) |
| Dependencies | pydantic (bundled with rivers) |
pydantic-settings (bundled with rivers) |