A cloud-native data pipeline orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.
---
This document covers the fundamental building blocks of Dagster: assets, operations, jobs, graphs, and repositories. These core abstractions form the foundation of all Dagster pipelines.
Assets are the primary abstraction in Dagster, representing data artifacts that exist or should exist. They enable declarative data pipeline development with automatic dependency inference and rich lineage tracking.
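For example, naming a function parameter after an upstream asset is enough to create the dependency edge; a minimal sketch with illustrative asset names:

from dagster import asset

@asset
def raw_orders() -> list:
    return [{"id": 1, "amount": 10}]

@asset
def order_count(raw_orders: list) -> int:
    # The parameter name matches the upstream asset key, so Dagster infers the dependency
    return len(raw_orders)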
@asset { .api }
Module: dagster._core.definitions.decorators.asset_decorator
Type: Function decorator
Define a software-defined asset from a Python function.
from dagster import asset, MaterializeResult
import pandas as pd
@asset
def users_data() -> pd.DataFrame:
"""Load users data from database."""
    return pd.read_sql("SELECT * FROM users", connection)  # connection: a DB connection defined elsewhere
@asset(
deps=["external_source"],
metadata={"owner": "data-team"},
group_name="analytics",
compute_kind="pandas"
)
def processed_users(users_data: pd.DataFrame) -> MaterializeResult:
"""Process users data with metadata."""
processed = users_data.dropna()
    processed.to_parquet("processed_users.parquet")  # the function persists the data itself
    return MaterializeResult(
        # MaterializeResult reports metadata about the materialization; it does not carry the data
        metadata={
            "records": len(processed),
            "null_dropped": len(users_data) - len(processed)
        }
    )

Parameters:
- name: Optional[str] - Asset name (defaults to function name)
- key_prefix: Optional[CoercibleToAssetKeyPrefix] - Asset key prefix
- ins: Optional[Mapping[str, AssetIn]] - Input specifications
- deps: Optional[Iterable[CoercibleToAssetDep]] - Asset dependencies
- metadata: Optional[ArbitraryMetadataMapping] - Asset metadata
- tags: Optional[Mapping[str, str]] - Asset tags for UI and filtering
- description: Optional[str] - Asset description
- config_schema: Optional[UserConfigSchema] - Configuration schema
- required_resource_keys: Optional[AbstractSet[str]] - Required resource keys
- resource_defs: Optional[Mapping[str, object]] - Resource definitions (beta)
- hooks: Optional[AbstractSet[HookDefinition]] - Success/failure hooks
- io_manager_def: Optional[object] - I/O manager definition (beta)
- io_manager_key: Optional[str] - I/O manager key
- dagster_type: Optional[DagsterType] - Output type
- partitions_def: Optional[PartitionsDefinition] - Partitions definition
- op_tags: Optional[Mapping[str, Any]] - Operation tags
- group_name: Optional[str] - Asset group name
- output_required: bool = True - Whether output is required
- automation_condition: Optional[AutomationCondition] - Declarative automation condition
- freshness_policy: Optional[InternalFreshnessPolicy] - Freshness policy (internal)
- backfill_policy: Optional[BackfillPolicy] - Backfill policy (beta)
- retry_policy: Optional[RetryPolicy] - Retry policy for failed materializations
- code_version: Optional[str] - Code version for change tracking
- key: Optional[CoercibleToAssetKey] - Explicit asset key (alternative to name)
- check_specs: Optional[Sequence[AssetCheckSpec]] - Asset check specifications
- owners: Optional[Sequence[str]] - Asset owners for metadata
- kinds: Optional[AbstractSet[str]] - Asset kinds for compute engine tags
- pool: Optional[str] - Execution pool for asset materialization
- non_argument_deps: Optional[Set[AssetKey]] - DEPRECATED - Use deps instead
- auto_materialize_policy: Optional[AutoMaterializePolicy] - DEPRECATED - Use automation_condition
- compute_kind: Optional[str] - DEPRECATED - Use kinds instead

Returns: Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]
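Several of these parameters are commonly combined; a hedged sketch of a partitioned asset with a declarative automation condition (asset name and logic are illustrative):

from dagster import asset, AssetExecutionContext, AutomationCondition, DailyPartitionsDefinition

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    automation_condition=AutomationCondition.eager(),
    code_version="v1"
)
def daily_events(context: AssetExecutionContext) -> None:
    # context.partition_key identifies the day being materialized
    context.log.info(f"Materializing partition {context.partition_key}")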
@multi_asset { .api }
Module: dagster._core.definitions.decorators.asset_decorator
Type: Function decorator
Define multiple related assets from a single function.
from dagster import multi_asset, AssetOut, AssetSpec, MaterializeResult
import pandas as pd
@multi_asset(
outs={
"users_clean": AssetOut(key_prefix="staging"),
"users_enriched": AssetOut(key_prefix="marts")
}
)
def process_users(users_raw) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Process users into clean and enriched datasets."""
clean = users_raw.dropna()
    enriched = clean.merge(demographics_data, on="user_id")  # demographics_data: assumed loaded elsewhere
return clean, enriched
# Alternative specification approach
@multi_asset(
specs=[
AssetSpec("customers", group_name="core"),
AssetSpec("customer_metrics", group_name="analytics")
]
)
def customer_pipeline(context):
    """Generate multiple customer assets."""
    customers_df = extract_customers()  # extract_customers/calculate_metrics: assumed defined elsewhere
    metrics_df = calculate_metrics(customers_df)
    # Yield one MaterializeResult per spec; the function persists the data itself
    yield MaterializeResult(asset_key="customers", metadata={"records": len(customers_df)})
    yield MaterializeResult(asset_key="customer_metrics", metadata={"records": len(metrics_df)})

Parameters:
- outs: Optional[Dict[str, AssetOut]] - Output specifications by name
- specs: Optional[Sequence[AssetSpec]] - Asset specifications
- name: Optional[str] - Multi-asset name
- ins: Optional[Dict[str, AssetIn]] - Input specifications
- deps: Optional[Sequence[Union[str, AssetKey, AssetsDefinition]]] - Dependencies
- description: Optional[str] - Multi-asset description
- config_schema: Optional[ConfigSchema] - Configuration schema
- required_resource_keys: Optional[Set[str]] - Required resource keys
- compute_kind: Optional[str] - Compute kind for UI
- internal_asset_deps: Optional[Dict[str, Set[AssetKey]]] - Internal dependencies
- partitions_def: Optional[PartitionsDefinition] - Partitions definition
- backfill_policy: Optional[BackfillPolicy] - Backfill policy
- op_tags: Optional[Dict[str, Any]] - Operation tags
- can_subset: bool = False - Whether asset can be subset
- resource_defs: Optional[Dict[str, ResourceDefinition]] - Resource definitions
- group_name: Optional[str] - Asset group name

@graph_asset { .api }
Module: dagster._core.definitions.decorators.asset_decorator
Type: Function decorator
Define an asset composed of a graph of operations.
from dagster import graph_asset, op
import pandas as pd
@op
def extract_data() -> pd.DataFrame:
return pd.read_csv("data.csv")
@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
return df.dropna()
@op
def load_data(df: pd.DataFrame) -> None:
df.to_parquet("output.parquet")
@graph_asset
def etl_pipeline():
"""ETL pipeline as a graph asset."""
    return load_data(transform_data(extract_data()))  # the returned output becomes the asset

AssetsDefinition { .api }
Module: dagster._core.definitions.assets.definition.assets_definition
Type: Class
Represents a set of software-defined assets.
from dagster import AssetsDefinition, AssetSpec
# Created from decorator
assets_def = users_data
# Access asset information
asset_keys = assets_def.keys # Set of AssetKey objects
asset_specs = assets_def.specs_by_key # Dict[AssetKey, AssetSpec]
dependencies = assets_def.dependency_keys # Set of dependency keys
# Check if asset can be materialized
can_materialize = assets_def.can_subset
# Get partitions definition
partitions = assets_def.partitions_def

Key Properties:
- keys: AbstractSet[AssetKey] - Set of asset keys
- specs_by_key: Dict[AssetKey, AssetSpec] - Asset specifications by key
- keys_by_input_name: Dict[str, AssetKey] - Input name to key mapping
- keys_by_output_name: Dict[str, AssetKey] - Output name to key mapping
- dependency_keys: AbstractSet[AssetKey] - Dependency asset keys
- partitions_def: Optional[PartitionsDefinition] - Partitions definition
- can_subset: bool - Whether assets can be subset
- execution_type: AssetExecutionType - Execution type
- op: OpDefinition - Underlying operation definition

AssetSpec { .api }
Module: dagster._core.definitions.assets.definition.asset_spec
Type: Class
Specification for a single asset, containing metadata and configuration.
from dagster import AssetSpec, AssetKey, AutoMaterializePolicy
# Basic asset specification
spec = AssetSpec(
key="my_asset",
description="Important business data",
metadata={"owner": "data-team"},
group_name="core"
)
# Complex asset specification
complex_spec = AssetSpec(
key=AssetKey(["warehouse", "dim", "customers"]),
description="Customer dimension table",
metadata={
"table_name": "dim_customers",
"schema": "analytics",
"owner": "customer-team"
},
group_name="dimensions",
    freshness_policy=freshness_policy,  # a FreshnessPolicy defined elsewhere
    auto_materialize_policy=AutoMaterializePolicy.eager()
)

Parameters:
- key: Union[AssetKey, str, Sequence[str]] - Asset key
- deps: Optional[Sequence[Union[str, AssetKey, AssetDep]]] - Dependencies
- description: Optional[str] - Asset description
- metadata: Optional[Dict[str, Any]] - Asset metadata
- group_name: Optional[str] - Asset group
- freshness_policy: Optional[FreshnessPolicy] - Freshness policy
- auto_materialize_policy: Optional[AutoMaterializePolicy] - Auto-materialization policy
- backfill_policy: Optional[BackfillPolicy] - Backfill policy
- code_version: Optional[str] - Code version
- owners: Optional[Sequence[str]] - Asset owners
- tags: Optional[Dict[str, str]] - Asset tags
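An AssetSpec can also be passed directly to Definitions (see below) to declare an asset that exists without a materialization function, so it still appears in lineage; a minimal sketch:

from dagster import AssetSpec, Definitions

# Declare an externally-managed table as an asset with no compute attached
external_defs = Definitions(assets=[AssetSpec("external_table", group_name="raw")])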
AssetIn { .api }
Module: dagster._core.definitions.assets.job.asset_in
Type: Class
Input specification for assets with metadata and configuration.
from dagster import asset, AssetIn
@asset(
ins={
"upstream_data": AssetIn(
key_prefix="staging",
metadata={"format": "parquet"},
input_manager_key="warehouse_loader"
)
}
)
def processed_asset(upstream_data: pd.DataFrame) -> pd.DataFrame:
"""Process upstream data with custom input config."""
    return upstream_data.dropna()

Parameters:
- key: Optional[Union[str, AssetKey]] - Asset key to depend on
- key_prefix: Optional[Union[str, Sequence[str]]] - Key prefix
- metadata: Optional[Dict[str, Any]] - Input metadata
- input_manager_key: Optional[str] - Input manager key
- dagster_type: Optional[DagsterType] - Input type
- partition_mapping: Optional[PartitionMapping] - Partition mapping

AssetOut { .api }
Module: dagster._core.definitions.assets.job.asset_out
Type: Class
Output specification for assets in multi-asset definitions.
from dagster import multi_asset, AssetOut
@multi_asset(
outs={
"clean_data": AssetOut(
key_prefix="staging",
description="Cleaned version of raw data",
metadata={"quality_score": 0.95},
io_manager_key="parquet_io_manager"
),
"summary_stats": AssetOut(
key_prefix="analytics",
description="Summary statistics",
dagster_type=dict
)
}
)
def data_processing():
"""Process data into clean and summary outputs."""
# Processing logic
    return clean_df, summary_dict

Parameters:
- key: Optional[Union[str, AssetKey]] - Output asset key
- key_prefix: Optional[Union[str, Sequence[str]]] - Key prefix
- dagster_type: Optional[DagsterType] - Output type
- description: Optional[str] - Output description
- is_required: bool = True - Whether output is required
- io_manager_key: Optional[str] - I/O manager key
- metadata: Optional[Dict[str, Any]] - Output metadata
- group_name: Optional[str] - Asset group name
- code_version: Optional[str] - Code version

AssetDep { .api }
Module: dagster._core.definitions.assets.definition.asset_dep
Type: Class
Explicit asset dependency specification with advanced configuration.
from dagster import asset, AssetDep, AssetKey, TimeWindowPartitionMapping
@asset(
deps=[
AssetDep(
asset=AssetKey("upstream_asset"),
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
),
AssetDep(
asset="other_dependency",
metadata={"relationship": "foreign_key"}
)
]
)
def downstream_asset() -> pd.DataFrame:
"""Asset with explicit dependencies."""
    return compute_result()  # compute_result: assumed defined elsewhere

Parameters:
- asset: Union[str, AssetKey, AssetsDefinition, SourceAsset] - Dependency asset
- partition_mapping: Optional[PartitionMapping] - Partition mapping
- metadata: Optional[Dict[str, Any]] - Dependency metadata

SourceAsset { .api }
Module: dagster._core.definitions.source_asset
Type: Class
External asset definition for assets not materialized by Dagster.
from dagster import SourceAsset, AssetKey, asset
# Simple source asset
raw_data_source = SourceAsset(
key=AssetKey("raw_data"),
description="Raw data from external system",
metadata={"system": "legacy_db", "table": "raw_events"}
)
# Source asset with observations
user_events = SourceAsset(
key=AssetKey(["events", "user_actions"]),
description="User action events from Kafka",
metadata={"topic": "user-actions", "retention": "7d"},
io_manager_key="kafka_manager"
)
@asset(deps=[user_events])
def processed_events():
"""Process events from source asset."""
    return transform_events()  # transform_events: assumed defined elsewhere

Parameters:
- key: AssetKey - Source asset key
- metadata: Optional[Dict[str, Any]] - Source metadata
- io_manager_key: Optional[str] - I/O manager key
- description: Optional[str] - Source description
- partitions_def: Optional[PartitionsDefinition] - Partitions definition
- observe_fn: Optional[Callable] - Observation function
- auto_observe_interval_minutes: Optional[float] - Auto-observe interval
- group_name: Optional[str] - Asset group name
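For sources whose versions Dagster should track automatically, the related @observable_source_asset decorator wraps an observation function that returns a DataVersion; a sketch (the hashing helper is hypothetical):

from dagster import observable_source_asset, DataVersion

@observable_source_asset
def raw_files():
    # Report a version so downstream staleness can be computed
    return DataVersion(compute_raw_files_hash())  # compute_raw_files_hash: hypothetical helper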
Operations (ops) are the fundamental computational units in Dagster, representing discrete pieces of work that consume inputs and produce outputs.

@op { .api }
Module: dagster._core.definitions.decorators.op_decorator
Type: Function decorator
Define an operation (transformation function).
from dagster import op, In, Out, OpExecutionContext
import pandas as pd
@op
def simple_op() -> str:
"""Simple operation with no inputs."""
return "hello world"
@op(
ins={"data": In(dagster_type=pd.DataFrame)},
out=Out(dagster_type=pd.DataFrame, description="Cleaned data"),
config_schema={"threshold": float},
required_resource_keys={"database"},
tags={"team": "data", "env": "prod"}
)
def clean_data(context: OpExecutionContext, data: pd.DataFrame) -> pd.DataFrame:
"""Clean data based on threshold configuration."""
threshold = context.op_config["threshold"]
cleaned = data[data.quality_score > threshold]
context.log.info(f"Removed {len(data) - len(cleaned)} rows below threshold {threshold}")
return cleaned
@op(out={"processed": Out(), "summary": Out()})
def multi_output_op() -> tuple[pd.DataFrame, dict]:
"""Operation with multiple outputs."""
df = process_data()
summary = {"count": len(df), "columns": df.columns.tolist()}
    return df, summary

Parameters:
- name: Optional[str] - Op name (defaults to function name)
- description: Optional[str] - Op description
- ins: Optional[Mapping[str, In]] - Input specifications
- out: Optional[Union[Out, Mapping[str, Out]]] - Output specification(s)
- config_schema: Optional[UserConfigSchema] - Configuration schema
- required_resource_keys: Optional[AbstractSet[str]] - Required resource keys
- tags: Optional[Mapping[str, Any]] - Op tags
- retry_policy: Optional[RetryPolicy] - Retry policy for failed executions
- code_version: Optional[str] - Code version for change tracking
- pool: Optional[str] - Execution pool for op execution
- version: Optional[str] - DEPRECATED - Use code_version instead

Returns: Union[OpDefinition, _Op]
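For fan-out workloads, an op can declare a dynamic output and yield one result per item, spawning a downstream invocation for each; a minimal sketch using DynamicOut:

from dagster import op, DynamicOut, DynamicOutput

@op(out=DynamicOut(int))
def split_work():
    # Each DynamicOutput maps to one downstream step; mapping_key must be unique
    for i in range(4):
        yield DynamicOutput(i, mapping_key=str(i))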
OpDefinition { .api }
Module: dagster._core.definitions.op_definition
Type: Class
Definition of an operation containing its compute function, configuration, and metadata.
# Access op definition properties
op_def = clean_data
name = op_def.name # Operation name
description = op_def.description # Operation description
input_defs = op_def.input_defs # List of input definitions
output_defs = op_def.output_defs # List of output definitions
config_schema = op_def.config_schema # Configuration schema
required_resource_keys = op_def.required_resource_keys # Required resources
tags = op_def.tags # Operation tags
# Check if op is a generator (for dynamic outputs)
is_generator = op_def.is_generator_op

Key Properties:
- name: str - Operation name
- description: Optional[str] - Operation description
- input_defs: List[InputDefinition] - Input definitions
- output_defs: List[OutputDefinition] - Output definitions
- config_schema: Optional[ConfigSchema] - Configuration schema
- required_resource_keys: Set[str] - Required resource keys
- tags: Dict[str, Any] - Operation tags
- code_version: Optional[str] - Code version
- retry_policy: Optional[RetryPolicy] - Retry policy
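Because ops are plain functions, they can be unit-tested by direct invocation with a synthetic context; a sketch against the clean_data op above (the resource stub is a stand-in):

import pandas as pd
from dagster import build_op_context

context = build_op_context(
    op_config={"threshold": 0.5},
    resources={"database": object()}  # stub satisfying the required "database" resource key
)
df = pd.DataFrame({"quality_score": [0.2, 0.9]})
assert len(clean_data(context, df)) == 1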
Jobs orchestrate the execution of operations or assets, defining the computational graph and execution parameters.

@job { .api }
Module: dagster._core.definitions.decorators.job_decorator
Type: Function decorator
Define a job (collection of ops).
from dagster import job, op, Config
import pandas as pd
@op(config_schema={"file_path": str})
def load_data(context) -> pd.DataFrame:
    return pd.read_csv(context.op_config["file_path"])
@op
def process_data(df: pd.DataFrame) -> pd.DataFrame:
return df.dropna()
@op
def save_data(df: pd.DataFrame) -> None:
df.to_csv("output.csv")
@job(
description="ETL job for processing data",
config={"ops": {"load_data": {"config": {"file_path": "data.csv"}}}},
tags={"team": "data-eng", "env": "production"}
)
def etl_job():
"""ETL job connecting ops."""
processed = process_data(load_data())
save_data(processed)
# Typed configuration attaches to ops (not the job body) via Config classes
class LoadConfig(Config):
    file_path: str = "default.csv"

@op
def load_data_typed(config: LoadConfig) -> pd.DataFrame:
    """Op with typed, validated configuration."""
    return pd.read_csv(config.file_path)

@job
def configurable_etl_job():
    """ETL job whose load step is driven by typed config."""
    save_data(process_data(load_data_typed()))

Parameters:
- name: Optional[str] - Job name (defaults to function name)
- description: Optional[str] - Job description
- resource_defs: Optional[Mapping[str, object]] - Resource definitions
- config: Optional[Union[ConfigMapping, Mapping[str, Any], RunConfig, PartitionedConfig]] - Job configuration
- tags: Optional[Mapping[str, str]] - Job tags for metadata
- run_tags: Optional[Mapping[str, str]] - Tags applied to each run
- metadata: Optional[Mapping[str, RawMetadataValue]] - Job metadata for UI display
- logger_defs: Optional[Mapping[str, LoggerDefinition]] - Logger definitions
- executor_def: Optional[ExecutorDefinition] - Executor definition
- hooks: Optional[AbstractSet[HookDefinition]] - Hook definitions
- op_retry_policy: Optional[RetryPolicy] - Default retry policy for all ops
- partitions_def: Optional[PartitionsDefinition] - Partitions definition
- input_values: Optional[Mapping[str, object]] - Direct Python object inputs to the job

Returns: Union[JobDefinition, _Job]
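Jobs can be executed in-process for testing, with run config supplied per op; a sketch against etl_job as defined above:

result = etl_job.execute_in_process(
    run_config={"ops": {"load_data": {"config": {"file_path": "data.csv"}}}}
)
assert result.success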
JobDefinition { .api }
Module: dagster._core.definitions.job_definition
Type: Class
Definition of a job containing the computational graph and execution configuration.
# Access job definition properties
job_def = etl_job
name = job_def.name # Job name
description = job_def.description # Job description
graph_def = job_def.graph # Underlying graph definition
resource_defs = job_def.resource_defs # Resource definitions
executor_def = job_def.executor_def # Executor definition
# Execute the job in-process (dagster.execute_job requires a reconstructable
# job plus a DagsterInstance; execute_in_process is the simple path)
result = job_def.execute_in_process()

Key Properties:
- name: str - Job name
- description: Optional[str] - Job description
- graph: GraphDefinition - Underlying graph definition
- resource_defs: Dict[str, ResourceDefinition] - Resource definitions
- executor_def: ExecutorDefinition - Executor definition
- logger_defs: Dict[str, LoggerDefinition] - Logger definitions
- hooks: Set[HookDefinition] - Hook definitions
- tags: Dict[str, Any] - Job tags
- partitions_def: Optional[PartitionsDefinition] - Partitions definition
- asset_layer: Optional[AssetLayer] - Asset layer for asset jobs

define_asset_job { .api }
Module: dagster._core.definitions.unresolved_asset_job_definition
Type: Function
Define a job that materializes a selection of assets.
from dagster import define_asset_job, AssetSelection, DailyPartitionsDefinition
# Job for specific assets
analytics_job = define_asset_job(
name="analytics_job",
selection=AssetSelection.groups("analytics"),
description="Materialize analytics assets",
tags={"team": "analytics"}
)
# Job with partitions
daily_job = define_asset_job(
name="daily_etl",
selection=AssetSelection.all(),
partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
config={
"execution": {
"config": {
"multiprocess": {
"max_concurrent": 4
}
}
}
}
)

Parameters:
- name: str - Job name
- selection: Optional[Union[str, AssetSelection]] - Asset selection
- config: Optional[Union[ConfigSchema, Dict[str, Any]]] - Job configuration
- description: Optional[str] - Job description
- tags: Optional[Dict[str, Any]] - Job tags
- executor_def: Optional[ExecutorDefinition] - Executor definition
- hooks: Optional[Set[HookDefinition]] - Hook definitions
- partitions_def: Optional[PartitionsDefinition] - Partitions definition
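Selections compose; a hedged sketch that materializes one asset plus everything upstream of it (asset key reused from the earlier @asset examples):

from dagster import AssetSelection, define_asset_job

users_pipeline_job = define_asset_job(
    name="users_pipeline_job",
    selection=AssetSelection.keys("processed_users").upstream()
)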
Graphs enable composition and reuse of computational logic, allowing complex operations to be built from simpler components.

@graph { .api }
Module: dagster._core.definitions.decorators.graph_decorator
Type: Function decorator
Define a reusable graph of operations.
from dagster import graph, op, GraphIn, GraphOut
@op
def fetch_data(url: str) -> dict:
return {"data": f"fetched from {url}"}
@op
def validate_data(data: dict) -> dict:
return {**data, "validated": True}
@op
def transform_data(data: dict) -> dict:
return {**data, "transformed": True}
@graph(
description="Reusable data processing graph",
ins={"source_url": GraphIn(str)},
out={"result": GraphOut()}
)
def process_data_graph(source_url: str):
"""Reusable graph for data processing."""
raw = fetch_data(source_url)
validated = validate_data(raw)
return transform_data(validated)
# Use the graph in a job; graph inputs are wired from upstream ops
@op
def source_url() -> str:
    return "https://api.example.com/data"

@job
def etl_job():
    process_data_graph(source_url())

# Graphs compose like ops and can be invoked multiple times (invocations are auto-aliased)
@job
def complex_job():
    result1 = process_data_graph(source_url())
    result2 = process_data_graph(source_url())
    combine_results(result1, result2)  # combine_results: an op assumed defined elsewhere

Parameters:
- name: Optional[str] - Graph name (defaults to function name)
- description: Optional[str] - Graph description
- ins: Optional[Dict[str, GraphIn]] - Input specifications
- out: Optional[Union[GraphOut, Dict[str, GraphOut]]] - Output specification(s)
- config: Optional[ConfigSchema] - Configuration schema
- tags: Optional[Dict[str, Any]] - Graph tags
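The same graph can back multiple jobs with different names and bound inputs; a sketch using to_job, where input_values binds the graph's source_url input:

prod_ingest_job = process_data_graph.to_job(
    name="prod_ingest_job",
    input_values={"source_url": "https://api.example.com/data"}
)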
GraphDefinition { .api }
Module: dagster._core.definitions.graph_definition
Type: Class
Definition of a graph containing its computational structure and metadata.
# Access graph definition properties
graph_def = process_data_graph
name = graph_def.name # Graph name
description = graph_def.description # Graph description
input_mappings = graph_def.input_mappings # Input mappings
output_mappings = graph_def.output_mappings # Output mappings
dependencies = graph_def.dependencies # Op dependencies
# Convert to a job, or execute directly for testing
job_def = graph_def.to_job()
result = graph_def.execute_in_process(
    input_values={"source_url": "https://api.example.com/data"}
)

Key Properties:
- name: str - Graph name
- description: Optional[str] - Graph description
- node_defs: List[NodeDefinition] - Node definitions
- dependencies: Dict[Union[str, NodeInvocation], Dict[str, IDependencyDefinition]] - Dependencies
- input_mappings: List[InputMapping] - Input mappings
- output_mappings: List[OutputMapping] - Output mappings
- config: Optional[ConfigSchema] - Configuration schema
- tags: Dict[str, Any] - Graph tags

Repositories are collections of definitions that can be loaded together, providing a way to organize and deploy Dagster code.
@repository { .api }
Module: dagster._core.definitions.decorators.repository_decorator
Type: Function decorator
Define a repository containing jobs, assets, schedules, and sensors.
from dagster import repository, job, asset, schedule
@asset
def users_asset():
return load_users()
@job
def analytics_job():
process_analytics_data()
@schedule(job=analytics_job, cron_schedule="0 9 * * *")
def daily_analytics():
return {}
@repository
def analytics_repository():
"""Repository containing analytics definitions."""
return [
users_asset,
analytics_job,
daily_analytics
]
# Alternative using Definitions class (recommended)
from dagster import Definitions
defs = Definitions(
assets=[users_asset],
jobs=[analytics_job],
schedules=[daily_analytics],
resources={"database": database_resource}
)

Definitions { .api }
Module: dagster._core.definitions.definitions_class
Type: Class
Container for all definitions in a Dagster code location. This is the recommended way to organize Dagster definitions.
from dagster import Definitions, load_assets_from_modules
import my_assets, my_jobs, my_resources
# Complete definitions container
defs = Definitions(
assets=load_assets_from_modules([my_assets]),
jobs=[my_jobs.etl_job, my_jobs.ml_job],
schedules=[my_jobs.daily_schedule],
sensors=[my_jobs.failure_sensor],
resources=my_resources.get_resources(),
asset_checks=my_assets.get_asset_checks(),
loggers={"custom": custom_logger}
)
# Conditional definitions based on environment
import os
def get_definitions():
base_resources = {"io_manager": fs_io_manager}
if os.getenv("ENV") == "prod":
base_resources.update({
"database": prod_database_resource,
"data_warehouse": snowflake_resource
})
else:
base_resources.update({
"database": dev_database_resource,
"data_warehouse": duckdb_resource
})
return Definitions(
assets=load_assets_from_modules([my_assets]),
resources=base_resources
)
defs = get_definitions()

Parameters:
- assets: Optional[Iterable[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]] = None - Asset definitions
- schedules: Optional[Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]] = None - Schedule definitions
- sensors: Optional[Iterable[SensorDefinition]] = None - Sensor definitions
- jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None - Job definitions
- resources: Optional[Mapping[str, Any]] = None - Resource definitions (accepts ResourceDefinition or any object)
- executor: Optional[Union[ExecutorDefinition, Executor]] = None - Default executor
- loggers: Optional[Mapping[str, LoggerDefinition]] = None - Logger definitions
- asset_checks: Optional[Iterable[AssetsDefinition]] = None - Asset check definitions
- metadata: Optional[RawMetadataMapping] = None - Definitions-level metadata
- component_tree: Optional[ComponentTree] = None - Component information for reconstruction

Returns: Definitions - Validated container for all code location definitions
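Definitions objects can be queried back in tests or scripts; a minimal sketch using the job name from the earlier example:

# Resolve a job registered on the Definitions object by name
etl = defs.get_job_def("etl_job")
assert etl.name == "etl_job"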
Dagster provides utilities to automatically load definitions from Python modules and packages.
load_assets_from_modules { .api }
Module: dagster._core.definitions.module_loaders.load_assets_from_modules
Type: Function
Load all assets from specified Python modules.
from dagster import load_assets_from_modules
import my_assets_module1, my_assets_module2
# Load from multiple modules
all_assets = load_assets_from_modules([
my_assets_module1,
my_assets_module2
])
# Load with group assignment
grouped_assets = load_assets_from_modules(
[my_assets_module1],
group_name="core_data"
)
# Load with key prefix
prefixed_assets = load_assets_from_modules(
[my_assets_module1],
key_prefix="staging"
)

Parameters:
- modules: Sequence[ModuleType] - Modules to load assets from
- group_name: Optional[str] - Group name to assign to loaded assets
- key_prefix: Optional[Union[str, Sequence[str]]] - Key prefix for loaded assets
- freshness_policy: Optional[FreshnessPolicy] - Freshness policy for loaded assets
- auto_materialize_policy: Optional[AutoMaterializePolicy] - Auto-materialization policy
- backfill_policy: Optional[BackfillPolicy] - Backfill policy

load_assets_from_package_name { .api }
Module: dagster._core.definitions.module_loaders.load_assets_from_modules
Type: Function
Load assets from a package by name.
from dagster import load_assets_from_package_name

# Load from package name
assets = load_assets_from_package_name(
"my_company.data_assets",
group_name="company_data"
)
# Recursively load from subpackages
all_package_assets = load_assets_from_package_name(
"my_company.assets",
key_prefix=["company", "data"]
)

This documentation provides comprehensive coverage of Dagster's core definition system. The asset system represents the declarative approach to data pipeline development, while operations and jobs provide imperative workflow definition. Graphs enable composition and reuse, and repositories organize definitions for deployment. The module loading utilities facilitate code organization and automatic discovery of definitions.
For execution and runtime behavior, see Execution and Contexts. For configuration of these definitions, see Configuration System.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster