tessl/maven-org-apache-spark--spark-catalyst_2-12

Catalyst is a library for manipulating relational query plans within Apache Spark SQL.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-catalyst_2.12@3.5.x (Maven)

To install, run:

npx @tessl/cli install tessl/maven-org-apache-spark--spark-catalyst_2-12@3.5.0

Apache Spark Catalyst

Apache Spark Catalyst is the query planning and optimization engine that underpins Spark SQL. It provides a comprehensive set of APIs for building custom data sources, catalogs, expressions, and query optimizations.

Package Information

  • Package Name: spark-catalyst_2.12
  • Package Type: maven
  • Language: Scala/Java
  • Installation: See installation examples below

Installation

Add Catalyst to your project:

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-catalyst_2.12</artifactId>
    <version>3.5.6</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" %% "spark-catalyst" % "3.5.6"

Gradle:

implementation 'org.apache.spark:spark-catalyst_2.12:3.5.6'

Core Imports

Java Connector APIs (Stable Public APIs)

// Catalog APIs
import org.apache.spark.sql.connector.catalog.*;

// Data Source V2 APIs  
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.connector.write.*;

// Expression APIs
import org.apache.spark.sql.connector.expressions.*;

// Streaming APIs
import org.apache.spark.sql.connector.read.streaming.*;
import org.apache.spark.sql.connector.write.streaming.*;

// Utility classes
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.sql.vectorized.*;

Scala Internal APIs (Advanced Extensions)

// Expression system
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._

// Legacy Data Source V1
import org.apache.spark.sql.sources._

// Internal utilities
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util._

Basic Usage

Creating a Custom Catalog

public class MyCustomCatalog implements TableCatalog, SupportsNamespaces {
    private String catalogName;
    private CaseInsensitiveStringMap options;
    
    @Override
    public void initialize(String name, CaseInsensitiveStringMap options) {
        this.catalogName = name;
        this.options = options;
    }
    
    @Override
    public String name() {
        return catalogName;
    }
    
    @Override
    public Identifier[] listTables(String[] namespace) {
        // Implementation for listing tables
        return new Identifier[0];
    }
    
    @Override
    public Table loadTable(Identifier ident) {
        // Implementation for loading table
        return new MyCustomTable(ident);
    }
    
    // Additional method implementations...
}
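
Once implemented, the catalog is registered by pointing a spark.sql.catalog.<name> configuration entry at the implementation class; any spark.sql.catalog.<name>.<key> entries are delivered to initialize() as the CaseInsensitiveStringMap. A minimal sketch (SparkSession comes from the spark-sql module; the catalog name my_catalog and the warehouse.path option are illustrative):

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .config("spark.sql.catalog.my_catalog", MyCustomCatalog.class.getName())
    .config("spark.sql.catalog.my_catalog.warehouse.path", "/tmp/my-warehouse") // arrives in initialize()
    .getOrCreate();

// Tables in the custom catalog are then addressable as my_catalog.<namespace>.<table>
spark.sql("SHOW TABLES IN my_catalog.default").show();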

Implementing a Custom Data Source

public class MyDataSource implements Table, SupportsRead, SupportsWrite {
    private final String name;
    private final StructType schema;
    
    public MyDataSource(String name, StructType schema) {
        this.name = name;
        this.schema = schema;
    }
    
    @Override
    public String name() {
        return name;
    }
    
    @Override
    public StructType schema() {
        // schema() is still required by Table in Spark 3.5 (deprecated in favor of columns())
        return schema;
    }
    
    @Override
    public Column[] columns() {
        // Convert each StructField to a connector Column via the Column.create factory
        return Arrays.stream(schema.fields())
            .map(field -> Column.create(field.name(), field.dataType(), field.nullable()))
            .toArray(Column[]::new);
    }
    
    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
        return new MyScanBuilder(schema, options);
    }
    
    @Override
    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
        return new MyWriteBuilder(info);
    }
    
    @Override
    public Set<TableCapability> capabilities() {
        return Set.of(
            TableCapability.BATCH_READ,
            TableCapability.BATCH_WRITE,
            TableCapability.ACCEPT_ANY_SCHEMA
        );
    }
}
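
For Spark to reach a table like this through spark.read.format(...), the data source also needs a TableProvider entry point. Below is a minimal sketch, assuming the MyDataSource class above; the provider class name and the hard-coded schema are illustrative:

import java.util.Map;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class MyDataSourceProvider implements TableProvider {
    @Override
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        // Derive the schema from the options; fixed here for illustration
        return new StructType().add("id", DataTypes.LongType);
    }

    @Override
    public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
        return new MyDataSource("my_source", schema);
    }
}

Callers can then pass the fully qualified provider class name to format(...); optionally, a short name can be registered through the org.apache.spark.sql.sources.DataSourceRegister service loader interface.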

Creating Custom Expressions

// Using the expression factory
Expression literalExpr = Expressions.literal(42);
NamedReference columnRef = Expressions.column("user_id");
Transform bucketTransform = Expressions.bucket(10, "user_id");
Transform yearTransform = Expressions.years("created_at");

// Creating complex expressions
Expression[] groupByExprs = new Expression[] {
    Expressions.column("department"),
    Expressions.years("hire_date")
};

Working with Filters and Predicates

// V2 filter predicates (org.apache.spark.sql.connector.expressions.filter)
// are built from a predicate name plus child expressions
Predicate equalsPredicate = new Predicate("=", new Expression[] {
    Expressions.column("status"),
    Expressions.literal("active")
});

Predicate rangePredicate = new And(
    new Predicate(">", new Expression[] { Expressions.column("age"), Expressions.literal(18) }),
    new Predicate("<", new Expression[] { Expressions.column("age"), Expressions.literal(65) })
);

// Legacy V1 filters (Scala, org.apache.spark.sql.sources)
val legacyFilter = EqualTo("status", "active")

Architecture Overview

Catalyst is organized into several key components:

1. Connector API Layer (Java - Public)

  • Provides stable, public APIs for external integrations
  • Includes catalog, data source, expression, and streaming interfaces
  • Designed for building custom data connectors and extensions

2. Expression System (Scala - Internal)

  • Comprehensive framework for representing and evaluating expressions
  • Supports code generation for high performance
  • Extensible through custom expression implementations

3. Query Planning and Optimization (Scala - Internal)

  • Tree-based representation of logical and physical plans
  • Rule-based optimization framework
  • Cost-based optimization capabilities

4. Code Generation (Scala - Internal)

  • Just-in-time compilation of expressions and operators
  • Optimized memory layouts (UnsafeRow)
  • Vectorized processing support

Key Concepts

Table Capabilities

Tables declare their capabilities through the TableCapability enum:

package org.apache.spark.sql.connector.catalog;

public enum TableCapability {
    /**
     * Signals that the table supports reads in batch execution mode.
     */
    BATCH_READ,

    /**
     * Signals that the table supports reads in micro-batch streaming execution mode.
     */
    MICRO_BATCH_READ,

    /**
     * Signals that the table supports reads in continuous streaming execution mode.
     */
    CONTINUOUS_READ,

    /**
     * Signals that the table supports append writes in batch execution mode.
     */
    BATCH_WRITE,

    /**
     * Signals that the table supports append writes in streaming execution mode.
     */
    STREAMING_WRITE,

    /**
     * Signals that the table can be truncated in a write operation.
     */
    TRUNCATE,

    /**
     * Signals that the table can replace existing data that matches a filter with appended data.
     */
    OVERWRITE_BY_FILTER,

    /**
     * Signals that the table can dynamically replace existing data partitions with appended data.
     */
    OVERWRITE_DYNAMIC,

    /**
     * Signals that the table accepts input of any schema in a write operation.
     */
    ACCEPT_ANY_SCHEMA,

    /**
     * Signals that the table supports append writes using the V1 InsertableRelation interface.
     */
    V1_BATCH_WRITE
}

Pushdown Optimizations

Data sources can implement various pushdown interfaces to improve performance:

  • Filter Pushdown: SupportsPushDownFilters, SupportsPushDownV2Filters
  • Column Pruning: SupportsPushDownRequiredColumns
  • Aggregate Pushdown: SupportsPushDownAggregates
  • Limit Pushdown: SupportsPushDownLimit
  • Offset Pushdown: SupportsPushDownOffset
  • TopN Pushdown: SupportsPushDownTopN
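
As a minimal sketch of how these mixins compose, the ScanBuilder below accepts column pruning and limit pushdown. It could back the newScanBuilder call in the earlier example; the MyScan constructor it returns is hypothetical:

import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class MyScanBuilder implements ScanBuilder,
        SupportsPushDownRequiredColumns, SupportsPushDownLimit {

    private StructType requiredSchema;
    private final CaseInsensitiveStringMap options;
    private int limit = -1; // -1 means "no limit pushed"

    public MyScanBuilder(StructType schema, CaseInsensitiveStringMap options) {
        this.requiredSchema = schema;
        this.options = options;
    }

    @Override
    public void pruneColumns(StructType requiredSchema) {
        // Spark passes only the columns the query actually needs
        this.requiredSchema = requiredSchema;
    }

    @Override
    public boolean pushLimit(int limit) {
        this.limit = limit;
        return true; // report that the limit was pushed to the source
    }

    @Override
    public Scan build() {
        // MyScan is a hypothetical Scan implementation for this sketch
        return new MyScan(requiredSchema, options, limit);
    }
}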

Expression Types

Catalyst supports various expression types:

  • Literals: Expressions.literal(value)
  • Column References: Expressions.column(name)
  • Transformations: Expressions.bucket(), Expressions.years()
  • Aggregates: Count, Sum, Avg, Max, Min
  • Predicates: the V2 Predicate (names such as =, >) with And, Or, Not combinators; legacy V1 filters such as EqualTo and GreaterThan
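
For example, V2 aggregate expressions (used together with SupportsPushDownAggregates) are plain classes constructed directly. A brief sketch, assuming the Spark 3.5 constructor shapes of a child expression plus a distinct flag:

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.aggregate.Count;
import org.apache.spark.sql.connector.expressions.aggregate.CountStar;
import org.apache.spark.sql.connector.expressions.aggregate.Max;
import org.apache.spark.sql.connector.expressions.aggregate.Sum;

NamedReference salary = Expressions.column("salary");

Count countSalary = new Count(salary, false);   // COUNT(salary)
Sum distinctSum = new Sum(salary, true);        // SUM(DISTINCT salary)
Max maxSalary = new Max(salary);                // MAX(salary)
CountStar countAll = new CountStar();           // COUNT(*)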

Data Distribution Requirements

Data sources can specify distribution requirements for optimal query execution:

import org.apache.spark.sql.connector.distributions.*;

// Require data clustered by specific columns
Distribution clusteredDist = Distributions.clustered(
    new NamedReference[] { Expressions.column("department") }
);

// Require data globally ordered
Distribution orderedDist = Distributions.ordered(
    new SortOrder[] { 
        Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING)
    }
);
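
These requirements are surfaced through the Write returned by a WriteBuilder: a Write that also implements RequiresDistributionAndOrdering tells Spark how to shuffle and sort rows before they reach the data source. A minimal sketch, with the rest of the Write contract (toBatch(), etc.) omitted:

import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.Write;

public class MyWrite implements Write, RequiresDistributionAndOrdering {
    @Override
    public Distribution requiredDistribution() {
        // Cluster incoming rows by department before writing
        return Distributions.clustered(
            new NamedReference[] { Expressions.column("department") });
    }

    @Override
    public SortOrder[] requiredOrdering() {
        // Within each partition, write newest records first
        return new SortOrder[] {
            Expressions.sort(Expressions.column("timestamp"), SortDirection.DESCENDING)
        };
    }

    // toBatch() and the rest of the Write contract are omitted in this sketch
}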

Custom Metrics Collection

Data sources can report custom metrics during query execution:

import org.apache.spark.sql.connector.metric.*;

// Define custom metrics: CustomSumMetric and CustomAvgMetric are abstract,
// so subclass them and supply a name and description
CustomMetric recordsProcessed = new CustomSumMetric() {
    @Override public String name() { return "recordsProcessed"; }
    @Override public String description() { return "Records Processed"; }
};

CustomMetric avgRecordSize = new CustomAvgMetric() {
    @Override public String name() { return "avgRecordSize"; }
    @Override public String description() { return "Average Record Size"; }
};

// Report supported metrics from a Scan implementation
// (supportedCustomMetrics() is declared on Scan itself)
public class MyScan implements Scan {
    @Override
    public CustomMetric[] supportedCustomMetrics() {
        return new CustomMetric[] { recordsProcessed, avgRecordSize };
    }

    // readSchema(), toBatch(), etc. omitted
}

Type Definitions

Supporting Types for Distribution and Ordering

package org.apache.spark.sql.connector.expressions;

// Sort order specification
interface SortOrder extends Expression {
    Expression expression();
    SortDirection direction();
    NullOrdering nullOrdering();
}

// Sort direction enumeration
enum SortDirection {
    ASCENDING,
    DESCENDING
}

// Null value ordering
enum NullOrdering {
    NULLS_FIRST,
    NULLS_LAST
}

// Factory methods for sort orders
class Expressions {
    public static SortOrder sort(Expression expr, SortDirection direction);
    public static SortOrder sort(Expression expr, SortDirection direction, NullOrdering nulls);
}

API Documentation

This knowledge tile is organized into focused sections covering different aspects of the Catalyst API:

Core APIs

  • Catalog APIs - Complete catalog management interfaces
  • Data Source V2 APIs - Modern data source implementation APIs
  • Expression APIs - Expression system and custom expression development

Specialized APIs

  • Streaming APIs - Micro-batch and continuous streaming read/write interfaces
  • Distributions API - Data distribution and ordering requirements
  • Metrics API - Custom metrics reporting
  • Vectorized Processing - Columnar batch processing
  • Legacy Data Source V1 - Original V1 data source and filter interfaces

Utilities and Helpers

  • Utilities and Helpers - CaseInsensitiveStringMap and other supporting classes

API Stability

Stable APIs (Recommended)

  • Java Connector APIs (org.apache.spark.sql.connector.*) - These are the primary public APIs with backward compatibility guarantees
  • All interfaces marked with @Evolving - May change between versions but with compatibility considerations

Internal APIs (Advanced Use)

  • Scala Catalyst APIs (org.apache.spark.sql.catalyst.*) - Internal APIs that may change without notice
  • Use these only for advanced extensions, accepting that breaking changes can occur between releases

Performance Considerations

Vectorized Processing

For high-performance data processing, implement vectorized operations:

public class MyVectorizedReader implements PartitionReader<ColumnarBatch> {
    @Override
    public boolean next() {
        return hasMoreBatches(); // false when no batches remain
    }

    @Override
    public ColumnarBatch get() {
        // Return a columnar batch instead of individual rows
        ColumnVector[] columns = createColumnVectors();
        return new ColumnarBatch(columns, numRows);
    }

    @Override
    public void close() { /* release column vectors and underlying resources */ }
}
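
Spark only requests columnar batches when the reader factory opts in. A sketch of the corresponding PartitionReaderFactory, assuming the MyVectorizedReader above (the factory class name is illustrative):

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class MyReaderFactory implements PartitionReaderFactory {
    @Override
    public PartitionReader<InternalRow> createReader(InputPartition partition) {
        // Row-based fallback; not used when columnar reads are supported
        throw new UnsupportedOperationException("Row-based reads are not implemented");
    }

    @Override
    public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
        return new MyVectorizedReader();
    }

    @Override
    public boolean supportColumnarReads(InputPartition partition) {
        return true; // tell Spark to call createColumnarReader() for this partition
    }
}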

Code Generation

For custom expressions, consider implementing code generation:

case class MyCustomExpression(child: Expression) extends UnaryExpression {
  override def dataType: DataType = child.dataType

  // Interpreted fallback used when codegen is disabled
  override protected def nullSafeEval(input: Any): Any = input

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // Generate optimized code; defineCodeGen splices in the child's generated value
    defineCodeGen(ctx, ev, childCode => s"($childCode)")
  }

  override protected def withNewChildInternal(newChild: Expression): Expression =
    copy(child = newChild)
}

Memory Management

Use Catalyst's memory-efficient data structures:

// Use UnsafeRow for memory-efficient row representation
UnsafeRow row = new UnsafeRow(numFields);
// In Spark 3.x, UnsafeRowWriter manages its own buffer; pass only the field count
UnsafeRowWriter writer = new UnsafeRowWriter(numFields);

Version Compatibility

This documentation covers Apache Spark Catalyst 3.5.6. The Connector APIs provide the most stable interface across versions, while internal Catalyst APIs may change between releases.

Migration Notes

  • Prefer Data Source V2 APIs over legacy V1 APIs
  • Use Java Connector APIs for maximum stability
  • Implement capability-based interfaces for forward compatibility

Next Steps

  1. Start with Catalog APIs if building custom catalogs
  2. Explore Data Source V2 APIs for custom data sources
  3. Review Expression APIs for custom functions and transforms
  4. Check Streaming APIs for real-time processing needs
  5. Consider Vectorized Processing for high-performance requirements

Each section provides comprehensive API coverage, usage examples, and implementation guidance for building robust Spark extensions.