Catalyst is a library for manipulating relational query plans within Apache Spark SQL.

```bash
npx @tessl/cli install tessl/maven-org-apache-spark--spark-catalyst_2-12@3.5.0
```

Apache Spark Catalyst is the SQL engine and query optimization framework for Apache Spark. It provides a comprehensive set of APIs for building custom data sources, catalogs, expressions, and query optimizations.
Add Catalyst to your project:
Maven:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-catalyst_2.12</artifactId>
  <version>3.5.6</version>
</dependency>
```

SBT:

```scala
libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "3.5.6"
```

Gradle:

```groovy
implementation 'org.apache.spark:spark-catalyst_2.12:3.5.6'
```

```java
// Catalog APIs
import org.apache.spark.sql.connector.catalog.*;
// Data Source V2 APIs
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.connector.write.*;
// Expression APIs
import org.apache.spark.sql.connector.expressions.*;
// Streaming APIs
import org.apache.spark.sql.connector.read.streaming.*;
import org.apache.spark.sql.connector.write.streaming.*;
// Utility classes
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.sql.vectorized.*;
```

```scala
// Expression system
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
// Legacy Data Source V1
import org.apache.spark.sql.sources._
// Internal utilities
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util._
```

```java
public class MyCustomCatalog implements TableCatalog, SupportsNamespaces {
private String catalogName;
private CaseInsensitiveStringMap options;
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
this.options = options;
}
@Override
public String name() {
return catalogName;
}
@Override
public Identifier[] listTables(String[] namespace) {
// Implementation for listing tables
return new Identifier[0];
}
@Override
public Table loadTable(Identifier ident) {
// Implementation for loading table
return new MyCustomTable(ident);
}
// Additional method implementations...
}
```

```java
public class MyDataSource implements Table, SupportsRead, SupportsWrite {
private final String name;
private final StructType schema;
public MyDataSource(String name, StructType schema) {
this.name = name;
this.schema = schema;
}
@Override
public String name() {
return name;
}
  @Override
  public StructType schema() {
    // schema() is deprecated in favor of columns() but is still part of the Table interface
    return schema;
  }

  @Override
  public Column[] columns() {
    // Convert the StructType into the connector Column representation
    return Arrays.stream(schema.fields())
        .map(field -> Column.create(field.name(), field.dataType(), field.nullable()))
        .toArray(Column[]::new);
  }
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
return new MyScanBuilder(schema, options);
}
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
return new MyWriteBuilder(info);
}
@Override
public Set<TableCapability> capabilities() {
return Set.of(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
TableCapability.ACCEPT_ANY_SCHEMA
);
}
}
```

```java
// Using the expression factory
Expression literalExpr = Expressions.literal(42);
NamedReference columnRef = Expressions.column("user_id");
Transform bucketTransform = Expressions.bucket(10, "user_id");
Transform yearTransform = Expressions.years("created_at");
// Creating complex expressions
Expression[] groupByExprs = new Expression[] {
Expressions.column("department"),
Expressions.years("hire_date")
};
```

```java
// V2 filter predicates
// V2 predicates live in org.apache.spark.sql.connector.expressions.filter and are
// built from an operator name plus child expressions
Predicate equalsPredicate = new Predicate("=",
    new Expression[] { Expressions.column("status"), Expressions.literal("active") });

Predicate rangePredicate = new And(
    new Predicate(">", new Expression[] { Expressions.column("age"), Expressions.literal(18) }),
    new Predicate("<", new Expression[] { Expressions.column("age"), Expressions.literal(65) })
);
```

```scala
// Legacy V1 filters (Scala) - org.apache.spark.sql.sources.EqualTo
val legacyFilter = EqualTo("status", "active")
```

Catalyst is organized into several key components; the sections below cover table capabilities, pushdown interfaces, expression types, distribution requirements, and custom metrics.
Tables declare their capabilities through the TableCapability enum:

```java
package org.apache.spark.sql.connector.catalog;
public enum TableCapability {
/**
* Signals that the table supports reads in batch execution mode.
*/
BATCH_READ,
/**
* Signals that the table supports reads in micro-batch streaming execution mode.
*/
MICRO_BATCH_READ,
/**
* Signals that the table supports reads in continuous streaming execution mode.
*/
CONTINUOUS_READ,
/**
* Signals that the table supports append writes in batch execution mode.
*/
BATCH_WRITE,
/**
* Signals that the table supports append writes in streaming execution mode.
*/
STREAMING_WRITE,
/**
* Signals that the table can be truncated in a write operation.
*/
TRUNCATE,
/**
* Signals that the table can replace existing data that matches a filter with appended data.
*/
OVERWRITE_BY_FILTER,
/**
* Signals that the table can dynamically replace existing data partitions with appended data.
*/
OVERWRITE_DYNAMIC,
/**
* Signals that the table accepts input of any schema in a write operation.
*/
ACCEPT_ANY_SCHEMA,
/**
* Signals that the table supports append writes using the V1 InsertableRelation interface.
*/
V1_BATCH_WRITE
}
```
Data sources can implement various pushdown interfaces to improve performance (a sketch follows the list):

- SupportsPushDownFilters / SupportsPushDownV2Filters - push filter predicates into the source
- SupportsPushDownRequiredColumns - prune columns the query does not need
- SupportsPushDownAggregates - push down aggregate functions
- SupportsPushDownLimit - push down LIMIT
- SupportsPushDownOffset - push down OFFSET
- SupportsPushDownTopN - push down ORDER BY ... LIMIT
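For example, a ScanBuilder that accepts column pruning and limit pushdown could be sketched as follows. This is an illustrative sketch rather than Spark source code: PruningScanBuilder, MyScan, and the field names are hypothetical.

```java
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;

public class PruningScanBuilder implements ScanBuilder,
    SupportsPushDownRequiredColumns, SupportsPushDownLimit {

  private StructType requiredSchema;
  private int pushedLimit = -1; // -1 means no limit was pushed

  public PruningScanBuilder(StructType fullSchema) {
    this.requiredSchema = fullSchema;
  }

  @Override
  public void pruneColumns(StructType requiredSchema) {
    // Spark passes only the columns the query actually references
    this.requiredSchema = requiredSchema;
  }

  @Override
  public boolean pushLimit(int limit) {
    // Returning true tells Spark the limit was pushed into the source
    this.pushedLimit = limit;
    return true;
  }

  @Override
  public Scan build() {
    return new MyScan(requiredSchema, pushedLimit); // hypothetical Scan implementation
  }
}
```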
Catalyst supports various expression types:

- Literals: Expressions.literal(value)
- Column references: Expressions.column(name)
- Transforms: Expressions.bucket(), Expressions.years()
- Aggregate functions: Count, Sum, Avg, Max, Min
- Predicates: EqualTo, GreaterThan, And, Or, Not

Data sources can specify distribution requirements for optimal query execution:

```java
import org.apache.spark.sql.connector.distributions.*;
// Require data clustered by specific columns
Distribution clusteredDist = Distributions.clustered(
new NamedReference[] { Expressions.column("department") }
);
// Require data globally ordered
Distribution orderedDist = Distributions.ordered(
new SortOrder[] {
Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING)
}
);
```
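Distribution requirements are typically returned from a Write that implements RequiresDistributionAndOrdering. The sketch below is illustrative only: ClusteredWrite is a hypothetical class, and the remaining Write methods are omitted.

```java
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;

public class ClusteredWrite implements RequiresDistributionAndOrdering {

  @Override
  public Distribution requiredDistribution() {
    // Ask Spark to co-locate rows with the same department in one partition
    return Distributions.clustered(
        new Expression[] { Expressions.column("department") });
  }

  @Override
  public SortOrder[] requiredOrdering() {
    // Within each partition, sort rows by timestamp descending
    return new SortOrder[] {
        Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING)
    };
  }

  // toBatch() and the rest of the Write contract are omitted from this sketch
}
```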
Data sources can report custom metrics during query execution:

```java
import org.apache.spark.sql.connector.metric.*;
// Define custom metrics by extending the built-in aggregating metric types
// (CustomSumMetric and CustomAvgMetric are abstract; subclasses supply name/description)
public class RecordsProcessedMetric extends CustomSumMetric {
  @Override public String name() { return "recordsProcessed"; }
  @Override public String description() { return "Records Processed"; }
}
// A CustomAvgMetric subclass (e.g. an average record size metric) follows the same pattern.

// Report the supported metrics from a scan
public class MyScan implements Scan {
  @Override
  public CustomMetric[] supportedCustomMetrics() {
    return new CustomMetric[] { new RecordsProcessedMetric() };
  }
  // readSchema() and the rest of the Scan contract are omitted in this sketch
}
```

```java
package org.apache.spark.sql.connector.expressions;
// Sort order specification
interface SortOrder extends Expression {
Expression expression();
SortDirection direction();
NullOrdering nullOrdering();
}
// Sort direction enumeration
enum SortDirection {
ASCENDING,
DESCENDING
}
// Null value ordering
enum NullOrdering {
NULLS_FIRST,
NULLS_LAST
}
// Factory methods for sort orders
class Expressions {
public static SortOrder sort(Expression expr, SortDirection direction);
public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nulls);
}
```
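A brief usage sketch of the factory methods listed above (variable names are illustrative):

```java
// Descending sort on event_time, using the direction's default null ordering
SortOrder byTimeDesc =
    Expressions.sort(Expressions.column("event_time"), SortDirection.DESCENDING);

// Ascending sort on name with nulls placed last
SortOrder byNameNullsLast =
    Expressions.sort(Expressions.column("name"), SortDirection.ASCENDING, NullOrdering.NULLS_LAST);
```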
This knowledge tile is organized into focused sections covering different aspects of the Catalyst API. API stability varies by package:

- Connector APIs (org.apache.spark.sql.connector.*) - the primary public APIs, with backward compatibility guarantees
- @Evolving APIs - may change between versions, but with compatibility considerations
- Internal Catalyst APIs (org.apache.spark.sql.catalyst.*) - may change without notice

For high-performance data processing, implement vectorized operations:

```java
public class MyVectorizedReader implements PartitionReader<ColumnarBatch> {
@Override
public ColumnarBatch get() {
// Return columnar batch instead of individual rows
ColumnVector[] columns = createColumnVectors();
return new ColumnarBatch(columns, numRows);
}
  // next() and close() are also required by PartitionReader but omitted in this sketch
}
```
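As a rough illustration of what the hypothetical createColumnVectors() might produce, Spark provides writable column vectors such as OnHeapColumnVector (note: it lives in org.apache.spark.sql.execution.vectorized, which ships in the spark-sql module rather than spark-catalyst):

```java
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

// Build a single int column with three rows and wrap it in a ColumnarBatch
OnHeapColumnVector ids = new OnHeapColumnVector(3, DataTypes.IntegerType);
ids.putInt(0, 1);
ids.putInt(1, 2);
ids.putInt(2, 3);
ColumnarBatch batch = new ColumnarBatch(new ColumnVector[] { ids }, 3);
```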
For custom expressions, consider implementing code generation:

```scala
case class MyCustomExpression(child: Expression) extends UnaryExpression {
  override def dataType: DataType = child.dataType

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // Generate optimized Java code for evaluating this expression;
    // nullSafeCodeGen wraps the generated child value with null handling
    nullSafeCodeGen(ctx, ev, childValue => s"${ev.value} = $childValue;")
  }

  override protected def withNewChildInternal(newChild: Expression): MyCustomExpression =
    copy(child = newChild)
}
```

Use Catalyst's memory-efficient data structures:

```java
// Use UnsafeRow for memory-efficient binary row representation
UnsafeRow row = new UnsafeRow(numFields);
// UnsafeRowWriter manages its own buffer; construct it with the field count
UnsafeRowWriter writer = new UnsafeRowWriter(numFields);
```

This documentation covers Apache Spark Catalyst 3.5.6. The Connector APIs provide the most stable interface across versions, while internal Catalyst APIs may change between releases.
Each section provides comprehensive API coverage, usage examples, and implementation guidance for building robust Spark extensions.