Concurrency Control Flow¶
rivers has four distinct concurrency control layers, each operating at a different scope.
Overview¶
| Layer | Scope | Configured via | Enforced by |
|---|---|---|---|
| `max_concurrent_runs` | How many runs execute at once (global) | `RunQueueConfig.max_concurrent_runs` | Coordinator — won't dequeue if at capacity |
| Tag concurrency limits | How many runs with a given tag run at once | `RunQueueConfig.tag_concurrency_limits` | Coordinator — skips queued runs whose tags are at limit |
| Executor step parallelism | How many steps execute in parallel within one run | `Executor.parallel(max_workers, max_async_concurrent)`, `Executor.kubernetes(max_concurrent_steps)` | Executor — sized worker pool / step-pod scheduler |
| Pools | How many steps use a resource at once (within/across runs) | `@Asset(pool="X")` + `storage.set_pool_limit("X", N)` | Executor — PoolGuard blocks before step execution |
The first two are run-level gates, applied before execution starts. The last two are step-level gates, applied during execution. A run can pass the coordinator's checks while its individual steps still wait for the executor's step budget and pool slots. Note the difference in scope: executor step parallelism is per-run, whereas pools are global across runs.
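Pulling the table together, a configuration touching all four layers might look like the following sketch. The import path `rivers` and the exact constructor signatures are assumptions; only names documented above are used:

```python
# Sketch only: where each of the four layers is configured.
# Import path and signatures are assumed, not verified.
from rivers import Asset, Executor, RunQueueConfig, TagConcurrencyLimit

queue_config = RunQueueConfig(
    max_concurrent_runs=10,                                     # layer 1: global run gate
    tag_concurrency_limits=[
        TagConcurrencyLimit(key="env", value="prod", limit=2),  # layer 2: per-tag run gate
    ],
)

executor = Executor.parallel(max_workers=8)                     # layer 3: per-run step cap

@Asset(pool="database")                                         # layer 4: resource gate...
def heavy_query(): ...

# ...with the pool's limit set on storage:
# storage.set_pool_limit("database", 5)
```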
Flow Diagram¶
USER / SCHEDULER / SENSOR
│
┌──────────────┴──────────────┐
│ │
repo.materialize() daemon.submit_run()
(direct execution) (queued execution)
│ │
│ ┌────────▼────────┐
│ │ RunRecord │
│ │ status: Queued │
│ │ tags: [...] │
│ │ priority: N │
│ └────────┬────────┘
│ │
│ ┌──────────────▼──────────────┐
│ │ RunQueueCoordinator.tick() │
│ │ (daemon polling loop) │
│ ├─────────────────────────────┤
│ │ │
│ │ 1. GC expired pool leases │
│ │ │
│ │ 2. Check max_concurrent_runs│
│ │ in_progress >= max? │
│ │ YES → skip cycle │
│ │ │
│ │ 3. Tag concurrency limits │
│ │ for each queued run: │
│ │ - count tags of │
│ │ in-progress runs │
│ │ - if run's tags exceed │
│ │ TagConcurrencyLimit │
│ │ → skip (stays Queued) │
│ │ - else → dequeue it │
│ │ │
│ │ 4. Queued → NotStarted │
│ └──────────┬──────────────────┘
│ │
│ daemon picks up NotStarted runs
│ and calls materialize(run_id=...)
│ │
├─────────────────────────┘
│
▼
┌─────────────────────┐
│ executor.execute_plan() LEVEL 1 LEVEL 2
│ │ ┌──────────┐ ┌──────────┐
│ steps grouped │ │ step_a │ │ step_c │
│ by DAG level │────────▶│ step_b │───▶│ step_d │──▶ ...
│ │ │ (parallel│ │ │
│ │ │ within) │ │ │
└─────────────────────┘ └────┬─────┘ └──────────┘
│
┌───────────────────┤ for each step:
│ │
▼ ▼
no pool config? has @Asset(pool="X")?
────────────── ─────────────────────
execute directly ┌──────────────────────────┐
│ PoolGuard.acquire() │
│ │
│ claim_async_poll(): │
│ storage.claim_slots() │
│ Claimed? → continue │
│ Pending? → sleep+retry│
│ (configurable via │
│ RIVERS_ env vars) │
│ │
│ spawn lease renewal │
│ (every lease_dur/3) │
└───────────┬──────────────┘
│
▼
execute the step
(InProcess / loky /
async JoinSet)
│
▼
┌──────────────────────────┐
│ PoolGuard.release() │
│ - abort renewal task │
│ - free_concurrency_slots│
└──────────────────────────┘
│
▼
┌────────────────────┐
│ end of execute_plan│
│ defense-in-depth: │
│ free_slots_for_run │
└────────────────────┘
Layer Details¶
1. max_concurrent_runs (Global Run Gate)¶
The simplest limit: at most N runs execute simultaneously. The coordinator checks in_progress_count >= max before dequeuing anything. If at capacity, the entire tick is skipped.
- Default: 10
- Config: `RunQueueConfig(max_concurrent_runs=10)`
- Set to `-1` for unlimited.
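The gate itself is a one-line comparison. A minimal sketch of the check described above (the helper name `should_skip_tick` is hypothetical):

```python
def should_skip_tick(in_progress_count: int, max_concurrent_runs: int) -> bool:
    """Layer 1 gate: skip the whole coordinator tick when at capacity.
    A limit of -1 means unlimited."""
    if max_concurrent_runs == -1:
        return False
    return in_progress_count >= max_concurrent_runs

print(should_skip_tick(9, 10))    # False: one slot left, dequeue proceeds
print(should_skip_tick(10, 10))   # True: at capacity, entire tick is skipped
print(should_skip_tick(100, -1))  # False: unlimited
```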
2. Tag Concurrency Limits (Per-Tag Run Gate)¶
Runs carry tags (key-value pairs). Tag concurrency limits restrict how many runs with a specific tag can be in-progress at once. The coordinator scans queued runs in priority order, skipping any whose tags would exceed a limit.
- Config: `TagConcurrencyLimit(key="env", value="prod", limit=2)` — at most 2 runs tagged `env:prod`
- Per-unique-value mode: `TagConcurrencyLimit(key="tenant", per_unique_value=True, limit=1)` — at most 1 run per distinct tenant value
- Blocked runs stay `Queued` and are re-evaluated on the next tick.
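The tag-matching rules can be sketched as a pure function over the in-progress tag counts. `is_blocked` and the tuple encoding of limits are hypothetical stand-ins for the coordinator's internal check:

```python
from collections import Counter

def is_blocked(run_tags, in_progress_runs, limits):
    """Sketch of the per-tag run gate.

    run_tags:         tag key -> value for the queued run
    in_progress_runs: list of tag dicts, one per in-progress run
    limits:           list of (key, value, per_unique_value, limit) tuples
    """
    # Count how many in-progress runs carry each (key, value) pair.
    counts = Counter((k, v) for tags in in_progress_runs for k, v in tags.items())
    for key, value, per_unique_value, limit in limits:
        if key not in run_tags:
            continue
        if per_unique_value:
            target = run_tags[key]      # limit applies per distinct value of `key`
        elif run_tags[key] == value:
            target = value              # limit applies to this exact key:value pair
        else:
            continue
        if counts[(key, target)] >= limit:
            return True                 # run stays Queued this tick
    return False

in_progress = [{"env": "prod"}, {"env": "prod"}, {"env": "dev"}]
limits = [("env", "prod", False, 2)]
print(is_blocked({"env": "prod"}, in_progress, limits))  # True: 2 prod runs already in progress
print(is_blocked({"env": "dev"}, in_progress, limits))   # False: limit only matches env:prod
```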
3. Executor Step Parallelism (Per-Run Step Cap)¶
Within a single run, the executor controls how many steps from the same DAG level run in parallel. This is independent from the coordinator (which gates run admission) and from pools (which gate steps that share a resource across runs).
`Executor.parallel(max_workers, max_async_concurrent)` — the loky-backed executor.

- `max_workers` (default `os.cpu_count()`) — subprocess pool size for sync steps. Sync steps from the same level fan out across this pool; if the level is wider than `max_workers`, the surplus queues inside the pool until a worker is free.
- `max_async_concurrent` (default unbounded) — cap on concurrently scheduled async-task steps. Useful when async steps hit a rate-limited service.

`Executor.kubernetes(..., max_concurrent_steps=N)` — the K8s-backed executor.

- `max_concurrent_steps` (default unbounded) — cap on concurrently scheduled step pods within a single run. Without this, a wide DAG level can saturate node capacity, image-pull bandwidth, or your control-plane API quota.

`Executor.in_process()` has no knobs — steps run serially in the calling process (effective parallelism = 1).
These caps stack with pools rather than replacing them. If `max_workers=8` and a step claims a pool with `limit=2`, that step is bounded to 2 concurrent instances (the pool wins) while other steps from the same level continue filling the remaining 6 worker slots.
The executor cap is also per-run, so two runs each with `max_workers=8` can together produce 16 concurrent sync steps. If you need a global step cap across runs, use a pool instead.
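The per-level cap can be illustrated with a toy scheduler. This is a sketch of the assumed semantics only (a counting semaphore gating one DAG level), not rivers' actual JoinSet-based implementation:

```python
import asyncio

async def run_level(step_fns, max_async_concurrent: int):
    """Run one DAG level's async steps, at most `max_async_concurrent` at once.
    Returns the peak number of steps observed in flight."""
    sem = asyncio.Semaphore(max_async_concurrent)
    active = 0
    peak = 0

    async def gated(fn):
        nonlocal active, peak
        async with sem:          # blocks while the level is at its cap
            active += 1
            peak = max(peak, active)
            await fn()
            active -= 1

    await asyncio.gather(*(gated(fn) for fn in step_fns))
    return peak

async def step():
    await asyncio.sleep(0.01)

# A level 10 steps wide under a cap of 3 never exceeds 3 in flight:
peak = asyncio.run(run_level([step] * 10, max_async_concurrent=3))
print(peak)  # 3
```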
4. Pools (Step-Level Resource Gate)¶
Pools limit how many steps use a shared resource concurrently, within and across runs. Unlike the run-level gates, pools are enforced during execution by the executor.
Configuration:
# On the asset
@Asset(pool="database", pool_slots=2)
def heavy_query(): ...
# On storage (before or during execution)
storage.set_pool_limit("database", limit=5, lease_duration="5m")
Claim protocol:
- All-or-none atomic claim across multiple pools (multi-pool steps)
- Sentinel transaction prevents concurrent claims from exceeding limits
- Lease-based expiry: crashed processes stop renewing, slots auto-expire
- Background renewal at `lease_duration / 3` intervals via `PoolGuard`
- Run-level cleanup (`free_concurrency_slots_for_run`) at end of execution as defense-in-depth
- Coordinator GC (`free_expired_leases`) on every tick sweeps stale rows
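A toy in-memory model makes the all-or-none claim and lease-expiry points concrete. `PoolTable` is a hypothetical stand-in for the real storage layer, not its actual schema or API:

```python
import time

class PoolTable:
    """Toy model of the claim protocol: all-or-none claims plus lease expiry."""

    def __init__(self, limits, lease_duration=300.0):
        self.limits = limits          # pool name -> slot limit
        self.claims = []              # (run_id, step_key, pool, slots, expires_at)
        self.lease_duration = lease_duration

    def _used(self, pool):
        return sum(s for _, _, p, s, _ in self.claims if p == pool)

    def claim_slots(self, run_id, step_key, wanted, now=None):
        """All-or-none: either every (pool -> slots) request fits, or nothing is claimed."""
        now = now if now is not None else time.monotonic()
        # GC expired leases first: crashed holders stopped renewing.
        self.claims = [c for c in self.claims if c[4] > now]
        if any(self._used(p) + s > self.limits[p] for p, s in wanted.items()):
            return "Pending"          # caller sleeps and retries
        expires = now + self.lease_duration
        self.claims += [(run_id, step_key, p, s, expires) for p, s in wanted.items()]
        return "Claimed"

    def release(self, run_id, step_key):
        self.claims = [c for c in self.claims if (c[0], c[1]) != (run_id, step_key)]

pools = PoolTable({"database": 2, "gpu": 1})
print(pools.claim_slots("r1", "a", {"database": 2}))            # Claimed
print(pools.claim_slots("r2", "b", {"database": 1, "gpu": 1}))  # Pending: database full, gpu left untouched
pools.release("r1", "a")
print(pools.claim_slots("r2", "b", {"database": 1, "gpu": 1}))  # Claimed
```

Note the second claim touches neither pool even though `gpu` had room — that is the all-or-none property.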
Environment variables:
The claim polling loop can be tuned via environment variables. Values are human-readable durations (e.g. `30s`, `5m`, `1h30m`), parsed by the `humantime` crate.
| Variable | Default | Description |
|---|---|---|
| `RIVERS_CLAIM_TIMEOUT` | `10m` | Maximum time a step will wait for pool slots before failing. |
| `RIVERS_CLAIM_POLL_INTERVAL` | `1s` | Base interval between claim attempts. |
| `RIVERS_CLAIM_POLL_JITTER` | `500ms` | Maximum random jitter added per attempt to avoid correlated retries. Set to `0ms` to disable. |
With the defaults, a step waiting for pool slots will time out after 10 minutes. To allow longer waits (e.g. for long-running batch workloads), increase `RIVERS_CLAIM_TIMEOUT`; to fail faster, lower it.
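How timeout, interval, and jitter interact can be sketched as follows. `claim_with_retry` is a hypothetical simplification of the real loop, and the single-unit `parse_duration` helper is a minimal stand-in for `humantime` (it does not handle compound values like `1h30m`):

```python
import os
import random
import time

def parse_duration(s: str) -> float:
    """Minimal humantime-style parser: single-unit durations ('10m', '1s', '500ms')."""
    units = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}
    for suffix in ("ms", "s", "m", "h"):  # check 'ms' before 's'
        if s.endswith(suffix):
            return float(s[: -len(suffix)]) * units[suffix]
    raise ValueError(f"unrecognized duration: {s!r}")

def claim_with_retry(try_claim, *, env=os.environ):
    """Poll `try_claim` until it returns 'Claimed' or the timeout elapses."""
    timeout = parse_duration(env.get("RIVERS_CLAIM_TIMEOUT", "10m"))
    interval = parse_duration(env.get("RIVERS_CLAIM_POLL_INTERVAL", "1s"))
    jitter = parse_duration(env.get("RIVERS_CLAIM_POLL_JITTER", "500ms"))
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if try_claim() == "Claimed":
            return True
        # Random jitter decorrelates retries from steps waiting on the same pool.
        time.sleep(interval + random.uniform(0.0, jitter))
    return False  # timed out waiting for pool slots; the step fails

# A claim that starts Pending succeeds once a slot frees up:
attempts = iter(["Pending", "Pending", "Claimed"])
ok = claim_with_retry(
    lambda: next(attempts),
    env={"RIVERS_CLAIM_POLL_INTERVAL": "1ms", "RIVERS_CLAIM_POLL_JITTER": "0ms"},
)
print(ok)  # True
```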
Per-backend behavior:
| Backend | Pool step execution |
|---|---|
| InProcess | Sequential: claim → execute → release per step |
| Async | Each JoinSet task acquires its own PoolGuard concurrently |
| Parallel | Pool steps run in loky subprocesses via claim-gated JoinSet; non-pool steps use the standard batch pipeline |
Mapped (fan-out) steps claim per instance, not per parent step.
Observability Events (Phase 3a)¶
Every concurrency lifecycle transition emits a storage event through the EventWriter pipeline:
| Event | Emitted by | Metadata |
|---|---|---|
| `RunQueued` | `submit_run`, automation conditions | `priority` |
| `RunDequeued` | Coordinator tick | `priority` |
| `StepSlotClaimed` | PoolGuard (after successful claim) | `pools` (comma-separated) |
| `StepSlotWaiting` | PoolGuard (first pending attempt) | `reason` (block reason) |
| `StepSlotRenewed` | PoolGuard lease renewal task | — |
| `StepSlotReleased` | PoolGuard release | — |
Block reasons are persisted on `RunRecord.block_reason` by the coordinator when a queued run is blocked by global run limits or tag concurrency limits, and cleared on dequeue.
CLI Commands (Phase 3b)¶
Operators can inspect and manage queues and pools from the terminal:
Pool commands¶
| Command | Description |
|---|---|
| `rivers pools list` | List all configured pools with limit, claimed/pending counts, lease duration |
| `rivers pools info <pool>` | Detailed pool info including active slot holders (run_id, step_key, slots, lease expiry) |
| `rivers pools set <pool> <limit>` | Set (upsert) pool slot limit and lease duration |
Queue commands¶
| Command | Description |
|---|---|
| `rivers queue list` | List all queued runs sorted by priority (desc) then queue time (asc), with block reason |
| `rivers queue cancel <run_id>` | Cancel a queued run (Queued → Canceled) |
| `rivers queue why <run_id>` | Show why a run is queued: block reason, queue position, priority, tags |
All commands accept `--storage-path` (default `.rivers/storage/`) to specify the embedded storage location.
UI Dashboards (Phase 3c)¶
The rivers-ui web interface provides full visibility into concurrency pool state and the run queue.
Pool Dashboard (/pools)¶
- Summary stats: total pools, claimed/total slots, pending step count
- Pool cards: each pool shows a utilization bar (green <60%, yellow 60-85%, red >85%), claimed/limit ratio, lease duration, pending count badge
- Slot holders: expandable per-pool table showing active holders (run_id, step_key, slots consumed, claimed timestamp, lease expiry with color-coded warning for near-expiry/expired leases)
Queue View (/queue)¶
- Queued runs list: sorted by priority (descending) then queue time (ascending)
- Each run shows queue position, run_id, job name, priority badge, queued-since time, block reason (if blocked), and asset chips
- Auto-refreshes every 5 seconds
Run Detail — Concurrency Tab¶
The run detail page (/runs/:id) includes a Concurrency tab showing:
- Run-level queue events: `RunQueued` and `RunDequeued` events with timestamps
- Per-step concurrency events: grouped by asset, showing `StepSlotClaimed`, `StepSlotWaiting`, `StepSlotReleased`, and `StepSlotRenewed` events with block reasons
- Block reason display: queued runs show their block reason in the run header detail grid