```bash
tessl install github:jeremylongshore/claude-code-plugins-plus-skills --skill databricks-sdk-patterns
```

Apply production-ready Databricks SDK patterns for Python and the REST API. Use when implementing Databricks integrations, refactoring SDK usage, or establishing team coding standards for Databricks. Trigger with phrases like "databricks SDK patterns", "databricks best practices", "databricks code patterns", or "idiomatic databricks".
| Metric | Score |
|---|---|
| Review | 78% |
| Validation | 12/16 |
| Implementation | 65% |
| Activation | 90% |
Production-ready patterns for Databricks SDK usage in Python.
**databricks-install-auth setup**

```python
# src/databricks/client.py
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os


@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Get singleton WorkspaceClient instance."""
    return WorkspaceClient(profile=profile)


# Multi-workspace support
_clients: dict[str, WorkspaceClient] = {}


def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
    """Get or create client for specific workspace."""
    if workspace_name not in _clients:
        config = get_workspace_config(workspace_name)
        _clients[workspace_name] = WorkspaceClient(
            host=config["host"],
            token=config["token"],
        )
    return _clients[workspace_name]


def get_workspace_config(name: str) -> dict:
    """Load workspace config from environment."""
    return {
        "host": os.environ[f"DATABRICKS_{name.upper()}_HOST"],
        "token": os.environ[f"DATABRICKS_{name.upper()}_TOKEN"],
    }
```
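A minimal usage sketch, assuming `DATABRICKS_PROD_HOST` and `DATABRICKS_PROD_TOKEN` are exported for a workspace nicknamed "prod"; the workspace name and import path (which follows the file layout above) are illustrative, not part of the skill:

```python
# Illustrative usage of the client helpers above.
from src.databricks.client import get_client_for_workspace, get_workspace_client

w = get_workspace_client()               # default auth from env / ~/.databrickscfg
prod = get_client_for_workspace("prod")  # cached per-workspace client

print(w.current_user.me().user_name)     # quick auth sanity check
```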
```python
# src/databricks/errors.py
from databricks.sdk.errors import (
    DatabricksError,
    NotFound,
    ResourceAlreadyExists,
    PermissionDenied,
    ResourceConflict,
    BadRequest,
)
from dataclasses import dataclass
from functools import wraps
from typing import TypeVar, Generic, Optional
import logging

logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class Result(Generic[T]):
    """Result wrapper for API operations."""
    data: Optional[T] = None
    error: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        return self.error is None

    def unwrap(self) -> T:
        if self.error:
            raise self.error
        return self.data


def safe_databricks_call(operation):
    """Decorator for safe Databricks API calls."""
    @wraps(operation)
    def wrapper(*args, **kwargs) -> Result:
        try:
            result = operation(*args, **kwargs)
            return Result(data=result)
        except NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return Result(error=e)
        except ResourceAlreadyExists as e:
            logger.warning(f"Resource already exists: {e}")
            return Result(error=e)
        except PermissionDenied as e:
            logger.error(f"Permission denied: {e}")
            return Result(error=e)
        except DatabricksError as e:
            logger.error(f"Databricks error: {e.error_code} - {e}")
            return Result(error=e)
    return wrapper
```
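A hedged usage sketch for the decorator and `Result` wrapper; the wrapped function, cluster ID, and import path are illustrative names, not part of the skill:

```python
# Illustrative only: wrap an SDK lookup so callers branch on Result
# instead of catching SDK exceptions at every call site.
from databricks.sdk import WorkspaceClient

from src.databricks.errors import safe_databricks_call  # path per layout above

w = WorkspaceClient()

@safe_databricks_call
def get_cluster_info(cluster_id: str):
    return w.clusters.get(cluster_id)

result = get_cluster_info("0000-000000-example")  # hypothetical cluster ID
if result.is_success:
    print(result.data.cluster_name)
else:
    print(f"Lookup failed: {result.error}")
```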
```python
# src/databricks/retry.py
import random
import time
from typing import Callable, TypeVar

from databricks.sdk.errors import (
    DatabricksError,
    TemporarilyUnavailable,
    TooManyRequests,
)

T = TypeVar("T")


def with_retry(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.5,
) -> T:
    """Execute operation with exponential backoff retry."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)
            print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except DatabricksError as e:
            # Don't retry client errors
            if e.error_code and e.error_code.startswith("4"):
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
    raise RuntimeError("Unreachable")
```
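A brief usage sketch: the lambda wraps whichever SDK call you want retried; the listing call and import path here are illustrative:

```python
# Illustrative: retry a listing call that may hit rate limits (429) or
# transient unavailability (503).
from databricks.sdk import WorkspaceClient

from src.databricks.retry import with_retry  # path per layout above

w = WorkspaceClient()
clusters = with_retry(lambda: list(w.clusters.list()), max_retries=3)
```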
```python
# src/databricks/cluster.py
import time
from contextlib import contextmanager

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State


@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
    """Context manager for cluster lifecycle."""
    try:
        # Start cluster if not running
        cluster = w.clusters.get(cluster_id)
        if cluster.state != State.RUNNING:
            print(f"Starting cluster {cluster_id}...")
            w.clusters.start(cluster_id)
            w.clusters.wait_get_cluster_running(cluster_id)
        yield cluster_id
    finally:
        # Optionally terminate after use
        pass  # Or: w.clusters.delete(cluster_id)


@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
    """Create temporary cluster that auto-deletes."""
    cluster = w.clusters.create_and_wait(**spec)
    try:
        yield cluster.cluster_id
    finally:
        w.clusters.permanent_delete(cluster.cluster_id)
```
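A hedged usage sketch for both context managers; the cluster ID, spec fields, and import path are placeholders for illustration:

```python
# Illustrative: run against an existing cluster, then an ephemeral one.
from databricks.sdk import WorkspaceClient

from src.databricks.cluster import ephemeral_cluster, managed_cluster  # path per layout above

w = WorkspaceClient()

with managed_cluster(w, "0000-000000-example") as cluster_id:  # hypothetical cluster ID
    print(f"Running against {cluster_id}")

spec = {  # assumed fields; pick a node type / Spark version valid in your workspace
    "cluster_name": "tmp-etl",
    "spark_version": "15.4.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 1,
}
with ephemeral_cluster(w, spec) as cluster_id:
    print(f"Ephemeral cluster {cluster_id} ready")
```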
```python
# src/databricks/jobs.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task, NotebookTask, PythonWheelTask,
    JobCluster, JobSettings, CreateJob,
)
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class JobBuilder:
    """Fluent builder for Databricks jobs."""
    name: str
    tasks: list[Task] = field(default_factory=list)
    job_clusters: list[JobCluster] = field(default_factory=list)
    tags: dict[str, str] = field(default_factory=dict)

    def add_notebook_task(
        self,
        task_key: str,
        notebook_path: str,
        cluster_key: str,
        parameters: Optional[dict] = None,
    ) -> "JobBuilder":
        """Add a notebook task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                base_parameters=parameters or {},
            )
        ))
        return self

    def add_wheel_task(
        self,
        task_key: str,
        package_name: str,
        entry_point: str,
        cluster_key: str,
        parameters: Optional[list[str]] = None,
    ) -> "JobBuilder":
        """Add a Python wheel task."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            python_wheel_task=PythonWheelTask(
                package_name=package_name,
                entry_point=entry_point,
                parameters=parameters or [],
            )
        ))
        return self

    def create(self, w: WorkspaceClient) -> int:
        """Create the job and return job_id."""
        job = w.jobs.create(
            name=self.name,
            tasks=self.tasks,
            job_clusters=self.job_clusters,
            tags=self.tags,
        )
        return job.job_id


# Usage (w is an authenticated WorkspaceClient)
job_id = (
    JobBuilder("etl-pipeline")
    .add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
    .add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
    .create(w)
)
```

| Pattern | Use Case | Benefit |
|---|---|---|
| Result wrapper | All API calls | Type-safe error handling |
| Retry logic | Transient failures | Improves reliability |
| Context managers | Cluster lifecycle | Resource cleanup |
| Builders | Job creation | Type safety and fluency |
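These patterns compose. A hedged sketch combining the `Result` wrapper with retry for a call prone to transient failures; the function name and import paths are illustrative:

```python
# Illustrative composition: retry transient failures, then surface the
# outcome as a Result instead of an exception.
from databricks.sdk import WorkspaceClient

from src.databricks.errors import safe_databricks_call  # paths per layout above
from src.databricks.retry import with_retry

w = WorkspaceClient()

@safe_databricks_call
def list_jobs_resilient():
    return with_retry(lambda: list(w.jobs.list()), max_retries=3)

result = list_jobs_resilient()
if result.is_success:
    print(f"Found {len(result.data)} jobs")
```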
Multi-tenant client caching:

```python
from functools import lru_cache

from databricks.sdk import WorkspaceClient


@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
    """Get workspace client for tenant."""
    config = get_tenant_config(tenant_id)
    return WorkspaceClient(
        host=config.workspace_url,
        token=config.token,
    )
```
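`get_tenant_config` is not defined in the snippet above. A minimal sketch, assuming tenant credentials live in environment variables; the `TenantConfig` shape and variable names are hypothetical:

```python
# Hypothetical helper for the snippet above; adapt to your secret store.
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TenantConfig:
    workspace_url: str
    token: str


def get_tenant_config(tenant_id: str) -> TenantConfig:
    key = tenant_id.upper()
    return TenantConfig(
        workspace_url=os.environ[f"DATABRICKS_{key}_HOST"],
        token=os.environ[f"DATABRICKS_{key}_TOKEN"],
    )
```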
Async wrapper for the synchronous SDK:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from databricks.sdk import WorkspaceClient


async def async_list_clusters(w: WorkspaceClient):
    """Async wrapper for cluster listing."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        clusters = await loop.run_in_executor(
            executor,
            lambda: list(w.clusters.list())
        )
    return clusters

# e.g. clusters = asyncio.run(async_list_clusters(WorkspaceClient()))
```

Apply the patterns in databricks-core-workflow-a for Delta Lake ETL.