Configure incremental data loading with cursor-based tracking, deduplication, and state management for efficient data synchronization.
class Incremental:
"""
Configures incremental loading for a resource using cursor-based tracking.
Args:
cursor_path: JSON path to cursor field in data items (e.g., "updated_at", "$.id", "data.timestamp")
initial_value: Starting cursor value (used on first run)
last_value_func: Function that selects the new cursor value from a sequence of candidate values (max, min, or a custom callable)
primary_key: Primary key column(s) for deduplication with merge write disposition
end_value: End cursor value for bounded incremental loading
row_order: Row ordering relative to cursor ("asc" or "desc")
allow_external_schedulers: Enable Airflow scheduler integration
on_cursor_value_missing: Action when cursor field missing in item:
- "raise": Raise exception (default)
- "include": Include item without cursor
- "exclude": Exclude item without cursor
lag: Lag/attribution window as a numeric value (seconds for datetime cursors, cursor units for numeric cursors)
range_start: Start boundary type ("open" or "closed")
range_end: End boundary type ("open" or "closed")
Attributes:
last_value: Current cursor position (read-only)
"""
def __init__(
self,
cursor_path: str,
initial_value: Any = None,
    last_value_func: Callable[[Sequence[Any]], Any] = max,
primary_key: Union[str, List[str]] = None,
end_value: Any = None,
    row_order: Literal["asc", "desc"] = None,
allow_external_schedulers: bool = False,
on_cursor_value_missing: Literal["raise", "include", "exclude"] = "raise",
    lag: Union[int, float] = None,
range_start: Literal["open", "closed"] = "closed",
    range_end: Literal["open", "closed"] = "open"
): ...
@property
def last_value(self) -> Any:
"""
Current cursor value.
Returns:
Last cursor value from state
"""
@classmethod
def from_existing_state(
cls,
source_name: str,
resource_name: str,
cursor_path: str = None
) -> "Incremental":
"""
Creates Incremental from existing pipeline state.
Args:
source_name: Source name
resource_name: Resource name
cursor_path: Cursor path (optional if already in state)
Returns:
Incremental instance with restored state
"""
def merge(self, other: "Incremental") -> "Incremental":
"""
Merges two Incremental instances.
Args:
other: Another Incremental instance
Returns:
Merged Incremental instance
"""
def copy(self) -> "Incremental":
"""
Creates a deep copy.
Returns:
Copied Incremental instance
"""
def get_cursor_column_name(self) -> str:
"""
Gets the cursor column name.
Returns:
Column name extracted from cursor_path
"""# Commonly imported as incremental (lowercase)
# Commonly imported as incremental (lowercase)
from dlt.sources import incremental
# Same as: from dlt.extract.incremental import Incremental

import dlt
from dlt.sources import incremental
@dlt.resource
def users(updated_at=incremental("updated_at")):
# On first run: loads all data
    # On subsequent runs: loads only data with updated_at >= last_value (boundary rows are deduplicated)
last_update = updated_at.last_value or "1970-01-01"
for user in fetch_users(since=last_update):
yield user
pipeline = dlt.pipeline(destination="duckdb", dataset_name="users_db")
pipeline.run(users())

@dlt.resource(
write_disposition="merge",
primary_key="id"
)
def orders(created_at=incremental("created_at")):
"""
Loads new orders and updates existing ones.
Uses merge write disposition to deduplicate by primary key.
"""
last_created = created_at.last_value
for order in fetch_orders(since=last_created):
yield order
pipeline.run(orders())

from datetime import datetime, timedelta
@dlt.resource
def events(
event_time=incremental(
"event_time",
initial_value=datetime(2024, 1, 1)
)
):
"""
Starts loading from 2024-01-01 on first run.
"""
for event in fetch_events(since=event_time.last_value):
        yield event

@dlt.resource
def decreasing_ids(
id=incremental(
"id",
initial_value=1000000,
last_value_func=min,
row_order="desc"
)
):
"""
Loads items with decreasing IDs.
Uses min() to track the lowest seen ID.
"""
for item in fetch_items_before(id.last_value):
        yield item

from datetime import datetime
@dlt.resource
def historical_data(
timestamp=incremental(
"timestamp",
initial_value=datetime(2024, 1, 1),
end_value=datetime(2024, 12, 31)
)
):
"""
Loads data between start and end dates.
Stops when cursor reaches end_value.
"""
for record in fetch_records(
start=timestamp.last_value,
end=datetime(2024, 12, 31)
):
        yield record

from datetime import timedelta
@dlt.resource
def delayed_events(
event_time=incremental(
"event_time",
        lag=timedelta(hours=2).total_seconds()  # lag is numeric: seconds for datetime cursors
)
):
"""
    Applies a 2-hour attribution window.
    On each run the starting cursor is shifted back by 2 hours,
    so late-arriving events are loaded again rather than missed.
"""
    for event in fetch_events(since=event_time.last_value):
        yield event

@dlt.resource
def optional_timestamps(
updated=incremental(
"updated_at",
on_cursor_value_missing="include" # Include items without cursor
)
):
"""
Includes items even if they don't have the cursor field.
Options: "raise" (error), "include", "exclude"
"""
for item in fetch_items():
# Some items may not have "updated_at" field
        yield item

@dlt.resource
def nested_data(
cursor=incremental("$.metadata.last_modified")
):
"""
Uses JSON path to access nested cursor field.
"""
for item in fetch_data():
# item structure:
# {
# "id": 1,
# "metadata": {
# "last_modified": "2024-01-01T00:00:00Z"
# }
# }
        yield item

@dlt.resource
def scheduled_data(
timestamp=incremental(
"timestamp",
allow_external_schedulers=True
)
):
"""
    Allows an external scheduler such as Airflow to control the cursor range.
    When enabled, start/end values are taken from the scheduler's interval
    (e.g. the Airflow data interval) instead of dlt's stored state.
"""
for record in fetch_scheduled_records(since=timestamp.last_value):
        yield record

def custom_cursor_func(values):
"""
    Custom function to determine the cursor value.
    Receives a sequence of candidate cursor values and returns the one to keep.
    Could implement median, mode, or business-specific logic.
"""
return sorted(values)[len(values) // 2] # Median
@dlt.resource
def custom_incremental(
value=incremental(
"value",
last_value_func=custom_cursor_func
)
):
for item in fetch_items():
        yield item

@dlt.resource
def multi_cursor(
created=incremental("created_at"),
updated=incremental("updated_at")
):
"""
Track multiple cursor fields independently.
"""
for record in fetch_records(
created_since=created.last_value,
updated_since=updated.last_value
):
        yield record

@dlt.resource
def range_data(
id=incremental(
"id",
initial_value=100,
range_start="open", # Excludes start value (>)
range_end="closed" # Includes end value (<=)
)
):
"""
Controls whether boundaries are inclusive or exclusive.
- "closed": Includes boundary value (>=, <=)
- "open": Excludes boundary value (>, <)
"""
# Will load: id > last_value (open start)
for item in fetch_items(id_gt=id.last_value):
        yield item

@dlt.resource
def stateful_resource(updated=incremental("updated_at")):
    # The active pipeline is available via dlt.current.pipeline() if needed;
    # custom values are best kept in the resource state, which is persisted
    # together with the incremental cursor in the pipeline state
    state = dlt.current.resource_state()
    custom_state = state.get("my_custom_value", 0)
    # The incremental cursor itself is tracked automatically
    last_cursor = updated.last_value
    for item in fetch_items(since=last_cursor):
        yield item
    # Update custom state so it is persisted with the pipeline state
    state["my_custom_value"] = custom_state + 1

@dlt.resource
def raw_events(timestamp=incremental("timestamp")):
for event in fetch_raw_events(since=timestamp.last_value):
yield event
@dlt.transformer(data_from=raw_events)
def processed_events(event, timestamp=incremental("processed_at")):
"""
Transformer with its own incremental state.
"""
processed = process_event(event)
processed["processed_at"] = datetime.now()
    yield processed

Incremental state is automatically persisted with the rest of the pipeline state: in the local pipeline working directory under .dlt during development, and in the _dlt_pipeline_state table at the destination in production.
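Because the cursor lives in the persisted pipeline state, it can also be rebuilt outside a resource with the from_existing_state classmethod documented above. A hedged sketch: the source and resource names are placeholders for the users example, and depending on the dlt version the call may need to run inside an active pipeline context.

from dlt.sources import incremental

# Placeholder names; substitute your own source and resource
restored = incremental.from_existing_state(
    source_name="users",
    resource_name="users",
    cursor_path="updated_at",
)
print(restored.last_value)  # cursor position read back from persisted state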
from dlt.extract.incremental import (
IncrementalCursorPathMissing,
IncrementalPrimaryKeyMissing,
IncrementalUnboundError
)
try:
pipeline.run(my_incremental_resource())
except IncrementalCursorPathMissing:
# Cursor path not found in data items
pass
except IncrementalPrimaryKeyMissing:
# Primary key required for merge but not provided
pass
except IncrementalUnboundError:
# Incremental used without upper bound when required
pass