CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster-databricks

Package for Databricks-specific Dagster framework op and resource components.

Pending
Overview
Eval results
Files

docs/job-management.md

Job Management

High-level job submission, monitoring, and log retrieval functionality through the DatabricksJobRunner. This component handles the complete lifecycle of Databricks jobs including configuration, library installation, cluster management, and execution monitoring.

Capabilities

DatabricksJobRunner

High-level interface for submitting and managing Databricks jobs with comprehensive configuration options and automatic monitoring.

class DatabricksJobRunner:
    """Submits jobs created using Dagster config to Databricks, and monitors their progress."""
    
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        poll_interval_sec: float = 5,
        max_wait_time_sec: float = 86400,
    ):
        """
        Initialize the Databricks job runner.
        
        Parameters:
        - host: Databricks workspace URL
        - token: Personal access token for authentication
        - oauth_client_id: OAuth client ID for service principal authentication
        - oauth_client_secret: OAuth client secret for service principal authentication
        - azure_client_id: Azure service principal client ID
        - azure_client_secret: Azure service principal client secret
        - azure_tenant_id: Azure tenant ID
        - poll_interval_sec: How often to poll Databricks for run status
        - max_wait_time_sec: How long to wait for a run to complete before failing (default 24 hours)
        """

Client Access

Access to the underlying DatabricksClient for advanced operations.

@property
def client(self) -> DatabricksClient:
    """Return the underlying DatabricksClient object."""

Job Submission

Submit new Databricks runs with comprehensive configuration options including cluster management, library installation, and task specification.

def submit_run(self, run_config: Mapping[str, Any], task: Mapping[str, Any]) -> int:
    """
    Submit a new run using the 'Runs submit' API.
    
    Parameters:
    - run_config: Configuration for the run including cluster, libraries, and settings
    - task: Task specification (notebook_task, spark_python_task, etc.)
    
    Returns:
    int: The run ID of the submitted job
    
    Raises:
    DatabricksError: If job submission fails
    """

Log Retrieval

Retrieve execution logs from completed Databricks runs for debugging and monitoring.

def retrieve_logs_for_run_id(
    self, 
    log: logging.Logger, 
    databricks_run_id: int
) -> Optional[tuple[Optional[str], Optional[str]]]:
    """
    Retrieve the stdout and stderr logs for a run.
    
    Parameters:
    - log: Logger for status messages
    - databricks_run_id: ID of the completed run
    
    Returns:
    Optional[tuple[Optional[str], Optional[str]]]: (stdout, stderr) logs or None if not available
    """

def wait_for_dbfs_logs(
    self,
    log: logging.Logger,
    prefix: str,
    cluster_id: str,
    filename: str,
    waiter_delay: int = 10,
    waiter_max_attempts: int = 10,
) -> Optional[str]:
    """
    Attempt to get logs from DBFS with retry logic.
    
    Parameters:
    - log: Logger for status messages
    - prefix: DBFS prefix path for logs
    - cluster_id: Databricks cluster ID
    - filename: Log filename (stdout/stderr)
    - waiter_delay: Delay between retry attempts in seconds
    - waiter_max_attempts: Maximum number of retry attempts
    
    Returns:
    Optional[str]: Log content or None if retrieval fails
    """

Run Configuration Structure

The run_config parameter for submit_run supports comprehensive job configuration:

Cluster Configuration

# Using existing cluster
run_config = {
    "cluster": {
        "existing": "cluster-id-here"
    }
}

# Using new cluster
run_config = {
    "cluster": {
        "new": {
            "nodes": {
                "node_types": {
                    "node_type_id": "i3.xlarge",
                    "driver_node_type_id": "i3.xlarge"  # optional
                }
            },
            "size": {
                "num_workers": 2
                # OR autoscaling:
                # "autoscale": {"min_workers": 1, "max_workers": 5}
            },
            "spark_version": "11.3.x-scala2.12",
            "custom_tags": {"project": "my-project"}
        }
    }
}

Library Configuration

run_config = {
    "libraries": [
        {"pypi": {"package": "pandas==1.5.0"}},
        {"pypi": {"package": "numpy>=1.20.0"}},
        {"maven": {"coordinates": "org.apache.spark:spark-sql_2.12:3.3.0"}},
        {"jar": "s3://my-bucket/my-jar.jar"}
    ],
    "install_default_libraries": True  # Automatically install dagster dependencies
}

Task Configuration

# Notebook task
task = {
    "notebook_task": {
        "notebook_path": "/Users/user@example.com/MyNotebook",
        "base_parameters": {"param1": "value1", "param2": "value2"}
    }
}

# Python task
task = {
    "spark_python_task": {
        "python_file": "s3://my-bucket/my-script.py",
        "parameters": ["--input", "table1", "--output", "table2"]
    }
}

# JAR task
task = {
    "spark_jar_task": {
        "main_class_name": "com.example.MyMainClass",
        "parameters": ["arg1", "arg2"]
    }
}

Usage Examples

Basic Job Submission

from dagster_databricks import DatabricksJobRunner

runner = DatabricksJobRunner(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
    poll_interval_sec=10,
    max_wait_time_sec=3600
)

run_config = {
    "run_name": "My Dagster Job",
    "cluster": {"existing": "existing-cluster-id"},
    "libraries": [
        {"pypi": {"package": "pandas==1.5.0"}}
    ]
}

task = {
    "notebook_task": {
        "notebook_path": "/Users/user@example.com/DataProcessing",
        "base_parameters": {
            "input_table": "raw_data",
            "output_table": "processed_data"
        }
    }
}

# Submit and get run ID
run_id = runner.submit_run(run_config, task)
print(f"Submitted job with run ID: {run_id}")

Advanced Configuration with New Cluster

run_config = {
    "run_name": "Advanced Processing Job",
    "cluster": {
        "new": {
            "nodes": {
                "node_types": {
                    "node_type_id": "i3.xlarge",
                    "driver_node_type_id": "i3.2xlarge"
                }
            },
            "size": {
                "autoscale": {"min_workers": 1, "max_workers": 10}
            },
            "spark_version": "11.3.x-scala2.12",
            "custom_tags": {
                "project": "data-pipeline",
                "environment": "production"
            }
        }
    },
    "libraries": [
        {"pypi": {"package": "scikit-learn==1.1.0"}},
        {"pypi": {"package": "boto3==1.24.0"}}
    ],
    "timeout_seconds": 7200,  # 2 hour timeout
    "email_notifications": {
        "on_start": ["admin@company.com"],
        "on_success": ["admin@company.com"],
        "on_failure": ["admin@company.com", "oncall@company.com"]
    }
}

task = {
    "spark_python_task": {
        "python_file": "s3://my-bucket/ml-pipeline.py",
        "parameters": [
            "--model", "random-forest",
            "--data-path", "s3://data-bucket/training-data/",
            "--output-path", "s3://model-bucket/models/"
        ]
    }
}

run_id = runner.submit_run(run_config, task)

Log Retrieval

import logging

logger = logging.getLogger(__name__)

# Wait for job completion (automatic in submit_run)
# Then retrieve logs
logs = runner.retrieve_logs_for_run_id(logger, run_id)
if logs:
    stdout, stderr = logs
    if stdout:
        print("STDOUT:", stdout)
    if stderr:
        print("STDERR:", stderr)
else:
    print("Logs not available")

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-databricks

docs

core-client.md

index.md

job-management.md

op-factories.md

pipes-integration.md

pyspark-step-launcher.md

resource-management.md

tile.json