
databricks-observability

tessl install github:jeremylongshore/claude-code-plugins-plus-skills --skill databricks-observability
github.com/jeremylongshore/claude-code-plugins-plus-skills

Set up comprehensive observability for Databricks with metrics, traces, and alerts. Use when implementing monitoring for Databricks jobs, setting up dashboards, or configuring alerting for pipeline health. Trigger with phrases like "databricks monitoring", "databricks metrics", "databricks observability", "monitor databricks", "databricks alerts", "databricks logging".

Review Score

78%

Validation Score

12/16

Implementation Score

65%

Activation Score

90%

Databricks Observability

Overview

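Set up observability for Databricks workloads: query system tables for job, cluster, and billing metrics, build monitoring views on top of them, configure SQL alerts, add structured logging to notebooks, and export metrics to external tools.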

Prerequisites

  • Access to system tables
  • SQL Warehouse for dashboards
  • Notification destinations configured
  • Alert recipients defined

Metrics Collection

Key Metrics

| Metric | Source | Description |
|---|---|---|
| Job success rate | system.lakeflow.job_run_timeline | % of successful job runs |
| Job duration | system.lakeflow.job_run_timeline | Run time in minutes |
| Cluster utilization | system.compute.node_timeline | CPU/memory usage |
| Data freshness | table history | Hours since last update |
| DBU consumption | system.billing.usage | Cost tracking |

Instructions

Step 1: System Tables Access

-- System schemas must be enabled by an admin before they can be queried.
-- Check which system tables are visible to you
SELECT table_schema, table_name
FROM system.information_schema.tables
WHERE table_catalog = 'system'
  AND table_schema IN ('billing', 'lakeflow', 'compute');

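-- Job run history (last 24 hours)
-- job_run_timeline has no job name or error text: names live in
-- system.lakeflow.jobs and failures are described by termination_code.
-- Long runs span several rows, so collapse them to one row per run.
WITH latest_jobs AS (
    SELECT workspace_id, job_id, name
    FROM system.lakeflow.jobs
    QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
)
SELECT
    r.job_id,
    j.name AS job_name,
    r.run_id,
    MIN(r.period_start_time) AS start_time,
    MAX(r.period_end_time) AS end_time,
    MAX(r.result_state) AS result_state,
    MAX(r.termination_code) AS termination_code,
    TIMESTAMPDIFF(SECOND, MIN(r.period_start_time), MAX(r.period_end_time)) / 60.0 AS duration_minutes
FROM system.lakeflow.job_run_timeline r
LEFT JOIN latest_jobs j
    ON r.workspace_id = j.workspace_id AND r.job_id = j.job_id
WHERE r.period_start_time > current_timestamp() - INTERVAL 24 HOURS
GROUP BY r.workspace_id, r.job_id, j.name, r.run_id
ORDER BY start_time DESC;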

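-- Cluster utilization (per-node CPU and memory, last 24 hours)
SELECT
    cluster_id,
    instance_id,
    start_time,
    cpu_user_percent + cpu_system_percent AS cpu_percent,
    mem_used_percent
FROM system.compute.node_timeline
WHERE start_time > current_timestamp() - INTERVAL 24 HOURS
ORDER BY start_time DESC;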

Step 2: Create Monitoring Views

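-- Job health summary view (finished runs from the last 7 days, one row per job)
CREATE OR REPLACE VIEW monitoring.job_health_summary AS
WITH latest_jobs AS (
    -- system.lakeflow.jobs keeps one row per change; take the newest per job
    SELECT workspace_id, job_id, name
    FROM system.lakeflow.jobs
    QUALIFY ROW_NUMBER() OVER (PARTITION BY workspace_id, job_id ORDER BY change_time DESC) = 1
),
runs AS (
    -- Collapse the per-period rows of each run into a single row
    SELECT
        workspace_id, job_id, run_id,
        MIN(period_start_time) AS start_time,
        MAX(period_end_time) AS end_time,
        MAX(result_state) AS result_state
    FROM system.lakeflow.job_run_timeline
    WHERE period_start_time > current_timestamp() - INTERVAL 7 DAYS
    GROUP BY workspace_id, job_id, run_id
)
SELECT
    j.name AS job_name,
    COUNT(*) AS total_runs,
    SUM(CASE WHEN r.result_state = 'SUCCEEDED' THEN 1 ELSE 0 END) AS successes,
    SUM(CASE WHEN r.result_state = 'FAILED' THEN 1 ELSE 0 END) AS failures,
    ROUND(SUM(CASE WHEN r.result_state = 'SUCCEEDED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS success_rate,
    AVG(TIMESTAMPDIFF(SECOND, r.start_time, r.end_time) / 60.0) AS avg_duration_minutes,
    PERCENTILE(TIMESTAMPDIFF(SECOND, r.start_time, r.end_time) / 60.0, 0.95) AS p95_duration_minutes,
    MAX(r.start_time) AS last_run_time,
    MAX(CASE WHEN r.result_state = 'FAILED' THEN r.start_time END) AS last_failure_time
FROM runs r
LEFT JOIN latest_jobs j
    ON r.workspace_id = j.workspace_id AND r.job_id = j.job_id
WHERE r.result_state IS NOT NULL  -- skip runs that are still in progress
GROUP BY j.name;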

-- Data freshness view
-- Uses last_altered from information_schema.tables as an approximation.
-- It may not reflect every data write; for exact commit times, run
-- DESCRIBE HISTORY on the individual table.
CREATE OR REPLACE VIEW monitoring.data_freshness AS
SELECT
    table_catalog,
    table_schema,
    table_name,
    last_altered AS last_update,
    TIMESTAMPDIFF(HOUR, last_altered, current_timestamp()) AS hours_since_update,
    CASE
        WHEN TIMESTAMPDIFF(HOUR, last_altered, current_timestamp()) < 1 THEN 'FRESH'
        WHEN TIMESTAMPDIFF(HOUR, last_altered, current_timestamp()) < 6 THEN 'RECENT'
        WHEN TIMESTAMPDIFF(HOUR, last_altered, current_timestamp()) < 24 THEN 'STALE'
        ELSE 'VERY_STALE'
    END AS freshness_status
FROM system.information_schema.tables
WHERE table_catalog <> 'system';

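-- Cost tracking view
-- system.billing.usage holds quantities only; list prices come from
-- system.billing.list_prices, matched on SKU and the price validity window.
CREATE OR REPLACE VIEW monitoring.daily_costs AS
SELECT
    u.usage_date AS date,
    u.workspace_id,
    u.sku_name,
    u.usage_type,
    SUM(u.usage_quantity) AS total_dbus,
    SUM(u.usage_quantity * p.pricing.default) AS estimated_cost
FROM system.billing.usage u
LEFT JOIN system.billing.list_prices p
    ON u.cloud = p.cloud
   AND u.sku_name = p.sku_name
   AND u.usage_start_time >= p.price_start_time
   AND (p.price_end_time IS NULL OR u.usage_end_time <= p.price_end_time)
WHERE u.usage_date > current_date() - INTERVAL 30 DAYS
GROUP BY u.usage_date, u.workspace_id, u.sku_name, u.usage_type;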
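
The freshness buckets in the data_freshness view can be mirrored in Python, for example to unit-test a threshold change before editing the view. This is a minimal sketch: `classify_freshness` is a hypothetical helper rather than part of any Databricks API, the thresholds (1, 6, and 24 hours) are copied from the view, and it assumes TIMESTAMPDIFF(HOUR, ...) counts whole elapsed hours.

```python
from datetime import datetime, timedelta, timezone


def classify_freshness(last_update: datetime, now: datetime) -> str:
    """Bucket a table by whole hours elapsed since its last update."""
    # Assumption: the SQL view truncates to whole hours, so truncate here too.
    hours = int((now - last_update).total_seconds() // 3600)
    if hours < 1:
        return "FRESH"
    if hours < 6:
        return "RECENT"
    if hours < 24:
        return "STALE"
    return "VERY_STALE"


# Example: compare a few tables against a fixed reference time
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
for age in (timedelta(minutes=30), timedelta(hours=5), timedelta(hours=30)):
    print(age, classify_freshness(now - age, now))
```

Passing `now` explicitly keeps the function deterministic, which makes boundary cases such as exactly 6 hours easy to test.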

Step 3: Configure Alerts

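-- Databricks SQL has no CREATE ALERT statement. Save each query below, then
-- create an alert on it (Alerts UI or API) using the schedule, trigger
-- condition, and recipients listed in its comments.

-- Alert: Job failure
-- Schedule: */15 * * * * | Trigger: any row returned
-- Notify: oncall@company.com and the 'slack-alerts' notification destination
SELECT
    workspace_id,
    job_id,            -- join system.lakeflow.jobs for the job name
    run_id,
    termination_code,  -- job_run_timeline carries no error message text
    period_end_time
FROM system.lakeflow.job_run_timeline
WHERE result_state = 'FAILED'
  AND period_end_time > current_timestamp() - INTERVAL 15 MINUTES;

-- Alert: Long-running jobs (no final state after 120 minutes)
-- Schedule: */30 * * * * | Trigger: any row returned
-- Notify: oncall@company.com
SELECT
    workspace_id,
    job_id,
    run_id,
    MIN(period_start_time) AS start_time,
    TIMESTAMPDIFF(MINUTE, MIN(period_start_time), current_timestamp()) AS running_minutes
FROM system.lakeflow.job_run_timeline
WHERE period_start_time > current_timestamp() - INTERVAL 2 DAYS
GROUP BY workspace_id, job_id, run_id
HAVING MAX(result_state) IS NULL  -- result_state is set only when the run ends
   AND TIMESTAMPDIFF(MINUTE, MIN(period_start_time), current_timestamp()) > 120;

-- Alert: Data freshness SLA breach
-- Schedule: 0 * * * * | Trigger: any row returned
-- Notify: data-team@company.com
SELECT
    table_name,
    hours_since_update
FROM monitoring.data_freshness
WHERE table_schema = 'gold'
  AND hours_since_update > 6;

-- Alert: Cost spike (yesterday more than 50% above the previous day)
-- Schedule: 0 8 * * * | Trigger: any row returned
-- Notify: finops@company.com
WITH daily AS (
    -- daily_costs has one row per workspace and SKU; total it per day
    SELECT date, SUM(estimated_cost) AS estimated_cost
    FROM monitoring.daily_costs
    GROUP BY date
),
with_prev AS (
    -- Compute LAG before filtering, otherwise the previous day is lost
    SELECT
        date,
        estimated_cost,
        LAG(estimated_cost) OVER (ORDER BY date) AS prev_day_cost
    FROM daily
)
SELECT
    date,
    estimated_cost,
    prev_day_cost,
    (estimated_cost - prev_day_cost) / NULLIF(prev_day_cost, 0) * 100 AS percent_change
FROM with_prev
WHERE date = current_date() - 1
  AND (estimated_cost - prev_day_cost) / NULLIF(prev_day_cost, 0) * 100 > 50;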
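
The cost-spike rule compares each day's cost with the previous day's and flags increases above 50%. The same arithmetic can be checked offline with a small Python sketch; `find_cost_spikes` is a hypothetical helper, and its input is assumed to be per-day totals already summed across workspaces and SKUs.

```python
from datetime import date


def find_cost_spikes(daily_cost: dict[date, float], threshold_pct: float = 50.0):
    """Return (day, percent_change) for days whose cost rose more than threshold_pct."""
    spikes = []
    days = sorted(daily_cost)
    # Pair each day with the previous available day, like LAG ... ORDER BY date.
    for prev_day, day in zip(days, days[1:]):
        prev_cost = daily_cost[prev_day]
        if prev_cost == 0:
            continue  # mirrors NULLIF(prev, 0): no baseline, no percentage
        change = (daily_cost[day] - prev_cost) / prev_cost * 100
        if change > threshold_pct:
            spikes.append((day, round(change, 2)))
    return spikes


costs = {
    date(2024, 1, 1): 100.0,
    date(2024, 1, 2): 160.0,
    date(2024, 1, 3): 170.0,
}
print(find_cost_spikes(costs))
```

Like LAG, this compares against the previous row rather than the previous calendar day, so a gap in the data silently widens the comparison window.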

Step 4: Structured Logging

# src/utils/logging.py
import json
import logging
from datetime import datetime, timezone
from typing import Any, Optional


class JsonFormatter(logging.Formatter):
    """Pass-through formatter: messages are already serialized as JSON."""
    def format(self, record):
        return record.getMessage()


class StructuredLogger:
    """Structured logging for Databricks notebooks."""

    def __init__(self, job_name: str, run_id: Optional[str] = None):
        self.job_name = job_name
        self.run_id = run_id or str(datetime.now(timezone.utc).timestamp())
        self.logger = logging.getLogger(job_name)
        self.logger.setLevel(logging.INFO)

        # Attach the handler only once: re-running a notebook cell returns the
        # same named logger, and a second handler would duplicate every line.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JsonFormatter())
            self.logger.addHandler(handler)

    def _log(self, level: str, message: str, **context):
        """Log with structured context."""
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "job_name": self.job_name,
            "run_id": self.run_id,
            "level": level,
            "message": message,
            **context
        }
        # "METRIC" is not a standard logging level, so it is emitted at INFO;
        # the "level" field in the JSON payload still says METRIC.
        py_level = getattr(logging, level, logging.INFO)
        self.logger.log(py_level, json.dumps(log_entry, default=str))

    def info(self, message: str, **context):
        self._log("INFO", message, **context)

    def error(self, message: str, **context):
        self._log("ERROR", message, **context)

    def metric(self, name: str, value: Any, **tags):
        """Log a metric for monitoring."""
        self._log("METRIC", f"{name}={value}", metric_name=name, metric_value=value, **tags)


# Usage in notebooks (dbutils is available only inside a Databricks notebook)
logger = StructuredLogger("etl-pipeline", dbutils.notebook.entry_point.getDbutils().notebook().getContext().runId())
logger.info("Starting bronze ingestion", source="s3://bucket/raw")
logger.metric("rows_processed", 10000, table="orders")
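
Because every log line is a JSON object, metric entries can be recovered from captured driver output with a few lines of Python. A minimal sketch: `extract_metrics` is a hypothetical helper, and it assumes the field names used by StructuredLogger (`level`, `metric_name`, `metric_value`).

```python
import json


def extract_metrics(lines):
    """Return {metric_name: [values]} from JSON log lines whose level is METRIC."""
    metrics = {}
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON output such as Spark or library logs
        if not isinstance(entry, dict) or entry.get("level") != "METRIC":
            continue
        name = entry.get("metric_name")
        if name is not None:
            metrics.setdefault(name, []).append(entry.get("metric_value"))
    return metrics


sample = [
    '{"level": "INFO", "message": "Starting bronze ingestion"}',
    '{"level": "METRIC", "metric_name": "rows_processed", "metric_value": 10000}',
    "24/01/02 12:00:00 WARN SomeSparkClass: not JSON",
]
print(extract_metrics(sample))
```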

Step 5: Custom Metrics Dashboard

# src/monitoring/dashboard.py
from databricks.sdk import WorkspaceClient

def create_monitoring_dashboard(w: WorkspaceClient) -> str:
    """Create operational monitoring dashboard."""

    # Create dashboard
    dashboard = w.dashboards.create(
        name="Data Platform Monitoring",
        tags=["monitoring", "operations"],
    )

    # Job Success Rate Widget
    # Daily success rate per job; rows without a result_state are still running
    job_success_query = """
    SELECT
        DATE(period_start_time) AS date,
        job_id,
        ROUND(SUM(CASE WHEN result_state = 'SUCCEEDED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) AS success_rate
    FROM system.lakeflow.job_run_timeline
    WHERE period_start_time > current_timestamp() - INTERVAL 7 DAYS
      AND result_state IS NOT NULL
    GROUP BY DATE(period_start_time), job_id
    ORDER BY date, job_id
    """

    # Add widgets...
    # (Dashboard API implementation)

    return dashboard.id

# Generate Grafana dashboard JSON
def generate_grafana_dashboard() -> dict:
    """Generate Grafana dashboard configuration."""
    return {
        "dashboard": {
            "title": "Databricks Monitoring",
            "panels": [
                {
                    "title": "Job Success Rate",
                    "type": "timeseries",
                    "targets": [{
                        "rawSql": """
                            SELECT
                                last_run_time AS time,
                                job_name,
                                success_rate
                            FROM monitoring.job_health_summary
                        """
                    }]
                },
                {
                    "title": "Daily DBU Usage",
                    "type": "bargauge",
                    "targets": [{
                        "rawSql": """
                            SELECT date, total_dbus
                            FROM monitoring.daily_costs
                            WHERE date > current_date() - 7
                        """
                    }]
                }
            ]
        }
    }

Step 6: Integration with External Monitoring

# src/monitoring/external.py
import time
from dataclasses import dataclass
from typing import Optional

import requests


@dataclass
class MetricPoint:
    name: str
    value: float
    tags: dict
    timestamp: Optional[int] = None  # Unix seconds; defaults to the send time


class DatadogExporter:
    """Export metrics to Datadog."""

    def __init__(self, api_key: str, app_key: str):
        self.api_key = api_key
        self.app_key = app_key
        self.base_url = "https://api.datadoghq.com/api/v2"

    def send_metrics(self, metrics: list[MetricPoint]) -> bool:
        """Send metrics to Datadog; return True if the request was accepted."""
        series = []
        for m in metrics:
            series.append({
                "metric": f"databricks.{m.name}",
                # The v2 series endpoint expects points as objects,
                # not the [timestamp, value] pairs used by v1.
                "points": [{"timestamp": m.timestamp or int(time.time()), "value": m.value}],
                "tags": [f"{k}:{v}" for k, v in m.tags.items()]
            })

        response = requests.post(
            f"{self.base_url}/series",
            headers={
                "DD-API-KEY": self.api_key,
                "DD-APPLICATION-KEY": self.app_key,
            },
            json={"series": series},
            timeout=10,  # avoid hanging a job on a slow network call
        )
        return response.status_code == 202

# Usage (load api_key and app_key from a secret store, never hard-code them)
exporter = DatadogExporter(api_key, app_key)
exporter.send_metrics([
    MetricPoint("job.duration_minutes", 45.2, {"job": "etl-pipeline", "env": "prod"}),
    MetricPoint("job.rows_processed", 1000000, {"job": "etl-pipeline", "env": "prod"}),
])
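
When a job emits many metric points, sending them in fixed-size batches keeps each request small and limits what is lost if one call fails. A minimal sketch: `batched` is a hypothetical helper, and the default `batch_size` of 500 is an arbitrary assumption, not a documented Datadog limit.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int = 500) -> Iterator[List[T]]:
    """Yield consecutive slices of items, each at most batch_size long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


# Example with plain integers standing in for MetricPoint objects
for batch in batched(list(range(7)), batch_size=3):
    print(len(batch), batch)
```

With the exporter above, each batch would be passed to `send_metrics` in turn, and the boolean results collected to decide whether a retry is needed.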

Output

  • System table queries configured
  • Monitoring views created
  • SQL alerts active
  • Structured logging implemented
  • External integrations ready

Error Handling

| Issue | Cause | Solution |
|---|---|---|
| System tables unavailable | Feature not enabled | Contact admin to enable |
| Alert not triggering | Wrong schedule | Check cron expression |
| Missing metrics | Query timeout | Optimize query or increase warehouse |
| High cardinality | Too many tags | Reduce label dimensions |
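
For the high-cardinality case, one way to reduce label dimensions is to keep only an allow-list of tag keys before exporting. A minimal sketch: `reduce_tags` and the allow-list contents are hypothetical and should be adapted to the dimensions your dashboards actually group by.

```python
def reduce_tags(tags: dict, allowed: set) -> dict:
    """Keep only tags whose key is in the allow-list."""
    return {key: value for key, value in tags.items() if key in allowed}


# Hypothetical allow-list: low-cardinality dimensions only
ALLOWED_TAGS = {"job", "env"}

raw_tags = {"job": "etl-pipeline", "env": "prod", "run_id": "123456", "user": "a@b.c"}
print(reduce_tags(raw_tags, ALLOWED_TAGS))
```

Per-run values such as `run_id` are the usual source of cardinality growth; they are often better kept in log lines than in metric tags.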

Examples

Quick Health Check Query

SELECT
    job_name,
    success_rate,
    avg_duration_minutes,
    last_run_time
FROM monitoring.job_health_summary
WHERE success_rate < 95
ORDER BY success_rate ASC;

Resources

  • Databricks System Tables
  • SQL Alerts
  • Dashboards

Next Steps

For incident response, see databricks-incident-runbook.