Core Definitions

This document covers the fundamental building blocks of Dagster: assets, operations, jobs, graphs, and repositories. These core abstractions form the foundation of all Dagster pipelines.

Asset System

Assets are the primary abstraction in Dagster, representing data artifacts that exist or should exist. They enable declarative data pipeline development with automatic dependency inference and rich lineage tracking.

Asset Decorators

@asset { .api }

Module: dagster._core.definitions.decorators.asset_decorator
Type: Function decorator

Define a software-defined asset from a Python function.

from dagster import asset, MaterializeResult
import pandas as pd

@asset
def users_data() -> pd.DataFrame:
    """Load users data from the database."""
    # `connection` is assumed to be a live DB connection (e.g. a SQLAlchemy engine)
    return pd.read_sql("SELECT * FROM users", connection)

@asset(
    deps=["external_source"],
    metadata={"owner": "data-team"},
    group_name="analytics",
    compute_kind="pandas"
)
def processed_users(users_data: pd.DataFrame) -> MaterializeResult:
    """Process users data, storing the result directly and reporting metadata."""
    processed = users_data.dropna()
    processed.to_parquet("processed_users.parquet")  # the asset stores its own output
    return MaterializeResult(
        metadata={
            "records": len(processed),
            "nulls_dropped": len(users_data) - len(processed)
        }
    )

Parameters:

  • name: Optional[str] - Asset name (defaults to function name)
  • key_prefix: Optional[CoercibleToAssetKeyPrefix] - Asset key prefix
  • ins: Optional[Mapping[str, AssetIn]] - Input specifications
  • deps: Optional[Iterable[CoercibleToAssetDep]] - Asset dependencies
  • metadata: Optional[ArbitraryMetadataMapping] - Asset metadata
  • tags: Optional[Mapping[str, str]] - Asset tags for UI and filtering
  • description: Optional[str] - Asset description
  • config_schema: Optional[UserConfigSchema] - Configuration schema
  • required_resource_keys: Optional[AbstractSet[str]] - Required resource keys
  • resource_defs: Optional[Mapping[str, object]] - Resource definitions (beta)
  • hooks: Optional[AbstractSet[HookDefinition]] - Success/failure hooks
  • io_manager_def: Optional[object] - I/O manager definition (beta)
  • io_manager_key: Optional[str] - I/O manager key
  • dagster_type: Optional[DagsterType] - Output type
  • partitions_def: Optional[PartitionsDefinition] - Partitions definition
  • op_tags: Optional[Mapping[str, Any]] - Operation tags
  • group_name: Optional[str] - Asset group name
  • output_required: bool = True - Whether output is required
  • automation_condition: Optional[AutomationCondition] - Declarative automation condition
  • freshness_policy: Optional[InternalFreshnessPolicy] - Freshness policy (internal)
  • backfill_policy: Optional[BackfillPolicy] - Backfill policy (beta)
  • retry_policy: Optional[RetryPolicy] - Retry policy for failed materializations
  • code_version: Optional[str] - Code version for change tracking
  • key: Optional[CoercibleToAssetKey] - Explicit asset key (alternative to name)
  • check_specs: Optional[Sequence[AssetCheckSpec]] - Asset check specifications
  • owners: Optional[Sequence[str]] - Asset owners for metadata
  • kinds: Optional[AbstractSet[str]] - Asset kinds for compute engine tags
  • pool: Optional[str] - Execution pool for asset materialization
  • non_argument_deps: Optional[Set[AssetKey]] - DEPRECATED - Use deps instead
  • auto_materialize_policy: Optional[AutoMaterializePolicy] - DEPRECATED - Use automation_condition
  • compute_kind: Optional[str] - DEPRECATED - Use kinds instead

Returns: Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]

@multi_asset { .api }

Module: dagster._core.definitions.decorators.asset_decorator
Type: Function decorator

Define multiple related assets from a single function.

from dagster import multi_asset, AssetOut, AssetSpec, MaterializeResult
import pandas as pd

