tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12

Kafka 0.10+ Source for Structured Streaming - provides integration between Apache Spark's Structured Streaming and Apache Kafka for real-time data processing

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.spark/spark-sql-kafka-0-10_2.12@3.0.x

To install, run

tessl install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12@3.0.0


Spark SQL Kafka Connector

The Spark SQL Kafka connector integrates Apache Spark's Structured Streaming with Apache Kafka 0.10+ for real-time data processing. It implements both source and sink functionality for Kafka topics, with exactly-once semantics on the read side (the sink provides at-least-once delivery), checkpointed offset management, and flexible consumer strategies.

Package Information

  • Package Name: spark-sql-kafka-0-10_2.12
  • Package Type: maven
  • Language: Scala
  • Installation: add the artifact to your project dependencies, or launch Spark with --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
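
For sbt-based projects, the dependency can also be declared in the build definition. The snippet below is a sketch assuming the 3.0.1 artifact and a Scala 2.12 build; adjust versions to match your cluster.

// build.sbt (sketch): "%%" appends the Scala suffix (_2.12 here); spark-sql is
// marked "provided" when the application runs on an existing Spark installation
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"
)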

Core Imports

// Spark entry point and streaming trigger configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Spark SQL types used by the fixed Kafka record schema
import org.apache.spark.sql.types._

// Kafka client types used with partition assignment and consumer strategies
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.Consumer

// Internal connector package (sources, sinks, offset types)
import org.apache.spark.sql.kafka010._

Basic Usage

Reading from Kafka (Streaming)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaExample")
  .getOrCreate()

// Read from Kafka as a streaming DataFrame
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .load()

// Kafka schema: key, value, topic, partition, offset, timestamp, timestampType, headers (optional)
val messageDF = df.selectExpr("CAST(value AS STRING) as message")

// Start streaming query
val query = messageDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

Writing to Kafka (Streaming)

// Write streaming data to Kafka (the Kafka sink requires a checkpoint location)
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()

Batch Operations

// Batch read from Kafka
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Batch write to Kafka
batchDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()

Architecture

The Spark SQL Kafka connector is built around several key components:

  • Data Source Registration: Registered as "kafka" data source in Spark SQL
  • Consumer Strategies: Multiple strategies for topic subscription (assign, subscribe, subscribePattern)
  • Offset Management: Comprehensive offset tracking with earliest, latest, specific, and timestamp-based options
  • Schema Management: Fixed schema for Kafka records with optional header support
  • Connection Pooling: Efficient resource management for consumers and producers
  • Data Source V2: implemented against the Data Source V2 API, with a V1 code path retained for compatibility

Capabilities

Connection Configuration

Options for connecting to Kafka clusters and configuring consumer/producer behavior.

// Required connection configuration
.option("kafka.bootstrap.servers", "localhost:9092")

// Consumer strategies (choose one)
.option("subscribe", "topic1,topic2")           // Subscribe to topics
.option("subscribePattern", "topic-.*")         // Subscribe with regex
.option("assign", """{"topic1":[0,1],"topic2":[0]}""") // Assign partitions

Offset Management

Comprehensive offset control for both batch and streaming operations.

// Offset specification options
.option("startingOffsets", "earliest")          // earliest, latest, or JSON
.option("endingOffsets", "latest")              // latest or JSON (batch only)
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000}}""")

Schema and Data Format

Fixed schema for Kafka records with support for headers and type conversion.

// Standard Kafka schema (without headers)
val schemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

// Schema with headers (when includeHeaders=true)
val headersType = ArrayType(StructType(Array(
  StructField("key", StringType),
  StructField("value", BinaryType)
)))

val schemaWithHeaders = StructType(
  schemaWithoutHeaders.fields :+ StructField("headers", headersType)
)

// Schema access function
def kafkaSchema(includeHeaders: Boolean): StructType = {
  if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
}
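
The headers column only appears when includeHeaders is enabled on the reader. The snippet below is a sketch that reuses the streaming setup from Basic Usage, casts the binary key and value columns, and keeps the headers.

// Read with record headers included
val withHeaders = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("includeHeaders", "true")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")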

Streaming Operations

Streaming source and sink functionality with advanced configuration options.

// Streaming-specific options
.option("maxOffsetsPerTrigger", "1000")         // Rate limiting
.option("failOnDataLoss", "true")               // Data loss policy
.option("minPartitions", "3")                   // Minimum partitions

Batch Operations

Batch reading and writing with flexible offset ranges and performance tuning.

// Batch operations support
spark.read.format("kafka")                      // Batch read
dataFrame.write.format("kafka")                 // Batch write

Error Handling and Reliability

Configuration options for handling data loss, connection failures, and retry policies.

// Reliability options
.option("failOnDataLoss", "false")              // Continue on data loss
.option("fetchOffset.numRetries", "5")          // Offset fetch retries
.option("fetchOffset.retryIntervalMs", "1000")  // Retry interval
.option("kafkaConsumer.pollTimeoutMs", "60000") // Consumer poll timeout

Types

// Core types for offset management
type PartitionOffsetMap = Map[TopicPartition, Long]

case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit  
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

// Offset range limit constants
object KafkaOffsetRangeLimit {
  val LATEST: Long = -1L    // indicates resolution to the latest offset
  val EARLIEST: Long = -2L  // indicates resolution to the earliest offset
}

case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long, 
  untilOffset: Long,
  preferredLoc: Option[String]
)

// Consumer strategy types
sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: java.util.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy  
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy

// Configuration option keys
object KafkaSourceProvider {
  val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
  val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
  val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
  val ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "endingoffsetsbytimestamp"
  val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
  val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
  val FETCH_OFFSET_NUM_RETRY = "fetchoffset.numretries"
  val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchoffset.retryintervalms"
  val CONSUMER_POLL_TIMEOUT = "kafkaconsumer.polltimeoutms"
  val INCLUDE_HEADERS = "includeheaders"
  val TOPIC_OPTION_KEY = "topic"
}
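
These are internal connector types rather than a public API, but a short sketch, constructed here only for illustration, shows how the offset options above map onto them.

// Illustrative only: how JSON offset options correspond to the internal types
val tp0 = new TopicPartition("topic1", 0)
val tp1 = new TopicPartition("topic1", 1)

// startingOffsets = {"topic1":{"0":23,"1":-2}} resolves to a specific-offset limit,
// where -2 (KafkaOffsetRangeLimit.EARLIEST) means "earliest available offset"
val startLimit: KafkaOffsetRangeLimit =
  SpecificOffsetRangeLimit(Map(tp0 -> 23L, tp1 -> KafkaOffsetRangeLimit.EARLIEST))

// A streaming checkpoint records progress per partition as a KafkaSourceOffset
val checkpointed = KafkaSourceOffset(Map(tp0 -> 42L, tp1 -> 17L))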