or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-spark--spark-catalyst_2-12

Catalyst is a library for manipulating relational query plans within Apache Spark SQL

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.spark/spark-catalyst_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-catalyst_2-12@3.5.0

0

# Apache Spark Catalyst

1

2

Apache Spark Catalyst is the SQL engine and query optimization framework for Apache Spark. It provides a comprehensive set of APIs for building custom data sources, catalogs, expressions, and query optimizations.

3

4

## Package Information

5

6

- **Package Name**: spark-catalyst_2.12

7

- **Package Type**: maven

8

- **Language**: Scala/Java

9

- **Installation**: See installation examples below

10

11

## Installation

12

13

Add Catalyst to your project:

14

15

**Maven:**

16

```xml

17

<dependency>

18

<groupId>org.apache.spark</groupId>

19

<artifactId>spark-catalyst_2.12</artifactId>

20

<version>3.5.6</version>

21

</dependency>

22

```

23

24

**SBT:**

25

```scala

26

libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "3.5.6"

27

```

28

29

**Gradle:**

30

```gradle

31

implementation 'org.apache.spark:spark-catalyst_2.12:3.5.6'

32

```

33

34

## Core Imports

35

36

### Java Connector APIs (Stable Public APIs)

37

```java

38

// Catalog APIs

39

import org.apache.spark.sql.connector.catalog.*;

40

41

// Data Source V2 APIs

42

import org.apache.spark.sql.connector.read.*;

43

import org.apache.spark.sql.connector.write.*;

44

45

// Expression APIs

46

import org.apache.spark.sql.connector.expressions.*;

47

48

// Streaming APIs

49

import org.apache.spark.sql.connector.read.streaming.*;

50

import org.apache.spark.sql.connector.write.streaming.*;

51

52

// Utility classes

53

import org.apache.spark.sql.util.CaseInsensitiveStringMap;

54

import org.apache.spark.sql.vectorized.*;

55

```

56

57

### Scala Internal APIs (Advanced Extensions)

58

```scala

59

// Expression system

60

import org.apache.spark.sql.catalyst.expressions._

61

import org.apache.spark.sql.catalyst.expressions.codegen._

62

63

// Legacy Data Source V1

64

import org.apache.spark.sql.sources._

65

66

// Internal utilities

67

import org.apache.spark.sql.catalyst.InternalRow

68

import org.apache.spark.sql.catalyst.util._

69

```

70

71

## Basic Usage

72

73

### Creating a Custom Catalog

74

```java

75

public class MyCustomCatalog implements TableCatalog, SupportsNamespaces {

76

private String catalogName;

77

private CaseInsensitiveStringMap options;

78

79

@Override

80

public void initialize(String name, CaseInsensitiveStringMap options) {

81

this.catalogName = name;

82

this.options = options;

83

}

84

85

@Override

86

public String name() {

87

return catalogName;

88

}

89

90

@Override

91

public Identifier[] listTables(String[] namespace) {

92

// Implementation for listing tables

93

return new Identifier[0];

94

}

95

96

@Override

97

public Table loadTable(Identifier ident) {

98

// Implementation for loading table

99

return new MyCustomTable(ident);

100

}

101

102

// Additional method implementations...

103

}

104

```

105

106

### Implementing a Custom Data Source

107

