CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-dbt-cloud

Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration

84

1.00x
Overview
Eval results
Files

hooks.mddocs/

dbt Cloud Hook

The DbtCloudHook provides a comprehensive low-level interface for interacting with the dbt Cloud API. It handles authentication, connection management, and provides methods for all major dbt Cloud operations including account management, project operations, job execution, and artifact retrieval.

Capabilities

Connection and Authentication

Manages authentication and connection setup for dbt Cloud API interactions.

class DbtCloudHook:
    def __init__(self, dbt_cloud_conn_id: str = "dbt_cloud_default"): ...
    
    def get_conn(self, *args, **kwargs) -> Session:
        """
        Returns authenticated session for dbt Cloud API.
        
        Returns:
            requests.Session: Authenticated session with proper headers
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the dbt Cloud connection.
        
        Returns:
            tuple[bool, str]: (success, message) tuple indicating connection status
        """

Account Management

Methods for retrieving and managing dbt Cloud account information.

def list_accounts(self) -> list[Response]:
    """
    Retrieve all authorized accounts.
    
    Returns:
        list[Response]: List of account information responses
    """

def get_account(self, account_id: int | None = None) -> Response:
    """
    Get specific account metadata.
    
    Args:
        account_id: Account ID (defaults to connection default)
        
    Returns:
        Response: Account metadata response
    """

Project Management

Methods for managing dbt Cloud projects within accounts.

def list_projects(
    self, 
    account_id: int | None = None, 
    name_contains: str | None = None
) -> list[Response]:
    """
    Retrieve projects in an account.
    
    Args:
        account_id: Account ID (defaults to connection default)
        name_contains: Filter projects by name substring
        
    Returns:
        list[Response]: List of project information responses
    """

def get_project(self, project_id: int, account_id: int | None = None) -> Response:
    """
    Get specific project details.
    
    Args:
        project_id: Project ID to retrieve
        account_id: Account ID (defaults to connection default)
        
    Returns:
        Response: Project details response
    """

Environment Management

Methods for managing dbt Cloud environments within projects.

def list_environments(
    self,
    project_id: int,
    *,
    name_contains: str | None = None,
    account_id: int | None = None
) -> list[Response]:
    """
    Retrieve environments for a project.
    
    Args:
        project_id: Project ID to query
        name_contains: Filter environments by name substring
        account_id: Account ID (defaults to connection default)
        
    Returns:
        list[Response]: List of environment information responses
    """

def get_environment(
    self,
    project_id: int,
    environment_id: int,
    *,
    account_id: int | None = None
) -> Response:
    """
    Get specific environment details.
    
    Args:
        project_id: Project ID containing the environment
        environment_id: Environment ID to retrieve
        account_id: Account ID (defaults to connection default)
        
    Returns:
        Response: Environment details response
    """

Job Management

Methods for managing dbt Cloud jobs and their configurations.

def list_jobs(
    self,
    account_id: int | None = None,
    order_by: str | None = None,
    project_id: int | None = None,
    environment_id: int | None = None,
    name_contains: str | None = None
) -> list[Response]:
    """
    Retrieve jobs with optional filtering.
    
    Args:
        account_id: Account ID (defaults to connection default)
        order_by: Field to order results by
        project_id: Filter jobs by project ID
        environment_id: Filter jobs by environment ID
        name_contains: Filter jobs by name substring
        
    Returns:
        list[Response]: List of job information responses
    """

def get_job(self, job_id: int, account_id: int | None = None) -> Response:
    """
    Get specific job details.
    
    Args:
        job_id: Job ID to retrieve
        account_id: Account ID (defaults to connection default)
        
    Returns:
        Response: Job details response
    """

def get_job_by_name(
    self,
    *,
    project_name: str,
    environment_name: str,
    job_name: str,
    account_id: int | None = None
) -> dict:
    """
    Lookup job by project, environment, and job names.
    
    Args:
        project_name: Name of the project containing the job
        environment_name: Name of the environment containing the job
        job_name: Name of the job to find
        account_id: Account ID (defaults to connection default)
        
    Returns:
        dict: Job information with job_id and related metadata
        
    Raises:
        DbtCloudResourceLookupError: If job cannot be found
    """

Job Run Execution

Methods for triggering and managing dbt Cloud job runs.

def trigger_job_run(
    self,
    job_id: int,
    cause: str,
    account_id: int | None = None,
    steps_override: list[str] | None = None,
    schema_override: str | None = None,
    retry_from_failure: bool = False,
    additional_run_config: dict[str, Any] | None = None
) -> Response:
    """
    Trigger execution of a dbt Cloud job.
    
    Args:
        job_id: ID of job to execute
        cause: Reason for triggering the job (max 255 chars)
        account_id: Account ID (defaults to connection default)
        steps_override: Custom list of dbt steps to run
        schema_override: Override default schema/dataset
        retry_from_failure: Resume from last failure point
        additional_run_config: Additional configuration for the run
        
    Returns:
        Response: Job run creation response containing run_id
    """

def cancel_job_run(self, run_id: int, account_id: int | None = None) -> None:
    """
    Cancel a running job.
    
    Args:
        run_id: ID of job run to cancel
        account_id: Account ID (defaults to connection default)
    """

def retry_failed_job_run(
    self, job_id: int, account_id: int | None = None
) -> Response:
    """
    Retry the most recent failed run of a job.
    
    Args:
        job_id: ID of job to retry
        account_id: Account ID (defaults to connection default)
        
    Returns:
        Response: New job run creation response
    """

Job Run Monitoring

Methods for monitoring and querying job run status and details.

