Pipeline Operations

The Pipeline class orchestrates data extraction, normalization, and loading with full state management and schema evolution.

Capabilities

Pipeline Class

class Pipeline:
    """
    Main pipeline orchestration class managing the complete extract-normalize-load process.

    Attributes:
        pipeline_name: Pipeline identifier
        destination: Target destination
        dataset_name: Target dataset name
        pipelines_dir: Storage directory
        state: Pipeline state dictionary
        default_schema: Default schema instance
    """

Core ETL Operations

Execute individual pipeline phases or the complete ETL cycle.

def extract(
    self,
    data: Any,
    table_name: str = None,
    write_disposition: str = None,
    columns: Any = None,
    primary_key: str = None,
    schema: Schema = None,
    max_parallel_items: int = None,
    workers: int = None
) -> Any:
    """
    Extracts data and creates normalized load packages.

    Args:
        data: Source, resource, or Python data to extract
        table_name: Override table name
        write_disposition: Write mode ("append", "replace", "merge", "skip")
        columns: Column hints or definitions
        primary_key: Primary key column(s)
        schema: Custom schema to use
        max_parallel_items: Maximum parallel extract items
        workers: Number of extract workers

    Returns:
        Extract information with statistics
    """
def normalize(
    self,
    workers: int = None,
    loader_file_format: str = None
) -> Any:
    """
    Normalizes extracted data into load packages.

    Args:
        workers: Number of normalize workers
        loader_file_format: Target file format ("jsonl", "parquet", "csv")

    Returns:
        Normalize information with statistics
    """
def load(
    self,
    destination: Any = None,
    dataset_name: str = None,
    credentials: Any = None,
    workers: int = None,
    raise_on_failed_jobs: bool = False
) -> Any:
    """
    Loads normalized data to destination.

    Args:
        destination: Override destination
        dataset_name: Override dataset name
        credentials: Override credentials
        workers: Number of load workers
        raise_on_failed_jobs: Raise exception on job failures

    Returns:
        Load information with statistics and job details
    """
def run(
    self,
    data: Any = None,
    *,
    destination: Any = None,
    staging: Any = None,
    dataset_name: str = None,
    credentials: Any = None,
    table_name: str = None,
    write_disposition: str = None,
    columns: Any = None,
    primary_key: str = None,
    schema: Schema = None,
    loader_file_format: str = None,
    table_format: str = None,
    schema_contract: Any = None,
    refresh: str = None
) -> LoadInfo:
    """
    Runs complete extract-normalize-load cycle.

    Args:
        data: Source, resource, or data to load
        destination: Override destination
        staging: Staging destination
        dataset_name: Override dataset name
        credentials: Override credentials
        table_name: Override table name
        write_disposition: Write mode ("append", "replace", "merge", "skip")
        columns: Column hints
        primary_key: Primary key column(s)
        schema: Custom schema
        loader_file_format: Staging file format ("jsonl", "parquet", "csv", "insert_values")
        table_format: Table format ("iceberg", "delta", "hive")
        schema_contract: Schema evolution rules
        refresh: Refresh mode ("drop_sources", "drop_resources", "drop_data")

    Returns:
        LoadInfo with complete load statistics
    """

State Management

Access and manage pipeline state for incremental loading and coordination.

@property
def state(self) -> dict:
    """
    Pipeline state dictionary for storing arbitrary data.

    Returns:
        Mutable state dictionary
    """
def set_local_state_val(self, key: str, value: Any) -> None:
    """
    Sets a local state value (not synced to destination).

    Args:
        key: State key
        value: State value
    """
def get_local_state_val(self, key: str) -> Any:
    """
    Gets a local state value.

    Args:
        key: State key

    Returns:
        State value or None
    """
def sync_destination(
    self,
    destination: Any = None,
    staging: Any = None,
    dataset_name: str = None
) -> None:
    """
    Synchronizes pipeline state with destination.

    Args:
        destination: Destination to sync with
        staging: Staging destination
        dataset_name: Dataset name
    """

Schema Operations

Access and manage pipeline schemas.

@property
def schemas(self) -> Any:
    """
    Schema storage with all pipeline schemas.

    Returns:
        Schema storage instance
    """
@property
def default_schema(self) -> Schema:
    """
    Default schema for the pipeline.

    Returns:
        Schema instance
    """
def sync_schema(self, schema_name: str = None) -> None:
    """
    Synchronizes schema with destination.

    Args:
        schema_name: Schema to sync (None = default schema)
    """

Client Access

Get SQL and destination clients for querying and inspection.

def sql_client(self, schema_name: str = None) -> Any:
    """
    Gets SQL client for executing queries.

    Args:
        schema_name: Schema name (None = default)

    Returns:
        SQL client context manager

    Example:
        with pipeline.sql_client() as client:
            rows = client.execute_sql("SELECT * FROM users")
    """
