The Pipeline class orchestrates data extraction, normalization, and loading with full state management and schema evolution.
class Pipeline:
    """
    Main pipeline orchestration class managing the complete extract-normalize-load (ELT) process.

    Attributes:
        pipeline_name: Pipeline identifier
        destination: Target destination
        dataset_name: Target dataset name
        pipelines_dir: Storage directory
        state: Pipeline state dictionary
        default_schema: Default schema instance
    """

Execute individual pipeline phases or the complete extract-normalize-load cycle.

    def extract(
        self,
        data: Any,
        table_name: str = None,
        write_disposition: str = None,
        columns: Any = None,
        primary_key: str = None,
        schema: Schema = None,
        max_parallel_items: int = None,
        workers: int = None
    ) -> Any:
        """
        Extracts data and creates load packages for normalization.

        Args:
            data: Source, resource, or Python data to extract
            table_name: Override table name
            write_disposition: Write mode ("append", "replace", "merge", "skip")
            columns: Column hints or definitions
            primary_key: Primary key column(s)
            schema: Custom schema to use
            max_parallel_items: Maximum parallel extract items
            workers: Number of extract workers
        Returns:
            Extract information with statistics
        """

    def normalize(
        self,
        workers: int = None,
        loader_file_format: str = None
    ) -> Any:
        """
        Normalizes extracted data into load packages.

        Args:
            workers: Number of normalize workers
            loader_file_format: Target file format ("jsonl", "parquet", "csv")
        Returns:
            Normalize information with statistics
        """

    def load(
        self,
        destination: Any = None,
        dataset_name: str = None,
        credentials: Any = None,
        workers: int = None,
        raise_on_failed_jobs: bool = False
    ) -> Any:
        """
        Loads normalized data to the destination.

        Args:
            destination: Override destination
            dataset_name: Override dataset name
            credentials: Override credentials
            workers: Number of load workers
            raise_on_failed_jobs: Raise exception on job failures
        Returns:
            Load information with statistics and job details
        """

    def run(
        self,
        data: Any = None,
        *,
        destination: Any = None,
        staging: Any = None,
        dataset_name: str = None,
        credentials: Any = None,
        table_name: str = None,
        write_disposition: str = None,
        columns: Any = None,
        primary_key: str = None,
        schema: Schema = None,
        loader_file_format: str = None,
        table_format: str = None,
        schema_contract: Any = None,
        refresh: str = None
    ) -> LoadInfo:
        """
        Runs the complete extract-normalize-load cycle.

        Args:
            data: Source, resource, or data to load
            destination: Override destination
            staging: Staging destination
            dataset_name: Override dataset name
            credentials: Override credentials
            table_name: Override table name
            write_disposition: Write mode ("append", "replace", "merge", "skip")
            columns: Column hints
            primary_key: Primary key column(s)
            schema: Custom schema
            loader_file_format: Staging file format ("jsonl", "parquet", "csv", "insert_values")
            table_format: Table format ("iceberg", "delta", "hive")
            schema_contract: Schema evolution rules
            refresh: Refresh mode ("drop_sources", "drop_resources", "drop_data")
        Returns:
            LoadInfo with complete load statistics
        """

Access and manage pipeline state for incremental loading and coordination.

    @property
    def state(self) -> dict:
        """
        Pipeline state dictionary for storing arbitrary data.

        Returns:
            Mutable state dictionary
        """

    def set_local_state_val(self, key: str, value: Any) -> None:
        """
        Sets a local state value (not synced to destination).

        Args:
            key: State key
            value: State value
        """

    def get_local_state_val(self, key: str) -> Any:
        """
        Gets a local state value.

        Args:
            key: State key
        Returns:
            State value or None
        """

    def sync_destination(
        self,
        destination: Any = None,
        staging: Any = None,
        dataset_name: str = None
    ) -> None:
        """
        Synchronizes pipeline state with the destination.

        Args:
            destination: Destination to sync with
            staging: Staging destination
            dataset_name: Dataset name
        """

Access and manage pipeline schemas.

    @property
    def schemas(self) -> Any:
        """
        Schema storage with all pipeline schemas.

        Returns:
            Schema storage instance
        """

    @property
    def default_schema(self) -> Schema:
        """
        Default schema for the pipeline.

        Returns:
            Schema instance
        """

    def sync_schema(self, schema_name: str = None) -> None:
        """
        Synchronizes a schema with the destination.

        Args:
            schema_name: Schema to sync (None = default schema)
        """

Get SQL and destination clients for querying and inspection.

    def sql_client(self, schema_name: str = None) -> Any:
        """
        Gets a SQL client for executing queries.

        Args:
            schema_name: Schema name (None = default)
        Returns:
            SQL client context manager
        Example:
            with pipeline.sql_client() as client:
                rows = client.execute_sql("SELECT * FROM users")
        """

    def destination_client(self, schema_name: str = None) -> Any:
        """
        Gets a destination-specific client.

        Args:
            schema_name: Schema name (None = default)
        Returns:
            Destination client context manager
        """

    def dataset(self, dataset_name: str = None) -> Any:
        """
        Gets a dataset query interface.

        Args:
            dataset_name: Dataset name (None = pipeline dataset)
        Returns:
            Dataset instance for querying
        """

Inspect pipeline state, packages, and jobs.

    def has_data(self) -> bool:
        """
        Checks if the pipeline has any data.

        Returns:
            True if data exists
        """

    def has_pending_data(self) -> bool:
        """
        Checks if the pipeline has pending data to load.

        Returns:
            True if pending data exists
        """

    def list_extracted_resources(self) -> list:
        """
        Lists all extracted resource names.

        Returns:
            List of resource names
        """

    def list_extracted_load_packages(self) -> list:
        """
        Lists extracted (not yet normalized) load packages.

        Returns:
            List of package IDs
        """

    def list_normalized_load_packages(self) -> list:
        """
        Lists normalized (ready to load) load packages.

        Returns:
            List of package IDs
        """

    def list_completed_load_packages(self) -> list:
        """
        Lists successfully loaded packages.

        Returns:
            List of package IDs
        """

    def get_load_package_info(self, load_id: str) -> dict:
        """
        Gets detailed package information.

        Args:
            load_id: Load package ID
        Returns:
            Package information dictionary
        """

    def get_load_package_state(self, load_id: str) -> dict:
        """
        Gets package state information.

        Args:
            load_id: Load package ID
        Returns:
            Package state dictionary
        """

    def list_failed_jobs_in_package(self, load_id: str) -> list:
        """
        Lists failed jobs in a load package.

        Args:
            load_id: Load package ID
        Returns:
            List of failed job information
        """

Activate and deactivate pipelines in the current context.

    def activate(self) -> None:
        """
        Activates the pipeline as the current pipeline.
        """

    def deactivate(self) -> None:
        """
        Deactivates the current pipeline.
        """

Drop data and manage pipeline lifecycle.

    def drop(self) -> None:
        """
        Drops all pipeline data and state from destination and local storage.
        """

    def drop_pending_packages(self, with_partial_loads: bool = True) -> None:
        """
        Drops pending load packages.

        Args:
            with_partial_loads: Also drop partially loaded packages
        """

    @property
    def destination(self) -> Any:
        """
        Pipeline destination instance.

        Returns:
            Destination instance
        """

    @destination.setter
    def destination(self, value: Any) -> None:
        """
        Sets the pipeline destination.

        Args:
            value: Destination name or instance
        """

    @property
    def staging(self) -> Any:
        """
        Staging destination instance for staging-aware destinations.

        Returns:
            Staging destination instance or None
        """

    @staging.setter
    def staging(self, value: Any) -> None:
        """
        Sets the staging destination.

        Args:
            value: Staging destination name or instance
        """

    @property
    def naming(self) -> Any:
        """
        Naming convention used by the pipeline.

        Returns:
            Naming convention instance
        """

    @property
    def last_trace(self) -> Any:
        """
        Last pipeline execution trace with timing and statistics.

        Returns:
            Trace instance or None
        """

    @property
    def runtime_config(self) -> dict:
        """
        Runtime configuration dictionary.

        Returns:
            Runtime configuration
        """

import dlt

# Create a pipeline
pipeline = dlt.pipeline(
    pipeline_name="example",
    destination="duckdb",
    dataset_name="raw_data"
)

# Run with data
@dlt.resource
def my_data():
    yield {"id": 1, "value": "test"}

info = pipeline.run(my_data())
print(f"Loaded {len(info.load_packages[0].jobs['completed_jobs'])} completed jobs")

# Extract phase
extract_info = pipeline.extract(my_source())
print(f"Extracted {len(extract_info.resources)} resources")

# Normalize phase
normalize_info = pipeline.normalize()
print(f"Row counts per table: {normalize_info.row_counts}")

# Load phase
load_info = pipeline.load()
print(f"Load finished, any failed jobs: {load_info.has_failed_jobs}")

# Store incremental state
pipeline.state["last_update"] = "2024-01-01"

# Access in a resource
@dlt.resource
def incremental_data():
    last = dlt.current.pipeline().state.get("last_update")
    # Load data since last update
    ...

# Using SQL client
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT COUNT(*) FROM users")
    print(f"Total users: {rows[0][0]}")

# Using dataset interface
dataset = pipeline.dataset()
users_table = dataset.users
result = users_table.limit(10).df()
print(result)
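
After a run, the remaining properties are handy for diagnostics; printing the trace relies on its string representation, which may vary by dlt version:

# Inspect the last execution trace, naming convention, and runtime config
if pipeline.last_trace is not None:
    print(pipeline.last_trace)
print(pipeline.naming)
print(pipeline.runtime_config)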