# CLI Resource and Execution

This module covers local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management. It provides the core interface for running dbt commands within Dagster ops and assets.

## Capabilities

### CLI Resource

#### DbtCliResource

The main resource for executing dbt CLI commands within Dagster.

```python { .api }
class DbtCliResource(ConfigurableResource):
    """
    Resource for executing dbt CLI commands.

    Attributes:
    - project_dir: Path to the dbt project directory
    - profiles_dir: Path to the dbt profiles directory (optional, defaults to None)
    - profile: Name of the dbt profile to use (optional, defaults to None)
    - target: Name of the dbt target to use (optional, defaults to None)
    - global_config_flags: List of global dbt flags to apply (defaults to an empty list)
    """

    project_dir: str
    profiles_dir: Optional[str] = None
    profile: Optional[str] = None
    target: Optional[str] = None
    # NOTE(review): a literal `[]` default is only safe if ConfigurableResource
    # copies field defaults per instance (pydantic-style) — confirm.
    global_config_flags: List[str] = []

    def cli(
        self,
        args: List[str],
        context: Optional[AssetExecutionContext | OpExecutionContext] = None,
        **kwargs
    ) -> DbtCliInvocation:
        """
        Execute a dbt CLI command.

        Parameters:
        - args: dbt command arguments (e.g., ["build", "--select", "my_model"])
        - context: Dagster execution context for logging and metadata
        - **kwargs: Additional arguments passed to subprocess

        Returns:
            DbtCliInvocation object for streaming results and accessing artifacts
        """
```

#### Usage Example

```python
from dagster import asset, AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource

dbt_resource = DbtCliResource(
    project_dir="./my_dbt_project",
    profiles_dir="~/.dbt",
    target="dev",
)


@asset
def run_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    """Run the daily dbt models (excluding slow ones) and report how many ran."""
    # Run specific models
    dbt_run = dbt.cli(
        ["run", "--select", "tag:daily", "--exclude", "tag:slow"],
        context=context,
    )

    # Stream events and log each one
    for event in dbt_run.stream():
        context.log.info(f"dbt: {event}")

    # get_artifact returns None when the artifact is not found, so fall back
    # to an empty dict instead of calling .get() on None.
    run_results = dbt_run.get_artifact("run_results.json") or {}
    return {"models_run": len(run_results.get("results", []))}


# The asset's `dbt` parameter is satisfied by the resource bound under the
# "dbt" key, so the resource has to be registered with the definitions.
defs = Definitions(
    assets=[run_dbt_models],
    resources={"dbt": dbt_resource},
)
```

### CLI Invocation

#### DbtCliInvocation

Represents a dbt CLI command invocation, with methods for streaming events and accessing artifacts.

```python { .api }
class DbtCliInvocation:
    """
    Represents a dbt CLI command invocation.

    Provides methods for streaming execution events and accessing
    generated artifacts like run_results.json and manifest.json.
    """

    def stream(self) -> Iterator[DbtCliEventMessage]:
        """
        Stream dbt CLI events as they occur.

        Yields:
            DbtCliEventMessage objects containing event data
        """

    def stream_raw_events(self) -> Iterator[dict]:
        """
        Stream raw dbt event dictionaries.

        Yields:
            Raw dbt event dictionaries, without parsing
        """

    def get_artifact(self, artifact_name: str) -> Optional[dict]:
        """
        Get a dbt artifact by name.

        Parameters:
        - artifact_name: Name of artifact (e.g., "run_results.json", "manifest.json")

        Returns:
            Parsed artifact dictionary, or None if the artifact is not found
        """

    def wait(self) -> CompletedProcess:
        """
        Wait for the dbt command to complete.

        Returns:
            CompletedProcess with the return code and outputs
        """

    @property
    def is_successful(self) -> bool:
        """
        Check whether the dbt command completed successfully.

        Returns:
            True if the command succeeded, False otherwise
        """
```

### CLI Event Messages

#### DbtCliEventMessage

Base class for dbt CLI event messages, providing common event handling.

