
# Distributions API

The Distributions API provides a framework for defining data distribution requirements in Apache Spark Catalyst. It allows data sources to specify how data should be distributed across partitions to optimize query performance, especially for operations like joins, aggregations, and sorting.

## Overview

The Distributions API enables data sources to communicate their distribution characteristics and requirements to Spark's query planner. This information is crucial for the Catalyst optimizer to make informed decisions about data shuffling, partition pruning, and join strategies.

## Core Distribution Interface

### Distribution

Base interface for all distribution types:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface Distribution {
  // Marker interface for distribution types
}
```

## Distribution Types

### UnspecifiedDistribution

Represents a distribution where no guarantees are made about data co-location:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface UnspecifiedDistribution extends Distribution {
  // No specific distribution requirements
}
```

**Usage:** Use when the data source makes no promises about how data is distributed across partitions. This is the most flexible but least optimized distribution type.

### ClusteredDistribution

Represents a distribution where tuples sharing the same values for clustering expressions are co-located in the same partition:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;

@Experimental
public interface ClusteredDistribution extends Distribution {
  /**
   * Returns the clustering expressions that determine data co-location
   */
  Expression[] clustering();
}
```

**Usage:** Use when data is partitioned by specific columns or expressions, ensuring that all rows with the same clustering key values are in the same partition. This is optimal for hash-based joins and group-by operations.

### OrderedDistribution

Represents a distribution where tuples are ordered across partitions according to ordering expressions:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.SortOrder;

@Experimental
public interface OrderedDistribution extends Distribution {
  /**
   * Returns the sort orders that define the ordering across partitions
   */
  SortOrder[] ordering();
}
```

**Usage:** Use when data is globally sorted across all partitions. This distribution is optimal for range-based operations and merge joins.

## Distribution Factory Methods

### Distributions

Helper class providing factory methods for creating distribution instances:

```java { .api }
package org.apache.spark.sql.connector.distributions;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;

@Experimental
public class Distributions {

  /**
   * Creates an unspecified distribution
   */
  public static UnspecifiedDistribution unspecified() {
    return LogicalDistributions.unspecified();
  }

  /**
   * Creates a clustered distribution with the specified clustering expressions
   */
  public static ClusteredDistribution clustered(Expression[] clustering) {
    return LogicalDistributions.clustered(clustering);
  }

  /**
   * Creates an ordered distribution with the specified sort orders
   */
  public static OrderedDistribution ordered(SortOrder[] ordering) {
    return LogicalDistributions.ordered(ordering);
  }
}
```

## Supporting Expression Types

### Expression

Base interface for all expressions used in distributions:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

@Evolving
public interface Expression {
  Expression[] EMPTY_EXPRESSION = new Expression[0];
  NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];

  /**
   * Human-readable description of this expression
   */
  String describe();

  /**
   * Child expressions of this expression
   */
  Expression[] children();

  /**
   * Named references used by this expression
   */
  NamedReference[] references();
}
```
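To see how this contract composes, here is a self-contained sketch with no Spark dependency. The `ExprSketch`, `ColumnRef`, and `Concat` names are hypothetical stand-ins: a leaf column reference and a two-child expression that derive `describe()` and `references()` from their children, the same shape a real `Expression` tree takes.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;

// Hypothetical, simplified mirror of the Expression contract.
interface ExprSketch {
    String describe();
    ExprSketch[] children();
    String[] references();
}

// Leaf: a reference to a single column.
final class ColumnRef implements ExprSketch {
    private final String name;
    ColumnRef(String name) { this.name = name; }
    public String describe() { return name; }
    public ExprSketch[] children() { return new ExprSketch[0]; }
    public String[] references() { return new String[] { name }; }
}

// Inner node: description and references are derived from the children.
final class Concat implements ExprSketch {
    private final ExprSketch left, right;
    Concat(ExprSketch left, ExprSketch right) { this.left = left; this.right = right; }
    public String describe() {
        return "concat(" + left.describe() + ", " + right.describe() + ")";
    }
    public ExprSketch[] children() { return new ExprSketch[] { left, right }; }
    public String[] references() {
        // Ordered union of the children's references.
        LinkedHashSet<String> refs = new LinkedHashSet<>();
        for (ExprSketch c : children()) refs.addAll(Arrays.asList(c.references()));
        return refs.toArray(new String[0]);
    }
}

public class ExpressionDemo {
    public static void main(String[] args) {
        ExprSketch e = new Concat(new ColumnRef("first_name"), new ColumnRef("last_name"));
        System.out.println(e.describe());          // concat(first_name, last_name)
        System.out.println(e.children().length);   // 2
        System.out.println(e.references().length); // 2
    }
}
```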

### NamedReference

Reference to a named field or column:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Evolving;

@Evolving
public interface NamedReference extends Expression {
  /**
   * Field name path (supporting nested fields)
   */
  String[] fieldNames();
}
```
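The `fieldNames()` path represents nested fields as a sequence of names. As a small illustration (no Spark dependency; the dot-joined rendering here is an assumption for display purposes), a reference to the nested column `address.city` would carry a two-element path:

```java
public class FieldPathDemo {
    public static void main(String[] args) {
        // Path for the nested column address.city, as fieldNames() would return it.
        String[] fieldNames = { "address", "city" };

        // A simple dot-separated rendering of the path.
        String described = String.join(".", fieldNames);
        System.out.println(described); // address.city
    }
}
```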

### SortOrder

Represents a sort order used in ordered distributions:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public interface SortOrder extends Expression {
  /**
   * The expression to sort by
   */
  Expression expression();

  /**
   * Sort direction (ascending or descending)
   */
  SortDirection direction();

  /**
   * Null ordering behavior
   */
  NullOrdering nullOrdering();
}
```

### SortDirection

Enumeration of sort directions:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public enum SortDirection {
  ASCENDING(NullOrdering.NULLS_FIRST),
  DESCENDING(NullOrdering.NULLS_LAST);

  private final NullOrdering defaultNullOrdering;

  SortDirection(NullOrdering defaultNullOrdering) {
    this.defaultNullOrdering = defaultNullOrdering;
  }

  /**
   * Default null ordering for this sort direction
   */
  public NullOrdering defaultNullOrdering() {
    return defaultNullOrdering;
  }
}
```
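The pairing of each direction with a default null ordering can be sketched in plain Java. The `SortDirectionSketch` and `NullOrderingSketch` names below are hypothetical stand-ins with no Spark dependency; they only illustrate the enum-with-default pattern:

```java
// Stand-in for NullOrdering.
enum NullOrderingSketch { NULLS_FIRST, NULLS_LAST }

// Stand-in for SortDirection: each constant carries its default null ordering.
enum SortDirectionSketch {
    ASCENDING(NullOrderingSketch.NULLS_FIRST),
    DESCENDING(NullOrderingSketch.NULLS_LAST);

    private final NullOrderingSketch defaultNullOrdering;

    SortDirectionSketch(NullOrderingSketch defaultNullOrdering) {
        this.defaultNullOrdering = defaultNullOrdering;
    }

    NullOrderingSketch defaultNullOrdering() { return defaultNullOrdering; }
}

public class SortDirectionDemo {
    public static void main(String[] args) {
        System.out.println(SortDirectionSketch.ASCENDING.defaultNullOrdering());  // NULLS_FIRST
        System.out.println(SortDirectionSketch.DESCENDING.defaultNullOrdering()); // NULLS_LAST
    }
}
```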

### NullOrdering

Enumeration of null ordering behaviors:

```java { .api }
package org.apache.spark.sql.connector.expressions;

import org.apache.spark.annotation.Experimental;

@Experimental
public enum NullOrdering {
  NULLS_FIRST,
  NULLS_LAST
}
```

## Usage Examples

### Creating an Unspecified Distribution

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;

// For data sources that make no distribution guarantees
Distribution distribution = Distributions.unspecified();
```

### Creating a Clustered Distribution

```java
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.FieldReference;

// Cluster by customer_id and region columns
Expression[] clusteringExprs = new Expression[] {
  FieldReference.column("customer_id"),
  FieldReference.column("region")
};

ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);
```

### Creating an Ordered Distribution

```java
import org.apache.spark.sql.connector.distributions.OrderedDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.FieldReference;

// Order by timestamp descending, then by id ascending.
// SortOrderImpl is a placeholder for your own SortOrder implementation.
SortOrder[] ordering = new SortOrder[] {
  new SortOrderImpl(
    FieldReference.column("timestamp"),
    SortDirection.DESCENDING,
    NullOrdering.NULLS_LAST
  ),
  new SortOrderImpl(
    FieldReference.column("id"),
    SortDirection.ASCENDING,
    NullOrdering.NULLS_FIRST
  )
};

OrderedDistribution distribution = Distributions.ordered(ordering);
```

### Using Distributions in a Data Source Implementation

```java
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.SortOrder;

public class MyDataSourceScanBuilder implements ScanBuilder {

  @Override
  public Scan build() {
    return new MyDataSourceScan();
  }

  private static class MyDataSourceScan implements Scan {

    @Override
    public Distribution outputDistribution() {
      // Report the actual distribution of the data so that
      // Spark can optimize query execution.
      if (isDataPartitionedByKey()) {
        Expression[] partitionExprs = getPartitionExpressions();
        return Distributions.clustered(partitionExprs);
      } else if (isDataSorted()) {
        SortOrder[] sortOrders = getSortOrders();
        return Distributions.ordered(sortOrders);
      } else {
        return Distributions.unspecified();
      }
    }

    // isDataPartitionedByKey(), getPartitionExpressions(), isDataSorted(),
    // and getSortOrders() are placeholder helpers for your data source.

    // Other Scan methods...
  }
}
```

### Complex Distribution Requirements

```java
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.FieldReference;

// Cluster by a transformed expression (e.g., a hash bucket)
Expression bucketExpr = Expressions.bucket(10, "user_id");
Expression[] clusteringExprs = new Expression[] { bucketExpr };
ClusteredDistribution distribution = Distributions.clustered(clusteringExprs);

// Or cluster by multiple columns
Expression[] multiColumnClustering = new Expression[] {
  FieldReference.column("year"),   // Partition by year
  FieldReference.column("month"),  // Then by month
  FieldReference.column("region")  // Then by region
};
ClusteredDistribution complexDistribution = Distributions.clustered(multiColumnClustering);
```

## Import Statements

```java
// Core distribution interfaces
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
import org.apache.spark.sql.connector.distributions.OrderedDistribution;

// Distribution factory
import org.apache.spark.sql.connector.distributions.Distributions;

// Expression interfaces for distribution definitions
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.NullOrdering;

// Utility classes for creating expressions
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.Expressions;
```

## Performance Considerations

### Clustered Distribution Benefits

- **Hash Joins**: When data is clustered by join keys, Spark can perform hash joins without shuffling
- **Aggregations**: Group-by operations on clustering columns avoid expensive shuffles
- **Partition Pruning**: Filters on clustering columns can eliminate entire partitions

### Ordered Distribution Benefits

- **Range Joins**: Enables efficient merge joins for range-based predicates
- **Sorting**: Eliminates the need for global sorting when data is already ordered
- **Top-K Operations**: Efficient execution of ORDER BY with LIMIT queries

### Best Practices

1. **Choose the appropriate distribution**: Match the distribution to your query patterns
2. **Minimize clustering expressions**: Too many clustering columns can reduce effectiveness
3. **Consider data skew**: Ensure clustering expressions distribute data evenly across partitions
4. **Update distributions**: Keep distribution metadata in sync with the actual data layout

## Integration with Catalyst Optimizer

The Distributions API integrates with Spark's Catalyst optimizer in several ways:

1. **Physical Plan Generation**: Distribution information influences physical operator selection
2. **Shuffle Elimination**: Accurate distributions can eliminate unnecessary shuffle operations
3. **Join Strategy Selection**: Affects whether broadcast, hash, or merge joins are chosen
4. **Partition-wise Operations**: Enables partition-wise execution when data is properly distributed

This API is essential for building high-performance data sources that take full advantage of Spark's distributed computing capabilities.