or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md destinations.md helpers.md incremental.md index.md pipeline.md schema.md source-filesystem.md source-rest-api.md source-sql-database.md sources-resources.md
tile.json

docs/sources-resources.md

Sources and Resources

Define data sources and resources using decorators for declarative pipeline construction with automatic schema inference.

Capabilities

Source Decorator

Marks a function as a data source that returns one or more resources.

def source(
    func: Callable = None,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = False,
    schema: Schema = None,
    schema_contract: dict = None,
    spec: type = None,
    _impl_cls: type = None
) -> Callable:
    """
    Decorates a function as a data source.

    Args:
        func: Function that returns DltResource(s)
        name: Source name (defaults to function name)
        section: Configuration section for dependency injection
        max_table_nesting: Maximum nesting level before creating child tables (0-infinity)
        root_key: Add root key column to nested tables
        schema: Custom schema instance
        schema_contract: Schema evolution rules dict
        spec: Configuration specification class
        _impl_cls: Internal implementation class

    Returns:
        Decorated source function returning DltSource

    Example:
        @dlt.source
        def my_source(api_key: str = dlt.secrets.value):
            return my_resource(api_key)
    """

Resource Decorator

Marks a function or data as a resource that yields data items.

def resource(
    data: Any = None,
    name: str = None,
    table_name: str = None,
    write_disposition: Literal["append", "replace", "merge", "skip"] = None,
    columns: Any = None,
    primary_key: Union[str, List[str]] = None,
    merge_key: Union[str, List[str]] = None,
    schema_contract: dict = None,
    table_format: Literal["iceberg", "delta", "hive"] = None,
    file_format: Literal["jsonl", "parquet", "csv"] = None,
    references: Any = None,
    depends_on: Any = None,
    selected: bool = True,
    spec: type = None,
    standalone: bool = False,
    data_from: Any = None,
    parallelized: bool = False
) -> Union[Callable, Any]:
    """
    Decorates a function or wraps data as a resource.

    Args:
        data: Data to wrap (generator, iterable, list, or function)
        name: Resource name (defaults to function name)
        table_name: Target table name (defaults to resource name)
        write_disposition: How to write data:
            - "append": Add new data to table
            - "replace": Drop and recreate table
            - "merge": Merge (upsert/deduplicate) using primary_key and/or merge_key
            - "skip": Skip loading the data entirely
        columns: Column hints or TTableSchema dict
        primary_key: Primary key column(s) for merge
        merge_key: Merge key column(s) for deduplication
        schema_contract: Schema evolution rules:
            - "evolve": Allow schema changes (default)
            - "freeze": Disallow schema changes
            - "discard_row": Discard rows with new columns
            - "discard_value": Discard new column values
        table_format: Table format for data lakes
        file_format: Staging file format
        references: Foreign key references to other tables
        depends_on: Resource dependencies (other resources)
        selected: Whether resource is selected for loading
        spec: Configuration specification class
        standalone: Create standalone resource (not bound to source)
        data_from: Resource to transform (for transformers)
        parallelized: Enable parallel execution

    Returns:
        Decorated resource function or DltResource instance

    Example:
        @dlt.resource(write_disposition="merge", primary_key="id")
        def users():
            for i in range(100):
                yield {"id": i, "name": f"user_{i}"}
    """

Transformer Decorator

Marks a function as a transformer that processes data from another resource.

def transformer(
    f: Callable = None,
    data_from: Any = None,
    name: str = None,
    **resource_kwargs
) -> Callable:
    """
    Decorates a function as a data transformer.

    Args:
        f: Function that accepts data items and yields transformed items
        data_from: Source resource to transform
        name: Transformer name
        **resource_kwargs: Additional resource() arguments

    Returns:
        Decorated transformer function

    Example:
        @dlt.transformer(data_from=users)
        def user_emails(user):
            yield {"user_id": user["id"], "email": user["email"]}
    """

Defer Decorator

Defers resource evaluation until explicitly called.

def defer(f: Callable) -> Callable:
    """
    Defers resource execution until explicitly invoked.

    Args:
        f: Resource function to defer

    Returns:
        Deferred resource

    Example:
        @dlt.resource
        @dlt.defer
        def expensive_resource():
            # Only executed when explicitly called
            yield from load_large_dataset()
    """

DltSource Class

Container for related resources.

class DltSource:
    """
    Collection of related resources representing a data source.

    Attributes:
        name: Source name
        section: Configuration section
        max_table_nesting: Max nesting level
        root_key: Root key setting
        schema: Source schema
        resources: Dictionary of resources
    """

    @property
    def name(self) -> str:
        """Source name"""

    @property
    def resources(self) -> dict:
        """Dictionary of resource name -> DltResource"""

    def with_resources(self, *resource_names: str) -> "DltSource":
        """
        Selects specific resources from source.

        Args:
            *resource_names: Resource names to include

        Returns:
            New source with selected resources

        Example:
            source = my_source()
            selected = source.with_resources("users", "orders")
        """

    def add_limit(self, limit: int) -> "DltSource":
        """
        Adds row limit to all resources.

        Args:
            limit: Maximum rows per resource

        Returns:
            Source with limits applied
        """

DltResource Class

Represents a single data stream.

