The Kafka connector registers automatically with Spark SQL as the "kafka" data source, implementing multiple interfaces to provide comprehensive streaming and batch capabilities.
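Registration relies on Java's ServiceLoader mechanism: the connector jar ships a provider-configuration file under META-INF/services naming the provider class, and Spark matches the value returned by shortName() against the string passed to .format(...). A minimal sketch of that service entry (the package and class name follow the Apache Spark Kafka connector; treat the exact file contents as illustrative):
# META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.sql.kafka010.KafkaSourceProvider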
Main entry point class that implements all necessary Spark SQL interfaces for complete Kafka integration.
/**
* Primary provider class for all Kafka readers and writers
* Automatically registers with Spark SQL using the name "kafka"
*/
class KafkaSourceProvider extends DataSourceRegister
    with StreamSourceProvider
    with StreamSinkProvider
    with RelationProvider
    with CreatableRelationProvider
    with SimpleTableProvider {
  /** Returns the short name used to identify this data source */
  def shortName(): String = "kafka"
  /** Creates streaming source for reading Kafka data */
  def createSource(sqlContext: SQLContext, metadataPath: String, schema: Option[StructType],
      providerName: String, parameters: Map[String, String]): Source
  /** Creates streaming sink for writing to Kafka */
  def createSink(sqlContext: SQLContext, parameters: Map[String, String],
      partitionColumns: Seq[String], outputMode: OutputMode): Sink
  /** Creates batch relation for reading Kafka data */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
  /** Creates relation for writing a DataFrame to Kafka */
  def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String],
      data: DataFrame): BaseRelation
  /** Returns the V2 DataSource table implementation */
  def getTable(options: CaseInsensitiveStringMap): KafkaTable
}
Usage Examples:
The provider is used automatically when specifying "kafka" as the format:
// Streaming read
val stream = spark
  .readStream
  .format("kafka")  // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
// Batch read
val batch = spark
  .read
  .format("kafka")  // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
// Write
dataFrame
  .write
  .format("kafka")  // Uses KafkaSourceProvider automatically
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
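The Kafka sink does not serialize arbitrary rows on its own: the incoming DataFrame is expected to carry a value column (string or binary) plus optional key, topic, partition, and headers columns. A minimal sketch of shaping data before the write above, assuming a hypothetical id column and JSON-encoding the remaining fields (both are illustrative choices, not requirements of the connector):
import org.apache.spark.sql.functions.{col, struct, to_json}

// Project an arbitrary DataFrame onto the columns the Kafka sink understands.
val kafkaReady = dataFrame.select(
  col("id").cast("string").as("key"),                             // optional record key
  to_json(struct(dataFrame.columns.map(col): _*)).as("value"))    // required record value

kafkaReady
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()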
V2 DataSource API table implementation providing modern Spark SQL integration.
/**
* V2 DataSource table implementation for Kafka
* Supports both reading and writing with comprehensive capabilities
*/
class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite {
  /** Returns the table name for identification */
  def name(): String = "KafkaTable"
  /** Returns the schema for Kafka records */
  def schema(): StructType
  /** Returns supported table capabilities */
  def capabilities(): ju.Set[TableCapability]
  /** Creates scan builder for reading operations */
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
  /** Creates write builder for writing operations */
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}
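schema() returns the fixed record schema that every Kafka read exposes regardless of topic contents: payloads arrive as raw bytes in key and value, and a headers column is appended only when includeHeaders is enabled. A quick way to inspect it; the printed layout below is a sketch of the expected output, with nullability flags indicative rather than normative:
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true")   // adds the headers column
  .load()

df.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
//  |-- headers: array (nullable = true)   // only with includeHeaders=true
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- key: string (nullable = true)
//  |    |    |-- value: binary (nullable = true)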
Supported Capabilities:
// Table capabilities supported by KafkaTable
import org.apache.spark.sql.connector.catalog.TableCapability._
val supportedCapabilities: ju.Set[TableCapability] = ju.EnumSet.of(
  BATCH_READ,        // Batch reading support
  BATCH_WRITE,       // Batch writing support
  MICRO_BATCH_READ,  // Micro-batch streaming reads
  CONTINUOUS_READ,   // Continuous streaming reads
  STREAMING_WRITE,   // Streaming writes
  ACCEPT_ANY_SCHEMA  // Flexible schema handling
)
Scan implementation for reading Kafka data in both batch and streaming modes.
/**
* Scan implementation for reading Kafka data
* Handles conversion between different read modes
*/
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {
  /** Returns the read schema for Kafka records */
  def readSchema(): StructType
  /** Converts scan to batch reading mode */
  def toBatch(): Batch
  /** Converts scan to micro-batch streaming mode */
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream
  /** Converts scan to continuous streaming mode */
  def toContinuousStream(checkpointLocation: String): ContinuousStream
  /** Returns supported custom metrics */
  def supportedCustomMetrics(): Array[CustomMetric]
}
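Which conversion is exercised is decided by the query rather than by an option on the scan: a batch read goes through toBatch(), a streaming query uses toMicroBatchStream() under the default or ProcessingTime trigger, and toContinuousStream() only when a continuous trigger is requested. A sketch of the streaming side, with the console sink, trigger intervals, and checkpoint paths chosen purely for illustration:
import org.apache.spark.sql.streaming.Trigger

val source = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()

// Micro-batch execution: KafkaScan.toMicroBatchStream is used.
source.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-microbatch-ckpt")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous execution: KafkaScan.toContinuousStream is used instead.
source.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-continuous-ckpt")
  .trigger(Trigger.Continuous("1 second"))
  .start()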
The data source provider performs comprehensive validation of configuration options:
// One of these subscription strategies must be specified:
// - "subscribe": "topic1,topic2,topic3"
// - "subscribePattern": "prefix-.*"
// - "assign": """{"topic1":[0,1],"topic2":[0,1]}"""
// Bootstrap servers must be specified:
// - "kafka.bootstrap.servers": "localhost:9092"The following Kafka consumer/producer options are not supported and will cause exceptions:
The following Kafka consumer/producer options are not supported and will cause exceptions:
// Unsupported consumer options
"kafka.auto.offset.reset" // Use startingOffsets instead
"kafka.key.deserializer" // Fixed to ByteArrayDeserializer
"kafka.value.deserializer" // Fixed to ByteArrayDeserializer
"kafka.enable.auto.commit" // Managed internally
"kafka.interceptor.classes" // Not safe for Spark usage
// Unsupported producer options
"kafka.key.serializer" // Fixed to ByteArraySerializer
"kafka.value.serializer" // Fixed to ByteArraySerializer// Stream-only options (not valid for batch queries)
"endingOffsets" // Only for batch
"endingOffsetsByTimestamp" // Only for batch
// Batch-specific restrictions
// startingOffsets cannot be "latest" for batch queries
// endingOffsets cannot be "earliest" for batch queries
The provider includes helpful error messages for common configuration issues:
// Missing subscription strategy
"One of the following options must be specified for Kafka source: subscribe, subscribePattern, assign"
// Custom group ID warning
"Kafka option 'kafka.group.id' has been set on this query, it is not recommended to set this option"
// Invalid offset configuration
"starting offset can't be latest for batch queries on Kafka"
"ending offset can't be earliest for batch queries on Kafka"