databricks-performance-tuning

tessl install github:jeremylongshore/claude-code-plugins-plus-skills --skill databricks-performance-tuning
github.com/jeremylongshore/claude-code-plugins-plus-skills

Optimize Databricks cluster and query performance. Use when jobs are running slowly, optimizing Spark configurations, or improving Delta Lake query performance. Trigger with phrases like "databricks performance", "spark tuning", "databricks slow", "optimize databricks", "cluster performance".

Review Score: 78%
Validation Score: 12/16
Implementation Score: 65%
Activation Score: 90%

Databricks Performance Tuning

Overview

Optimize Databricks cluster, Spark, and Delta Lake performance.

Prerequisites

  • Access to cluster configuration
  • Understanding of workload characteristics
  • Query history access

Instructions

Step 1: Cluster Sizing

# Cluster sizing calculator
def recommend_cluster_size(
    data_size_gb: float,
    complexity: str = "medium",  # low, medium, high
    parallelism_need: str = "standard",  # standard, high
) -> dict:
    """
    Recommend cluster configuration based on workload.

    Args:
        data_size_gb: Estimated data size to process
        complexity: Query/transform complexity
        parallelism_need: Required parallelism level

    Returns:
        Recommended cluster configuration
    """
    # Memory per executor (standard DS3_v2 = 14GB)
    memory_per_worker = 14

    # Base calculation: assume each worker can process roughly 2x its memory in data
    workers_by_data = max(1, int(data_size_gb / memory_per_worker / 2))

    # Adjust for complexity
    complexity_multiplier = {"low": 1, "medium": 1.5, "high": 2.5}
    workers = int(workers_by_data * complexity_multiplier.get(complexity, 1.5))

    # Adjust for parallelism
    if parallelism_need == "high":
        workers = max(workers, 8)

    return {
        "node_type_id": "Standard_DS3_v2",
        "num_workers": workers,
        "autoscale": {
            "min_workers": max(1, workers // 2),
            "max_workers": workers * 2,
        },
        "spark_conf": {
            "spark.sql.shuffle.partitions": str(workers * 4),
            "spark.default.parallelism": str(workers * 4),
        }
    }
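
To apply the recommendation, the returned dictionary can be dropped into a cluster spec. A minimal sketch, assuming the standard Jobs API new_cluster fields and a placeholder runtime version:

# Hypothetical usage: embed the recommendation in a Jobs API cluster spec
rec = recommend_cluster_size(data_size_gb=500, complexity="high")

new_cluster = {
    "spark_version": "14.3.x-scala2.12",  # placeholder; pick your runtime
    "node_type_id": rec["node_type_id"],
    "autoscale": rec["autoscale"],
    "spark_conf": rec["spark_conf"],
}
print(new_cluster)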

Step 2: Spark Configuration Optimization

# Optimized Spark configurations by workload type
spark_configs = {
    "etl_batch": {
        # Memory and parallelism
        "spark.sql.shuffle.partitions": "200",
        "spark.default.parallelism": "200",
        "spark.sql.files.maxPartitionBytes": "134217728",  # 128MB

        # Delta Lake optimizations
        "spark.databricks.delta.optimizeWrite.enabled": "true",
        "spark.databricks.delta.autoCompact.enabled": "true",
        "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",

        # Adaptive query execution
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.adaptive.skewJoin.enabled": "true",
    },

    "ml_training": {
        # Memory for ML workloads
        "spark.driver.memory": "16g",
        "spark.executor.memory": "16g",
        "spark.memory.fraction": "0.8",
        "spark.memory.storageFraction": "0.3",

        # Serialization for ML
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryoserializer.buffer.max": "1024m",
    },

    "streaming": {
        # Streaming configurations
        "spark.sql.streaming.schemaInference": "true",
        "spark.sql.streaming.checkpointLocation": "/mnt/checkpoints",
        "spark.databricks.delta.autoCompact.minNumFiles": "10",

        # Micro-batch tuning: streaming micro-batches usually need far fewer
        # shuffle partitions than the 200 default
        "spark.sql.shuffle.partitions": "64",
    },

    "interactive": {
        # Fast startup
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]",

        # Caching
        "spark.sql.inMemoryColumnarStorage.compressed": "true",
        "spark.sql.inMemoryColumnarStorage.batchSize": "10000",
    }
}
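
A minimal sketch for applying a profile to a running session, assuming the notebook-provided `spark` session; note that core settings such as driver/executor memory, the serializer, and the cluster profile only take effect in the cluster configuration, not at runtime:

from pyspark.sql import SparkSession

def apply_profile(spark: SparkSession, profile: str) -> None:
    """Apply the runtime-settable options from a profile to a live session."""
    for key, value in spark_configs[profile].items():
        # Only SQL/Delta runtime confs can change on a live session; memory,
        # serializer, and cluster-profile settings belong in the cluster config
        if key.startswith(("spark.sql.", "spark.databricks.delta.")):
            spark.conf.set(key, value)

apply_profile(spark, "etl_batch")  # assumes the notebook-provided `spark`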

Step 3: Delta Lake Optimization

# delta_optimization.py
from pyspark.sql import SparkSession

def optimize_delta_table(
    spark: SparkSession,
    table_name: str,
    z_order_columns: list[str] | None = None,
    vacuum_hours: int = 168,  # 7 days
) -> dict:
    """
    Optimize Delta table for query performance.

    Args:
        spark: SparkSession
        table_name: Fully qualified table name
        z_order_columns: Columns for Z-ordering (more than ~4 gives diminishing returns)
        vacuum_hours: Retention period for vacuum

    Returns:
        Optimization results
    """
    results = {}

    # 1. Run OPTIMIZE with Z-ordering
    if z_order_columns:
        z_order_clause = ", ".join(z_order_columns[:4])  # Max 4 columns
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({z_order_clause})")
        results["z_order"] = z_order_columns
    else:
        spark.sql(f"OPTIMIZE {table_name}")

    results["optimized"] = True

    # 2. Analyze table statistics
    spark.sql(f"ANALYZE TABLE {table_name} COMPUTE STATISTICS")
    results["statistics_computed"] = True

    # 3. Vacuum old files
    spark.sql(f"VACUUM {table_name} RETAIN {vacuum_hours} HOURS")
    results["vacuumed"] = True

    # 4. Get table metrics
    detail = spark.sql(f"DESCRIBE DETAIL {table_name}").first()
    results["metrics"] = {
        "num_files": detail.numFiles,
        "size_bytes": detail.sizeInBytes,
        "partitions": detail.partitionColumns,
    }

    return results

def enable_liquid_clustering(
    spark: SparkSession,
    table_name: str,
    cluster_columns: list[str],
) -> None:
    """
    Enable Liquid Clustering for automatic data layout optimization.

    Liquid Clustering replaces traditional partitioning and Z-ordering
    with automatic, incremental clustering.
    """
    columns = ", ".join(cluster_columns)
    spark.sql(f"""
        ALTER TABLE {table_name}
        CLUSTER BY ({columns})
    """)

def set_delta_table_properties(
    spark: SparkSession,
    table_name: str,
) -> None:
    """
    Set Delta table properties commonly paired with Predictive Optimization:
    deletion vectors and a ~100MB target file size. Predictive Optimization
    itself is enabled at the catalog or schema level (for example,
    ALTER SCHEMA my_schema ENABLE PREDICTIVE OPTIMIZATION).
    """
    spark.sql(f"""
        ALTER TABLE {table_name}
        SET TBLPROPERTIES (
            'delta.enableDeletionVectors' = 'true',
            'delta.targetFileSize' = '104857600'
        )
    """)

Step 4: Query Performance Analysis

-- Find slow queries from query history
-- (requires Unity Catalog system tables; column names may vary by release,
--  so verify with: DESCRIBE TABLE system.query.history)
SELECT
    query_id,
    query_text,
    duration / 1000 as seconds,
    rows_produced,
    bytes_read,
    start_time
FROM system.query.history
WHERE duration > 60000  -- > 60 seconds
  AND start_time > current_timestamp() - INTERVAL 24 HOURS
ORDER BY duration DESC
LIMIT 20;

-- Analyze query plan
EXPLAIN FORMATTED
SELECT * FROM main.silver.orders
WHERE order_date > '2024-01-01'
  AND region = 'US';

-- Check table scan statistics
-- (adjust column names if your query history schema differs)
SELECT
    table_name,
    SUM(bytes_read) / 1024 / 1024 / 1024 as gb_read,
    SUM(rows_produced) as total_rows,
    COUNT(*) as query_count
FROM system.query.history
WHERE start_time > current_timestamp() - INTERVAL 7 DAYS
GROUP BY table_name
ORDER BY gb_read DESC;
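
The same checks can be scripted from a notebook. A minimal sketch, assuming the notebook-provided `spark` session and the column names used above:

# Pull the slowest statements of the last 24 hours into pandas for review
slow_queries = spark.sql("""
    SELECT query_id, query_text, duration / 1000 AS seconds,
           rows_produced, bytes_read
    FROM system.query.history
    WHERE duration > 60000
      AND start_time > current_timestamp() - INTERVAL 24 HOURS
    ORDER BY duration DESC
    LIMIT 20
""").toPandas()

# Statements reading far more bytes than rows produced often lack partition
# filters or Z-order/clustering keys
print(slow_queries.head(10))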

Step 5: Caching Strategy

# Intelligent caching for repeated queries
from pyspark import StorageLevel
from pyspark.sql import DataFrame, SparkSession

class CacheManager:
    """Manage Spark DataFrame caching."""

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self._cache_registry = {}

    def cache_table(
        self,
        table_name: str,
        cache_level: str = "MEMORY_AND_DISK",
    ) -> DataFrame:
        """Cache table with specified storage level."""
        if table_name in self._cache_registry:
            return self._cache_registry[table_name]

        df = self.spark.table(table_name)

        if cache_level == "MEMORY_ONLY":
            df.persist(StorageLevel.MEMORY_ONLY)
        elif cache_level == "MEMORY_AND_DISK":
            df.persist(StorageLevel.MEMORY_AND_DISK)
        elif cache_level == "DISK_ONLY":
            df.persist(StorageLevel.DISK_ONLY)

        # Trigger caching
        df.count()

        self._cache_registry[table_name] = df
        return df

    def uncache_all(self):
        """Clear all cached DataFrames."""
        for df in self._cache_registry.values():
            df.unpersist()
        self._cache_registry.clear()
        self.spark.catalog.clearCache()

# Delta Cache (automatic)
# Enable in cluster config:
# "spark.databricks.io.cache.enabled": "true"
# "spark.databricks.io.cache.maxDiskUsage": "50g"

Step 6: Join Optimization

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import broadcast

def optimize_join(
    df_large: DataFrame,
    df_small: DataFrame,
    join_key: str,
    small_table_threshold_mb: int = 100,
) -> DataFrame:
    """
    Optimize join based on table sizes.

    Uses broadcast join for small tables,
    sort-merge join for large tables.
    """
    # Rough size estimate: assume ~100 bytes per row (adjust for wide tables)
    small_size_mb = df_small.count() * 100 / 1024 / 1024

    if small_size_mb < small_table_threshold_mb:
        # Broadcast join (small table fits in memory)
        return df_large.join(broadcast(df_small), join_key)
    else:
        # Let Spark plan a sort-merge join; consider bucketed tables for repeated joins
        return df_large.join(df_small, join_key, "inner")

# Bucketed tables for frequent joins
def create_bucketed_table(
    spark: SparkSession,
    df: DataFrame,
    table_name: str,
    bucket_columns: list[str],
    num_buckets: int = 100,
):
    """Create bucketed table for join optimization."""
    (
        df.write
        .bucketBy(num_buckets, *bucket_columns)
        .sortBy(*bucket_columns)
        .saveAsTable(table_name)
    )
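
Spark also broadcasts automatically for tables under spark.sql.autoBroadcastJoinThreshold (10MB by default), so the explicit hint mainly helps when size estimates are unreliable. A usage sketch with hypothetical fact and dimension tables:

# Raise the automatic broadcast threshold for dimension tables slightly
# above the 10MB default
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

orders = spark.table("main.silver.orders")             # hypothetical large fact table
customers = spark.table("main.silver.dim_customers")   # hypothetical small dimension

joined = optimize_join(orders, customers, join_key="customer_id")
joined.explain()  # confirm BroadcastHashJoin vs SortMergeJoin in the plan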

Output

  • Optimized cluster configuration
  • Tuned Spark settings
  • Optimized Delta tables
  • Improved query performance

Error Handling

| Issue | Cause | Solution |
|---|---|---|
| OOM errors | Insufficient memory | Increase executor memory or reduce partition size |
| Skewed data | Uneven distribution | Use salting (sketch below) or AQE skew handling |
| Slow joins | Large shuffle | Use broadcast joins for small tables |
| Too many files | Small-files problem | Run OPTIMIZE regularly |
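
For skew that AQE does not fully resolve, key salting spreads a hot key across artificial sub-keys before the join. A minimal sketch, assuming a hypothetical skewed key customer_id on the same hypothetical tables used above:

from pyspark.sql import functions as F

SALT_BUCKETS = 16  # number of sub-keys to spread each hot key across

orders = spark.table("main.silver.orders")             # hypothetical skewed fact table
customers = spark.table("main.silver.dim_customers")   # hypothetical dimension table

# Large, skewed side: append a random salt to the join key
orders_salted = orders.withColumn(
    "salted_key",
    F.concat_ws("_", F.col("customer_id"), (F.rand() * SALT_BUCKETS).cast("int")),
)

# Small side: replicate each row once per salt value so every salted key matches
salts = spark.range(SALT_BUCKETS).withColumnRenamed("id", "salt")
customers_salted = customers.crossJoin(salts).withColumn(
    "salted_key",
    F.concat_ws("_", F.col("customer_id"), F.col("salt").cast("int")),
)

skew_safe = orders_salted.join(customers_salted, "salted_key")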

Examples

Performance Benchmark

import time

def benchmark_query(spark, query: str, runs: int = 3) -> dict:
    """Benchmark query execution time."""
    times = []
    for _ in range(runs):
        spark.catalog.clearCache()
        start = time.time()
        spark.sql(query).collect()
        times.append(time.time() - start)

    return {
        "min": min(times),
        "max": max(times),
        "avg": sum(times) / len(times),
        "runs": runs,
    }
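
Combined with Step 3, the benchmark can quantify an optimization. A sketch, again assuming the hypothetical main.silver.orders table:

query = """
    SELECT region, COUNT(*) AS orders
    FROM main.silver.orders
    WHERE order_date > '2024-01-01'
    GROUP BY region
"""

before = benchmark_query(spark, query)
optimize_delta_table(spark, "main.silver.orders", z_order_columns=["order_date", "region"])
after = benchmark_query(spark, query)

print(f"avg before: {before['avg']:.1f}s, avg after: {after['avg']:.1f}s")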

Resources

  • Databricks Performance Guide
  • Delta Lake Optimization
  • Spark Tuning Guide

Next Steps

For cost optimization, see databricks-cost-tuning.