```java

108

public class MyDataSource implements Table, SupportsRead, SupportsWrite {

109

private final String name;

110

private final StructType schema;

111

112

public MyDataSource(String name, StructType schema) {

113

this.name = name;

114

this.schema = schema;

115

}

116

117

@Override

118

public String name() {

119

return name;

120

}

121

122

@Override

123

public Column[] columns() {

124

// Convert StructType to Column array - implement custom conversion

125

return convertSchemaToColumns(schema);

126

}

127

128

private Column[] convertSchemaToColumns(StructType schema) {

129

return Arrays.stream(schema.fields())

130

.map(field -> new Column() {

131

@Override

132

public String name() { return field.name(); }

133

134

@Override

135

public DataType dataType() { return field.dataType(); }

136

137

@Override

138

public boolean nullable() { return field.nullable(); }

139

140

@Override

141

public String comment() { return field.getComment().isDefined() ? field.getComment().get() : null; }

142

143

@Override

144

public ColumnDefaultValue defaultValue() { return null; }

145

146

@Override

147

public MetadataColumn metadataColumn() { return null; }

148

})

149

.toArray(Column[]::new);

150

}

151

152

@Override

153

public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

154

return new MyScanBuilder(schema, options);

155

}

156

157

@Override

158

public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {

159

return new MyWriteBuilder(info);

160

}

161

162

@Override

163

public Set<TableCapability> capabilities() {

164

return Set.of(

165

TableCapability.BATCH_READ,

166

TableCapability.BATCH_WRITE,

167

TableCapability.ACCEPT_ANY_SCHEMA

168

);

169

}

170

}

171

```

172

173

### Creating Custom Expressions

174

```java

175

// Using the expression factory

176

Expression literalExpr = Expressions.literal(42);

177

NamedReference columnRef = Expressions.column("user_id");

178

Transform bucketTransform = Expressions.bucket(10, "user_id");

179

Transform yearTransform = Expressions.years("created_at");

180

181

// Creating complex expressions

182

Expression[] groupByExprs = new Expression[] {

183

Expressions.column("department"),

184

Expressions.years("hire_date")

185

};

186

```

187

188

### Working with Filters and Predicates

189

```java

190

// V2 filter predicates (org.apache.spark.sql.connector.expressions.filter)
// V2 has a general Predicate(name, children) class plus And/Or/Not;
// EqualTo/GreaterThan/LessThan exist only in the legacy V1 sources API.
Predicate equalsPredicate = new Predicate(
    "=",
    new Expression[] { Expressions.column("status"), Expressions.literal("active") }
);

Predicate rangePredicate = new And(
    new Predicate(">", new Expression[] { Expressions.column("age"), Expressions.literal(18) }),
    new Predicate("<", new Expression[] { Expressions.column("age"), Expressions.literal(65) })
);

200

201

// Legacy V1 filters (org.apache.spark.sql.sources — Scala case classes, callable from Java)
Filter legacyFilter = new EqualTo("status", "active");

203

```

204

205

## Architecture Overview

206

207

Catalyst is organized into several key components:

208

209

### 1. **Connector API Layer** (Java - Public)

210

- Provides stable, public APIs for external integrations

211

- Includes catalog, data source, expression, and streaming interfaces

212

- Designed for building custom data connectors and extensions

213

214

### 2. **Expression System** (Scala - Internal)

215

- Comprehensive framework for representing and evaluating expressions

216

- Supports code generation for high performance

217

- Extensible through custom expression implementations

218

219

### 3. **Query Planning and Optimization** (Scala - Internal)

220

- Tree-based representation of logical and physical plans

221

- Rule-based optimization framework

222

- Cost-based optimization capabilities

223

224

### 4. **Code Generation** (Scala - Internal)

225

- Just-in-time compilation of expressions and operators

226

- Optimized memory layouts (UnsafeRow)

227

- Vectorized processing support

228

229

## Key Concepts

230

231

### Table Capabilities

232

Tables declare their capabilities through the `TableCapability` enum:

233

234

