Delta Lake¶
The `DeltaIOHandler` persists asset outputs as Delta Lake tables. It supports PyArrow and Polars data types, partitioned writes, and full `MERGE INTO` operations.
Setup¶
```bash
pip install rivers[delta]
# Plus at least one of:
pip install rivers[pyarrow]
pip install rivers[polars]
```
Basic usage¶
```python
import polars as pl
import rivers as rs
from rivers.io_handlers.delta import DeltaIOHandler

io = DeltaIOHandler(table_uri="/data/delta")

@rs.Asset(io_handler=io)
def users() -> pl.DataFrame:
    return pl.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Carol"],
    })
```
The handler creates a Delta table at `/data/delta/users/`.
Supported types¶
| Type | Extra |
|---|---|
| `pyarrow.Table` | `rivers[pyarrow]` |
| `pyarrow.RecordBatchReader` | `rivers[pyarrow]` |
| `polars.DataFrame` | `rivers[polars]` |
| `polars.LazyFrame` | `rivers[polars]` |
The type is detected automatically from the object passed to `handle_output`, and `load_input` uses the `type_hint` from the downstream parameter annotation.
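As a rough sketch of what this dispatch could look like internally (the function name and mapping below are illustrative assumptions, not the library's actual code), the write path can be picked from the top-level module of the output object's type:

```python
def detect_format(obj) -> str:
    """Map an output object to a write path by its top-level module name.

    Hypothetical sketch: pyarrow objects go down the Arrow write path,
    polars objects down the Polars path; anything else is rejected.
    """
    module = type(obj).__module__.split(".", 1)[0]
    supported = {"pyarrow": "arrow", "polars": "polars"}
    if module not in supported:
        raise TypeError(f"unsupported output type: {type(obj).__name__}")
    return supported[module]
```

Keying on the module name rather than importing the optional dependencies keeps the check cheap even when only one extra is installed.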
Write modes¶
Set the default mode on the handler, or override per-asset via metadata:
```python
# Handler-level default
io = DeltaIOHandler(table_uri="/data/delta", mode="append")

# Per-asset override
@rs.Asset(io_handler=io, metadata={"delta/mode": "overwrite"})
def events() -> pl.DataFrame:
    ...
```
| Mode | Behavior |
|---|---|
| `overwrite` | Replace the table (or partition) |
| `append` | Add rows to the existing table |
| `error` | Fail if the table exists |
| `ignore` | Skip the write if the table exists |
| `merge` | `MERGE INTO` (see below) |
| `create_or_replace` | Drop and recreate the schema, then append |
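The interaction between the handler default and the per-asset metadata override can be sketched in plain Python (the function name and validation below are illustrative, not the library's actual API):

```python
# Sketch of per-asset mode resolution: the "delta/mode" metadata key,
# when present, takes precedence over the handler-level default.
VALID_MODES = {"overwrite", "append", "error", "ignore", "merge", "create_or_replace"}

def resolve_mode(handler_default: str, asset_metadata: dict) -> str:
    """Return the effective write mode for one asset."""
    mode = asset_metadata.get("delta/mode", handler_default)
    if mode not in VALID_MODES:
        raise ValueError(f"unknown delta mode: {mode!r}")
    return mode
```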
Partitioned writes¶
Combine with PartitionsDefinition and partition_expr:
```python
from datetime import datetime

@rs.Asset(
    io_handler=io,
    partitions_def=rs.PartitionsDefinition.daily(start=datetime(2024, 1, 1)),
    metadata={"delta/partition_expr": "date"},
)
def daily_events() -> pl.DataFrame:
    ...
```
When writing in overwrite mode with a partition key, the handler generates a predicate to overwrite only the target partition.
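As a hedged sketch, the generated predicate could be as simple as an equality filter on the partition column (the function name and quoting are illustrative assumptions, not the handler's actual implementation):

```python
def partition_predicate(partition_expr: str, partition_key: str) -> str:
    """Build an overwrite predicate that targets a single partition."""
    escaped = partition_key.replace("'", "''")  # escape single quotes for SQL
    return f"{partition_expr} = '{escaped}'"

# For a daily partition keyed on "date":
partition_predicate("date", "2024-01-01")  # -> "date = '2024-01-01'"
```

Passing such a predicate to the Delta writer replaces only the matching rows, leaving all other partitions untouched.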
Merge operations¶
For upserts, deduplication, and other MERGE INTO patterns:
```python
from rivers.io_handlers.delta import DeltaIOHandler, MergeConfig

io = DeltaIOHandler(
    table_uri="/data/delta",
    mode="merge",
    merge_config=MergeConfig(
        merge_type="upsert",
        predicate="s.id = t.id",
    ),
)
```
Merge types¶
| Type | Behavior |
|---|---|
| `upsert` | Update matched rows, insert unmatched |
| `deduplicate_insert` | Insert only unmatched rows |
| `update_only` | Update matched rows only |
| `replace_delete_unmatched` | Update matched rows, delete unmatched rows in the target |
| `custom` | Full control via `MergeOperationsConfig` |
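The `upsert` semantics can be illustrated with plain Python dicts standing in for the source and target tables, keyed by the merge predicate column:

```python
def upsert(target: dict, source: dict) -> dict:
    """MERGE INTO upsert semantics over dicts keyed by id:
    matched keys are updated from source, unmatched source keys inserted."""
    merged = dict(target)
    merged.update(source)  # matched rows updated, new rows inserted
    return merged

target = {1: "Alice", 2: "Bob"}
source = {2: "Bobby", 3: "Carol"}
upsert(target, source)  # -> {1: "Alice", 2: "Bobby", 3: "Carol"}
```

By the same analogy, `deduplicate_insert` would add only keys absent from the target, and `update_only` would touch only keys already present.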
Custom merge operations¶
```python
from rivers.io_handlers.delta import (
    DeltaIOHandler,
    MergeConfig,
    MergeOperationsConfig,
    WhenMatchedUpdateAll,
    WhenNotMatchedInsertAll,
)

io = DeltaIOHandler(
    table_uri="/data/delta",
    mode="merge",
    merge_config=MergeConfig(
        merge_type="custom",
        predicate="s.id = t.id",
        operations=MergeOperationsConfig(
            when_matched_update_all=[
                WhenMatchedUpdateAll(predicate="s.updated_at > t.updated_at"),
            ],
            when_not_matched_insert_all=[
                WhenNotMatchedInsertAll(),
            ],
        ),
    ),
)
```
Storage options¶
Pass storage credentials for remote backends:
```python
io = DeltaIOHandler(
    table_uri="s3://my-bucket/delta",
    storage_options={
        "aws_region": "us-east-1",
        "aws_access_key_id": "...",
        "aws_secret_access_key": "...",
    },
)
```
Table configuration¶
Set Delta table properties:
```python
io = DeltaIOHandler(
    table_uri="/data/delta",
    table_config={
        "delta.deletedFileRetentionDuration": "interval 30 days",
    },
)
```
Override per-asset via metadata:
```python
@rs.Asset(
    io_handler=io,
    metadata={
        "delta/table_configuration": '{"delta.enableChangeDataFeed": "true"}',
    },
)
def events() -> pl.DataFrame:
    ...
```
Reading data¶
Downstream assets read data by type annotation:
```python
@rs.Asset(io_handler=io)
def summary(users: pl.DataFrame) -> pl.DataFrame:
    return users.group_by("region").len()
```
Column selection¶
Load only specific columns:
```python
@rs.Asset(
    io_handler=io,
    metadata={"delta/columns": '["id", "name"]'},
)
def user_names(users: pl.DataFrame) -> pl.DataFrame:
    ...
```
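Note that the metadata value is a JSON-encoded string rather than a Python list. Assuming the handler decodes it with `json.loads` (an assumption about the implementation, not documented behavior), the parsing amounts to:

```python
import json

def parse_columns(metadata: dict):
    """Decode the JSON-encoded "delta/columns" metadata value, if present."""
    raw = metadata.get("delta/columns")
    return json.loads(raw) if raw is not None else None

parse_columns({"delta/columns": '["id", "name"]'})  # -> ["id", "name"]
```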
Time travel¶
Read a specific table version: