
databricks-sdk-patterns

tessl install github:jeremylongshore/claude-code-plugins-plus-skills --skill databricks-sdk-patterns
github.com/jeremylongshore/claude-code-plugins-plus-skills

Apply production-ready Databricks SDK patterns for Python and REST API. Use when implementing Databricks integrations, refactoring SDK usage, or establishing team coding standards for Databricks. Trigger with phrases like "databricks SDK patterns", "databricks best practices", "databricks code patterns", "idiomatic databricks".

  • Review Score: 78%
  • Validation Score: 12/16
  • Implementation Score: 65%
  • Activation Score: 90%

Databricks SDK Patterns

Overview

Production-ready patterns for Databricks SDK usage in Python.

Prerequisites

  • Completed databricks-install-auth setup
  • Familiarity with async/await patterns
  • Understanding of error handling best practices

Instructions

Step 1: Implement Singleton Pattern

# src/databricks/client.py
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os

@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Get singleton WorkspaceClient instance."""
    return WorkspaceClient(profile=profile)

# Multi-workspace support
_clients: dict[str, WorkspaceClient] = {}

def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
    """Get or create client for specific workspace."""
    if workspace_name not in _clients:
        config = get_workspace_config(workspace_name)
        _clients[workspace_name] = WorkspaceClient(
            host=config["host"],
            token=config["token"]
        )
    return _clients[workspace_name]

def get_workspace_config(name: str) -> dict:
    """Load workspace config from environment."""
    return {
        "host": os.environ[f"DATABRICKS_{name.upper()}_HOST"],
        "token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"],
    }
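
A short usage sketch, assuming the helpers above are in scope; the workspace name is illustrative and the "prod" client expects DATABRICKS_PROD_HOST and DATABRICKS_PROD_TOKEN to be set:

# Example usage (sketch)
w = get_workspace_client()                # default auth, cached after the first call
prod = get_client_for_workspace("prod")   # reused on subsequent calls

print(w.current_user.me().user_name)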

Step 2: Add Error Handling Wrapper

# src/databricks/errors.py
from databricks.sdk.errors import (
    DatabricksError,
    NotFound,
    PermissionDenied,
    ResourceAlreadyExists,
)
from functools import wraps
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional
import logging

logger = logging.getLogger(__name__)

T = TypeVar("T")

@dataclass
class Result(Generic[T]):
    """Result wrapper for API operations."""
    data: Optional[T] = None
    error: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        return self.error is None

    def unwrap(self) -> T:
        if self.error:
            raise self.error
        return self.data

def safe_databricks_call(operation):
    """Decorator for safe Databricks API calls, returning a Result."""
    @wraps(operation)
    def wrapper(*args, **kwargs) -> Result:
        try:
            result = operation(*args, **kwargs)
            return Result(data=result)
        except NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return Result(error=e)
        except ResourceAlreadyExists as e:
            logger.warning(f"Resource already exists: {e}")
            return Result(error=e)
        except PermissionDenied as e:
            logger.error(f"Permission denied: {e}")
            return Result(error=e)
        except DatabricksError as e:
            # DatabricksError carries error_code; the message is the exception text itself
            logger.error(f"Databricks error [{e.error_code}]: {e}")
            return Result(error=e)
    return wrapper
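
A minimal sketch of applying the decorator; `w` is assumed to be the WorkspaceClient from Step 1 and the cluster ID is illustrative:

@safe_databricks_call
def get_cluster_name(cluster_id: str) -> str:
    return w.clusters.get(cluster_id).cluster_name

result = get_cluster_name("0000-000000-example0")
if result.is_success:
    print(result.unwrap())
else:
    print(f"Lookup failed: {result.error}")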

Step 3: Implement Retry Logic with Backoff

# src/databricks/retry.py
import logging
import random
import time
from typing import Callable, TypeVar
from databricks.sdk.errors import (
    DatabricksError,
    TemporarilyUnavailable,
    TooManyRequests,
)

logger = logging.getLogger(__name__)

T = TypeVar("T")

def with_retry(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.5,
) -> T:
    """Execute operation with exponential backoff retry."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)
            print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except DatabricksError as e:
            # Don't retry client errors
            if e.error_code and e.error_code.startswith("4"):
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
    raise RuntimeError("Unreachable")
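
A brief sketch of wrapping a call with the retry helper; `w` is assumed to be a WorkspaceClient from Step 1:

clusters = with_retry(lambda: list(w.clusters.list()), max_retries=3)
print(f"Found {len(clusters)} clusters")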

Step 4: Context Manager for Clusters

# src/databricks/cluster.py
from contextlib import contextmanager
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State

@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
    """Context manager for cluster lifecycle."""
    try:
        # Start cluster if not running
        cluster = w.clusters.get(cluster_id)
        if cluster.state != State.RUNNING:
            print(f"Starting cluster {cluster_id}...")
            w.clusters.start(cluster_id)
            w.clusters.wait_get_cluster_running(cluster_id)

        yield cluster_id
    finally:
        # Optionally terminate after use
        pass  # Or: w.clusters.delete(cluster_id)

@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
    """Create temporary cluster that auto-deletes."""
    cluster = w.clusters.create_and_wait(**spec)
    try:
        yield cluster.cluster_id
    finally:
        w.clusters.permanent_delete(cluster.cluster_id)
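
A usage sketch; the cluster ID is illustrative, and the ephemeral spec leans on the SDK's select_spark_version / select_node_type helpers to pick sensible defaults:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Reuse an existing cluster, starting it if needed
with managed_cluster(w, "0000-000000-example0") as cluster_id:
    print(f"Cluster {cluster_id} is running")

# Spin up a throwaway cluster that is deleted afterwards
spec = {
    "cluster_name": "tmp-analysis",
    "spark_version": w.clusters.select_spark_version(long_term_support=True),
    "node_type_id": w.clusters.select_node_type(local_disk=True),
    "num_workers": 1,
}
with ephemeral_cluster(w, spec) as cluster_id:
    print(f"Ephemeral cluster {cluster_id} ready")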

Step 5: Type-Safe Job Builders

# src/databricks/jobs.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task, NotebookTask, PythonWheelTask, JobCluster,
)
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class JobBuilder:
    """Fluent builder for Databricks jobs."""
    name: str
    tasks: list[Task] = field(default_factory=list)
    job_clusters: list[JobCluster] = field(default_factory=list)
    tags: dict[str, str] = field(default_factory=dict)

    def add_notebook_task(
        self,
        task_key: str,
        notebook_path: str,
        cluster_key: str,
        parameters: Optional[dict] = None,
    ) -> "JobBuilder":
        """Add a notebook task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                base_parameters=parameters or {},
            )
        ))
        return self

    def add_wheel_task(
        self,
        task_key: str,
        package_name: str,
        entry_point: str,
        cluster_key: str,
        parameters: Optional[list[str]] = None,
    ) -> "JobBuilder":
        """Add a Python wheel task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            python_wheel_task=PythonWheelTask(
                package_name=package_name,
                entry_point=entry_point,
                parameters=parameters or [],
            )
        ))
        return self

    def create(self, w: WorkspaceClient) -> int:
        """Create the job and return job_id."""
        job = w.jobs.create(
            name=self.name,
            tasks=self.tasks,
            job_clusters=self.job_clusters,
            tags=self.tags,
        )
        return job.job_id

# Usage (assumes `w` is a WorkspaceClient and that a job cluster with
# key "etl-cluster" has been appended to job_clusters)
job_id = (
    JobBuilder("etl-pipeline")
    .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
    .add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
    .create(w)
)

Output

  • Type-safe client singleton
  • Robust error handling with structured logging
  • Automatic retry with exponential backoff
  • Fluent job builder pattern

Error Handling

Pattern          | Use Case            | Benefit
-----------------|---------------------|--------------------------
Result wrapper   | All API calls       | Type-safe error handling
Retry logic      | Transient failures  | Improves reliability
Context managers | Cluster lifecycle   | Resource cleanup
Builders         | Job creation        | Type safety and fluency
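
These pieces compose; a brief sketch (assuming the modules from Steps 1-3 are importable and `w` is a WorkspaceClient) of a retried call returning a Result:

@safe_databricks_call
def list_jobs_with_retry():
    return with_retry(lambda: list(w.jobs.list()), max_retries=3)

result = list_jobs_with_retry()
if result.is_success:
    for job in result.unwrap():
        print(job.settings.name)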

Examples

Factory Pattern (Multi-Tenant)

from functools import lru_cache
from databricks.sdk import WorkspaceClient

@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
    """Get workspace client for a tenant."""
    # get_tenant_config is project-specific: it should return an object
    # exposing workspace_url and token for the given tenant
    config = get_tenant_config(tenant_id)
    return WorkspaceClient(
        host=config.workspace_url,
        token=config.token,
    )

Async Operations

import asyncio
from concurrent.futures import ThreadPoolExecutor
from databricks.sdk import WorkspaceClient

async def async_list_clusters(w: WorkspaceClient):
    """Async wrapper for cluster listing (the SDK itself is synchronous)."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        clusters = await loop.run_in_executor(
            executor,
            lambda: list(w.clusters.list())
        )
    return clusters
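
A sketch of driving the wrapper, fanning out over two workspaces with asyncio.gather; the workspace names are illustrative and get_client_for_workspace comes from Step 1:

async def main():
    dev_clusters, prod_clusters = await asyncio.gather(
        async_list_clusters(get_client_for_workspace("dev")),
        async_list_clusters(get_client_for_workspace("prod")),
    )
    print(len(dev_clusters), len(prod_clusters))

asyncio.run(main())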

Resources

  • Databricks SDK for Python
  • API Reference
  • Error Handling

Next Steps

Apply patterns in databricks-core-workflow-a for Delta Lake ETL.