tessl/pypi-dlt

A Python-first scalable data loading library for building ELT pipelines with automatic schema inference and 20+ destination support.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/dlt@1.19.x

To install, run

npx @tessl/cli install tessl/pypi-dlt@1.19.0

docs/index.md

dlt (data load tool)

A Python-first, scalable data loading library that enables developers to build production-ready ELT (Extract, Load, Transform) data pipelines without dedicated backend infrastructure. dlt provides automatic schema inference, incremental loading, schema evolution, and integration with 20+ data warehouses and destinations.

Package Information

  • Package Name: dlt
  • Language: Python
  • Installation: pip install dlt
  • Documentation: https://dlthub.com/docs

Core Imports

import dlt

Common imports for building pipelines:

from dlt import pipeline, resource, source, transformer, defer
from dlt.sources import incremental

Basic Usage

import dlt

# Define a simple data resource
@dlt.resource
def users():
    for i in range(100):
        yield {"id": i, "name": f"user_{i}"}

# Create and run a pipeline
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="my_dataset"
)

# Load the data
load_info = pipeline.run(users())
print(load_info)

Architecture

dlt follows the Extract-Normalize-Load (ENL) pattern, sketched in code just after this list:

  • Extract: Data sources yield Python data structures (dicts, lists, generators)
  • Normalize: Automatic schema inference and data normalization
  • Load: Efficient batch loading to destinations with schema evolution
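
The same three phases can also be driven explicitly on a pipeline instead of going through run(); a minimal sketch against a local duckdb destination (pipeline, dataset, and table names are illustrative):

import dlt

pipeline = dlt.pipeline(pipeline_name="enl_demo", destination="duckdb", dataset_name="demo")

data = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]

pipeline.extract(data, table_name="items")  # Extract: stage raw items in local extract storage
pipeline.normalize()                        # Normalize: infer the schema and build load packages
load_info = pipeline.load()                 # Load: push the packages to the destination
print(load_info)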

Key architectural components:

  • Pipeline: Orchestrates the ENL process and manages state
  • Source: Collection of related resources (decorated with @dlt.source)
  • Resource: Individual data streams (decorated with @dlt.resource)
  • Schema: Automatically inferred and evolved data structure
  • State: Persistent pipeline state for incremental loading
  • Destination: Data warehouse or database target

Capabilities

Pipeline Management

Create, configure, and execute data pipelines with full control over extract, normalize, and load phases.

def pipeline(
    pipeline_name: str = None,
    pipelines_dir: str = None,
    pipeline_salt: str = None,
    destination: Any = None,
    staging: Any = None,
    dataset_name: str = None,
    import_schema_path: str = None,
    export_schema_path: str = None,
    full_refresh: bool = None,
    dev_mode: bool = False,
    refresh: str = None,
    progress: Any = None,
    **kwargs
) -> Pipeline:
    """
    Creates or retrieves a pipeline instance.

    Args:
        pipeline_name: Unique pipeline identifier
        pipelines_dir: Directory for pipeline storage
        pipeline_salt: Salt for hashing sensitive data
        destination: Destination name or instance (e.g., "duckdb", "bigquery")
        staging: Staging destination for staging-aware destinations
        dataset_name: Target dataset/schema name
        import_schema_path: Path to import schema from
        export_schema_path: Path to export schema to
        full_refresh: Deprecated, use dev_mode instead
        dev_mode: Enable development mode (fresh start on each run)
        refresh: Refresh mode ("drop_sources", "drop_resources", "drop_data")
        progress: Progress collector ("log", "tqdm", "enlighten", "alive_progress")
        **kwargs: Additional destination-specific parameters

    Returns:
        Pipeline instance
    """
def run(
    data: Any,
    *,
    destination: Any = None,
    staging: Any = None,
    dataset_name: str = None,
    table_name: str = None,
    write_disposition: str = None,
    columns: Any = None,
    schema: Schema = None,
    loader_file_format: str = None,
    table_format: str = None,
    schema_contract: Any = None,
    refresh: str = None
) -> LoadInfo:
    """
    Shorthand to run a pipeline with data in one call.

    Args:
        data: Data to load (resource, source, or Python data)
        destination: Destination name or instance
        staging: Staging destination name or instance
        dataset_name: Target dataset/schema name
        table_name: Target table name
        write_disposition: How to write data ("append", "replace", "merge", "skip")
        columns: Column definitions or hints
        schema: Custom schema
        loader_file_format: File format for staging ("jsonl", "parquet", "csv", "insert_values")
        table_format: Table format ("iceberg", "delta", "hive")
        schema_contract: Schema evolution rules
        refresh: Refresh mode ("drop_sources", "drop_resources", "drop_data")

    Returns:
        LoadInfo with load statistics
    """
def attach(
    pipeline_name: str = None,
    pipelines_dir: str = None,
    pipeline_salt: str = None
) -> Pipeline:
    """
    Attaches to an existing pipeline.

    Args:
        pipeline_name: Pipeline name
        pipelines_dir: Pipelines directory
        pipeline_salt: Pipeline salt

    Returns:
        Attached Pipeline instance
    """
def current() -> Pipeline:
    """
    Gets the currently active pipeline.

    Returns:
        Current Pipeline instance or None
    """

See also: Pipeline Operations.

Resource and Source Decorators

Declarative decorators for defining data sources and resources with automatic schema inference.

def source(
    func: Callable = None,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = False,
    schema: Schema = None,
    schema_contract: Any = None,
    spec: type = None,
    _impl_cls: type = None
) -> Callable:
    """
    Decorates a function as a data source.

    Args:
        func: Function that returns resource(s)
        name: Source name (defaults to function name)
        section: Configuration section name
        max_table_nesting: Maximum nesting level for flattening
        root_key: Add root key to nested data
        schema: Custom schema
        schema_contract: Schema evolution rules
        spec: Configuration spec class
        _impl_cls: Implementation class (internal)

    Returns:
        Decorated source function
    """
def resource(
    data: Any = None,
    name: str = None,
    table_name: str = None,
    write_disposition: str = None,
    columns: Any = None,
    primary_key: str = None,
    merge_key: str = None,
    schema_contract: Any = None,
    table_format: str = None,
    file_format: str = None,
    references: Any = None,
    depends_on: Any = None,
    selected: bool = True,
    spec: type = None,
    standalone: bool = False,
    data_from: Any = None,
    parallelized: bool = False
) -> Callable:
    """
    Decorates a function or data as a resource.

    Args:
        data: Data to wrap (generator, iterable, or function)
        name: Resource name
        table_name: Target table name
        write_disposition: Write behavior ("append", "replace", "merge", "skip")
        columns: Column hints
        primary_key: Primary key column(s)
        merge_key: Merge key for deduplication
        schema_contract: Schema evolution rules
        table_format: Table format ("iceberg", "delta", "hive")
        file_format: Staging file format
        references: Foreign key references
        depends_on: Resource dependencies
        selected: Whether resource is selected for loading
        spec: Configuration spec
        standalone: Create standalone resource
        data_from: Resource to transform data from
        parallelized: Enable parallel execution

    Returns:
        Decorated resource or DltResource instance
    """
def transformer(
    f: Callable = None,
    data_from: Any = None,
    name: str = None,
    **resource_kwargs
) -> Callable:
    """
    Decorates a function as a data transformer.

    Args:
        f: Function that transforms data
        data_from: Source resource or data
        name: Transformer name
        **resource_kwargs: Same as resource() decorator

    Returns:
        Decorated transformer function
    """
def destination(
    func: Callable = None,
    batch_size: int = 10,
    loader_file_format: str = None,
    name: str = None,
    naming_convention: str = None,
    skip_dlt_columns_and_tables: bool = True,
    max_table_nesting: int = 0,
    spec: type = None
) -> Callable:
    """
    Decorates a function as a custom destination.

    Args:
        func: Function that handles data batches
        batch_size: Number of items per batch
        loader_file_format: File format for staging
        name: Destination name
        naming_convention: Naming convention to use
        skip_dlt_columns_and_tables: Skip dlt system tables
        max_table_nesting: Maximum nesting level
        spec: Configuration spec

    Returns:
        Decorated destination function
    """

See also: Sources and Resources.

Incremental Loading

Configure incremental data loading with cursor-based tracking, deduplication, and state management.

class Incremental:
    """
    Configures incremental loading for a resource.

    Args:
        cursor_path: JSON path to cursor field (e.g., "updated_at", "$.id")
        initial_value: Starting cursor value
        last_value_func: Function to get last value (max, min, or custom)
        primary_key: Primary key for deduplication
        end_value: End cursor value for bounded loading
        row_order: Row order ("asc" or "desc")
        allow_external_schedulers: Enable Airflow integration
        on_cursor_value_missing: Action when cursor missing ("raise", "include", "exclude")
        lag: Lag/attribution window (timedelta or numeric)
        range_start: Range start boundary ("open" or "closed")
        range_end: Range end boundary ("open" or "closed")
    """

    def __init__(
        self,
        cursor_path: str,
        initial_value: Any = None,
        last_value_func: Callable = max,
        primary_key: str = None,
        end_value: Any = None,
        row_order: str = "asc",
        allow_external_schedulers: bool = False,
        on_cursor_value_missing: str = "raise",
        lag: Any = None,
        range_start: str = "closed",
        range_end: str = "closed"
    ): ...

    @property
    def last_value(self) -> Any:
        """Current cursor value"""

See also: Incremental Loading.

Built-in Sources

Ready-to-use sources for common data loading scenarios.

Filesystem Source

Load files from local filesystem or cloud storage (S3, GCS, Azure).

def filesystem(
    bucket_url: str,
    credentials: Any = None,
    file_glob: str = "*",
    files_per_page: int = None,
    extract_content: bool = False,
    kwargs: dict = None,
    client_kwargs: dict = None,
    incremental: Incremental = None
) -> DltResource:
    """
    Lists and loads files from filesystem or cloud storage.

    Args:
        bucket_url: Bucket URL (e.g., "s3://bucket", "gs://bucket", "az://container")
        credentials: Cloud credentials or fsspec AbstractFileSystem instance
        file_glob: Glob pattern for file filtering
        files_per_page: Files per page for pagination (default: 100)
        extract_content: Whether to extract file content
        kwargs: Additional arguments passed to fsspec constructor
        client_kwargs: Additional arguments for fsspec native client
        incremental: Incremental loading configuration (e.g., by modification_date)

    Returns:
        DltResource with file items
    """
def readers(
    bucket_url: str,
    credentials: Any = None,
    file_glob: str = "*",
    kwargs: dict = None,
    client_kwargs: dict = None,
    incremental: Incremental = None
) -> Tuple[DltResource, ...]:
    """
    Source providing chunked file readers for various formats.

    Args:
        bucket_url: Bucket URL
        credentials: Cloud credentials or fsspec AbstractFileSystem instance
        file_glob: Glob pattern for file filtering
        kwargs: Additional arguments for fsspec constructor
        client_kwargs: Additional arguments for fsspec native client
        incremental: Incremental loading configuration

    Returns:
        Tuple of resources: (read_csv, read_jsonl, read_parquet, read_csv_duckdb)
    """
# File reader transformers
def read_csv(chunksize: int = None, **pandas_kwargs) -> DltResource: ...
def read_jsonl(chunksize: int = None) -> DltResource: ...
def read_parquet(chunksize: int = None) -> DltResource: ...
def read_csv_duckdb(chunksize: int = None, **duckdb_kwargs) -> DltResource: ...
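
For example, listing CSV files in a bucket and piping them through the read_csv transformer might look like the sketch below (bucket URL and names are illustrative; assumes the built-in source is importable from dlt.sources.filesystem, as in recent dlt releases):

import dlt
from dlt.sources.filesystem import filesystem, read_csv

files = filesystem(bucket_url="s3://my-bucket/data", file_glob="*.csv")
csv_rows = (files | read_csv()).with_name("csv_rows")   # parse each listed file into rows

pipeline = dlt.pipeline("files_pipeline", destination="duckdb", dataset_name="files")
print(pipeline.run(csv_rows))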

SQL Database Source

Load data from SQL databases with automatic schema inference.

def sql_database(
    credentials: Any = None,
    schema: str = None,
    metadata: Any = None,
    table_names: List[str] = None,
    chunk_size: int = 50000,
    backend: str = "sqlalchemy",
    detect_precision_hints: bool = False,
    reflection_level: str = "full",
    defer_table_reflect: bool = None,
    table_adapter_callback: Callable = None,
    backend_kwargs: dict = None,
    include_views: bool = False,
    type_adapter_callback: Callable = None,
    query_adapter_callback: Callable = None,
    resolve_foreign_keys: bool = False,
    engine_adapter_callback: Callable = None
) -> Iterable[DltResource]:
    """
    Loads data from SQL database tables.

    Args:
        credentials: Database credentials, connection string, or SQLAlchemy Engine
        schema: Database schema name
        metadata: Optional SQLAlchemy MetaData instance (ignores schema arg)
        table_names: List of tables to load (None = all tables)
        chunk_size: Rows per chunk
        backend: Backend ("sqlalchemy", "pyarrow", "pandas", "connectorx")
        detect_precision_hints: Detect precision hints for numeric columns
        reflection_level: Schema reflection ("minimal", "full", "full_with_precision")
        defer_table_reflect: Defer reflection until runtime
        table_adapter_callback: Callback to modify table config
        backend_kwargs: Backend-specific arguments
        include_views: Include database views
        type_adapter_callback: Callback to adapt data types
        query_adapter_callback: Callback to modify queries
        resolve_foreign_keys: Resolve foreign key relationships
        engine_adapter_callback: Callback to modify SQLAlchemy engine

    Returns:
        Iterable of DltResource instances (one per table)
    """
def sql_table(
    credentials: Any = None,
    table: str = None,
    schema: str = None,
    metadata: Any = None,
    incremental: Incremental = None,
    chunk_size: int = 50000,
    backend: str = "sqlalchemy",
    detect_precision_hints: bool = None,
    reflection_level: str = "full",
    defer_table_reflect: bool = None,
    table_adapter_callback: Callable = None,
    backend_kwargs: dict = None,
    type_adapter_callback: Callable = None,
    included_columns: List[str] = None,
    excluded_columns: List[str] = None,
    query_adapter_callback: Callable = None,
    resolve_foreign_keys: bool = False,
    engine_adapter_callback: Callable = None,
    write_disposition: str = "append",
    primary_key: Any = None,
    merge_key: Any = None
) -> DltResource:
    """
    Loads data from a single SQL database table.

    Args:
        credentials: Database credentials, connection string, or SQLAlchemy Engine
        table: Table name to load
        schema: Database schema name
        metadata: Optional SQLAlchemy MetaData instance
        incremental: Incremental loading configuration
        chunk_size: Rows per chunk
        backend: Backend ("sqlalchemy", "pyarrow", "pandas", "connectorx")
        detect_precision_hints: Detect precision hints for numeric columns
        reflection_level: Schema reflection level
        defer_table_reflect: Defer reflection until runtime
        table_adapter_callback: Callback to modify table config
        backend_kwargs: Backend-specific arguments
        type_adapter_callback: Callback to adapt data types
        included_columns: Columns to include
        excluded_columns: Columns to exclude
        query_adapter_callback: Callback to modify queries
        resolve_foreign_keys: Resolve foreign key relationships
        engine_adapter_callback: Callback to modify SQLAlchemy engine
        write_disposition: Write mode ("append", "replace", "merge")
        primary_key: Primary key column(s)
        merge_key: Merge key column(s)

    Returns:
        DltResource for the table
    """

REST API Source

Declarative REST API loading with automatic pagination.

def rest_api(
    client: dict = None,
    resources: List[Union[str, dict, Any]] = None,
    resource_defaults: dict = None
) -> List[DltResource]:
    """
    Declaratively loads data from REST APIs.

    Args:
        client: REST client configuration (base_url, auth, etc.)
        resources: List of endpoint resources to load
        resource_defaults: Default configuration for all resources

    Returns:
        List of DltResource instances
    """
def rest_api_source(
    config: RESTAPIConfig,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = None,
    schema: Schema = None,
    schema_contract: Any = None,
    parallelized: bool = False
) -> DltSource:
    """
    Creates a REST API source with full source configuration options.

    Args:
        config: REST API configuration dictionary
        name: Source name
        section: Configuration section
        max_table_nesting: Maximum nesting level
        root_key: Add root key to nested tables
        schema: Custom schema
        schema_contract: Schema evolution rules
        parallelized: Enable parallel extraction

    Returns:
        DltSource instance
    """
def rest_api_resources(config: RESTAPIConfig) -> List[DltResource]:
    """
    Creates a list of resources from REST API configuration.

    Args:
        config: REST API configuration dictionary

    Returns:
        List of DltResource instances
    """

See also: Filesystem Source, SQL Database Source, REST API Source.

Destinations

20+ supported destinations including data warehouses, databases, data lakes, and vector databases.

# Relational Databases
def postgres(credentials: Any = None, **kwargs) -> Destination: ...
def duckdb(credentials: str = None, **kwargs) -> Destination: ...
def mssql(credentials: Any = None, **kwargs) -> Destination: ...
def clickhouse(credentials: Any = None, **kwargs) -> Destination: ...
def sqlalchemy(credentials: Any = None, **kwargs) -> Destination: ...

# Cloud Data Warehouses
def snowflake(credentials: Any = None, **kwargs) -> Destination: ...
def bigquery(credentials: Any = None, **kwargs) -> Destination: ...
def redshift(credentials: Any = None, **kwargs) -> Destination: ...
def athena(credentials: Any = None, **kwargs) -> Destination: ...
def synapse(credentials: Any = None, **kwargs) -> Destination: ...
def databricks(credentials: Any = None, **kwargs) -> Destination: ...
def dremio(credentials: Any = None, **kwargs) -> Destination: ...

# Data Lakes
def filesystem(bucket_url: str = None, credentials: Any = None, **kwargs) -> Destination: ...
def ducklake(credentials: str = None, **kwargs) -> Destination: ...
def motherduck(credentials: Any = None, **kwargs) -> Destination: ...

# Vector Databases
def weaviate(credentials: Any = None, **kwargs) -> Destination: ...
def qdrant(credentials: Any = None, location: str = None, **kwargs) -> Destination: ...
def lancedb(credentials: Any = None, **kwargs) -> Destination: ...

# Testing
def dummy(mode: str = "normal", **kwargs) -> Destination: ...
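
Destinations are usually referenced by name in dlt.pipeline(), but the factories above can also be instantiated explicitly to pin credentials or options in code; a short sketch:

import dlt

# Explicit destination instance: a local duckdb database file
pipeline = dlt.pipeline(
    pipeline_name="warehouse_pipeline",
    destination=dlt.destinations.duckdb("analytics.duckdb"),
    dataset_name="analytics",
)

# By-name form; credentials are then resolved from config/secrets providers
# pipeline = dlt.pipeline("warehouse_pipeline", destination="bigquery", dataset_name="analytics")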

See also: Destinations.

Configuration and Secrets

Hierarchical configuration system with multiple providers (TOML files, environment variables, secrets).

# Configuration accessors
config: Any  # Dictionary-like access to configuration
secrets: Any  # Dictionary-like access to secrets
def configspec(cls: type = None, **kwargs) -> type:
    """
    Decorator marking a class as a configuration specification.

    Args:
        cls: Class to decorate
        **kwargs: Configuration options

    Returns:
        Decorated configuration class
    """
def with_config(
    func: Callable = None,
    sections: tuple = None,
    auto_pipeline_section: bool = False,
    **kwargs
) -> Callable:
    """
    Decorator that injects configuration into function parameters.

    Args:
        func: Function to decorate
        sections: Configuration sections to search
        auto_pipeline_section: Automatically use pipeline section
        **kwargs: Additional options

    Returns:
        Decorated function with injected config
    """

See also: Configuration.

Schema Management

Automatic schema inference, evolution, and validation with configurable contracts.

class Schema:
    """
    Manages data schemas with tables, columns, and evolution rules.

    Attributes:
        name: Schema name
        tables: Dictionary of table schemas
    """

See also: Schema Management.

Helpers and Integrations

Integration helpers for dbt, Airflow, and data visualization.

# dbt integration
from dlt.helpers.dbt import create_venv, package_runner, DBTPackageRunner
from dlt.helpers.dbt_cloud import run_dbt_cloud_job, get_dbt_cloud_run_status

# Schema visualization
from dlt.helpers import graphviz, dbml, mermaid

See also: Helpers.

Dataset and Querying

Query loaded data using a pandas-like interface with SQL translation.

def dataset(
    pipeline: Pipeline = None,
    dataset_name: str = None,
    dataset_type: str = "auto"
) -> Dataset:
    """
    Creates a dataset query interface for loaded data.

    Args:
        pipeline: Pipeline instance (None = current pipeline)
        dataset_name: Dataset name (None = pipeline dataset)
        dataset_type: Dataset type ("auto", "ibis", "default")

    Returns:
        Dataset instance for querying tables
    """
class Dataset:
    """
    Dataset interface for querying loaded data.

    Attributes:
        dataset_name: Name of the dataset
        schema_name: Schema name

    Methods:
        Access tables as attributes (e.g., dataset.users)
    """
class Relation:
    """
    Represents a queryable table with pandas-like operations.

    Methods:
        select(*columns): Select columns
        filter(condition): Filter rows
        limit(n): Limit results
        head(n): Get first n rows
        df(): Convert to pandas DataFrame
    """

dbt Integration

Integration with dbt (data build tool) for transformations.

# dbt package execution
from dlt.pipeline.dbt import get_venv, run_dbt_package

def get_venv(
    pipeline: Pipeline,
    venv_path: str = "dbt",
    dbt_version: str = None
) -> Any:
    """
    Creates or restores virtual environment for dbt execution.

    Args:
        pipeline: Pipeline instance
        venv_path: Path to virtual environment
        dbt_version: dbt version to install

    Returns:
        Virtual environment instance
    """
# dbt cloud integration
from dlt.helpers.dbt_cloud import run_dbt_cloud_job, get_dbt_cloud_run_status
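
A rough sketch of the flow using the helpers above; the package location is a placeholder and the exact run_dbt_package arguments may differ by dlt version:

import dlt
from dlt.pipeline.dbt import get_venv, run_dbt_package

pipeline = dlt.pipeline("dbt_pipeline", destination="duckdb", dataset_name="transformed")

# Create (or reuse) a virtual environment with dbt installed, then run a dbt
# package against the pipeline's destination and dataset.
venv = get_venv(pipeline)
run_results = run_dbt_package(pipeline, "path/to/dbt_package", venv=venv)  # path is illustrative
print(run_results)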

Data Marking

Mark data with special processing instructions.

from dlt import mark

# The following functions live in the dlt.mark namespace (call as dlt.mark.with_table_name, etc.)
def with_table_name(data: Any, table_name: str) -> Any:
    """
    Marks data to be loaded to a specific table.

    Args:
        data: Data to mark
        table_name: Target table name

    Returns:
        Marked data
    """
def with_hints(
    data: Any,
    hints: dict = None,
    create_table_variant: bool = False
) -> Any:
    """
    Marks data with column or table hints.

    Args:
        data: Data to mark
        hints: Column or table hints dictionary
        create_table_variant: Create table variant

    Returns:
        Marked data
    """
def with_file_import(
    file_path: str,
    table_name: str = None,
    **kwargs
) -> Any:
    """
    Marks a file for direct import.

    Args:
        file_path: Path to file
        table_name: Target table name
        **kwargs: Additional arguments

    Returns:
        File import marker
    """
def make_hints(
    columns: dict = None,
    primary_key: Any = None,
    merge_key: Any = None,
    **kwargs
) -> dict:
    """
    Creates a hints dictionary.

    Args:
        columns: Column hints
        primary_key: Primary key column(s)
        merge_key: Merge key column(s)
        **kwargs: Additional hints

    Returns:
        Hints dictionary
    """

Progress Tracking

Track pipeline execution progress with various collectors.

from dlt import progress

# Progress collectors
progress.tqdm()         # tqdm progress bars
progress.log()          # Log-based progress
progress.enlighten()    # Enlighten progress bars
progress.alive_progress()  # Alive progress bars
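
A collector is selected when the pipeline is created, either by name or by passing one of the constructors above when extra options are needed; for example:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="tracked_pipeline",
    destination="duckdb",
    dataset_name="tracked",
    progress="tqdm",   # or "log", "enlighten", "alive_progress"
)
pipeline.run([{"id": 1}], table_name="items")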

Package Version

from dlt import __version__

__version__: str  # Package version string

Hub

from dlt import hub

# Access to verified sources from dltHub
# Requires dlthub package: pip install dlthub

See also: Helpers.

Types

Core Types

# Secret values
TSecretValue = Any  # Marks values as secrets
TSecretStrValue = str  # String secrets

# Credentials
TCredentials = Any  # Generic credentials type

# Data items
TDataItem = Any  # Single data item (dict, object, etc.)
TDataItems = Union[TDataItem, List[TDataItem]]  # Single or multiple items

# Write dispositions
TWriteDisposition = Literal["append", "replace", "merge", "skip"]

# File formats
TLoaderFileFormat = Literal["jsonl", "typed-jsonl", "parquet", "csv", "insert_values"]

# Table formats
TTableFormat = Literal["iceberg", "delta", "hive"]

# Data types
TDataType = Literal[
    "text", "bigint", "double", "bool", "timestamp", "date", "time",
    "decimal", "wei", "json", "binary", "complex"
]

# Column hints
TColumnHint = Literal[
    "primary_key", "merge_key", "unique", "partition", "cluster", "sort",
    "nullable", "dedup_sort", "root_key", "parent_key", "row_key", "hard_delete"
]

Source and Resource Classes

class DltSource:
    """
    Represents a data source containing one or more resources.

    Attributes:
        name: Source name
        schema: Source schema
        resources: Dictionary of resources
        selected_resources: Currently selected resources

    Methods:
        with_resources(*resource_names): Select specific resources
        add_limit(limit): Add row limit to all resources
    """
class DltResource:
    """
    Represents a single data resource (table).

    Attributes:
        name: Resource name
        write_disposition: Write mode
        selected: Whether resource is selected

    Methods:
        add_filter(filter_func): Add filter function
        add_map(map_func): Add transformation function
        add_yield_map(map_func): Add yield-based transformation
        add_limit(limit): Limit number of items
        apply_hints(**hints): Apply column/table hints
    """

REST API Types

RESTAPIConfig = TypedDict("RESTAPIConfig", {
    "client": dict,  # Client configuration
    "resources": List[Union[str, dict]],  # Resource definitions
    "resource_defaults": dict  # Default resource settings
}, total=False)

Schema Types

TColumnSchema = TypedDict("TColumnSchema", {
    "name": str,
    "data_type": TDataType,
    "nullable": bool,
    # ... additional column properties
}, total=False)

TTableSchema = TypedDict("TTableSchema", {
    "name": str,
    "columns": Dict[str, TColumnSchema],
    "write_disposition": TWriteDisposition,
    # ... additional table properties
}, total=False)