
Job


A named bundle of assets, multi-assets, graph assets, tasks, and bash tasks that execute together as one run. A Job must be added to a CodeRepository before it can be validated or executed.

import rivers as rs

repo = rs.CodeRepository(
    assets=[asset_a, asset_b],
    jobs=[
        rs.Job(name="pipeline", assets=[asset_a, asset_b], executor=rs.Executor.in_process()),
    ],
)
repo.get_job("pipeline").execute()

Constructor:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | required | Unique name for the job within the repository. |
| `assets` | `Sequence[SingleAsset \| MultiAsset \| GraphAsset \| Task \| BashTask]` | required | Nodes the job will materialize. |
| `executor` | `Executor \| None` | `None` | Override the repository default executor for this job. |
| `allow_incomplete_deps` | `bool` | `False` | Tolerate missing upstream dependencies (debugging / partial graphs). Production jobs should leave this `False`. |

Methods:

execute()

def execute(
    self,
    partition_key: PartitionKey | None = None,
    tags: list[tuple[str, str]] | None = None,
    config: dict[str, dict[str, Any]] | None = None,
    raise_on_error: bool = True,
) -> RunResult

Run the job synchronously, optionally targeting a single partition. Returns a RunResult; read materialized values back via `repo.load_node(name)`.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `partition_key` | `PartitionKey \| None` | `None` | Partition to materialize. Required for partitioned assets. |
| `tags` | `list[tuple[str, str]] \| None` | `None` | Run tags applied for queue / observability filtering. `rivers/priority` is honored for run-queue priority. |
| `config` | `dict[str, dict[str, Any]] \| None` | `None` | Per-asset config, keyed by asset name. |
| `raise_on_error` | `bool` | `True` | Raise on the first failure instead of returning a failed result. |
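As a sketch of the `tags` and `config` argument shapes described above (the asset name `build_daily` and config key `max_rows` are hypothetical, used only for illustration):

```python
# Shapes of the tags and config arguments accepted by Job.execute().
tags = [
    ("rivers/priority", "10"),   # honored for run-queue priority
    ("team", "data-eng"),        # arbitrary tag for observability filtering
]
config = {
    "build_daily": {"max_rows": 1_000},  # per-asset config, keyed by asset name
}

# With a repository as in the example at the top of this page, the call
# would look like:
# result = repo.get_job("pipeline").execute(tags=tags, config=config)
```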

Raises: ValueError if the job has not been added to a CodeRepository.


Per-asset executor override

Override the executor for individual assets via the rivers/executor metadata key:

@rs.Asset(metadata={"rivers/executor": "in_process"})
def needs_context(context: rs.AssetExecutionContext) -> str:
    return context.asset_name

| Value | Executor |
| --- | --- |
| `"in_process"` | `Executor.in_process()` |
| `"parallel"` | `Executor.parallel()` (default subprocess pool size) |

When overrides are present, the executor groups independent steps by level and partitions each level by executor. Steps sharing the same executor within a level still run in parallel (for parallel); steps with different executors in the same level run as separate batches.
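The level-batching rule above can be sketched in plain Python. `batches_for_level` is a hypothetical helper (not part of the rivers API), with executor names standing in for Executor objects:

```python
from itertools import groupby

def batches_for_level(steps: list[tuple[str, str]]) -> list[tuple[str, list[str]]]:
    """Partition one level's independent steps into per-executor batches.

    steps: (step_name, executor_name) pairs.
    Steps sharing an executor form one batch (run in parallel for "parallel");
    steps with different executors become separate batches.
    """
    ordered = sorted(steps, key=lambda s: s[1])  # groupby needs sorted input
    return [
        (executor, [name for name, _ in group])
        for executor, group in groupby(ordered, key=lambda s: s[1])
    ]

level = [("a", "parallel"), ("b", "in_process"), ("c", "parallel")]
print(batches_for_level(level))
# [('in_process', ['b']), ('parallel', ['a', 'c'])]
```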

For tasks internal to a graph asset, use the rivers/node/executor metadata key (it falls back to rivers/executor, then to the default executor).
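The fallback order described above can be sketched as a small resolution helper (hypothetical, not part of the rivers API):

```python
def resolve_executor(metadata: dict[str, str], default: str = "in_process") -> str:
    """Resolve the executor name for a graph asset's internal task.

    Lookup order: rivers/node/executor -> rivers/executor -> default.
    """
    return metadata.get(
        "rivers/node/executor",
        metadata.get("rivers/executor", default),
    )

resolve_executor({"rivers/node/executor": "in_process",
                  "rivers/executor": "parallel"})   # -> "in_process"
resolve_executor({"rivers/executor": "parallel"})   # -> "parallel"
resolve_executor({})                                # -> "in_process"
```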