tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-11

Apache Spark Structured Streaming integration with Apache Kafka providing comprehensive data source and sink capabilities for both batch and streaming workloads.

Describes: pkg:maven/org.apache.spark/spark-sql-kafka-0-10_2.11@2.4.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-11@2.4.0


Apache Spark Kafka Integration

Apache Spark Kafka Integration provides structured streaming and batch data processing for Apache Kafka. The module reads from and writes to Kafka topics through Spark DataFrames and Datasets, supporting micro-batch processing, continuous streaming, and batch operations with offset management and fault tolerance.

Package Information

  • Package Name: spark-sql-kafka-0-10_2.11
  • Package Type: maven
  • Language: Scala
  • Installation: Add to your Spark application dependencies (see the example below)
  • Coordinate: org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8
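
A minimal sketch of adding the connector with sbt, assuming Spark 2.4.8 on Scala 2.11 (adjust the versions to match your cluster):

// build.sbt: the Kafka source and sink are not bundled with spark-sql, so add the connector explicitly
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.8"

// Alternatively, resolve it at launch time:
// spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8 ...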

Core Imports

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.kafka.common.TopicPartition

Basic Usage

Reading from Kafka (Streaming)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder
  .appName("KafkaExample")
  .getOrCreate()

// Read from Kafka topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "latest")
  .load()

// Process the stream
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

Writing to Kafka

import org.apache.spark.sql.functions.{col, struct, to_json}

// Write DataFrame to Kafka
df.select(
  col("id").cast("string").as("key"),
  to_json(struct(col("*"))).as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic")
.save()

Batch Processing

// Read from Kafka for batch processing
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
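
The batch read returns an ordinary DataFrame, so standard transformations and writers apply. A minimal continuation of the example above (the output path is illustrative):

// Decode the binary key/value columns and persist the bounded result
batchDF
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "topic", "partition", "offset")
  .write
  .format("parquet")
  .save("/tmp/kafka-batch-output")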

Architecture

The Spark Kafka integration is built around several key components:

  • Data Source Provider: KafkaSourceProvider implements Spark's DataSource API for both V1 and V2
  • Consumer Strategies: Flexible patterns for topic assignment (Assign, Subscribe, SubscribePattern)
  • Offset Management: Comprehensive offset tracking with configurable start/end positions
  • Streaming Readers: Micro-batch and continuous processing capabilities
  • Producer Integration: Efficient writing with connection pooling and caching
  • Schema Management: Fixed schema for Kafka records with proper type handling

Kafka Record Schema

All Kafka records follow this fixed schema:

StructType(Seq(
  StructField("key", BinaryType),      // Message key (nullable)
  StructField("value", BinaryType),    // Message value (nullable) 
  StructField("topic", StringType),    // Topic name
  StructField("partition", IntegerType), // Partition number
  StructField("offset", LongType),     // Message offset
  StructField("timestamp", TimestampType), // Message timestamp
  StructField("timestampType", IntegerType) // Timestamp type (0=CreateTime, 1=LogAppendTime)
))
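
Because key and value arrive as binary columns, a typical first step is to cast or parse them. A small sketch, assuming the value carries UTF-8 JSON and using a hypothetical eventSchema:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schema of the JSON payload stored in `value`
val eventSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("count", IntegerType)
))

// `df` is the streaming DataFrame from the basic usage example
val parsed = df
  .select(col("key").cast("string").as("key"), col("value").cast("string").as("json"), col("timestamp"))
  .withColumn("event", from_json(col("json"), eventSchema))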

Capabilities

Consumer Strategies

Flexible patterns for consuming data from Kafka topics, supporting subscription by topic names, regex patterns, or specific partition assignments.

sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy  
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy
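
These strategies are internal; from the DataFrame API they are selected through three mutually exclusive source options. A sketch of the mapping (topic names and partitions are placeholders):

// SubscribeStrategy: a fixed list of topics
val byName = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments")
  .load()

// SubscribePatternStrategy: every topic matching a regex
val byPattern = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "orders-.*")
  .load()

// AssignStrategy: specific partitions only
val byAssignment = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"orders":[0,1]}""")
  .load()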

See docs/consumer-strategies.md for details.

Offset Management

Comprehensive offset tracking and range limit handling for precise control over data consumption boundaries.

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2
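
Through the source options, EarliestOffsetRangeLimit and LatestOffsetRangeLimit correspond to the strings "earliest" and "latest", while SpecificOffsetRangeLimit corresponds to a JSON map of per-partition offsets. A sketch, assuming the orders topic has exactly two partitions since a specific-offsets JSON must cover every partition (in the JSON, -2 means earliest and -1 means latest):

// Start partition 0 at offset 100 and partition 1 at the earliest available offset
val fromSpecificOffsets = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders")
  .option("startingOffsets", """{"orders":{"0":100,"1":-2}}""")
  .load()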

See docs/offset-management.md for details.

Streaming Data Sources

Streaming readers supporting both micro-batch and continuous processing modes with fault tolerance; micro-batch execution provides end-to-end exactly-once guarantees, while continuous processing offers at-least-once semantics.

class KafkaMicroBatchReader extends MicroBatchReader with Logging {
  def setOffsetRange(start: Option[Offset], end: Offset): Unit
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
  def readSchema(): StructType
}

class KafkaContinuousReader extends ContinuousReader with Logging {
  def readSchema: StructType
  def setStartOffset(start: Option[Offset]): Unit 
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
}
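
The choice of reader is driven by the query trigger: the default and Trigger.ProcessingTime triggers use the micro-batch reader, while Trigger.Continuous selects the continuous reader. A minimal sketch of a continuous query over the earlier streaming DataFrame (the checkpoint path is a placeholder):

import org.apache.spark.sql.streaming.Trigger

// Continuous processing: offsets are checkpointed at the given interval
val continuousQuery = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-continuous-checkpoint")
  .trigger(Trigger.Continuous("1 second"))
  .start()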

See docs/streaming-sources.md for details.

Batch Data Access

Batch relation for reading historical data from Kafka topics with configurable offset ranges.

class KafkaRelation extends BaseRelation with TableScan with Logging {
  def sqlContext: SQLContext
  def schema: StructType
  def buildScan(): RDD[Row]
}
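
KafkaRelation is what a spark.read.format("kafka") query resolves to; it is not constructed directly. A sketch of a bounded scan with explicit offsets, assuming a single-partition orders topic (offset values are placeholders):

// Read offsets 100 (inclusive) to 500 (exclusive) of partition 0, then aggregate
val bounded = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders")
  .option("startingOffsets", """{"orders":{"0":100}}""")
  .option("endingOffsets", """{"orders":{"0":500}}""")
  .load()

bounded.groupBy("partition").count().show()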

See docs/batch-processing.md for details.

Data Writing

Comprehensive writing capabilities for both streaming and batch workloads with producer connection pooling and automatic serialization.

class KafkaSink extends Sink with Logging {
  def addBatch(batchId: Long, data: DataFrame): Unit
}

class KafkaStreamWriter extends StreamWriter {
  def createWriterFactory(): KafkaStreamWriterFactory
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

object KafkaWriter extends Logging {
  def write(sparkSession: SparkSession, queryExecution: QueryExecution,
           kafkaParameters: ju.Map[String, Object], topic: Option[String]): Unit
}
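
These classes sit behind the "kafka" output format rather than being called directly. A streaming write sketch to complement the batch write shown earlier (the topic name and checkpoint path are placeholders):

import org.apache.spark.sql.functions.{col, struct, to_json}

// Streaming write: every micro-batch is appended to the target topic
val writeQuery = df
  .select(
    col("key").cast("string").as("key"),
    to_json(struct(col("*"))).as("value")
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "enriched-orders")
  .option("checkpointLocation", "/tmp/kafka-writer-checkpoint")
  .start()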

See docs/data-writing.md for details.

Configuration and Options

Complete configuration options for fine-tuning Kafka integration behavior, connection parameters, and performance settings.

Source Options:

// Connection
"kafka.bootstrap.servers" -> "localhost:9092"
"subscribe" -> "topic1,topic2"
"subscribePattern" -> "topic.*"
"assign" -> """{"topic1":[0,1],"topic2":[0]}"""

// Offset Management  
"startingOffsets" -> "earliest" // or "latest" or JSON
"endingOffsets" -> "latest"     // or JSON (batch only)
"failOnDataLoss" -> "true"

// Performance
"minPartitions" -> "10"
"maxOffsetsPerTrigger" -> "1000000"

Sink Options:

"kafka.bootstrap.servers" -> "localhost:9092"
"topic" -> "output-topic"

See docs/configuration.md for details.

Types

// Package-level type alias
type PartitionOffsetMap = Map[TopicPartition, Long]

// Data Consumer Types
case class AvailableOffsetRange(earliest: Long, latest: Long)

sealed trait KafkaDataConsumer {
  def get(offset: Long, untilOffset: Long, pollTimeoutMs: Long, 
         failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]]
  def getAvailableOffsetRange(): AvailableOffsetRange
  def release(): Unit
}

// Offset Range Types
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long, 
  preferredLoc: Option[String]
) {
  lazy val size: Long = untilOffset - fromOffset
}

// RDD Types
case class KafkaSourceRDDOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  def topic: String = topicPartition.topic
  def partition: Int = topicPartition.partition
  def size: Long = untilOffset - fromOffset
}