Schema Management

Automatic schema inference, evolution, and validation with configurable contracts and data type handling.

Capabilities

Schema Class

class Schema:
    """
    Manages data schemas with tables, columns, and evolution rules.

    Attributes:
        name: Schema name
        tables: Dictionary of table name -> table schema
        data_tables: List of data table names (excluding system tables)
    """

    def __init__(self, name: str): ...

    @property
    def name(self) -> str:
        """Schema name"""

    @property
    def tables(self) -> Dict[str, TTableSchema]:
        """All tables including system tables"""

    @property
    def data_tables(self) -> List[str]:
        """Data tables only (excludes _dlt_* tables)"""

    def get_table(self, table_name: str) -> TTableSchema:
        """Gets table schema by name"""

    def get_table_columns(self, table_name: str) -> Dict[str, TColumnSchema]:
        """Gets columns for a table"""

Schema Types

TTableSchema = TypedDict("TTableSchema", {
    "name": str,
    "columns": Dict[str, "TColumnSchema"],
    "write_disposition": str,  # "append", "replace", "merge", "skip"
    "parent": str,  # Parent table for nested data
    "resource": str,  # Source resource name
}, total=False)

TColumnSchema = TypedDict("TColumnSchema", {
    "name": str,
    "data_type": str,  # "text", "bigint", "double", "bool", "timestamp", etc.
    "nullable": bool,
    "primary_key": bool,
    "unique": bool,
    "sort": bool,
    "merge_key": bool,
    "partition": bool,
    "cluster": bool,
    # ... additional column properties
}, total=False)

TSchemaUpdate = TypedDict("TSchemaUpdate", {
    "tables": Dict[str, TTableSchema]
}, total=False)

TSchemaTables = Dict[str, TTableSchema]

TStoredSchema = TypedDict("TStoredSchema", {
    "name": str,
    "version": int,
    "version_hash": str,
    "engine_version": int,
    "tables": TSchemaTables,
    # ... additional schema properties
}, total=False)

TTableSchemaColumns = Dict[str, TColumnSchema]

TColumnSchemaBase = TypedDict("TColumnSchemaBase", {
    "name": str,
    "data_type": str,
    "nullable": bool
}, total=False)

TSchemaContractDict = TypedDict("TSchemaContractDict", {
    "tables": str,  # "evolve", "freeze", "discard_row", "discard_value"
    "columns": str,
    "data_type": str
}, total=False)
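
A contract of this shape can also be passed at run time; a minimal sketch, assuming schema_contract is accepted by pipeline.run (supported in recent dlt versions) and my_source is defined elsewhere:

contract: TSchemaContractDict = {
    "tables": "evolve",
    "columns": "discard_value",
    "data_type": "freeze"
}
pipeline.run(my_source(), schema_contract=contract)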

Column Hints

# Available column hints
TColumnHint = Literal[
    "primary_key",   # Primary key constraint
    "merge_key",     # Key for merge deduplication
    "unique",        # Unique constraint
    "partition",     # Partition key
    "cluster",       # Cluster/sort key
    "sort",          # Sort key
    "nullable",      # Nullable column
    "dedup_sort",    # Deduplication sort order
    "root_key",      # Root table foreign key
    "parent_key",    # Parent table foreign key
    "row_key",       # Row identifier
    "hard_delete"    # Hard delete marker
]

# Set of all column hints
COLUMN_HINTS: Set[str]

Data Types

TDataType = Literal[
    "text",       # Text/string
    "bigint",     # 64-bit integer
    "double",     # Double precision float
    "bool",       # Boolean
    "timestamp",  # Timestamp with timezone
    "date",       # Date
    "time",       # Time
    "decimal",    # Fixed-point decimal
    "wei",        # Ethereum Wei (256-bit integer)
    "json",       # JSON object
    "binary",     # Binary data
    "complex"     # Complex/nested types
]

# Set of all data types
DATA_TYPES: Set[str]
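
A small sketch showing DATA_TYPES used as a guard before a column schema is handed to a pipeline; the import location is an assumption based on the Data Type Mapping example below:

from dlt.common.data_types import DATA_TYPES  # assumed export, alongside coerce_value

def check_data_type(col: TColumnSchema) -> None:
    # Membership check against the documented Set[str] of dlt types
    dt = col.get("data_type")
    if dt is not None and dt not in DATA_TYPES:
        raise ValueError(f"unknown dlt data type: {dt}")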

Data Type Functions

def coerce_value(
    data_type: TDataType,
    value: Any
) -> Any:
    """
    Coerces value to target data type.

    Args:
        data_type: Target dlt data type
        value: Value to coerce

    Returns:
        Coerced value

    Example:
        timestamp = coerce_value("timestamp", "2024-01-01T00:00:00Z")
    """
def py_type_to_sc_type(py_type: type) -> TDataType:
    """
    Maps Python type to dlt schema type.

    Args:
        py_type: Python type

    Returns:
        Corresponding dlt data type

    Example:
        data_type = py_type_to_sc_type(int)  # Returns "bigint"
        data_type = py_type_to_sc_type(str)  # Returns "text"
    """

Schema Utilities

def verify_schema_hash(
    schema: Schema,
    expected_hash: str
) -> bool:
    """
    Verifies schema integrity using hash.

    Args:
        schema: Schema to verify
        expected_hash: Expected hash value

    Returns:
        True if hash matches
    """

Schema Constants

# Default schema contract mode
DEFAULT_SCHEMA_CONTRACT_MODE: str = "evolve"

Schema Exceptions

class DataValidationError(Exception):
    """Raised when data fails schema validation."""

Usage Examples

Automatic Schema Inference

import dlt

@dlt.resource
def users():
    yield {"id": 1, "name": "Alice", "created_at": "2024-01-01"}

pipeline = dlt.pipeline(destination="duckdb", dataset_name="users_db")
pipeline.run(users())

# Schema automatically inferred:
# - id: bigint
# - name: text
# - created_at: timestamp

Access Schema

pipeline = dlt.pipeline(...)
pipeline.run(my_source())

# Get default schema
schema = pipeline.default_schema
print(f"Schema name: {schema.name}")

# List tables
for table_name in schema.data_tables:
    table = schema.get_table(table_name)
    print(f"Table: {table_name}")
    print(f"  Write disposition: {table.get('write_disposition')}")

    # List columns
    columns = schema.get_table_columns(table_name)
    for col_name, col_schema in columns.items():
        print(f"  - {col_name}: {col_schema['data_type']}")

Schema Contracts

@dlt.source(
    schema_contract={
        "tables": "evolve",      # Allow new tables (default)
        "columns": "freeze",     # Disallow new columns
        "data_type": "freeze"    # Disallow type changes
    }
)
def strict_source():
    @dlt.resource
    def data():
        yield {"id": 1, "value": "test"}
    return data

# Adding new columns will raise an error

Schema Contract Modes

# Per-contract settings:
# - "evolve": Allow changes (default)
# - "freeze": Disallow changes, raise error
# - "discard_row": Discard entire row on violation
# - "discard_value": Discard violating values only

@dlt.resource(
    schema_contract={
        "columns": "discard_value"  # Drop new columns
    }
)
def flexible_data():
    yield {"id": 1, "new_col": "ignored"}
    # new_col will be discarded

Applying Column Hints

@dlt.resource(
    columns={
        "id": {
            "name": "id",
            "data_type": "bigint",
            "primary_key": True,
            "nullable": False
        },
        "email": {
            "name": "email",
            "data_type": "text",
            "unique": True
        },
        "created_at": {
            "name": "created_at",
            "data_type": "timestamp",
            "sort": True,
            "partition": True
        }
    }
)
def users():
    yield {"id": 1, "email": "user@example.com", "created_at": "2024-01-01"}

Schema Evolution

# Initial load
@dlt.resource
def data_v1():
    yield {"id": 1, "name": "Alice"}

pipeline.run(data_v1())

# Later: add new column (automatically evolved)
@dlt.resource
def data_v2():
    yield {"id": 2, "name": "Bob", "email": "bob@example.com"}

pipeline.run(data_v2())  # email column added to schema
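
Each evolution bumps the schema version and changes its content hash; these correspond to the version and version_hash fields of TStoredSchema:

schema = pipeline.default_schema
print(schema.version)       # incremented after the email column was added
print(schema.version_hash)  # content hash changes with every evolution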

Custom Schema

from dlt.common.schema import Schema

# Create custom schema
schema = Schema("my_schema")

# Add table manually
schema.tables["users"] = {
    "name": "users",
    "columns": {
        "id": {"name": "id", "data_type": "bigint", "primary_key": True},
        "name": {"name": "name", "data_type": "text"}
    },
    "write_disposition": "merge"
}

# Use in source
@dlt.source(schema=schema)
def my_source():
    ...

Nested Data Flattening

@dlt.source(max_table_nesting=2)
def nested_source():
    @dlt.resource
    def orders():
        yield {
            "order_id": 1,
            "customer": {
                "id": 10,
                "name": "Alice"
            },
            "items": [
                {"product_id": 100, "qty": 2},
                {"product_id": 101, "qty": 1}
            ]
        }
    return orders

# Creates tables:
# - orders: order_id, customer__id, customer__name
# - orders__items: product_id, qty, _dlt_parent_id

Import/Export Schema

# Export schema
pipeline = dlt.pipeline(
    export_schema_path="schemas/exported"
)
pipeline.run(my_source())
# Schema exported as YAML to schemas/exported/<schema_name>.schema.yaml

# Import schema (the path is a directory; dlt looks up <schema_name>.schema.yaml inside it)
pipeline = dlt.pipeline(
    import_schema_path="schemas/imported"
)
pipeline.run(my_source())
# Uses the imported schema as the starting point instead of inferring from scratch

Schema Synchronization

# Sync schema with destination
pipeline.sync_schema()

# Sync specific schema
pipeline.sync_schema("my_schema")

Data Type Mapping

from dlt.common.data_types import py_type_to_sc_type, coerce_value

# Python to dlt type
print(py_type_to_sc_type(int))      # "bigint"
print(py_type_to_sc_type(float))    # "double"
print(py_type_to_sc_type(str))      # "text"
print(py_type_to_sc_type(bool))     # "bool"

# Coerce values
timestamp = coerce_value("timestamp", "2024-01-01T00:00:00Z")
integer = coerce_value("bigint", "12345")
decimal = coerce_value("decimal", 3.14159)

Schema Validation

from dlt.common.schema.exceptions import DataValidationError

try:
    pipeline.run(my_source())
except DataValidationError as e:
    print(f"Schema validation failed: {e}")

Merge Key Configuration

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    merge_key=["id", "version"]  # Composite merge key
)
def versioned_data():
    yield {"id": 1, "version": 1, "data": "v1"}
    yield {"id": 1, "version": 2, "data": "v2"}
# Deduplicates on (id, version) combination

Column Hint Usage

  • primary_key: Unique identifier, used for merge operations
  • merge_key: Deduplication key for merge write disposition
  • unique: Enforces uniqueness (destination-dependent)
  • partition: Partition key for partitioned tables
  • cluster: Clustering/sort key for query optimization
  • sort: Sort order hint
  • nullable: Allows NULL values
  • root_key: Foreign key to root table in nested data
  • parent_key: Foreign key to parent table
  • row_key: Unique row identifier
  • hard_delete: Marks deleted records in merge operations; see the sketch below
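
A sketch combining merge with the hard_delete hint; the column name deleted is illustrative. When a flagged row is merged, the destination row with the matching primary key is removed:

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    columns={"deleted": {"hard_delete": True}}
)
def synced_users():
    yield {"id": 1, "name": "Alice"}   # upserted on id
    yield {"id": 2, "deleted": True}   # deletes the destination row with id=2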