tessl/pypi-dagster-databricks

Package for Databricks-specific Dagster framework op and resource components.

  • Workspace: tessl
  • Visibility: Public
  • Describes: pypi package pkg:pypi/dagster-databricks@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-databricks@0.27.0

Files

  • docs/core-client.md
  • docs/index.md
  • docs/job-management.md
  • docs/op-factories.md
  • docs/pipes-integration.md
  • docs/pyspark-step-launcher.md
  • docs/resource-management.md
  • tile.json

Dagster Databricks

A comprehensive integration library that connects the Dagster data orchestration framework with the Databricks analytics platform. It lets users execute Dagster ops and assets on Databricks clusters through several execution patterns, including the PySpark step launcher, the Databricks job runner, and Dagster Pipes integration.

Package Information

  • Package Name: dagster-databricks
  • Language: Python
  • Installation: pip install dagster-databricks

Core Imports

from dagster_databricks import (
    DatabricksClient,
    DatabricksError,
    DatabricksJobRunner,
    DatabricksPySparkStepLauncher,
    databricks_pyspark_step_launcher,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
    PipesDbfsLogReader,
    DatabricksClientResource,
    databricks_client,
    create_databricks_run_now_op,
    create_databricks_submit_run_op,
)

Basic Usage

from dagster import job, op
from dagster_databricks import (
    DatabricksClientResource,
    create_databricks_run_now_op,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
)

# Define Databricks resource
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token"
)

# Create an op to run existing Databricks job
run_databricks_job = create_databricks_run_now_op(
    databricks_job_id=123,
    databricks_job_configuration={
        "python_params": ["--input", "table1", "--output", "table2"]
    }
)

# Use Pipes for bidirectional communication
@op(required_resource_keys={"databricks"})
def process_with_pipes(context):
    # The resource exposes a DatabricksClient via get_client(); its workspace_client
    # property is the underlying databricks.sdk.WorkspaceClient used by Pipes
    workspace_client = context.resources.databricks.get_client().workspace_client
    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=PipesDbfsContextInjector(client=workspace_client),
        message_reader=PipesDbfsMessageReader(client=workspace_client),
    )
    
    return client.run(
        context=context,
        task={
            "notebook_task": {
                "notebook_path": "/path/to/notebook",
                "base_parameters": {"param1": "value1"}
            }
        },
        cluster={"existing": "cluster-id"}
    )

@job(resource_defs={"databricks": databricks_resource})
def my_databricks_job():
    run_databricks_job()
    process_with_pipes()

Architecture

The dagster-databricks integration provides multiple execution patterns:

  • Direct Client API: Low-level access to Databricks REST API through DatabricksClient
  • Job Runner: High-level job submission and monitoring via DatabricksJobRunner
  • Step Launcher: Execute individual Dagster ops on Databricks clusters using DatabricksPySparkStepLauncher
  • Pipes Integration: Bidirectional communication with external Databricks processes through PipesDatabricksClient
  • Op Factories: Pre-built ops for common Databricks workflows using create_databricks_run_now_op and create_databricks_submit_run_op

This layered approach lets you orchestrate Databricks workloads at whatever level of abstraction a given workflow needs, from raw REST API calls up to fully managed op execution.

Capabilities

Core Client API

Low-level Databricks REST API client providing authentication, job management, file operations, and run monitoring capabilities. Supports multiple authentication methods including PAT, OAuth, and Azure service principal.

class DatabricksClient:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ): ...

    @property
    def workspace_client(self) -> WorkspaceClient: ...
    
    def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes: ...
    def put_file(self, file_obj: IO, dbfs_path: str, overwrite: bool = False, block_size: int = 1024**2) -> None: ...
    def get_run_state(self, databricks_run_id: int) -> DatabricksRunState: ...
    def wait_for_run_to_complete(
        self,
        logger: logging.Logger,
        databricks_run_id: int,
        poll_interval_sec: float,
        max_wait_time_sec: float,
        verbose_logs: bool = True,
    ) -> None: ...
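
A minimal sketch of direct client usage, assuming placeholder host, token, DBFS path, and run ID values (the dbfs:/ path format is also an assumption):

import io
import logging

from dagster_databricks import DatabricksClient

client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",  # placeholder host
    token="your-access-token",  # placeholder PAT
)

# Upload a small file to DBFS, then read it back
client.put_file(io.BytesIO(b"hello"), "dbfs:/tmp/example.txt", overwrite=True)
contents = client.read_file("dbfs:/tmp/example.txt")

# Inspect the state of a run, or block until it finishes
state = client.get_run_state(databricks_run_id=456)  # placeholder run ID
client.wait_for_run_to_complete(
    logger=logging.getLogger(__name__),
    databricks_run_id=456,
    poll_interval_sec=5,
    max_wait_time_sec=3600,
)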


Job Management

High-level job submission, monitoring, and log retrieval functionality through the DatabricksJobRunner. Handles job configuration, library installation, cluster management, and execution lifecycle.

class DatabricksJobRunner:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        poll_interval_sec: float = 5,
        max_wait_time_sec: float = 86400,
    ): ...

    @property
    def client(self) -> DatabricksClient: ...
    
    def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int: ...
    def retrieve_logs_for_run_id(self, log: logging.Logger, databricks_run_id: int) -> Optional[tuple[Optional[str], Optional[str]]]: ...
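
A hedged sketch of submitting a one-off run with the job runner; the run_config and task dict shapes mirror the Databricks Jobs API and are illustrative assumptions, with placeholder host, token, cluster ID, and script path:

import logging

from dagster_databricks import DatabricksJobRunner

runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",  # placeholder host
    token="your-access-token",  # placeholder PAT
    poll_interval_sec=10,
)

# Submit a run against an existing cluster (assumed config shape; a new-cluster spec can be given instead)
run_id = runner.submit_run(
    run_config={
        "run_name": "example-run",
        "cluster": {"existing": "your-cluster-id"},
    },
    task={"spark_python_task": {"python_file": "dbfs:/scripts/main.py"}},
)

# Wait for completion via the underlying client, then fetch any configured cluster logs
logger = logging.getLogger(__name__)
runner.client.wait_for_run_to_complete(logger, run_id, poll_interval_sec=10, max_wait_time_sec=3600)
stdout_and_stderr = runner.retrieve_logs_for_run_id(logger, run_id)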


PySpark Step Launcher

Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. Provides cluster provisioning, code packaging, dependency management, and result collection.

class DatabricksPySparkStepLauncher:
    """Step launcher for running PySpark steps on Databricks clusters."""

def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher: ...

class DatabricksConfig:
    """Configuration schema for Databricks step launcher."""


Pipes Integration

Bidirectional communication system for executing external code on Databricks with full context injection and result collection. Supports both standard and serverless Databricks environments.

class PipesDatabricksClient(BasePipesDatabricksClient):
    def __init__(
        self,
        client: WorkspaceClient,
        context_injector: Optional[PipesContextInjector] = None,
        message_reader: Optional[PipesMessageReader] = None,
        poll_interval_seconds: float = 5,
        forward_termination: bool = True,
    ): ...
    
    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ): ...

class PipesDbfsContextInjector(PipesContextInjector):
    def __init__(self, *, client: WorkspaceClient): ...

class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        client: WorkspaceClient,
        include_stdio_in_messages: bool = False,
        log_readers: Optional[Sequence[PipesLogReader]] = None,
    ): ...

class PipesDbfsLogReader(PipesChunkedLogReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        remote_log_name: Literal["stdout", "stderr"],
        target_stream: TextIO,
        client: WorkspaceClient,
        debug_info: Optional[str] = None,
    ): ...
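
A small sketch of wiring stdout/stderr forwarding into the message reader; the WorkspaceClient construction and credential values are placeholder assumptions:

import sys

from databricks.sdk import WorkspaceClient
from dagster_databricks import PipesDbfsLogReader, PipesDbfsMessageReader

workspace_client = WorkspaceClient(
    host="https://your-workspace.cloud.databricks.com",  # placeholder host
    token="your-access-token",  # placeholder PAT
)

message_reader = PipesDbfsMessageReader(
    client=workspace_client,
    include_stdio_in_messages=False,
    log_readers=[
        # Stream the remote driver's stdout and stderr into the local process
        PipesDbfsLogReader(client=workspace_client, remote_log_name="stdout", target_stream=sys.stdout),
        PipesDbfsLogReader(client=workspace_client, remote_log_name="stderr", target_stream=sys.stderr),
    ],
)

The resulting reader can then be passed as message_reader when constructing PipesDatabricksClient, as in the Basic Usage example above.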


Resource Management

Configurable resources for Databricks client management with support for multiple authentication methods and automatic credential handling.

class DatabricksClientResource(ConfigurableResource):
    host: Optional[str] = None
    token: Optional[str] = None
    oauth_credentials: Optional[OauthCredentials] = None
    azure_credentials: Optional[AzureServicePrincipalCredentials] = None
    workspace_id: Optional[str] = None
    
    def get_client(self) -> DatabricksClient: ...

def databricks_client(init_context) -> DatabricksClient: ...

class OauthCredentials:
    client_id: str
    client_secret: str

class AzureServicePrincipalCredentials:
    azure_client_id: str
    azure_client_secret: str
    azure_tenant_id: str
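
A short sketch of configuring the resource with OAuth credentials read from environment variables (the variable names are placeholder assumptions) and wiring it into a Definitions object:

from dagster import Definitions, EnvVar
from dagster_databricks import DatabricksClientResource, OauthCredentials

databricks = DatabricksClientResource(
    host=EnvVar("DATABRICKS_HOST"),
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("DATABRICKS_CLIENT_SECRET"),
    ),
)

defs = Definitions(
    jobs=[my_databricks_job],  # the job defined in Basic Usage above
    resources={"databricks": databricks},
)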


Op Factories

Factory functions for creating pre-configured ops that handle common Databricks workflows including running existing jobs and submitting one-time tasks.

def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...

def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...
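
A hedged sketch of a one-off submit-run op wired into a job; the databricks_job_configuration keys mirror the Databricks Jobs API submit payload and, like the cluster ID and credentials, are illustrative assumptions:

from dagster import job
from dagster_databricks import DatabricksClientResource, create_databricks_submit_run_op

submit_etl = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "one-off-etl",
        "tasks": [
            {
                "task_key": "etl",
                "spark_python_task": {"python_file": "dbfs:/scripts/etl.py"},
                "existing_cluster_id": "your-cluster-id",  # placeholder cluster
            }
        ],
    },
    name="submit_etl",
)

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",  # placeholder host
    token="your-access-token",  # placeholder PAT
)

@job(resource_defs={"databricks": databricks_resource})
def one_off_etl_job():
    submit_etl()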


Types

Core type definitions used throughout the Databricks integration:

class DatabricksRunState(NamedTuple):
    life_cycle_state: Optional[DatabricksRunLifeCycleState]
    result_state: Optional[DatabricksRunResultState]
    state_message: Optional[str]
    
    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...
    def is_successful(self) -> bool: ...
    @classmethod
    def from_databricks(cls, run_state: jobs.RunState) -> DatabricksRunState: ...

class DatabricksRunResultState(str, Enum):
    CANCELED = "CANCELED"
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    TIMEDOUT = "TIMEDOUT"
    
    def is_successful(self) -> bool: ...

class DatabricksRunLifeCycleState(str, Enum):
    BLOCKED = "BLOCKED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    QUEUED = "QUEUED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SKIPPED = "SKIPPED"
    TERMINATED = "TERMINATED"
    TERMINATING = "TERMINATING"
    WAITING_FOR_RETRY = "WAITING_FOR_RETRY"
    
    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...

class DatabricksError(Exception):
    """Custom exception for Databricks-related errors."""