Define data sources and resources using decorators for declarative pipeline construction with automatic schema inference.
Marks a function as a data source that returns one or more resources.
def source(
func: Callable = None,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = False,
schema: Schema = None,
schema_contract: dict = None,
spec: type = None,
_impl_cls: type = None
) -> Callable:
"""
Decorates a function as a data source.
Args:
func: Function that returns DltResource(s)
name: Source name (defaults to function name)
section: Configuration section for dependency injection
max_table_nesting: Maximum depth at which nested data is split into child tables; deeper structures are stored as JSON (0 keeps all nested data in the root table)
root_key: Add root key column to nested tables
schema: Custom schema instance
schema_contract: Schema evolution rules dict
spec: Configuration specification class
_impl_cls: Internal implementation class
Returns:
Decorated source function returning DltSource
Example:
@dlt.source
def my_source(api_key: str = dlt.secrets.value):
return my_resource(api_key)
"""Marks a function or data as a resource that yields data items.
def resource(
data: Any = None,
name: str = None,
table_name: str = None,
write_disposition: Literal["append", "replace", "merge", "skip"] = None,
columns: Any = None,
primary_key: Union[str, List[str]] = None,
merge_key: Union[str, List[str]] = None,
schema_contract: dict = None,
table_format: Literal["iceberg", "delta", "hive"] = None,
file_format: Literal["jsonl", "parquet", "csv"] = None,
references: Any = None,
depends_on: Any = None,
selected: bool = True,
spec: type = None,
standalone: bool = False,
data_from: Any = None,
parallelized: bool = False
) -> Union[Callable, Any]:
"""
Decorates a function or wraps data as a resource.
Args:
data: Data to wrap (generator, iterable, list, or function)
name: Resource name (defaults to function name)
table_name: Target table name (defaults to resource name)
write_disposition: How to write data:
- "append": Add new data to table
- "replace": Drop and recreate table
- "merge": Deduplicate using merge_key
- "skip": Skip loading if table exists
columns: Column hints or TTableSchema dict
primary_key: Primary key column(s) for merge
merge_key: Merge key column(s) for deduplication
schema_contract: Schema evolution rules:
- "evolve": Allow schema changes (default)
- "freeze": Disallow schema changes
- "discard_row": Discard rows with new columns
- "discard_value": Discard new column values
table_format: Table format for data lakes
file_format: Staging file format
references: Foreign key references to other tables
depends_on: Resource dependencies (other resources)
selected: Whether resource is selected for loading
spec: Configuration specification class
standalone: Create standalone resource (not bound to source)
data_from: Resource to transform (for transformers)
parallelized: Enable parallel execution
Returns:
Decorated resource function or DltResource instance
Example:
@dlt.resource(write_disposition="merge", primary_key="id")
def users():
for i in range(100):
yield {"id": i, "name": f"user_{i}"}
"""Marks a function as a transformer that processes data from another resource.
def transformer(
f: Callable = None,
data_from: Any = None,
name: str = None,
**resource_kwargs
) -> Callable:
"""
Decorates a function as a data transformer.
Args:
f: Function that accepts data items and yields transformed items
data_from: Source resource to transform
name: Transformer name
**resource_kwargs: Additional resource() arguments
Returns:
Decorated transformer function
Example:
@dlt.transformer(data_from=users)
def user_emails(user):
yield {"user_id": user["id"], "email": user["email"]}
"""Defers resource evaluation until explicitly called.
def defer(f: Callable) -> Callable:
"""
Runs the decorated function in a worker thread when the pipeline extracts, enabling parallel evaluation.
Args:
f: Resource function to defer
Returns:
Deferred resource
Example:
@dlt.resource
@dlt.defer
def expensive_resource():
# Runs in a worker thread at extraction time;
# deferred functions return their data rather than yield it
return load_large_dataset()
"""Container for related resources.
class DltSource:
"""
Collection of related resources representing a data source.
Attributes:
name: Source name
section: Configuration section
max_table_nesting: Max nesting level
root_key: Root key setting
schema: Source schema
resources: Dictionary of resources
"""
@property
def name(self) -> str:
"""Source name"""
@property
def resources(self) -> dict:
"""Dictionary of resource name -> DltResource"""
def with_resources(self, *resource_names: str) -> "DltSource":
"""
Selects specific resources from source.
Args:
*resource_names: Resource names to include
Returns:
New source with selected resources
Example:
source = my_source()
selected = source.with_resources("users", "orders")
"""
def add_limit(self, limit: int) -> "DltSource":
"""
Adds row limit to all resources.
Args:
limit: Maximum rows per resource
Returns:
Source with limits applied
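Example:
# Quick test run; "my_source" is a hypothetical source
pipeline.run(my_source().add_limit(10))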
"""Represents a single data stream.
class DltResource:
"""
Individual data resource yielding data items.
Attributes:
name: Resource name
table_name: Target table name
write_disposition: Write behavior
columns: Column hints
primary_key: Primary key column(s)
merge_key: Merge key column(s)
selected: Selection status
"""
@property
def name(self) -> str:
"""Resource name"""
@property
def table_name(self) -> str:
"""Target table name"""
def with_name(self, name: str) -> "DltResource":
"""
Creates copy with new name.
Args:
name: New resource name
Returns:
Resource with new name
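Example:
# Load a copy of a hypothetical "users" resource under another name
pipeline.run(users().with_name("users_archive"))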
"""
def with_table_name(self, table_name: str) -> "DltResource":
"""
Sets target table name.
Args:
table_name: Table name
Returns:
Resource with table name set
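Example:
# Redirect a hypothetical "events" resource into a yearly table
pipeline.run(events().with_table_name("events_2024"))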
"""
def add_limit(self, limit: int) -> "DltResource":
"""
Adds row limit.
Args:
limit: Maximum rows
Returns:
Resource with limit
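Example:
# Extract only the first 100 items of a hypothetical "users" resource
pipeline.run(users().add_limit(100))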
"""
def add_filter(self, filter_func: Callable) -> "DltResource":
"""
Adds filter function.
Args:
filter_func: Function that returns True to keep item
Returns:
Resource with filter
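Example:
# Keep only active items (assumes each item has a "status" field)
active_users = users().add_filter(lambda item: item["status"] == "active")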
"""
def add_map(self, map_func: Callable) -> "DltResource":
"""
Adds mapping function.
Args:
map_func: Function that transforms each item
Returns:
Resource with mapping
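Example:
# Stamp every item with a constant column
tagged = users().add_map(lambda item: {**item, "source": "api"})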
"""
def add_yield_map(self, yield_map_func: Callable) -> "DltResource":
"""
Adds yield mapping function.
Args:
yield_map_func: Function that yields zero or more items per input
Returns:
Resource with yield mapping
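Example:
# Fan one order out into one item per line item (hypothetical item shape)
line_items = orders().add_yield_map(lambda order: iter(order["items"]))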
"""
def select_columns(self, *column_names: str) -> "DltResource":
"""
Selects specific columns.
Args:
*column_names: Column names to keep
Returns:
Resource with column selection
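Example:
# Keep only the listed columns
slim_users = users().select_columns("id", "email")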
"""Apply schema hints to resources.
def with_table_name(data: Any, table_name: str) -> Any:
"""
Sets table name for data or resource.
Args:
data: Resource or data
table_name: Target table name
Returns:
Data with table name hint
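Example:
# Route a single item to a specific table at yield time
yield dlt.mark.with_table_name(row, "events_2024")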
"""def with_hints(
data: Any,
columns: Any = None,
primary_key: Union[str, List[str]] = None,
merge_key: Union[str, List[str]] = None,
table_name: str = None,
write_disposition: str = None,
**kwargs
) -> Any:
"""
Applies schema hints to data or resource.
Args:
data: Resource or data
columns: Column hints dictionary
primary_key: Primary key column(s)
merge_key: Merge key column(s)
table_name: Target table name
write_disposition: Write behavior
**kwargs: Additional hints
Returns:
Data with hints applied
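Example:
# Attach merge hints to plain data, using the signature documented above
hinted = with_hints(rows, primary_key="id", write_disposition="merge")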
"""import dlt
@dlt.source
def weather_source(api_key: str = dlt.secrets.value):
@dlt.resource(write_disposition="append")
def temperature_readings():
# Fetch from API
for reading in fetch_temperatures(api_key):
yield reading
return temperature_readings
# Use the source
pipeline = dlt.pipeline(destination="duckdb", dataset_name="weather")
pipeline.run(weather_source())

A source with multiple resources, loading only a selected subset:
@dlt.source
def ecommerce_source():
@dlt.resource(
write_disposition="merge",
primary_key="id"
)
def users():
yield from fetch_users()
@dlt.resource(
write_disposition="merge",
primary_key="id",
merge_key="id"
)
def orders():
yield from fetch_orders()
return users, orders
# Load specific resources
pipeline.run(ecommerce_source().with_resources("orders"))

A resource feeding a transformer:
@dlt.resource
def users():
for i in range(100):
yield {
"id": i,
"email": f"user{i}@example.com",
"name": f"User {i}"
}
@dlt.transformer(data_from=users, write_disposition="append")
def user_emails(user):
# Transform user data
yield {
"user_id": user["id"],
"email": user["email"]
}
# Both resources will be loaded
pipeline.run([users, user_emails])

Routing items from one resource into multiple tables:
@dlt.resource
def multi_table_data():
for table in ["events_2023", "events_2024"]:
data = fetch_table_data(table)
yield dlt.mark.with_table_name(data, table)
pipeline.run(multi_table_data())

Chaining inline transformations on a resource:
@dlt.resource
def raw_data():
yield from fetch_raw_data()
# Apply transformations
processed = (
raw_data()
.add_filter(lambda x: x["status"] == "active")
.add_map(lambda x: {**x, "processed": True})
.add_limit(1000)
.select_columns("id", "name", "processed")
)
pipeline.run(processed)

Controlling nesting of child tables:
@dlt.source(max_table_nesting=2, root_key=True)
def nested_source():
@dlt.resource
def orders():
yield {
"order_id": 1,
"customer": {"id": 10, "name": "John"},
"items": [
{"product_id": 100, "quantity": 2},
{"product_id": 101, "quantity": 1}
]
}
return orders
# Creates tables: orders, orders__items
# The nested "customer" dict is flattened into the orders table;
# the "items" list becomes a child table (within max_table_nesting=2)

Enforcing a schema contract:
@dlt.resource(
schema_contract={
"tables": "evolve", # Allow new tables
"columns": "freeze", # Disallow new columns
"data_type": "freeze" # Disallow type changes
}
)
def strict_data():
yield {"id": 1, "value": "test"}
# Adding new columns will raise an error with "freeze"
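
A minimal sketch of parallel extraction using the parallelized flag documented above; fetch_pages() is a hypothetical generator:
@dlt.resource(parallelized=True)
def slow_api():
    # Evaluated in a worker thread alongside other resources
    yield from fetch_pages()
pipeline.run(slow_api())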