def list_job_runs(
    self,
    account_id: int | None = None,
    include_related: list[str] | None = None,
    job_definition_id: int | None = None,
    order_by: str | None = None
) -> list[Response]:
    """
    Retrieve job runs with optional filtering.
    
    Args:
        account_id: Account ID (defaults to connection default)
        include_related: Related resources to include in response
        job_definition_id: Filter runs by job ID
        order_by: Field to order results by
        
    Returns:
        list[Response]: List of job run information responses
    """

def get_job_runs(
    self,
    account_id: int | None = None,
    payload: dict[str, Any] | None = None
) -> Response:
    """
    Get job runs with advanced filtering via payload.
    
    Args:
        account_id: Account ID (defaults to connection default)
        payload: Advanced filter and pagination parameters
        
    Returns:
        Response: Job runs response with pagination info
    """

def get_job_run(
    self,
    run_id: int,
    account_id: int | None = None,
    include_related: list[str] | None = None
) -> Response:
    """
    Get specific job run details.
    
    Args:
        run_id: Job run ID to retrieve
        account_id: Account ID (defaults to connection default)
        include_related: Related resources to include in response
        
    Returns:
        Response: Job run details response
    """

def get_job_run_status(self, run_id: int, account_id: int | None = None) -> int:
    """
    Get current status of a job run.
    
    Args:
        run_id: Job run ID to check
        account_id: Account ID (defaults to connection default)
        
    Returns:
        int: Status code from DbtCloudJobRunStatus enum
    """

def wait_for_job_run_status(
    self,
    run_id: int,
    account_id: int | None = None,
    expected_statuses: int | Sequence[int] | set[int] = DbtCloudJobRunStatus.SUCCESS.value,
    check_interval: int = 60,
    timeout: int = 60 * 60 * 24 * 7
) -> bool:
    """
    Wait for job run to reach expected status.
    
    Args:
        run_id: Job run ID to monitor
        account_id: Account ID (defaults to connection default)
        expected_statuses: Status(es) to wait for
        check_interval: Seconds between status checks
        timeout: Maximum seconds to wait
        
    Returns:
        bool: True if expected status reached, False if timeout
        
    Raises:
        DbtCloudJobRunException: If job run fails or is cancelled
    """

Artifact Management

Methods for retrieving job run artifacts and outputs.

def list_job_run_artifacts(
    self,
    run_id: int,
    account_id: int | None = None,
    step: int | None = None
) -> list[Response]:
    """
    List available artifacts for a job run.
    
    Args:
        run_id: Job run ID to query
        account_id: Account ID (defaults to connection default)
        step: Specific step number to list artifacts for
        
    Returns:
        list[Response]: List of available artifact information
    """

def get_job_run_artifact(
    self,
    run_id: int,
    path: str,
    account_id: int | None = None,
    step: int | None = None
) -> Response:
    """
    Download a specific job run artifact.
    
    Args:
        run_id: Job run ID containing the artifact
        path: Path to the artifact (e.g., 'manifest.json', 'run_results.json')
        account_id: Account ID (defaults to connection default)
        step: Specific step number to get artifact from
        
    Returns:
        Response: Artifact content response
    """

async def get_job_run_artifacts_concurrently(
    self,
    run_id: int,
    artifacts: list[str],
    account_id: int | None = None,
    step: int | None = None
):
    """
    Download multiple artifacts concurrently.
    
    Args:
        run_id: Job run ID containing the artifacts
        artifacts: List of artifact paths to download
        account_id: Account ID (defaults to connection default)
        step: Specific step number to get artifacts from
        
    Returns:
        List of artifact content responses
    """

Usage Examples

Basic Job Execution

from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

# Initialize hook
hook = DbtCloudHook(dbt_cloud_conn_id='dbt_cloud_default')

# Trigger a job run
response = hook.trigger_job_run(
    job_id=12345,
    cause="Airflow scheduled run",
    steps_override=["dbt run", "dbt test"],
    schema_override="dev_schema"
)
run_id = response.json()['data']['id']

# Wait for completion
success = hook.wait_for_job_run_status(
    run_id=run_id,
    expected_statuses=DbtCloudJobRunStatus.SUCCESS,
    check_interval=60,
    timeout=3600
)

Artifact Retrieval

# List available artifacts
artifacts = hook.list_job_run_artifacts(run_id=run_id)

# Download specific artifacts
manifest = hook.get_job_run_artifact(run_id=run_id, path='manifest.json')
run_results = hook.get_job_run_artifact(run_id=run_id, path='run_results.json')

Resource Discovery

# Find job by name instead of ID
job_info = hook.get_job_by_name(
    project_name="analytics",
    environment_name="production",
    job_name="daily_transform"
)
job_id = job_info['job_id']

# List all jobs in a project
jobs = hook.list_jobs(project_id=123, name_contains="daily")

Types

from enum import IntEnum
from typing import TypedDict
from requests.auth import AuthBase
from requests import PreparedRequest, Session, Response

class DbtCloudJobRunStatus(Enum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30
    NON_TERMINAL_STATUSES = (QUEUED, STARTING, RUNNING)
    TERMINAL_STATUSES = (SUCCESS, ERROR, CANCELLED)
    
    @classmethod
    def check_is_valid(cls, statuses: int | Sequence[int] | set[int]) -> None: ...
    
    @classmethod
    def is_terminal(cls, status: int) -> bool: ...

class JobRunInfo(TypedDict):
    account_id: int | None
    run_id: int

class TokenAuth(AuthBase):
    def __init__(self, token: str): ...
    def __call__(self, request: PreparedRequest) -> PreparedRequest: ...

class DbtCloudJobRunException(Exception):
    """Exception raised when a dbt Cloud job run fails."""

class DbtCloudResourceLookupError(Exception):
    """Exception raised when a dbt Cloud resource cannot be found."""

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-dbt-cloud

docs

hooks.md

index.md

openlineage.md

operators.md

sensors.md

triggers.md

tile.json