Assets¶
Asset¶
Decorator for defining assets. Can be used bare or with parameters.
# Bare decorator
@rs.Asset
def my_asset():
    return data
# With parameters
@rs.Asset(name="custom_name", tags=["etl"], kinds="table", group="pipeline")
def my_asset():
    return data
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str \| None | None | Asset name. Defaults to function name. |
| tags | list[str] \| None | None | Tags for categorization. |
| kinds | str \| list[str] \| None | None | Asset kind(s) (e.g. "table", ["table", "delta"]). |
| group | str \| None | None | Group name for organization. |
| code_version | str \| None | None | Version string for change detection. |
| io_handler | BaseIOHandler \| None | None | IO handler for persistence. |
| metadata | dict[str, str] \| None | None | Static metadata passed to IO handlers. |
| partitions_def | PartitionsDefinition \| None | None | Partition definition. |
| deps | list[DepDef] \| None | None | Input and lineage-only dependencies. Created via AssetDef.input() and AssetDef.dep(). |
| hooks | list[Hook] \| None | None | Success/failure hooks for this asset. |
| automation_condition | AutomationCondition \| None | None | Declarative automation condition. |
| backfill_strategy | BackfillStrategy \| None | None | Default strategy when this asset is included in a backfill. |
| pool | str \| list[str] \| None | None | Concurrency pool(s) this asset belongs to. |
| pool_slots | int \| dict[str, int] \| None | None | Slots consumed per pool (default 1). |
Properties:
| Property | Type | Description |
|---|---|---|
| name | str | Asset name (function name or explicit name=). |
| tags | list[str] \| None | Tags. |
| kinds | list[str] | Kind(s), always a list. |
| group | str \| None | Group name. |
| code_version | str \| None | Code version string. |
| metadata | dict[str, str] \| None | Static metadata. |
| partitions_def | PartitionsDefinition \| None | Partition definition. |
| partition_mapping | dict[str, PartitionMapping] \| None | Per-dep partition mappings (derived from deps). |
| hooks | list[Hook] \| None | Attached hooks. |
| automation_condition | AutomationCondition \| None | Automation condition. |
| pool | list[tuple[str, int]] | Normalized pool membership: (pool_key, slots) pairs. |
| observe_fn | Callable \| None | Observation function (external assets only). |
| is_async | bool | True when the wrapped function is a coroutine function. |
| is_single | bool | True for SingleAsset. |
| is_multi | bool | True for MultiAsset. |
| is_graph | bool | True for GraphAsset. |
| is_external | bool | True for ExternalAsset. |
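The kinds and pool properties are described as normalized forms of looser decorator arguments. The sketch below illustrates one way such normalization could work. It is not the rivers implementation: the function names are hypothetical, and three behaviours are assumptions (None becomes an empty list, a single int pool_slots applies to every pool, and pools missing from a pool_slots dict fall back to 1 slot).

```python
from __future__ import annotations


def normalize_kinds(kinds: str | list[str] | None) -> list[str]:
    """Turn a str | list[str] | None argument into a list."""
    if kinds is None:
        return []  # assumption: no kinds means an empty list
    if isinstance(kinds, str):
        return [kinds]
    return list(kinds)


def normalize_pool(
    pool: str | list[str] | None,
    pool_slots: int | dict[str, int] | None = None,
) -> list[tuple[str, int]]:
    """Pair each pool key with its slot count (documented default: 1)."""
    if pool is None:
        return []
    keys = [pool] if isinstance(pool, str) else list(pool)
    if pool_slots is None:
        return [(key, 1) for key in keys]
    if isinstance(pool_slots, int):
        # Assumption: a single int applies to every listed pool.
        return [(key, pool_slots) for key in keys]
    # Assumption: pools missing from the dict use 1 slot.
    return [(key, pool_slots.get(key, 1)) for key in keys]


print(normalize_kinds("table"))                  # ['table']
print(normalize_pool(["db", "api"], {"db": 2}))  # [('db', 2), ('api', 1)]
```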
Asset.from_multi()¶
Create a multi-output asset.
Parameters: Same as Asset plus:
| Parameter | Type | Description |
|---|---|---|
| wraps | Callable \| None | The function to wrap. |
| output_defs | list[AssetDef] | Output definitions for each output. |
| partitions_def | PartitionsDefinition \| None | Top-level partition definition applied to all outputs. Takes precedence over per-output AssetDef.partitions_def. |
| deps | list[DepDef] | Input and lineage-only dependencies. Created via AssetDef.input() and AssetDef.dep(). |
Top-level partitions_def¶
When set on from_multi, the partition definition applies to every output, overriding any partitions_def set on individual AssetDef entries:
pd = rs.PartitionsDefinition.static_(["a", "b", "c"])
@rs.Asset.from_multi(
    partitions_def=pd,
    output_defs=[
        rs.AssetDef("x", io_handler=handler),
        rs.AssetDef("y", io_handler=handler),
    ],
)
def multi(context: rs.AssetExecutionContext):
    yield rs.Output(value=10, output_name="x")
    yield rs.Output(value=20, output_name="y")
Without a top-level partitions_def, per-output definitions are allowed if they share the same variant type (e.g. all Static) and have at least one overlapping key. Mixed variant types (e.g. Static + Daily) are rejected.
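The precedence and compatibility rules above can be modelled in a few lines. This is a minimal sketch and not rivers code: PartitionSpec and resolve_partitions are hypothetical stand-ins, and "at least one overlapping key" is assumed to mean a key shared by all per-output definitions.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class PartitionSpec:
    """Hypothetical stand-in for a partitions definition."""
    variant: str            # e.g. "static" or "daily"
    keys: frozenset[str]


def resolve_partitions(top_level, per_output):
    """Return the partition spec each output ends up with."""
    if top_level is not None:
        # The top-level definition overrides every per-output one.
        return {name: top_level for name in per_output}
    specs = [spec for spec in per_output.values() if spec is not None]
    if specs:
        if len({spec.variant for spec in specs}) > 1:
            raise ValueError("mixed partition variant types")
        # Assumption: the overlap must be common to all definitions.
        shared = frozenset.intersection(*(spec.keys for spec in specs))
        if not shared:
            raise ValueError("no overlapping partition key")
    return dict(per_output)


ab = PartitionSpec("static", frozenset({"a", "b"}))
bc = PartitionSpec("static", frozenset({"b", "c"}))
print(resolve_partitions(None, {"x": ab, "y": bc}) == {"x": ab, "y": bc})  # True
```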
deps parameter¶
The deps parameter declares upstream dependencies with fine-grained control over partition mapping, IO handler, and metadata. Dependencies are created via AssetDef.input() (data dependency) or AssetDef.dep() (lineage-only graph edge):
@rs.Asset.from_multi(
    partitions_def=pd_multi,
    output_defs=[rs.AssetDef("out", io_handler=handler)],
    deps=[
        rs.AssetDef.input("data_source", partition_mapping=rs.PartitionMapping.static_({"a": "1"})),
        rs.AssetDef.dep("trigger"),  # ordering edge, no data loaded
    ],
)
def multi(data_source: int):
    yield rs.Output(value=data_source * 2, output_name="out")
See AssetDef.input(), AssetDef.dep(), and DepDef below.
Asset.from_graph()¶
Create a graph asset that composes tasks into a sub-DAG. Internal tasks are namespaced as {graph_name}/{task_name} and execute as independent plan steps. The return value determines the final node whose output becomes the graph asset's output.
Parameters: Same as Asset plus:
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_io_handler | BaseIOHandler \| str \| None | None | IO handler for internal tasks. Falls back to io_handler, then default. |
deps is inherited from Asset — partition mappings, IO handler overrides, and metadata overrides are propagated to internal tasks.
The executor for internal tasks is controlled via the rivers/node/executor metadata key (falling back to rivers/executor, then the default). See the Graph Assets guide for full details.
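The namespacing and fallback rules for graph assets can be sketched as plain functions. The helper names and the executor values ("parallel", "in_process") are placeholders chosen for illustration and are not taken from rivers. Only the {graph_name}/{task_name} pattern, the two metadata keys, and the fallback order come from the text above.

```python
from __future__ import annotations


def task_step_name(graph_name: str, task_name: str) -> str:
    """Namespace an internal task as {graph_name}/{task_name}."""
    return f"{graph_name}/{task_name}"


def resolve_executor(metadata: dict[str, str] | None, default: str) -> str:
    """Prefer rivers/node/executor, then rivers/executor, then the default."""
    metadata = metadata or {}
    for key in ("rivers/node/executor", "rivers/executor"):
        if key in metadata:
            return metadata[key]
    return default


def resolve_node_handler(node_io_handler, io_handler, default):
    """Prefer node_io_handler, then io_handler, then the default handler."""
    for candidate in (node_io_handler, io_handler):
        if candidate is not None:
            return candidate
    return default


print(task_step_name("daily_report", "clean"))                        # daily_report/clean
print(resolve_executor({"rivers/executor": "parallel"}, "in_process"))  # parallel
```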
Asset.external()¶
Create an external asset — a data source managed outside rivers. It participates in the dependency graph but is never materialized. Downstream assets load its data via io_handler.load_input().
# Direct call: no observation function
source = rs.Asset.external(
    name="source_table",
    io_handler=my_handler,
    kinds="table",
    metadata={"path": "s3://bucket/table"},
)
# As decorator: the function becomes the observation function
@rs.Asset.external(io_handler=my_handler)
def source_table(context: rs.AssetExecutionContext):
    context.add_output_metadata({
        "row_count": rs.MetadataValue.int(42_000),
    })
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str \| None | None | Asset name. Required unless used as decorator. |
| io_handler | BaseIOHandler | required | IO handler for loading data. |
| tags | list[str] \| None | None | Tags for categorization. |
| kinds | str \| list[str] \| None | None | Asset kind(s) (e.g. "table"). |
| group | str \| None | None | Group name for organization. |
| metadata | dict[str, str] \| None | None | Static metadata passed to IO handlers. |
| partitions_def | PartitionsDefinition \| None | None | Partition definition. |
| automation_condition | AutomationCondition \| None | None | Declarative automation condition. |
The optional observation function receives an AssetExecutionContext and can call context.add_output_metadata(...) to record metadata about the external data source (e.g., row count, last modified timestamp). Trigger observations via CodeRepository.observe().
SingleAsset¶
Concrete type returned when @Asset decorates a function. Inherits every property from Asset (no extra attributes).
MultiAsset¶
Returned by Asset.from_multi(). Inherits every property from Asset plus:
| Property | Type | Description |
|---|---|---|
| output_defs | list[AssetDef] | Per-output asset definitions declared on this multi-asset. |
GraphAsset¶
Returned by Asset.from_graph(). Inherits every property from Asset (no extra attributes).
ExternalAsset¶
Returned by Asset.external(). Inherits every property from Asset. The observe_fn property exposes the callable attached when Asset.external is used as a decorator.
AssetDef¶
Output definition for multi-assets.
Parameters:
| Parameter | Type | Default |
|---|---|---|
| name | str | required |
| tags | list[str] \| None | None |
| kinds | str \| list[str] \| None | None |
| group | str \| None | None |
| code_version | str \| None | None |
| io_handler | BaseIOHandler \| str \| None | None |
| metadata | dict[str, str] \| None | None |
| partitions_def | PartitionsDefinition \| None | None |
| partition_mapping | dict[str \| AssetDef, PartitionMapping] \| None | None |
| pool | str \| list[str] \| None | None |
| pool_slots | int \| dict[str, int] \| None | None |
AssetDef.input()¶
Create a data dependency for use in the deps parameter of @Asset(...), Asset.from_multi(), or Asset.from_graph(). The upstream asset's data is loaded and passed as a function parameter (matched by name).
AssetDef.input(
    name: str,
    partition_mapping: PartitionMapping | None = None,
    io_handler: BaseIOHandler | str | None = None,
    metadata: dict[str, str] | None = None,
) -> DepDef
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Upstream asset name. Must match a parameter on the decorated function. |
| partition_mapping | PartitionMapping \| None | None | Partition mapping for this dependency. |
| io_handler | BaseIOHandler \| str \| None | None | Override the upstream's IO handler when loading this input. |
| metadata | dict[str, str] \| None | None | Override the upstream's metadata in InputContext.asset_metadata. Does not mutate the upstream asset. |
deps=[
    # Load "source" with a custom partition mapping and IO handler override
    rs.AssetDef.input(
        "source",
        partition_mapping=rs.PartitionMapping.static_({"a": "1", "b": "2"}),
        io_handler=CustomLoader(),
    ),
    # Load "config" with metadata override (controls how the IO handler reads)
    rs.AssetDef.input("config", metadata={"columns": "a,b,c"}),
]
The io_handler override only affects how the downstream asset loads this input; it does not change the upstream asset's output handler. Similarly, metadata replaces (rather than merges with) the upstream's metadata, and only for this load context.
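The replace-not-merge behaviour of the metadata override is easy to get wrong, so here is a small illustration. The function load_metadata is hypothetical and only models the rule stated above. It is not how rivers builds InputContext.asset_metadata, and treating an empty dict as a valid override is an assumption.

```python
def load_metadata(upstream, override):
    """Metadata seen while loading one input: an override replaces, never merges."""
    if override is not None:
        return dict(override)       # upstream keys are dropped entirely
    return dict(upstream or {})     # copy, so the upstream dict is never mutated


upstream = {"path": "s3://bucket/table", "format": "parquet"}
seen = load_metadata(upstream, {"columns": "a,b,c"})

print(seen)      # {'columns': 'a,b,c'}
print(upstream)  # {'path': 's3://bucket/table', 'format': 'parquet'}
```

Because the override replaces the whole dictionary, any upstream key the IO handler still needs (such as path in this example) has to be repeated in the override.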
AssetDef.dep()¶
Create a lineage-only dependency for use in the deps parameter of @Asset(...), Asset.from_multi(), or Asset.from_graph(). Adds a graph edge (ordering guarantee) without loading any data. The name does not need to match a function parameter.
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Upstream asset name. |
| partition_mapping | PartitionMapping \| None | None | Partition mapping for this dependency. |
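The difference between the two dependency kinds comes down to parameter matching: input names must appear in the function signature, while lineage-only names need not. A minimal sketch of such a check follows, using inspect.signature from the standard library. The check_deps helper and the (name, is_input) tuple encoding are illustrative assumptions and do not reflect how rivers validates deps.

```python
import inspect


def check_deps(fn, deps):
    """deps is a list of (name, is_input) pairs; return the names to load as inputs."""
    params = set(inspect.signature(fn).parameters)
    inputs = [name for name, is_input in deps if is_input]
    missing = [name for name in inputs if name not in params]
    if missing:
        raise TypeError(f"input deps without a matching parameter: {missing}")
    return inputs


def multi(data_source):
    return data_source * 2


# "trigger" is lineage-only, so it does not need a matching parameter.
print(check_deps(multi, [("data_source", True), ("trigger", False)]))  # ['data_source']
```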
DepDef¶
Dependency definition returned by AssetDef.input() and AssetDef.dep(). Used in the deps parameter of @Asset(...), Asset.from_multi(), or Asset.from_graph().
Attributes:
| Attribute | Type | Description |
|---|---|---|
| name | str | Upstream asset name. |
| partition_mapping | PartitionMapping \| None | Partition mapping for this dependency. |
| metadata | dict[str, str] \| None | Metadata override (input deps only). |
| is_input | bool | True for AssetDef.input(), False for AssetDef.dep(). |
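The Asset.partition_mapping property is described as being derived from deps. One plausible derivation is sketched below with a stand-in dataclass that mirrors the attributes in the table above. DepSketch and per_dep_mappings are hypothetical names, string values stand in for PartitionMapping objects, and returning None when no dependency carries a mapping is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class DepSketch:
    """Stand-in mirroring the DepDef attributes; not the rivers class."""
    name: str
    partition_mapping: Any = None
    metadata: dict[str, str] | None = None
    is_input: bool = True


def per_dep_mappings(deps):
    """Collect {dep name: partition mapping} for deps that define one."""
    mappings = {
        dep.name: dep.partition_mapping
        for dep in deps
        if dep.partition_mapping is not None
    }
    return mappings or None  # assumption: None when nothing is mapped


deps = [
    DepSketch("data_source", partition_mapping="a->1"),
    DepSketch("trigger", is_input=False),
]
print(per_dep_mappings(deps))  # {'data_source': 'a->1'}
```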
SelfDependency[T]¶
Generic wrapper for self-referencing assets. When an asset parameter is named self with a SelfDependency[T] type hint, the executor loads the asset's own previous output via its io_handler before execution.
@rs.Asset(io_handler=my_handler)
def incremental(self: rs.SelfDependency[list]) -> list:
    prev = self.get_inner()  # list | None
    if prev is None:
        return [1, 2, 3]
    return prev + [4, 5, 6]
- No graph edge is created for self, so no cycle is introduced.
- On first run (no persisted data), get_inner() returns None.
- On subsequent runs, get_inner() returns the previously stored output of type T.
- An IO handler is required: the asset's own io_handler is preferred; otherwise the default InMemoryIOHandler is used. Raises ConfigurationError only if neither is available.
Methods:
| Method | Returns | Description |
|---|---|---|
| get_inner() | T \| None | The previously stored output, or None on first run. |
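To make the first-run and subsequent-run behaviour concrete, the following sketch simulates the load-then-execute cycle without rivers. PreviousValue is a hypothetical stand-in for SelfDependency[T], a plain dict plays the role of an in-memory IO handler, and run is an illustrative driver that does not represent the real executor.

```python
from __future__ import annotations

from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class PreviousValue(Generic[T]):
    """Hypothetical stand-in for SelfDependency[T]."""

    def __init__(self, inner: Optional[T]) -> None:
        self._inner = inner

    def get_inner(self) -> Optional[T]:
        return self._inner


store: dict[str, list] = {}  # plays the role of an in-memory IO handler


def incremental(self: PreviousValue[list]) -> list:
    prev = self.get_inner()
    if prev is None:
        return [1, 2, 3]
    return prev + [4, 5, 6]


def run(name, fn):
    """Load the previous output (if any), execute, then persist the new output."""
    result = fn(PreviousValue(store.get(name)))
    store[name] = result
    return result


print(run("incremental", incremental))  # [1, 2, 3]
print(run("incremental", incremental))  # [1, 2, 3, 4, 5, 6]
```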
Output¶
Per-asset result type for materializations. Return from an @Asset function to declaratively carry a value alongside metadata, data version, and tags.
@rs.Asset(io_handler=handler)
def my_asset() -> rs.Output:
    df = compute_data()
    return rs.Output(
        value=df,
        metadata={"row_count": rs.MetadataValue.int(len(df))},
        data_version="v2.0",
        tags=["validated"],
    )
The IO handler receives the unwrapped value, not the Output object. If combined with context.add_output_metadata(), both are merged — Output metadata takes precedence on key conflicts.
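The merge rule can be pictured as a dictionary update in which the returned object's keys are applied last. The helper below is an illustration only: merge_event_metadata is a hypothetical name, and plain Python values stand in for MetadataValue instances.

```python
def merge_event_metadata(context_metadata, result_metadata):
    """Combine both sources; keys from the returned Output win on conflict."""
    merged = dict(context_metadata or {})
    merged.update(result_metadata or {})  # applied last, so these keys take precedence
    return merged


from_context = {"row_count": 10, "source": "api"}
from_output = {"row_count": 12}

print(merge_event_metadata(from_context, from_output))
# {'row_count': 12, 'source': 'api'}
```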
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| value | Any | None | The output data (passed to IO handler and downstream assets). |
| metadata | dict[str, MetadataValue] \| None | None | Metadata to record with the materialization event. |
| data_version | str \| None | None | Explicit data version (overrides auto UUID). |
| tags | list[str] \| None | None | Tags to record with the event. |
Observation¶
Per-asset result type for external asset observations. Return from an observation function to declaratively carry metadata and data version.
@rs.Asset.external(io_handler=handler)
def source_table(context: rs.AssetExecutionContext):
    return rs.Observation(
        metadata={"row_count": rs.MetadataValue.int(150_000)},
        data_version="2024-03-16",
    )
When returned, an Observation event is emitted to storage. Can be combined with context.add_output_metadata() — Observation metadata takes precedence on conflict.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| metadata | dict[str, MetadataValue] \| None | None | Metadata to record with the observation event. |
| data_version | str \| None | None | The observed data version. |
Materialization¶
Per-asset result type for assets that manage their own persistence. Return from an @Asset function when the body has already written its output to its destination — e.g. an HTTP push, a message-queue emit, or a direct write to an external table — and there is nothing meaningful to round-trip through an IO handler.
import requests

# API_URL is assumed to be defined elsewhere in the module.
@rs.Asset
def push_to_api(rows: list[dict]) -> rs.Materialization:
    response = requests.post(API_URL, json=rows)
    return rs.Materialization(
        metadata={"status_code": rs.MetadataValue.int(response.status_code)},
        data_version=response.headers["ETag"],
    )
When returned, a Materialization event is emitted to storage with the supplied metadata, data version, and tags — but handle_output is never called. Use this in place of Output(value) when:
- The asset is a terminal side-effecting node (push, emit, external write).
- You want provenance and observability (data version, metadata) without persisting a value that rivers can later load_input.
This is the recommended pattern for opting an asset out of the IO handler framework. It works uniformly across Executor.in_process(), Executor.parallel(), and Executor.kubernetes() — the discriminator lives at the return type, so every executor takes the same code path.
Downstream consumers cannot load_input an output produced via Materialization — by design. Treat such assets as terminal in the graph.
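The idea that the return type acts as the discriminator can be sketched as a single post-execution step shared by every executor. All names below (OutputSketch, MaterializationSketch, RecordingHandler, finish_step) are hypothetical stand-ins. The real handle_output signature is not shown in this reference and most likely takes more than the bare value.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class OutputSketch:
    value: Any = None
    metadata: dict | None = None


@dataclass
class MaterializationSketch:
    metadata: dict | None = None


class RecordingHandler:
    """Toy IO handler that remembers what it was asked to persist."""

    def __init__(self) -> None:
        self.stored: list[Any] = []

    def handle_output(self, value: Any) -> None:
        self.stored.append(value)


def finish_step(result: Any, handler: RecordingHandler, events: list) -> None:
    """Emit a materialization event; persist a value only when there is one."""
    if isinstance(result, MaterializationSketch):
        # Self-persisted asset: record the event, skip the IO handler.
        events.append(("materialization", result.metadata))
        return
    if isinstance(result, OutputSketch):
        value, metadata = result.value, result.metadata
    else:
        value, metadata = result, None  # plain return value
    handler.handle_output(value)
    events.append(("materialization", metadata))


handler, events = RecordingHandler(), []
finish_step(MaterializationSketch({"status_code": 200}), handler, events)
finish_step(OutputSketch(value=[1, 2]), handler, events)
print(handler.stored)  # [[1, 2]]
print(len(events))     # 2
```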
Can be combined with context.add_output_metadata() and context.register_data_version() — Materialization metadata and data version take precedence on conflict.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| metadata | dict[str, MetadataValue] \| None | None | Metadata to record with the materialization event. |
| data_version | str \| None | None | Explicit data version (overrides auto UUID). |
| tags | list[str] \| None | None | Tags to record with the event. |
| output_name | str \| None | None | For multi-asset generator yields, identifies which output this belongs to. |
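The data_version precedence described in this section (the value on the returned object first, then a version registered on the context, then an automatic UUID) could be resolved as in the sketch below. The function name is hypothetical, and using uuid4 for the automatic fallback is an assumption, since the reference only says "auto UUID".

```python
import uuid


def resolve_data_version(result_version, registered_version):
    """Pick the data version to record with a materialization event."""
    if result_version is not None:
        return result_version        # set on the returned Materialization
    if registered_version is not None:
        return registered_version    # set via context.register_data_version()
    return str(uuid.uuid4())         # assumption: automatic fallback is a UUID4


print(resolve_data_version("etag-123", "v1"))  # etag-123
print(resolve_data_version(None, "v1"))        # v1
```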