CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-catalyst-2-13

Catalyst is Spark's library for manipulating relational query plans and expressions

Pending
Overview
Eval results
Files

docs/connectors.md

Data Source Connectors

Catalyst's Data Source V2 API provides standardized interfaces for integrating external data sources with Spark SQL. The connector framework supports advanced features like predicate pushdown, column pruning, streaming, and transactional operations.

Core Imports

import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.read._  
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

Catalog APIs

Catalog Interface

// Entry point for pluggable catalogs. Spark instantiates the plugin and then
// calls initialize() exactly once with the configured catalog name and its
// options before any other method is used.
// NOTE(review): `util.Map` here requires `import java.util`, which is not in
// the Core Imports list above — add it when copying these signatures.
trait CatalogPlugin {
  def initialize(name: String, options: CaseInsensitiveStringMap): Unit
  def name(): String
}

// Catalog of tables: lookup, existence check, create/alter/drop/rename over
// Identifier-addressed tables.
trait TableCatalog extends CatalogPlugin {
  def listTables(namespace: Array[String]): Array[Identifier]
  def loadTable(ident: Identifier): Table
  def tableExists(ident: Identifier): Boolean
  // partitions carries the partitioning transforms requested for the table.
  def createTable(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): Table
  def alterTable(ident: Identifier, changes: TableChange*): Table
  def dropTable(ident: Identifier): Boolean
  def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit
}

// Optional namespace (database/schema) management for a catalog.
// Namespaces are multi-part paths, hence Array[String].
trait SupportsNamespaces extends CatalogPlugin {
  def listNamespaces(): Array[Array[String]]
  def listNamespaces(namespace: Array[String]): Array[Array[String]]
  def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String]
  def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit
  def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit
  def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean
}

Usage Example:

// Example TableCatalog backed by an external catalog service.
// NOTE(review): TableCatalog also declares listTables, tableExists,
// alterTable, dropTable and renameTable — a real implementation must override
// those too; they are omitted here for brevity, so this class would not
// compile as written.
// loadSchemaFromExternal/createExternalTable are placeholder helpers.
class MyTableCatalog extends TableCatalog {
  // Catalog name assigned by Spark; set once in initialize().
  private var catalogName: String = _
  
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    this.catalogName = name
  }
  
  override def name(): String = catalogName
  
  override def loadTable(ident: Identifier): Table = {
    // Load table from external catalog
    new MyTable(ident, loadSchemaFromExternal(ident))
  }
  
  override def createTable(
    ident: Identifier, 
    schema: StructType, 
    partitions: Array[Transform],
    properties: util.Map[String, String]
  ): Table = {
    // Create table in external system
    createExternalTable(ident, schema, partitions, properties)
    new MyTable(ident, schema, partitions, properties)
  }
}

Table Interface

// Logical handle to a table: metadata only. What the table can do (read,
// write, streaming, ...) is advertised via capabilities() plus the mix-in
// traits below.
trait Table {
  def name(): String
  def schema(): StructType
  def partitioning(): Array[Transform]
  def properties(): util.Map[String, String]
  def capabilities(): util.Set[TableCapability]
}

// Mix in to make the table scannable; Spark calls this once per scan.
trait SupportsRead extends Table {
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
}

// Mix in to make the table writable.
trait SupportsWrite extends Table {
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}

// Row-level DELETE support.
trait SupportsDelete extends Table {
  def newDeleteBuilder(info: LogicalWriteInfo): DeleteBuilder
}

// Row-level UPDATE support.
trait SupportsUpdate extends Table {
  def newUpdateBuilder(info: LogicalWriteInfo): UpdateBuilder  
}

// MERGE INTO support.
trait SupportsMerge extends Table {
  def newMergeBuilder(info: LogicalWriteInfo): MergeBuilder
}

Usage Example:

