A command-line tool for data transformation and analytics engineering workflows.
—
The dbt programmatic API allows Python applications to invoke dbt commands and access results without using the command-line interface. This is useful for integrating dbt into workflows, orchestration tools, and custom applications.
The main class for programmatic dbt execution.
class dbtRunner:
"""Programmatic interface for invoking dbt commands from Python."""
def __init__(
self,
manifest: Optional[Manifest] = None,
callbacks: Optional[List[Callable[[EventMsg], None]]] = None,
) -> None:
"""
Initialize the dbt runner.
Args:
manifest: Pre-loaded manifest to use (optional)
callbacks: List of event callback functions (optional)
"""
def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
"""
Execute a dbt command programmatically.
Args:
args: Command arguments as list (e.g., ['run', '--select', 'my_model'])
**kwargs: Additional parameters to override command options
Returns:
dbtRunnerResult: Result container with success status and data
"""Container for results from dbtRunner invocations.
@dataclass
class dbtRunnerResult:
"""Contains the result of an invocation of the dbtRunner."""
success: bool
"""Whether the dbt operation succeeded."""
exception: Optional[BaseException] = None
"""Exception if the operation failed."""
result: Union[
bool, # debug command
CatalogArtifact, # docs generate
List[str], # list/ls commands
Manifest, # parse command
None, # clean, deps, init, source commands
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None
"""Result data, type varies by command executed."""Execute dbt commands and handle results:
from dbt.cli.main import dbtRunner, dbtRunnerResult
# Initialize runner
runner = dbtRunner()
# Execute dbt run
result: dbtRunnerResult = runner.invoke(['run'])
if result.success:
print("dbt run succeeded")
# Access run results
run_results = result.result
print(f"Executed {len(run_results.results)} nodes")
else:
print(f"dbt run failed: {result.exception}")Use dbt selection syntax to target specific models:
runner = dbtRunner()
# Run specific models
result = runner.invoke(['run', '--select', 'my_model'])
# Run changed models (requires state)
result = runner.invoke(['run', '--select', 'state:modified'])
# Run model and downstream dependencies
result = runner.invoke(['run', '--select', 'my_model+'])
# Run with exclusions
result = runner.invoke(['run', '--exclude', 'tag:deprecated'])Override command options via kwargs:
runner = dbtRunner()
# Override threading and target
result = runner.invoke(
['run'],
threads=8,
target='production',
full_refresh=True
)
# Supply variables
result = runner.invoke(
['run'],
vars='{"my_var": "my_value"}'
)Register callbacks to handle dbt events during execution:
from dbt_common.events.base_types import EventMsg
def event_callback(event: EventMsg) -> None:
"""Handle dbt events during execution."""
print(f"Event: {event.info.name} - {event.info.msg}")
# Initialize runner with callbacks
runner = dbtRunner(callbacks=[event_callback])
# Events will be sent to callback during execution
result = runner.invoke(['run'])For better performance when running multiple commands, pre-load the manifest:
# First, parse the manifest
parse_result = runner.invoke(['parse'])
if parse_result.success:
manifest = parse_result.result
# Create new runner with pre-loaded manifest
fast_runner = dbtRunner(manifest=manifest)
# Subsequent runs will be faster
run_result = fast_runner.invoke(['run'])
test_result = fast_runner.invoke(['test'])These commands return RunExecutionResult with detailed execution information:
result = runner.invoke(['run'])
if result.success:
run_results = result.result
# Access individual node results
for node_result in run_results.results:
print(f"Node: {node_result.unique_id}")
print(f"Status: {node_result.status}")
print(f"Execution time: {node_result.execution_time}")
if hasattr(node_result, 'adapter_response'):
print(f"Rows affected: {node_result.adapter_response.rows_affected}")Returns the parsed Manifest object:
result = runner.invoke(['parse'])
if result.success:
manifest = result.result
# Access manifest contents
print(f"Found {len(manifest.nodes)} nodes")
print(f"Found {len(manifest.sources)} sources")
# Access specific nodes
for node_id, node in manifest.nodes.items():
if node.resource_type == 'model':
print(f"Model: {node.name} in {node.original_file_path}")Return list of resource names:
result = runner.invoke(['list', '--resource-type', 'model'])
if result.success:
model_names = result.result
print(f"Found models: {', '.join(model_names)}")Returns CatalogArtifact with generated documentation:
result = runner.invoke(['docs', 'generate'])
if result.success:
catalog = result.result
print(f"Generated docs for {len(catalog.nodes)} nodes")Returns boolean indicating if debug checks passed:
result = runner.invoke(['debug'])
if result.success and result.result:
print("All debug checks passed")
else:
print("Debug checks failed")Handle different types of failures:
from dbt.cli.exceptions import DbtUsageException, DbtInternalException
result = runner.invoke(['run', '--invalid-option'])
if not result.success:
if isinstance(result.exception, DbtUsageException):
print(f"Usage error: {result.exception}")
elif isinstance(result.exception, DbtInternalException):
print(f"Internal error: {result.exception}")
else:
print(f"Unexpected error: {result.exception}")from airflow import DAG
from airflow.operators.python import PythonOperator
from dbt.cli.main import dbtRunner
def run_dbt_models(**context):
runner = dbtRunner()
result = runner.invoke(['run'])
if not result.success:
raise Exception(f"dbt run failed: {result.exception}")
return result.result
dag = DAG('dbt_pipeline', ...)
dbt_task = PythonOperator(
task_id='run_dbt',
python_callable=run_dbt_models,
dag=dag
)import sys
from dbt.cli.main import dbtRunner
def ci_pipeline():
runner = dbtRunner()
# Parse project
parse_result = runner.invoke(['parse'])
if not parse_result.success:
print("Parse failed")
return False
# Run models
run_result = runner.invoke(['run'])
if not run_result.success:
print("Run failed")
return False
# Run tests
test_result = runner.invoke(['test'])
if not test_result.success:
print("Tests failed")
return False
print("All checks passed")
return True
if __name__ == "__main__":
success = ci_pipeline()
sys.exit(0 if success else 1)Install with Tessl CLI
npx tessl i tessl/pypi-dbt-core