Databricks-specific op and resource components for the Dagster framework.
Bidirectional communication system for executing external code on Databricks with full context injection and result collection. Dagster Pipes enables seamless integration between Dagster orchestration and external Databricks workloads, supporting both standard and serverless environments.
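The external half of the protocol is provided by the separate dagster-pipes package. As a minimal sketch (the DBFS loader/writer pair matches the context injector and message reader documented below), the Databricks workload opens a Pipes session to receive context and send messages back:
# Minimal external half of the protocol; runs on Databricks and
# requires the dagster-pipes package to be installed there.
from dagster_pipes import (
    PipesDbfsContextLoader,
    PipesDbfsMessageWriter,
    open_dagster_pipes,
)

with open_dagster_pipes(
    context_loader=PipesDbfsContextLoader(),
    message_writer=PipesDbfsMessageWriter(),
) as pipes:
    pipes.log.info("hello from Databricks")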
Main client for running external code on Databricks with bidirectional communication through the Dagster Pipes protocol.
class PipesDatabricksClient(BasePipesDatabricksClient):
"""Pipes client for running external code on Databricks with bidirectional communication."""
def __init__(
self,
client: WorkspaceClient,
context_injector: Optional[PipesContextInjector] = None,
message_reader: Optional[PipesMessageReader] = None,
poll_interval_seconds: float = 5,
forward_termination: bool = True,
):
"""
Initialize the Pipes Databricks client.
Parameters:
- client: Databricks WorkspaceClient for API interactions
- context_injector: Component for injecting Dagster context into external process
- message_reader: Component for reading messages from external process
        - poll_interval_seconds: Interval in seconds between polls for job completion
- forward_termination: Whether to forward termination signals to external process
"""
def run(
self,
*,
context: Union[OpExecutionContext, AssetExecutionContext],
extras: Optional[PipesExtras] = None,
**kwargs,
) -> PipesClientCompletedInvocation:
"""
Execute external code on Databricks with bidirectional communication.
Parameters:
- context: Dagster execution context (op or asset)
- extras: Additional context data to pass to external process
- **kwargs: Additional arguments including task and cluster configuration
Returns:
PipesClientCompletedInvocation: Results from external process execution
"""Component for injecting Dagster context data into external Databricks processes via DBFS.
class PipesDbfsContextInjector(PipesContextInjector):
"""A context injector that injects context into a Databricks job by writing a JSON file to DBFS."""
def __init__(self, *, client: WorkspaceClient):
"""
Initialize the DBFS context injector.
Parameters:
- client: Databricks WorkspaceClient for DBFS operations
"""
def inject_context(self, context: PipesContextData) -> Iterator[PipesParams]:
"""
Inject context to external environment by writing it to an automatically-generated
DBFS temporary file as JSON and exposing the path to the file.
Parameters:
- context: The context data to inject
Yields:
PipesParams: Parameters that can be used by the external process to locate and
load the injected context data
"""Component for reading messages and logs from external Databricks processes via DBFS.
class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
"""Message reader that reads messages by periodically reading message chunks from an
automatically-generated temporary directory on DBFS."""
def __init__(
self,
*,
interval: float = 10,
client: WorkspaceClient,
include_stdio_in_messages: bool = False,
log_readers: Optional[Sequence[PipesLogReader]] = None,
):
"""
Initialize the DBFS message reader.
Parameters:
- interval: Interval in seconds between attempts to download a chunk
- client: Databricks WorkspaceClient for DBFS operations
- include_stdio_in_messages: Whether to send stdout/stderr to Dagster via Pipes messages
- log_readers: Additional log readers for logs on DBFS
"""
def get_params(self) -> Iterator[PipesParams]:
"""
Get parameters for the external process to write messages.
Yields:
PipesParams: Parameters including DBFS path for message writing
"""
def messages_are_readable(self, params: PipesParams) -> bool:
"""
Check if messages are available to read from DBFS.
Parameters:
- params: Parameters from get_params()
Returns:
bool: True if messages can be read
"""
def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]:
"""
Download a specific message chunk from DBFS.
Parameters:
- index: Index of the message chunk to download
- params: Parameters from get_params()
Returns:
Optional[str]: Message chunk content or None if not available
"""Component for reading execution logs from DBFS files, typically used for capturing stdout/stderr from Databricks clusters.
class PipesDbfsLogReader(PipesChunkedLogReader):
"""Reader that reads a log file from DBFS."""
def __init__(
self,
*,
interval: float = 10,
remote_log_name: Literal["stdout", "stderr"],
target_stream: TextIO,
client: WorkspaceClient,
debug_info: Optional[str] = None,
):
"""
Initialize the DBFS log reader.
Parameters:
- interval: Interval in seconds between attempts to download a log chunk
- remote_log_name: The name of the log file to read ("stdout" or "stderr")
- target_stream: The stream to which to forward log chunks that have been read
- client: Databricks WorkspaceClient for DBFS operations
- debug_info: Optional message containing debug information about the log reader
"""Client variant for Databricks serverless environments with specialized handling.
Client variant for Databricks serverless environments with specialized handling.
class PipesDatabricksServerlessClient(BasePipesDatabricksClient):
"""Pipes client for Databricks serverless environments."""from dagster import asset, AssetExecutionContext
Example of basic usage with an existing all-purpose cluster:
from dagster import asset, AssetExecutionContext, EnvVar
from dagster_databricks import (
PipesDatabricksClient,
PipesDbfsContextInjector,
PipesDbfsMessageReader,
DatabricksClientResource,
)
databricks_resource = DatabricksClientResource(
host="https://your-workspace.cloud.databricks.com",
token={"env": "DATABRICKS_TOKEN"}
)
@asset(required_resource_keys={"databricks"})
def my_databricks_asset(context: AssetExecutionContext):
client = PipesDatabricksClient(
client=context.resources.databricks.workspace_client,
context_injector=PipesDbfsContextInjector(
client=context.resources.databricks.workspace_client
),
message_reader=PipesDbfsMessageReader(
client=context.resources.databricks.workspace_client
),
)
return client.run(
context=context,
task={
"notebook_task": {
"notebook_path": "/Workspace/Users/user@example.com/my_notebook",
"base_parameters": {
"input_table": "raw_data",
"output_table": "processed_data"
}
}
},
cluster={"existing": "your-cluster-id"}
    ).get_results()
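For context.resources.databricks to resolve at runtime, bind the resource under that key in a Definitions object:
from dagster import Definitions

# Bind the resource under the "databricks" key so the asset's
# required_resource_keys={"databricks"} declaration resolves
defs = Definitions(
    assets=[my_databricks_asset],
    resources={"databricks": databricks_resource},
)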
Example of advanced configuration with a new job cluster, libraries, and a longer poll interval:
@asset(required_resource_keys={"databricks"})
def advanced_databricks_processing(context: AssetExecutionContext):
# Configure message reader with logging
message_reader = PipesDbfsMessageReader(
client=context.resources.databricks.workspace_client,
interval=5, # Check for messages every 5 seconds
include_stdio_in_messages=True, # Include stdout/stderr
)
# Configure context injector
context_injector = PipesDbfsContextInjector(
client=context.resources.databricks.workspace_client
)
client = PipesDatabricksClient(
client=context.resources.databricks.workspace_client,
context_injector=context_injector,
message_reader=message_reader,
poll_interval_seconds=10,
forward_termination=True,
)
# Run with comprehensive configuration
result = client.run(
context=context,
extras={"custom_param": "custom_value"}, # Additional context
task={
"spark_python_task": {
"python_file": "dbfs:/FileStore/scripts/my_script.py",
"parameters": ["--mode", "production", "--debug", "false"]
}
},
cluster={
"new": {
"spark_version": "11.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
"custom_tags": {"project": "data-pipeline"}
}
},
libraries=[
{"pypi": {"package": "dagster-pipes"}},
{"pypi": {"package": "pandas>=1.5.0"}},
],
timeout_seconds=3600,
)
    return result.get_results()

Example of an external Python script that uses Pipes for communication:
# my_script.py - runs on Databricks
from dagster_pipes import open_dagster_pipes, PipesDbfsContextLoader, PipesDbfsMessageWriter
def main():
# Initialize Pipes communication
with open_dagster_pipes(
context_loader=PipesDbfsContextLoader(),
message_writer=PipesDbfsMessageWriter(),
) as pipes:
        # Access Dagster context data via PipesContext properties
        extras = pipes.extras  # extras passed from the orchestration side, if any
# Log messages back to Dagster
pipes.log.info("Starting external processing")
# Perform work
import pandas as pd
df = pd.read_parquet("/dbfs/data/input.parquet")
# Report progress
pipes.log.info(f"Loaded {len(df)} rows")
# Process data
result_df = df.groupby("category").agg({"value": "sum"})
# Save results
result_df.to_parquet("/dbfs/data/output.parquet")
# Report asset materialization
pipes.report_asset_materialization(
asset_key="processed_data",
metadata={
"num_rows": len(result_df),
"num_categories": result_df["category"].nunique(),
}
)
# Return final results
return {"status": "success", "rows_processed": len(df)}
if __name__ == "__main__":
    main()

Example of using Pipes in a Databricks notebook:
# Cell 1: Initialize Pipes
from dagster_pipes import open_dagster_pipes, PipesDbfsContextLoader, PipesDbfsMessageWriter
context_loader = PipesDbfsContextLoader()
message_writer = PipesDbfsMessageWriter()
# A with-block cannot span notebook cells, so keep the PipesContext
# returned by open_dagster_pipes and close it explicitly in the last cell
pipes = open_dagster_pipes(
    context_loader=context_loader,
    message_writer=message_writer,
)
# Cell 2: Access Dagster context
extras = pipes.extras  # context data passed from the orchestration side, if any
pipes.log.info("Notebook execution started")
# Cell 3: Data processing
import pandas as pd
import numpy as np
# Load data
df = spark.read.table("my_catalog.my_schema.input_table").toPandas()
pipes.log.info(f"Loaded {len(df)} rows from input table")
# Process data
df["processed_value"] = df["raw_value"] * 2
result_df = df.groupby("category").agg({"processed_value": ["sum", "mean", "count"]})
# Cell 4: Save and report results
# Flatten the MultiIndex columns produced by the multi-aggregation,
# then save to a Delta table
result_df.columns = ["_".join(col) for col in result_df.columns]
result_df = result_df.reset_index()
spark.createDataFrame(result_df).write.mode("overwrite").saveAsTable("my_catalog.my_schema.output_table")
# Report to Dagster
pipes.report_asset_materialization(
asset_key="processed_output",
metadata={
"num_categories": len(result_df),
"total_value": float(result_df[("processed_value", "sum")].sum()),
}
)
pipes.log.info("Processing completed successfully")
# Cell 5: Cleanup
pipes.close()

Example of wrapping a Pipes invocation with error handling:
@asset(required_resource_keys={"databricks"})
def robust_databricks_processing(context: AssetExecutionContext):
client = PipesDatabricksClient(
client=context.resources.databricks.workspace_client,
context_injector=PipesDbfsContextInjector(
client=context.resources.databricks.workspace_client
),
message_reader=PipesDbfsMessageReader(
client=context.resources.databricks.workspace_client,
include_stdio_in_messages=True, # Capture errors in logs
),
)
try:
result = client.run(
context=context,
task={
"notebook_task": {
"notebook_path": "/path/to/notebook",
}
},
cluster={"existing": "cluster-id"},
timeout_seconds=1800, # 30 minute timeout
)
        # run() raises if the external process reports a failure, so
        # reaching this point means the invocation succeeded
        return result.get_results()
except Exception as e:
context.log.error(f"Databricks execution failed: {str(e)}")
# Optionally retrieve logs for debugging
        raise

Install with Tessl CLI
npx tessl i tessl/pypi-dagster-databricks