CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dbt-core

A command-line tool for data transformation and analytics engineering workflows.

Pending
Overview
Eval results
Files

programmatic-api.mddocs/

Programmatic API

The dbt programmatic API allows Python applications to invoke dbt commands and access results without using the command-line interface. This is useful for integrating dbt into workflows, orchestration tools, and custom applications.

Core Classes

dbtRunner

The main class for programmatic dbt execution.

class dbtRunner:
    """Programmatic interface for invoking dbt commands from Python."""
    
    def __init__(
        self,
        manifest: Optional[Manifest] = None,
        callbacks: Optional[List[Callable[[EventMsg], None]]] = None,
    ) -> None:
        """
        Initialize the dbt runner.
        
        Args:
            manifest: Pre-loaded manifest to use (optional)
            callbacks: List of event callback functions (optional)
        """
    
    def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
        """
        Execute a dbt command programmatically.
        
        Args:
            args: Command arguments as list (e.g., ['run', '--select', 'my_model'])
            **kwargs: Additional parameters to override command options
            
        Returns:
            dbtRunnerResult: Result container with success status and data
        """

dbtRunnerResult

Container for results from dbtRunner invocations.

@dataclass
class dbtRunnerResult:
    """Contains the result of an invocation of the dbtRunner."""
    
    success: bool
    """Whether the dbt operation succeeded."""
    
    exception: Optional[BaseException] = None
    """Exception if the operation failed."""
    
    result: Union[
        bool,  # debug command
        CatalogArtifact,  # docs generate
        List[str],  # list/ls commands
        Manifest,  # parse command
        None,  # clean, deps, init, source commands
        RunExecutionResult,  # build, compile, run, seed, snapshot, test, run-operation
    ] = None
    """Result data, type varies by command executed."""

Usage Patterns

Basic Execution

Execute dbt commands and handle results:

from dbt.cli.main import dbtRunner, dbtRunnerResult

# Initialize runner
runner = dbtRunner()

# Execute dbt run
result: dbtRunnerResult = runner.invoke(['run'])

if result.success:
    print("dbt run succeeded")
    # Access run results
    run_results = result.result
    print(f"Executed {len(run_results.results)} nodes")
else:
    print(f"dbt run failed: {result.exception}")

Model Selection

Use dbt selection syntax to target specific models:

runner = dbtRunner()

# Run specific models
result = runner.invoke(['run', '--select', 'my_model'])

# Run changed models (requires state)
result = runner.invoke(['run', '--select', 'state:modified'])

# Run model and downstream dependencies
result = runner.invoke(['run', '--select', 'my_model+'])

# Run with exclusions
result = runner.invoke(['run', '--exclude', 'tag:deprecated'])

Configuration Override

Override command options via kwargs:

runner = dbtRunner()

# Override threading and target
result = runner.invoke(
    ['run'],
    threads=8,
    target='production',
    full_refresh=True
)

# Supply variables
result = runner.invoke(
    ['run'],
    vars='{"my_var": "my_value"}'
)

Event Callbacks

Register callbacks to handle dbt events during execution:

from dbt_common.events.base_types import EventMsg

def event_callback(event: EventMsg) -> None:
    """Handle dbt events during execution."""
    print(f"Event: {event.info.name} - {event.info.msg}")

# Initialize runner with callbacks
runner = dbtRunner(callbacks=[event_callback])

# Events will be sent to callback during execution
result = runner.invoke(['run'])

Pre-loaded Manifest

For better performance when running multiple commands, pre-load the manifest:

# First, parse the manifest
parse_result = runner.invoke(['parse'])
if parse_result.success:
    manifest = parse_result.result
    
    # Create new runner with pre-loaded manifest
    fast_runner = dbtRunner(manifest=manifest)
    
    # Subsequent runs will be faster
    run_result = fast_runner.invoke(['run'])
    test_result = fast_runner.invoke(['test'])

Command-Specific Results

Build, Run, Test Commands

These commands return RunExecutionResult with detailed execution information:

result = runner.invoke(['run'])
if result.success:
    run_results = result.result
    
    # Access individual node results
    for node_result in run_results.results:
        print(f"Node: {node_result.unique_id}")
        print(f"Status: {node_result.status}")
        print(f"Execution time: {node_result.execution_time}")
        
        if hasattr(node_result, 'adapter_response'):
            print(f"Rows affected: {node_result.adapter_response.rows_affected}")

Parse Command

Returns the parsed Manifest object:

result = runner.invoke(['parse'])
if result.success:
    manifest = result.result
    
    # Access manifest contents
    print(f"Found {len(manifest.nodes)} nodes")
    print(f"Found {len(manifest.sources)} sources")
    
    # Access specific nodes
    for node_id, node in manifest.nodes.items():
        if node.resource_type == 'model':
            print(f"Model: {node.name} in {node.original_file_path}")

List Commands

Return list of resource names:

result = runner.invoke(['list', '--resource-type', 'model'])
if result.success:
    model_names = result.result
    print(f"Found models: {', '.join(model_names)}")

Docs Generate

Returns CatalogArtifact with generated documentation:

result = runner.invoke(['docs', 'generate'])
if result.success:
    catalog = result.result
    print(f"Generated docs for {len(catalog.nodes)} nodes")

Debug Command

Returns boolean indicating if debug checks passed:

result = runner.invoke(['debug'])
if result.success and result.result:
    print("All debug checks passed")
else:
    print("Debug checks failed")

Error Handling

Handle different types of failures:

from dbt.cli.exceptions import DbtUsageException, DbtInternalException

result = runner.invoke(['run', '--invalid-option'])

if not result.success:
    if isinstance(result.exception, DbtUsageException):
        print(f"Usage error: {result.exception}")
    elif isinstance(result.exception, DbtInternalException):
        print(f"Internal error: {result.exception}")
    else:
        print(f"Unexpected error: {result.exception}")

Integration Examples

Airflow Integration

from airflow import DAG
from airflow.operators.python import PythonOperator
from dbt.cli.main import dbtRunner

def run_dbt_models(**context):
    runner = dbtRunner()
    result = runner.invoke(['run'])
    
    if not result.success:
        raise Exception(f"dbt run failed: {result.exception}")
    
    return result.result

dag = DAG('dbt_pipeline', ...)
dbt_task = PythonOperator(
    task_id='run_dbt',
    python_callable=run_dbt_models,
    dag=dag
)

CI/CD Pipeline

import sys
from dbt.cli.main import dbtRunner

def ci_pipeline():
    runner = dbtRunner()
    
    # Parse project
    parse_result = runner.invoke(['parse'])
    if not parse_result.success:
        print("Parse failed")
        return False
    
    # Run models
    run_result = runner.invoke(['run'])
    if not run_result.success:
        print("Run failed")
        return False
    
    # Run tests
    test_result = runner.invoke(['test'])
    if not test_result.success:
        print("Tests failed")
        return False
    
    print("All checks passed")
    return True

if __name__ == "__main__":
    success = ci_pipeline()
    sys.exit(0 if success else 1)

Install with Tessl CLI

npx tessl i tessl/pypi-dbt-core

docs

artifacts.md

cli-commands.md

configuration.md

exceptions.md

index.md

programmatic-api.md

version.md

tile.json