@multi_asset(
    outs={
        "users_clean": AssetOut(key_prefix="staging"),
        "users_enriched": AssetOut(key_prefix="marts")
    }
)
def process_users(users_raw: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Process users into clean and enriched datasets."""
    clean = users_raw.dropna()
    # `demographics_data` is assumed to be loaded elsewhere
    enriched = clean.merge(demographics_data, on="user_id")
    return clean, enriched

# Alternative specification approach
@multi_asset(
    specs=[
        AssetSpec("customers", group_name="core"),
        AssetSpec("customer_metrics", group_name="analytics")  
    ]
)
def customer_pipeline(context):
    """Generate multiple customer assets, storing outputs directly."""
    customers_df = extract_customers()
    metrics_df = calculate_metrics(customers_df)

    # With `specs`, emit one MaterializeResult per asset key
    yield MaterializeResult(asset_key="customers", metadata={"records": len(customers_df)})
    yield MaterializeResult(asset_key="customer_metrics", metadata={"records": len(metrics_df)})

Parameters:

  • outs: Optional[Dict[str, AssetOut]] - Output specifications by name
  • specs: Optional[Sequence[AssetSpec]] - Asset specifications
  • name: Optional[str] - Multi-asset name
  • ins: Optional[Dict[str, AssetIn]] - Input specifications
  • deps: Optional[Sequence[Union[str, AssetKey, AssetsDefinition]]] - Dependencies
  • description: Optional[str] - Multi-asset description
  • config_schema: Optional[ConfigSchema] - Configuration schema
  • required_resource_keys: Optional[Set[str]] - Required resource keys
  • compute_kind: Optional[str] - Compute kind for UI
  • internal_asset_deps: Optional[Dict[str, Set[AssetKey]]] - Internal dependencies
  • partitions_def: Optional[PartitionsDefinition] - Partitions definition
  • backfill_policy: Optional[BackfillPolicy] - Backfill policy
  • op_tags: Optional[Dict[str, Any]] - Operation tags
  • can_subset: bool = False - Whether asset can be subset
  • resource_defs: Optional[Dict[str, ResourceDefinition]] - Resource definitions
  • group_name: Optional[str] - Asset group name

@graph_asset { .api }

Module: dagster._core.definitions.decorators.asset_decorator
Type: Function decorator

Define an asset composed of a graph of operations.

from dagster import graph_asset, op
import pandas as pd

@op
def extract_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@op  
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()

@op
def load_data(df: pd.DataFrame) -> pd.DataFrame:
    df.to_parquet("output.parquet")
    return df

@graph_asset
def etl_pipeline():
    """ETL pipeline as a graph asset; the returned output becomes the asset value."""
    return load_data(transform_data(extract_data()))

Asset Definition Classes

AssetsDefinition { .api }

Module: dagster._core.definitions.assets.definition.assets_definition
Type: Class

Represents a set of software-defined assets.

from dagster import AssetsDefinition, AssetSpec

# Created from decorator
assets_def = users_data

# Access asset information
asset_keys = assets_def.keys  # Set of AssetKey objects
asset_specs = assets_def.specs_by_key  # Dict[AssetKey, AssetSpec]
dependencies = assets_def.dependency_keys  # Set of dependency keys

# Check if asset can be materialized
can_materialize = assets_def.can_subset

# Get partitions definition
partitions = assets_def.partitions_def

Key Properties:

  • keys: AbstractSet[AssetKey] - Set of asset keys
  • specs_by_key: Dict[AssetKey, AssetSpec] - Asset specifications by key
  • keys_by_input_name: Dict[str, AssetKey] - Input name to key mapping
  • keys_by_output_name: Dict[str, AssetKey] - Output name to key mapping
  • dependency_keys: AbstractSet[AssetKey] - Dependency asset keys
  • partitions_def: Optional[PartitionsDefinition] - Partitions definition
  • can_subset: bool - Whether assets can be subset
  • execution_type: AssetExecutionType - Execution type
  • op: OpDefinition - Underlying operation definition

AssetSpec { .api }

Module: dagster._core.definitions.assets.definition.asset_spec
Type: Class

Specification for a single asset, containing metadata and configuration.

from dagster import AssetSpec, AssetKey, AutoMaterializePolicy

# Basic asset specification
spec = AssetSpec(
    key="my_asset",
    description="Important business data",
    metadata={"owner": "data-team"},
    group_name="core"
)

# Complex asset specification
complex_spec = AssetSpec(
    key=AssetKey(["warehouse", "dim", "customers"]),
    description="Customer dimension table",
    metadata={
        "table_name": "dim_customers",
        "schema": "analytics",
        "owner": "customer-team"
    },
    group_name="dimensions",
    auto_materialize_policy=AutoMaterializePolicy.eager()
)

Parameters:

  • key: Union[AssetKey, str, Sequence[str]] - Asset key
  • deps: Optional[Sequence[Union[str, AssetKey, AssetDep]]] - Dependencies
  • description: Optional[str] - Asset description
  • metadata: Optional[Dict[str, Any]] - Asset metadata
  • group_name: Optional[str] - Asset group
  • freshness_policy: Optional[FreshnessPolicy] - Freshness policy
  • auto_materialize_policy: Optional[AutoMaterializePolicy] - Auto-materialization policy
  • backfill_policy: Optional[BackfillPolicy] - Backfill policy
  • code_version: Optional[str] - Code version
  • owners: Optional[Sequence[str]] - Asset owners
  • tags: Optional[Dict[str, str]] - Asset tags

Asset I/O Specifications

AssetIn { .api }

Module: dagster._core.definitions.assets.job.asset_in
Type: Class

Input specification for assets with metadata and configuration.

from dagster import asset, AssetIn

@asset(
    ins={
        "upstream_data": AssetIn(
            key_prefix="staging",
            metadata={"format": "parquet"},
            input_manager_key="warehouse_loader"
        )
    }
)
def processed_asset(upstream_data: pd.DataFrame) -> pd.DataFrame:
    """Process upstream data with custom input config."""
    return upstream_data.dropna()

Parameters:

  • key: Optional[Union[str, AssetKey]] - Asset key to depend on
  • key_prefix: Optional[Union[str, Sequence[str]]] - Key prefix
  • metadata: Optional[Dict[str, Any]] - Input metadata
  • input_manager_key: Optional[str] - Input manager key
  • dagster_type: Optional[DagsterType] - Input type
  • partition_mapping: Optional[PartitionMapping] - Partition mapping

AssetOut { .api }

Module: dagster._core.definitions.assets.job.asset_out
Type: Class

Output specification for assets in multi-asset definitions.

from dagster import multi_asset, AssetOut

@multi_asset(
    outs={
        "clean_data": AssetOut(
            key_prefix="staging",
            description="Cleaned version of raw data",
            metadata={"quality_score": 0.95},
            io_manager_key="parquet_io_manager"
        ),
        "summary_stats": AssetOut(
            key_prefix="analytics", 
            description="Summary statistics",
            dagster_type=dict
        )
    }
)
def data_processing():
    """Process data into clean and summary outputs."""
    clean_df = raw_df.dropna()  # `raw_df` assumed loaded elsewhere
    summary_dict = {"rows": len(clean_df)}
    return clean_df, summary_dict

Parameters:

  • key: Optional[Union[str, AssetKey]] - Output asset key
  • key_prefix: Optional[Union[str, Sequence[str]]] - Key prefix
  • dagster_type: Optional[DagsterType] - Output type
  • description: Optional[str] - Output description
  • is_required: bool = True - Whether output is required
  • io_manager_key: Optional[str] - I/O manager key
  • metadata: Optional[Dict[str, Any]] - Output metadata
  • group_name: Optional[str] - Asset group name
  • code_version: Optional[str] - Code version

Asset Dependencies

AssetDep { .api }

Module: dagster._core.definitions.assets.definition.asset_dep
Type: Class

Explicit asset dependency specification with advanced configuration.

from dagster import asset, AssetDep, AssetKey, TimeWindowPartitionMapping

@asset(
    deps=[
        AssetDep(
            asset=AssetKey("upstream_asset"),
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        ),
        AssetDep(
            asset="other_dependency", 
            metadata={"relationship": "foreign_key"}
        )
    ]
)
def downstream_asset() -> pd.DataFrame:
    """Asset with explicit dependencies."""
    return compute_result()

Parameters:

  • asset: Union[str, AssetKey, AssetsDefinition, SourceAsset] - Dependency asset
  • partition_mapping: Optional[PartitionMapping] - Partition mapping
  • metadata: Optional[Dict[str, Any]] - Dependency metadata

Source Assets

SourceAsset { .api }

Module: dagster._core.definitions.source_asset
Type: Class

External asset definition for assets not materialized by Dagster.

from dagster import SourceAsset, AssetKey

# Simple source asset
raw_data_source = SourceAsset(
    key=AssetKey("raw_data"),
    description="Raw data from external system",
    metadata={"system": "legacy_db", "table": "raw_events"}
)

# Source asset with observations
user_events = SourceAsset(
    key=AssetKey(["events", "user_actions"]),
    description="User action events from Kafka",
    metadata={"topic": "user-actions", "retention": "7d"},
    io_manager_key="kafka_manager"
)

@asset(deps=[user_events])
def processed_events():
    """Process events from source asset."""
    return transform_events()

Parameters:

  • key: AssetKey - Source asset key
  • metadata: Optional[Dict[str, Any]] - Source metadata
  • io_manager_key: Optional[str] - I/O manager key
  • description: Optional[str] - Source description
  • partitions_def: Optional[PartitionsDefinition] - Partitions definition
  • observe_fn: Optional[Callable] - Observation function
  • auto_observe_interval_minutes: Optional[float] - Auto-observe interval
  • group_name: Optional[str] - Asset group name

Operations System

Operations (ops) are the fundamental computational units in Dagster, representing discrete pieces of work that consume inputs and produce outputs.

Op Decorator

@op { .api }

Module: dagster._core.definitions.decorators.op_decorator
Type: Function decorator

Define an operation (transformation function).

from dagster import op, In, Out, OpExecutionContext
import pandas as pd

@op
def simple_op() -> str:
    """Simple operation with no inputs."""
    return "hello world"

@op(
    ins={"data": In(dagster_type=pd.DataFrame)},
    out=Out(dagster_type=pd.DataFrame, description="Cleaned data"),
    config_schema={"threshold": float},
    required_resource_keys={"database"},
    tags={"team": "data", "env": "prod"}
)
def clean_data(context: OpExecutionContext, data: pd.DataFrame) -> pd.DataFrame:
    """Clean data based on threshold configuration."""
    threshold = context.op_config["threshold"]
    cleaned = data[data.quality_score > threshold]
    
    context.log.info(f"Removed {len(data) - len(cleaned)} rows below threshold {threshold}")
    
    return cleaned

@op(out={"processed": Out(), "summary": Out()})
def multi_output_op() -> tuple[pd.DataFrame, dict]:
    """Operation with multiple outputs."""
    df = process_data()
    summary = {"count": len(df), "columns": df.columns.tolist()}
    return df, summary

Parameters:

  • name: Optional[str] - Op name (defaults to function name)
  • description: Optional[str] - Op description
  • ins: Optional[Mapping[str, In]] - Input specifications
  • out: Optional[Union[Out, Mapping[str, Out]]] - Output specification(s)
  • config_schema: Optional[UserConfigSchema] - Configuration schema
  • required_resource_keys: Optional[AbstractSet[str]] - Required resource keys
  • tags: Optional[Mapping[str, Any]] - Op tags
  • retry_policy: Optional[RetryPolicy] - Retry policy for failed executions
  • code_version: Optional[str] - Code version for change tracking
  • pool: Optional[str] - Execution pool for op execution
  • version: Optional[str] - DEPRECATED - Use code_version instead

Returns: Union[OpDefinition, _Op]

OpDefinition { .api }

Module: dagster._core.definitions.op_definition
Type: Class

Definition of an operation containing its compute function, configuration, and metadata.

# Access op definition properties
op_def = clean_data
name = op_def.name  # Operation name
description = op_def.description  # Operation description
input_defs = op_def.input_defs  # List of input definitions
output_defs = op_def.output_defs  # List of output definitions
config_schema = op_def.config_schema  # Configuration schema
required_resource_keys = op_def.required_resource_keys  # Required resources
tags = op_def.tags  # Operation tags

# Check if op is a generator (for dynamic outputs)
is_generator = op_def.is_generator_op

Key Properties:

  • name: str - Operation name
  • description: Optional[str] - Operation description
  • input_defs: List[InputDefinition] - Input definitions
  • output_defs: List[OutputDefinition] - Output definitions
  • config_schema: Optional[ConfigSchema] - Configuration schema
  • required_resource_keys: Set[str] - Required resource keys
  • tags: Dict[str, Any] - Operation tags
  • code_version: Optional[str] - Code version
  • retry_policy: Optional[RetryPolicy] - Retry policy

Job System

Jobs orchestrate the execution of operations or assets, defining the computational graph and execution parameters.

Job Decorator

@job { .api }

Module: dagster._core.definitions.decorators.job_decorator
Type: Function decorator

Define a job (collection of ops).

from dagster import job, op, Config

@op(config_schema={"file_path": str})
def load_data(context) -> pd.DataFrame:
    return pd.read_csv(context.op_config["file_path"])

@op
def process_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()

@op  
def save_data(df: pd.DataFrame) -> None:
    df.to_csv("output.csv")

@job(
    description="ETL job for processing data",
    config={"ops": {"load_data": {"config": {"file_path": "data.csv"}}}},
    tags={"team": "data-eng", "env": "production"}
)
def etl_job():
    """ETL job connecting ops."""
    processed = process_data(load_data())
    save_data(processed)

# Typed configuration is defined on ops and supplied per run
class LoadConfig(Config):
    input_path: str = "default.csv"

@op
def load_data_configured(config: LoadConfig) -> pd.DataFrame:
    return pd.read_csv(config.input_path)

@job
def configurable_etl_job():
    """ETL job whose load step takes typed per-run configuration."""
    save_data(process_data(load_data_configured()))

Parameters:

  • name: Optional[str] - Job name (defaults to function name)
  • description: Optional[str] - Job description
  • resource_defs: Optional[Mapping[str, object]] - Resource definitions
  • config: Optional[Union[ConfigMapping, Mapping[str, Any], RunConfig, PartitionedConfig]] - Job configuration
  • tags: Optional[Mapping[str, str]] - Job tags for metadata
  • run_tags: Optional[Mapping[str, str]] - Tags applied to each run
  • metadata: Optional[Mapping[str, RawMetadataValue]] - Job metadata for UI display
  • logger_defs: Optional[Mapping[str, LoggerDefinition]] - Logger definitions
  • executor_def: Optional[ExecutorDefinition] - Executor definition
  • hooks: Optional[AbstractSet[HookDefinition]] - Hook definitions
  • op_retry_policy: Optional[RetryPolicy] - Default retry policy for all ops
  • partitions_def: Optional[PartitionsDefinition] - Partitions definition
  • input_values: Optional[Mapping[str, object]] - Direct Python object inputs to the job

Returns: Union[JobDefinition, _Job]

JobDefinition { .api }

Module: dagster._core.definitions.job_definition
Type: Class

Definition of a job containing the computational graph and execution configuration.

# Access job definition properties
job_def = etl_job
name = job_def.name  # Job name
description = job_def.description  # Job description
graph_def = job_def.graph  # Underlying graph definition
resource_defs = job_def.resource_defs  # Resource definitions
executor_def = job_def.executor_def  # Executor definition

# Execute the job in process (convenient for tests and scripts)
result = job_def.execute_in_process()

Key Properties:

  • name: str - Job name
  • description: Optional[str] - Job description
  • graph: GraphDefinition - Underlying graph definition
  • resource_defs: Dict[str, ResourceDefinition] - Resource definitions
  • executor_def: ExecutorDefinition - Executor definition
  • logger_defs: Dict[str, LoggerDefinition] - Logger definitions
  • hooks: Set[HookDefinition] - Hook definitions
  • tags: Dict[str, Any] - Job tags
  • partitions_def: Optional[PartitionsDefinition] - Partitions definition
  • asset_layer: Optional[AssetLayer] - Asset layer for asset jobs

Asset Jobs

define_asset_job { .api }

Module: dagster._core.definitions.unresolved_asset_job_definition
Type: Function

Define a job that materializes a selection of assets.

from dagster import define_asset_job, AssetSelection, DailyPartitionsDefinition

# Job for specific assets
analytics_job = define_asset_job(
    name="analytics_job",
    selection=AssetSelection.groups("analytics"),
    description="Materialize analytics assets",
    tags={"team": "analytics"}
)

# Job with partitions
daily_job = define_asset_job(
    name="daily_etl",
    selection=AssetSelection.all(),
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4
                }
            }
        }
    }
)

Parameters:

  • name: str - Job name
  • selection: Optional[Union[str, AssetSelection]] - Asset selection
  • config: Optional[Union[ConfigSchema, Dict[str, Any]]] - Job configuration
  • description: Optional[str] - Job description
  • tags: Optional[Dict[str, Any]] - Job tags
  • executor_def: Optional[ExecutorDefinition] - Executor definition
  • hooks: Optional[Set[HookDefinition]] - Hook definitions
  • partitions_def: Optional[PartitionsDefinition] - Partitions definition

Graph System

Graphs enable composition and reuse of computational logic, allowing complex operations to be built from simpler components.

Graph Decorator

@graph { .api }

Module: dagster._core.definitions.decorators.graph_decorator
Type: Function decorator

Define a reusable graph of operations.

from dagster import graph, op, job, GraphIn, GraphOut

@op
def fetch_data(url: str) -> dict:
    return {"data": f"fetched from {url}"}

@op
def validate_data(data: dict) -> dict:
    return {**data, "validated": True}

@op
def transform_data(data: dict) -> dict:
    return {**data, "transformed": True}

@graph(
    description="Reusable data processing graph",
    ins={"source_url": GraphIn()},
    out={"result": GraphOut()}
)
def process_data_graph(source_url: str):
    """Reusable graph for data processing."""
    raw = fetch_data(source_url)
    validated = validate_data(raw)
    return transform_data(validated)

# Use the graph in a job; root inputs are supplied when the job is built
etl_graph_job = process_data_graph.to_job(
    name="etl_graph_job",
    input_values={"source_url": "https://api.example.com/data"}
)

# Convert graph to op for reuse
process_data_op = process_data_graph.to_op()

@op(config_schema={"url": str})
def source_url_op(context) -> str:
    return context.op_config["url"]

@op
def combine_results(a: dict, b: dict) -> dict:
    return {**a, **b}

@job
def complex_job():
    # Op inputs are wired from upstream outputs, not raw literals
    result1 = process_data_op.alias("process_1")(source_url_op.alias("url_1")())
    result2 = process_data_op.alias("process_2")(source_url_op.alias("url_2")())
    combine_results(result1, result2)

Parameters:

  • name: Optional[str] - Graph name (defaults to function name)
  • description: Optional[str] - Graph description
  • ins: Optional[Dict[str, GraphIn]] - Input specifications
  • out: Optional[Union[GraphOut, Dict[str, GraphOut]]] - Output specification(s)
  • config: Optional[ConfigSchema] - Configuration schema
  • tags: Optional[Dict[str, Any]] - Graph tags

GraphDefinition { .api }

Module: dagster._core.definitions.graph_definition
Type: Class

Definition of a graph containing its computational structure and metadata.

# Access graph definition properties
graph_def = process_data_graph
name = graph_def.name  # Graph name
description = graph_def.description  # Graph description
input_mappings = graph_def.input_mappings  # Input mappings
output_mappings = graph_def.output_mappings  # Output mappings
dependencies = graph_def.dependencies  # Op dependencies

# Convert to job or op
job_def = graph_def.to_job()
op_def = graph_def.to_op()

Key Properties:

  • name: str - Graph name
  • description: Optional[str] - Graph description
  • node_defs: List[NodeDefinition] - Node definitions
  • dependencies: Dict[Union[str, NodeInvocation], Dict[str, IDependencyDefinition]] - Dependencies
  • input_mappings: List[InputMapping] - Input mappings
  • output_mappings: List[OutputMapping] - Output mappings
  • config: Optional[ConfigSchema] - Configuration schema
  • tags: Dict[str, Any] - Graph tags

Repository System

Repositories are collections of definitions that can be loaded together, providing a way to organize and deploy Dagster code.

Repository Decorator

@repository { .api }

Module: dagster._core.definitions.decorators.repository_decorator
Type: Function decorator

Define a repository containing jobs, assets, schedules, and sensors.

from dagster import repository, job, asset, schedule, op

@asset
def users_asset():
    return load_users()  # `load_users` assumed defined elsewhere

@op
def process_analytics_data():
    ...

@job
def analytics_job():
    process_analytics_data()

@schedule(job=analytics_job, cron_schedule="0 9 * * *")
def daily_analytics():
    return {}

@repository
def analytics_repository():
    """Repository containing analytics definitions."""
    return [
        users_asset,
        analytics_job,
        daily_analytics
    ]

# Alternative using Definitions class (recommended)
from dagster import Definitions

defs = Definitions(
    assets=[users_asset],
    jobs=[analytics_job], 
    schedules=[daily_analytics],
    resources={"database": database_resource}
)

Definitions { .api }

Module: dagster._core.definitions.definitions_class
Type: Class

Container for all definitions in a Dagster code location. This is the recommended way to organize Dagster definitions.

from dagster import Definitions, load_assets_from_modules
import my_assets, my_jobs, my_resources

# Complete definitions container
defs = Definitions(
    assets=load_assets_from_modules([my_assets]),
    jobs=[my_jobs.etl_job, my_jobs.ml_job],
    schedules=[my_jobs.daily_schedule],
    sensors=[my_jobs.failure_sensor],
    resources=my_resources.get_resources(),
    asset_checks=my_assets.get_asset_checks(),
    loggers={"custom": custom_logger}
)

# Conditional definitions based on environment
import os

def get_definitions():
    base_resources = {"io_manager": fs_io_manager}
    
    if os.getenv("ENV") == "prod":
        base_resources.update({
            "database": prod_database_resource,
            "data_warehouse": snowflake_resource
        })
    else:
        base_resources.update({
            "database": dev_database_resource,
            "data_warehouse": duckdb_resource
        })
    
    return Definitions(
        assets=load_assets_from_modules([my_assets]),
        resources=base_resources
    )

defs = get_definitions()

Parameters:

  • assets: Optional[Iterable[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]] = None - Asset definitions
  • schedules: Optional[Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]] = None - Schedule definitions
  • sensors: Optional[Iterable[SensorDefinition]] = None - Sensor definitions
  • jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None - Job definitions
  • resources: Optional[Mapping[str, Any]] = None - Resource definitions (accepts ResourceDefinition or any object)
  • executor: Optional[Union[ExecutorDefinition, Executor]] = None - Default executor
  • loggers: Optional[Mapping[str, LoggerDefinition]] = None - Logger definitions
  • asset_checks: Optional[Iterable[AssetsDefinition]] = None - Asset check definitions
  • metadata: Optional[RawMetadataMapping] = None - Definitions-level metadata
  • component_tree: Optional[ComponentTree] = None - Component information for reconstruction

Returns: Definitions - Validated container for all code location definitions

Module Loading Utilities

Dagster provides utilities to automatically load definitions from Python modules and packages.

Asset Loading

load_assets_from_modules { .api }

Module: dagster._core.definitions.module_loaders.load_assets_from_modules
Type: Function

Load all assets from specified Python modules.

from dagster import load_assets_from_modules
import my_assets_module1, my_assets_module2

# Load from multiple modules
all_assets = load_assets_from_modules([
    my_assets_module1, 
    my_assets_module2
])

# Load with group assignment
grouped_assets = load_assets_from_modules(
    [my_assets_module1], 
    group_name="core_data"
)

# Load with key prefix
prefixed_assets = load_assets_from_modules(
    [my_assets_module1],
    key_prefix="staging"
)

Parameters:

  • modules: Sequence[ModuleType] - Modules to load assets from
  • group_name: Optional[str] - Group name to assign to loaded assets
  • key_prefix: Optional[Union[str, Sequence[str]]] - Key prefix for loaded assets
  • freshness_policy: Optional[FreshnessPolicy] - Freshness policy for loaded assets
  • auto_materialize_policy: Optional[AutoMaterializePolicy] - Auto-materialization policy
  • backfill_policy: Optional[BackfillPolicy] - Backfill policy

load_assets_from_package_name { .api }

Module: dagster._core.definitions.module_loaders.load_assets_from_modules
Type: Function

Load assets from a package by name.

from dagster import load_assets_from_package_name

# Load from package name
assets = load_assets_from_package_name(
    "my_company.data_assets",
    group_name="company_data"
)

# Recursively load from subpackages
all_package_assets = load_assets_from_package_name(
    "my_company.assets",
    key_prefix=["company", "data"]
)

This documentation provides comprehensive coverage of Dagster's core definition system. The asset system represents the declarative approach to data pipeline development, while operations and jobs provide imperative workflow definition. Graphs enable composition and reuse, and repositories organize definitions for deployment. The module loading utilities facilitate code organization and automatic discovery of definitions.

For execution and runtime behavior, see Execution and Contexts. For configuration of these definitions, see Configuration System.
