The Distributions API provides a framework for defining data distribution requirements in Apache Spark Catalyst. This API allows data sources to specify how data should be distributed across partitions to optimize query performance, especially for operations like joins, aggregations, and sorting.
The Distributions API enables data sources to communicate their distribution characteristics and requirements to Spark's query planner. This information lets the Catalyst optimizer make informed decisions about data shuffling, partition pruning, and join strategies.
Base interface for all distribution types:

```java
package org.apache.spark.sql.connector.distributions;
import org.apache.spark.annotation.Experimental;
@Experimental
public interface Distribution {
// Marker interface for distribution types
}
```

Represents a distribution where no guarantees are made about data co-location:

```java
package org.apache.spark.sql.connector.distributions;
import org.apache.spark.annotation.Experimental;
@Experimental
public interface UnspecifiedDistribution extends Distribution {
// No specific distribution requirements
}
```

Usage: Use when the data source makes no promises about how data is distributed across partitions. This is the most flexible but least optimized distribution type.
Represents a distribution where tuples sharing the same values for clustering expressions are co-located in the same partition:

```java
package org.apache.spark.sql.connector.distributions;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;
@Experimental
public interface ClusteredDistribution extends Distribution {
/**
* Returns the clustering expressions that determine data co-location
*/
Expression[] clustering();
}
```

Usage: Use when data is partitioned by specific columns or expressions, ensuring that all rows with the same clustering key values are in the same partition. This is optimal for hash-based joins and group-by operations.
Represents a distribution where tuples are ordered across partitions according to ordering expressions:

```java
package org.apache.spark.sql.connector.distributions;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.SortOrder;
@Experimental
public interface OrderedDistribution extends Distribution {
/**
* Returns the sort orders that define the ordering across partitions
*/
SortOrder[] ordering();
}
```

Usage: Use when data is globally sorted across all partitions. This distribution is optimal for range-based operations and merge joins.
Helper class providing factory methods for creating distribution instances:

```java
package org.apache.spark.sql.connector.distributions;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;
@Experimental
public class Distributions {
/**
* Creates an unspecified distribution
*/
public static UnspecifiedDistribution unspecified() {
return LogicalDistributions.unspecified();
}
/**
* Creates a clustered distribution with the specified clustering expressions
*/
public static ClusteredDistribution clustered(Expression[] clustering) {
return LogicalDistributions.clustered(clustering);
}
/**
* Creates an ordered distribution with the specified sort orders
*/
public static OrderedDistribution ordered(SortOrder[] ordering) {
return LogicalDistributions.ordered(ordering);
}
}
```

Base interface for all expressions used in distributions:

```java
package org.apache.spark.sql.connector.expressions;
import org.apache.spark.annotation.Evolving;
@Evolving
public interface Expression {
Expression[] EMPTY_EXPRESSION = new Expression[0];
NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
/**
* Human-readable description of this expression
*/
String describe();
/**
* Child expressions of this expression
*/
Expression[] children();
/**
* Named references used by this expression
*/
NamedReference[] references();
}
```
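Connectors can treat expressions generically through describe(), children(), and references(). As a small illustration, here is a hypothetical helper (not part of the Spark API) that collects every column an expression touches:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;

final class ExpressionUtil {
  // Collect the fully qualified names of all columns referenced by an expression
  static Set<String> referencedColumns(Expression expr) {
    Set<String> columns = new HashSet<>();
    for (NamedReference ref : expr.references()) {
      columns.add(String.join(".", ref.fieldNames()));
    }
    return columns;
  }
}
```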
Reference to a named field or column:

```java
package org.apache.spark.sql.connector.expressions;
import org.apache.spark.annotation.Evolving;
@Evolving
public interface NamedReference extends Expression {
/**
* Field name path (supporting nested fields)
*/
String[] fieldNames();
}
```
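To illustrate the contract, a minimal hypothetical implementation for a nested field might look like this (connectors normally obtain references from a factory such as Expressions.column rather than hand-rolling them):

```java
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;

// Hypothetical: new NestedFieldReference("address", "city") refers to address.city
final class NestedFieldReference implements NamedReference {
  private final String[] fieldNames;

  NestedFieldReference(String... fieldNames) {
    this.fieldNames = fieldNames;
  }

  @Override public String[] fieldNames() { return fieldNames; }
  @Override public String describe() { return String.join(".", fieldNames); }
  @Override public Expression[] children() { return EMPTY_EXPRESSION; }
  @Override public NamedReference[] references() { return new NamedReference[] { this }; }
}
```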
Represents a sort order used in ordered distributions:

```java
package org.apache.spark.sql.connector.expressions;
import org.apache.spark.annotation.Experimental;
@Experimental
public interface SortOrder extends Expression {
/**
* The expression to sort by
*/
Expression expression();
/**
* Sort direction (ascending or descending)
*/
SortDirection direction();
/**
* Null ordering behavior
*/
NullOrdering nullOrdering();
}
```
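Spark defines only the interface; the connector supplies the implementation. Below is a minimal sketch, named SortOrderImpl to match the ordering example later in this section (the class name is this document's, not part of the Spark API):

```java
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;

final class SortOrderImpl implements SortOrder {
  private final Expression expression;
  private final SortDirection direction;
  private final NullOrdering nullOrdering;

  SortOrderImpl(Expression expression, SortDirection direction, NullOrdering nullOrdering) {
    this.expression = expression;
    this.direction = direction;
    this.nullOrdering = nullOrdering;
  }

  @Override public Expression expression() { return expression; }
  @Override public SortDirection direction() { return direction; }
  @Override public NullOrdering nullOrdering() { return nullOrdering; }

  // Expression contract: a sort order wraps exactly one child expression
  @Override public String describe() { return expression.describe() + " " + direction + " " + nullOrdering; }
  @Override public Expression[] children() { return new Expression[] { expression }; }
  @Override public NamedReference[] references() { return expression.references(); }
}
```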
Enumeration of sort directions:

```java
package org.apache.spark.sql.connector.expressions;
import org.apache.spark.annotation.Experimental;
@Experimental
public enum SortDirection {
  ASCENDING(NullOrdering.NULLS_FIRST),
  DESCENDING(NullOrdering.NULLS_LAST);

  private final NullOrdering defaultNullOrdering;

  SortDirection(NullOrdering defaultNullOrdering) {
    this.defaultNullOrdering = defaultNullOrdering;
  }

  /**
   * Default null ordering for this sort direction
   */
  public NullOrdering defaultNullOrdering() {
    return defaultNullOrdering;
  }
}
```
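Each direction carries a default null ordering, mirroring Spark SQL's ASC/DESC defaults:

```java
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;

NullOrdering ascDefault = SortDirection.ASCENDING.defaultNullOrdering();   // NULLS_FIRST
NullOrdering descDefault = SortDirection.DESCENDING.defaultNullOrdering(); // NULLS_LAST
```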
Enumeration of null ordering behaviors:

```java
package org.apache.spark.sql.connector.expressions;
import org.apache.spark.annotation.Experimental;
@Experimental
public enum NullOrdering {
NULLS_FIRST,
NULLS_LAST
}
```

Creating an unspecified distribution:

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
// For data sources that make no distribution guarantees
Distribution distribution = Distributions.unspecified();
```

Creating a clustered distribution:

```java
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
// Cluster by customer_id and region columns
Expression[] clusteringExprs = new Expression[] {
FieldReference.column("customer_id"),
FieldReference.column("region")
};
ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);
```

Creating an ordered distribution:

```java
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.Expressions;
// Order by timestamp descending, then by id ascending.
// SortOrderImpl is the connector-defined implementation sketched earlier.
SortOrder[] ordering = new SortOrder[] {
  new SortOrderImpl(
    Expressions.column("timestamp"),
    SortDirection.DESCENDING,
    NullOrdering.NULLS_LAST
  ),
  new SortOrderImpl(
    Expressions.column("id"),
    SortDirection.ASCENDING,
    NullOrdering.NULLS_FIRST
  )
};
OrderedDistribution distribution = Distributions.ordered(ordering);
```

Declaring distribution requirements from a data source. In Spark's DataSource V2 API, these distributions are consumed on the write path: a `Write` that implements `RequiresDistributionAndOrdering` tells Spark how incoming rows must be distributed (and optionally ordered) before they are handed to the writer. A sketch, using hypothetical helper methods that describe the table's expected layout:

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

public class MyDataSourceWrite implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // Tell Spark how rows must be distributed before writing;
    // the planner uses this to insert (or avoid) shuffles
    if (isDataPartitionedByKey()) {
      return Distributions.clustered(getPartitionExpressions());
    } else if (isDataSorted()) {
      return Distributions.ordered(getSortOrders());
    } else {
      return Distributions.unspecified();
    }
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // Within-partition ordering; an empty array means no ordering is required
    return isDataSorted() ? getSortOrders() : new SortOrder[0];
  }

  // Hypothetical helpers describing this table's expected layout
  private boolean isDataPartitionedByKey() { return true; }

  private Expression[] getPartitionExpressions() {
    return new Expression[] { Expressions.column("customer_id") };
  }

  private boolean isDataSorted() { return false; }

  private SortOrder[] getSortOrders() { return new SortOrder[0]; }

  // Other Write methods (toBatch, ...) omitted
}
```

Clustering expressions are not limited to plain column references; transforms such as bucketing work as well:

```java
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.expressions.Expressions;
// Cluster by a transformed expression (e.g., hash bucket)
Expression bucketExpr = Expressions.bucket(10, "user_id");
Expression[] clusteringExprs = new Expression[] { bucketExpr };
ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);
// Or cluster by multiple columns with different data types
Expression[] multiColumnClustering = new Expression[] {
FieldReference.column("year"), // Partition by year
FieldReference.column("month"), // Then by month
FieldReference.column("region") // Then by region
};
ClusteredDistribution complexDistribution = Distributions.clustered(multiColumnClustering);
```

Imports commonly needed when working with the Distributions API:

```java
// Core distribution interfaces
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
// Distribution factory
import org.apache.spark.sql.connector.distributions.Distributions;
// Expression interfaces for distribution definitions
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.NullOrdering;
// Utility class for creating expressions
import org.apache.spark.sql.connector.expressions.Expressions;
// Write-path interface that consumes distribution requirements
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
```

The Distributions API integrates with Spark's Catalyst optimizer: the distributions a data source reports or requires feed directly into the planner's decisions about data shuffling, partition pruning, and join strategies.
This API is essential for building high-performance data sources that can take full advantage of Spark's distributed computing capabilities.