Pipes Integration

Bidirectional communication layer for executing external code on Databricks with full context injection and result collection. Dagster Pipes connects Dagster orchestration with external Databricks workloads, supporting both standard and serverless environments.

Capabilities

PipesDatabricksClient

Main client for running external code on Databricks with bidirectional communication through the Dagster Pipes protocol.

class PipesDatabricksClient(BasePipesDatabricksClient):
    """Pipes client for running external code on Databricks with bidirectional communication."""
    
    def __init__(
        self,
        client: WorkspaceClient,
        context_injector: Optional[PipesContextInjector] = None,
        message_reader: Optional[PipesMessageReader] = None,
        poll_interval_seconds: float = 5,
        forward_termination: bool = True,
    ):
        """
        Initialize the Pipes Databricks client.
        
        Parameters:
        - client: Databricks WorkspaceClient for API interactions
        - context_injector: Component for injecting Dagster context into external process
        - message_reader: Component for reading messages from external process
        - poll_interval_seconds: How often to poll for job completion
        - forward_termination: Whether to forward termination signals to external process
        """

    def run(
        self,
        *,
        context: Union[OpExecutionContext, AssetExecutionContext],
        extras: Optional[PipesExtras] = None,
        **kwargs,
    ) -> PipesClientCompletedInvocation:
        """
        Execute external code on Databricks with bidirectional communication.
        
        Parameters:
        - context: Dagster execution context (op or asset)
        - extras: Additional context data to pass to external process
        - **kwargs: Additional arguments including task and cluster configuration
        
        Returns:
        PipesClientCompletedInvocation: Results from external process execution
        """

Context Injection

Component for injecting Dagster context data into external Databricks processes via DBFS.

class PipesDbfsContextInjector(PipesContextInjector):
    """A context injector that injects context into a Databricks job by writing a JSON file to DBFS."""
    
    def __init__(self, *, client: WorkspaceClient):
        """
        Initialize the DBFS context injector.
        
        Parameters:
        - client: Databricks WorkspaceClient for DBFS operations
        """

    def inject_context(self, context: PipesContextData) -> Iterator[PipesParams]:
        """
        Inject context to external environment by writing it to an automatically-generated
        DBFS temporary file as JSON and exposing the path to the file.
        
        Parameters:
        - context: The context data to inject
        
        Yields:
        PipesParams: Parameters that can be used by the external process to locate and
            load the injected context data
        """

Message Reading

Component for reading messages and logs from external Databricks processes via DBFS.

class PipesDbfsMessageReader(PipesBlobStoreMessageReader):
    """Message reader that reads messages by periodically reading message chunks from an
    automatically-generated temporary directory on DBFS."""
    
    def __init__(
        self,
        *,
        interval: float = 10,
        client: WorkspaceClient,
        include_stdio_in_messages: bool = False,
        log_readers: Optional[Sequence[PipesLogReader]] = None,
    ):
        """
        Initialize the DBFS message reader.
        
        Parameters:
        - interval: Interval in seconds between attempts to download a chunk
        - client: Databricks WorkspaceClient for DBFS operations
        - include_stdio_in_messages: Whether to send stdout/stderr to Dagster via Pipes messages
        - log_readers: Additional log readers for logs on DBFS
        """

    def get_params(self) -> Iterator[PipesParams]:
        """
        Get parameters for the external process to write messages.
        
        Yields:
        PipesParams: Parameters including DBFS path for message writing
        """

    def messages_are_readable(self, params: PipesParams) -> bool:
        """
        Check if messages are available to read from DBFS.
        
        Parameters:
        - params: Parameters from get_params()
        
        Returns:
        bool: True if messages can be read
        """

    def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]:
        """
        Download a specific message chunk from DBFS.
        
        Parameters:
        - index: Index of the message chunk to download
        - params: Parameters from get_params()
        
        Returns:
        Optional[str]: Message chunk content or None if not available
        """

Log Reading

Component for reading execution logs from DBFS files, typically used for capturing stdout/stderr from Databricks clusters.

class PipesDbfsLogReader(PipesChunkedLogReader):
    """Reader that reads a log file from DBFS."""
    
    def __init__(
        self,
        *,
        interval: float = 10,
        remote_log_name: Literal["stdout", "stderr"],
        target_stream: TextIO,
        client: WorkspaceClient,
        debug_info: Optional[str] = None,
    ):
        """
        Initialize the DBFS log reader.
        
        Parameters:
        - interval: Interval in seconds between attempts to download a log chunk
        - remote_log_name: The name of the log file to read ("stdout" or "stderr")
        - target_stream: The stream to which to forward log chunks that have been read
        - client: Databricks WorkspaceClient for DBFS operations
        - debug_info: Optional message containing debug information about the log reader
        """

Serverless Support

Client variant for Databricks serverless environments, where compute is managed by Databricks rather than specified per job.

class PipesDatabricksServerlessClient(BasePipesDatabricksClient):
    """Pipes client for Databricks serverless environments."""

Usage Examples

Basic Pipes Setup

from dagster import asset, AssetExecutionContext, EnvVar
from dagster_databricks import (
    PipesDatabricksClient,
    PipesDbfsContextInjector,
    PipesDbfsMessageReader,
    DatabricksClientResource,
)

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN"),
)

@asset(required_resource_keys={"databricks"})
def my_databricks_asset(context: AssetExecutionContext):
    workspace_client = context.resources.databricks.workspace_client
    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=PipesDbfsContextInjector(client=workspace_client),
        message_reader=PipesDbfsMessageReader(client=workspace_client),
    )
    
    return client.run(
        context=context,
        task={
            "notebook_task": {
                "notebook_path": "/Workspace/Users/user@example.com/my_notebook",
                "base_parameters": {
                    "input_table": "raw_data",
                    "output_table": "processed_data"
                }
            }
        },
        cluster={"existing": "your-cluster-id"}
    ).get_results()
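
For context.resources.databricks to resolve at runtime, the resource must be bound in a Definitions object. A minimal sketch:

from dagster import Definitions

defs = Definitions(
    assets=[my_databricks_asset],
    resources={"databricks": databricks_resource},
)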

Advanced Pipes Configuration

@asset(required_resource_keys={"databricks"})
def advanced_databricks_processing(context: AssetExecutionContext):
    workspace_client = context.resources.databricks.workspace_client

    # Configure message reader with logging
    message_reader = PipesDbfsMessageReader(
        client=workspace_client,
        interval=5,  # Check for messages every 5 seconds
        include_stdio_in_messages=True,  # Include stdout/stderr in Pipes messages
    )

    # Configure context injector
    context_injector = PipesDbfsContextInjector(client=workspace_client)

    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=context_injector,
        message_reader=message_reader,
        poll_interval_seconds=10,
        forward_termination=True,
    )
    
    # Run with comprehensive configuration
    result = client.run(
        context=context,
        extras={"custom_param": "custom_value"},  # Additional context
        task={
            "spark_python_task": {
                "python_file": "dbfs:/FileStore/scripts/my_script.py",
                "parameters": ["--mode", "production", "--debug", "false"]
            }
        },
        cluster={
            "new": {
                "spark_version": "11.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
                "custom_tags": {"project": "data-pipeline"}
            }
        },
        libraries=[
            {"pypi": {"package": "dagster-pipes"}},
            {"pypi": {"package": "pandas>=1.5.0"}},
        ],
        timeout_seconds=3600,
    )
    
    return result.get_results()

External Script Integration

Example of external Python script that uses Pipes for communication:

# my_script.py - runs on Databricks
from dagster_pipes import open_dagster_pipes, PipesDbfsContextLoader, PipesDbfsMessageWriter

def main():
    # Initialize Pipes communication
    with open_dagster_pipes(
        context_loader=PipesDbfsContextLoader(),
        message_writer=PipesDbfsMessageWriter(),
    ) as pipes:
        # Access Dagster-provided context data (e.g. extras passed by the client)
        custom_param = pipes.extras.get("custom_param")
        
        # Log messages back to Dagster
        pipes.log.info("Starting external processing")
        
        # Perform work
        import pandas as pd
        df = pd.read_parquet("/dbfs/data/input.parquet")
        
        # Report progress
        pipes.log.info(f"Loaded {len(df)} rows")
        
        # Process data
        result_df = df.groupby("category").agg({"value": "sum"})
        
        # Save results
        result_df.to_parquet("/dbfs/data/output.parquet")
        
        # Report asset materialization
        pipes.report_asset_materialization(
            asset_key="processed_data",
            metadata={
                "num_rows": len(result_df),
                "num_categories": result_df["category"].nunique(),
            }
        )
        
        # Return final results
        return {"status": "success", "rows_processed": len(df)}

if __name__ == "__main__":
    main()

Notebook Integration

Example of using Pipes in a Databricks notebook:

# Cell 1: Initialize Pipes
from dagster_pipes import open_dagster_pipes, PipesDbfsContextLoader, PipesDbfsMessageWriter

context_loader = PipesDbfsContextLoader()
message_writer = PipesDbfsMessageWriter()

# open_dagster_pipes initializes communication eagerly; call pipes.close() when done
pipes = open_dagster_pipes(
    context_loader=context_loader,
    message_writer=message_writer,
)

# Cell 2: Access Dagster context
asset_key = pipes.asset_key  # the asset this notebook run is materializing
pipes.log.info("Notebook execution started")

# Cell 3: Data processing
import pandas as pd
import numpy as np

# Load data
df = spark.read.table("my_catalog.my_schema.input_table").toPandas()
pipes.log.info(f"Loaded {len(df)} rows from input table")

# Process data
df["processed_value"] = df["raw_value"] * 2
result_df = df.groupby("category").agg({"processed_value": ["sum", "mean", "count"]})

# Flatten the MultiIndex columns produced by agg so Spark can serialize the frame
result_df.columns = ["_".join(col) for col in result_df.columns]
result_df = result_df.reset_index()

# Cell 4: Save and report results
# Save to Delta table
spark.createDataFrame(result_df).write.mode("overwrite").saveAsTable("my_catalog.my_schema.output_table")

# Report to Dagster
pipes.report_asset_materialization(
    asset_key="processed_output",
    metadata={
        "num_categories": len(result_df),
        "total_value": float(result_df[("processed_value", "sum")].sum()),
    }
)

pipes.log.info("Processing completed successfully")

# Cell 5: Close the Pipes session
pipes.close()

Error Handling

@asset(required_resource_keys={"databricks"})
def robust_databricks_processing(context: AssetExecutionContext):
    workspace_client = context.resources.databricks.workspace_client
    client = PipesDatabricksClient(
        client=workspace_client,
        context_injector=PipesDbfsContextInjector(client=workspace_client),
        message_reader=PipesDbfsMessageReader(
            client=workspace_client,
            include_stdio_in_messages=True,  # Capture stdout/stderr for debugging
        ),
    )
    
    try:
        result = client.run(
            context=context,
            task={
                "notebook_task": {
                    "notebook_path": "/path/to/notebook",
                }
            },
            cluster={"existing": "cluster-id"},
            timeout_seconds=1800,  # 30 minute timeout
        )
        
        # client.run raises if the external process reports a failure, so
        # reaching this point means the run completed successfully
        return result.get_results()
        
    except Exception as e:
        context.log.error(f"Databricks execution failed: {str(e)}")
        # Optionally retrieve logs for debugging
        raise