```python { .api }
class DbtCliEventMessage:
    """
    Base class for dbt CLI event messages.

    Attributes:
    - raw_event: Raw event dictionary from dbt
    - event_type: Type of dbt event (read-only property)
    - log_level: Logging level of the event (read-only property)
    """

    raw_event: dict

    @property
    def event_type(self) -> str:
        """Get the dbt event type."""

    @property
    def log_level(self) -> str:
        """Get the event log level."""

    def to_default_asset_events(
        self,
        context: AssetExecutionContext,
        manifest: dict,
        **kwargs
    ) -> Iterator[AssetMaterialization | AssetObservation]:
        """
        Convert this event to Dagster asset events.

        Parameters:
        - context: Asset execution context
        - manifest: dbt manifest dictionary
        - **kwargs: Additional conversion parameters

        Yields:
            AssetMaterialization or AssetObservation events
        """
```

#### DbtCoreCliEventMessage

Event message implementation for dbt Core CLI commands.

```python { .api }
class DbtCoreCliEventMessage(DbtCliEventMessage):
    """
    dbt Core CLI event message implementation.

    Handles events from dbt Core CLI execution, with Core-specific
    event parsing and asset event generation.
    """
```

#### DbtFusionCliEventMessage

Event message implementation for dbt Fusion CLI commands.

```python { .api }
class DbtFusionCliEventMessage(DbtCliEventMessage):
    """
    dbt Fusion CLI event message implementation.

    Handles events from dbt Fusion CLI execution, with Fusion-specific
    event parsing and optimized performance characteristics.
    """
```

## Advanced Usage Examples

### Custom Event Handling

```python
from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource


@asset
def process_dbt_with_custom_events(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    """Run `dbt test`, collect per-test results, and surface dbt warnings and errors."""
    dbt_run = dbt.cli(["test"], context=context)

    test_results = []
    for event in dbt_run.stream():
        # NOTE(review): the event type string and the keys under "data" depend on
        # the structured-log schema of the installed dbt version — confirm them.
        if event.event_type == "test_result":
            # Look up the payload once instead of once per field.
            data = event.raw_event.get("data", {})
            test_results.append({
                "test_name": data.get("node_name"),
                "status": data.get("status"),
                "execution_time": data.get("execution_time"),
            })

        # Log important events at a matching Dagster level, so dbt errors are
        # not downgraded to warnings.
        if event.log_level == "error":
            context.log.error(f"dbt {event.log_level}: {event.raw_event}")
        elif event.log_level == "warn":
            context.log.warning(f"dbt {event.log_level}: {event.raw_event}")

    return {"test_results": test_results}
```

### Artifact Access

```python
from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource


@asset
def analyze_dbt_run_results(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    """Run the important dbt models and summarize successes from run_results.json."""
    # Run dbt and get artifacts
    invocation = dbt.cli(["run", "--select", "tag:important"], context=context)

    # Process events
    for event in invocation.stream():
        pass  # Process events as needed

    # Access run results; get_artifact returns None when an artifact is not found.
    run_results = invocation.get_artifact("run_results.json")
    manifest = invocation.get_artifact("manifest.json")

    if not run_results or not manifest:
        return {"error": "Could not access dbt artifacts"}

    # Use .get so that a result entry without a "status" key is skipped
    # instead of raising KeyError.
    successful_models = [
        result["unique_id"]
        for result in run_results.get("results", [])
        if result.get("status") == "success"
    ]

    return {
        "successful_models": successful_models,
        "total_execution_time": run_results.get("elapsed_time", 0),
    }
```

## Type Definitions

```python { .api }
# Import types for type hints.
# `dict` is the builtin; `typing` does not export it, so importing it from
# `typing` raises ImportError.
from subprocess import CompletedProcess
from typing import Iterator, List, Optional, Union

# Every Dagster name referenced by the API stubs above.
from dagster import (
    AssetExecutionContext,
    AssetMaterialization,
    AssetObservation,
    ConfigurableResource,
    OpExecutionContext,
)
```