def destination_client(self, schema_name: str = None) -> Any:
    """
    Gets destination-specific client.

    Args:
        schema_name: Schema name (None = default)

    Returns:
        Destination client context manager
    """
def dataset(self, dataset_name: str = None) -> Any:
    """
    Gets dataset query interface.

    Args:
        dataset_name: Dataset name (None = pipeline dataset)

    Returns:
        Dataset instance for querying
    """

Inspection Methods

Inspect pipeline state, packages, and jobs.

def has_data(self) -> bool:
    """
    Checks if pipeline has any data.

    Returns:
        True if data exists
    """
def has_pending_data(self) -> bool:
    """
    Checks if pipeline has pending data to load.

    Returns:
        True if pending data exists
    """
def list_extracted_resources(self) -> list:
    """
    Lists all extracted resource names.

    Returns:
        List of resource names
    """
def list_extracted_load_packages(self) -> list:
    """
    Lists extracted (not normalized) load packages.

    Returns:
        List of package IDs
    """
def list_normalized_load_packages(self) -> list:
    """
    Lists normalized (ready to load) load packages.

    Returns:
        List of package IDs
    """
def list_completed_load_packages(self) -> list:
    """
    Lists successfully loaded packages.

    Returns:
        List of package IDs
    """
def get_load_package_info(self, load_id: str) -> dict:
    """
    Gets detailed package information.

    Args:
        load_id: Load package ID

    Returns:
        Package information dictionary
    """
def get_load_package_state(self, load_id: str) -> dict:
    """
    Gets package state information.

    Args:
        load_id: Load package ID

    Returns:
        Package state dictionary
    """
def list_failed_jobs_in_package(self, load_id: str) -> list:
    """
    Lists failed jobs in a package.

    Args:
        load_id: Load package ID

    Returns:
        List of failed job information
    """

Context Management

Activate and deactivate pipelines in the current context.

def activate(self) -> None:
    """
    Activates pipeline as the current pipeline.
    """
def deactivate(self) -> None:
    """
    Deactivates the current pipeline.
    """

Pipeline Maintenance

Drop data and manage pipeline lifecycle.

def drop(self) -> None:
    """
    Drops all pipeline data and state from destination and local storage.
    """
def drop_pending_packages(self, with_partial_loads: bool = True) -> None:
    """
    Drops pending load packages.

    Args:
        with_partial_loads: Also drop partially loaded packages
    """

Pipeline Properties

@property
def destination(self) -> Any:
    """
    Pipeline destination instance.

    Returns:
        Destination instance
    """

@destination.setter
def destination(self, value: Any) -> None:
    """
    Sets pipeline destination.

    Args:
        value: Destination name or instance
    """
@property
def staging(self) -> Any:
    """
    Staging destination instance for staging-aware destinations.

    Returns:
        Staging destination instance or None
    """

@staging.setter
def staging(self, value: Any) -> None:
    """
    Sets staging destination.

    Args:
        value: Staging destination name or instance
    """
@property
def naming(self) -> Any:
    """
    Naming convention used by the pipeline.

    Returns:
        Naming convention instance
    """
@property
def last_trace(self) -> Any:
    """
    Last pipeline execution trace with timing and statistics.

    Returns:
        Trace instance or None
    """
@property
def runtime_config(self) -> dict:
    """
    Runtime configuration dictionary.

    Returns:
        Runtime configuration
    """

Usage Examples

Basic Pipeline Execution

import dlt

# Create a pipeline
pipeline = dlt.pipeline(
    pipeline_name="example",
    destination="duckdb",
    dataset_name="raw_data"
)

# Run with data
@dlt.resource
def my_data():
    yield {"id": 1, "value": "test"}

info = pipeline.run(my_data())
print(f"Loaded {info.load_packages[0].jobs['completed_jobs']} jobs")

Multi-Phase Pipeline

# Extract phase
extract_info = pipeline.extract(my_source())
print(f"Extracted {len(extract_info.resources)} resources")

# Normalize phase
normalize_info = pipeline.normalize()
print(f"Normalized {normalize_info.row_counts} rows")

# Load phase
load_info = pipeline.load()
print(f"Load completed: {load_info.has_failed_jobs}")

State Management

# Store incremental state
pipeline.state["last_update"] = "2024-01-01"

# Access in resource
@dlt.resource
def incremental_data():
    last = dlt.current.pipeline().state.get("last_update")
    # Load data since last update
    ...

Querying Loaded Data

# Using SQL client
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT COUNT(*) FROM users")
    print(f"Total users: {rows[0][0]}")

# Using dataset interface
dataset = pipeline.dataset()
users_table = dataset.users
result = users_table.select("*").limit(10).df()
print(result)