Resource Management

Configurable resources for Databricks client management with support for multiple authentication methods and automatic credential handling. These resources provide standardized ways to configure and access Databricks clients across different Dagster components.

Capabilities

DatabricksClientResource

Primary configurable resource for managing Databricks client instances with comprehensive authentication options and validation.

class DatabricksClientResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """Resource which provides a Python client for interacting with Databricks within an
    op or asset."""
    
    host: Optional[str] = None
    token: Optional[str] = None
    oauth_credentials: Optional[OauthCredentials] = None
    azure_credentials: Optional[AzureServicePrincipalCredentials] = None
    workspace_id: Optional[str] = None
    
    def get_client(self) -> DatabricksClient:
        """
        Create and return a configured DatabricksClient instance.
        
        Returns:
        DatabricksClient: Configured client with specified authentication
        """

Legacy Resource Function

Traditional resource function for backward compatibility and simple use cases.

def databricks_client(init_context) -> DatabricksClient:
    """
    Create a DatabricksClient from resource context configuration.
    
    Parameters:
    - init_context: Dagster resource initialization context
    
    Returns:
    DatabricksClient: Configured Databricks client
    """

Authentication Credential Classes

Configuration classes for different authentication methods.

class OauthCredentials(Config):
    """OAuth credentials for Databricks service principal authentication."""
    
    client_id: str
    client_secret: str

class AzureServicePrincipalCredentials(Config):
    """Azure service principal credentials for Azure Databricks."""
    
    azure_client_id: str
    azure_client_secret: str
    azure_tenant_id: str

Authentication Methods

Personal Access Token (PAT)

The most common authentication method, using a Databricks personal access token.

from dagster import EnvVar, job, op
from dagster_databricks import DatabricksClientResource

# Direct token configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token="dapi1234567890abcdef"
)

# Environment variable configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN")
)

@op(required_resource_keys={"databricks"})
def my_op(context):
    databricks_client = context.resources.databricks  # DatabricksClient instance
    ...

@job(resource_defs={"databricks": databricks_resource})
def my_job():
    my_op()

OAuth Service Principal

Secure authentication using Databricks OAuth service principal credentials.

from dagster import EnvVar
from dagster_databricks import DatabricksClientResource, OauthCredentials

# Direct credential configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id="your-client-id",
        client_secret="your-client-secret"
    )
)

# Environment variable configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("DATABRICKS_CLIENT_SECRET")
    )
)

Azure Service Principal

Authentication for Azure Databricks using Azure service principal credentials.

from dagster import EnvVar
from dagster_databricks import DatabricksClientResource, AzureServicePrincipalCredentials

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.azuredatabricks.net",
    azure_credentials=AzureServicePrincipalCredentials(
        azure_client_id=EnvVar("AZURE_CLIENT_ID"),
        azure_client_secret=EnvVar("AZURE_CLIENT_SECRET"),
        azure_tenant_id=EnvVar("AZURE_TENANT_ID")
    )
)

Default Credentials

Automatic credential resolution from environment variables or configuration files.

# No explicit credentials - will read from environment or ~/.databrickscfg
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com"
)

# Or let it auto-detect host as well
databricks_resource = DatabricksClientResource()
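With no explicit configuration, the underlying databricks-sdk walks its standard resolution chain: environment variables such as DATABRICKS_HOST and DATABRICKS_TOKEN first, then the local configuration file. A minimal ~/.databrickscfg sketch, assuming the default profile (values are placeholders):

[DEFAULT]
host  = https://your-workspace.cloud.databricks.com
token = dapi1234567890abcdef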

Configuration Validation

The resource includes comprehensive validation to ensure proper credential configuration:

Single Authentication Method

Only one authentication method can be specified at a time:

# Valid - only token specified
DatabricksClientResource(
    host="https://workspace.cloud.databricks.com",
    token="your-token"
)

# Invalid - multiple auth methods (will raise ValueError)
DatabricksClientResource(
    host="https://workspace.cloud.databricks.com",
    token="your-token",
    oauth_credentials=OauthCredentials(client_id="id", client_secret="secret")
)

Required Credential Components

Each authentication method requires all necessary components:

# Valid OAuth configuration
OauthCredentials(
    client_id="your-client-id",
    client_secret="your-client-secret"
)

# Invalid - missing client_secret (will raise ValueError)
OauthCredentials(
    client_id="your-client-id"
)
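A minimal sketch of exercising these rules in a test (the pytest usage and test name are assumptions, not part of the package):

import pytest
from dagster_databricks import DatabricksClientResource, OauthCredentials

def test_rejects_multiple_auth_methods():
    # Per the validation rules above, mixing a token with OAuth
    # credentials should fail with a ValueError
    with pytest.raises(ValueError):
        DatabricksClientResource(
            host="https://workspace.cloud.databricks.com",
            token="your-token",
            oauth_credentials=OauthCredentials(client_id="id", client_secret="secret")
        )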

Usage Examples

Basic Resource Setup

from dagster import EnvVar, job, op
from dagster_databricks import DatabricksClientResource
from databricks.sdk.service import jobs

@op(required_resource_keys={"databricks"})
def process_data(context):
    # Access the Databricks client attached by the resource
    databricks_client = context.resources.databricks

    # Use the underlying databricks-sdk WorkspaceClient
    workspace_client = databricks_client.workspace_client

    # Submit a one-off run
    run_result = workspace_client.jobs.submit(
        tasks=[
            jobs.SubmitTask(
                task_key="process",
                existing_cluster_id="cluster-id",
                notebook_task=jobs.NotebookTask(notebook_path="/path/to/notebook")
            )
        ]
    )

    return run_result.run_id

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN")
        )
    }
)
def data_processing_job():
    process_data()
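Because the resource is bound through resource_defs, the job can be launched in-process for a quick smoke test. Note this hits the real workspace, so it assumes valid credentials in the environment:

result = data_processing_job.execute_in_process()
assert result.success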

Multi-Environment Configuration

from dagster import job, EnvVar
from dagster_databricks import DatabricksClientResource, OauthCredentials

# Development environment configuration
dev_databricks = DatabricksClientResource(
    host="https://dev-workspace.cloud.databricks.com",
    token=EnvVar("DEV_DATABRICKS_TOKEN")
)

# Production environment configuration
prod_databricks = DatabricksClientResource(
    host="https://prod-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("PROD_DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("PROD_DATABRICKS_CLIENT_SECRET")
    )
)

# Job definition with environment-specific resources
@job(
    resource_defs={
        "databricks": dev_databricks  # Switch based on deployment
    }
)
def my_pipeline():
    extract_data()
    transform_data()
    load_data()
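One way to implement that switch, sketched with a hypothetical DAGSTER_ENV variable (binding resources per deployment in a Definitions object is another common approach):

import os

# Pick the resource at definition time based on a deployment marker
is_prod = os.getenv("DAGSTER_ENV") == "prod"
active_databricks = prod_databricks if is_prod else dev_databricks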

Integration with Other Resources

from dagster import EnvVar, job, op, resource
from dagster_databricks import DatabricksClientResource
from databricks.sdk.service import jobs

@resource
def data_lake_config():
    return {
        "bucket": "my-data-lake",
        "prefix": "processed-data/"
    }

@op(required_resource_keys={"databricks", "data_lake"})
def etl_operation(context):
    databricks = context.resources.databricks
    data_lake = context.resources.data_lake
    
    # Use both resources together
    workspace_client = databricks.workspace_client
    
    # Submit a one-off run parameterized with the data lake configuration
    run_result = workspace_client.jobs.submit(
        tasks=[
            jobs.SubmitTask(
                task_key="etl",
                existing_cluster_id="etl-cluster",
                spark_python_task=jobs.SparkPythonTask(
                    python_file="s3://scripts/etl.py",
                    parameters=[
                        "--bucket", data_lake["bucket"],
                        "--prefix", data_lake["prefix"]
                    ]
                )
            )
        ]
    )
    
    return run_result.run_id

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host={"env": "DATABRICKS_HOST"},
            token={"env": "DATABRICKS_TOKEN"}
        ),
        "data_lake": data_lake_config
    }
)
def etl_pipeline():
    etl_operation()

Legacy Resource Usage

For backwards compatibility, the traditional resource function is still available:

from dagster import job, op
from dagster_databricks import databricks_client

@op(required_resource_keys={"databricks"})
def legacy_op(context):
    client = context.resources.databricks
    return client.get_run_state(12345)

@job(
    resource_defs={
        "databricks": databricks_client.configured({
            "host": {"env": "DATABRICKS_HOST"},
            "token": {"env": "DATABRICKS_TOKEN"}
        })
    }
)
def legacy_job():
    legacy_op()
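With the legacy resource, configuration can also be supplied at launch time through run config instead of .configured. A sketch, assuming the job binds the unconfigured resource:

from dagster import job

@job(resource_defs={"databricks": databricks_client})
def legacy_job_runtime_config():
    legacy_op()

result = legacy_job_runtime_config.execute_in_process(
    run_config={
        "resources": {
            "databricks": {
                "config": {
                    "host": {"env": "DATABRICKS_HOST"},
                    "token": {"env": "DATABRICKS_TOKEN"}
                }
            }
        }
    }
)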

Resource Testing

from dagster import build_op_context
from dagster_databricks import DatabricksClientResource

def test_databricks_operation():
    # Create resource for testing
    databricks_resource = DatabricksClientResource(
        host="https://test-workspace.cloud.databricks.com",
        token="test-token"
    )
    
    # Build context with resource
    context = build_op_context(
        resources={"databricks": databricks_resource}
    )
    
    # Directly invoke the op with the built context (this exercises the
    # real client; see the mocked sketch below)
    result = process_data(context)
    assert result is not None
