or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md · destinations.md · helpers.md · incremental.md · index.md · pipeline.md · schema.md · source-filesystem.md · source-rest-api.md · source-sql-database.md · sources-resources.md
tile.json

docs/incremental.md

Incremental Loading

Configure incremental data loading with cursor-based tracking, deduplication, and state management for efficient data synchronization.

Capabilities

Incremental Class

class Incremental:
    """
    Configures incremental loading for a resource using cursor-based tracking.

    Args:
        cursor_path: JSON path to cursor field in data items (e.g., "updated_at", "$.id", "data.timestamp")
        initial_value: Starting cursor value (used on first run)
        last_value_func: Function reducing the sequence of observed cursor values
            to the one to persist (max, min, or a custom callable — see the
            "Custom Last Value Function" example below)
        primary_key: Primary key column(s) for deduplication with merge write disposition
        end_value: End cursor value for bounded incremental loading
        row_order: Row ordering relative to cursor ("asc" or "desc")
        allow_external_schedulers: Enable Airflow scheduler integration
        on_cursor_value_missing: Action when cursor field missing in item:
            - "raise": Raise exception (default)
            - "include": Include item without cursor
            - "exclude": Exclude item without cursor
        lag: Lag/attribution window as timedelta or numeric value
        range_start: Start boundary type ("open" or "closed")
        range_end: End boundary type ("open" or "closed")

    Attributes:
        last_value: Current cursor position (read-only)
    """

    def __init__(
        self,
        cursor_path: str,
        initial_value: Any = None,
        # Receives all candidate cursor values seen in a batch and returns the
        # one to keep; max/min work directly on such sequences.
        last_value_func: Callable[[Any], Any] = max,
        primary_key: Union[str, List[str], None] = None,
        end_value: Any = None,
        row_order: Literal["asc", "desc"] = "asc",
        allow_external_schedulers: bool = False,
        on_cursor_value_missing: Literal["raise", "include", "exclude"] = "raise",
        lag: Union[timedelta, int, float, None] = None,
        range_start: Literal["open", "closed"] = "closed",
        range_end: Literal["open", "closed"] = "closed"
    ): ...

    @property
    def last_value(self) -> Any:
        """
        Current cursor value.

        Returns:
            Last cursor value from state
        """

    @classmethod
    def from_existing_state(
        cls,
        source_name: str,
        resource_name: str,
        cursor_path: Union[str, None] = None
    ) -> "Incremental":
        """
        Creates Incremental from existing pipeline state.

        Args:
            source_name: Source name
            resource_name: Resource name
            cursor_path: Cursor path (optional if already in state)

        Returns:
            Incremental instance with restored state
        """

    def merge(self, other: "Incremental") -> "Incremental":
        """
        Merges two Incremental instances.

        Args:
            other: Another Incremental instance

        Returns:
            Merged Incremental instance
        """

    def copy(self) -> "Incremental":
        """
        Creates a deep copy.

        Returns:
            Copied Incremental instance
        """

    def get_cursor_column_name(self) -> str:
        """
        Gets the cursor column name.

        Returns:
            Column name extracted from cursor_path
        """

Incremental Alias

# Commonly imported as incremental (lowercase)
from dlt.sources import incremental
# Same as: from dlt.extract.incremental import Incremental

Usage Examples

Basic Incremental Loading

import dlt
from dlt.sources import incremental

@dlt.resource
def users(updated_at=incremental("updated_at")):
    # On first run: loads all data
    # On subsequent runs: loads only data where updated_at > last_value
    # last_value is None before any state exists, so fall back to the epoch.
    last_update = updated_at.last_value or "1970-01-01"

    for user in fetch_users(since=last_update):
        yield user

pipeline = dlt.pipeline(destination="duckdb", dataset_name="users_db")
pipeline.run(users())

Incremental with Merge

@dlt.resource(
    write_disposition="merge",
    primary_key="id"
)
def orders(created_at=incremental("created_at")):
    """
    Loads new orders and updates existing ones.
    Uses merge write disposition to deduplicate by primary key.
    """
    # None on the first run — presumably fetch_orders treats None as
    # "load everything"; verify against its implementation.
    last_created = created_at.last_value

    for order in fetch_orders(since=last_created):
        yield order

pipeline.run(orders())

Incremental with Initial Value

from datetime import datetime, timedelta

@dlt.resource
def events(
    event_time=incremental(
        "event_time",
        initial_value=datetime(2024, 1, 1)
    )
):
    """
    Starts loading from 2024-01-01 on first run.
    initial_value seeds last_value before any state exists; later runs
    resume from the stored cursor instead.
    """
    for event in fetch_events(since=event_time.last_value):
        yield event

Descending Incremental (Min Function)

@dlt.resource
def decreasing_ids(
    id=incremental(
        "id",
        initial_value=1000000,
        last_value_func=min,  # track the lowest ID instead of the highest
        row_order="desc"
    )
):
    """
    Loads items with decreasing IDs.
    Uses min() to track the lowest seen ID.
    """
    # last_value moves downward from initial_value=1000000 across runs.
    for item in fetch_items_before(id.last_value):
        yield item

Bounded Incremental Loading

from datetime import datetime

@dlt.resource
def historical_data(
    timestamp=incremental(
        "timestamp",
        initial_value=datetime(2024, 1, 1),
        end_value=datetime(2024, 12, 31)
    )
):
    """
    Loads data between start and end dates.
    Stops when cursor reaches end_value.
    """
    # NOTE(review): the end date is duplicated here and in end_value above —
    # keep the two in sync when editing this example.
    for record in fetch_records(
        start=timestamp.last_value,
        end=datetime(2024, 12, 31)
    ):
        yield record

