Data Source Registration

The Kafka connector registers automatically with Spark SQL as the "kafka" data source, implementing multiple interfaces to provide comprehensive streaming and batch capabilities.
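Under the hood, the short name is wired up through Java's ServiceLoader: the connector jar lists KafkaSourceProvider under META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, and Spark matches the requested format against shortName(). The sketch below is illustrative only and assumes the spark-sql-kafka jar is on the classpath:

import java.util.ServiceLoader
import org.apache.spark.sql.sources.DataSourceRegister

// Illustrative: enumerate registered data sources and find the one whose
// shortName() is "kafka" -- the same match Spark performs for format("kafka").
val it = ServiceLoader.load(classOf[DataSourceRegister]).iterator()
while (it.hasNext) {
  val provider = it.next()
  if (provider.shortName() == "kafka") {
    println(provider.getClass.getName)  // org.apache.spark.sql.kafka010.KafkaSourceProvider
  }
}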

Capabilities

KafkaSourceProvider

Main entry point class that implements all necessary Spark SQL interfaces for complete Kafka integration.

/**
 * Primary provider class for all Kafka readers and writers
 * Automatically registers with Spark SQL using the name "kafka"
 */
class KafkaSourceProvider extends DataSourceRegister
    with StreamSourceProvider
    with StreamSinkProvider
    with RelationProvider
    with CreatableRelationProvider
    with SimpleTableProvider {

  /** Returns the short name used to identify this data source */
  def shortName(): String = "kafka"
  
  /** Creates streaming source for reading Kafka data */
  def createSource(sqlContext: SQLContext, metadataPath: String, schema: Option[StructType], 
                  providerName: String, parameters: Map[String, String]): Source
  
  /** Creates streaming sink for writing to Kafka */
  def createSink(sqlContext: SQLContext, parameters: Map[String, String], 
                partitionColumns: Seq[String], outputMode: OutputMode): Sink
  
  /** Creates batch relation for reading Kafka data */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
  
  /** Creates relation for writing DataFrame to Kafka */
  def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], 
                    data: DataFrame): BaseRelation
  
  /** Returns V2 DataSource table implementation */
  def getTable(options: CaseInsensitiveStringMap): KafkaTable
}

Usage Examples:

The provider is used automatically when specifying "kafka" as the format:

// Streaming read
val stream = spark
  .readStream
  .format("kafka")  // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Batch read
val batch = spark
  .read
  .format("kafka")  // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Write
dataFrame
  .write
  .format("kafka")  // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()

KafkaTable

Table implementation for the DataSource V2 API, backing both the read and write paths of the connector.

/**
 * V2 DataSource table implementation for Kafka
 * Supports both reading and writing with comprehensive capabilities
 */
class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite {
  
  /** Returns the table name for identification */
  def name(): String = "KafkaTable"
  
  /** Returns the schema for Kafka records */
  def schema(): StructType
  
  /** Returns supported table capabilities */
  def capabilities(): ju.Set[TableCapability]
  
  /** Creates scan builder for reading operations */
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
  
  /** Creates write builder for writing operations */
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}

Supported Capabilities:

// Table capabilities supported by KafkaTable
import java.{util => ju}
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.catalog.TableCapability._

val supportedCapabilities: ju.Set[TableCapability] = ju.EnumSet.of(
  BATCH_READ,        // Batch reading support
  BATCH_WRITE,       // Batch writing support  
  MICRO_BATCH_READ,  // Micro-batch streaming reads
  CONTINUOUS_READ,   // Continuous streaming reads
  STREAMING_WRITE,   // Streaming writes
  ACCEPT_ANY_SCHEMA  // Flexible schema handling
)
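Because both MICRO_BATCH_READ and CONTINUOUS_READ are declared, the same readStream query can run under either execution mode; the trigger decides which one is used. A minimal sketch (bootstrap servers and topic name are placeholders):

import org.apache.spark.sql.streaming.Trigger

// The same Kafka stream, two execution modes selected purely by the trigger.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// MICRO_BATCH_READ: micro-batch execution, one batch every 10 seconds
val microBatchQuery = kafkaStream.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// CONTINUOUS_READ: continuous execution with a 1-second checkpoint interval
val continuousQuery = kafkaStream.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()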

KafkaScan

Scan implementation for reading Kafka data in both batch and streaming modes.

/**
 * Scan implementation for reading Kafka data
 * Handles conversion between different read modes
 */
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {
  
  /** Returns the read schema for Kafka records */
  def readSchema(): StructType
  
  /** Converts scan to batch reading mode */
  def toBatch(): Batch
  
  /** Converts scan to micro-batch streaming mode */
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream
  
  /** Converts scan to continuous streaming mode */
  def toContinuousStream(checkpointLocation: String): ContinuousStream
  
  /** Returns supported custom metrics */
  def supportedCustomMetrics(): Array[CustomMetric]
}

Configuration Validation

The data source provider performs comprehensive validation of configuration options:

Required Options Validation

// One of these subscription strategies must be specified:
// - "subscribe": "topic1,topic2,topic3"
// - "subscribePattern": "prefix-.*"  
// - "assign": """{"topic1":[0,1],"topic2":[0,1]}"""

// Bootstrap servers must be specified:
// - "kafka.bootstrap.servers": "localhost:9092"
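For illustration, the three subscription strategies look like this on a batch read (topic names and broker address are placeholders):

// Exactly one subscription strategy may be set per query.

// 1. Fixed list of topics
val byList = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")
  .load()

// 2. Regex pattern over topic names
val byPattern = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "prefix-.*")
  .load()

// 3. Explicit topic-partition assignment (JSON)
val byAssignment = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topic1":[0,1],"topic2":[0,1]}""")
  .load()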

Unsupported Kafka Options

The following Kafka consumer/producer options are not supported; setting any of them raises an exception:

// Unsupported consumer options
"kafka.auto.offset.reset"           // Use startingOffsets instead
"kafka.key.deserializer"            // Fixed to ByteArrayDeserializer
"kafka.value.deserializer"          // Fixed to ByteArrayDeserializer
"kafka.enable.auto.commit"          // Managed internally
"kafka.interceptor.classes"         // Not safe for Spark usage

// Unsupported producer options  
"kafka.key.serializer"              // Fixed to ByteArraySerializer
"kafka.value.serializer"            // Fixed to ByteArraySerializer

Stream vs Batch Option Validation

// Batch-only options (not valid for streaming queries)
"endingOffsets"                     // Only for batch queries
"endingOffsetsByTimestamp"          // Only for batch queries

// Batch-specific restrictions
// startingOffsets cannot be "latest" for batch queries
// endingOffsets cannot be "earliest" for batch queries
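A batch query that satisfies these restrictions might look like the following sketch (placeholder topic):

// Valid batch range: explicit/earliest start, explicit/latest end.
val batchRange = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")  // "latest" would be rejected
  .option("endingOffsets", "latest")      // "earliest" would be rejected
  .load()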

Error Messages

The provider includes helpful error messages for common configuration issues:

// Missing subscription strategy
"One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"

// Custom group ID warning
"Kafka option 'kafka.group.id' has been set on this query, it is not recommended to set this option"

// Invalid offset configuration
"starting offset can't be latest for batch queries on Kafka"
"ending offset can't be earliest for batch queries on Kafka"