// Example Table supporting batch reads and writes.
// NOTE(review): the `.asJava` calls (default parameter and capabilities())
// require `import scala.jdk.CollectionConverters._`, which is missing from
// the Core Imports list above — add it when copying this example.
class MyTable(
  identifier: Identifier,
  tableSchema: StructType,
  tablePartitions: Array[Transform] = Array.empty,
  tableProperties: util.Map[String, String] = Map.empty.asJava
) extends Table with SupportsRead with SupportsWrite {
  
  override def name(): String = identifier.toString
  override def schema(): StructType = tableSchema  
  override def partitioning(): Array[Transform] = tablePartitions
  override def properties(): util.Map[String, String] = tableProperties
  
  // Capabilities tell Spark which plans this table may participate in.
  override def capabilities(): util.Set[TableCapability] = Set(
    TableCapability.BATCH_READ,
    TableCapability.BATCH_WRITE,
    TableCapability.STREAMING_READ,
    TableCapability.ACCEPT_ANY_SCHEMA
  ).asJava
  
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
    new MyScanBuilder(tableSchema, options)
  }
  
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    new MyWriteBuilder(info)
  }
}

Read APIs

Scan Building

// Builder assembled by Spark during planning; build() produces the final Scan
// after all supported pushdowns have been applied.
trait ScanBuilder {
  def build(): Scan
}

// Predicate pushdown: pushPredicates returns the predicates the source CANNOT
// handle, so Spark re-evaluates those after the scan.
// NOTE(review): in the actual Spark API, SupportsPushDownFilters works with
// org.apache.spark.sql.sources.Filter (pushFilters/pushedFilters); the
// Predicate-based methods shown here belong to SupportsPushDownV2Filters —
// verify against the Spark version in use.
trait SupportsPushDownFilters extends ScanBuilder {
  def pushPredicates(predicates: Array[Predicate]): Array[Predicate]
  def pushedPredicates(): Array[Predicate]
}

// Column pruning: Spark narrows the scan to only the required columns.
trait SupportsPushDownRequiredColumns extends ScanBuilder {
  def pruneColumns(requiredSchema: StructType): Unit
}

// Return true if the source will enforce the LIMIT itself.
trait SupportsPushDownLimit extends ScanBuilder {
  def pushLimit(limit: Int): Boolean
}

// ORDER BY ... LIMIT n pushdown; return true if fully handled by the source.
trait SupportsPushDownTopN extends ScanBuilder {
  def pushTopN(orders: Array[SortOrder], limit: Int): Boolean
}

// Aggregate pushdown; supportCompletePushDown indicates whether Spark can
// skip its own aggregation entirely.
trait SupportsPushDownAggregates extends ScanBuilder {
  def supportCompletePushDown(aggregation: Aggregation): Boolean
  def pushAggregation(aggregation: Aggregation): Boolean
}

Usage Example:

// ScanBuilder supporting predicate pushdown and column pruning. Spark calls
// pushPredicates/pruneColumns during optimization, then build() once.
class MyScanBuilder(
  schema: StructType,
  options: CaseInsensitiveStringMap
) extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {
  
  // Predicates accepted for source-side evaluation.
  private var pushedFilters: Array[Predicate] = Array.empty
  // Narrowed output schema; starts as the full table schema.
  private var requiredSchema: StructType = schema
  
  // Accept the predicates this source can evaluate; return the rest so Spark
  // re-applies them on top of the scan output.
  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
    val (supported, unsupported) = predicates.partition(canPushDown)
    this.pushedFilters = supported
    unsupported
  }
  
  override def pushedPredicates(): Array[Predicate] = pushedFilters
  
  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }
  
  override def build(): Scan = {
    new MyScan(requiredSchema, pushedFilters, options)
  }
  
  // Decide which predicates the external source can evaluate.
  private def canPushDown(predicate: Predicate): Boolean = {
    // Bug fix: the previous version pattern-matched a connector Predicate
    // against org.apache.spark.sql.sources.* classes — that is the separate
    // V1 Filter hierarchy, so no case could ever match and nothing was ever
    // pushed down. A connector Predicate exposes its operator via name()
    // ("=", ">", "<", ...), so dispatch on that instead.
    predicate.name() match {
      case "=" | ">" | "<" => true
      case _ => false
    }
  }
}