Incremental with Lag

from datetime import timedelta

@dlt.resource
def delayed_events(
    event_time=incremental(
        "event_time",
        lag=timedelta(hours=2)
    )
):
    """
    Applies 2-hour attribution window.
    Only loads events older than (now - 2 hours) to account for late-arriving data.
    """
    for event in fetch_events(until=event_time.last_value):
        yield event

Handling Missing Cursor Values

@dlt.resource
def optional_timestamps(
    updated=incremental(
        "updated_at",
        on_cursor_value_missing="include"  # Include items without cursor
    )
):
    """
    Includes items even if they don't have the cursor field.
    Options: "raise" (error), "include", "exclude"
    """
    for item in fetch_items():
        # Some items may not have "updated_at" field
        yield item

Nested Cursor Paths

@dlt.resource
def nested_data(
    cursor=incremental("$.metadata.last_modified")
):
    """
    Uses JSON path to access nested cursor field.
    The leading "$." anchors the path at the item root.
    """
    for item in fetch_data():
        # item structure:
        # {
        #   "id": 1,
        #   "metadata": {
        #     "last_modified": "2024-01-01T00:00:00Z"
        #   }
        # }
        yield item

Airflow Integration

@dlt.resource
def scheduled_data(
    timestamp=incremental(
        "timestamp",
        allow_external_schedulers=True
    )
):
    """
    Allows Airflow to manage incremental state.
    State is passed via Airflow's execution context.
    """
    for record in fetch_scheduled_records(since=timestamp.last_value):
        yield record

Custom Last Value Function

def custom_cursor_func(values):
    """
    Pick which cursor value to persist from the observed values.

    Returns the upper median (the middle element of the sorted sequence;
    for even-length input, the higher of the two middle elements).
    Illustrates that any business-specific reduction can replace max/min.
    """
    ordered = sorted(values)
    middle = len(ordered) // 2
    return ordered[middle]

@dlt.resource
def custom_incremental(
    value=incremental(
        "value",
        last_value_func=custom_cursor_func
    )
):
    # custom_cursor_func replaces max/min to decide which cursor value is
    # persisted after each run.
    for item in fetch_items():
        yield item

Multiple Incremental Cursors

@dlt.resource
def multi_cursor(
    created=incremental("created_at"),
    updated=incremental("updated_at")
):
    """
    Track multiple cursor fields independently.
    Each incremental argument keeps its own state.
    """
    for record in fetch_records(
        created_since=created.last_value,
        updated_since=updated.last_value
    ):
        yield record

Range Boundaries

@dlt.resource
def range_data(
    id=incremental(
        "id",
        initial_value=100,
        range_start="open",   # Excludes start value (>)
        range_end="closed"    # Includes end value (<=)
    )
):
    """
    Controls whether boundaries are inclusive or exclusive.
    - "closed": Includes boundary value (>=, <=)
    - "open": Excludes boundary value (>, <)
    """
    # Will load: id > last_value (open start)
    for item in fetch_items(id_gt=id.last_value):
        yield item

Accessing State Directly

@dlt.resource
def stateful_resource(updated=incremental("updated_at")):
    # Grab the currently-running pipeline to read/write its state.
    pipeline = dlt.current.pipeline()

    # Access pipeline state
    custom_state = pipeline.state.get("my_custom_value", 0)

    # Incremental state is stored in pipeline.state
    last_cursor = updated.last_value

    for item in fetch_items(since=last_cursor):
        yield item

    # Update custom state — NOTE(review): this assumes direct mutation of
    # pipeline.state is persisted after the run; confirm with the dlt docs.
    pipeline.state["my_custom_value"] = custom_state + 1

Incremental with Transformers

@dlt.resource
def raw_events(timestamp=incremental("timestamp")):
    # Upstream resource: yields raw events newer than the stored cursor.
    for event in fetch_raw_events(since=timestamp.last_value):
        yield event

@dlt.transformer(data_from=raw_events)
def processed_events(event, timestamp=incremental("processed_at")):
    """
    Transformer with its own incremental state.
    Its "processed_at" cursor is tracked independently of raw_events'.
    """
    processed = process_event(event)
    # NOTE(review): datetime.now() is timezone-naive — consider an aware
    # timestamp (e.g. UTC) so cursor comparisons are unambiguous.
    processed["processed_at"] = datetime.now()
    yield processed

Incremental State Persistence

Incremental state is automatically persisted:

  1. Local Storage: Saved in .dlt directory (development)
  2. Destination Storage: Synced to destination _dlt_state table (production)
  3. Automatic Sync: State syncs after successful loads
  4. Per-Resource: Each resource maintains independent state

Error Handling

from dlt.extract.incremental import (
    IncrementalCursorPathMissing,
    IncrementalPrimaryKeyMissing,
    IncrementalUnboundError
)

try:
    pipeline.run(my_incremental_resource())
except IncrementalCursorPathMissing:
    # Cursor path not found in data items
    pass  # placeholder — handle or re-raise in real code
except IncrementalPrimaryKeyMissing:
    # Primary key required for merge but not provided
    pass  # placeholder — handle or re-raise in real code
except IncrementalUnboundError:
    # Incremental used without upper bound when required
    pass  # placeholder — handle or re-raise in real code