```java { .api }

235

package org.apache.spark.sql.connector.catalog;

236

237

public enum TableCapability {

238

/**

239

* Signals that the table supports reads in batch execution mode.

240

*/

241

BATCH_READ,

242

243

/**

244

* Signals that the table supports reads in micro-batch streaming execution mode.

245

*/

246

MICRO_BATCH_READ,

247

248

/**

249

* Signals that the table supports reads in continuous streaming execution mode.

250

*/

251

CONTINUOUS_READ,

252

253

/**

254

* Signals that the table supports append writes in batch execution mode.

255

*/

256

BATCH_WRITE,

257

258

/**

259

* Signals that the table supports append writes in streaming execution mode.

260

*/

261

STREAMING_WRITE,

262

263

/**

264

* Signals that the table can be truncated in a write operation.

265

*/

266

TRUNCATE,

267

268

/**

269

* Signals that the table can replace existing data that matches a filter with appended data.

270

*/

271

OVERWRITE_BY_FILTER,

272

273

/**

274

* Signals that the table can dynamically replace existing data partitions with appended data.

275

*/

276

OVERWRITE_DYNAMIC,

277

278

/**

279

* Signals that the table accepts input of any schema in a write operation.

280

*/

281

ACCEPT_ANY_SCHEMA,

282

283

/**

284

* Signals that the table supports append writes using the V1 InsertableRelation interface.

285

*/

286

V1_BATCH_WRITE

287

}

288

```

289

290

### Pushdown Optimizations

291

Data sources can implement various pushdown interfaces to improve performance:

292

293

- **Filter Pushdown**: `SupportsPushDownFilters`, `SupportsPushDownV2Filters`

294

- **Column Pruning**: `SupportsPushDownRequiredColumns`

295

- **Aggregate Pushdown**: `SupportsPushDownAggregates`

296

- **Limit Pushdown**: `SupportsPushDownLimit`

297

- **Offset Pushdown**: `SupportsPushDownOffset`

298

- **TopN Pushdown**: `SupportsPushDownTopN`

299

300

### Expression Types

301

Catalyst supports various expression types:

302

303

- **Literals**: `Expressions.literal(value)`

304

- **Column References**: `Expressions.column(name)`

305

- **Transformations**: `Expressions.bucket()`, `Expressions.years()`

306

- **Aggregates**: `Count`, `Sum`, `Avg`, `Max`, `Min`

307

- **Predicates**: `EqualTo`, `GreaterThan`, `And`, `Or`, `Not`

308

309

### Data Distribution Requirements

310

Data sources can specify distribution requirements for optimal query execution:

311

312

```java { .api }

313

import org.apache.spark.sql.connector.distributions.*;

314

315

// Require data clustered by specific columns

316

Distribution clusteredDist = Distributions.clustered(

317

new NamedReference[] { Expressions.column("department") }

318

);

319

320

// Require data globally ordered

321

Distribution orderedDist = Distributions.ordered(

322

new SortOrder[] {

323

Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING)

324

}

325

);

326

```

327

328

### Custom Metrics Collection

329

Data sources can report custom metrics during query execution:

330

331

```java { .api }

332

import org.apache.spark.sql.connector.metric.*;

333

334

// Define custom metrics. CustomSumMetric and CustomAvgMetric are abstract
// (name() and description() must be implemented), so subclass them:
CustomMetric recordsProcessed = new CustomSumMetric() {
    @Override public String name() { return "recordsProcessed"; }
    @Override public String description() { return "Records Processed"; }
};
CustomMetric avgRecordSize = new CustomAvgMetric() {
    @Override public String name() { return "avgRecordSize"; }
    @Override public String description() { return "Average Record Size"; }
};

337

338

// Report metrics from scan

339

public class MyScan implements Scan { // supportedCustomMetrics() is declared on Scan itself

340

@Override

341

public CustomMetric[] supportedCustomMetrics() {

342

return new CustomMetric[] { recordsProcessed, avgRecordSize };

343

}

344

}

345

```

346

347

## Type Definitions

348

349

### Supporting Types for Distribution and Ordering

350

351

```java { .api }

352

package org.apache.spark.sql.connector.expressions;

353

354

// Sort order specification

355

interface SortOrder extends Expression {

356

Expression expression();

357

SortDirection direction();

358

NullOrdering nullOrdering();

359

}

360

361

// Sort direction enumeration

362

enum SortDirection {

363

ASCENDING,

364

DESCENDING

365

}

366

367

// Null value ordering

368

enum NullOrdering {

369

NULLS_FIRST,

370

NULLS_LAST

371

}

372

373

// Factory methods for sort orders

374

class Expressions {

375

public static SortOrder sort(Expression expr, SortDirection direction);

376

public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nulls);

377

}

378

```