Scan Execution

// A fully-configured scan: fixed read schema plus any pushed-down state.
trait Scan {
  def readSchema(): StructType
  def description(): String
  def toBatch: Batch
}

// Batch execution: split the data into partitions, then read each partition
// on an executor via the reader factory.
trait Batch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

// Serializable description of one unit of parallel input (sent to executors).
trait InputPartition extends Serializable

// Created on the driver, serialized to executors to create per-partition
// readers (row-based or columnar).
trait PartitionReaderFactory extends Serializable {
  def createReader(partition: InputPartition): PartitionReader[InternalRow]
  def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch]
  def supportColumnarReads(partition: InputPartition): Boolean
}

// Iterator-style reader: next() advances and reports availability, get()
// returns the current element; close() releases source resources.
trait PartitionReader[T] extends Closeable {
  def next(): Boolean
  def get(): T
}

Usage Example:

// Example Scan that also implements Batch (a common pattern: one object
// serves as both the logical scan and its batch form).
// getPartitionCount is a placeholder helper for the source's layout.
class MyScan(
  schema: StructType,
  filters: Array[Predicate],
  options: CaseInsensitiveStringMap
) extends Scan with Batch {
  
  override def readSchema(): StructType = schema
  override def description(): String = s"MyScan(${schema.fieldNames.mkString(", ")})"
  override def toBatch: Batch = this
  
  override def planInputPartitions(): Array[InputPartition] = {
    // Create partitions based on data source layout
    (0 until getPartitionCount).map(i => MyInputPartition(i)).toArray
  }
  
  override def createReaderFactory(): PartitionReaderFactory = {
    new MyPartitionReaderFactory(schema, filters, options)
  }
}

// Minimal partition descriptor: just the partition index.
case class MyInputPartition(partitionId: Int) extends InputPartition

// Row-based reader factory (columnar reads disabled).
// NOTE(review): createColumnarReader is not overridden here although the
// PartitionReaderFactory trait above declares it abstract; the real Spark
// interface provides a default that throws — confirm against the actual API.
class MyPartitionReaderFactory(
  schema: StructType,
  filters: Array[Predicate], 
  options: CaseInsensitiveStringMap
) extends PartitionReaderFactory {
  
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    new MyPartitionReader(partition.asInstanceOf[MyInputPartition], schema, filters)
  }
  
  override def supportColumnarReads(partition: InputPartition): Boolean = false
}

// Example row reader for a single partition. Pulls rows from the external
// source for this partition, applying pushed filters and the pruned schema.
// loadDataFromExternalSource is a placeholder helper.
class MyPartitionReader(
  partition: MyInputPartition,
  schema: StructType,
  filters: Array[Predicate]
) extends PartitionReader[InternalRow] {
  
  // Iterator over this partition's rows from the external source,
  // already filtered and projected to the requested schema.
  private val rows = loadDataFromExternalSource(partition.partitionId, schema, filters)
  private var current: InternalRow = _
  
  // Advance to the next row; returns false once the partition is exhausted.
  override def next(): Boolean = {
    val advanced = rows.hasNext
    if (advanced) {
      current = rows.next()
    }
    advanced
  }
  
  // Current row; only valid after next() has returned true.
  override def get(): InternalRow = current
  
  override def close(): Unit = {
    // Release any handles held by the external source here.
  }
}

Write APIs

Write Building

// Builder for a logical write; build() produces the configured Write.
trait WriteBuilder {
  def build(): Write
}

// Truncate the table before writing (INSERT OVERWRITE without filters).
trait SupportsTruncate extends WriteBuilder {
  def truncate(): WriteBuilder
}

// Replace only the rows matching the given filters before writing.
trait SupportsOverwrite extends WriteBuilder {
  def overwrite(filters: Array[Filter]): WriteBuilder
}

