
tessl/pypi-dagster-databricks

Package for Databricks-specific Dagster framework op and resource components.

Core Client API

Low-level Databricks REST API client providing authentication, job management, file operations, and run monitoring capabilities. The DatabricksClient serves as the foundation for all Databricks interactions and supports multiple authentication methods.

Capabilities

DatabricksClient

The primary client class that wraps the Databricks REST API with authentication and essential operations for job management and file handling.

class DatabricksClient:
    """A thin wrapper over the Databricks REST API."""
    
    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ):
        """
        Initialize the Databricks client with authentication credentials.
        
        Parameters:
        - host: Databricks workspace URL (e.g., https://your-workspace.cloud.databricks.com)
        - token: Personal access token for authentication
        - oauth_client_id: OAuth client ID for service principal authentication
        - oauth_client_secret: OAuth client secret for service principal authentication
        - azure_client_id: Azure service principal client ID
        - azure_client_secret: Azure service principal client secret
        - azure_tenant_id: Azure tenant ID
        - workspace_id: Databricks workspace ID (deprecated)
        """

Workspace Client Access

Access to the underlying Databricks SDK WorkspaceClient for advanced operations.

@property
def workspace_client(self) -> WorkspaceClient:
    """
    Retrieve a reference to the underlying Databricks Workspace client.
    
    Returns:
    WorkspaceClient: The authenticated Databricks SDK Workspace Client
    """

File Operations

DBFS (Databricks File System) operations for reading and writing files to Databricks storage.

def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes:
    """
    Read a file from DBFS to a byte string.
    
    Parameters:
    - dbfs_path: Path to file in DBFS (can include 'dbfs://' prefix)
    - block_size: Block size for reading in bytes (default 1MB)
    
    Returns:
    bytes: File contents as byte string
    """

def put_file(
    self, 
    file_obj: IO, 
    dbfs_path: str, 
    overwrite: bool = False, 
    block_size: int = 1024**2
) -> None:
    """
    Upload an arbitrarily large file to DBFS.
    
    Parameters:
    - file_obj: File-like object to upload
    - dbfs_path: Destination path in DBFS (can include 'dbfs://' prefix)
    - overwrite: Whether to overwrite existing file
    - block_size: Block size for uploading in bytes (default 1MB)
    """
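
Both file operations transfer data in fixed-size blocks controlled by the `block_size` parameter. The sketch below illustrates the chunking pattern that parameter implies; it is not dagster-databricks internals, and `iter_blocks` is a hypothetical helper for illustration only.

```python
import io

# Hypothetical helper: read a file-like object in block_size chunks,
# the same pattern a block-based DBFS upload uses internally.
def iter_blocks(file_obj, block_size=1024**2):
    while True:
        block = file_obj.read(block_size)
        if not block:
            break
        yield block

# 2053 bytes split with a 1 KB block size -> two full blocks plus a remainder
data = io.BytesIO(b"x" * (2 * 1024 + 5))
blocks = list(iter_blocks(data, block_size=1024))
print([len(b) for b in blocks])  # [1024, 1024, 5]
```

A larger `block_size` means fewer round trips per file; the 1 MB default is a reasonable balance for most workloads.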

Run State Management

Methods for monitoring and polling Databricks job run states.

def get_run_state(self, databricks_run_id: int) -> DatabricksRunState:
    """
    Get the state of a run by Databricks run ID.
    
    Parameters:
    - databricks_run_id: ID of the Databricks run
    
    Returns:
    DatabricksRunState: Object containing lifecycle state, result state, and message
    """
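
The returned state combines a lifecycle state (is the run still running?) with a result state (did it succeed?). The sketch below shows that interpretation logic using Databricks Jobs API state strings; the function names are illustrative, not the `DatabricksRunState` API.

```python
from typing import Optional

# Illustrative only: Databricks lifecycle states TERMINATED, SKIPPED,
# and INTERNAL_ERROR are terminal; everything else means the run is
# still in progress.
def run_finished(life_cycle_state: str) -> bool:
    return life_cycle_state in {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}

# A run only succeeded if it terminated normally AND reported SUCCESS.
def run_succeeded(life_cycle_state: str, result_state: Optional[str]) -> bool:
    return life_cycle_state == "TERMINATED" and result_state == "SUCCESS"

print(run_finished("RUNNING"))                 # False
print(run_succeeded("TERMINATED", "SUCCESS"))  # True
```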

def poll_run_state(
    self,
    logger: logging.Logger,
    start_poll_time: float,
    databricks_run_id: int,
    max_wait_time_sec: float,
    verbose_logs: bool = True,
) -> bool:
    """
    Poll the state of a run once and return whether it completed successfully.
    
    Parameters:
    - logger: Logger for status messages
    - start_poll_time: Start time for timeout calculation
    - databricks_run_id: ID of the Databricks run
    - max_wait_time_sec: Maximum time to wait before timing out
    - verbose_logs: Whether to log detailed status messages
    
    Returns:
    bool: True if run completed successfully, False if still running
    
    Raises:
    DatabricksError: If run failed or timed out
    """
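
`poll_run_state` checks the run once; to block until completion you call it in a loop, which is roughly what `wait_for_run_to_complete` does for you. The sketch below shows that loop shape; `StubClient` is a stand-in for an authenticated `DatabricksClient` so the example is runnable without a workspace.

```python
import logging
import time

# Stand-in for DatabricksClient: reports the run as complete on the
# third poll, so the loop below terminates deterministically.
class StubClient:
    def __init__(self):
        self._polls = 0

    def poll_run_state(self, logger, start_poll_time, databricks_run_id,
                       max_wait_time_sec, verbose_logs=True):
        self._polls += 1
        return self._polls >= 3

client = StubClient()
logger = logging.getLogger(__name__)

start = time.time()  # used by the real client for timeout calculation
while not client.poll_run_state(
    logger=logger,
    start_poll_time=start,
    databricks_run_id=12345,
    max_wait_time_sec=3600,
    verbose_logs=True,
):
    time.sleep(0.1)  # with a real client, sleep your poll_interval_sec here
```

With a real client, a failed or timed-out run raises `DatabricksError` from inside `poll_run_state`, so the loop only exits normally on success.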

def wait_for_run_to_complete(
    self,
    logger: logging.Logger,
    databricks_run_id: int,
    poll_interval_sec: float,
    max_wait_time_sec: float,
    verbose_logs: bool = True,
) -> None:
    """
    Wait for a Databricks run to complete by polling its state.
    
    Parameters:
    - logger: Logger for status messages
    - databricks_run_id: ID of the Databricks run
    - poll_interval_sec: Time between polling attempts
    - max_wait_time_sec: Maximum time to wait before timing out
    - verbose_logs: Whether to log detailed status messages
    
    Raises:
    DatabricksError: If run failed or timed out
    """

Usage Examples

Basic Client Setup

from dagster_databricks import DatabricksClient

# Using personal access token
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token"
)

# Using OAuth service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    oauth_client_id="your-client-id",
    oauth_client_secret="your-client-secret"
)

# Using Azure service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    azure_client_id="your-azure-client-id",
    azure_client_secret="your-azure-client-secret",
    azure_tenant_id="your-azure-tenant-id"
)

File Operations

# Read a file from DBFS
file_contents = client.read_file("/path/to/file.txt")
text_content = file_contents.decode('utf-8')

# Upload a file to DBFS
with open("local_file.txt", "rb") as f:
    client.put_file(f, "dbfs://path/to/destination.txt", overwrite=True)

Run Monitoring

import logging

logger = logging.getLogger(__name__)

# Get current state of a run
run_state = client.get_run_state(databricks_run_id=12345)
print(f"Run state: {run_state.life_cycle_state}")
print(f"Result: {run_state.result_state}")

# Wait for run to complete
client.wait_for_run_to_complete(
    logger=logger,
    databricks_run_id=12345,
    poll_interval_sec=10,
    max_wait_time_sec=3600,  # 1 hour timeout
    verbose_logs=True
)

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-databricks
