Backfills¶
Backfills let you reprocess a range of partitions for one or more assets. This is useful when you need to recompute historical data after a bug fix, schema change, or when onboarding a new asset that depends on existing partitioned data.
When to use backfills¶
- Reprocess partitions after fixing a bug in asset logic
- Fill in data for a newly added partitioned asset
- Recompute downstream assets after upstream schema changes
- Re-run failed partitions from a previous execution
Backfill strategies¶
The BackfillStrategy controls how partition keys are grouped into runs.
MultiRun (default)¶
Creates one run per partition key. This gives maximum granularity -- if one partition fails, the others are unaffected.
```python
import rivers as rs

repo.backfill(
    selection=["daily_events"],
    partition_range=rs.PartitionKeyRange.single("2024-01-01", "2024-01-31"),
    strategy=rs.BackfillStrategy.multi_run(),
)
```
With 31 daily partitions, this creates 31 separate runs.
SingleRun¶
Batches all partition keys into a single run. Useful when you want to minimize run overhead or when the asset logic handles multiple partitions efficiently.
```python
repo.backfill(
    selection=["daily_events"],
    partition_range=rs.PartitionKeyRange.single("2024-01-01", "2024-01-31"),
    strategy=rs.BackfillStrategy.single_run(),
)
```
With 31 daily partitions, this creates 1 run that processes all 31 partitions.
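The difference between the two strategies is purely in how partition keys are grouped into runs. Here is a minimal pure-Python sketch of that grouping (the helper names `daily_keys` and `group_runs` are illustrative, not part of the rivers API):

```python
from datetime import date, timedelta

def daily_keys(start: str, end: str) -> list[str]:
    """All daily partition keys from start to end, inclusive."""
    d0, d1 = date.fromisoformat(start), date.fromisoformat(end)
    return [(d0 + timedelta(days=i)).isoformat() for i in range((d1 - d0).days + 1)]

def group_runs(keys: list[str], strategy: str) -> list[list[str]]:
    """MultiRun: one run per key. SingleRun: one run containing every key."""
    if strategy == "multi_run":
        return [[k] for k in keys]
    return [keys]

keys = daily_keys("2024-01-01", "2024-01-31")
print(len(group_runs(keys, "multi_run")))   # 31 runs of 1 partition each
print(len(group_runs(keys, "single_run")))  # 1 run of 31 partitions
```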
Reporting per-partition failures with mark_partition_failed¶
A SingleRun step is invoked once with all of the backfill's keys in `context.partition.keys`. By default the run treats every key as succeeded if the function returns and as failed if it raises -- there is no granularity below the step boundary.
context.mark_partition_failed(partition_key, error) lets the function record failures for individual partitions while keeping the rest as successes. Use it when you can isolate per-key errors inside a vectorized run:
```python
@rs.Asset(
    partitions_def=rs.PartitionsDefinition.daily(start=datetime(2024, 1, 1)),
    backfill_strategy=rs.BackfillStrategy.single_run(),
)
def daily_events(context: rs.AssetExecutionContext):
    for key in context.partition.keys:
        try:
            process_day(key)
        except Exception as exc:
            context.mark_partition_failed(key, str(exc))
            context.log.warning("partition %s failed: %s", key, exc)
    # Returning normally -- partitions not marked failed are recorded as succeeded.
```
Semantics:
- Only valid in batched runs (`SingleRun` and the single-run dimensions of `PerDimension`). Calling it in a non-batched run is a no-op, since each key already has its own run.
- The key must be in `context.partition.keys`; passing an unrelated key raises `ExecutionError("partition key … is not in this context's partition keys")`.
- Failures recorded this way roll up into the same `BackfillStatus.failed_partitions` / `BackfillResult.failed` counters as full-run failures -- the UI and `repo.get_backfill()` see them identically.
- The function may still raise to signal a whole-run failure; `mark_partition_failed` is for the partial-failure case where you want some keys preserved as successes.
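These semantics can be modeled in a few lines of plain Python. The sketch below illustrates the catch-and-record pattern and how per-key failures roll up into counters; `run_batched` and `flaky` are hypothetical helpers, not rivers internals:

```python
def run_batched(keys, process):
    """Mimic a SingleRun step: process every key, recording per-key failures
    instead of failing the whole run (the mark_partition_failed pattern)."""
    failed = {}
    for key in keys:
        try:
            process(key)
        except Exception as exc:
            failed[key] = str(exc)  # stands in for context.mark_partition_failed
    succeeded = [k for k in keys if k not in failed]
    return succeeded, failed

def flaky(key):
    """One bad day; the rest succeed."""
    if key == "2024-01-02":
        raise ValueError("bad input file")

succeeded, failed = run_batched(["2024-01-01", "2024-01-02", "2024-01-03"], flaky)
print(len(succeeded), failed)  # 2 {'2024-01-02': 'bad input file'}
```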
PerDimension¶
For multi-dimensional partitions, `PerDimension` gives per-dimension control. Dimensions listed in `multi_run` are iterated across runs; dimensions listed in `single_run` are batched within each run.
```python
repo.backfill(
    selection=["regional_events"],
    partition_range=rs.PartitionKeyRange.multi({
        "date": ("2024-01-01", "2024-01-07"),
        "region": ["us", "eu", "asia"],
    }),
    strategy=rs.BackfillStrategy.per_dimension(
        multi_run=["date"],
        single_run=["region"],
    ),
)
```
This creates 7 runs (one per date), each processing all 3 regions within a single run.
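The resulting run layout can be sketched with `itertools.product` (illustrative only; the function name `per_dimension_runs` is not part of the rivers API):

```python
from itertools import product

def per_dimension_runs(dims, multi_run, single_run):
    """Iterate multi_run dimensions across runs; batch single_run dimensions
    inside each run. Returns one list of composite keys per run."""
    iterated = [dims[d] for d in multi_run]
    batched = [dims[d] for d in single_run]
    runs = []
    for outer in product(*iterated):
        runs.append([outer + inner for inner in product(*batched)])
    return runs

dims = {
    "date": [f"2024-01-{day:02d}" for day in range(1, 8)],  # 7 dates
    "region": ["us", "eu", "asia"],
}
runs = per_dimension_runs(dims, multi_run=["date"], single_run=["region"])
print(len(runs), len(runs[0]))  # 7 3
```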
Partition key ranges¶
Specify which partitions to backfill using PartitionKeyRange.
Single-dimension range¶
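For a single-dimension partitions definition, pass the inclusive start and end keys. This mirrors the `PartitionKeyRange.single` calls used in the examples elsewhere on this page:

```python
import rivers as rs

# Inclusive range of daily partition keys: 2024-01-01 through 2024-01-31.
partition_range = rs.PartitionKeyRange.single("2024-01-01", "2024-01-31")
```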
Multi-dimension range¶
For multi-dimensional partitions, pass a mapping of dimension names to values. A dimension's value can be a `(from, to)` tuple for an inclusive range or a list of explicit keys.
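This mirrors the `PartitionKeyRange.multi` call used in the `PerDimension` example above, combining both value forms:

```python
import rivers as rs

# Tuple = inclusive (from, to) range; list = explicit keys for that dimension.
partition_range = rs.PartitionKeyRange.multi({
    "date": ("2024-01-01", "2024-01-07"),
    "region": ["us", "eu", "asia"],
})
```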
Running a backfill¶
Use repo.backfill() to launch a backfill:
```python
import rivers as rs
from datetime import datetime

@rs.Asset(
    partitions_def=rs.PartitionsDefinition.daily(start=datetime(2024, 1, 1)),
)
def daily_events():
    ...

repo = rs.CodeRepository(assets=[daily_events])
repo.resolve()

result = repo.backfill(
    selection=["daily_events"],
    partition_range=rs.PartitionKeyRange.single("2024-01-01", "2024-01-15"),
    strategy=rs.BackfillStrategy.multi_run(),
    failure_policy="continue",
    max_concurrency=4,
    tags=[("team", "data-eng")],
    block=True,
)

print(f"Backfill {result.backfill_id}: {result.status}")
print(f"Completed: {result.completed}/{result.num_partitions}")
```
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `selection` | `list[str]` | All assets | Asset keys to backfill |
| `partition_keys` | `list[PartitionKey]` | `None` | Explicit partition keys |
| `partition_range` | `PartitionKeyRange` | `None` | Range of partition keys |
| `strategy` | `BackfillStrategy` | `None` | How to group partitions into runs |
| `failure_policy` | `str` | `"continue"` | `"continue"` or `"stop_on_failure"` |
| `max_concurrency` | `int` | `4` | Max concurrent runs |
| `tags` | `list[tuple[str, str]]` | `None` | Tags to attach to the backfill and its runs |
| `config` | `dict` | `None` | Per-asset config overrides |
| `block` | `bool` | `True` | Wait for completion |
| `dry_run` | `bool` | `False` | Preview without executing |
Dry-run preview¶
Set dry_run=True to see what a backfill would do without actually executing:
```python
result = repo.backfill(
    selection=["daily_events"],
    partition_range=rs.PartitionKeyRange.single("2024-01-01", "2024-01-31"),
    dry_run=True,
)

print(f"Would create {result.num_runs} runs for {result.num_partitions} partitions")
print(f"Partition keys: {result.partition_keys}")
```
Failure policies¶
Control what happens when a partition fails:
- `continue` (default) -- other partitions keep running. Failed partitions are recorded and can be retried.
- `stop_on_failure` -- stop the backfill immediately when any partition fails. Remaining partitions are marked as canceled.
```python
repo.backfill(
    selection=["daily_events"],
    partition_range=rs.PartitionKeyRange.single("2024-01-01", "2024-01-31"),
    failure_policy="stop_on_failure",
)
```
Config overrides¶
Pass per-asset configuration overrides to the backfill:
```python
repo.backfill(
    selection=["daily_events"],
    partition_range=rs.PartitionKeyRange.single("2024-01-01", "2024-01-31"),
    config={"daily_events": {"batch_size": 5000}},
)
```
Asset-level backfill strategy¶
You can set a default backfill strategy on an asset using the backfill_strategy parameter on @Asset. This strategy is used when no explicit strategy is passed to repo.backfill().
```python
@rs.Asset(
    partitions_def=rs.PartitionsDefinition.daily(start=datetime(2024, 1, 1)),
    backfill_strategy=rs.BackfillStrategy.single_run(),
)
def daily_events():
    ...
```
Strategy precedence¶
1. Explicit `strategy` passed to `repo.backfill()` (highest priority)
2. `backfill_strategy` on the `@Asset` decorator
3. Default (`MultiRun`)
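The precedence rule amounts to a first-non-None lookup. As a plain-Python sketch (the function name `resolve_strategy` is illustrative, not rivers API):

```python
def resolve_strategy(explicit=None, asset_default=None):
    """Return the first strategy present: call site, then asset, then the default."""
    if explicit is not None:
        return explicit
    if asset_default is not None:
        return asset_default
    return "multi_run"  # stands in for rs.BackfillStrategy.multi_run()

print(resolve_strategy("single_run", "per_dimension"))  # single_run
print(resolve_strategy(None, "single_run"))             # single_run
print(resolve_strategy())                               # multi_run
```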
Monitoring backfills¶
Programmatic¶
```python
# Check status of a running backfill
status = repo.get_backfill(result.backfill_id)
print(f"Status: {status.status}")
print(f"Progress: {status.completed_partitions}/{status.total_partitions}")
print(f"Failed: {status.failed_partitions}, canceled: {status.canceled_partitions}")

# Cancel a running backfill
repo.cancel_backfill(result.backfill_id)

# Re-launch failed/canceled partitions of a previous backfill
result = repo.rerun_backfill(result.backfill_id, block=True)
```
Web UI¶
The rivers web UI provides a dedicated Backfills page at /backfills that shows all backfills with their status, progress, and associated runs.