Delta Lake
DeltaIOHandler
Persists asset outputs as Delta Lake tables.
from rivers.io_handlers.delta import DeltaIOHandler

io = DeltaIOHandler(
    table_uri="/data/delta",
    mode="overwrite",
)
Constructor:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| table_uri | str | required | Base URI for Delta tables. Each asset creates a subdirectory. |
| mode | str | "overwrite" | Write mode: "overwrite", "append", "error", "ignore", "merge", "create_or_replace". |
| schema_mode | str \| None | None | Schema evolution: "overwrite" or "merge". |
| storage_options | dict[str, str] \| None | None | Credentials for remote storage. |
| writer_properties | WriterProperties \| None | None | Parquet writer settings. |
| commit_properties | CommitProperties \| None | None | Delta commit settings. |
| table_config | dict[str, str] \| None | None | Delta table properties. |
| merge_config | MergeConfig \| None | None | Merge configuration (required when mode="merge"). |
Asset metadata overrides:
These metadata keys override handler defaults per-asset:
| Key | Type | Description |
| --- | --- | --- |
| delta/mode | str | Write mode override. |
| delta/schema_mode | str | Schema mode override. |
| delta/partition_expr | str \| JSON dict | Partition column mapping. |
| delta/table_configuration | JSON str | Table properties override. |
| delta/writer_properties | JSON str | Writer properties override. |
| delta/commit_properties | JSON str | Commit properties override. |
| delta/merge_predicate | str | Override merge predicate for this asset. |
| delta/columns | JSON list | Column selection for reads. |
| delta/version | str | Table version for time travel reads. |
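
The override behaviour described above can be pictured with a small, self-contained sketch. It does not use the library itself: `HANDLER_DEFAULTS` and `resolve_write_options` are hypothetical names, and the mapping from `delta/table_configuration` to the `table_config` constructor parameter is an assumption made for illustration.

```python
import json
from typing import Any

# Hypothetical handler defaults; the keys mirror constructor parameters.
HANDLER_DEFAULTS: dict[str, Any] = {
    "mode": "overwrite",
    "schema_mode": None,
    "table_config": None,
}


def resolve_write_options(defaults: dict[str, Any], metadata: dict[str, str]) -> dict[str, Any]:
    """Return a copy of the defaults with per-asset delta/* overrides applied."""
    resolved = dict(defaults)
    if "delta/mode" in metadata:
        resolved["mode"] = metadata["delta/mode"]
    if "delta/schema_mode" in metadata:
        resolved["schema_mode"] = metadata["delta/schema_mode"]
    if "delta/table_configuration" in metadata:
        # Documented as a JSON string, so decode it into a dict.
        resolved["table_config"] = json.loads(metadata["delta/table_configuration"])
    return resolved


asset_metadata = {
    "delta/mode": "append",
    "delta/table_configuration": '{"delta.appendOnly": "true"}',
}
options = resolve_write_options(HANDLER_DEFAULTS, asset_metadata)
```

Keys absent from the asset metadata (here `delta/schema_mode`) fall back to the handler default, and the defaults themselves are left unmodified.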
Output metadata:
| Key | Type | Description |
| --- | --- | --- |
| delta/table_uri | str | Full table URI. |
| delta/mode | str | Write mode used. |
| delta/num_rows | int | Total rows in table after write. |
| delta/size_bytes | int | Total table size in bytes. |
| delta/write_duration_s | float | Write duration in seconds. |
| delta/version | int | Delta table version after write. |
| rivers/schema | Schema | Arrow schema of the written table. |
DeltaTypeHandler
Abstract base for adding type support to DeltaIOHandler.
from collections.abc import Sequence

from arro3.core import RecordBatchReader
from rivers.io_handlers.delta.base import DeltaTypeHandler


class MyTypeHandler(DeltaTypeHandler[MyType]):  # MyType: your own data type
    @property
    def supported_types(self) -> Sequence[type[MyType]]:
        return [MyType]

    def to_arrow(self, obj: MyType) -> RecordBatchReader:
        ...

    def load_input(self, table_uri, storage_options, predicate,
                   target_type, columns=None, version=None) -> MyType:
        ...
Abstract members:
| Member | Description |
| --- | --- |
| supported_types | Property returning list of types this handler supports. |
| to_arrow(obj) | Convert object to arro3.core.RecordBatchReader. |
| load_input(...) | Load data from a Delta table. |
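
To illustrate what `supported_types` is for, here is a minimal sketch of type-based dispatch. It is a stand-in rather than the library's code: `TypeHandlerSketch`, `ListHandler`, `DictHandler`, and `find_handler` are hypothetical names, and the lookup rule (first handler whose supported types include the target type) is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Optional, Sequence


class TypeHandlerSketch(ABC):
    """Stand-in for an abstract type handler; only supported_types is modelled."""

    @property
    @abstractmethod
    def supported_types(self) -> Sequence[type]:
        ...


class ListHandler(TypeHandlerSketch):
    @property
    def supported_types(self) -> Sequence[type]:
        return [list]


class DictHandler(TypeHandlerSketch):
    @property
    def supported_types(self) -> Sequence[type]:
        return [dict]


def find_handler(
    handlers: Sequence[TypeHandlerSketch], target_type: type
) -> Optional[TypeHandlerSketch]:
    """Return the first handler that declares support for target_type, else None."""
    for handler in handlers:
        if any(issubclass(target_type, t) for t in handler.supported_types):
            return handler
    return None


list_handler = ListHandler()
dict_handler = DictHandler()
handlers = [list_handler, dict_handler]
chosen = find_handler(handlers, dict)
```

With this rule, registering a more specific handler earlier in the list gives it priority over a more general one.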
PartitionExpr
Maps partition dimensions to Delta table column names.
from rivers.io_handlers.delta import PartitionExpr

# Single dimension
expr = PartitionExpr(expr="date")

# Multi-dimensional
expr = PartitionExpr(expr={"date": "event_date", "region": "region_code"})
Attributes:
| Attribute | Type | Description |
| --- | --- | --- |
| expr | str \| dict[str, str] | Column name or dimension-to-column mapping. |
Properties:
| Property | Type | Description |
| --- | --- | --- |
| partition_columns | list[str] | List of Delta column names. |
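
The relationship between `expr` and `partition_columns` can be sketched with a stand-in class. `PartitionExprSketch` is a hypothetical name, and the rule that a mapping yields its values in insertion order is an assumption based on the descriptions above, not the library's confirmed behaviour.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class PartitionExprSketch:
    """Stand-in mirroring the documented attribute and property."""

    expr: Union[str, dict[str, str]]

    @property
    def partition_columns(self) -> list[str]:
        # A plain string is already a Delta column name.
        if isinstance(self.expr, str):
            return [self.expr]
        # A mapping goes from partition dimension to Delta column name.
        return list(self.expr.values())


single = PartitionExprSketch(expr="date")
multi = PartitionExprSketch(expr={"date": "event_date", "region": "region_code"})
single_columns = single.partition_columns
multi_columns = multi.partition_columns
```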
MergeConfig
Configuration for MERGE INTO operations.
from rivers.io_handlers.delta import MergeConfig

config = MergeConfig(
    merge_type="upsert",
    predicate="s.id = t.id",
    source_alias="s",
    target_alias="t",
)
Attributes:
| Attribute | Type | Default | Description |
| --- | --- | --- | --- |
| merge_type | str | required | One of: "deduplicate_insert", "update_only", "upsert", "replace_delete_unmatched", "custom". |
| predicate | str | required | SQL merge condition. |
| source_alias | str | "s" | Alias for the source table. |
| target_alias | str | "t" | Alias for the target table. |
| error_on_type_mismatch | bool | True | Fail if source/target schemas differ. |
| operations | MergeOperationsConfig \| None | None | Required when merge_type="custom". |
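
The constraints in the table above (a fixed set of merge types, and `operations` being required for `"custom"`) can be expressed as a validation sketch. `MergeConfigSketch` is a hypothetical stand-in; whether the real class validates at construction time or raises `ValueError` is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

MERGE_TYPES = {
    "deduplicate_insert",
    "update_only",
    "upsert",
    "replace_delete_unmatched",
    "custom",
}


@dataclass
class MergeConfigSketch:
    """Stand-in carrying the documented attributes and defaults."""

    merge_type: str
    predicate: str
    source_alias: str = "s"
    target_alias: str = "t"
    error_on_type_mismatch: bool = True
    operations: Optional[dict] = None  # stands in for MergeOperationsConfig

    def __post_init__(self) -> None:
        if self.merge_type not in MERGE_TYPES:
            raise ValueError(f"unknown merge_type: {self.merge_type!r}")
        if self.merge_type == "custom" and self.operations is None:
            raise ValueError('operations is required when merge_type="custom"')


# The aliases can be omitted because they default to "s" and "t".
config = MergeConfigSketch(merge_type="upsert", predicate="s.id = t.id")
```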
MergeOperationsConfig
Fine-grained control over MERGE clauses.
Attributes:
| Attribute | Type |
| --- | --- |
| when_not_matched_insert | list[WhenNotMatchedInsert] \| None |
| when_not_matched_insert_all | list[WhenNotMatchedInsertAll] \| None |
| when_matched_update | list[WhenMatchedUpdate] \| None |
| when_matched_update_all | list[WhenMatchedUpdateAll] \| None |
| when_matched_delete | list[WhenMatchedDelete] \| None |
| when_not_matched_by_source_delete | list[WhenNotMatchedBySourceDelete] \| None |
| when_not_matched_by_source_update | list[WhenNotMatchedBySourceUpdate] \| None |
Merge operation classes
WhenNotMatchedInsert
| Attribute | Type |
| --- | --- |
| predicate | str \| None |
| updates | dict[str, str] |
WhenNotMatchedInsertAll
| Attribute | Type |
| --- | --- |
| predicate | str \| None |
| except_cols | list[str] \| None |
WhenMatchedUpdate
| Attribute | Type |
| --- | --- |
| predicate | str \| None |
| updates | dict[str, str] |
WhenMatchedUpdateAll
| Attribute | Type |
| --- | --- |
| predicate | str \| None |
| except_cols | list[str] \| None |
WhenMatchedDelete
| Attribute | Type |
| --- | --- |
| predicate | str \| None |
WhenNotMatchedBySourceDelete
| Attribute | Type |
| --- | --- |
| predicate | str \| None |
WhenNotMatchedBySourceUpdate
| Attribute | Type |
| --- | --- |
| predicate | str \| None |
| updates | dict[str, str] |
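
As a reading aid for the operation classes, the sketch below renders two of them as SQL-style MERGE text. It is illustrative only: the `*Sketch` classes and `render_merge` are hypothetical, the interpretation of `updates` as a mapping from target column to SQL expression is an assumption, and the library may well pass these settings to a merge builder rather than generate SQL.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WhenMatchedUpdateSketch:
    updates: dict[str, str]  # assumed: target column -> SQL expression
    predicate: Optional[str] = None


@dataclass
class WhenNotMatchedInsertSketch:
    updates: dict[str, str]  # assumed: target column -> SQL expression
    predicate: Optional[str] = None


def _condition(predicate: Optional[str]) -> str:
    """Render an optional extra clause condition."""
    return f" AND {predicate}" if predicate else ""


def render_merge(target, source, on, matched_updates, not_matched_inserts) -> str:
    """Build SQL-style MERGE text, one line per clause, in the order given."""
    lines = [f"MERGE INTO {target} USING {source} ON {on}"]
    for clause in matched_updates:
        assignments = ", ".join(f"{col} = {expr}" for col, expr in clause.updates.items())
        lines.append(f"WHEN MATCHED{_condition(clause.predicate)} THEN UPDATE SET {assignments}")
    for clause in not_matched_inserts:
        columns = ", ".join(clause.updates)
        values = ", ".join(clause.updates.values())
        lines.append(
            f"WHEN NOT MATCHED{_condition(clause.predicate)} THEN INSERT ({columns}) VALUES ({values})"
        )
    return "\n".join(lines)


sql = render_merge(
    target="events AS t",
    source="updates AS s",
    on="s.id = t.id",
    matched_updates=[
        WhenMatchedUpdateSketch(updates={"value": "s.value"}, predicate="s.value <> t.value")
    ],
    not_matched_inserts=[WhenNotMatchedInsertSketch(updates={"id": "s.id", "value": "s.value"})],
)
```

Each attribute of MergeOperationsConfig is a list, which suggests that several clauses of the same kind, each with its own predicate, can be combined in one merge.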