class DltResource:
    """
    Individual data resource yielding data items.

    Attributes:
        name: Resource name
        table_name: Target table name
        write_disposition: Write behavior
        columns: Column hints
        primary_key: Primary key column(s)
        merge_key: Merge key column(s)
        selected: Selection status
    """

    @property
    def name(self) -> str:
        """Resource name"""

    @property
    def table_name(self) -> str:
        """Target table name"""

    def with_name(self, name: str) -> "DltResource":
        """
        Creates copy with new name.

        Args:
            name: New resource name

        Returns:
            Resource with new name
        """

    def with_table_name(self, table_name: str) -> "DltResource":
        """
        Sets target table name.

        Args:
            table_name: Table name

        Returns:
            Resource with table name set
        """

    def add_limit(self, limit: int) -> "DltResource":
        """
        Adds row limit.

        Args:
            limit: Maximum rows

        Returns:
            Resource with limit
        """

    def add_filter(self, filter_func: Callable) -> "DltResource":
        """
        Adds filter function.

        Args:
            filter_func: Function that returns True to keep item

        Returns:
            Resource with filter
        """

    def add_map(self, map_func: Callable) -> "DltResource":
        """
        Adds mapping function.

        Args:
            map_func: Function that transforms each item

        Returns:
            Resource with mapping
        """

    def add_yield_map(self, yield_map_func: Callable) -> "DltResource":
        """
        Adds yield mapping function.

        Args:
            yield_map_func: Function that yields zero or more items per input

        Returns:
            Resource with yield mapping
        """

    def select_columns(self, *column_names: str) -> "DltResource":
        """
        Selects specific columns.

        Args:
            *column_names: Column names to keep

        Returns:
            Resource with column selection
        """

Resource Hints

Apply schema hints to resources.

def with_table_name(data: Any, table_name: str) -> Any:
    """
    Sets table name for data or resource.

    Args:
        data: Resource or data
        table_name: Target table name

    Returns:
        Data with table name hint
    """
def with_hints(
    data: Any,
    columns: Any = None,
    primary_key: Union[str, List[str]] = None,
    merge_key: Union[str, List[str]] = None,
    table_name: str = None,
    write_disposition: str = None,
    **kwargs
) -> Any:
    """
    Applies schema hints to data or resource.

    Args:
        data: Resource or data
        columns: Column hints dictionary
        primary_key: Primary key column(s)
        merge_key: Merge key column(s)
        table_name: Target table name
        write_disposition: Write behavior
        **kwargs: Additional hints

    Returns:
        Data with hints applied
    """

Usage Examples

Simple Source and Resource

import dlt

@dlt.source
def weather_source(api_key: str = dlt.secrets.value):
    @dlt.resource(write_disposition="append")
    def temperature_readings():
        # Fetch from API
        for reading in fetch_temperatures(api_key):
            yield reading

    return temperature_readings

# Use the source
pipeline = dlt.pipeline(destination="duckdb", dataset_name="weather")
pipeline.run(weather_source())

Multiple Resources in Source

@dlt.source
def ecommerce_source():
    @dlt.resource(
        write_disposition="merge",
        primary_key="id"
    )
    def users():
        yield from fetch_users()

    @dlt.resource(
        write_disposition="merge",
        primary_key="id",
        merge_key="id"
    )
    def orders():
        yield from fetch_orders()

    return users, orders

# Load specific resources
pipeline.run(ecommerce_source().with_resources("orders"))

Transformer Pattern

@dlt.resource
def users():
    for i in range(100):
        yield {
            "id": i,
            "email": f"user{i}@example.com",
            "name": f"User {i}"
        }

@dlt.transformer(data_from=users, write_disposition="append")
def user_emails(user):
    # Transform user data
    yield {
        "user_id": user["id"],
        "email": user["email"]
    }

# Both resources will be loaded
pipeline.run([users, user_emails])

Dynamic Table Names

@dlt.resource
def multi_table_data():
    for table in ["events_2023", "events_2024"]:
        data = fetch_table_data(table)
        yield dlt.mark.with_table_name(data, table)

pipeline.run(multi_table_data())

Resource Chaining

@dlt.resource
def raw_data():
    yield from fetch_raw_data()

# Apply transformations
processed = (
    raw_data()
    .add_filter(lambda x: x["status"] == "active")
    .add_map(lambda x: {**x, "processed": True})
    .add_limit(1000)
    .select_columns("id", "name", "processed")
)

pipeline.run(processed)

Nested Data Handling

@dlt.source(max_table_nesting=2, root_key=True)
def nested_source():
    @dlt.resource
    def orders():
        yield {
            "order_id": 1,
            "customer": {"id": 10, "name": "John"},
            "items": [
                {"product_id": 100, "quantity": 2},
                {"product_id": 101, "quantity": 1}
            ]
        }

    return orders

# Creates tables: orders, orders__items
# max_table_nesting=2 means nested "customer" stays in orders table
# but "items" becomes separate table

Schema Contracts

@dlt.resource(
    schema_contract={
        "tables": "evolve",      # Allow new tables
        "columns": "freeze",     # Disallow new columns
        "data_type": "freeze"    # Disallow type changes
    }
)
def strict_data():
    yield {"id": 1, "value": "test"}

# Adding new columns will raise an error with "freeze"