Catalyst is a library for manipulating relational query plans within Apache Spark SQL

npx @tessl/cli install tessl/maven-org-apache-spark--spark-catalyst_2-12@3.5.00

# Apache Spark Catalyst

Apache Spark Catalyst is the query planning and optimization framework at the core of Spark SQL. It provides a comprehensive set of APIs for building custom data sources, catalogs, expressions, and query optimizations.

## Package Information

- **Package Name**: spark-catalyst_2.12
- **Package Type**: maven
- **Language**: Scala/Java
- **Installation**: See installation examples below
## Installation

Add Catalyst to your project:

**Maven:**
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-catalyst_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

**SBT:**
```scala
libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "3.5.6"
```

**Gradle:**
```gradle
implementation 'org.apache.spark:spark-catalyst_2.12:3.5.6'
```
## Core Imports

### Java Connector APIs (Stable Public APIs)
```java
// Catalog APIs
import org.apache.spark.sql.connector.catalog.*;

// Data Source V2 APIs
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.connector.write.*;

// Expression APIs
import org.apache.spark.sql.connector.expressions.*;

// Streaming APIs
import org.apache.spark.sql.connector.read.streaming.*;
import org.apache.spark.sql.connector.write.streaming.*;

// Utility classes
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.sql.vectorized.*;
```

### Scala Internal APIs (Advanced Extensions)
```scala
// Expression system
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._

// Legacy Data Source V1
import org.apache.spark.sql.sources._

// Internal utilities
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util._
```
## Basic Usage

### Creating a Custom Catalog
```java
public class MyCustomCatalog implements TableCatalog, SupportsNamespaces {
  private String catalogName;
  private CaseInsensitiveStringMap options;

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.catalogName = name;
    this.options = options;
  }

  @Override
  public String name() {
    return catalogName;
  }

  @Override
  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
    // Implementation for listing tables in the given namespace
    return new Identifier[0];
  }

  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    // Implementation for loading table metadata
    return new MyCustomTable(ident);
  }

  // Additional method implementations (createTable, alterTable, dropTable,
  // renameTable, and the SupportsNamespaces methods)...
}
```
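A compiled catalog is then wired into a Spark application through configuration: Spark resolves catalog names against `spark.sql.catalog.<name>` keys, and any `spark.sql.catalog.<name>.*` entries are passed to `initialize()` as options (the class and option names below are illustrative):

```properties
# spark-defaults.conf (or equivalent --conf flags)
spark.sql.catalog.my_catalog=com.example.MyCustomCatalog
spark.sql.catalog.my_catalog.option1=value1
```

Identifiers such as `my_catalog.db.table` then resolve through the custom catalog.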
### Implementing a Custom Data Source
```java
public class MyDataSource implements Table, SupportsRead, SupportsWrite {
  private final String name;
  private final StructType schema;

  public MyDataSource(String name, StructType schema) {
    this.name = name;
    this.schema = schema;
  }

  @Override
  public String name() {
    return name;
  }

  @Override
  public Column[] columns() {
    // Convert StructType to the V2 Column representation
    return convertSchemaToColumns(schema);
  }

  private Column[] convertSchemaToColumns(StructType schema) {
    return Arrays.stream(schema.fields())
        .map(field -> new Column() {
          @Override
          public String name() { return field.name(); }

          @Override
          public DataType dataType() { return field.dataType(); }

          @Override
          public boolean nullable() { return field.nullable(); }

          @Override
          public String comment() {
            // StructField.getComment() returns a scala.Option
            return field.getComment().isDefined() ? field.getComment().get() : null;
          }

          @Override
          public ColumnDefaultValue defaultValue() { return null; }

          @Override
          public String metadataInJSON() { return null; }
        })
        .toArray(Column[]::new);
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    return new MyScanBuilder(schema, options);
  }

  @Override
  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
    return new MyWriteBuilder(info);
  }

  @Override
  public Set<TableCapability> capabilities() {
    return Set.of(
        TableCapability.BATCH_READ,
        TableCapability.BATCH_WRITE,
        TableCapability.ACCEPT_ANY_SCHEMA
    );
  }
}
```
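For Spark to load the source via `spark.read.format(...)`, it also needs a `TableProvider` entry point. A minimal sketch, with an illustrative provider name and a fixed schema (a real source would infer the schema from its data):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class MyDataSourceProvider implements TableProvider {
  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    // A real source would derive this from files or an external system
    return new StructType().add("user_id", DataTypes.LongType);
  }

  @Override
  public Table getTable(StructType schema, Transform[] partitioning,
                        Map<String, String> properties) {
    // Return the Table implementation for this source; a stub is shown
    // here, but this would normally be a class like MyDataSource above
    return new Table() {
      @Override public String name() { return "my_source"; }
      @Override public StructType schema() { return schema; }
      @Override public Set<TableCapability> capabilities() {
        return Collections.emptySet();
      }
    };
  }
}
```

Providers are usually registered through Java's `ServiceLoader` mechanism (by also implementing `DataSourceRegister` and listing the class under `META-INF/services`), or used directly by passing the fully qualified class name to `format(...)`.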
### Creating Custom Expressions
```java
// Using the expression factory
Expression literalExpr = Expressions.literal(42);
NamedReference columnRef = Expressions.column("user_id");
Transform bucketTransform = Expressions.bucket(10, "user_id");
Transform yearTransform = Expressions.years("created_at");

// Creating complex expressions
Expression[] groupByExprs = new Expression[] {
  Expressions.column("department"),
  Expressions.years("hire_date")
};
```
### Working with Filters and Predicates
```java
// V2 predicates: a Predicate is a named operator applied to child expressions
// (org.apache.spark.sql.connector.expressions.filter)
Predicate equalsPredicate = new Predicate("=", new Expression[] {
  Expressions.column("status"),
  Expressions.literal("active")
});

Predicate rangePredicate = new And(
  new Predicate(">", new Expression[] {
    Expressions.column("age"), Expressions.literal(18)
  }),
  new Predicate("<", new Expression[] {
    Expressions.column("age"), Expressions.literal(65)
  })
);
```

```scala
// Legacy V1 filters (org.apache.spark.sql.sources)
val legacyFilter = EqualTo("status", "active")
```
## Architecture Overview

Catalyst is organized into several key components:

### 1. **Connector API Layer** (Java - Public)
- Provides stable, public APIs for external integrations
- Includes catalog, data source, expression, and streaming interfaces
- Designed for building custom data connectors and extensions

### 2. **Expression System** (Scala - Internal)
- Comprehensive framework for representing and evaluating expressions
- Supports code generation for high performance
- Extensible through custom expression implementations

### 3. **Query Planning and Optimization** (Scala - Internal)
- Tree-based representation of logical and physical plans
- Rule-based optimization framework
- Cost-based optimization capabilities

### 4. **Code Generation** (Scala - Internal)
- Just-in-time compilation of expressions and operators
- Optimized memory layouts (UnsafeRow)
- Vectorized processing support
## Key Concepts

### Table Capabilities
Tables declare their capabilities through the `TableCapability` enum:

```java { .api }
package org.apache.spark.sql.connector.catalog;

public enum TableCapability {
  /**
   * Signals that the table supports reads in batch execution mode.
   */
  BATCH_READ,

  /**
   * Signals that the table supports reads in micro-batch streaming execution mode.
   */
  MICRO_BATCH_READ,

  /**
   * Signals that the table supports reads in continuous streaming execution mode.
   */
  CONTINUOUS_READ,

  /**
   * Signals that the table supports append writes in batch execution mode.
   */
  BATCH_WRITE,

  /**
   * Signals that the table supports append writes in streaming execution mode.
   */
  STREAMING_WRITE,

  /**
   * Signals that the table can be truncated in a write operation.
   */
  TRUNCATE,

  /**
   * Signals that the table can replace existing data that matches a filter with appended data.
   */
  OVERWRITE_BY_FILTER,

  /**
   * Signals that the table can dynamically replace existing data partitions with appended data.
   */
  OVERWRITE_DYNAMIC,

  /**
   * Signals that the table accepts input of any schema in a write operation.
   */
  ACCEPT_ANY_SCHEMA,

  /**
   * Signals that the table supports append writes using the V1 InsertableRelation interface.
   */
  V1_BATCH_WRITE
}
```
### Pushdown Optimizations
Data sources can implement various pushdown interfaces to improve performance:

- **Filter Pushdown**: `SupportsPushDownFilters`, `SupportsPushDownV2Filters`
- **Column Pruning**: `SupportsPushDownRequiredColumns`
- **Aggregate Pushdown**: `SupportsPushDownAggregates`
- **Limit Pushdown**: `SupportsPushDownLimit`
- **Offset Pushdown**: `SupportsPushDownOffset`
- **TopN Pushdown**: `SupportsPushDownTopN`
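As a sketch of how these interfaces compose, the hypothetical builder below opts into column pruning and limit pushdown; Spark invokes the push methods during planning, before calling `build()`:

```java
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

public class PruningScanBuilder implements ScanBuilder,
    SupportsPushDownRequiredColumns, SupportsPushDownLimit {

  private StructType prunedSchema;
  private int limit = -1;

  public PruningScanBuilder(StructType fullSchema) {
    this.prunedSchema = fullSchema;
  }

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // Spark passes only the columns the query actually needs
    this.prunedSchema = requiredSchema;
  }

  @Override
  public boolean pushLimit(int limit) {
    this.limit = limit;
    return true; // true = the source enforces the limit itself
  }

  @Override
  public Scan build() {
    StructType schema = prunedSchema;
    return new Scan() {
      @Override
      public StructType readSchema() { return schema; }
    };
  }
}
```

A real `build()` would thread the pruned schema and limit into the partition planning and readers; the anonymous `Scan` here only records the pruned schema.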
### Expression Types
Catalyst supports various expression types:

- **Literals**: `Expressions.literal(value)`
- **Column References**: `Expressions.column(name)`
- **Transformations**: `Expressions.bucket()`, `Expressions.years()`
- **Aggregates**: `Count`, `Sum`, `Avg`, `Max`, `Min`
- **Predicates**: V2 `Predicate` with operator names such as `"="` and `">"`, plus `And`, `Or`, `Not` (the legacy V1 `sources` API uses named classes like `EqualTo` and `GreaterThan`)
### Data Distribution Requirements
Data sources can specify distribution requirements for optimal query execution:

```java { .api }
import org.apache.spark.sql.connector.distributions.*;

// Require data clustered by specific columns
Distribution clusteredDist = Distributions.clustered(
  new NamedReference[] { Expressions.column("department") }
);

// Require data globally ordered
Distribution orderedDist = Distributions.ordered(
  new SortOrder[] {
    Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING)
  }
);
```
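A `Write` implementation typically surfaces these requirements through the `RequiresDistributionAndOrdering` mix-in; a minimal sketch (the write class and its column names are illustrative):

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.Write;

public class ClusteredWrite implements Write, RequiresDistributionAndOrdering {
  @Override
  public Distribution requiredDistribution() {
    // Cluster incoming rows by department before handing them to writers
    return Distributions.clustered(
        new Expression[] { Expressions.column("department") });
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // Within each partition, order rows by timestamp descending
    return new SortOrder[] {
        Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING)
    };
  }
}
```

Spark repartitions and sorts the input to satisfy these requirements before invoking the data writers.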
### Custom Metrics Collection
Data sources can report custom metrics during query execution:

```java { .api }
import org.apache.spark.sql.connector.metric.*;

// Define custom metrics by extending the provided abstract classes
public class RecordsProcessedMetric extends CustomSumMetric {
  @Override
  public String name() { return "recordsProcessed"; }

  @Override
  public String description() { return "Records Processed"; }
}

// Declare supported metrics from a scan
// (supportedCustomMetrics is a default method on Scan)
public class MyScan implements Scan {
  @Override
  public CustomMetric[] supportedCustomMetrics() {
    return new CustomMetric[] { new RecordsProcessedMetric() };
  }

  // readSchema() and other Scan methods omitted...
}
```
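On the executor side, per-task values are reported through `CustomTaskMetric` from `PartitionReader.currentMetricsValues()` (a default method). A sketch with the actual data handling elided:

```java
import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
import org.apache.spark.sql.connector.read.PartitionReader;

public class CountingPartitionReader implements PartitionReader<InternalRow> {
  private long recordsProcessed = 0;

  @Override
  public boolean next() throws IOException {
    // Advance the underlying source; this sketch has no data
    return false;
  }

  @Override
  public InternalRow get() {
    recordsProcessed++;
    return null; // would return the current row
  }

  @Override
  public void close() throws IOException {}

  @Override
  public CustomTaskMetric[] currentMetricsValues() {
    return new CustomTaskMetric[] {
        new CustomTaskMetric() {
          @Override public String name() { return "recordsProcessed"; }
          @Override public long value() { return recordsProcessed; }
        }
    };
  }
}
```

Spark aggregates the task-level values into the driver-side metric (here via `CustomSumMetric` semantics) and shows the result in the SQL UI.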
## Type Definitions

### Supporting Types for Distribution and Ordering

```java { .api }
package org.apache.spark.sql.connector.expressions;

// Sort order specification
interface SortOrder extends Expression {
  Expression expression();
  SortDirection direction();
  NullOrdering nullOrdering();
}

// Sort direction enumeration
enum SortDirection {
  ASCENDING,
  DESCENDING
}

// Null value ordering
enum NullOrdering {
  NULLS_FIRST,
  NULLS_LAST
}

// Factory methods for sort orders
class Expressions {
  public static SortOrder sort(Expression expr, SortDirection direction);
  public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nulls);
}
```
## API Documentation

This knowledge tile is organized into focused sections covering different aspects of the Catalyst API:

### Core APIs
- **[Catalog APIs](./catalog-apis.md)** - Complete catalog management interfaces
- **[Data Source V2 APIs](./data-source-v2-apis.md)** - Modern data source implementation APIs
- **[Expression APIs](./expression-apis.md)** - Expression system and custom expression development

### Specialized APIs
- **[Streaming APIs](./streaming-apis.md)** - Real-time data processing interfaces
- **[Vectorized Processing](./vectorized-processing.md)** - High-performance columnar processing
- **[Legacy Data Source V1](./legacy-data-source-v1.md)** - Legacy filter and data source APIs
- **[Distributions API](./distributions-api.md)** - Data distribution requirements for query optimization
- **[Metrics API](./metrics-api.md)** - Custom metrics collection and reporting

### Utilities and Helpers
- **[Utilities and Helpers](./utilities-helpers.md)** - Common utilities, configurations, and helper classes
## API Stability

### Stable APIs (Recommended)
- **Java Connector APIs** (`org.apache.spark.sql.connector.*`) - These are the primary public APIs, with backward compatibility guarantees
- Interfaces marked `@Evolving` - May change between versions, but with compatibility considerations

### Internal APIs (Advanced Use)
- **Scala Catalyst APIs** (`org.apache.spark.sql.catalyst.*`) - Internal APIs that may change without notice
- Use these for advanced extensions, with the understanding that breaking changes are possible between releases
## Performance Considerations

### Vectorized Processing
For high-performance data processing, implement vectorized operations:

```java
public class MyVectorizedReader implements PartitionReader<ColumnarBatch> {
  @Override
  public ColumnarBatch get() {
    // Return a columnar batch instead of individual rows
    ColumnVector[] columns = createColumnVectors(); // source-specific helper
    return new ColumnarBatch(columns, numRows);
  }

  // next() and close() implementations omitted...
}
```
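Constructing the batch itself can be sketched with a concrete vector implementation such as `OnHeapColumnVector` (note this class lives in the spark-sql module's `org.apache.spark.sql.execution.vectorized` package, not in catalyst, which only provides the `ColumnVector`/`ColumnarBatch` abstractions):

```java
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Allocate a single int column with capacity for three rows
OnHeapColumnVector ints = new OnHeapColumnVector(3, DataTypes.IntegerType);
for (int i = 0; i < 3; i++) {
  ints.putInt(i, i * 10);
}

// Wrap the vectors in a batch and declare how many rows are valid
ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { ints });
batch.setNumRows(3);
```

Custom sources can also implement `ColumnVector` directly over their native buffers to avoid per-value copies.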
### Code Generation
For custom expressions, consider implementing code generation:

```scala
case class MyCustomExpression(child: Expression) extends UnaryExpression {
  override def dataType: DataType = child.dataType

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // defineCodeGen handles null propagation and splices the generated
    // Java for the non-null case; myCustomOp is a placeholder
    defineCodeGen(ctx, ev, c => s"myCustomOp($c)")
  }

  override protected def withNewChildInternal(newChild: Expression): MyCustomExpression =
    copy(child = newChild)
}
```
### Memory Management
Use Catalyst's memory-efficient data structures:

```java
// Use UnsafeRowWriter to build memory-efficient UnsafeRow instances
UnsafeRowWriter writer = new UnsafeRowWriter(numFields);
writer.resetRowWriter();
writer.write(0, 42L); // write a long into ordinal 0
UnsafeRow row = writer.getRow();
```
## Version Compatibility

This documentation covers **Apache Spark Catalyst 3.5.6**. The Connector APIs provide the most stable interface across versions, while internal Catalyst APIs may change between releases.

### Migration Notes
- Prefer Data Source V2 APIs over legacy V1 APIs
- Use Java Connector APIs for maximum stability
- Implement capability-based interfaces for forward compatibility

## Next Steps

1. **Start with Catalog APIs** if building custom catalogs
2. **Explore Data Source V2 APIs** for custom data sources
3. **Review Expression APIs** for custom functions and transforms
4. **Check Streaming APIs** for real-time processing needs
5. **Consider Vectorized Processing** for high-performance requirements

Each section provides comprehensive API coverage, usage examples, and implementation guidance for building robust Spark extensions.