// Replace only the partitions touched by the incoming data.
trait SupportsDynamicOverwrite extends WriteBuilder {
  def overwriteDynamicPartitions(): WriteBuilder
}

// Build a streaming (epoch-based) write instead of a batch write.
trait SupportsStreamingWrite extends WriteBuilder {
  def buildForStreaming(): StreamingWrite
}

Usage Example:

// Example WriteBuilder that records the requested write mode and picks the
// matching Write implementation in build().
class MyWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder with SupportsOverwrite with SupportsTruncate {
  
  // Mode accumulated from builder calls; plain append is the default.
  private var replaceFilters: Array[Filter] = Array.empty
  private var wantTruncate = false
  
  override def overwrite(filters: Array[Filter]): WriteBuilder = {
    replaceFilters = filters
    this
  }
  
  override def truncate(): WriteBuilder = {
    wantTruncate = true
    this
  }
  
  // Truncate wins over filter-overwrite; otherwise fall back to append.
  override def build(): Write =
    if (wantTruncate) new MyTruncateWrite(info)
    else if (replaceFilters.nonEmpty) new MyOverwriteWrite(info, replaceFilters)
    else new MyAppendWrite(info)
}

Write Execution

// A configured logical write; toBatch yields its batch execution form.
trait Write {
  def description(): String
  def toBatch: BatchWrite
}

// Driver-side batch write coordinator: creates per-task writer factories and
// commits or aborts the job using the messages collected from each task.
trait BatchWrite {
  def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory
  def commit(messages: Array[WriterCommitMessage]): Unit
  def abort(messages: Array[WriterCommitMessage]): Unit
}

// Serialized to executors; creates one DataWriter per task.
trait DataWriterFactory extends Serializable {
  def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow]
}

// Executor-side writer for one task: write records, then commit (returning a
// message for the driver) or abort; close() always follows.
trait DataWriter[T] extends Closeable {
  def write(record: T): Unit
  def commit(): WriterCommitMessage
  def abort(): Unit
}

// Opaque per-task result sent back to the driver for the job-level commit.
trait WriterCommitMessage extends Serializable

Usage Example:

// Example append-mode write. Acts as both the logical Write and its BatchWrite
// form. commitPartition/abortPartition are placeholder helpers.
class MyAppendWrite(info: LogicalWriteInfo) extends Write with BatchWrite {
  
  override def description(): String = "MyAppendWrite"
  override def toBatch: BatchWrite = this
  
  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
    new MyDataWriterFactory(info.schema())
  
  // Job-level commit: every per-task message must be one of ours.
  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    messages.foreach { message =>
      message match {
        case known: MyWriterCommitMessage => commitPartition(known)
        case _ => throw new IllegalArgumentException("Unexpected commit message type")
      }
    }
  }
  
  // Job-level abort: best effort — unknown message types are skipped.
  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    for (message <- messages) message match {
      case known: MyWriterCommitMessage => abortPartition(known)
      case _ => // unknown message types are ignored during abort
    }
  }
}

// Executor-side factory: one MyDataWriter per (partition, task) pair.
class MyDataWriterFactory(schema: StructType) extends DataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
    new MyDataWriter(partitionId, taskId, schema)
  }
}

// Example per-task writer that streams rows to an external file/object.
// createOutputPath/createExternalWriter/deleteOutputFile are placeholder
// helpers.
class MyDataWriter(
  partitionId: Int,
  taskId: Long,
  schema: StructType
) extends DataWriter[InternalRow] {
  
  private val outputPath = createOutputPath(partitionId, taskId)
  private val writer = createExternalWriter(outputPath, schema)
  private var recordCount = 0
  // Fix: Spark always calls close() after commit()/abort(), and the original
  // closed the underlying writer in all three methods, double-closing it.
  // Track closed state so the writer is closed exactly once.
  private var closed = false
  
  // Close the underlying writer if it is still open (idempotent).
  private def closeWriter(): Unit = {
    if (!closed) {
      writer.close()
      closed = true
    }
  }
  
  override def write(record: InternalRow): Unit = {
    writer.writeRecord(record)
    recordCount += 1
  }
  
  // Flush/close the output and report what was written to the driver.
  override def commit(): WriterCommitMessage = {
    closeWriter()
    MyWriterCommitMessage(outputPath, recordCount)
  }
  
  // Discard this task's partial output.
  override def abort(): Unit = {
    closeWriter()
    deleteOutputFile(outputPath)
  }
  
  override def close(): Unit = closeWriter()
}

