Package for Databricks-specific Dagster framework op and resource components.
—
High-level job submission, monitoring, and log retrieval functionality through the DatabricksJobRunner. This component handles the complete lifecycle of Databricks jobs including configuration, library installation, cluster management, and execution monitoring.
High-level interface for submitting and managing Databricks jobs with comprehensive configuration options and automatic monitoring.
class DatabricksJobRunner:
"""Submits jobs created using Dagster config to Databricks, and monitors their progress."""
def __init__(
self,
host: Optional[str] = None,
token: Optional[str] = None,
oauth_client_id: Optional[str] = None,
oauth_client_secret: Optional[str] = None,
azure_client_id: Optional[str] = None,
azure_client_secret: Optional[str] = None,
azure_tenant_id: Optional[str] = None,
poll_interval_sec: float = 5,
max_wait_time_sec: float = 86400,
):
"""
Initialize the Databricks job runner.
Parameters:
- host: Databricks workspace URL
- token: Personal access token for authentication
- oauth_client_id: OAuth client ID for service principal authentication
- oauth_client_secret: OAuth client secret for service principal authentication
- azure_client_id: Azure service principal client ID
- azure_client_secret: Azure service principal client secret
- azure_tenant_id: Azure tenant ID
- poll_interval_sec: How often to poll Databricks for run status
- max_wait_time_sec: How long to wait for a run to complete before failing (default 24 hours)
"""

Access to the underlying DatabricksClient for advanced operations.
@property
def client(self) -> DatabricksClient:
"""Return the underlying DatabricksClient object."""

Submit new Databricks runs with comprehensive configuration options including cluster management, library installation, and task specification.
def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int:
"""
Submit a new run using the 'Runs submit' API.
Parameters:
- run_config: Configuration for the run including cluster, libraries, and settings
- task: Task specification (notebook_task, spark_python_task, etc.)
Returns:
int: The run ID of the submitted job
Raises:
DatabricksError: If job submission fails
"""

Retrieve execution logs from completed Databricks runs for debugging and monitoring.
def retrieve_logs_for_run_id(
self,
log: logging.Logger,
databricks_run_id: int
) -> Optional[tuple[Optional[str], Optional[str]]]:
"""
Retrieve the stdout and stderr logs for a run.
Parameters:
- log: Logger for status messages
- databricks_run_id: ID of the completed run
Returns:
Optional[tuple[Optional[str], Optional[str]]]: (stdout, stderr) logs or None if not available
"""
def wait_for_dbfs_logs(
self,
log: logging.Logger,
prefix: str,
cluster_id: str,
filename: str,
waiter_delay: int = 10,
waiter_max_attempts: int = 10,
) -> Optional[str]:
"""
Attempt to get logs from DBFS with retry logic.
Parameters:
- log: Logger for status messages
- prefix: DBFS prefix path for logs
- cluster_id: Databricks cluster ID
- filename: Log filename (stdout/stderr)
- waiter_delay: Delay between retry attempts in seconds
- waiter_max_attempts: Maximum number of retry attempts
Returns:
Optional[str]: Log content or None if retrieval fails
"""

The run_config parameter for submit_run supports comprehensive job configuration:
# Using existing cluster
run_config = {
"cluster": {
"existing": "cluster-id-here"
}
}
# Using new cluster
run_config = {
"cluster": {
"new": {
"nodes": {
"node_types": {
"node_type_id": "i3.xlarge",
"driver_node_type_id": "i3.xlarge" # optional
}
},
"size": {
"num_workers": 2
# OR autoscaling:
# "autoscale": {"min_workers": 1, "max_workers": 5}
},
"spark_version": "11.3.x-scala2.12",
"custom_tags": {"project": "my-project"}
}
}
}

run_config = {
"libraries": [
{"pypi": {"package": "pandas==1.5.0"}},
{"pypi": {"package": "numpy>=1.20.0"}},
{"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}},
{"jar": "s3://my-bucket/my-jar.jar"}
],
"install_default_libraries": True # Automatically install dagster dependencies
}

# Notebook task
task = {
"notebook_task": {
"notebook_path": "/Users/user@example.com/MyNotebook",
"base_parameters": {"param1": "value1", "param2": "value2"}
}
}
# Python task
task = {
"spark_python_task": {
"python_file": "s3://my-bucket/my-script.py",
"parameters": ["--input", "table1", "--output", "table2"]
}
}
# JAR task
task = {
"spark_jar_task": {
"main_class_name": "com.example.MyMainClass",
"parameters": ["arg1", "arg2"]
}
}

from dagster_databricks import DatabricksJobRunner
runner = DatabricksJobRunner(
host="https://your-workspace.cloud.databricks.com",
token="your-access-token",
poll_interval_sec=10,
max_wait_time_sec=3600
)
run_config = {
"run_name": "My Dagster Job",
"cluster": {"existing": "existing-cluster-id"},
"libraries": [
{"pypi": {"package": "pandas==1.5.0"}}
]
}
task = {
"notebook_task": {
"notebook_path": "/Users/user@example.com/DataProcessing",
"base_parameters": {
"input_table": "raw_data",
"output_table": "processed_data"
}
}
}
# Submit and get run ID
run_id = runner.submit_run(run_config, task)
print(f"Submitted job with run ID: {run_id}")

run_config = {
"run_name": "Advanced Processing Job",
"cluster": {
"new": {
"nodes": {
"node_types": {
"node_type_id": "i3.xlarge",
"driver_node_type_id": "i3.2xlarge"
}
},
"size": {
"autoscale": {"min_workers": 1, "max_workers": 10}
},
"spark_version": "11.3.x-scala2.12",
"custom_tags": {
"project": "data-pipeline",
"environment": "production"
}
}
},
"libraries": [
{"pypi": {"package": "scikit-learn==1.1.0"}},
{"pypi": {"package": "boto3==1.24.0"}}
],
"timeout_seconds": 7200, # 2 hour timeout
"email_notifications": {
"on_start": ["admin@company.com"],
"on_success": ["admin@company.com"],
"on_failure": ["admin@company.com", "oncall@company.com"]
}
}
task = {
"spark_python_task": {
"python_file": "s3://my-bucket/ml-pipeline.py",
"parameters": [
"--model", "random-forest",
"--data-path", "s3://data-bucket/training-data/",
"--output-path", "s3://model-bucket/models/"
]
}
}
run_id = runner.submit_run(run_config, task)

import logging
logger = logging.getLogger(__name__)
# Wait for job completion (automatic in submit_run)
# Then retrieve logs
logs = runner.retrieve_logs_for_run_id(logger, run_id)
if logs:
stdout, stderr = logs
if stdout:
print("STDOUT:", stdout)
if stderr:
print("STDERR:", stderr)
else:
print("Logs not available")

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-databricks