# Core Definitions

This document covers the fundamental building blocks of Dagster: assets, operations, jobs, graphs, and repositories. These core abstractions form the foundation of all Dagster pipelines.

## Asset System

Assets are the primary abstraction in Dagster, representing data artifacts that exist or should exist. They enable declarative data pipeline development with automatic dependency inference and rich lineage tracking.

### Asset Decorators

#### `@asset` { .api }

**Module:** `dagster._core.definitions.decorators.asset_decorator`
**Type:** Function decorator

Define a software-defined asset from a Python function.

```python
from dagster import asset, Output
import pandas as pd

@asset
def users_data() -> pd.DataFrame:
    """Load users data from database."""
    # `connection` is assumed to be a database connection defined elsewhere
    return pd.read_sql("SELECT * FROM users", connection)

@asset(
    deps=["external_source"],
    metadata={"owner": "data-team"},
    group_name="analytics",
    kinds={"pandas"}
)
def processed_users(users_data: pd.DataFrame) -> Output[pd.DataFrame]:
    """Process users data with metadata."""
    processed = users_data.dropna()
    # Output (rather than MaterializeResult, which carries no value)
    # attaches metadata to a returned value handled by the I/O manager
    return Output(
        processed,
        metadata={
            "records": len(processed),
            "null_dropped": len(users_data) - len(processed)
        }
    )
```

**Parameters:**
- `name: Optional[str]` - Asset name (defaults to function name)
- `key_prefix: Optional[CoercibleToAssetKeyPrefix]` - Asset key prefix
- `ins: Optional[Mapping[str, AssetIn]]` - Input specifications
- `deps: Optional[Iterable[CoercibleToAssetDep]]` - Asset dependencies
- `metadata: Optional[ArbitraryMetadataMapping]` - Asset metadata
- `tags: Optional[Mapping[str, str]]` - Asset tags for UI and filtering
- `description: Optional[str]` - Asset description
- `config_schema: Optional[UserConfigSchema]` - Configuration schema
- `required_resource_keys: Optional[AbstractSet[str]]` - Required resource keys
- `resource_defs: Optional[Mapping[str, object]]` - Resource definitions (beta)
- `hooks: Optional[AbstractSet[HookDefinition]]` - Success/failure hooks
- `io_manager_def: Optional[object]` - I/O manager definition (beta)
- `io_manager_key: Optional[str]` - I/O manager key
- `dagster_type: Optional[DagsterType]` - Output type
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `op_tags: Optional[Mapping[str, Any]]` - Operation tags
- `group_name: Optional[str]` - Asset group name
- `output_required: bool = True` - Whether output is required
- `automation_condition: Optional[AutomationCondition]` - Declarative automation condition
- `freshness_policy: Optional[InternalFreshnessPolicy]` - Freshness policy (internal)
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy (beta)
- `retry_policy: Optional[RetryPolicy]` - Retry policy for failed materializations
- `code_version: Optional[str]` - Code version for change tracking
- `key: Optional[CoercibleToAssetKey]` - Explicit asset key (alternative to name)
- `check_specs: Optional[Sequence[AssetCheckSpec]]` - Asset check specifications
- `owners: Optional[Sequence[str]]` - Asset owners for metadata
- `kinds: Optional[AbstractSet[str]]` - Asset kinds for compute engine tags
- `pool: Optional[str]` - Execution pool for asset materialization
- `non_argument_deps: Optional[Set[AssetKey]]` - **DEPRECATED** - Use `deps` instead
- `auto_materialize_policy: Optional[AutoMaterializePolicy]` - **DEPRECATED** - Use `automation_condition`
- `compute_kind: Optional[str]` - **DEPRECATED** - Use `kinds` instead

**Returns:** `Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]`

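Most of these parameters compose on a single definition. As a hedged sketch (the S3 path and asset name here are illustrative, not from this API), a partitioned asset with a declarative automation condition might look like:

```python
from dagster import AutomationCondition, DailyPartitionsDefinition, asset
import pandas as pd

@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    automation_condition=AutomationCondition.eager(),  # materialize when upstream updates
    group_name="analytics",
    kinds={"pandas"},
)
def daily_events(context) -> pd.DataFrame:
    # The partition key identifies which day is being materialized
    return pd.read_parquet(f"s3://example-bucket/events/{context.partition_key}.parquet")
```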
#### `@multi_asset` { .api }

**Module:** `dagster._core.definitions.decorators.asset_decorator`
**Type:** Function decorator

Define multiple related assets from a single function.

```python
from dagster import multi_asset, AssetOut, AssetSpec, MaterializeResult
import pandas as pd

@multi_asset(
    outs={
        "users_clean": AssetOut(key_prefix="staging"),
        "users_enriched": AssetOut(key_prefix="marts")
    }
)
def process_users(users_raw) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Process users into clean and enriched datasets."""
    clean = users_raw.dropna()
    # `demographics_data` is assumed to be available in scope
    enriched = clean.merge(demographics_data, on="user_id")
    return clean, enriched

# Alternative specification approach: spec-based multi-assets handle their
# own I/O and report materializations instead of returning values
@multi_asset(
    specs=[
        AssetSpec("customers", group_name="core"),
        AssetSpec("customer_metrics", group_name="analytics")
    ]
)
def customer_pipeline(context):
    """Generate multiple customer assets."""
    customers_df = extract_customers()            # assumed helper
    metrics_df = calculate_metrics(customers_df)  # assumed helper

    # Persist the data here, then report each materialization
    yield MaterializeResult(asset_key="customers", metadata={"rows": len(customers_df)})
    yield MaterializeResult(asset_key="customer_metrics", metadata={"rows": len(metrics_df)})
```

**Parameters:**
- `outs: Optional[Dict[str, AssetOut]]` - Output specifications by name
- `specs: Optional[Sequence[AssetSpec]]` - Asset specifications
- `name: Optional[str]` - Multi-asset name
- `ins: Optional[Dict[str, AssetIn]]` - Input specifications
- `deps: Optional[Sequence[Union[str, AssetKey, AssetsDefinition]]]` - Dependencies
- `description: Optional[str]` - Multi-asset description
- `config_schema: Optional[ConfigSchema]` - Configuration schema
- `required_resource_keys: Optional[Set[str]]` - Required resource keys
- `compute_kind: Optional[str]` - Compute kind for UI
- `internal_asset_deps: Optional[Dict[str, Set[AssetKey]]]` - Internal dependencies
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy
- `op_tags: Optional[Dict[str, Any]]` - Operation tags
- `can_subset: bool = False` - Whether asset can be subset
- `resource_defs: Optional[Dict[str, ResourceDefinition]]` - Resource definitions
- `group_name: Optional[str]` - Asset group name

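When `can_subset=True`, the function is expected to consult the execution context for the selected outputs and emit only those. A minimal sketch, assuming hypothetical `compute_alpha`/`compute_beta` helpers:

```python
from dagster import AssetExecutionContext, AssetOut, Output, multi_asset

@multi_asset(
    outs={
        "alpha": AssetOut(is_required=False),
        "beta": AssetOut(is_required=False),
    },
    can_subset=True,
)
def subsettable_assets(context: AssetExecutionContext):
    # Compute only the assets selected for this run
    if "alpha" in context.selected_output_names:
        yield Output(compute_alpha(), output_name="alpha")  # compute_alpha: hypothetical
    if "beta" in context.selected_output_names:
        yield Output(compute_beta(), output_name="beta")    # compute_beta: hypothetical
```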
#### `@graph_asset` { .api }

**Module:** `dagster._core.definitions.decorators.asset_decorator`
**Type:** Function decorator

Define an asset composed of a graph of operations.

```python
from dagster import graph_asset, op
import pandas as pd

@op
def extract_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@op
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna()

@op
def load_data(df: pd.DataFrame) -> pd.DataFrame:
    df.to_parquet("output.parquet")
    return df

@graph_asset
def etl_pipeline():
    """ETL pipeline as a graph asset."""
    # The graph's return value becomes the asset's output
    return load_data(transform_data(extract_data()))
```

### Asset Definition Classes

#### `AssetsDefinition` { .api }

**Module:** `dagster._core.definitions.assets.definition.assets_definition`
**Type:** Class

Represents a set of software-defined assets.

```python
from dagster import AssetsDefinition, AssetSpec

# The @asset decorator returns an AssetsDefinition
assets_def = users_data

# Access asset information
asset_keys = assets_def.keys               # Set of AssetKey objects
asset_specs = assets_def.specs_by_key      # Dict[AssetKey, AssetSpec]
dependencies = assets_def.dependency_keys  # Set of dependency keys

# Check whether the assets can be subset at materialization time
can_subset = assets_def.can_subset

# Get partitions definition
partitions = assets_def.partitions_def
```

**Key Properties:**
- `keys: AbstractSet[AssetKey]` - Set of asset keys
- `specs_by_key: Dict[AssetKey, AssetSpec]` - Asset specifications by key
- `keys_by_input_name: Dict[str, AssetKey]` - Input name to key mapping
- `keys_by_output_name: Dict[str, AssetKey]` - Output name to key mapping
- `dependency_keys: AbstractSet[AssetKey]` - Dependency asset keys
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `can_subset: bool` - Whether assets can be subset
- `execution_type: AssetExecutionType` - Execution type
- `op: OpDefinition` - Underlying operation definition

#### `AssetSpec` { .api }

**Module:** `dagster._core.definitions.assets.definition.asset_spec`
**Type:** Class

Specification for a single asset, containing metadata and configuration.

```python
from dagster import AssetSpec, AssetKey, AutoMaterializePolicy, FreshnessPolicy

# Basic asset specification
spec = AssetSpec(
    key="my_asset",
    description="Important business data",
    metadata={"owner": "data-team"},
    group_name="core"
)

# Complex asset specification
complex_spec = AssetSpec(
    key=AssetKey(["warehouse", "dim", "customers"]),
    description="Customer dimension table",
    metadata={
        "table_name": "dim_customers",
        "schema": "analytics",
        "owner": "customer-team"
    },
    group_name="dimensions",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
    auto_materialize_policy=AutoMaterializePolicy.eager()
)
```

**Parameters:**
- `key: Union[AssetKey, str, Sequence[str]]` - Asset key
- `deps: Optional[Sequence[Union[str, AssetKey, AssetDep]]]` - Dependencies
- `description: Optional[str]` - Asset description
- `metadata: Optional[Dict[str, Any]]` - Asset metadata
- `group_name: Optional[str]` - Asset group
- `freshness_policy: Optional[FreshnessPolicy]` - Freshness policy
- `auto_materialize_policy: Optional[AutoMaterializePolicy]` - Auto-materialization policy
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy
- `code_version: Optional[str]` - Code version
- `owners: Optional[Sequence[str]]` - Asset owners
- `tags: Optional[Dict[str, str]]` - Asset tags

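Besides feeding `@multi_asset(specs=...)`, specs can be passed directly to `Definitions` to register external assets that Dagster tracks but does not materialize; a brief sketch (asset names illustrative):

```python
from dagster import AssetSpec, Definitions

defs = Definitions(
    assets=[
        AssetSpec("raw_events", group_name="ingestion"),
        AssetSpec("raw_users", group_name="ingestion"),
    ]
)
```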
### Asset I/O Specifications

#### `AssetIn` { .api }

**Module:** `dagster._core.definitions.assets.job.asset_in`
**Type:** Class

Input specification for assets with metadata and configuration.

```python
from dagster import asset, AssetIn
import pandas as pd

@asset(
    ins={
        "upstream_data": AssetIn(
            key_prefix="staging",
            metadata={"format": "parquet"},
            input_manager_key="warehouse_loader"
        )
    }
)
def processed_asset(upstream_data: pd.DataFrame) -> pd.DataFrame:
    """Process upstream data with custom input config."""
    return upstream_data.dropna()
```

**Parameters:**
- `key: Optional[Union[str, AssetKey]]` - Asset key to depend on
- `key_prefix: Optional[Union[str, Sequence[str]]]` - Key prefix
- `metadata: Optional[Dict[str, Any]]` - Input metadata
- `input_manager_key: Optional[str]` - Input manager key
- `dagster_type: Optional[DagsterType]` - Input type
- `partition_mapping: Optional[PartitionMapping]` - Partition mapping

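The `partition_mapping` parameter controls which upstream partitions an input loads. A sketch of a daily asset reading the previous day's partition (assumes both assets share a daily partitions definition, and `compute_change` is a hypothetical helper):

```python
from dagster import AssetIn, TimeWindowPartitionMapping, asset

@asset(
    ins={
        "daily_totals": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        )
    }
)
def day_over_day_change(daily_totals):
    # Receives yesterday's partition of `daily_totals`
    return compute_change(daily_totals)  # compute_change: hypothetical
```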
#### `AssetOut` { .api }

**Module:** `dagster._core.definitions.assets.job.asset_out`
**Type:** Class

Output specification for assets in multi-asset definitions.

```python
from dagster import multi_asset, AssetOut

@multi_asset(
    outs={
        "clean_data": AssetOut(
            key_prefix="staging",
            description="Cleaned version of raw data",
            metadata={"quality_score": 0.95},
            io_manager_key="parquet_io_manager"
        ),
        "summary_stats": AssetOut(
            key_prefix="analytics",
            description="Summary statistics",
            dagster_type=dict
        )
    }
)
def data_processing():
    """Process data into clean and summary outputs."""
    # Processing logic producing `clean_df` and `summary_dict` goes here
    return clean_df, summary_dict
```

**Parameters:**
- `key: Optional[Union[str, AssetKey]]` - Output asset key
- `key_prefix: Optional[Union[str, Sequence[str]]]` - Key prefix
- `dagster_type: Optional[DagsterType]` - Output type
- `description: Optional[str]` - Output description
- `is_required: bool = True` - Whether output is required
- `io_manager_key: Optional[str]` - I/O manager key
- `metadata: Optional[Dict[str, Any]]` - Output metadata
- `group_name: Optional[str]` - Asset group name
- `code_version: Optional[str]` - Code version

### Asset Dependencies

#### `AssetDep` { .api }

**Module:** `dagster._core.definitions.assets.definition.asset_dep`
**Type:** Class

Explicit asset dependency specification with advanced configuration.

```python
from dagster import asset, AssetDep, AssetKey, TimeWindowPartitionMapping
import pandas as pd

@asset(
    deps=[
        AssetDep(
            asset=AssetKey("upstream_asset"),
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1)
        ),
        AssetDep(
            asset="other_dependency",
            metadata={"relationship": "foreign_key"}
        )
    ]
)
def downstream_asset() -> pd.DataFrame:
    """Asset with explicit dependencies."""
    return compute_result()  # assumed helper
```

**Parameters:**
- `asset: Union[str, AssetKey, AssetsDefinition, SourceAsset]` - Dependency asset
- `partition_mapping: Optional[PartitionMapping]` - Partition mapping
- `metadata: Optional[Dict[str, Any]]` - Dependency metadata

### Source Assets

#### `SourceAsset` { .api }

**Module:** `dagster._core.definitions.source_asset`
**Type:** Class

External asset definition for assets not materialized by Dagster.

```python
from dagster import SourceAsset, AssetKey, asset

# Simple source asset
raw_data_source = SourceAsset(
    key=AssetKey("raw_data"),
    description="Raw data from external system",
    metadata={"system": "legacy_db", "table": "raw_events"}
)

# Source asset with a custom I/O manager
user_events = SourceAsset(
    key=AssetKey(["events", "user_actions"]),
    description="User action events from Kafka",
    metadata={"topic": "user-actions", "retention": "7d"},
    io_manager_key="kafka_manager"
)

@asset(deps=[user_events])
def processed_events():
    """Process events from source asset."""
    return transform_events()  # assumed helper
```

**Parameters:**
- `key: AssetKey` - Source asset key
- `metadata: Optional[Dict[str, Any]]` - Source metadata
- `io_manager_key: Optional[str]` - I/O manager key
- `description: Optional[str]` - Source description
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `observe_fn: Optional[Callable]` - Observation function
- `auto_observe_interval_minutes: Optional[float]` - Auto-observe interval
- `group_name: Optional[str]` - Asset group name

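The `observe_fn` parameter underlies observable source assets, which report a `DataVersion` so Dagster can compute downstream staleness. These are usually built with the `@observable_source_asset` decorator, which produces a `SourceAsset` with an observation function; a minimal sketch (the file path is illustrative):

```python
import os

from dagster import DataVersion, observable_source_asset

@observable_source_asset
def raw_file():
    # Use the file's modification time as a cheap version stamp
    return DataVersion(str(os.path.getmtime("data.csv")))
```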
## Operations System

Operations (ops) are the fundamental computational units in Dagster, representing discrete pieces of work that consume inputs and produce outputs.

### Op Decorator

#### `@op` { .api }

**Module:** `dagster._core.definitions.decorators.op_decorator`
**Type:** Function decorator

Define an operation (transformation function).

```python
from dagster import op, In, Out, OpExecutionContext
import pandas as pd

@op
def simple_op() -> str:
    """Simple operation with no inputs."""
    return "hello world"

@op(
    ins={"data": In(dagster_type=pd.DataFrame)},
    out=Out(dagster_type=pd.DataFrame, description="Cleaned data"),
    config_schema={"threshold": float},
    required_resource_keys={"database"},
    tags={"team": "data", "env": "prod"}
)
def clean_data(context: OpExecutionContext, data: pd.DataFrame) -> pd.DataFrame:
    """Clean data based on threshold configuration."""
    threshold = context.op_config["threshold"]
    cleaned = data[data.quality_score > threshold]

    context.log.info(f"Removed {len(data) - len(cleaned)} rows below threshold {threshold}")

    return cleaned

@op(out={"processed": Out(), "summary": Out()})
def multi_output_op() -> tuple[pd.DataFrame, dict]:
    """Operation with multiple outputs."""
    df = process_data()  # assumed helper
    summary = {"count": len(df), "columns": df.columns.tolist()}
    return df, summary
```

**Parameters:**
- `name: Optional[str]` - Op name (defaults to function name)
- `description: Optional[str]` - Op description
- `ins: Optional[Mapping[str, In]]` - Input specifications
- `out: Optional[Union[Out, Mapping[str, Out]]]` - Output specification(s)
- `config_schema: Optional[UserConfigSchema]` - Configuration schema
- `required_resource_keys: Optional[AbstractSet[str]]` - Required resource keys
- `tags: Optional[Mapping[str, Any]]` - Op tags
- `retry_policy: Optional[RetryPolicy]` - Retry policy for failed executions
- `code_version: Optional[str]` - Code version for change tracking
- `pool: Optional[str]` - Execution pool for op execution
- `version: Optional[str]` - **DEPRECATED** - Use `code_version` instead

**Returns:** `Union[OpDefinition, _Op]`

#### `OpDefinition` { .api }

**Module:** `dagster._core.definitions.op_definition`
**Type:** Class

Definition of an operation containing its compute function, configuration, and metadata.

```python
# Access op definition properties
op_def = clean_data
name = op_def.name                                      # Operation name
description = op_def.description                        # Operation description
input_defs = op_def.input_defs                          # List of input definitions
output_defs = op_def.output_defs                        # List of output definitions
config_schema = op_def.config_schema                    # Configuration schema
required_resource_keys = op_def.required_resource_keys  # Required resources
tags = op_def.tags                                      # Operation tags

# Check if op is a generator (for dynamic outputs)
is_generator = op_def.is_generator_op
```

**Key Properties:**
- `name: str` - Operation name
- `description: Optional[str]` - Operation description
- `input_defs: List[InputDefinition]` - Input definitions
- `output_defs: List[OutputDefinition]` - Output definitions
- `config_schema: Optional[ConfigSchema]` - Configuration schema
- `required_resource_keys: Set[str]` - Required resource keys
- `tags: Dict[str, Any]` - Operation tags
- `code_version: Optional[str]` - Code version
- `retry_policy: Optional[RetryPolicy]` - Retry policy

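The generator case called out above usually comes from dynamic outputs, where an op yields a variable number of results that downstream ops map over. A brief sketch (op names illustrative):

```python
from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut())
def split_into_chunks():
    for i in range(3):
        yield DynamicOutput(list(range(i * 10, (i + 1) * 10)), mapping_key=str(i))

@op
def process_chunk(chunk: list) -> int:
    return sum(chunk)

@job
def fan_out_job():
    # map() invokes process_chunk once per dynamic output
    split_into_chunks().map(process_chunk)
```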
## Job System

Jobs orchestrate the execution of operations or assets, defining the computational graph and execution parameters.

### Job Decorator

#### `@job` { .api }

**Module:** `dagster._core.definitions.decorators.job_decorator`
**Type:** Function decorator

Define a job (collection of ops).

508

509

```python

510

from dagster import job, op, Config

511

512

@op

513

def load_data() -> pd.DataFrame:

514

return pd.read_csv("input.csv")

515

516

@op

517

def process_data(df: pd.DataFrame) -> pd.DataFrame:

518

return df.dropna()

519

520

@op

521

def save_data(df: pd.DataFrame) -> None:

522

df.to_csv("output.csv")

523

524

@job(

525

description="ETL job for processing data",

526

config={"ops": {"load_data": {"config": {"file_path": "data.csv"}}}},

527

tags={"team": "data-eng", "env": "production"}

528

)

529

def etl_job():

530

"""ETL job connecting ops."""

531

processed = process_data(load_data())

532

save_data(processed)

533

534

# Job with configuration class

535

class ETLJobConfig(Config):

536

input_path: str = "default.csv"

537

output_path: str = "output.csv"

538

539

@job(config=ETLJobConfig)

540

def configurable_etl_job(config: ETLJobConfig):

541

"""ETL job with typed configuration."""

542

# Job logic using config.input_path, config.output_path

543

pass

544

```

**Parameters:**
- `name: Optional[str]` - Job name (defaults to function name)
- `description: Optional[str]` - Job description
- `resource_defs: Optional[Mapping[str, object]]` - Resource definitions
- `config: Optional[Union[ConfigMapping, Mapping[str, Any], RunConfig, PartitionedConfig]]` - Job configuration
- `tags: Optional[Mapping[str, str]]` - Job tags for metadata
- `run_tags: Optional[Mapping[str, str]]` - Tags applied to each run
- `metadata: Optional[Mapping[str, RawMetadataValue]]` - Job metadata for UI display
- `logger_defs: Optional[Mapping[str, LoggerDefinition]]` - Logger definitions
- `executor_def: Optional[ExecutorDefinition]` - Executor definition
- `hooks: Optional[AbstractSet[HookDefinition]]` - Hook definitions
- `op_retry_policy: Optional[RetryPolicy]` - Default retry policy for all ops
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `input_values: Optional[Mapping[str, object]]` - Direct Python object inputs to the job

**Returns:** `Union[JobDefinition, _Job]`

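The `config` parameter also accepts a `ConfigMapping`, which exposes a narrow, user-facing schema and expands it into full run config. A hedged sketch reusing the ops from the example above:

```python
from dagster import ConfigMapping, job

def expand_config(simplified: dict) -> dict:
    # Expand the simplified schema into op-level run config
    return {"ops": {"load_data": {"config": {"file_path": simplified["path"]}}}}

@job(config=ConfigMapping(config_fn=expand_config, config_schema={"path": str}))
def mapped_etl_job():
    save_data(process_data(load_data()))
```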
#### `JobDefinition` { .api }

**Module:** `dagster._core.definitions.job_definition`
**Type:** Class

Definition of a job containing the computational graph and execution configuration.

```python
# Access job definition properties
job_def = etl_job
name = job_def.name                    # Job name
description = job_def.description      # Job description
graph_def = job_def.graph              # Underlying graph definition
resource_defs = job_def.resource_defs  # Resource definitions
executor_def = job_def.executor_def    # Executor definition

# Execute the job in process (useful for tests and scripts)
result = job_def.execute_in_process()
```

**Key Properties:**
- `name: str` - Job name
- `description: Optional[str]` - Job description
- `graph: GraphDefinition` - Underlying graph definition
- `resource_defs: Dict[str, ResourceDefinition]` - Resource definitions
- `executor_def: ExecutorDefinition` - Executor definition
- `logger_defs: Dict[str, LoggerDefinition]` - Logger definitions
- `hooks: Set[HookDefinition]` - Hook definitions
- `tags: Dict[str, Any]` - Job tags
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition
- `asset_layer: Optional[AssetLayer]` - Asset layer for asset jobs

### Asset Jobs

#### `define_asset_job` { .api }

**Module:** `dagster._core.definitions.unresolved_asset_job_definition`
**Type:** Function

Define a job that materializes a selection of assets.

```python
from dagster import define_asset_job, AssetSelection, DailyPartitionsDefinition

# Job for specific assets
analytics_job = define_asset_job(
    name="analytics_job",
    selection=AssetSelection.groups("analytics"),
    description="Materialize analytics assets",
    tags={"team": "analytics"}
)

# Job with partitions
daily_job = define_asset_job(
    name="daily_etl",
    selection=AssetSelection.all(),
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 4
                }
            }
        }
    }
)
```

**Parameters:**
- `name: str` - Job name
- `selection: Optional[Union[str, AssetSelection]]` - Asset selection
- `config: Optional[Union[ConfigSchema, Dict[str, Any]]]` - Job configuration
- `description: Optional[str]` - Job description
- `tags: Optional[Dict[str, Any]]` - Job tags
- `executor_def: Optional[ExecutorDefinition]` - Executor definition
- `hooks: Optional[Set[HookDefinition]]` - Hook definitions
- `partitions_def: Optional[PartitionsDefinition]` - Partitions definition

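`define_asset_job` returns an unresolved job whose selection is resolved against the assets in the code location, so it must be included in `Definitions` alongside those assets. A minimal sketch, assuming the assets and job from the examples above:

```python
from dagster import Definitions

defs = Definitions(
    assets=[users_data, processed_users],
    jobs=[analytics_job],
)
```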
## Graph System

Graphs enable composition and reuse of computational logic, allowing complex operations to be built from simpler components.

### Graph Decorator

#### `@graph` { .api }

**Module:** `dagster._core.definitions.decorators.graph_decorator`
**Type:** Function decorator

Define a reusable graph of operations.

```python
from dagster import GraphIn, GraphOut, graph, job, op

@op
def get_source_url() -> str:
    return "https://api.example.com/data"

@op
def fetch_data(url: str) -> dict:
    return {"data": f"fetched from {url}"}

@op
def validate_data(data: dict) -> dict:
    return {**data, "validated": True}

@op
def transform_data(data: dict) -> dict:
    return {**data, "transformed": True}

@op
def combine_results(a: dict, b: dict) -> dict:
    return {"first": a, "second": b}

@graph(
    description="Reusable data processing graph",
    ins={"source_url": GraphIn()},
    out={"result": GraphOut()}
)
def process_data_graph(source_url: str):
    """Reusable graph for data processing."""
    raw = fetch_data(source_url)
    validated = validate_data(raw)
    return transform_data(validated)

# Use the graph in a job; node inputs must come from other nodes,
# so the constant URL is wrapped in an op
@job
def etl_job():
    process_data_graph(get_source_url())

# Graphs compose directly; invoking one twice creates aliased instances
@job
def complex_job():
    result1 = process_data_graph(get_source_url())
    result2 = process_data_graph(get_source_url())
    combine_results(result1, result2)
```

**Parameters:**
- `name: Optional[str]` - Graph name (defaults to function name)
- `description: Optional[str]` - Graph description
- `ins: Optional[Dict[str, GraphIn]]` - Input specifications
- `out: Optional[Union[GraphOut, Dict[str, GraphOut]]]` - Output specification(s)
- `config: Optional[ConfigSchema]` - Configuration schema
- `tags: Optional[Dict[str, Any]]` - Graph tags

#### `GraphDefinition` { .api }

**Module:** `dagster._core.definitions.graph_definition`
**Type:** Class

Definition of a graph containing its computational structure and metadata.

```python
# Access graph definition properties
graph_def = process_data_graph
name = graph_def.name                        # Graph name
description = graph_def.description          # Graph description
input_mappings = graph_def.input_mappings    # Input mappings
output_mappings = graph_def.output_mappings  # Output mappings
dependencies = graph_def.dependencies        # Op dependencies

# Convert to an executable job
job_def = graph_def.to_job()
```

**Key Properties:**
- `name: str` - Graph name
- `description: Optional[str]` - Graph description
- `node_defs: List[NodeDefinition]` - Node definitions
- `dependencies: Dict[Union[str, NodeInvocation], Dict[str, IDependencyDefinition]]` - Dependencies
- `input_mappings: List[InputMapping]` - Input mappings
- `output_mappings: List[OutputMapping]` - Output mappings
- `config: Optional[ConfigSchema]` - Configuration schema
- `tags: Dict[str, Any]` - Graph tags

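`to_job` is where execution concerns attach to a reusable graph, and one graph can back several jobs with different resources. A sketch, assuming hypothetical `dev_api_client` and `prod_api_client` resources:

```python
dev_job = process_data_graph.to_job(
    name="process_data_dev",
    resource_defs={"api_client": dev_api_client},    # dev_api_client: hypothetical
)

prod_job = process_data_graph.to_job(
    name="process_data_prod",
    resource_defs={"api_client": prod_api_client},   # prod_api_client: hypothetical
    tags={"env": "production"},
)
```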
## Repository System

Repositories are collections of definitions that can be loaded together, providing a way to organize and deploy Dagster code.

### Repository Decorator

#### `@repository` { .api }

**Module:** `dagster._core.definitions.decorators.repository_decorator`
**Type:** Function decorator

Define a repository containing jobs, assets, schedules, and sensors.

```python
from dagster import repository, job, op, asset, schedule

@asset
def users_asset():
    return load_users()  # assumed helper

@op
def process_analytics_data():
    """Placeholder op for the analytics job."""
    ...

@job
def analytics_job():
    process_analytics_data()

@schedule(job=analytics_job, cron_schedule="0 9 * * *")
def daily_analytics():
    return {}

@repository
def analytics_repository():
    """Repository containing analytics definitions."""
    return [
        users_asset,
        analytics_job,
        daily_analytics
    ]

# Alternative using Definitions class (recommended)
from dagster import Definitions

defs = Definitions(
    assets=[users_asset],
    jobs=[analytics_job],
    schedules=[daily_analytics],
    resources={"database": database_resource}  # `database_resource` defined elsewhere
)
```

#### `Definitions` { .api }

**Module:** `dagster._core.definitions.definitions_class`
**Type:** Class

Container for all definitions in a Dagster code location. This is the recommended way to organize Dagster definitions.

```python
from dagster import Definitions, load_assets_from_modules
import my_assets, my_jobs, my_resources

# Complete definitions container
defs = Definitions(
    assets=load_assets_from_modules([my_assets]),
    jobs=[my_jobs.etl_job, my_jobs.ml_job],
    schedules=[my_jobs.daily_schedule],
    sensors=[my_jobs.failure_sensor],
    resources=my_resources.get_resources(),
    asset_checks=my_assets.get_asset_checks(),
    loggers={"custom": custom_logger}  # `custom_logger` defined elsewhere
)

# Conditional definitions based on environment
import os

def get_definitions():
    base_resources = {"io_manager": fs_io_manager}

    if os.getenv("ENV") == "prod":
        base_resources.update({
            "database": prod_database_resource,
            "data_warehouse": snowflake_resource
        })
    else:
        base_resources.update({
            "database": dev_database_resource,
            "data_warehouse": duckdb_resource
        })

    return Definitions(
        assets=load_assets_from_modules([my_assets]),
        resources=base_resources
    )

defs = get_definitions()
```

**Parameters:**
- `assets: Optional[Iterable[Union[AssetsDefinition, AssetSpec, SourceAsset, CacheableAssetsDefinition]]] = None` - Asset definitions
- `schedules: Optional[Iterable[Union[ScheduleDefinition, UnresolvedPartitionedAssetScheduleDefinition]]] = None` - Schedule definitions
- `sensors: Optional[Iterable[SensorDefinition]] = None` - Sensor definitions
- `jobs: Optional[Iterable[Union[JobDefinition, UnresolvedAssetJobDefinition]]] = None` - Job definitions
- `resources: Optional[Mapping[str, Any]] = None` - Resource definitions (accepts ResourceDefinition or any object)
- `executor: Optional[Union[ExecutorDefinition, Executor]] = None` - Default executor
- `loggers: Optional[Mapping[str, LoggerDefinition]] = None` - Logger definitions
- `asset_checks: Optional[Iterable[AssetsDefinition]] = None` - Asset check definitions
- `metadata: Optional[RawMetadataMapping] = None` - Definitions-level metadata
- `component_tree: Optional[ComponentTree] = None` - Component information for reconstruction

**Returns:** `Definitions` - Validated container for all code location definitions

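Larger projects often build one `Definitions` object per domain and combine them; recent Dagster releases provide `Definitions.merge` for this (treat availability as version-dependent). A sketch, assuming `sales_defs` and `marketing_defs` are built in their own modules:

```python
from dagster import Definitions

from sales import defs as sales_defs          # hypothetical module
from marketing import defs as marketing_defs  # hypothetical module

defs = Definitions.merge(sales_defs, marketing_defs)
```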
## Module Loading Utilities

Dagster provides utilities to automatically load definitions from Python modules and packages.

### Asset Loading

#### `load_assets_from_modules` { .api }

**Module:** `dagster._core.definitions.module_loaders.load_assets_from_modules`
**Type:** Function

Load all assets from specified Python modules.

```python
from dagster import load_assets_from_modules
import my_assets_module1, my_assets_module2

# Load from multiple modules
all_assets = load_assets_from_modules([
    my_assets_module1,
    my_assets_module2
])

# Load with group assignment
grouped_assets = load_assets_from_modules(
    [my_assets_module1],
    group_name="core_data"
)

# Load with key prefix
prefixed_assets = load_assets_from_modules(
    [my_assets_module1],
    key_prefix="staging"
)
```

**Parameters:**
- `modules: Sequence[ModuleType]` - Modules to load assets from
- `group_name: Optional[str]` - Group name to assign to loaded assets
- `key_prefix: Optional[Union[str, Sequence[str]]]` - Key prefix for loaded assets
- `freshness_policy: Optional[FreshnessPolicy]` - Freshness policy for loaded assets
- `auto_materialize_policy: Optional[AutoMaterializePolicy]` - Auto-materialization policy
- `backfill_policy: Optional[BackfillPolicy]` - Backfill policy

#### `load_assets_from_package_name` { .api }

**Module:** `dagster._core.definitions.module_loaders.load_assets_from_modules`
**Type:** Function

Load assets from a package by name.

```python
from dagster import load_assets_from_package_name

# Load from package name
assets = load_assets_from_package_name(
    "my_company.data_assets",
    group_name="company_data"
)

# Recursively load from subpackages
all_package_assets = load_assets_from_package_name(
    "my_company.assets",
    key_prefix=["company", "data"]
)
```

This documentation provides comprehensive coverage of Dagster's core definition system. The asset system represents the declarative approach to data pipeline development, while operations and jobs provide imperative workflow definition. Graphs enable composition and reuse, and repositories organize definitions for deployment. The module loading utilities facilitate code organization and automatic discovery of definitions.

For execution and runtime behavior, see [Execution and Contexts](./execution-contexts.md). For configuration of these definitions, see [Configuration System](./configuration.md).