Partitions

PartitionKey

Identifies which partition(s) to materialize.

PartitionKey.single()

PartitionKey.single(key: str | list[str]) -> PartitionKey.Single

Create a single-dimension partition key.

# Single value
rs.PartitionKey.single("2024-01-15")

# Multiple values
rs.PartitionKey.single(["2024-01-15", "2024-01-16"])

PartitionKey.Single attributes:

Attribute Type
key list[str]
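
The constructor accepts a string or a list, while the key attribute is always list[str]. A minimal sketch of that normalization, assuming a lone string is wrapped in a one-element list; normalize_key is a hypothetical helper for illustration, not part of rs:

```python
from __future__ import annotations


def normalize_key(key: str | list[str]) -> list[str]:
    """Hypothetical helper: coerce a str or list[str] into list[str]."""
    # Assumption: a lone string becomes a one-element list.
    if isinstance(key, str):
        return [key]
    # Copy so the caller's list is not shared.
    return list(key)


single = normalize_key("2024-01-15")
several = normalize_key(["2024-01-15", "2024-01-16"])
```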

PartitionKey.multi()

PartitionKey.multi(keys: dict[str, str | list[str]]) -> PartitionKey.Multi

Create a multi-dimension partition key.

rs.PartitionKey.multi({"date": "2024-01-15", "region": ["us", "eu"]})

PartitionKey.Multi attributes:

Attribute Type
keys dict[str, list[str]]

PartitionsDefinition

Defines how an asset is partitioned.

PartitionsDefinition.static_()

PartitionsDefinition.static_(keys: list[str]) -> PartitionsDefinition.Static

Fixed set of partition keys.

rs.PartitionsDefinition.static_(["us", "eu", "asia"])

Static attributes: keys: list[str]

PartitionsDefinition.daily()

PartitionsDefinition.daily(
    start: datetime,
    end: datetime | None = None,
    fmt: str | None = None,
) -> PartitionsDefinition.TimeWindow

Daily partitions. Default format: "%Y-%m-%d".

PartitionsDefinition.hourly()

PartitionsDefinition.hourly(
    start: datetime,
    end: datetime | None = None,
    fmt: str | None = None,
) -> PartitionsDefinition.TimeWindow

Hourly partitions. Default format: "%Y-%m-%d-%H:%M".
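
To show what keys in the two default formats look like, here is a stdlib-only sketch. window_keys is a hypothetical helper, and treating end as exclusive is an assumption, not documented behavior of rs:

```python
from datetime import datetime, timedelta

DAILY_FMT = "%Y-%m-%d"         # default format stated for daily()
HOURLY_FMT = "%Y-%m-%d-%H:%M"  # default format stated for hourly()


def window_keys(start: datetime, end: datetime, step: timedelta, fmt: str) -> list:
    """Hypothetical helper: one formatted key per step in [start, end)."""
    keys = []
    current = start
    # Assumption: `end` is exclusive.
    while current < end:
        keys.append(current.strftime(fmt))
        current += step
    return keys


daily = window_keys(
    datetime(2024, 1, 14), datetime(2024, 1, 17), timedelta(days=1), DAILY_FMT
)
hourly = window_keys(
    datetime(2024, 1, 15, 22), datetime(2024, 1, 16, 1), timedelta(hours=1), HOURLY_FMT
)
```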

PartitionsDefinition.time_window()

PartitionsDefinition.time_window(
    start: datetime,
    cron_schedule: str | None = None,
    interval_seconds: float | None = None,
    end: datetime | None = None,
    fmt: str | None = None,
) -> PartitionsDefinition.TimeWindow

Custom time-window partitions. Exactly one of cron_schedule or interval_seconds is required. cron_schedule accepts either 5 fields (min hour dom mon dow) or 6 fields (sec min hour dom mon dow); the leading seconds field is optional.
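
The argument rules above can be sketched as a small validator. check_time_window_args is hypothetical; how rs actually reports invalid arguments is not documented here, so the ValueError is an assumption:

```python
from __future__ import annotations


def check_time_window_args(
    cron_schedule: str | None = None,
    interval_seconds: float | None = None,
) -> str:
    """Hypothetical validator mirroring the stated rules."""
    # Exactly one of the two must be given: reject both-set and both-missing.
    if (cron_schedule is None) == (interval_seconds is None):
        raise ValueError("pass exactly one of cron_schedule or interval_seconds")
    if cron_schedule is not None:
        n_fields = len(cron_schedule.split())
        if n_fields not in (5, 6):
            raise ValueError(f"expected 5 or 6 cron fields, got {n_fields}")
        # A sixth field means the schedule starts with a seconds field.
        return "cron with seconds" if n_fields == 6 else "cron"
    return "interval"


kind_daily = check_time_window_args(cron_schedule="0 0 * * *")
kind_30s = check_time_window_args(cron_schedule="*/30 * * * * *")
kind_15min = check_time_window_args(interval_seconds=900.0)
```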

TimeWindow attributes:

Attribute         Type
cron_schedule     str | None
interval_seconds  float | None
start             datetime
end               datetime | None
fmt               str

PartitionsDefinition.multi()

PartitionsDefinition.multi(
    dimensions: dict[str, PartitionsDefinition],
) -> PartitionsDefinition.Multi

Cartesian product of named child dimensions. Any number of dimensions is supported (Static, TimeWindow, Dynamic); the only restrictions are that there must be at least one dimension and no Multi may be nested inside another Multi.

Multi attributes: dimensions: list[tuple[str, PartitionsDefinition]]
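
The Cartesian product can be illustrated with itertools.product. This sketch uses plain lists of strings as stand-ins for the child dimensions; the dict-per-key representation is an assumption for readability, not the type rs returns:

```python
from itertools import product

# Stand-ins for two child dimensions and their keys.
dimensions = {
    "date": ["2024-01-15", "2024-01-16"],
    "region": ["us", "eu"],
}

names = list(dimensions)
# One combined key per combination of child keys (2 x 2 = 4 here).
multi_keys = [dict(zip(names, combo)) for combo in product(*dimensions.values())]
```

The number of combined keys is the product of the dimension sizes, so adding dimensions grows the key space multiplicatively.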

PartitionsDefinition.dynamic()

PartitionsDefinition.dynamic(name: str) -> PartitionsDefinition.Dynamic

Runtime-extensible partitions. Keys are stored in Storage and managed via:

  • storage.add_dynamic_partitions(name, keys)
  • storage.delete_dynamic_partition(name, key)
  • storage.get_dynamic_partitions(name)
  • storage.has_dynamic_partition(name, key)

pd = rs.PartitionsDefinition.dynamic("customers")

@rs.Asset(partitions_def=pd)
def per_customer(context: rs.AssetExecutionContext):
    return load(context.partition_key)

repo.storage.add_dynamic_partitions("customers", ["acme", "globex"])

Dynamic attributes: name: str

Methods

def get_partition_keys(self) -> list[PartitionKey]

Enumerate all partition keys.

def validate_partition_key(self, key: PartitionKey) -> bool

Check if a key is valid for this definition.


PartitionContext

Runtime partition information available on IO context objects.

PartitionContext(keys: list[PartitionKey], definition: PartitionsDefinition)

Attributes:

Attribute   Type                  Description
keys        list[PartitionKey]    All partition keys this step is responsible for.
key         PartitionKey          Convenience accessor for keys[0], the first (canonical) key.
definition  PartitionsDefinition  The definition the keys belong to.

Methods:

def time_window(self) -> tuple[datetime, datetime] | None

Returns the half-open (start, end) time window for the current key, or None if not a time-window partition.
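
"Half-open" means the start is included and the end is excluded. A stdlib-only sketch for a daily key, assuming the default "%Y-%m-%d" format; daily_window and in_window are hypothetical helpers, not rs functions:

```python
from __future__ import annotations

from datetime import datetime, timedelta


def daily_window(key: str, fmt: str = "%Y-%m-%d") -> tuple[datetime, datetime]:
    """Hypothetical helper: the [start, end) window covered by a daily key."""
    start = datetime.strptime(key, fmt)
    return start, start + timedelta(days=1)


start, end = daily_window("2024-01-15")


def in_window(ts: datetime) -> bool:
    # Half-open check: start is inside the window, end is not.
    return start <= ts < end
```

Because the end is exclusive, consecutive windows tile the timeline without overlap: midnight on 2024-01-16 belongs to the next partition only.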


PartitionKeyRange

An inclusive range of partition keys for backfills and lookups.

PartitionKeyRange.single()

PartitionKeyRange.single(from_key: str, to_key: str) -> PartitionKeyRange

Single-dimension range from from_key to to_key (inclusive).

PartitionKeyRange.multi()

PartitionKeyRange.multi(
    dimensions: dict[str, tuple[str, str] | list[str]],
) -> PartitionKeyRange

Multi-dimension range. Each dimension is either a (from, to) tuple or an explicit list of keys.
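
A sketch of how one dimension of a range might expand into concrete keys. expand_dimension is hypothetical, and resolving a (from, to) pair against the definition's ordered key list is an assumption about how rs evaluates ranges:

```python
from __future__ import annotations


def expand_dimension(
    ordered_keys: list[str],
    selector: tuple[str, str] | list[str],
) -> list[str]:
    """Hypothetical helper: expand a (from, to) tuple or an explicit key list."""
    if isinstance(selector, tuple):
        from_key, to_key = selector
        lo = ordered_keys.index(from_key)
        hi = ordered_keys.index(to_key)
        # Inclusive on both ends, hence hi + 1.
        return ordered_keys[lo : hi + 1]
    # An explicit list is taken as-is.
    return list(selector)


dates = ["2024-01-14", "2024-01-15", "2024-01-16", "2024-01-17"]
date_range = expand_dimension(dates, ("2024-01-15", "2024-01-16"))
one_day = expand_dimension(dates, ("2024-01-15", "2024-01-15"))
regions = expand_dimension(["us", "eu", "asia"], ["us", "asia"])
```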


PartitionMapping

Controls how upstream partition keys map to downstream partition keys.

PartitionMapping.identity()

Each downstream partition maps to the upstream partition with the same key.

PartitionMapping.all_partitions()

Every downstream partition loads all upstream partitions.

PartitionMapping.static_(mapping)

PartitionMapping.static_(mapping: dict[str, str]) -> PartitionMapping.Static

Explicit key-to-key mapping.

PartitionMapping.time_window(offset)

PartitionMapping.time_window(offset: int) -> PartitionMapping.TimeWindow

Offsets the partition key by offset time periods (e.g., -1 selects the previous day for daily partitions).
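
A sketch of the offset arithmetic for daily keys, assuming the offset is added to the downstream key's period to find the upstream key. shift_daily_key is a hypothetical helper, not an rs function:

```python
from datetime import datetime, timedelta


def shift_daily_key(key: str, offset: int, fmt: str = "%Y-%m-%d") -> str:
    """Hypothetical helper: move a daily key by `offset` days."""
    day = datetime.strptime(key, fmt)
    return (day + timedelta(days=offset)).strftime(fmt)


# With offset=-1, the 2024-03-01 partition reads the previous day.
previous = shift_daily_key("2024-03-01", -1)
unchanged = shift_daily_key("2024-03-01", 0)
```

Going through datetime rather than string manipulation keeps month and leap-year boundaries correct.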

PartitionMapping.specific_partitions(partition_keys)

PartitionMapping.specific_partitions(partition_keys: list[str]) -> PartitionMapping.SpecificPartitions

Maps all downstream partitions to a specific set of upstream partition keys. Every downstream partition depends on the same named upstream partitions regardless of its own key.

# Downstream always depends on upstream partitions "a" and "b"
rs.PartitionMapping.specific_partitions(["a", "b"])

PartitionMapping.for_keys(selectors)

PartitionMapping.for_keys(
    selectors: list[PartitionKey | PartitionKeyRange],
) -> PartitionMapping.ForKeys

Maps an unpartitioned upstream to specific downstream partition keys. When the downstream partition key matches a selector, the upstream is loaded; otherwise the parameter receives None.

Only valid on edges where the downstream is partitioned and the upstream is unpartitioned.

# Load this upstream only for partition "a"
rs.PartitionMapping.for_keys([rs.PartitionKey.single("a")])

# Load for a range of time-window partitions
rs.PartitionMapping.for_keys([
    rs.PartitionKeyRange.single(from_key="2024-01-01", to_key="2024-06-30"),
])

ForKeys attributes:

Attribute  Type
selectors  list[PartitionKey | PartitionKeyRange]
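
The load-or-None behavior can be sketched without rs. load_if_selected is hypothetical, and comparing range bounds as strings is an assumption that only holds for keys that sort lexicographically, such as ISO dates:

```python
from __future__ import annotations

from typing import Callable


def load_if_selected(
    partition_key: str,
    keys: set[str],
    ranges: list[tuple[str, str]],
    load: Callable[[], object],
) -> object | None:
    """Hypothetical helper: load the upstream only for selected partitions."""
    in_keys = partition_key in keys
    # Inclusive range check; valid only for lexicographically ordered keys.
    in_ranges = any(lo <= partition_key <= hi for lo, hi in ranges)
    return load() if (in_keys or in_ranges) else None


first_half = [("2024-01-01", "2024-06-30")]
loaded = load_if_selected("2024-03-10", set(), first_half, lambda: "config")
skipped = load_if_selected("2024-07-01", set(), first_half, lambda: "config")
```

A downstream function therefore has to handle a None parameter for every partition outside the selectors.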

PartitionMapping.subset()

PartitionMapping.subset() -> PartitionMapping.Subset

Maps a partitioned upstream whose keys are a subset of the downstream's keys. When the downstream partition key exists in the upstream, the upstream is loaded with that key; otherwise the parameter receives None.

Only valid on edges where both sides are partitioned with the same partition type, and the upstream keys are a subset of the downstream keys.

rs.PartitionMapping.subset()
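
A sketch of the lookup rule described above, using plain sets as stand-ins for the two key spaces. resolve_subset is a hypothetical helper, not an rs function:

```python
from __future__ import annotations


def resolve_subset(downstream_key: str, upstream_keys: set[str]) -> str | None:
    """Hypothetical helper: the upstream key to load, or None if absent."""
    return downstream_key if downstream_key in upstream_keys else None


downstream_keys = {"us", "eu", "asia"}
upstream_keys = {"us", "eu"}

# Precondition stated above: upstream keys are a subset of downstream keys.
precondition_ok = upstream_keys <= downstream_keys

hit = resolve_subset("eu", upstream_keys)
miss = resolve_subset("asia", upstream_keys)
```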

PartitionMapping.multi(dimension_mappings)

PartitionMapping.multi(
    dimension_mappings: dict[str, PartitionMapping | tuple[str, PartitionMapping]]
) -> PartitionMapping.Multi

Maps individual dimensions of a multi-dimensional partition. Each key is a dimension name, and the value is either a PartitionMapping (same-name dimension) or a (target_dimension_name, mapping) tuple (cross-dimension mapping).

PartitionMapping.multi_to_single(dimension_name)

PartitionMapping.multi_to_single(
    dimension_name: str,
    partition_mapping: PartitionMapping | None = None,
) -> PartitionMapping.MultiToSingle

Maps from a multi-dimensional upstream to a single-dimensional downstream by extracting one dimension. The optional partition_mapping is applied within that dimension.
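
One way to picture the extraction, assuming a downstream key selects every upstream multi-dimensional key whose named dimension has the same value. Both that reading and the dict representation of multi keys are assumptions, and upstream_for is a hypothetical helper:

```python
# Stand-in for the upstream's multi-dimensional keys.
upstream_keys = [
    {"date": "2024-01-15", "region": "us"},
    {"date": "2024-01-15", "region": "eu"},
    {"date": "2024-01-16", "region": "us"},
]


def upstream_for(downstream_key: str, dimension_name: str) -> list:
    """Hypothetical helper: upstream keys matching on one dimension."""
    return [k for k in upstream_keys if k[dimension_name] == downstream_key]


# A downstream partitioned by date sees both regions for that date.
matches = upstream_for("2024-01-15", "date")
```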