
Delta Lake

The DeltaIOHandler persists asset outputs as Delta Lake tables. It supports PyArrow and Polars data types, partitioned writes, and full MERGE INTO operations.

Setup

pip install rivers[delta]

# Plus at least one of:
pip install rivers[pyarrow]
pip install rivers[polars]

Basic usage

import polars as pl
import rivers as rs
from rivers.io_handlers.delta import DeltaIOHandler

io = DeltaIOHandler(table_uri="/data/delta")

@rs.Asset(io_handler=io)
def users() -> pl.DataFrame:
    return pl.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Carol"],
    })

The handler creates a Delta table at /data/delta/users/.

Supported types

Type                       Extra
pyarrow.Table              rivers[pyarrow]
pyarrow.RecordBatchReader  rivers[pyarrow]
polars.DataFrame           rivers[polars]
polars.LazyFrame           rivers[polars]

The output type is detected automatically from the object passed to handle_output; on the read side, load_input picks the type to return from the type_hint of the downstream parameter's annotation.

Write modes

Set the default mode on the handler, or override per-asset via metadata:

# Handler-level default
io = DeltaIOHandler(table_uri="/data/delta", mode="append")

# Per-asset override
@rs.Asset(io_handler=io, metadata={"delta/mode": "overwrite"})
def events() -> pl.DataFrame:
    ...

Mode               Behavior
overwrite          Replace the table (or the target partition)
append             Add rows to the existing table
error              Fail if the table already exists
ignore             Skip the write if the table already exists
merge              MERGE INTO (see below)
create_or_replace  Drop and recreate the schema, then append

Partitioned writes

Combine with PartitionsDefinition and partition_expr:

from datetime import datetime

@rs.Asset(
    io_handler=io,
    partitions_def=rs.PartitionsDefinition.daily(start=datetime(2024, 1, 1)),
    metadata={"delta/partition_expr": "date"},
)
def daily_events() -> pl.DataFrame:
    ...

When writing in overwrite mode with a partition key, the handler generates a predicate to overwrite only the target partition.
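
That predicate is essentially an equality filter on the partition column. A sketch of the kind of string the handler might generate, assuming the partition key is formatted as a quoted literal (partition_predicate is a hypothetical helper, not a rivers API):

```python
def partition_predicate(partition_expr: str, partition_key: str) -> str:
    # Hypothetical sketch: constrain the overwrite to one partition value,
    # leaving all other partitions untouched.
    return f"{partition_expr} = '{partition_key}'"

partition_predicate("date", "2024-01-05")  # "date = '2024-01-05'"
```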

Merge operations

For upserts, deduplication, and other MERGE INTO patterns:

from rivers.io_handlers.delta import DeltaIOHandler, MergeConfig

io = DeltaIOHandler(
    table_uri="/data/delta",
    mode="merge",
    merge_config=MergeConfig(
        merge_type="upsert",
        predicate="s.id = t.id",
    ),
)

Merge types

Type                      Behavior
upsert                    Update matched rows, insert unmatched rows
deduplicate_insert        Insert only unmatched rows
update_only               Update matched rows only
replace_delete_unmatched  Update matched rows, delete unmatched rows in the target
custom                    Full control via MergeOperationsConfig
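
Each built-in merge type plausibly corresponds to a fixed chain of clauses on deltalake's TableMerger (when_matched_update_all, when_not_matched_insert_all, and so on). One possible mapping, assumed for illustration rather than taken from the rivers source:

```python
# Assumed mapping from rivers merge types to deltalake TableMerger clauses.
MERGE_CLAUSES = {
    "upsert": ["when_matched_update_all", "when_not_matched_insert_all"],
    "deduplicate_insert": ["when_not_matched_insert_all"],
    "update_only": ["when_matched_update_all"],
    "replace_delete_unmatched": [
        "when_matched_update_all",
        "when_not_matched_by_source_delete",
    ],
}
```

Under this reading, "custom" simply lets you assemble the clause chain yourself via MergeOperationsConfig (next section).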

Custom merge operations

from rivers.io_handlers.delta import (
    MergeConfig,
    MergeOperationsConfig,
    WhenMatchedUpdateAll,
    WhenNotMatchedInsertAll,
)

io = DeltaIOHandler(
    table_uri="/data/delta",
    mode="merge",
    merge_config=MergeConfig(
        merge_type="custom",
        predicate="s.id = t.id",
        operations=MergeOperationsConfig(
            when_matched_update_all=[
                WhenMatchedUpdateAll(predicate="s.updated_at > t.updated_at"),
            ],
            when_not_matched_insert_all=[
                WhenNotMatchedInsertAll(),
            ],
        ),
    ),
)

Storage options

Pass storage credentials for remote backends:

io = DeltaIOHandler(
    table_uri="s3://my-bucket/delta",
    storage_options={
        "aws_region": "us-east-1",
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
    },
)

Table configuration

Set Delta table properties:

io = DeltaIOHandler(
    table_uri="/data/delta",
    table_config={
        "delta.deletedFileRetentionDuration": "interval 30 days",
    },
)

Override per-asset via metadata:

@rs.Asset(
    io_handler=io,
    metadata={
        "delta/table_configuration": '{"delta.enableChangeDataFeed": "true"}',
    },
)
def events() -> pl.DataFrame:
    ...
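
Note that the per-asset value is a JSON-encoded string rather than a Python dict, presumably because metadata values must be plain strings; a quick check of that encoding with the standard library:

```python
import json

# The metadata value as it appears in the asset decorator above.
raw = '{"delta.enableChangeDataFeed": "true"}'
cfg = json.loads(raw)
print(cfg["delta.enableChangeDataFeed"])  # "true" (a string, as Delta expects)
```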

Reading data

Downstream assets read upstream data by declaring a parameter named after the upstream asset; the type annotation tells the handler what to load:

@rs.Asset(io_handler=io)
def summary(users: pl.DataFrame) -> pl.DataFrame:
    return users.group_by("region").len()

Column selection

Load only specific columns:

@rs.Asset(
    io_handler=io,
    metadata={"delta/columns": '["id", "name"]'},
)
def user_names(users: pl.DataFrame) -> pl.DataFrame:
    ...

Time travel

Read a specific table version:

@rs.Asset(io_handler=io, metadata={"delta/version": "3"})
def historical(users: pl.DataFrame) -> pl.DataFrame:
    ...