# Distributions API

The Distributions API provides a framework for defining data distribution requirements in Apache Spark Catalyst. This API allows data sources to specify how data should be distributed across partitions to optimize query performance, especially for operations like joins, aggregations, and sorting.

## Overview

The Distributions API enables data sources to communicate their distribution characteristics and requirements to Spark's query planner. The Catalyst optimizer uses this information to make informed decisions about data shuffling, partition pruning, and join strategies.

## Core Distribution Interface

### Distribution

Base interface for all distribution types:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface Distribution {
  // Marker interface for distribution types
}
```

## Distribution Types

### UnspecifiedDistribution

Represents a distribution where no guarantees are made about data co-location:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface UnspecifiedDistribution extends Distribution {
  // No specific distribution requirements
}
```

**Usage:** Use when the data source makes no promises about how data is distributed across partitions. This is the most flexible but least optimized distribution type.

### ClusteredDistribution

Represents a distribution where tuples sharing the same values for the clustering expressions are co-located in the same partition:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;

@Experimental
public interface ClusteredDistribution extends Distribution {

  /**
   * Returns the clustering expressions that determine data co-location.
   */
  Expression[] clustering();
}
```

**Usage:** Use when data is partitioned by specific columns or expressions, ensuring that all rows with the same clustering key values are in the same partition. This is optimal for hash-based joins and group-by operations.
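
The co-location guarantee can be illustrated without Spark. The following self-contained sketch (the class and method names are illustrative, not Spark API) hash-partitions rows by a clustering key, so every occurrence of a key lands in exactly one partition and a per-partition count is already the global count:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustrative toy partitioner demonstrating the co-location property
// that ClusteredDistribution promises. Nothing here is Spark API.
public class ClusteringDemo {

  // Assign a row to a partition by hashing its clustering key.
  static int partitionOf(Object clusteringKey, int numPartitions) {
    return Math.floorMod(Objects.hashCode(clusteringKey), numPartitions);
  }

  public static void main(String[] args) {
    int numPartitions = 4;
    List<String> customerIds = List.of("c1", "c2", "c1", "c3", "c2", "c1");

    // Group rows into partitions by clustering key.
    Map<Integer, List<String>> partitions = new HashMap<>();
    for (String id : customerIds) {
      partitions.computeIfAbsent(partitionOf(id, numPartitions), p -> new ArrayList<>())
          .add(id);
    }

    // Every occurrence of a given key is in exactly one partition, so a
    // partition-local count is the final answer: no shuffle needed.
    for (List<String> part : partitions.values()) {
      Map<String, Long> localCounts = new HashMap<>();
      for (String id : part) localCounts.merge(id, 1L, Long::sum);
      System.out.println(localCounts);
    }
  }
}
```

This is exactly why a group-by on the clustering columns can skip the shuffle: the grouping work is complete within each partition.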

### OrderedDistribution

Represents a distribution where tuples are ordered across partitions according to ordering expressions:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.SortOrder;

@Experimental
public interface OrderedDistribution extends Distribution {

  /**
   * Returns the sort orders that define the ordering across partitions.
   */
  SortOrder[] ordering();
}
```

**Usage:** Use when data is globally sorted across all partitions. This distribution is optimal for range-based operations and merge joins.
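
To see why a global ordering is valuable, consider this self-contained sketch (illustrative names, not Spark API): when partition *i* holds only keys smaller than partition *i+1*, the globally sorted output is simply the partitions concatenated in order, with no global sort step:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only (not Spark API): with an ordered distribution, partitions
// hold disjoint, increasing key ranges, so concatenating them in partition
// order yields a globally sorted result.
public class OrderingDemo {

  static List<Integer> concatenateInPartitionOrder(List<List<Integer>> rangePartitions) {
    List<Integer> result = new ArrayList<>();
    for (List<Integer> part : rangePartitions) result.addAll(part);
    return result;
  }

  public static void main(String[] args) {
    // Range-partitioned data: partition i holds keys smaller than partition i+1.
    List<List<Integer>> partitions = List.of(
        List.of(1, 3, 5),
        List.of(7, 8),
        List.of(9, 12, 20));

    List<Integer> concatenated = concatenateInPartitionOrder(partitions);

    // The concatenation is already globally sorted.
    List<Integer> sorted = new ArrayList<>(concatenated);
    Collections.sort(sorted);
    System.out.println(concatenated.equals(sorted)); // true
  }
}
```

The same property is what lets merge joins and ORDER BY queries run without re-sorting already-ordered input.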

## Distribution Factory Methods

### Distributions

Helper class providing factory methods for creating distribution instances:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;

@Experimental
public class Distributions {

  /**
   * Creates an unspecified distribution.
   */
  public static UnspecifiedDistribution unspecified() {
    return LogicalDistributions.unspecified();
  }

  /**
   * Creates a clustered distribution with the specified clustering expressions.
   */
  public static ClusteredDistribution clustered(Expression[] clustering) {
    return LogicalDistributions.clustered(clustering);
  }

  /**
   * Creates an ordered distribution with the specified sort orders.
   */
  public static OrderedDistribution ordered(SortOrder[] ordering) {
    return LogicalDistributions.ordered(ordering);
  }
}
```

## Supporting Expression Types

### Expression

Base interface for all expressions used in distributions:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

@Evolving
public interface Expression {
  Expression[] EMPTY_EXPRESSION = new Expression[0];
  NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];

  /**
   * Human-readable description of this expression.
   */
  String describe();

  /**
   * Child expressions of this expression.
   */
  Expression[] children();

  /**
   * Named references used by this expression.
   */
  NamedReference[] references();
}
```

### NamedReference

Reference to a named field or column:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

@Evolving
public interface NamedReference extends Expression {

  /**
   * Field name path (supporting nested fields).
   */
  String[] fieldNames();
}
```

### SortOrder

Represents a sort order used in ordered distributions:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface SortOrder extends Expression {

  /**
   * The expression to sort by.
   */
  Expression expression();

  /**
   * Sort direction (ascending or descending).
   */
  SortDirection direction();

  /**
   * Null ordering behavior.
   */
  NullOrdering nullOrdering();
}
```

### SortDirection

Enumeration of sort directions. Each direction carries a default null ordering, so the enum needs a backing field and constructor:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public enum SortDirection {
  ASCENDING(NullOrdering.NULLS_FIRST),
  DESCENDING(NullOrdering.NULLS_LAST);

  private final NullOrdering defaultNullOrdering;

  SortDirection(NullOrdering defaultNullOrdering) {
    this.defaultNullOrdering = defaultNullOrdering;
  }

  /**
   * Default null ordering for this sort direction.
   */
  public NullOrdering defaultNullOrdering() {
    return defaultNullOrdering;
  }
}
```

### NullOrdering

Enumeration of null ordering behaviors:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public enum NullOrdering {
  NULLS_FIRST,
  NULLS_LAST
}
```

## Usage Examples

### Creating an Unspecified Distribution

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;

// For data sources that make no distribution guarantees
Distribution distribution = Distributions.unspecified();
```

### Creating a Clustered Distribution

```java
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;

// Cluster by the customer_id and region columns. Expressions.column is the
// public factory method for creating column references.
Expression[] clusteringExprs = new Expression[] {
  Expressions.column("customer_id"),
  Expressions.column("region")
};

ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);
```

### Creating an Ordered Distribution

```java
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;

// Order by timestamp descending, then by id ascending.
// Expressions.sort is the public factory for SortOrder instances; each
// direction supplies its default null ordering (DESCENDING -> NULLS_LAST,
// ASCENDING -> NULLS_FIRST).
SortOrder[] ordering = new SortOrder[] {
  Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING),
  Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING)
};

OrderedDistribution distribution = Distributions.ordered(ordering);
```

### Using Distributions in a Data Source Implementation

These distribution types are consumed on the write path: a `Write` implementation declares how Spark must distribute incoming rows before they reach the writer by implementing `RequiresDistributionAndOrdering`. (On the read side, a scan reports its existing layout through the separate `SupportsReportPartitioning` interface instead.)

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

public class MyDataSourceWrite implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // Describe how input rows must be distributed across partitions before
    // they are passed to the writer; Spark will shuffle the data as needed.
    if (isTablePartitionedByKey()) {
      Expression[] partitionExprs = getPartitionExpressions();
      return Distributions.clustered(partitionExprs);
    } else if (isTableSorted()) {
      SortOrder[] sortOrders = getSortOrders();
      return Distributions.ordered(sortOrders);
    } else {
      return Distributions.unspecified();
    }
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // No additional within-partition ordering requirement in this sketch
    return new SortOrder[0];
  }

  // isTablePartitionedByKey(), getPartitionExpressions(), isTableSorted(),
  // and getSortOrders() stand in for connector-specific metadata lookups.
}
```

### Complex Distribution Requirements

```java
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;

// Cluster by a transformed expression (e.g., a hash bucket over user_id)
Expression bucketExpr = Expressions.bucket(10, "user_id");
Expression[] clusteringExprs = new Expression[] { bucketExpr };
ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);

// Or cluster by multiple columns
Expression[] multiColumnClustering = new Expression[] {
  Expressions.column("year"),    // Partition by year
  Expressions.column("month"),   // then by month
  Expressions.column("region")   // then by region
};
ClusteredDistribution complexDistribution = Distributions.clustered(multiColumnClustering);
```

## Import Statements

```java
// Core distribution interfaces
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;

// Distribution factory
import org.apache.spark.sql.connector.distributions.Distributions;

// Expression interfaces for distribution definitions
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.NullOrdering;

// Factory class for creating expressions (column references, transforms, sort orders)
import org.apache.spark.sql.connector.expressions.Expressions;
```

## Performance Considerations

### Clustered Distribution Benefits

- **Hash Joins**: When data is clustered by the join keys, Spark can perform more efficient hash joins without shuffling
- **Aggregations**: Group-by operations on clustering columns avoid expensive shuffles
- **Partition Pruning**: Filters on clustering columns can eliminate entire partitions

### Ordered Distribution Benefits

- **Range Joins**: Enables efficient merge joins for range-based predicates
- **Sorting**: Eliminates the need for a global sort when data is already ordered
- **Top-K Operations**: Efficient execution of ORDER BY with LIMIT queries

### Best Practices

1. **Choose the appropriate distribution**: Match the distribution to your query patterns
2. **Minimize clustering expressions**: Too many clustering columns can reduce effectiveness
3. **Consider data skew**: Ensure clustering expressions spread rows evenly across partitions
4. **Update distributions**: Keep distribution metadata in sync with the actual data layout
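
The skew concern in point 3 can be checked mechanically. The sketch below (illustrative only, not Spark API) hash-partitions a sample of clustering-key values and flags the layout as skewed when the largest partition far exceeds its fair share:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative skew check (not Spark API): hash a sample of clustering-key
// values into partitions and compare the largest partition to the average.
public class SkewCheck {

  static boolean isSkewed(List<String> keys, int numPartitions, double maxRatio) {
    long[] counts = new long[numPartitions];
    for (String key : keys) {
      counts[Math.floorMod(key.hashCode(), numPartitions)]++;
    }
    long max = Arrays.stream(counts).max().orElse(0);
    double avg = (double) keys.size() / numPartitions;
    return max > maxRatio * avg;
  }

  public static void main(String[] args) {
    // A heavily skewed sample: 90 of 100 rows share one "hot" key, which all
    // hash to a single partition regardless of the hash function.
    List<String> skewed = new ArrayList<>(Collections.nCopies(90, "hot"));
    skewed.addAll(List.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j"));

    System.out.println(SkewCheck.isSkewed(skewed, 10, 2.0)); // true
  }
}
```

A clustering expression that concentrates rows like this defeats the purpose of the distribution: one partition does most of the work while the rest sit idle.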

## Integration with Catalyst Optimizer

The Distributions API integrates with Spark's Catalyst optimizer:

1. **Physical Plan Generation**: Distribution information influences physical operator selection
2. **Shuffle Elimination**: Proper distributions can eliminate unnecessary shuffle operations
3. **Join Strategy Selection**: Affects whether broadcast, hash, or merge joins are chosen
4. **Partition-wise Operations**: Enables partition-wise execution of operations when data is properly distributed
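
The shuffle-elimination idea in point 2 reduces to a containment check, sketched here outside Spark (the names are illustrative, and this is not Spark's actual implementation): a hash partitioning on some keys co-locates all rows that agree on those keys, so it already satisfies any clustering requirement whose key set contains the partitioning keys.

```java
import java.util.Set;

// Illustrative only: the subset check a planner can use to decide whether an
// existing hash partitioning already satisfies a required clustering, in
// which case the shuffle can be skipped.
public class ShuffleElimination {

  // A hash partitioning on partitionKeys satisfies a clustering requirement
  // on requiredKeys whenever every partitioning key appears among the
  // required keys: rows agreeing on all required keys then agree on all
  // partitioning keys, so they are already in the same partition.
  static boolean satisfiesClustering(Set<String> partitionKeys, Set<String> requiredKeys) {
    return !partitionKeys.isEmpty() && requiredKeys.containsAll(partitionKeys);
  }

  public static void main(String[] args) {
    Set<String> existing = Set.of("customer_id");

    // GROUP BY customer_id, region: already co-located, no shuffle needed.
    System.out.println(satisfiesClustering(existing, Set.of("customer_id", "region"))); // true

    // GROUP BY region only: rows with the same region may sit in different
    // partitions, so a shuffle is required.
    System.out.println(satisfiesClustering(existing, Set.of("region"))); // false
  }
}
```

This is the payoff of reporting an accurate `ClusteredDistribution`: whenever the check passes, an entire exchange operator drops out of the physical plan.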

This API is essential for building high-performance data sources that can take full advantage of Spark's distributed computing capabilities.