Package providing Databricks-specific op and resource components for the Dagster framework.
Configurable resources for Databricks client management with support for multiple authentication methods and automatic credential handling. These resources provide standardized ways to configure and access Databricks clients across different Dagster components.
Primary configurable resource for managing Databricks client instances with comprehensive authentication options and validation.
class DatabricksClientResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """Resource which provides a Python client for interacting with Databricks within an
    op or asset."""

    host: Optional[str] = None
    token: Optional[str] = None
    oauth_credentials: Optional[OauthCredentials] = None
    azure_credentials: Optional[AzureServicePrincipalCredentials] = None
    workspace_id: Optional[str] = None

    def get_client(self) -> DatabricksClient:
        """Create and return a configured DatabricksClient instance.

        Returns:
            DatabricksClient: Configured client with specified authentication
        """
Traditional resource function for backward compatibility and simple use cases.

def databricks_client(init_context) -> DatabricksClient:
    """Create a DatabricksClient from resource context configuration.

    Parameters:
        init_context: Dagster resource initialization context

    Returns:
        DatabricksClient: Configured Databricks client
    """

Configuration classes for different authentication methods.
class OauthCredentials:
    """OAuth credentials for Databricks service principal authentication."""

    client_id: str
    client_secret: str


class AzureServicePrincipalCredentials:
    """Azure service principal credentials for Azure Databricks."""

    azure_client_id: str
    azure_client_secret: str
    azure_tenant_id: str

The most common authentication method uses a personal access token.
from dagster import EnvVar, job

from dagster_databricks import DatabricksClientResource

# Direct token configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token="dapi1234567890abcdef",
)

# Environment variable configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)

@job(resource_defs={"databricks": databricks_resource})
def my_job():
    my_op()

Secure authentication using Databricks OAuth service principal credentials.
from dagster import EnvVar

from dagster_databricks import DatabricksClientResource, OauthCredentials

# Direct credential configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id="your-client-id",
        client_secret="your-client-secret",
    ),
)

# Environment variable configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("DATABRICKS_CLIENT_SECRET"),
    ),
)

Authentication for Azure Databricks using Azure service principal credentials.
from dagster import EnvVar

from dagster_databricks import AzureServicePrincipalCredentials, DatabricksClientResource

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.azuredatabricks.net",
    azure_credentials=AzureServicePrincipalCredentials(
        azure_client_id=EnvVar("AZURE_CLIENT_ID"),
        azure_client_secret=EnvVar("AZURE_CLIENT_SECRET"),
        azure_tenant_id=EnvVar("AZURE_TENANT_ID"),
    ),
)

Automatic credential resolution from environment variables or configuration files.
# No explicit credentials - will read from environment or ~/.databrickscfg
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com"
)

# Or let it auto-detect host as well
databricks_resource = DatabricksClientResource()
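For reference, a minimal sketch of the standard ~/.databrickscfg profile that this default resolution can pick up (values are placeholders):

[DEFAULT]
host = https://your-workspace.cloud.databricks.com
token = dapi1234567890abcdef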
The resource includes comprehensive validation to ensure proper credential configuration. Only one authentication method can be specified at a time:
# Valid - only token specified
DatabricksClientResource(
    host="https://workspace.cloud.databricks.com",
    token="your-token",
)

# Invalid - multiple auth methods (will raise ValueError)
DatabricksClientResource(
    host="https://workspace.cloud.databricks.com",
    token="your-token",
    oauth_credentials=OauthCredentials(client_id="id", client_secret="secret"),
)

Each authentication method requires all necessary components:
# Valid OAuth configuration
oauth_credentials = OauthCredentials(
    client_id="your-client-id",
    client_secret="your-client-secret",
)

# Invalid - missing client_secret (will raise ValueError)
oauth_credentials = OauthCredentials(
    client_id="your-client-id"
)

from dagster import EnvVar, job, op
from databricks.sdk.service import jobs

from dagster_databricks import DatabricksClientResource

@op(required_resource_keys={"databricks"})
def process_data(context):
    # Access the Databricks client
    databricks_client = context.resources.databricks

    # Use the underlying databricks-sdk WorkspaceClient for operations
    workspace_client = databricks_client.workspace_client
    jobs_api = workspace_client.jobs

    # Submit a one-off run (the SDK expects typed task objects)
    run_result = jobs_api.submit(
        tasks=[
            jobs.SubmitTask(
                task_key="process",
                existing_cluster_id="cluster-id",
                notebook_task=jobs.NotebookTask(notebook_path="/path/to/notebook"),
            )
        ]
    )
    return run_result.run_id

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    }
)
def data_processing_job():
    process_data()
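The resource also works within assets, as the class docstring notes; a short sketch using the Pythonic resource-parameter style (the asset name and cluster id are illustrative):

from dagster import Definitions, EnvVar, asset
from databricks.sdk.service import jobs

from dagster_databricks import DatabricksClientResource

@asset
def databricks_run_id(databricks: DatabricksClientResource) -> int:
    # Kick off the same one-off run as process_data above
    waiter = databricks.get_client().workspace_client.jobs.submit(
        tasks=[
            jobs.SubmitTask(
                task_key="process",
                existing_cluster_id="cluster-id",
                notebook_task=jobs.NotebookTask(notebook_path="/path/to/notebook"),
            )
        ]
    )
    return waiter.run_id

defs = Definitions(
    assets=[databricks_run_id],
    resources={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    },
)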
from dagster import EnvVar, job

from dagster_databricks import DatabricksClientResource, OauthCredentials

# Development environment configuration
dev_databricks = DatabricksClientResource(
    host="https://dev-workspace.cloud.databricks.com",
    token=EnvVar("DEV_DATABRICKS_TOKEN"),
)

# Production environment configuration
prod_databricks = DatabricksClientResource(
    host="https://prod-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("PROD_DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("PROD_DATABRICKS_CLIENT_SECRET"),
    ),
)

# Job definition with environment-specific resources
@job(
    resource_defs={
        "databricks": dev_databricks  # Switch based on deployment
    }
)
def my_pipeline():
    extract_data()
    transform_data()
    load_data()
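To make the "switch based on deployment" comment concrete, a sketch that extends the snippet above and selects the resource from a hypothetical DAGSTER_DEPLOYMENT environment variable:

import os

deployment = os.getenv("DAGSTER_DEPLOYMENT", "dev")  # hypothetical deployment switch
databricks_by_env = {"dev": dev_databricks, "prod": prod_databricks}

@job(resource_defs={"databricks": databricks_by_env[deployment]})
def my_pipeline_switched():
    extract_data()
    transform_data()
    load_data()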
from dagster import EnvVar, job, op, resource
from databricks.sdk.service import jobs

from dagster_databricks import DatabricksClientResource

@resource
def data_lake_config(init_context):
    return {
        "bucket": "my-data-lake",
        "prefix": "processed-data/",
    }

@op(required_resource_keys={"databricks", "data_lake"})
def etl_operation(context):
    databricks = context.resources.databricks
    data_lake = context.resources.data_lake

    # Use both resources together
    workspace_client = databricks.workspace_client

    # Submit a run parameterized with the data lake configuration
    run_result = workspace_client.jobs.submit(
        tasks=[
            jobs.SubmitTask(
                task_key="etl",
                existing_cluster_id="etl-cluster",
                spark_python_task=jobs.SparkPythonTask(
                    python_file="s3://scripts/etl.py",
                    parameters=[
                        "--bucket", data_lake["bucket"],
                        "--prefix", data_lake["prefix"],
                    ],
                ),
            )
        ]
    )
    return run_result.run_id

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        ),
        "data_lake": data_lake_config,
    }
)
def etl_pipeline():
    etl_operation()

For backwards compatibility, the traditional resource function is still available:
from dagster import job, op

from dagster_databricks import databricks_client

@op(required_resource_keys={"databricks"})
def legacy_op(context):
    client = context.resources.databricks
    return client.get_run_state(12345)

@job(
    resource_defs={
        "databricks": databricks_client.configured({
            "host": {"env": "DATABRICKS_HOST"},
            "token": {"env": "DATABRICKS_TOKEN"},
        })
    }
)
def legacy_job():
    legacy_op()

from dagster import build_op_context
from dagster_databricks import DatabricksClientResource

def test_databricks_operation():
    # Create resource for testing
    databricks_resource = DatabricksClientResource(
        host="https://test-workspace.cloud.databricks.com",
        token="test-token",
    )

    # Build context with resource
    context = build_op_context(
        resources={"databricks": databricks_resource}
    )

    # Test op execution
    result = process_data(context)
    assert result is not None
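The direct invocation above would still reach a real workspace when the op executes. A sketch of stubbing the client with unittest.mock instead, so the test stays offline (the stub mirrors only the attributes process_data reads; the run id is arbitrary):

from unittest import mock

from dagster import build_op_context

def test_databricks_operation_mocked():
    # Stand-in object exposing the attributes process_data uses
    fake_client = mock.MagicMock()
    fake_client.workspace_client.jobs.submit.return_value.run_id = 42

    context = build_op_context(resources={"databricks": fake_client})
    assert process_data(context) == 42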
Install with Tessl CLI

npx tessl i tessl/pypi-dagster-databricks