Package for Databricks-specific Dagster framework op and resource components.

Low-level Databricks REST API client providing authentication, job management,
file operations, and run monitoring capabilities. The DatabricksClient serves
as the foundation for all Databricks interactions and supports multiple
authentication methods.

DatabricksClient is the primary client class: it wraps the Databricks REST API
with authentication and essential operations for job management and file
handling.
class DatabricksClient:
    """A thin wrapper over the Databricks REST API.

    Supports multiple authentication methods: personal access token,
    OAuth service principal, and Azure service principal (see __init__).
    """

    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ):
        """
        Initialize the Databricks client with authentication credentials.

        Parameters:
        - host: Databricks workspace URL
          (e.g., https://your-workspace.cloud.databricks.com)
        - token: Personal access token for authentication
        - oauth_client_id: OAuth client ID for service principal authentication
        - oauth_client_secret: OAuth client secret for service principal
          authentication
        - azure_client_id: Azure service principal client ID
        - azure_client_secret: Azure service principal client secret
        - azure_tenant_id: Azure tenant ID
        - workspace_id: Databricks workspace ID (deprecated)
        """

    # Access to the underlying Databricks SDK WorkspaceClient for advanced
    # operations.
    @property
    def workspace_client(self) -> WorkspaceClient:
        """
        Retrieve a reference to the underlying Databricks Workspace client.

        Returns:
            WorkspaceClient: The authenticated Databricks SDK Workspace Client
        """

    # DBFS (Databricks File System) operations for reading and writing files
    # to Databricks storage.
    def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes:
        """
        Read a file from DBFS to a byte string.

        Parameters:
        - dbfs_path: Path to file in DBFS (can include 'dbfs://' prefix)
        - block_size: Block size for reading in bytes (default 1MB)

        Returns:
            bytes: File contents as byte string
        """

    def put_file(
        self,
        file_obj: IO,
        dbfs_path: str,
        overwrite: bool = False,
        block_size: int = 1024**2,
    ) -> None:
        """
        Upload an arbitrarily large file to DBFS.

        Parameters:
        - file_obj: File-like object to upload
        - dbfs_path: Destination path in DBFS (can include 'dbfs://' prefix)
        - overwrite: Whether to overwrite an existing file
        - block_size: Block size for uploading in bytes (default 1MB)
        """

    # Methods for monitoring and polling Databricks job run states.
    def get_run_state(self, databricks_run_id: int) -> DatabricksRunState:
        """
        Get the state of a run by Databricks run ID.

        Parameters:
        - databricks_run_id: ID of the Databricks run

        Returns:
            DatabricksRunState: Object containing lifecycle state, result
            state, and message
        """

    def poll_run_state(
        self,
        logger: logging.Logger,
        start_poll_time: float,
        databricks_run_id: int,
        max_wait_time_sec: float,
        verbose_logs: bool = True,
    ) -> bool:
        """
        Poll the state of a run once and return whether it completed
        successfully.

        Parameters:
        - logger: Logger for status messages
        - start_poll_time: Start time for timeout calculation
        - databricks_run_id: ID of the Databricks run
        - max_wait_time_sec: Maximum time to wait before timing out
        - verbose_logs: Whether to log detailed status messages

        Returns:
            bool: True if run completed successfully, False if still running

        Raises:
            DatabricksError: If run failed or timed out
        """

    def wait_for_run_to_complete(
        self,
        logger: logging.Logger,
        databricks_run_id: int,
        poll_interval_sec: float,
        max_wait_time_sec: float,
        verbose_logs: bool = True,
    ) -> None:
        """
        Wait for a Databricks run to complete by polling its state.

        Parameters:
        - logger: Logger for status messages
        - databricks_run_id: ID of the Databricks run
        - poll_interval_sec: Time between polling attempts
        - max_wait_time_sec: Maximum time to wait before timing out
        - verbose_logs: Whether to log detailed status messages

        Raises:
            DatabricksError: If run failed or timed out
        """
from dagster_databricks import DatabricksClient

# Using a personal access token
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Using an OAuth service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    oauth_client_id="your-client-id",
    oauth_client_secret="your-client-secret",
)

# Using an Azure service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    azure_client_id="your-azure-client-id",
    azure_client_secret="your-azure-client-secret",
    azure_tenant_id="your-azure-tenant-id",
)

# Read a file from DBFS
file_contents = client.read_file("/path/to/file.txt")
text_content = file_contents.decode("utf-8")

# Upload a file to DBFS
with open("local_file.txt", "rb") as f:
    client.put_file(f, "/dbfs/path/to/destination.txt", overwrite=True)

import logging
logger = logging.getLogger(__name__)

# Get the current state of a run.
# NOTE: the parameter is named `databricks_run_id`, not `run_id`,
# matching DatabricksClient.get_run_state's signature.
run_state = client.get_run_state(databricks_run_id=12345)
print(f"Run state: {run_state.life_cycle_state}")
print(f"Result: {run_state.result_state}")

# Block until the run finishes; raises DatabricksError on failure/timeout.
client.wait_for_run_to_complete(
    logger=logger,
    databricks_run_id=12345,
    poll_interval_sec=10,
    max_wait_time_sec=3600,  # 1 hour timeout
    verbose_logs=True,
)
Install with the Tessl CLI: npx tessl i tessl/pypi-dagster-databricks