Catalyst is Spark's library for manipulating relational query plans and expressions.
Catalyst's Data Source V2 API provides standardized interfaces for integrating external data sources with Spark SQL. The connector framework supports advanced features like predicate pushdown, column pruning, streaming, and transactional operations.
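Before looking at the individual interfaces, it helps to see how a finished connector is consumed. The snippet below is an illustrative sketch only: it assumes the hypothetical short name "mydatasource" and catalog name "my_catalog" used throughout the examples in this section, and an existing SparkSession named spark.

// Batch read through the DataFrame API; "path" is an option the connector chooses to interpret
val events = spark.read
  .format("mydatasource")
  .option("path", "/data/events")
  .load()

// Batch write back through the same connector
events.write
  .format("mydatasource")
  .mode("append")
  .option("path", "/data/events_archive")
  .save()

// SQL access through a registered catalog plugin (see MyTableCatalog below)
spark.sql("SELECT * FROM my_catalog.analytics.events").show()
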
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import java.io.Closeable
import java.util
import scala.collection.JavaConverters._

trait CatalogPlugin {
def initialize(name: String, options: CaseInsensitiveStringMap): Unit
def name(): String
}
trait TableCatalog extends CatalogPlugin {
def listTables(namespace: Array[String]): Array[Identifier]
def loadTable(ident: Identifier): Table
def tableExists(ident: Identifier): Boolean
def createTable(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table
def alterTable(ident: Identifier, changes: TableChange*): Table
def dropTable(ident: Identifier): Boolean
def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit
}
trait SupportsNamespaces extends CatalogPlugin {
def listNamespaces(): Array[Array[String]]
def listNamespaces(namespace: Array[String]): Array[Array[String]]
def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String]
def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit
def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit
def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean
}

Usage Example:
class MyTableCatalog extends TableCatalog {
private var catalogName: String = _
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
this.catalogName = name
}
override def name(): String = catalogName
override def loadTable(ident: Identifier): Table = {
// Load table from external catalog
new MyTable(ident, loadSchemaFromExternal(ident))
}
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]
): Table = {
// Create table in external system
createExternalTable(ident, schema, partitions, properties)
new MyTable(ident, schema, partitions, properties)
}
}
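A catalog implementation such as MyTableCatalog is usually activated through Spark configuration rather than code. The snippet below is a sketch that assumes the class lives in the hypothetical package com.example; any spark.sql.catalog.my_catalog.* options are passed to initialize() through the CaseInsensitiveStringMap.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("catalog-example")
  // Register the custom catalog under the name "my_catalog" (hypothetical class name)
  .config("spark.sql.catalog.my_catalog", "com.example.MyTableCatalog")
  // Extra options handed to initialize(), e.g. connection details
  .config("spark.sql.catalog.my_catalog.url", "jdbc:example://host/db")
  .getOrCreate()

spark.sql("SHOW TABLES IN my_catalog.analytics").show()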

trait Table {
def name(): String
def schema(): StructType
def partitioning(): Array[Transform]
def properties(): util.Map[String, String]
def capabilities(): util.Set[TableCapability]
}
trait SupportsRead extends Table {
def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
}
trait SupportsWrite extends Table {
def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}
trait SupportsDelete extends Table {
def newDeleteBuilder(info: LogicalWriteInfo): DeleteBuilder
}
trait SupportsUpdate extends Table {
def newUpdateBuilder(info: LogicalWriteInfo): UpdateBuilder
}
trait SupportsMerge extends Table {
def newMergeBuilder(info: LogicalWriteInfo): MergeBuilder
}

Usage Example:
class MyTable(
identifier: Identifier,
tableSchema: StructType,
tablePartitions: Array[Transform] = Array.empty,
tableProperties: util.Map[String, String] = Map.empty[String, String].asJava
) extends Table with SupportsRead with SupportsWrite {
override def name(): String = identifier.toString
override def schema(): StructType = tableSchema
override def partitioning(): Array[Transform] = tablePartitions
override def properties(): util.Map[String, String] = tableProperties
override def capabilities(): util.Set[TableCapability] = Set(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
TableCapability.STREAMING_READ,
TableCapability.ACCEPT_ANY_SCHEMA
).asJava
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MyScanBuilder(tableSchema, options)
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new MyWriteBuilder(info)
}
}

trait ScanBuilder {
def build(): Scan
}
trait SupportsPushDownFilters extends ScanBuilder {
def pushPredicates(predicates: Array[Predicate]): Array[Predicate]
def pushedPredicates(): Array[Predicate]
}
trait SupportsPushDownRequiredColumns extends ScanBuilder {
def pruneColumns(requiredSchema: StructType): Unit
}
trait SupportsPushDownLimit extends ScanBuilder {
def pushLimit(limit: Int): Boolean
}
trait SupportsPushDownTopN extends ScanBuilder {
def pushTopN(orders: Array[SortOrder], limit: Int): Boolean
}
trait SupportsPushDownAggregates extends ScanBuilder {
def supportCompletePushDown(aggregation: Aggregation): Boolean
def pushAggregation(aggregation: Aggregation): Boolean
}

Usage Example:
class MyScanBuilder(
schema: StructType,
options: CaseInsensitiveStringMap
) extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {
private var pushedFilters: Array[Predicate] = Array.empty
private var requiredSchema: StructType = schema
override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
val (supported, unsupported) = predicates.partition(canPushDown)
this.pushedFilters = supported
unsupported
}
override def pushedPredicates(): Array[Predicate] = pushedFilters
override def pruneColumns(requiredSchema: StructType): Unit = {
this.requiredSchema = requiredSchema
}
override def build(): Scan = {
new MyScan(requiredSchema, pushedFilters, options)
}
private def canPushDown(predicate: Predicate): Boolean = {
// Determine which predicates can be pushed down to the data source
predicate match {
case _: EqualTo => true
case _: GreaterThan => true
case _: LessThan => true
case _ => false
}
}
}
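The limit, top-N, and aggregate pushdown mix-ins follow the same negotiate-then-build pattern as filter and column pruning. Below is a minimal sketch of limit pushdown; it reuses the MyScan class from the next example and only records the limit (a real implementation would thread it into the readers).

class MyLimitScanBuilder(
  schema: StructType,
  options: CaseInsensitiveStringMap
) extends ScanBuilder with SupportsPushDownLimit {
  private var pushedLimit: Option[Int] = None
  override def pushLimit(limit: Int): Boolean = {
    // Returning true tells Spark the source will enforce the limit itself
    pushedLimit = Some(limit)
    true
  }
  override def build(): Scan = {
    // Illustrative only: MyScan (defined below) would need to honor pushedLimit
    new MyScan(schema, Array.empty[Predicate], options)
  }
}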

trait Scan {
def readSchema(): StructType
def description(): String
def toBatch: Batch
}
trait Batch {
def planInputPartitions(): Array[InputPartition]
def createReaderFactory(): PartitionReaderFactory
}
trait InputPartition extends Serializable
trait PartitionReaderFactory extends Serializable {
def createReader(partition: InputPartition): PartitionReader[InternalRow]
def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch]
def supportColumnarReads(partition: InputPartition): Boolean
}
trait PartitionReader[T] extends Closeable {
def next(): Boolean
def get(): T
}

Usage Example:
class MyScan(
schema: StructType,
filters: Array[Predicate],
options: CaseInsensitiveStringMap
) extends Scan with Batch {
override def readSchema(): StructType = schema
override def description(): String = s"MyScan(${schema.fieldNames.mkString(", ")})"
override def toBatch: Batch = this
override def planInputPartitions(): Array[InputPartition] = {
// Create partitions based on data source layout
(0 until getPartitionCount).map(i => MyInputPartition(i)).toArray[InputPartition]
}
override def createReaderFactory(): PartitionReaderFactory = {
new MyPartitionReaderFactory(schema, filters, options)
}
}
case class MyInputPartition(partitionId: Int) extends InputPartition
class MyPartitionReaderFactory(
schema: StructType,
filters: Array[Predicate],
options: CaseInsensitiveStringMap
) extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
new MyPartitionReader(partition.asInstanceOf[MyInputPartition], schema, filters)
}
override def supportColumnarReads(partition: InputPartition): Boolean = false
}
class MyPartitionReader(
partition: MyInputPartition,
schema: StructType,
filters: Array[Predicate]
) extends PartitionReader[InternalRow] {
private val iterator = createDataIterator()
private var currentRow: InternalRow = _
override def next(): Boolean = {
if (iterator.hasNext) {
currentRow = iterator.next()
true
} else {
false
}
}
override def get(): InternalRow = currentRow
override def close(): Unit = {
// Clean up resources
}
private def createDataIterator(): Iterator[InternalRow] = {
// Create iterator that reads data from external source
// applying filters and returning rows matching the schema
loadDataFromExternalSource(partition.partitionId, schema, filters)
}
}

trait WriteBuilder {
def build(): Write
}
trait SupportsTruncate extends WriteBuilder {
def truncate(): WriteBuilder
}
trait SupportsOverwrite extends WriteBuilder {
def overwrite(filters: Array[Filter]): WriteBuilder
}
trait SupportsDynamicOverwrite extends WriteBuilder {
def overwriteDynamicPartitions(): WriteBuilder
}
trait SupportsStreamingWrite extends WriteBuilder {
def buildForStreaming(): StreamingWrite
}

Usage Example:
class MyWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder with SupportsOverwrite with SupportsTruncate {
private var overwriteFilters: Array[Filter] = Array.empty
private var truncateTable: Boolean = false
override def overwrite(filters: Array[Filter]): WriteBuilder = {
this.overwriteFilters = filters
this
}
override def truncate(): WriteBuilder = {
this.truncateTable = true
this
}
override def build(): Write = {
if (truncateTable) {
new MyTruncateWrite(info)
} else if (overwriteFilters.nonEmpty) {
new MyOverwriteWrite(info, overwriteFilters)
} else {
new MyAppendWrite(info)
}
}
}

trait Write {
def description(): String
def toBatch: BatchWrite
}
trait BatchWrite {
def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory
def commit(messages: Array[WriterCommitMessage]): Unit
def abort(messages: Array[WriterCommitMessage]): Unit
}
trait DataWriterFactory extends Serializable {
def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow]
}
trait DataWriter[T] extends Closeable {
def write(record: T): Unit
def commit(): WriterCommitMessage
def abort(): Unit
}
trait WriterCommitMessage extends Serializable

Usage Example:
class MyAppendWrite(info: LogicalWriteInfo) extends Write with BatchWrite {
override def description(): String = "MyAppendWrite"
override def toBatch: BatchWrite = this
override def createBatchWriterFactory(physicalInfo: PhysicalWriteInfo): DataWriterFactory = {
// Use the schema from the constructor's LogicalWriteInfo; PhysicalWriteInfo only carries partition-level details
new MyDataWriterFactory(info.schema())
}
override def commit(messages: Array[WriterCommitMessage]): Unit = {
// Commit all partition writes atomically
messages.foreach {
case msg: MyWriterCommitMessage => commitPartition(msg)
case _ => throw new IllegalArgumentException("Unexpected commit message type")
}
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {
// Clean up any partial writes
messages.foreach {
case msg: MyWriterCommitMessage => abortPartition(msg)
case _ => // Ignore unknown message types during abort
}
}
}
class MyDataWriterFactory(schema: StructType) extends DataWriterFactory {
override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
new MyDataWriter(partitionId, taskId, schema)
}
}
class MyDataWriter(
partitionId: Int,
taskId: Long,
schema: StructType
) extends DataWriter[InternalRow] {
private val outputPath = createOutputPath(partitionId, taskId)
private val writer = createExternalWriter(outputPath, schema)
private var recordCount = 0
override def write(record: InternalRow): Unit = {
writer.writeRecord(record)
recordCount += 1
}
override def commit(): WriterCommitMessage = {
writer.close()
MyWriterCommitMessage(outputPath, recordCount)
}
override def abort(): Unit = {
writer.close()
deleteOutputFile(outputPath)
}
override def close(): Unit = {
writer.close()
}
}
case class MyWriterCommitMessage(
outputPath: String,
recordCount: Int
) extends WriterCommitMessage

trait SupportsAdmissionControl extends SparkDataStream {
def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset
}
trait ContinuousStream extends SparkDataStream {
def mergeOffsets(offsets: Array[PartitionOffset]): Offset
def planInputPartitions(start: Offset): Array[InputPartition]
def createContinuousReaderFactory(): ContinuousPartitionReaderFactory
}
trait MicroBatchStream extends SparkDataStream {
def latestOffset(): Offset
def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
def createReaderFactory(): PartitionReaderFactory
}

Usage Example:
class MyMicroBatchStream(
schema: StructType,
options: CaseInsensitiveStringMap
) extends MicroBatchStream {
override def deserializeOffset(json: String): Offset = {
// Rebuild the offset from the JSON produced by MyOffset.json()
MyOffset(json.replaceAll("[^0-9]", "").toLong)
}
override def initialOffset(): Offset = {
MyOffset(getCurrentOffsetFromSource())
}
override def latestOffset(): Offset = {
MyOffset(getLatestOffsetFromSource())
}
override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
val startOffset = start.asInstanceOf[MyOffset].value
val endOffset = end.asInstanceOf[MyOffset].value
createPartitionsForRange(startOffset, endOffset)
}
override def createReaderFactory(): PartitionReaderFactory = {
new MyStreamingReaderFactory(schema, options)
}
override def commit(end: Offset): Unit = {
// Commit offset in external system
commitOffsetInExternalSystem(end.asInstanceOf[MyOffset].value)
}
override def stop(): Unit = {
// Clean up streaming resources
}
}
case class MyOffset(value: Long) extends Offset {
override def json(): String = s"""{"offset":$value}"""
}
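The simplified Scan trait shown earlier only lists toBatch, but in Spark's full Scan interface the streaming entry point also lives on the scan: toMicroBatchStream(checkpointLocation) (and toContinuousStream for continuous mode). A minimal sketch that wires MyMicroBatchStream into a scan, assuming Spark's full Scan interface:

class MyStreamingScan(
  schema: StructType,
  options: CaseInsensitiveStringMap
) extends Scan {
  override def readSchema(): StructType = schema
  override def description(): String = "MyStreamingScan"
  // Called when the table is read with readStream; the checkpoint location can
  // be used to persist offset state across restarts
  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
    new MyMicroBatchStream(schema, options)
  }
}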

trait StreamingWrite {
def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory
def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}
trait StreamingDataWriterFactory extends Serializable {
def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow]
}
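The streaming write interfaces above have no accompanying example, so here is a minimal sketch. It reuses the hypothetical MyDataWriter, MyWriterCommitMessage, commitPartition, and abortPartition helpers from the batch write example; in a real connector the epochId would typically be folded into output paths or transaction IDs so that epoch replays stay idempotent.

class MyStreamingWrite(schema: StructType) extends StreamingWrite {
  override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory = {
    new MyStreamingDataWriterFactory(schema)
  }
  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Mark the epoch as durably committed in the external system
    messages.foreach {
      case msg: MyWriterCommitMessage => commitPartition(msg)
      case _ => throw new IllegalArgumentException("Unexpected commit message type")
    }
  }
  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    messages.foreach {
      case msg: MyWriterCommitMessage => abortPartition(msg)
      case _ => // Ignore unknown message types during abort
    }
  }
}

class MyStreamingDataWriterFactory(schema: StructType) extends StreamingDataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] = {
    // Reuse the batch writer; epochId is ignored in this sketch
    new MyDataWriter(partitionId, taskId, schema)
  }
}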

trait TableProvider {
def inferSchema(options: CaseInsensitiveStringMap): StructType
def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table
}
trait DataSourceV2 extends TableProvider {
def shortName(): String
}

Usage Example:
class MyDataSourceV2 extends DataSourceV2 {
override def shortName(): String = "mydatasource"
override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
// Infer schema from external data source
val path = options.get("path")
inferSchemaFromPath(path)
}
override def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]
): Table = {
new MyTable(
Identifier.of(Array.empty, "my_table"),
schema,
partitioning,
properties
)
}
}
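For format("mydatasource") to resolve the short name, the provider must also be discoverable through Java's ServiceLoader in addition to implementing DataSourceRegister (the class shown just below). This is usually done with a service registration file on the classpath; the package name here is hypothetical.

# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
com.example.MyDataSourceV2Registration
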
// Register the data source
class MyDataSourceV2Registration extends DataSourceRegister {
override def shortName(): String = "mydatasource"
}

sealed trait Predicate extends Serializable {
def references: Array[String]
}
case class EqualTo(attribute: String, value: Any) extends Predicate
case class EqualNullSafe(attribute: String, value: Any) extends Predicate
case class GreaterThan(attribute: String, value: Any) extends Predicate
case class GreaterThanOrEqual(attribute: String, value: Any) extends Predicate
case class LessThan(attribute: String, value: Any) extends Predicate
case class LessThanOrEqual(attribute: String, value: Any) extends Predicate
case class In(attribute: String, values: Array[Any]) extends Predicate
case class IsNull(attribute: String) extends Predicate
case class IsNotNull(attribute: String) extends Predicate
case class And(left: Predicate, right: Predicate) extends Predicate
case class Or(left: Predicate, right: Predicate) extends Predicate
case class Not(child: Predicate) extends Predicate
case class StringStartsWith(attribute: String, value: String) extends Predicate
case class StringEndsWith(attribute: String, value: String) extends Predicate
case class StringContains(attribute: String, value: String) extends Predicate
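Spark may hand over arbitrarily nested combinations of these predicates, so a connector that translates them into an external query language typically walks the tree recursively. A small sketch using only the case classes above (the naive quoting is for illustration only):

// WHERE status = 'active' AND age > 21 arrives roughly as:
val pushed: Predicate = And(EqualTo("status", "active"), GreaterThan("age", 21))

def toExternalSql(p: Predicate): String = p match {
  case EqualTo(attr, value) => s"$attr = '$value'"
  case GreaterThan(attr, value) => s"$attr > $value"
  case LessThan(attr, value) => s"$attr < $value"
  case And(left, right) => s"(${toExternalSql(left)} AND ${toExternalSql(right)})"
  case Or(left, right) => s"(${toExternalSql(left)} OR ${toExternalSql(right)})"
  case Not(child) => s"NOT (${toExternalSql(child)})"
  case other => throw new UnsupportedOperationException(s"Cannot translate $other")
}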

trait SupportsPushDownCatalystFilters extends ScanBuilder {
def pushCatalystFilters(filters: Array[Expression]): Array[Expression]
def pushedCatalystFilters(): Array[Expression]
}
case class Aggregation(
aggregateExpressions: Array[AggregateFunc],
groupByExpressions: Array[Expression]
)
trait AggregateFunc extends Expression {
def aggregateFunction(): aggregate.AggregateFunction
}

The Data Source V2 API provides a comprehensive framework for building high-performance, feature-rich connectors that integrate seamlessly with Spark SQL's query planning and optimization capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-13