0
# Programmatic API
1
2
The dbt programmatic API allows Python applications to invoke dbt commands and access results without using the command-line interface. This is useful for integrating dbt into workflows, orchestration tools, and custom applications.
3
4
## Core Classes
5
6
### dbtRunner
7
8
The main class for programmatic dbt execution.
9
10
```python { .api }
11
class dbtRunner:
12
"""Programmatic interface for invoking dbt commands from Python."""
13
14
def __init__(
15
self,
16
manifest: Optional[Manifest] = None,
17
callbacks: Optional[List[Callable[[EventMsg], None]]] = None,
18
) -> None:
19
"""
20
Initialize the dbt runner.
21
22
Args:
23
manifest: Pre-loaded manifest to use (optional)
24
callbacks: List of event callback functions (optional)
25
"""
26
27
def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
28
"""
29
Execute a dbt command programmatically.
30
31
Args:
32
args: Command arguments as list (e.g., ['run', '--select', 'my_model'])
33
**kwargs: Additional parameters to override command options
34
35
Returns:
36
dbtRunnerResult: Result container with success status and data
37
"""
38
```
39
40
### dbtRunnerResult
41
42
Container for results from dbtRunner invocations.
43
44
```python { .api }
45
@dataclass
46
class dbtRunnerResult:
47
"""Contains the result of an invocation of the dbtRunner."""
48
49
success: bool
50
"""Whether the dbt operation succeeded."""
51
52
exception: Optional[BaseException] = None
53
"""Exception if the operation failed."""
54
55
result: Union[
56
bool, # debug command
57
CatalogArtifact, # docs generate
58
List[str], # list/ls commands
59
Manifest, # parse command
60
None, # clean, deps, init, source commands
61
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
62
] = None
63
"""Result data, type varies by command executed."""
64
```
65
66
## Usage Patterns
67
68
### Basic Execution
69
70
Execute dbt commands and handle results:
71
72
```python
73
from dbt.cli.main import dbtRunner, dbtRunnerResult
74
75
# Initialize runner
76
runner = dbtRunner()
77
78
# Execute dbt run
79
result: dbtRunnerResult = runner.invoke(['run'])
80
81
if result.success:
82
print("dbt run succeeded")
83
# Access run results
84
run_results = result.result
85
print(f"Executed {len(run_results.results)} nodes")
86
else:
87
print(f"dbt run failed: {result.exception}")
88
```
89
90
### Model Selection
91
92
Use dbt selection syntax to target specific models:
93
94
```python
95
runner = dbtRunner()
96
97
# Run specific models
98
result = runner.invoke(['run', '--select', 'my_model'])
99
100
# Run changed models (requires state)
101
result = runner.invoke(['run', '--select', 'state:modified'])
102
103
# Run model and downstream dependencies
104
result = runner.invoke(['run', '--select', 'my_model+'])
105
106
# Run with exclusions
107
result = runner.invoke(['run', '--exclude', 'tag:deprecated'])
108
```
109
110
### Configuration Override
111
112
Override command options via kwargs:
113
114
```python
115
runner = dbtRunner()
116
117
# Override threading and target
118
result = runner.invoke(
119
['run'],
120
threads=8,
121
target='production',
122
full_refresh=True
123
)
124
125
# Supply variables
126
result = runner.invoke(
127
['run'],
128
vars='{"my_var": "my_value"}'
129
)
130
```
131
132
### Event Callbacks
133
134
Register callbacks to handle dbt events during execution:
135
136
```python
137
from dbt_common.events.base_types import EventMsg
138
139
def event_callback(event: EventMsg) -> None:
140
"""Handle dbt events during execution."""
141
print(f"Event: {event.info.name} - {event.info.msg}")
142
143
# Initialize runner with callbacks
144
runner = dbtRunner(callbacks=[event_callback])
145
146
# Events will be sent to callback during execution
147
result = runner.invoke(['run'])
148
```
149
150
### Pre-loaded Manifest
151
152
For better performance when running multiple commands, pre-load the manifest:
153
154
```python
155
# First, parse the manifest
156
parse_result = runner.invoke(['parse'])
157
if parse_result.success:
158
manifest = parse_result.result
159
160
# Create new runner with pre-loaded manifest
161
fast_runner = dbtRunner(manifest=manifest)
162
163
# Subsequent runs will be faster
164
run_result = fast_runner.invoke(['run'])
165
test_result = fast_runner.invoke(['test'])
166
```
167
168
## Command-Specific Results
169
170
### Build, Run, Test Commands
171
172
These commands return `RunExecutionResult` with detailed execution information:
173
174
```python
175
result = runner.invoke(['run'])
176
if result.success:
177
run_results = result.result
178
179
# Access individual node results
180
for node_result in run_results.results:
181
print(f"Node: {node_result.unique_id}")
182
print(f"Status: {node_result.status}")
183
print(f"Execution time: {node_result.execution_time}")
184
185
if hasattr(node_result, 'adapter_response'):
186
print(f"Rows affected: {node_result.adapter_response.rows_affected}")
187
```
188
189
### Parse Command
190
191
Returns the parsed `Manifest` object:
192
193
```python
194
result = runner.invoke(['parse'])
195
if result.success:
196
manifest = result.result
197
198
# Access manifest contents
199
print(f"Found {len(manifest.nodes)} nodes")
200
print(f"Found {len(manifest.sources)} sources")
201
202
# Access specific nodes
203
for node_id, node in manifest.nodes.items():
204
if node.resource_type == 'model':
205
print(f"Model: {node.name} in {node.original_file_path}")
206
```
207
208
### List Commands
209
210
Return list of resource names:
211
212
```python
213
result = runner.invoke(['list', '--resource-type', 'model'])
214
if result.success:
215
model_names = result.result
216
print(f"Found models: {', '.join(model_names)}")
217
```
218
219
### Docs Generate
220
221
Returns `CatalogArtifact` with generated documentation:
222
223
```python
224
result = runner.invoke(['docs', 'generate'])
225
if result.success:
226
catalog = result.result
227
print(f"Generated docs for {len(catalog.nodes)} nodes")
228
```
229
230
### Debug Command
231
232
Returns boolean indicating if debug checks passed:
233
234
```python
235
result = runner.invoke(['debug'])
236
if result.success and result.result:
237
print("All debug checks passed")
238
else:
239
print("Debug checks failed")
240
```
241
242
## Error Handling
243
244
Handle different types of failures:
245
246
```python
247
from dbt.cli.exceptions import DbtUsageException, DbtInternalException
248
249
result = runner.invoke(['run', '--invalid-option'])
250
251
if not result.success:
252
if isinstance(result.exception, DbtUsageException):
253
print(f"Usage error: {result.exception}")
254
elif isinstance(result.exception, DbtInternalException):
255
print(f"Internal error: {result.exception}")
256
else:
257
print(f"Unexpected error: {result.exception}")
258
```
259
260
## Integration Examples
261
262
### Airflow Integration
263
264
```python
265
from airflow import DAG
266
from airflow.operators.python import PythonOperator
267
from dbt.cli.main import dbtRunner
268
269
def run_dbt_models(**context):
270
runner = dbtRunner()
271
result = runner.invoke(['run'])
272
273
if not result.success:
274
raise Exception(f"dbt run failed: {result.exception}")
275
276
return result.result
277
278
dag = DAG('dbt_pipeline', ...)
279
dbt_task = PythonOperator(
280
task_id='run_dbt',
281
python_callable=run_dbt_models,
282
dag=dag
283
)
284
```
285
286
### CI/CD Pipeline
287
288
```python
289
import sys
290
from dbt.cli.main import dbtRunner
291
292
def ci_pipeline():
293
runner = dbtRunner()
294
295
# Parse project
296
parse_result = runner.invoke(['parse'])
297
if not parse_result.success:
298
print("Parse failed")
299
return False
300
301
# Run models
302
run_result = runner.invoke(['run'])
303
if not run_result.success:
304
print("Run failed")
305
return False
306
307
# Run tests
308
test_result = runner.invoke(['test'])
309
if not test_result.success:
310
print("Tests failed")
311
return False
312
313
print("All checks passed")
314
return True
315
316
if __name__ == "__main__":
317
success = ci_pipeline()
318
sys.exit(0 if success else 1)
319
```