// Per-task commit message: where the task wrote and how many records,
// consumed by the driver-side commit/abort.
case class MyWriterCommitMessage(
  outputPath: String,
  recordCount: Int
) extends WriterCommitMessage

Streaming APIs

Streaming Read

// Rate limiting for streaming reads: bound how far latestOffset may advance.
// NOTE(review): shown here extending Table; in the actual Spark API
// SupportsAdmissionControl extends SparkDataStream — verify before relying
// on this signature.
trait SupportsAdmissionControl extends Table {
  def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset
}

// Continuous (low-latency) processing: long-running partition readers whose
// per-partition offsets are merged into a global offset.
trait ContinuousStream extends SparkDataStream {
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  def planInputPartitions(start: Offset): Array[InputPartition]
  def createContinuousReaderFactory(): ContinuousPartitionReaderFactory
}

// Micro-batch processing: each trigger reads the (start, end] offset range.
trait MicroBatchStream extends SparkDataStream {
  def latestOffset(): Offset
  def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]  
  def createReaderFactory(): PartitionReaderFactory
}

Usage Example:

// Example micro-batch stream: Spark repeatedly asks for the latest offset and
// plans one batch of input partitions per (start, end] range.
// NOTE(review): the real SparkDataStream trait also declares
// deserializeOffset(json) for checkpoint recovery, which this example does
// not override — confirm against the Spark version in use.
// getCurrentOffsetFromSource/getLatestOffsetFromSource/createPartitionsForRange/
// commitOffsetInExternalSystem are placeholder helpers.
class MyMicroBatchStream(
  schema: StructType,
  options: CaseInsensitiveStringMap
) extends MicroBatchStream {
  
  override def readSchema(): StructType = schema
  
  // Starting offset when the query begins with no checkpoint.
  override def initialOffset(): Offset = {
    MyOffset(getCurrentOffsetFromSource())
  }
  
  override def latestOffset(): Offset = {
    MyOffset(getLatestOffsetFromSource())
  }
  
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
    val startOffset = start.asInstanceOf[MyOffset].value
    val endOffset = end.asInstanceOf[MyOffset].value
    
    createPartitionsForRange(startOffset, endOffset)
  }
  
  override def createReaderFactory(): PartitionReaderFactory = {
    new MyStreamingReaderFactory(schema, options)
  }
  
  // Called once a batch ending at `end` is fully processed.
  override def commit(end: Offset): Unit = {
    // Commit offset in external system
    commitOffsetInExternalSystem(end.asInstanceOf[MyOffset].value)
  }
  
  override def stop(): Unit = {
    // Clean up streaming resources
  }
}

// Offset as a single monotonically increasing Long; json() is what Spark
// persists in the checkpoint log.
case class MyOffset(value: Long) extends Offset {
  override def json(): String = s"""{"offset":$value}"""
}

Streaming Write

// Streaming write coordinator: like BatchWrite, but commit/abort are scoped
// to an epoch (micro-batch or continuous epoch id).
trait StreamingWrite {
  def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

// Per-task writer factory for streaming: the epoch id is part of the key so
// retried epochs can be written idempotently.
trait StreamingDataWriterFactory extends Serializable {
  def createWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow]
}

Data Source Provider

TableProvider Interface

// Anonymous-table entry point used when a source is addressed by options
// (e.g. spark.read.format(...)) rather than through a catalog.
trait TableProvider {
  def inferSchema(options: CaseInsensitiveStringMap): StructType
  def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table
}