379

380

## API Documentation

381

382

This knowledge tile is organized into focused sections covering different aspects of the Catalyst API:

383

384

### Core APIs

385

- **[Catalog APIs](./catalog-apis.md)** - Complete catalog management interfaces

386

- **[Data Source V2 APIs](./data-source-v2-apis.md)** - Modern data source implementation APIs

387

- **[Expression APIs](./expression-apis.md)** - Expression system and custom expression development

388

389

### Specialized APIs

390

- **[Streaming APIs](./streaming-apis.md)** - Real-time data processing interfaces

391

- **[Vectorized Processing](./vectorized-processing.md)** - High-performance columnar processing

392

- **[Legacy Data Source V1](./legacy-data-source-v1.md)** - Legacy filter and data source APIs

393

- **[Distributions API](./distributions-api.md)** - Data distribution requirements for query optimization

394

- **[Metrics API](./metrics-api.md)** - Custom metrics collection and reporting

395

396

### Utilities and Helpers

397

- **[Utilities and Helpers](./utilities-helpers.md)** - Common utilities, configurations, and helper classes

398

399

## API Stability

400

401

### Stable APIs (Recommended)

402

- **Java Connector APIs** (`org.apache.spark.sql.connector.*`) - These are the primary public APIs with backward compatibility guarantees

403

- All interfaces marked with `@Evolving` - May change between versions but with compatibility considerations

404

405

### Internal APIs (Advanced Use)

406

- **Scala Catalyst APIs** (`org.apache.spark.sql.catalyst.*`) - Internal APIs that may change without notice

407

- Use these for advanced extensions with the understanding of potential breaking changes

408

409

## Performance Considerations

410

411

### Vectorized Processing

412

For high-performance data processing, implement vectorized operations:

413

414

```java

415

public class MyVectorizedReader implements PartitionReader<ColumnarBatch> {

416

@Override

417

public ColumnarBatch get() {

418

// Return columnar batch instead of individual rows

419

ColumnVector[] columns = createColumnVectors();

420

return new ColumnarBatch(columns, numRows);

421

}

422

}

423

```

424

425

### Code Generation

426

For custom expressions, consider implementing code generation:

427

428

```scala

429

case class MyCustomExpression(child: Expression) extends UnaryExpression {

430

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {

431

// Generate optimized code for expression evaluation

432

val childGen = child.genCode(ctx)

433

// Custom code generation logic...

434

}

435

}

436

```

437

438

### Memory Management

439

Use Catalyst's memory-efficient data structures:

440

441

```java

442

// Use UnsafeRow for memory-efficient row representation

443

UnsafeRow row = new UnsafeRow(numFields);

444

UnsafeRowWriter writer = new UnsafeRowWriter(bufferHolder, numFields);

445

```

446

447

## Version Compatibility

448

449

This documentation covers **Apache Spark Catalyst 3.5.6**. The Connector APIs provide the most stable interface across versions, while internal Catalyst APIs may change between releases.

450

451

### Migration Notes

452

- Prefer Data Source V2 APIs over legacy V1 APIs

453

- Use Java Connector APIs for maximum stability

454

- Implement capability-based interfaces for forward compatibility

455

456

## Next Steps

457

458

1. **Start with Catalog APIs** if building custom catalogs

459

2. **Explore Data Source V2 APIs** for custom data sources

460

3. **Review Expression APIs** for custom functions and transforms

461

4. **Check Streaming APIs** for real-time processing needs

462

5. **Consider Vectorized Processing** for high-performance requirements

463

464

Each section provides comprehensive API coverage, usage examples, and implementation guidance for building robust Spark extensions.