Package for Databricks-specific Dagster framework op and resource components.
```bash
npx @tessl/cli install tessl/pypi-dagster-databricks@0.27.0
```

A comprehensive integration library for connecting the Dagster data orchestration framework with the Databricks analytics platform. It enables users to execute Dagster ops and assets on Databricks clusters through multiple execution patterns, including a PySpark step launcher, a Databricks job runner, and Dagster Pipes integration.
```bash
pip install dagster-databricks
```

```python
from dagster_databricks import (
    DatabricksClient,
    DatabricksError,
    DatabricksJobRunner,
    DatabricksPySparkStepLauncher,
    databricks_pyspark_step_launcher,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
    PipesDbfsLogReader,
    DatabricksClientResource,
    databricks_client,
    create_databricks_run_now_op,
    create_databricks_submit_run_op,
)
```

Example usage:

```python
from dagster import job, op
from dagster_databricks import (
    DatabricksClientResource,
    create_databricks_run_now_op,
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
)

# Define the Databricks resource
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Create an op to run an existing Databricks job
run_databricks_job = create_databricks_run_now_op(
    databricks_job_id=123,
    databricks_job_configuration={
        "python_params": ["--input", "table1", "--output", "table2"]
    },
)

# Use Pipes for bidirectional communication
@op(required_resource_keys={"databricks"})
def process_with_pipes(context):
    workspace_client = context.resources.databricks.get_client().workspace_client
    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=PipesDbfsContextInjector(client=workspace_client),
        message_reader=PipesDbfsMessageReader(client=workspace_client),
    )
    return client.run(
        context=context,
        task={
            "notebook_task": {
                "notebook_path": "/path/to/notebook",
                "base_parameters": {"param1": "value1"},
            }
        },
        cluster={"existing": "cluster-id"},
    )


@job(resource_defs={"databricks": databricks_resource})
def my_databricks_job():
    run_databricks_job()
    process_with_pipes()
```

The dagster-databricks integration provides multiple execution patterns:

- `DatabricksClient`: a low-level REST API client
- `DatabricksJobRunner`: high-level job submission, monitoring, and log retrieval
- `DatabricksPySparkStepLauncher`: runs individual Dagster ops as PySpark steps on Databricks clusters
- `PipesDatabricksClient`: bidirectional communication with external code via Dagster Pipes
- `create_databricks_run_now_op` and `create_databricks_submit_run_op`: factory functions for pre-configured ops

This multi-layered approach enables workflow orchestration across Databricks environments while providing flexibility for different integration needs.
Low-level Databricks REST API client providing authentication, job management, file operations, and run monitoring capabilities. Supports multiple authentication methods, including personal access tokens (PAT), OAuth, and Azure service principals.
```python
class DatabricksClient:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ): ...

    @property
    def workspace_client(self) -> WorkspaceClient: ...

    def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes: ...

    def put_file(self, file_obj: IO, dbfs_path: str, overwrite: bool = False, block_size: int = 1024**2) -> None: ...

    def get_run_state(self, databricks_run_id: int) -> DatabricksRunState: ...

    def wait_for_run_to_complete(
        self,
        logger: logging.Logger,
        databricks_run_id: int,
        poll_interval_sec: float,
        max_wait_time_sec: float,
        verbose_logs: bool = True,
    ) -> None: ...
```
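
A minimal sketch of the client in use, assuming token authentication and an already-submitted run; the host, token, DBFS path, and run ID below are placeholders.

```python
import io
import logging

from dagster_databricks import DatabricksClient

client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",  # placeholder workspace URL
    token="your-access-token",  # placeholder PAT
)

# Upload a small file to DBFS (path is illustrative).
client.put_file(io.BytesIO(b"print('hello')"), "/tmp/hello.py", overwrite=True)

# Poll an existing run until it reaches a terminal state.
state = client.get_run_state(databricks_run_id=456)  # placeholder run ID
if not state.has_terminated():
    client.wait_for_run_to_complete(
        logger=logging.getLogger(__name__),
        databricks_run_id=456,
        poll_interval_sec=5,
        max_wait_time_sec=3600,
    )
```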

High-level job submission, monitoring, and log retrieval through `DatabricksJobRunner`. Handles job configuration, library installation, cluster management, and the execution lifecycle.

```python
class DatabricksJobRunner:
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        poll_interval_sec: float = 5,
        max_wait_time_sec: float = 86400,
    ): ...

    @property
    def client(self) -> DatabricksClient: ...

    def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int: ...

    def retrieve_logs_for_run_id(self, log: logging.Logger, databricks_run_id: int) -> Optional[tuple[Optional[str], Optional[str]]]: ...
```
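
A hedged sketch of submitting and monitoring a one-off run with `DatabricksJobRunner`. The shapes of the `run_config` and `task` dicts are assumptions (the task dict follows the Databricks Runs Submit API), and the host, token, cluster ID, and notebook path are placeholders.

```python
import logging

from dagster_databricks import DatabricksJobRunner

runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",  # placeholder
    token="your-access-token",  # placeholder
    poll_interval_sec=10,
)

# Submit a notebook task against an existing cluster.
run_id = runner.submit_run(
    run_config={
        "run_name": "dagster-submitted-run",
        "cluster": {"existing": "your-cluster-id"},  # assumed run_config shape
    },
    task={"notebook_task": {"notebook_path": "/path/to/notebook"}},
)

logger = logging.getLogger(__name__)
runner.client.wait_for_run_to_complete(
    logger=logger,
    databricks_run_id=run_id,
    poll_interval_sec=10,
    max_wait_time_sec=3600,
)

# Optionally fetch any stdout/stderr captured for the run.
logs = runner.retrieve_logs_for_run_id(logger, run_id)
```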

Step launcher that executes individual Dagster ops on Databricks clusters using PySpark. Provides cluster provisioning, code packaging, dependency management, and result collection.

```python
class DatabricksPySparkStepLauncher:
    """Step launcher for running PySpark steps on Databricks clusters."""

def databricks_pyspark_step_launcher(init_context: InitResourceContext) -> DatabricksPySparkStepLauncher: ...

class DatabricksConfig:
    """Configuration schema for the Databricks step launcher."""
```

Bidirectional communication system for executing external code on Databricks with full context injection and result collection. Supports both standard and serverless Databricks environments.

```python
class PipesDatabricksClient(BasePipesDatabricksClient):
    def __init__(
        self,
        client: WorkspaceClient,
        context_injector: Optional[PipesContextInjector] = None,
        message_reader: Optional[PipesMessageReader] = None,
        poll_interval_seconds: float = 5,
        forward_termination: bool = True,
    ): ...

    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ): ...

class PipesDbfsContextInjector(PipesContextInjector):
    def __init__(self, *, client: WorkspaceClient): ...

class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        client: WorkspaceClient,
        include_stdio_in_messages: bool = False,
        log_readers: Optional[Sequence[PipesLogReader]] = None,
    ): ...

class PipesDbfsLogReader(PipesChunkedLogReader):
    def __init__(
        self,
        *,
        interval: float = 10,
        remote_log_name: Literal["stdout", "stderr"],
        target_stream: TextIO,
        client: WorkspaceClient,
        debug_info: Optional[str] = None,
    ): ...
```
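
A sketch wiring the DBFS log readers into the message reader so the remote stdout and stderr stream back into the Dagster process. Token authentication is assumed, and the workspace URL and token are placeholders.

```python
import sys

from databricks.sdk import WorkspaceClient
from dagster_databricks import (
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsLogReader,
    PipesDbfsMessageReader,
)

workspace = WorkspaceClient(
    host="https://your-workspace.cloud.databricks.com",  # placeholder
    token="your-access-token",  # placeholder
)

pipes_client = PipesDatabricksClient(
    client=workspace,
    context_injector=PipesDbfsContextInjector(client=workspace),
    message_reader=PipesDbfsMessageReader(
        client=workspace,
        # Forward the remote stdout/stderr logs into the local streams.
        log_readers=[
            PipesDbfsLogReader(client=workspace, remote_log_name="stdout", target_stream=sys.stdout),
            PipesDbfsLogReader(client=workspace, remote_log_name="stderr", target_stream=sys.stderr),
        ],
    ),
)
```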

Configurable resources for Databricks client management, with support for multiple authentication methods and automatic credential handling.

```python
class DatabricksClientResource(ConfigurableResource):
    host: Optional[str] = None
    token: Optional[str] = None
    oauth_credentials: Optional[OauthCredentials] = None
    azure_credentials: Optional[AzureServicePrincipalCredentials] = None
    workspace_id: Optional[str] = None

    def get_client(self) -> DatabricksClient: ...

def databricks_client(init_context) -> DatabricksClient: ...

class OauthCredentials:
    client_id: str
    client_secret: str

class AzureServicePrincipalCredentials:
    azure_client_id: str
    azure_client_secret: str
    azure_tenant_id: str
```
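
A sketch of configuring the resource with OAuth service-principal credentials instead of a token and attaching it to a job. The host and credential values are placeholders, and importing `OauthCredentials` from the package root is an assumption based on the definitions above.

```python
from dagster import job
from dagster_databricks import (
    DatabricksClientResource,
    OauthCredentials,
    create_databricks_run_now_op,
)

databricks = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",  # placeholder
    oauth_credentials=OauthCredentials(
        client_id="your-service-principal-client-id",  # placeholder
        client_secret="your-service-principal-secret",  # placeholder
    ),
)

run_existing_job = create_databricks_run_now_op(databricks_job_id=123)  # placeholder job ID

@job(resource_defs={"databricks": databricks})
def oauth_databricks_job():
    run_existing_job()
```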

Factory functions for creating pre-configured ops that handle common Databricks workflows, including running existing jobs and submitting one-time tasks.

```python
def create_databricks_run_now_op(
    databricks_job_id: int,
    databricks_job_configuration: Optional[dict] = None,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...

def create_databricks_submit_run_op(
    databricks_job_configuration: dict,
    poll_interval_seconds: float = 10,
    max_wait_time_seconds: float = 86400,
    name: Optional[str] = None,
    databricks_resource_key: str = "databricks",
) -> OpDefinition: ...
```
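
A sketch of a one-off run submitted through the factory op. The configuration dict is assumed to follow the Databricks Runs Submit API, and the cluster ID, notebook path, and credentials are placeholders.

```python
from dagster import job
from dagster_databricks import DatabricksClientResource, create_databricks_submit_run_op

submit_notebook_run = create_databricks_submit_run_op(
    databricks_job_configuration={
        "run_name": "one-off notebook run",
        "tasks": [
            {
                "task_key": "notebook",
                "existing_cluster_id": "your-cluster-id",  # placeholder
                "notebook_task": {"notebook_path": "/path/to/notebook"},  # placeholder
            }
        ],
    },
    name="submit_notebook_run",
)

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host="https://your-workspace.cloud.databricks.com",  # placeholder
            token="your-access-token",  # placeholder
        )
    }
)
def one_off_databricks_job():
    submit_notebook_run()
```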

Core type definitions used throughout the Databricks integration:

```python
class DatabricksRunState(NamedTuple):
    life_cycle_state: Optional[DatabricksRunLifeCycleState]
    result_state: Optional[DatabricksRunResultState]
    state_message: Optional[str]

    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...
    def is_successful(self) -> bool: ...

    @classmethod
    def from_databricks(cls, run_state: jobs.RunState) -> "DatabricksRunState": ...

class DatabricksRunResultState(str, Enum):
    CANCELED = "CANCELED"
    FAILED = "FAILED"
    SUCCESS = "SUCCESS"
    TIMEDOUT = "TIMEDOUT"

    def is_successful(self) -> bool: ...

class DatabricksRunLifeCycleState(str, Enum):
    BLOCKED = "BLOCKED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    QUEUED = "QUEUED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SKIPPED = "SKIPPED"
    TERMINATED = "TERMINATED"
    TERMINATING = "TERMINATING"
    WAITING_FOR_RETRY = "WAITING_FOR_RETRY"

    def has_terminated(self) -> bool: ...
    def is_skipped(self) -> bool: ...

class DatabricksError(Exception):
    """Custom exception for Databricks-related errors."""
```