// NOTE(review): in the actual Spark API, shortName() comes from
// DataSourceRegister (see the registration class below); verify this trait
// against the Spark version in use.
trait DataSourceV2 extends TableProvider {
  def shortName(): String
}

Usage Example:

// Example provider registered under the short name "mydatasource".
// inferSchemaFromPath is a placeholder helper.
class MyDataSourceV2 extends DataSourceV2 {
  
  override def shortName(): String = "mydatasource"
  
  // Derive the schema by inspecting the data at the configured path.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
    val path = options.get("path")
    inferSchemaFromPath(path)
  }
  
  // Build a Table handle with the (possibly user-supplied) schema.
  override def getTable(
    schema: StructType,
    partitioning: Array[Transform], 
    properties: util.Map[String, String]
  ): Table = {
    // Fix: dropped an unused local (`val options = CaseInsensitiveStringMap.empty()`)
    // that the original created and never read.
    new MyTable(
      Identifier.of(Array.empty, "my_table"),
      schema,
      partitioning,
      properties
    )
  }
}

// Register the data source
// Registers the short name via Java ServiceLoader: list this class in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister so
// format("mydatasource") resolves to the provider.
class MyDataSourceV2Registration extends DataSourceRegister {
  override def shortName(): String = "mydatasource"
}

Expression Pushdown

Predicate Types

// Simplified model of the predicate expressions pushed down to sources.
// references lists every column name a predicate reads, so a source can check
// it understands all referenced columns before accepting the predicate.
// Fix: the original case classes extended the sealed trait without
// implementing its abstract `references` member, so the snippet did not
// compile; each variant now provides it.
sealed trait Predicate extends Serializable {
  def references: Array[String]
}

// Single-column comparison predicates.
case class EqualTo(attribute: String, value: Any) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class EqualNullSafe(attribute: String, value: Any) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class GreaterThan(attribute: String, value: Any) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class GreaterThanOrEqual(attribute: String, value: Any) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class LessThan(attribute: String, value: Any) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class LessThanOrEqual(attribute: String, value: Any) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class In(attribute: String, values: Array[Any]) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class IsNull(attribute: String) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class IsNotNull(attribute: String) extends Predicate {
  override def references: Array[String] = Array(attribute)
}

// Boolean combinators: references is the union of the children's references.
case class And(left: Predicate, right: Predicate) extends Predicate {
  override def references: Array[String] = left.references ++ right.references
}
case class Or(left: Predicate, right: Predicate) extends Predicate {
  override def references: Array[String] = left.references ++ right.references
}
case class Not(child: Predicate) extends Predicate {
  override def references: Array[String] = child.references
}

// String pattern predicates.
case class StringStartsWith(attribute: String, value: String) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class StringEndsWith(attribute: String, value: String) extends Predicate {
  override def references: Array[String] = Array(attribute)
}
case class StringContains(attribute: String, value: String) extends Predicate {
  override def references: Array[String] = Array(attribute)
}

Advanced Pushdown

// Pushdown of full Catalyst Expression trees (richer than Predicate, but
// couples the source to Catalyst internals); returns the unsupported rest.
trait SupportsPushDownCatalystFilters extends ScanBuilder {
  def pushCatalystFilters(filters: Array[Expression]): Array[Expression]
  def pushedCatalystFilters(): Array[Expression]
}

// An aggregation to push down: the aggregate functions plus GROUP BY keys.
case class Aggregation(
  aggregateExpressions: Array[AggregateFunc],
  groupByExpressions: Array[Expression]
)

// Bridge from a pushable aggregate to its Catalyst aggregate function.
trait AggregateFunc extends Expression {
  def aggregateFunction(): aggregate.AggregateFunction
}

The Data Source V2 API provides a comprehensive framework for building high-performance, feature-rich connectors that integrate seamlessly with Spark SQL's query planning and optimization capabilities.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-catalyst-2-13

docs

connectors.md

data-types.md

expressions.md

index.md

query-plans.md

tile.json