Task¶
A unit of computation used inside graph assets. Can be used as a bare decorator or with parameters.
```python
import rivers as rs

@rs.Task
def my_task(data):
    return transform(data)

@rs.Task(name="custom_name", tags=["slow"])
def another_task(data):
    return heavy_transform(data)
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str \| None` | `None` | Task name. Defaults to the function name. |
| `tags` | `list[str] \| None` | `None` | Tags for categorization. |
| `partitions_def` | `PartitionsDefinition \| None` | `None` | Partition definition for partitioned tasks. |
| `partition_mapping` | `dict[str, PartitionMapping] \| None` | `None` | Mapping from dependency name to partition mapping strategy. |
| `io_handler` | `BaseIOHandler \| str \| None` | `None` | IO handler for persisting the task's output. |
Properties:

| Property | Type | Description |
|---|---|---|
| `name` | `str \| None` | Task name (defaults to the wrapped function name). |
| `tags` | `list[str] \| None` | Tags propagated to runs that include this task. |
| `is_async` | `bool` | `True` when the wrapped function is a coroutine function. |
When called inside an `Asset.from_graph()` body, a `Task` records itself into the composition graph and returns an `InvokedNodeOutput` instead of executing.
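The record-instead-of-execute mechanism can be sketched in plain Python. This is a toy model, not rivers' implementation; the `Task`, `InvokedNodeOutput`, and `compose` names below only mimic the documented behavior:

```python
# Toy model of "record instead of execute": while a composition context is
# active, calling a task appends a node to the graph and returns a
# placeholder edge object instead of running the wrapped function.
_current_graph = None  # set to a list while a graph body is being composed

class InvokedNodeOutput:
    def __init__(self, node_name, output_name="result"):
        self.node_name = node_name
        self.output_name = output_name

class Task:
    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def __call__(self, *args):
        if _current_graph is not None:
            # Inside a graph body: record a node, return a placeholder edge.
            _current_graph.append(self.name)
            return InvokedNodeOutput(self.name)
        return self.fn(*args)  # outside a graph body: execute normally

def compose(body):
    """Run `body` with recording enabled; return the recorded node names."""
    global _current_graph
    _current_graph = []
    try:
        body()
        return _current_graph
    finally:
        _current_graph = None

@Task
def my_task(data):
    return data + 1
```

A direct call executes the function, while a call inside `compose` only records it.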
Partitioning:

Tasks support the same partitioning features as assets:

```python
parts = rs.PartitionsDefinition.static_(["a", "b", "c"])

# Partitioned task — receives PartitionContext during execution
@rs.Task(partitions_def=parts)
def process(context: rs.TaskExecutionContext, source: str) -> str:
    return f"processed-{context.partition_key}"

# Unpartitioned task depending on a partitioned asset via AllPartitions
@rs.Task(partition_mapping={"source": rs.PartitionMapping.all_partitions()})
def aggregate(source: int) -> int:
    return source + 1
```
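What "once per partition key" means can be sketched in stdlib Python. The real scheduling lives in rivers' executor; `PartitionContext` and `run_partitioned` below are illustrative stand-ins:

```python
# Toy sketch: invoke a partitioned function once per static partition key,
# passing a context that carries the current key.
from dataclasses import dataclass

@dataclass
class PartitionContext:
    partition_key: str

def run_partitioned(fn, partition_keys):
    """Invoke fn once per key; collect results keyed by partition."""
    return {key: fn(PartitionContext(key)) for key in partition_keys}

def process(context):
    return f"processed-{context.partition_key}"
```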
BashTask¶
A task that runs a shell command natively in Rust via `std::process::Command`, with no Python subprocess overhead.
```python
import rivers as rs

# Shell command (interpreted by sh -c)
greet = rs.BashTask(name="greet", command="echo hello")

# Command as list (no shell interpretation)
build = rs.BashTask(name="build", command=["make", "build"])

# With environment and working directory
deploy = rs.BashTask(
    name="deploy",
    command="./deploy.sh",
    env={"STAGE": "prod"},
    cwd="/app",
)

result = greet()  # "hello"
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Task name. |
| `command` | `str \| list[str]` | required | Shell command string or argv list. |
| `env` | `dict[str, str] \| None` | `None` | Additional environment variables. |
| `cwd` | `str \| None` | `None` | Working directory. |
| `tags` | `list[str] \| None` | `None` | Tags for categorization. |
| `partition_mapping` | `dict[str, PartitionMapping] \| None` | `None` | Mapping from dependency name to partition mapping strategy. |
| `io_handler` | `BaseIOHandler \| str \| None` | `None` | IO handler for persisting the task's output. |
Properties:

| Property | Type |
|---|---|
| `name` | `str` |
| `command` | `str \| list[str]` |
| `env` | `dict[str, str] \| None` |
| `cwd` | `str \| None` |
| `tags` | `list[str] \| None` |
Behavior:

- `command: str` runs through `sh -c` (shell interpretation, pipes, etc.)
- `command: list[str]` runs directly via exec (no shell)
- Returns stdout as a string (trailing newline stripped)
- Raises `OSError` on a non-zero exit code, with stderr in the message
- Works inside `Asset.from_graph()` composition context
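This behavior can be approximated in plain Python with `subprocess` (the real BashTask runs in Rust; `run_bash` below is an illustrative stand-in, and treating `env` as additive is an assumption based on the parameter description above):

```python
# Sketch of BashTask's documented behavior using Python's subprocess.
import os
import subprocess

def run_bash(command, env=None, cwd=None):
    """Toy model: str commands go through sh -c, lists exec directly."""
    if isinstance(command, str):
        argv = ["sh", "-c", command]   # shell interpretation, pipes, etc.
    else:
        argv = list(command)           # direct exec, no shell
    full_env = {**os.environ, **(env or {})}  # env vars are additive
    proc = subprocess.run(argv, capture_output=True, text=True,
                          env=full_env, cwd=cwd)
    if proc.returncode != 0:
        # Non-zero exit: surface stderr in the error message
        raise OSError(f"command failed ({proc.returncode}): {proc.stderr}")
    return proc.stdout.removesuffix("\n")  # trailing newline stripped
```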
TaskExecutionContext¶
A lightweight execution context for tasks. Similar to `AssetExecutionContext` but without asset-specific fields (kinds, group, code_version, metadata, output metadata).
```python
import rivers as rs

@rs.Task(tags=["etl"])
def my_task(context: rs.TaskExecutionContext, source: int) -> str:
    context.log.info(f"Running task {context.task_name}")
    return f"processed: {source}"

# With partitions
parts = rs.PartitionsDefinition.static_(["a", "b"])

@rs.Task(partitions_def=parts)
def partitioned_task(context: rs.TaskExecutionContext) -> str:
    return f"key={context.partition_key}"
```
Parameters (constructor):

| Parameter | Type | Default | Description |
|---|---|---|---|
| `task_name` | `str` | required | Name of the task. |
| `tags` | `list[str] \| None` | `None` | Tags for categorization. |
| `partition` | `PartitionContext \| None` | `None` | Partition context if the task is partitioned. |
Properties:

| Property | Type | Description |
|---|---|---|
| `task_name` | `str` | Name of the task. |
| `tags` | `list[str] \| None` | Tags assigned to the task. |
| `partition` | `PartitionContext \| None` | Partition context. |
| `has_partition_key` | `bool` | Whether a partition key is available. |
| `partition_key` | `str` | The current partition key (raises if not partitioned). |
| `partition_time_window` | `tuple[datetime, datetime] \| None` | Time window for time-based partitions. |
| `config` | `ConfigT` | Config instance (if the task function uses a config type hint). |
| `log` | `logging.Logger` | Logger named `rivers.tasks.<task_name>`. |
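The partition-related property contract can be sketched with a minimal stand-in class. This is illustrative only; in particular, the exact exception type raised by `partition_key` is an assumption:

```python
# Minimal stand-in for the documented TaskExecutionContext property contract.
import logging
from dataclasses import dataclass

@dataclass
class PartitionContext:
    partition_key: str

class TaskExecutionContext:
    def __init__(self, task_name, tags=None, partition=None):
        self.task_name = task_name
        self.tags = tags
        self.partition = partition
        # Logger named after the task, per the table above
        self.log = logging.getLogger(f"rivers.tasks.{task_name}")

    @property
    def has_partition_key(self):
        return self.partition is not None

    @property
    def partition_key(self):
        if self.partition is None:
            # Exception type here is illustrative; the docs only say "raises"
            raise ValueError(f"task {self.task_name!r} is not partitioned")
        return self.partition.partition_key
```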
InvokedNodeOutput¶
Returned when calling a `Task` or `Asset` inside a graph asset body. Represents a dependency edge in the composition graph.
Properties:

| Property | Type | Description |
|---|---|---|
| `node_name` | `str` | Name of the invoked task or asset. |
| `output_name` | `str` | Name of the output (typically `"result"`). |
map()¶
Fan a task out across each value of this output. The producer must yield an iterable (or a list of `DynamicOutput`); the task is invoked once per element.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `task` | `Task` | required | Task to invoke once per element. |
| `max_concurrency` | `int \| None` | unbounded | Cap on simultaneously running fanned-out instances. |
```python
@rs.Task
def chunk_ids() -> list[int]:
    return [1, 2, 3, 4]

@rs.Task
def process_id(chunk_id: int) -> int:
    return chunk_id * 10

@rs.Asset.from_graph()
def fanout():
    ids = chunk_ids()
    mapped = ids.map(process_id, max_concurrency=2)
    return mapped.collect()
```
MappedOutput¶
Handle returned by `InvokedNodeOutput.map()`. Use `.collect()` to wait for every mapped instance, or `.collect_stream()` to consume them as they finish.
| Property | Type | Description |
|---|---|---|
| `node_name` | `str` | Producer node whose output is being mapped. |
| `output_name` | `str` | Output identifier on the producer node. |
collect()¶
Wait for all mapped instances to finish; return a single aggregated output for downstream wiring.
collect_stream()¶
Stream mapped instance results as they complete. With `ordered=True`, emit results in their original mapping-key order; otherwise emit in completion order.
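The two emission orders can be sketched with asyncio. The `collect_stream` function below is a standalone toy, not the library method:

```python
# Sketch: stream awaitable results either in submission (mapping-key) order
# or in completion order.
import asyncio

async def collect_stream(tasks, ordered=False):
    """tasks: list of awaitables, one per mapped instance."""
    results = []
    if ordered:
        # Start everything, then await in the original order
        futures = [asyncio.ensure_future(t) for t in tasks]
        for fut in futures:
            results.append(await fut)
    else:
        # Yield whichever instance finishes first
        for fut in asyncio.as_completed(tasks):
            results.append(await fut)
    return results
```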
DynamicOutput¶
Wrap a value with an explicit mapping key for dynamic fan-out. When a producer asset returns a list of `DynamicOutput`, the executor uses `.key` as the mapping key (instance name) instead of a numeric index.
```python
@rs.Task
def fan_out_with_keys() -> list[rs.DynamicOutput]:
    return [
        rs.DynamicOutput(key="alpha", value=1),
        rs.DynamicOutput(key="beta", value=2),
    ]
```
| Property | Type | Description |
|---|---|---|
| `key` | `str` | Mapping key (used as the fan-out instance name). |
| `value` | `Any` | The wrapped output value. |
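The key-selection rule can be sketched as follows. The `mapping_keys` helper is a toy; the real executor's internals may differ:

```python
# Sketch of the mapping-key rule: use .key from DynamicOutput when present,
# otherwise fall back to the element's numeric index.
from dataclasses import dataclass
from typing import Any

@dataclass
class DynamicOutput:
    key: str
    value: Any

def mapping_keys(outputs):
    """Return the (key, value) pairs a fan-out would iterate over."""
    pairs = []
    for i, out in enumerate(outputs):
        if isinstance(out, DynamicOutput):
            pairs.append((out.key, out.value))   # explicit mapping key
        else:
            pairs.append((str(i), out))          # numeric index fallback
    return pairs
```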