Distributions API

The Distributions API provides a framework for defining data distribution requirements in Apache Spark's DataSource V2 connector layer. It allows data sources to specify how data should be distributed across partitions to optimize query performance, especially for operations like joins, aggregations, and sorting.

Overview

The Distributions API enables data sources to communicate their distribution characteristics and requirements to Spark's query planner. This information lets the Catalyst optimizer make informed decisions about data shuffling, partition pruning, and join strategies.

Core Distribution Interface

Distribution

Base interface for all distribution types:

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface Distribution {
    // Marker interface for distribution types
}

Distribution Types

UnspecifiedDistribution

Represents a distribution where no guarantees are made about data co-location:

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface UnspecifiedDistribution extends Distribution {
    // No specific distribution requirements
}

Usage: Use when the data source makes no promises about how data is distributed across partitions. This is the most flexible option, but it gives the optimizer the least information to work with.

ClusteredDistribution

Represents a distribution where tuples sharing the same values for clustering expressions are co-located in the same partition:

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;

@Experimental
public interface ClusteredDistribution extends Distribution {
    /**
     * Returns the clustering expressions that determine data co-location
     */
    Expression[] clustering();
}

Usage: Use when data is partitioned by specific columns or expressions, ensuring that all rows with the same clustering key values are in the same partition. This is optimal for hash-based joins and group-by operations.

OrderedDistribution

Represents a distribution where tuples are ordered across partitions according to ordering expressions:

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.SortOrder;

@Experimental
public interface OrderedDistribution extends Distribution {
    /**
     * Returns the sort orders that define the ordering across partitions
     */
    SortOrder[] ordering();
}

Usage: Use when data is globally sorted across all partitions. This distribution is optimal for range-based operations and merge joins.

Distribution Factory Methods

Distributions

Helper class providing factory methods for creating distribution instances:

package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;

@Experimental
public class Distributions {
    
    /**
     * Creates an unspecified distribution
     */
    public static UnspecifiedDistribution unspecified() {
        return LogicalDistributions.unspecified();
    }
    
    /**
     * Creates a clustered distribution with the specified clustering expressions
     */
    public static ClusteredDistribution clustered(Expression[] clustering) {
        return LogicalDistributions.clustered(clustering);
    }
    
    /**
     * Creates an ordered distribution with the specified sort orders
     */
    public static OrderedDistribution ordered(SortOrder[] ordering) {
        return LogicalDistributions.ordered(ordering);
    }
}

Supporting Expression Types

Expression

Base interface for all expressions used in distributions:

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

@Evolving
public interface Expression {
    Expression[] EMPTY_EXPRESSION = new Expression[0];
    NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
    
    /**
     * Human-readable description of this expression
     */
    String describe();
    
    /**
     * Child expressions of this expression
     */
    Expression[] children();
    
    /**
     * Named references used by this expression
     */
    NamedReference[] references();
}
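
For instance, a transform built with the Expressions helper class (listed under Import Statements below) can be inspected through these methods. A minimal sketch:

import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;

// Inspect a bucket transform built over a single column
Expression expr = Expressions.bucket(10, "user_id");
System.out.println(expr.describe());   // human-readable form, e.g. bucket(10, user_id)
for (NamedReference ref : expr.references()) {
    // Prints each referenced column; here just user_id
    System.out.println(String.join(".", ref.fieldNames()));
}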

NamedReference

Reference to a named field or column:

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

@Evolving
public interface NamedReference extends Expression {
    /**
     * Field name path (supporting nested fields)
     */
    String[] fieldNames();
}
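
A dotted column name parses into its nested path components. A small sketch using the Expressions helper:

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;

// "address.city" is parsed as a reference to a nested field
NamedReference ref = Expressions.column("address.city");
String[] path = ref.fieldNames();   // ["address", "city"]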

SortOrder

Represents a sort order used in ordered distributions:

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface SortOrder extends Expression {
    /**
     * The expression to sort by
     */
    Expression expression();
    
    /**
     * Sort direction (ascending or descending)
     */
    SortDirection direction();
    
    /**
     * Null ordering behavior
     */
    NullOrdering nullOrdering();
}
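
Connector code rarely implements SortOrder by hand. In Spark 3.2+, the Expressions helper can construct instances; a minimal sketch:

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

// Sort by timestamp, newest first, with nulls at the end
SortOrder order = Expressions.sort(
    Expressions.column("timestamp"),
    SortDirection.DESCENDING,
    NullOrdering.NULLS_LAST);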

SortDirection

Enumeration of sort directions:

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public enum SortDirection {
    ASCENDING(NullOrdering.NULLS_FIRST), 
    DESCENDING(NullOrdering.NULLS_LAST);
    
    private final NullOrdering defaultNullOrdering;
    
    SortDirection(NullOrdering defaultNullOrdering) {
        this.defaultNullOrdering = defaultNullOrdering;
    }
    
    /**
     * Default null ordering for this sort direction
     */
    public NullOrdering defaultNullOrdering() {
        return defaultNullOrdering;
    }
}
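
Each direction carries a conventional default null ordering, queryable at runtime. A quick sketch:

import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;

NullOrdering forAsc = SortDirection.ASCENDING.defaultNullOrdering();   // NULLS_FIRST
NullOrdering forDesc = SortDirection.DESCENDING.defaultNullOrdering(); // NULLS_LAST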

NullOrdering

Enumeration of null ordering behaviors:

package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public enum NullOrdering {
    NULLS_FIRST, 
    NULLS_LAST
}

Usage Examples

Creating an Unspecified Distribution

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;

// For data sources that make no distribution guarantees
Distribution distribution = Distributions.unspecified();

Creating a Clustered Distribution

import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;

// Cluster by customer_id and region columns
Expression[] clusteringExprs = new Expression[] {
    Expressions.column("customer_id"),
    Expressions.column("region")
};

ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);

Creating an Ordered Distribution

import org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.NullOrdering;

// Order by timestamp descending, then by id ascending
SortOrder[] ordering = new SortOrder[] {
    Expressions.sort(
        Expressions.column("timestamp"), 
        SortDirection.DESCENDING, 
        NullOrdering.NULLS_LAST
    ),
    Expressions.sort(
        Expressions.column("id"), 
        SortDirection.ASCENDING, 
        NullOrdering.NULLS_FIRST
    )
};

OrderedDistribution distribution = Distributions.ordered(ordering);

Using Distributions in a Data Source Implementation

The org.apache.spark.sql.connector.distributions types are consumed on the write path: a Write implementation can require a distribution of its input data by also implementing RequiresDistributionAndOrdering (Spark 3.2+):

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.Write;

public class MyDataSourceWrite implements Write, RequiresDistributionAndOrdering {
    
    @Override
    public Distribution requiredDistribution() {
        // Ask Spark to cluster incoming rows by the table's partition key
        // so that each writer task receives co-located data
        Expression[] partitionExprs = new Expression[] {
            Expressions.column("customer_id")
        };
        return Distributions.clustered(partitionExprs);
    }
    
    @Override
    public SortOrder[] requiredOrdering() {
        // No within-partition ordering is required by this writer
        return new SortOrder[0];
    }
    
    // Other Write methods...
}

Complex Distribution Requirements

import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;

// Cluster by a transformed expression (e.g., a hash bucket)
Expression bucketExpr = Expressions.bucket(10, "user_id");
Expression[] clusteringExprs = new Expression[] { bucketExpr };
ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);

// Or cluster by multiple columns
Expression[] multiColumnClustering = new Expression[] {
    Expressions.column("year"),      // Partition by year
    Expressions.column("month"),     // Then by month
    Expressions.column("region")     // Then by region
};
ClusteredDistribution complexDistribution = Distributions.clustered(multiColumnClustering);

Import Statements

// Core distribution interfaces
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;

// Distribution factory
import org.apache.spark.sql.connector.distributions.Distributions;

// Expression interfaces for distribution definitions
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.NullOrdering;

// Utility class for creating expressions
import org.apache.spark.sql.connector.expressions.Expressions;

Performance Considerations

Clustered Distribution Benefits

  • Hash Joins: When data is clustered by join keys, Spark can perform more efficient hash joins without shuffling
  • Aggregations: Group-by operations on clustering columns avoid expensive shuffles
  • Partition Pruning: Filters on clustering columns can eliminate entire partitions

Ordered Distribution Benefits

  • Range Joins: Enables efficient merge joins for range-based predicates
  • Sorting: Eliminates the need for global sorting when data is already ordered
  • Top-K Operations: Efficient execution of ORDER BY with LIMIT queries

Best Practices

  1. Choose appropriate distribution: Match the distribution to your query patterns
  2. Minimize clustering expressions: Too many clustering columns can reduce effectiveness
  3. Consider data skew: Ensure clustering expressions provide good data distribution
  4. Update distributions: Keep distribution metadata in sync with actual data layout

Integration with Catalyst Optimizer

The Distributions API feeds directly into Spark's Catalyst optimizer:

  1. Physical Plan Generation: Distribution information influences physical operator selection
  2. Shuffle Elimination: Proper distributions can eliminate unnecessary shuffle operations
  3. Join Strategy Selection: Affects whether broadcast, hash, or merge joins are chosen
  4. Partition-wise Operations: Enables partition-wise execution of operations when data is properly distributed

This API is essential for building high-performance data sources that can take full advantage of Spark's distributed computing capabilities.