tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-13

Kafka 0.10+ Source for Structured Streaming

Workspace: tessl
Visibility: Public
Describes: Maven package pkg:maven/org.apache.spark/spark-sql-kafka-0-10_2.13@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-13@3.5.0

Apache Spark Kafka SQL Connector

The Apache Spark Kafka SQL Connector integrates Apache Kafka with Apache Spark's Structured Streaming engine. It supports both reading from and writing to Kafka topics, with fault tolerance, automatic offset management, and exactly-once semantics on the read path (writes to Kafka are at-least-once), making it well suited for building real-time data pipelines.

Package Information

  • Package Name: spark-sql-kafka-0-10_2.13
  • Package Type: Maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Artifact ID: spark-sql-kafka-0-10_2.13
  • Version: 3.5.6
  • Installation: Add to your Maven or sbt dependencies, or pass the artifact to spark-submit via --packages (see the sketch below)
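
For reference, a hedged sketch of the two common ways to pull in the artifact (the version shown matches this page; adjust it to your Spark build):

// build.sbt: declare the connector as a normal dependency
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.6"

// Or resolve it at submit time instead:
//   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.6 ...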

Core Imports

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.functions._

// The connector is registered automatically as "kafka" data source
// No direct imports of connector classes are needed

For advanced usage that works with Kafka partitions and the connector's offset types directly:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.PartitionOffsetMap

Basic Usage

Reading from Kafka (Streaming)

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("KafkaStreaming")
  .getOrCreate()

// Read from Kafka using structured streaming
val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Project key and value as strings and write the stream to the console
val query = kafkaStream
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition", "offset", "timestamp")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()  // returns a StreamingQuery handle

Reading from Kafka (Batch)

val kafkaBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

Writing to Kafka

val dataFrame = spark.createDataFrame(Seq(
  ("key1", "value1"),
  ("key2", "value2")
)).toDF("key", "value")

dataFrame
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()
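
Streaming writes use the same sink. A minimal sketch, assuming the kafkaStream DataFrame from above and a placeholder checkpoint path (the Kafka sink requires a checkpoint location so progress survives restarts):

val streamingWrite = kafkaStream
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-output")  // placeholder path
  .start()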

Architecture

The Spark Kafka SQL Connector is built around several key components:

  • KafkaSourceProvider: Main entry point implementing multiple Spark SQL interfaces for registration and instantiation
  • Consumer Strategies: Flexible subscription patterns (subscribe, subscribePattern, assign) for different use cases
  • Offset Management: Comprehensive offset tracking with support for earliest, latest, specific offsets, and timestamp-based positioning
  • Schema Conversion: Automatic conversion between Kafka records and Spark rows with optional header support
  • Streaming Sources: Both micro-batch and continuous streaming implementations with trigger support
  • Batch Sources: Efficient batch reading with offset range optimization
  • Write Support: Both batch and streaming write capabilities with producer pooling and configuration management

Capabilities

Data Source Registration

Core data source functionality for registering Kafka as a Spark SQL data source under the "kafka" identifier.

// Automatically registered - no direct usage
class KafkaSourceProvider extends DataSourceRegister 
  with StreamSourceProvider 
  with StreamSinkProvider 
  with RelationProvider 
  with CreatableRelationProvider 
  with SimpleTableProvider

See docs/data-source.md for details.

Consumer Strategy Configuration

Flexible subscription patterns for connecting to Kafka topics, including direct partition assignment, topic-list subscription, and pattern-based subscription.

// Consumer strategies are configured via options:
// .option("subscribe", "topic1,topic2,topic3")
// .option("subscribePattern", "prefix-.*")
// .option("assign", """{"topic1":[0,1,2],"topic2":[0,1]}""")

// ju is Spark's alias for java.util (import java.{util => ju})
sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
  def createAdmin(kafkaParams: ju.Map[String, Object]): Admin
  def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
}
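
As an illustration, a pattern subscription picks up every topic whose name matches a regex; note that exactly one of subscribe, subscribePattern, or assign may be set per query. A short sketch (the topic name pattern is illustrative):

// Subscribe to all topics whose names start with "events-"
val patternStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events-.*")
  .load()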

See docs/consumer-strategies.md for details.

Offset Management

Comprehensive offset positioning and range management supporting earliest, latest, specific offsets, and timestamp-based positioning.

// Offset range limits for controlling read boundaries
sealed trait KafkaOffsetRangeLimit

case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(topicTimestamps: Map[TopicPartition, Long], strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
case class GlobalTimestampRangeLimit(timestamp: Long, strategy: StrategyOnNoMatchStartingOffset.Value) extends KafkaOffsetRangeLimit
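
These range limits surface as reader options. For timestamp-based positioning, a hedged sketch using placeholder epoch-millisecond values (per-partition via startingOffsetsByTimestamp, or a single global startingTimestamp):

// Start from the first record at or after a timestamp (milliseconds since epoch)
val fromTimestamp = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsetsByTimestamp", """{"my-topic":{"0":1700000000000,"1":1700000000000}}""")
  .load()

// Or one global timestamp for all matched partitions:
//   .option("startingTimestamp", "1700000000000")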

See docs/offset-management.md for details.

Schema Conversion

Schema definition and conversion between Kafka ConsumerRecord format and Spark DataFrame rows with optional header support.

// Schema with headers disabled (default)
val schemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

// Schema with headers enabled (.option("includeHeaders", "true"))
val schemaWithHeaders = schemaWithoutHeaders.add(
  StructField("headers", ArrayType(StructType(Array(
    StructField("key", StringType),
    StructField("value", BinaryType)
  ))))
)

def kafkaSchema(includeHeaders: Boolean): StructType
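
A short sketch showing the optional headers column in practice:

// Enable the headers column and inspect it alongside the record value
val withHeaders = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true")
  .load()

withHeaders
  .selectExpr("CAST(value AS STRING) AS value", "headers")
  .show(truncate = false)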

See docs/schema-conversion.md for details.

Streaming Sources

Micro-batch and continuous streaming implementations with comprehensive trigger support and metrics.

// Micro-batch streaming
class KafkaMicroBatchStream extends MicroBatchStream 
  with SupportsTriggerAvailableNow 
  with ReportsSourceMetrics {
  
  def initialOffset(): Offset
  def latestOffset(): Offset
  def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset
  def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
  def commit(end: Offset): Unit
  def stop(): Unit
}

// Continuous streaming  
class KafkaContinuousStream extends ContinuousStream {
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  def initialOffset(): Offset  
  def deserializeOffset(json: String): Offset
  def commit(end: Offset): Unit
  def stop(): Unit
}
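
Which implementation runs is decided by the trigger set on the write side. A sketch of the three modes, reusing kafkaStream from the basic usage example (the checkpoint path is a placeholder):

import org.apache.spark.sql.streaming.Trigger

// Micro-batch every 10 seconds (KafkaMicroBatchStream)
val microBatchQuery = kafkaStream.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/micro-batch")  // placeholder path
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Process everything currently available, then stop (SupportsTriggerAvailableNow):
//   .trigger(Trigger.AvailableNow())

// Low-latency continuous processing with 1-second checkpoints (KafkaContinuousStream):
//   .trigger(Trigger.Continuous("1 second"))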

See docs/streaming-sources.md for details.

Batch Reading

Efficient batch reading with offset range calculation and partition optimization.

class KafkaBatch extends Batch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

class KafkaBatchPartitionReader extends PartitionReader[InternalRow] {
  def next(): Boolean
  def get(): UnsafeRow
  def close(): Unit
  def currentMetricsValues(): Array[CustomTaskMetric]
}
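
Partition optimization is exposed through the minPartitions option, which lets a batch read split large offset ranges across more Spark tasks than there are Kafka partitions. A brief sketch:

// Ask for at least 32 input partitions, regardless of the topic's Kafka partition count
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "32")
  .load()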

See docs/batch-reading.md for details.

Writing to Kafka

Both batch and streaming write support with producer pooling, topic routing, and data validation.

// Core writer functionality
object KafkaWriter {
  val TOPIC_ATTRIBUTE_NAME: String = "topic"
  val KEY_ATTRIBUTE_NAME: String = "key"
  val VALUE_ATTRIBUTE_NAME: String = "value"
  val HEADERS_ATTRIBUTE_NAME: String = "headers"
  val PARTITION_ATTRIBUTE_NAME: String = "partition"
  
  def write(sparkSession: SparkSession, queryExecution: QueryExecution, 
            kafkaParams: ju.Map[String, Object], topic: Option[String]): Unit
}

// V2 DataSource write implementation
case class KafkaWrite(topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) extends Write {
  def description(): String
  def toBatch: BatchWrite
  def toStreaming: StreamingWrite
}
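
Because the writer reads the topic, key, value, headers, and partition attributes from the DataFrame itself, per-row topic routing works by including a topic column instead of setting the topic option. A sketch with illustrative topic names:

// Route each row to the topic named in its own "topic" column
spark.createDataFrame(Seq(
  ("orders", "k1", "v1"),
  ("payments", "k2", "v2")
)).toDF("topic", "key", "value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()  // no "topic" option: the column decides per row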

See docs/writing.md for details.

Configuration Options

Required Options

  • kafka.bootstrap.servers: Comma-separated list of Kafka broker host:port pairs (required)
  • Exactly one of subscribe, subscribePattern, or assign (required)

Common Options

  • startingOffsets: Where to start reading ("earliest", "latest", or a JSON offset specification; see the example after this list)
  • endingOffsets: Where to stop reading for batch queries ("latest" or a JSON offset specification)
  • failOnDataLoss: Whether to fail query when data loss is detected (default: "true")
  • includeHeaders: Include Kafka headers in DataFrame schema (default: "false")
  • maxOffsetsPerTrigger: Maximum number of offsets to process per trigger
  • minOffsetsPerTrigger: Minimum number of offsets to process per trigger
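
The JSON offset specification maps topic to partition to offset, where -2 means earliest and -1 means latest. A sketch with illustrative offsets:

// Read partitions 0 and 1 of my-topic over an explicit offset range
val rangedBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", """{"my-topic":{"0":23,"1":-2}}""")  // -2 = earliest
  .option("endingOffsets", """{"my-topic":{"0":50,"1":-1}}""")    // -1 = latest
  .load()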

Advanced Options

  • minPartitions: Desired minimum number of input partitions to read from Kafka
  • kafkaConsumer.pollTimeoutMs: Consumer poll timeout in milliseconds
  • fetchOffset.numRetries: Number of retries for offset fetching
  • fetchOffset.retryIntervalMs: Retry interval for offset fetching
  • groupIdPrefix: Prefix for consumer group IDs

Error Handling

The connector provides structured exception handling for common Kafka integration scenarios:

  • Data Loss Detection: Automatic detection of missing data due to Kafka retention or topic deletion (see the sketch after this list)
  • Offset Out of Range: Handling of invalid offset requests
  • Connection Failures: Retry logic for transient network issues
  • Configuration Validation: Comprehensive validation of all configuration options
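
Data-loss behaviour is controlled by the failOnDataLoss option; setting it to "false" makes the query log a warning and continue when requested offsets have already aged out of retention. A short sketch:

// Tolerate missing offsets (e.g. expired by retention) instead of failing the query
val tolerantStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("failOnDataLoss", "false")
  .load()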

Specific Exceptions

object KafkaExceptions {
  def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
      tpsForPrefetched: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException

  def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
      prefetchedOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException

  def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
      tpsForLatestOffset: Set[TopicPartition],
      tpsForEndOffset: Set[TopicPartition]): SparkException

  def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
      latestOffset: Map[TopicPartition, Long],
      endOffset: Map[TopicPartition, Long]): SparkException
}

Custom Metrics

The connector exposes custom metrics for monitoring:

  • offsetOutOfRange: Number of offsets that were out of range
  • dataLoss: Number of data loss events detected

These metrics integrate with Spark's metrics system and can be monitored through the Spark UI and external monitoring systems.