Automatic schema inference, evolution, and validation with configurable contracts and data type handling.
class Schema:
    """
    Manages data schemas with tables, columns, and evolution rules.

    Attributes:
        name: Schema name
        tables: Dictionary of table name -> table schema
        data_tables: List of data table names (excluding system tables)
    """
    def __init__(self, name: str): ...

    @property
    def name(self) -> str:
        """Schema name"""

    @property
    def tables(self) -> Dict[str, TTableSchema]:
        """All tables including system tables"""

    @property
    def data_tables(self) -> List[str]:
        """Data tables only (excludes _dlt_* tables)"""

    def get_table(self, table_name: str) -> TTableSchema:
        """Gets table schema by name"""

    def get_table_columns(self, table_name: str) -> Dict[str, TColumnSchema]:
        """Gets columns for a table"""

TTableSchema = TypedDict("TTableSchema", {
    "name": str,
    "columns": Dict[str, "TColumnSchema"],
    "write_disposition": str,   # "append", "replace", "merge", "skip"
    "parent": str,              # Parent table for nested data
    "resource": str,            # Source resource name
}, total=False)
TColumnSchema = TypedDict("TColumnSchema", {
    "name": str,
    "data_type": str,    # "text", "bigint", "double", "bool", "timestamp", etc.
    "nullable": bool,
    "primary_key": bool,
    "unique": bool,
    "sort": bool,
    "merge_key": bool,
    "partition": bool,
    "cluster": bool,
    # ... additional column properties
}, total=False)
TSchemaUpdate = TypedDict("TSchemaUpdate", {
    "tables": Dict[str, TTableSchema]
}, total=False)
TSchemaTables = Dict[str, TTableSchema]
TStoredSchema = TypedDict("TStoredSchema", {
    "name": str,
    "version": int,
    "version_hash": str,
    "engine_version": int,
    "tables": TSchemaTables,
    # ... additional schema properties
}, total=False)
TTableSchemaColumns = Dict[str, TColumnSchema]
TColumnSchemaBase = TypedDict("TColumnSchemaBase", {
    "name": str,
    "data_type": str,
    "nullable": bool
}, total=False)
TSchemaContractDict = TypedDict("TSchemaContractDict", {
    "tables": str,      # "evolve", "freeze", "discard_row", "discard_value"
    "columns": str,
    "data_type": str
}, total=False)

# Available column hints
TColumnHint = Literal[
    "primary_key",   # Primary key constraint
    "merge_key",     # Key for merge deduplication
    "unique",        # Unique constraint
    "partition",     # Partition key
    "cluster",       # Cluster/sort key
    "sort",          # Sort key
    "nullable",      # Nullable column
    "dedup_sort",    # Deduplication sort order
    "root_key",      # Root table foreign key
    "parent_key",    # Parent table foreign key
    "row_key",       # Row identifier
    "hard_delete"    # Hard delete marker
]
# Set of all column hints
COLUMN_HINTS: Set[str]
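These hints can also be attached to an already-defined resource at run time with apply_hints. A minimal sketch; the users resource and its email column are illustrative, not part of any real source:

import dlt

@dlt.resource
def users():
    yield {"id": 1, "email": "user@example.com"}

users_resource = users()
# Attach hints after definition: a primary_key hint plus a per-column hint
users_resource.apply_hints(
    primary_key="id",
    columns={"email": {"data_type": "text", "unique": True}}
)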
"text", # Text/string
"bigint", # 64-bit integer
"double", # Double precision float
"bool", # Boolean
"timestamp", # Timestamp with timezone
"date", # Date
"time", # Time
"decimal", # Fixed-point decimal
"wei", # Ethereum Wei (256-bit integer)
"json", # JSON object
"binary", # Binary data
"complex" # Complex/nested types
]
# Set of all data types
DATA_TYPES: Set[str]

def coerce_value(
    data_type: TDataType,
    value: Any
) -> Any:
    """
    Coerces value to target data type.

    Args:
        data_type: Target dlt data type
        value: Value to coerce

    Returns:
        Coerced value

    Example:
        timestamp = coerce_value("timestamp", "2024-01-01T00:00:00Z")
    """

def py_type_to_sc_type(py_type: type) -> TDataType:
"""
Maps Python type to dlt schema type.
Args:
py_type: Python type
Returns:
Corresponding dlt data type
Example:
data_type = py_type_to_sc_type(int) # Returns "bigint"
data_type = py_type_to_sc_type(str) # Returns "text"
"""def verify_schema_hash(
    schema: Schema,
    expected_hash: str
) -> bool:
    """
    Verifies schema integrity using hash.

    Args:
        schema: Schema to verify
        expected_hash: Expected hash value

    Returns:
        True if hash matches
    """
# Default schema contract mode
DEFAULT_SCHEMA_CONTRACT_MODE: str = "evolve"

class DataValidationError(Exception):
    """Raised when data fails schema validation."""

import dlt
@dlt.resource
def users():
    yield {"id": 1, "name": "Alice", "created_at": "2024-01-01"}

pipeline = dlt.pipeline(destination="duckdb", dataset_name="users_db")
pipeline.run(users())

# Schema automatically inferred:
# - id: bigint
# - name: text
# - created_at: timestamp

pipeline = dlt.pipeline(...)
pipeline.run(my_source())

# Get default schema
schema = pipeline.default_schema
print(f"Schema name: {schema.name}")

# List tables
for table_name in schema.data_tables:
    table = schema.get_table(table_name)
    print(f"Table: {table_name}")
    print(f"  Write disposition: {table.get('write_disposition')}")

    # List columns
    columns = schema.get_table_columns(table_name)
    for col_name, col_schema in columns.items():
        print(f"  - {col_name}: {col_schema['data_type']}")

@dlt.source(
    schema_contract={
        "tables": "evolve",      # Allow new tables (default)
        "columns": "freeze",     # Disallow new columns
        "data_type": "freeze"    # Disallow type changes
    }
)
def strict_source():
    @dlt.resource
    def data():
        yield {"id": 1, "value": "test"}
    return data

# Adding new columns will raise an error

# Per-contract settings:
# - "evolve": Allow changes (default)
# - "freeze": Disallow changes, raise error
# - "discard_row": Discard entire row on violation
# - "discard_value": Discard violating values only
@dlt.resource(
    schema_contract={
        "columns": "discard_value"   # Drop new columns
    }
)
def flexible_data():
    yield {"id": 1, "new_col": "ignored"}

# new_col will be discarded

@dlt.resource(
    columns={
        "id": {
            "name": "id",
            "data_type": "bigint",
            "primary_key": True,
            "nullable": False
        },
        "email": {
            "name": "email",
            "data_type": "text",
            "unique": True
        },
        "created_at": {
            "name": "created_at",
            "data_type": "timestamp",
            "sort": True,
            "partition": True
        }
    }
)
def users():
    yield {"id": 1, "email": "user@example.com", "created_at": "2024-01-01"}

# Initial load
@dlt.resource
def data_v1():
    yield {"id": 1, "name": "Alice"}

pipeline.run(data_v1())

# Later: add new column (automatically evolved)
@dlt.resource
def data_v2():
    yield {"id": 2, "name": "Bob", "email": "bob@example.com"}

pipeline.run(data_v2())  # email column added to schema
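After the second run the stored schema version advances; a short sketch for inspecting it, assuming the version and version_hash properties mirror the TStoredSchema fields above:

# Sketch: inspect schema versioning after evolution
schema = pipeline.default_schema
print(f"version: {schema.version}")            # bumped when the email column was added
print(f"version_hash: {schema.version_hash}")  # content hash of the current schema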
from dlt.common.schema import Schema

# Create custom schema
schema = Schema("my_schema")
# Add table manually
schema.tables["users"] = {
"name": "users",
"columns": {
"id": {"name": "id", "data_type": "bigint", "primary_key": True},
"name": {"name": "name", "data_type": "text"}
},
"write_disposition": "merge"
}
# Use in source
@dlt.source(schema=schema)
def my_source():
    ...

@dlt.source(max_table_nesting=2)
def nested_source():
    @dlt.resource
    def orders():
        yield {
            "order_id": 1,
            "customer": {
                "id": 10,
                "name": "Alice"
            },
            "items": [
                {"product_id": 100, "qty": 2},
                {"product_id": 101, "qty": 1}
            ]
        }
    return orders

# Creates tables:
# - orders: order_id, customer__id, customer__name
# - orders__items: product_id, qty, _dlt_parent_id
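The generated child table can be inspected like any other table in the schema; a short sketch, assuming nested_source() has been run through a pipeline:

# Sketch: look up the child table created for the "items" list
pipeline.run(nested_source())
schema = pipeline.default_schema
items_table = schema.get_table("orders__items")
print(items_table.get("parent"))                        # "orders"
print(list(schema.get_table_columns("orders__items")))  # product_id, qty, _dlt_* columns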
# Export schema
pipeline = dlt.pipeline(
    export_schema_path="schemas/exported"
)
pipeline.run(my_source())
# Schema exported to schemas/exported/<schema_name>.json
# Import schema
pipeline = dlt.pipeline(
    import_schema_path="schemas/imported/<schema_name>.json"
)
pipeline.run(my_source())
# Uses imported schema instead of inferring

# Sync schema with destination
pipeline.sync_schema()
# Sync specific schema
pipeline.sync_schema("my_schema")

from dlt.common.data_types import py_type_to_sc_type, coerce_value
# Python to dlt type
print(py_type_to_sc_type(int)) # "bigint"
print(py_type_to_sc_type(float)) # "double"
print(py_type_to_sc_type(str)) # "text"
print(py_type_to_sc_type(bool)) # "bool"
# Coerce values
timestamp = coerce_value("timestamp", "2024-01-01T00:00:00Z")
integer = coerce_value("bigint", "12345")
decimal = coerce_value("decimal", 3.14159)

from dlt.common.schema import DataValidationError
try:
    pipeline.run(my_source())
except DataValidationError as e:
    print(f"Schema validation failed: {e}")

@dlt.resource(
    write_disposition="merge",
    primary_key="id",
    merge_key=["id", "version"]   # Composite merge key
)
def versioned_data():
    yield {"id": 1, "version": 1, "data": "v1"}
    yield {"id": 1, "version": 2, "data": "v2"}
# Deduplicates on (id, version) combination