# Core Definitions

This document covers the fundamental building blocks of Dagster: assets, operations, jobs, graphs, and repositories. These core abstractions form the foundation of all Dagster pipelines.

## Asset System

Assets are the primary abstraction in Dagster, representing data artifacts that exist or should exist. They enable declarative data pipeline development with automatic dependency inference and rich lineage tracking.

### Asset Decorators

#### `@asset` { .api }

**Module:** `dagster._core.definitions.decorators.asset_decorator`
**Type:** Function decorator

Define a software-defined asset from a Python function.

```python
from dagster import asset, Output
import pandas as pd

@asset
def users_data() -> pd.DataFrame:
    """Load users data from database."""
    # Assumes `connection` is an existing database connection (e.g., a SQLAlchemy engine)
    return pd.read_sql("SELECT * FROM users", connection)

@asset(
    deps=["external_source"],
    metadata={"owner": "data-team"},
    group_name="analytics",
    kinds={"pandas"},
)
def processed_users(users_data: pd.DataFrame) -> Output[pd.DataFrame]:
    """Process users data and attach materialization metadata."""
    # Output carries both the value and metadata; MaterializeResult has no
    # value parameter and is meant for assets that persist data themselves.
    processed = users_data.dropna()
    return Output(
        processed,
        metadata={
            "records": len(processed),
            "null_dropped": len(users_data) - len(processed),
        },
    )
```

**Parameters:**
- `name: Optional[str]` - Asset name (defaults to function name)
- `key_prefix: Optional[CoercibleToAssetKeyPrefix]` - Asset key prefix
- `ins: Optional[Mapping[str, AssetIn]]` - Input specifications
- `deps: Optional[Iterable[CoercibleToAssetDep]]` - Asset dependencies
- `metadata: Optional[ArbitraryMetadataMapping]` - Asset metadata
- `tags: Optional[Mapping[str, str]]` - Asset tags for UI and filtering
- `description: Optional[str]` - Asset description
- `config_schema: Optional[UserConfigSchema]` - Configuration schema
- `required_resource_keys: Optional[AbstractSet[str]]` - Required resource keys
- `resource_defs: Optional[Mapping[str, object]]` - Resource definitions (beta)
- `hooks: Optional[AbstractSet[HookDefinition]]` - Success/failure hooks
- `io_manager_def: Optional[object]` - I/O manager definition (beta)
- `io_manager_key: Optional[str]` - I/O manager key
- `dagster_type: Optional[DagsterType]` - Output type
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `op_tags: Optional[Mapping[str, Any]]` - Operation tags
- `group_name: Optional[str]` - Asset group name
- `output_required: bool = True` - Whether output is required
- `automation_condition: Optional[AutomationCondition]` - Declarative automation condition (see the combined sketch below)
- `freshness_policy: Optional[InternalFreshnessPolicy]` - Freshness policy (internal)
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy (beta)
- `retry_policy: Optional[RetryPolicy]` - Retry policy for failed materializations
- `code_version: Optional[str]` - Code version for change tracking
- `key: Optional[CoercibleToAssetKey]` - Explicit asset key (alternative to name)
- `check_specs: Optional[Sequence[AssetCheckSpec]]` - Asset check specifications
- `owners: Optional[Sequence[str]]` - Asset owners for metadata
- `kinds: Optional[AbstractSet[str]]` - Asset kinds for compute engine tags
- `pool: Optional[str]` - Execution pool for asset materialization
- `non_argument_deps: Optional[Set[AssetKey]]` - **DEPRECATED** - Use `deps` instead
- `auto_materialize_policy: Optional[AutoMaterializePolicy]` - **DEPRECATED** - Use `automation_condition`
- `compute_kind: Optional[str]` - **DEPRECATED** - Use `kinds` instead

**Returns:** `Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]`

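Several of these parameters work best in combination. Below is a minimal sketch of a partitioned asset with declarative automation; the partition start date and the `load_events_for_day` helper are illustrative assumptions, not part of the API above:

```python
from dagster import (
    AssetExecutionContext,
    AutomationCondition,
    DailyPartitionsDefinition,
    asset,
)

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    automation_condition=AutomationCondition.eager(),
    code_version="v2",
    owners=["team:data-eng"],
)
def daily_events(context: AssetExecutionContext) -> None:
    # context.partition_key identifies the daily partition being materialized
    day = context.partition_key
    load_events_for_day(day)  # assumed helper
```
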
#### `@multi_asset` { .api }

**Module:** `dagster._core.definitions.decorators.asset_decorator`
**Type:** Function decorator

Define multiple related assets from a single function.

```python
from dagster import multi_asset, AssetOut, AssetSpec, MaterializeResult
import pandas as pd

@multi_asset(
    outs={
        "users_clean": AssetOut(key_prefix="staging"),
        "users_enriched": AssetOut(key_prefix="marts"),
    }
)
def process_users(users_raw: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Process users into clean and enriched datasets."""
    clean = users_raw.dropna()
    # Assumes `demographics_data` is loaded elsewhere (e.g., at module level)
    enriched = clean.merge(demographics_data, on="user_id")
    return clean, enriched

# Alternative specification approach: with `specs`, the function persists data
# itself and reports results (MaterializeResult carries metadata, not a value)
@multi_asset(
    specs=[
        AssetSpec("customers", group_name="core"),
        AssetSpec("customer_metrics", group_name="analytics"),
    ]
)
def customer_pipeline(context):
    """Generate multiple customer assets."""
    customers_df = extract_customers()  # assumed helper
    metrics_df = calculate_metrics(customers_df)  # assumed helper

    yield MaterializeResult(asset_key="customers", metadata={"records": len(customers_df)})
    yield MaterializeResult(asset_key="customer_metrics", metadata={"records": len(metrics_df)})
```

**Parameters:**
- `outs: Optional[Dict[str, AssetOut]]` - Output specifications by name
- `specs: Optional[Sequence[AssetSpec]]` - Asset specifications
- `name: Optional[str]` - Multi-asset name
- `ins: Optional[Dict[str, AssetIn]]` - Input specifications
- `deps: Optional[Sequence[Union[str, AssetKey, AssetsDefinition]]]` - Dependencies
- `description: Optional[str]` - Multi-asset description
- `config_schema: Optional[ConfigSchema]` - Configuration schema
- `required_resource_keys: Optional[Set[str]]` - Required resource keys
- `compute_kind: Optional[str]` - Compute kind for UI
- `internal_asset_deps: Optional[Dict[str, Set[AssetKey]]]` - Internal dependencies
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy
- `op_tags: Optional[Dict[str, Any]]` - Operation tags
- `can_subset: bool = False` - Whether the assets can be subset at runtime (see the sketch below)
- `resource_defs: Optional[Dict[str, ResourceDefinition]]` - Resource definitions
- `group_name: Optional[str]` - Asset group name

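When `can_subset=True`, the body should only compute the assets selected for the run. A sketch, assuming the `context.selected_output_names` property and illustrative `load_*` helpers:

```python
from dagster import AssetExecutionContext, AssetOut, Output, multi_asset

@multi_asset(
    outs={
        "orders": AssetOut(is_required=False),
        "refunds": AssetOut(is_required=False),
    },
    can_subset=True,
)
def orders_and_refunds(context: AssetExecutionContext):
    # Only yield outputs that were selected for this run
    if "orders" in context.selected_output_names:
        yield Output(load_orders(), output_name="orders")  # assumed helper
    if "refunds" in context.selected_output_names:
        yield Output(load_refunds(), output_name="refunds")  # assumed helper
```
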
#### `@graph_asset` { .api }

**Module:** `dagster._core.definitions.decorators.asset_decorator`
**Type:** Function decorator

Define an asset composed of a graph of operations.

```python
from dagster import graph_asset, op
import pandas as pd

@op
def extract_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()

@op
def load_data(df: pd.DataFrame) -> pd.DataFrame:
    df.to_parquet("output.parquet")
    return df

@graph_asset
def etl_pipeline():
    """ETL pipeline as a graph asset; the returned output becomes the asset value."""
    return load_data(transform_data(extract_data()))
```

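For quick local testing, graph-backed assets execute like any other asset definition; a minimal usage sketch with `materialize`:

```python
from dagster import materialize

# Executes the graph asset in-process and returns an ExecuteInProcessResult
result = materialize([etl_pipeline])
assert result.success
```
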
### Asset Definition Classes

#### `AssetsDefinition` { .api }

**Module:** `dagster._core.definitions.assets.definition.assets_definition`
**Type:** Class

Represents a set of software-defined assets.

```python
from dagster import AssetsDefinition, AssetSpec

# The @asset decorator returns an AssetsDefinition
assets_def = users_data

# Access asset information
asset_keys = assets_def.keys  # Set of AssetKey objects
asset_specs = assets_def.specs_by_key  # Dict[AssetKey, AssetSpec]
dependencies = assets_def.dependency_keys  # Set of dependency keys

# Check whether a subset of the assets can be materialized on its own
can_subset = assets_def.can_subset

# Get partitions definition
partitions = assets_def.partitions_def
```

**Key Properties:**
- `keys: AbstractSet[AssetKey]` - Set of asset keys
- `specs_by_key: Dict[AssetKey, AssetSpec]` - Asset specifications by key
- `keys_by_input_name: Dict[str, AssetKey]` - Input name to key mapping
- `keys_by_output_name: Dict[str, AssetKey]` - Output name to key mapping
- `dependency_keys: AbstractSet[AssetKey]` - Dependency asset keys
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `can_subset: bool` - Whether assets can be subset
- `execution_type: AssetExecutionType` - Execution type
- `op: OpDefinition` - Underlying operation definition

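An `AssetsDefinition` can also be constructed from an existing graph rather than a decorator; a brief sketch using `AssetsDefinition.from_graph`:

```python
from dagster import AssetsDefinition, graph, op

@op
def fetch_metric() -> int:
    return 42

@graph
def compute_metric():
    return fetch_metric()

# The graph's output becomes a single software-defined asset
metric_asset = AssetsDefinition.from_graph(compute_metric)
```
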
#### `AssetSpec` { .api }

**Module:** `dagster._core.definitions.assets.definition.asset_spec`
**Type:** Class

Specification for a single asset, containing metadata and configuration.

```python
from dagster import AssetSpec, AssetKey, AutoMaterializePolicy

# Basic asset specification
spec = AssetSpec(
    key="my_asset",
    description="Important business data",
    metadata={"owner": "data-team"},
    group_name="core",
)

# Complex asset specification
# Assumes `freshness_policy` was defined earlier (e.g., a FreshnessPolicy instance)
complex_spec = AssetSpec(
    key=AssetKey(["warehouse", "dim", "customers"]),
    description="Customer dimension table",
    metadata={
        "table_name": "dim_customers",
        "schema": "analytics",
        "owner": "customer-team",
    },
    group_name="dimensions",
    freshness_policy=freshness_policy,
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
```

**Parameters:**
- `key: Union[AssetKey, str, Sequence[str]]` - Asset key
- `deps: Optional[Sequence[Union[str, AssetKey, AssetDep]]]` - Dependencies
- `description: Optional[str]` - Asset description
- `metadata: Optional[Dict[str, Any]]` - Asset metadata
- `group_name: Optional[str]` - Asset group
- `freshness_policy: Optional[FreshnessPolicy]` - Freshness policy
- `auto_materialize_policy: Optional[AutoMaterializePolicy]` - Auto-materialization policy
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy
- `code_version: Optional[str]` - Code version
- `owners: Optional[Sequence[str]]` - Asset owners
- `tags: Optional[Dict[str, str]]` - Asset tags

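An `AssetSpec` can also stand alone as an external asset: specs passed directly to `Definitions` are tracked in the catalog without a materialization function. A minimal sketch:

```python
from dagster import AssetSpec, Definitions

# External (unmaterialized) assets declared purely as specs
defs = Definitions(
    assets=[
        AssetSpec("raw_events", group_name="ingestion"),
        AssetSpec("raw_users", deps=["raw_events"]),
    ]
)
```
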
### Asset I/O Specifications

#### `AssetIn` { .api }

**Module:** `dagster._core.definitions.assets.job.asset_in`
**Type:** Class

Input specification for assets with metadata and configuration.

```python
from dagster import asset, AssetIn
import pandas as pd

@asset(
    ins={
        "upstream_data": AssetIn(
            key_prefix="staging",
            metadata={"format": "parquet"},
            input_manager_key="warehouse_loader",
        )
    }
)
def processed_asset(upstream_data: pd.DataFrame) -> pd.DataFrame:
    """Process upstream data with custom input config."""
    return upstream_data.dropna()
```

**Parameters:**
- `key: Optional[Union[str, AssetKey]]` - Asset key to depend on
- `key_prefix: Optional[Union[str, Sequence[str]]]` - Key prefix
- `metadata: Optional[Dict[str, Any]]` - Input metadata
- `input_manager_key: Optional[str]` - Input manager key
- `dagster_type: Optional[DagsterType]` - Input type
- `partition_mapping: Optional[PartitionMapping]` - Partition mapping

#### `AssetOut` { .api }

**Module:** `dagster._core.definitions.assets.job.asset_out`
**Type:** Class

Output specification for assets in multi-asset definitions.

```python
from dagster import multi_asset, AssetOut
import pandas as pd

@multi_asset(
    outs={
        "clean_data": AssetOut(
            key_prefix="staging",
            description="Cleaned version of raw data",
            metadata={"quality_score": 0.95},
            io_manager_key="parquet_io_manager",
        ),
        "summary_stats": AssetOut(
            key_prefix="analytics",
            description="Summary statistics",
            dagster_type=dict,
        ),
    }
)
def data_processing():
    """Process data into clean and summary outputs."""
    # Loading logic is illustrative; assumes a raw.parquet input file
    clean_df = pd.read_parquet("raw.parquet").dropna()
    summary_dict = {"rows": len(clean_df)}
    return clean_df, summary_dict
```

**Parameters:**
- `key: Optional[Union[str, AssetKey]]` - Output asset key
- `key_prefix: Optional[Union[str, Sequence[str]]]` - Key prefix
- `dagster_type: Optional[DagsterType]` - Output type
- `description: Optional[str]` - Output description
- `is_required: bool = True` - Whether output is required
- `io_manager_key: Optional[str]` - I/O manager key
- `metadata: Optional[Dict[str, Any]]` - Output metadata
- `group_name: Optional[str]` - Asset group name
- `code_version: Optional[str]` - Code version

### Asset Dependencies

#### `AssetDep` { .api }

**Module:** `dagster._core.definitions.assets.definition.asset_dep`
**Type:** Class

Explicit asset dependency specification with advanced configuration.

```python
from dagster import asset, AssetDep, AssetKey, TimeWindowPartitionMapping
import pandas as pd

@asset(
    deps=[
        AssetDep(
            asset=AssetKey("upstream_asset"),
            # Depend on the previous partition's time window
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
        ),
        AssetDep(
            asset="other_dependency",
            metadata={"relationship": "foreign_key"},
        ),
    ]
)
def downstream_asset() -> pd.DataFrame:
    """Asset with explicit dependencies."""
    return compute_result()  # assumed helper
```

**Parameters:**
- `asset: Union[str, AssetKey, AssetsDefinition, SourceAsset]` - Dependency asset
- `partition_mapping: Optional[PartitionMapping]` - Partition mapping
- `metadata: Optional[Dict[str, Any]]` - Dependency metadata

### Source Assets

#### `SourceAsset` { .api }

**Module:** `dagster._core.definitions.source_asset`
**Type:** Class

External asset definition for assets not materialized by Dagster.

```python
from dagster import SourceAsset, AssetKey, asset

# Simple source asset
raw_data_source = SourceAsset(
    key=AssetKey("raw_data"),
    description="Raw data from external system",
    metadata={"system": "legacy_db", "table": "raw_events"},
)

# Source asset with a custom I/O manager
user_events = SourceAsset(
    key=AssetKey(["events", "user_actions"]),
    description="User action events from Kafka",
    metadata={"topic": "user-actions", "retention": "7d"},
    io_manager_key="kafka_manager",
)

@asset(deps=[user_events])
def processed_events():
    """Process events from the source asset."""
    return transform_events()  # assumed helper
```

**Parameters:**
- `key: AssetKey` - Source asset key
- `metadata: Optional[Dict[str, Any]]` - Source metadata
- `io_manager_key: Optional[str]` - I/O manager key
- `description: Optional[str]` - Source description
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `observe_fn: Optional[Callable]` - Observation function (see the sketch below)
- `auto_observe_interval_minutes: Optional[float]` - Auto-observe interval
- `group_name: Optional[str]` - Asset group name

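The `observe_fn` parameter underlies observable source assets; the decorator form is usually more convenient. A sketch, where the checksum helper is an assumption:

```python
from dagster import DataVersion, observable_source_asset

@observable_source_asset(auto_observe_interval_minutes=30)
def raw_data():
    # Report a data version so downstream assets can detect staleness
    return DataVersion(compute_external_checksum())  # assumed helper
```
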
## Operations System

Operations (ops) are the fundamental computational units in Dagster, representing discrete pieces of work that consume inputs and produce outputs.

### Op Decorator

#### `@op` { .api }

**Module:** `dagster._core.definitions.decorators.op_decorator`
**Type:** Function decorator

Define an operation (transformation function).

```python
from dagster import op, In, Out, OpExecutionContext
import pandas as pd

@op
def simple_op() -> str:
    """Simple operation with no inputs."""
    return "hello world"

@op(
    ins={"data": In(dagster_type=pd.DataFrame)},
    out=Out(dagster_type=pd.DataFrame, description="Cleaned data"),
    config_schema={"threshold": float},
    required_resource_keys={"database"},
    tags={"team": "data", "env": "prod"},
)
def clean_data(context: OpExecutionContext, data: pd.DataFrame) -> pd.DataFrame:
    """Clean data based on threshold configuration."""
    threshold = context.op_config["threshold"]
    cleaned = data[data.quality_score > threshold]

    context.log.info(f"Removed {len(data) - len(cleaned)} rows below threshold {threshold}")

    return cleaned

@op(out={"processed": Out(), "summary": Out()})
def multi_output_op() -> tuple[pd.DataFrame, dict]:
    """Operation with multiple outputs."""
    df = process_data()  # assumed helper
    summary = {"count": len(df), "columns": df.columns.tolist()}
    return df, summary
```

447
448
**Parameters:**
449
- `name: Optional[str]` - Op name (defaults to function name)
450
- `description: Optional[str]` - Op description
451
- `ins: Optional[Mapping[str, In]]` - Input specifications
452
- `out: Optional[Union[Out, Mapping[str, Out]]]` - Output specification(s)
453
- `config_schema: Optional[UserConfigSchema]` - Configuration schema
454
- `required_resource_keys: Optional[AbstractSet[str]]` - Required resource keys
455
- `tags: Optional[Mapping[str, Any]]` - Op tags
456
- `retry_policy: Optional[RetryPolicy]` - Retry policy for failed executions
457
- `code_version: Optional[str]` - Code version for change tracking
458
- `pool: Optional[str]` - Execution pool for op execution
459
- `version: Optional[str]` - **DEPRECATED** - Use `code_version` instead
460
461
**Returns:** `Union[OpDefinition, _Op]`
462
463
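Ops can also fan out at runtime with dynamic outputs (this is what the generator-related property on `OpDefinition` below relates to); a brief sketch:

```python
from dagster import DynamicOut, DynamicOutput, op

@op(out=DynamicOut())
def fan_out_files():
    # Yield one DynamicOutput per file; each mapping_key must be unique
    for i, path in enumerate(["a.csv", "b.csv"]):
        yield DynamicOutput(path, mapping_key=f"file_{i}")
```
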
#### `OpDefinition` { .api }

**Module:** `dagster._core.definitions.op_definition`
**Type:** Class

Definition of an operation containing its compute function, configuration, and metadata.

```python
# Access op definition properties
op_def = clean_data
name = op_def.name  # Operation name
description = op_def.description  # Operation description
input_defs = op_def.input_defs  # List of input definitions
output_defs = op_def.output_defs  # List of output definitions
config_schema = op_def.config_schema  # Configuration schema
required_resource_keys = op_def.required_resource_keys  # Required resources
tags = op_def.tags  # Operation tags

# Check if op is a generator (for dynamic outputs)
is_generator = op_def.is_generator_op
```

**Key Properties:**
- `name: str` - Operation name
- `description: Optional[str]` - Operation description
- `input_defs: List[InputDefinition]` - Input definitions
- `output_defs: List[OutputDefinition]` - Output definitions
- `config_schema: Optional[ConfigSchema]` - Configuration schema
- `required_resource_keys: Set[str]` - Required resource keys
- `tags: Dict[str, Any]` - Operation tags
- `code_version: Optional[str]` - Code version
- `retry_policy: Optional[RetryPolicy]` - Retry policy

## Job System

Jobs orchestrate the execution of operations or assets, defining the computational graph and execution parameters.

### Job Decorator

#### `@job` { .api }

**Module:** `dagster._core.definitions.decorators.job_decorator`
**Type:** Function decorator

Define a job (collection of ops).

```python
from dagster import job, op, Config, RunConfig
import pandas as pd

@op(config_schema={"file_path": str})
def load_data(context) -> pd.DataFrame:
    return pd.read_csv(context.op_config["file_path"])

@op
def process_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()

@op
def save_data(df: pd.DataFrame) -> None:
    df.to_csv("output.csv")

@job(
    description="ETL job for processing data",
    config={"ops": {"load_data": {"config": {"file_path": "data.csv"}}}},
    tags={"team": "data-eng", "env": "production"},
)
def etl_job():
    """ETL job connecting ops."""
    processed = process_data(load_data())
    save_data(processed)

# Typed configuration: ops declare a Config class, and the job supplies
# defaults via RunConfig (job bodies do not receive config arguments)
class LoadConfig(Config):
    input_path: str = "default.csv"

@op
def load_typed(config: LoadConfig) -> pd.DataFrame:
    return pd.read_csv(config.input_path)

@job(config=RunConfig(ops={"load_typed": LoadConfig(input_path="data.csv")}))
def configurable_etl_job():
    """ETL job with typed configuration."""
    save_data(process_data(load_typed()))
```

**Parameters:**
- `name: Optional[str]` - Job name (defaults to function name)
- `description: Optional[str]` - Job description
- `resource_defs: Optional[Mapping[str, object]]` - Resource definitions
- `config: Optional[Union[ConfigMapping, Mapping[str, Any], RunConfig, PartitionedConfig]]` - Job configuration
- `tags: Optional[Mapping[str, str]]` - Job tags for metadata
- `run_tags: Optional[Mapping[str, str]]` - Tags applied to each run
- `metadata: Optional[Mapping[str, RawMetadataValue]]` - Job metadata for UI display
- `logger_defs: Optional[Mapping[str, LoggerDefinition]]` - Logger definitions
- `executor_def: Optional[ExecutorDefinition]` - Executor definition
- `hooks: Optional[AbstractSet[HookDefinition]]` - Hook definitions
- `op_retry_policy: Optional[RetryPolicy]` - Default retry policy for all ops
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `input_values: Optional[Mapping[str, object]]` - Direct Python object inputs to the job

**Returns:** `Union[JobDefinition, _Job]`

#### `JobDefinition` { .api }

**Module:** `dagster._core.definitions.job_definition`
**Type:** Class

Definition of a job containing the computational graph and execution configuration.

```python
# Access job definition properties
job_def = etl_job
name = job_def.name  # Job name
description = job_def.description  # Job description
graph_def = job_def.graph  # Underlying graph definition
resource_defs = job_def.resource_defs  # Resource definitions
executor_def = job_def.executor_def  # Executor definition

# Execute the job in-process (handy for tests and scripts;
# dagster.execute_job instead requires a reconstructable job and a DagsterInstance)
result = job_def.execute_in_process()
```

**Key Properties:**
- `name: str` - Job name
- `description: Optional[str]` - Job description
- `graph: GraphDefinition` - Underlying graph definition
- `resource_defs: Dict[str, ResourceDefinition]` - Resource definitions
- `executor_def: ExecutorDefinition` - Executor definition
- `logger_defs: Dict[str, LoggerDefinition]` - Logger definitions
- `hooks: Set[HookDefinition]` - Hook definitions
- `tags: Dict[str, Any]` - Job tags
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `asset_layer: Optional[AssetLayer]` - Asset layer for asset jobs

### Asset Jobs

#### `define_asset_job` { .api }

**Module:** `dagster._core.definitions.unresolved_asset_job_definition`
**Type:** Function

Define a job that materializes a selection of assets.

```python
from dagster import define_asset_job, AssetSelection, DailyPartitionsDefinition

# Job for specific assets
analytics_job = define_asset_job(
    name="analytics_job",
    selection=AssetSelection.groups("analytics"),
    description="Materialize analytics assets",
    tags={"team": "analytics"},
)

# Job with partitions
daily_job = define_asset_job(
    name="daily_etl",
    selection=AssetSelection.all(),
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4,
                }
            }
        }
    },
)
```

**Parameters:**
- `name: str` - Job name
- `selection: Optional[Union[str, AssetSelection]]` - Asset selection (composable; see the sketch below)
- `config: Optional[Union[ConfigSchema, Dict[str, Any]]]` - Job configuration
- `description: Optional[str]` - Job description
- `tags: Optional[Dict[str, Any]]` - Job tags
- `executor_def: Optional[ExecutorDefinition]` - Executor definition
- `hooks: Optional[Set[HookDefinition]]` - Hook definitions
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition

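The `selection` argument accepts composable `AssetSelection` expressions; a short sketch (the asset and group names are illustrative):

```python
from dagster import AssetSelection

# Everything in the analytics group, plus raw_events and all assets downstream of it
selection = (
    AssetSelection.groups("analytics")
    | AssetSelection.keys("raw_events").downstream()
)
```
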
## Graph System

Graphs enable composition and reuse of computational logic, allowing complex operations to be built from simpler components.

### Graph Decorator

#### `@graph` { .api }

**Module:** `dagster._core.definitions.decorators.graph_decorator`
**Type:** Function decorator

Define a reusable graph of operations.

```python
from dagster import GraphIn, GraphOut, graph, job, op

@op
def fetch_data(url: str) -> dict:
    return {"data": f"fetched from {url}"}

@op
def validate_data(data: dict) -> dict:
    return {**data, "validated": True}

@op
def transform_data(data: dict) -> dict:
    return {**data, "transformed": True}

@graph(
    description="Reusable data processing graph",
    ins={"source_url": GraphIn()},  # GraphIn takes only a description, not a type
    out={"result": GraphOut()},
)
def process_data_graph(source_url):
    """Reusable graph for data processing."""
    raw = fetch_data(source_url)
    validated = validate_data(raw)
    return transform_data(validated)

# Inside a job, inputs must come from other ops' outputs rather than raw literals
@op
def source_url() -> str:
    return "https://api.example.com/data"

@job
def etl_job():
    process_data_graph(source_url())

# Convert graph to op for reuse
process_data_op = process_data_graph.to_op()

@op
def combine_results(a: dict, b: dict) -> dict:
    return {**a, **b}

@job
def complex_job():
    result1 = process_data_op(source_url())
    result2 = process_data_op.alias("process_data_2")(source_url.alias("source_url_2")())
    combine_results(result1, result2)
```

**Parameters:**
- `name: Optional[str]` - Graph name (defaults to function name)
- `description: Optional[str]` - Graph description
- `ins: Optional[Dict[str, GraphIn]]` - Input specifications
- `out: Optional[Union[GraphOut, Dict[str, GraphOut]]]` - Output specification(s)
- `config: Optional[ConfigSchema]` - Configuration schema
- `tags: Optional[Dict[str, Any]]` - Graph tags

#### `GraphDefinition` { .api }

**Module:** `dagster._core.definitions.graph_definition`
**Type:** Class

Definition of a graph containing its computational structure and metadata.

```python
# Access graph definition properties
graph_def = process_data_graph
name = graph_def.name  # Graph name
description = graph_def.description  # Graph description
input_mappings = graph_def.input_mappings  # Input mappings
output_mappings = graph_def.output_mappings  # Output mappings
dependencies = graph_def.dependencies  # Op dependencies

# Convert to job or op
job_def = graph_def.to_job()
op_def = graph_def.to_op()
```

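`to_job` accepts the same configuration surface as `@job`; a sketch that binds a resource and tags, assuming `database_resource` is defined elsewhere:

```python
prod_job = process_data_graph.to_job(
    name="process_data_prod",
    resource_defs={"database": database_resource},  # assumed resource object
    tags={"env": "prod"},
)
```
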
**Key Properties:**
- `name: str` - Graph name
- `description: Optional[str]` - Graph description
- `node_defs: List[NodeDefinition]` - Node definitions
- `dependencies: Dict[Union[str, NodeInvocation], Dict[str, IDependencyDefinition]]` - Dependencies
- `input_mappings: List[InputMapping]` - Input mappings
- `output_mappings: List[OutputMapping]` - Output mappings
- `config: Optional[ConfigSchema]` - Configuration schema
- `tags: Dict[str, Any]` - Graph tags

## Repository System

Repositories are collections of definitions that can be loaded together, providing a way to organize and deploy Dagster code.

### Repository Decorator

#### `@repository` { .api }

**Module:** `dagster._core.definitions.decorators.repository_decorator`
**Type:** Function decorator

Define a repository containing jobs, assets, schedules, and sensors.

```python
from dagster import repository, job, op, asset, schedule

@asset
def users_asset():
    return load_users()  # assumed helper

@op
def process_analytics_data():
    ...

@job
def analytics_job():
    process_analytics_data()

@schedule(job=analytics_job, cron_schedule="0 9 * * *")
def daily_analytics():
    return {}

@repository
def analytics_repository():
    """Repository containing analytics definitions."""
    return [
        users_asset,
        analytics_job,
        daily_analytics,
    ]

# Alternative using the Definitions class (recommended)
from dagster import Definitions

defs = Definitions(
    assets=[users_asset],
    jobs=[analytics_job],
    schedules=[daily_analytics],
    resources={"database": database_resource},  # assumes a resource defined elsewhere
)
```

#### `Definitions` { .api }

**Module:** `dagster._core.definitions.definitions_class`
**Type:** Class

Container for all definitions in a Dagster code location. This is the recommended way to organize Dagster definitions.

```python
from dagster import Definitions, load_assets_from_modules
import my_assets, my_jobs, my_resources

# Complete definitions container
defs = Definitions(
    assets=load_assets_from_modules([my_assets]),
    jobs=[my_jobs.etl_job, my_jobs.ml_job],
    schedules=[my_jobs.daily_schedule],
    sensors=[my_jobs.failure_sensor],
    resources=my_resources.get_resources(),
    asset_checks=my_assets.get_asset_checks(),
    loggers={"custom": custom_logger},  # assumes a LoggerDefinition defined elsewhere
)

# Conditional definitions based on environment
import os

def get_definitions():
    # Assumes these resource objects are imported from project modules
    base_resources = {"io_manager": fs_io_manager}

    if os.getenv("ENV") == "prod":
        base_resources.update({
            "database": prod_database_resource,
            "data_warehouse": snowflake_resource,
        })
    else:
        base_resources.update({
            "database": dev_database_resource,
            "data_warehouse": duckdb_resource,
        })

    return Definitions(
        assets=load_assets_from_modules([my_assets]),
        resources=base_resources,
    )

defs = get_definitions()
```

**Parameters:**
- `assets: Optional[Iterable[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]] = None` - Asset definitions
- `schedules: Optional[Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]] = None` - Schedule definitions
- `sensors: Optional[Iterable[SensorDefinition]] = None` - Sensor definitions
- `jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None` - Job definitions
- `resources: Optional[Mapping[str, Any]] = None` - Resource definitions (accepts ResourceDefinition or any object)
- `executor: Optional[Union[ExecutorDefinition, Executor]] = None` - Default executor
- `loggers: Optional[Mapping[str, LoggerDefinition]] = None` - Logger definitions
- `asset_checks: Optional[Iterable[AssetsDefinition]] = None` - Asset check definitions
- `metadata: Optional[RawMetadataMapping] = None` - Definitions-level metadata
- `component_tree: Optional[ComponentTree] = None` - Component information for reconstruction

**Returns:** `Definitions` - Validated container for all code location definitions

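Larger projects often split definitions across modules and combine them at the top level; a sketch using `Definitions.merge` (available in recent Dagster releases; the module paths are illustrative):

```python
from dagster import Definitions

# Assumes each subpackage exposes its own Definitions object
from my_project.ingestion import defs as ingestion_defs
from my_project.analytics import defs as analytics_defs

defs = Definitions.merge(ingestion_defs, analytics_defs)
```
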
## Module Loading Utilities

Dagster provides utilities to automatically load definitions from Python modules and packages.

### Asset Loading

#### `load_assets_from_modules` { .api }

**Module:** `dagster._core.definitions.module_loaders.load_assets_from_modules`
**Type:** Function

Load all assets from specified Python modules.

```python
from dagster import load_assets_from_modules
import my_assets_module1, my_assets_module2

# Load from multiple modules
all_assets = load_assets_from_modules([
    my_assets_module1,
    my_assets_module2,
])

# Load with group assignment
grouped_assets = load_assets_from_modules(
    [my_assets_module1],
    group_name="core_data",
)

# Load with key prefix
prefixed_assets = load_assets_from_modules(
    [my_assets_module1],
    key_prefix="staging",
)
```

**Parameters:**
- `modules: Sequence[ModuleType]` - Modules to load assets from
- `group_name: Optional[str]` - Group name to assign to loaded assets
- `key_prefix: Optional[Union[str, Sequence[str]]]` - Key prefix for loaded assets
- `freshness_policy: Optional[FreshnessPolicy]` - Freshness policy for loaded assets
- `auto_materialize_policy: Optional[AutoMaterializePolicy]` - Auto-materialization policy
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy

#### `load_assets_from_package_name` { .api }

**Module:** `dagster._core.definitions.module_loaders.load_assets_from_modules`
**Type:** Function

Load assets from a package by name.

```python
from dagster import load_assets_from_package_name

# Load from package name
assets = load_assets_from_package_name(
    "my_company.data_assets",
    group_name="company_data",
)

# Recursively load from subpackages
all_package_assets = load_assets_from_package_name(
    "my_company.assets",
    key_prefix=["company", "data"],
)
```

This documentation provides comprehensive coverage of Dagster's core definition system. The asset system represents the declarative approach to data pipeline development, while operations and jobs provide imperative workflow definition. Graphs enable composition and reuse, and repositories organize definitions for deployment. The module loading utilities facilitate code organization and automatic discovery of definitions.

For execution and runtime behavior, see [Execution and Contexts](./execution-contexts.md). For configuration of these definitions, see [Configuration System](./configuration.md).