
tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12

Kafka 0.10+ Source for Structured Streaming - provides integration between Apache Spark's Structured Streaming and Apache Kafka for real-time data processing


Spark SQL Kafka Connector

The Spark SQL Kafka connector integrates Apache Spark's Structured Streaming with Apache Kafka 0.10+ for real-time data processing. It provides both a source and a sink for Kafka topics, with offset management and flexible consumer strategies. The source tracks offsets so that, combined with checkpointing, each record is processed exactly once. The Kafka sink provides at-least-once delivery, so a retried write can produce duplicate records in the output topic.

Package Information

  • Package Name: spark-sql-kafka-0-10_2.12
  • Package Type: maven
  • Language: Scala
  • Installation: Add the artifact to your project dependencies, or pass --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 to spark-submit or spark-shell
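
For an sbt build, the same coordinates could be declared as in the sketch below. It assumes the project is built with Scala 2.12 and already depends on a matching Spark 3.0.1 `spark-sql` artifact; adjust the version to the Spark release you run.

```scala
// build.sbt (sketch): group, artifact, and version are taken from the
// --packages string above. Assumes a Scala 2.12 build.
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.0.1"
```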

Core Imports

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.Consumer
import org.apache.spark.sql.kafka010._

Basic Usage

Reading from Kafka (Streaming)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaExample")
  .getOrCreate()

// Read from Kafka as a streaming DataFrame
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .load()

// Source columns: key, value, topic, partition, offset, timestamp, timestampType,
// plus headers when includeHeaders=true. key and value are binary, so cast before use.
val messageDF = df.selectExpr("CAST(value AS STRING) AS message")

// Start streaming query
val query = messageDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()

Writing to Kafka (Streaming)

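// Write streaming data to Kafka. The sink requires a value column; key is optional.
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  // A checkpoint location is needed unless spark.sql.streaming.checkpointLocation
  // is set. The path below is a placeholder.
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()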

Batch Operations

// Batch read from Kafka
val batchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

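// Batch write to Kafka. Keep only key and value so that source metadata
// columns (such as partition) are not carried into the write.
batchDF
  .select("key", "value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()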

Architecture

The Spark SQL Kafka connector is built around several key components:

  • Data Source Registration: Registered with Spark SQL under the short name "kafka"
  • Consumer Strategies: Three ways to select input (assign, subscribe, subscribePattern), of which exactly one is used per query
  • Offset Management: Start and end positions given as earliest, latest, per-partition offsets, or per-partition timestamps
  • Schema Management: Fixed schema for Kafka records with optional header support
  • Connection Pooling: Kafka consumers and producers are cached and reused rather than created per task
  • Data Source V2: Implements the Data Source V2 API while keeping a V1 code path for compatibility

Capabilities

Connection Configuration

Options for connecting to Kafka clusters and configuring consumer/producer behavior.

// Required connection configuration
.option("kafka.bootstrap.servers", "localhost:9092")

// Consumer strategies (choose one)
.option("subscribe", "topic1,topic2")           // Subscribe to topics
.option("subscribePattern", "topic-.*")         // Subscribe with regex
.option("assign", """{"topic1":[0,1],"topic2":[0]}""") // Assign partitions
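
Because the three strategy options are mutually exclusive, it can help to check an options map before handing it to Spark. The following is a minimal sketch in plain Scala; `StrategyCheck` and `chosenStrategy` are hypothetical names used for illustration and are not part of the connector.

```scala
import java.util.Locale

object StrategyCheck {
  // The three strategy option names listed above, in lower case.
  val StrategyKeys: Set[String] = Set("subscribe", "subscribepattern", "assign")

  /** Returns the single configured strategy as (key, value), or an error message. */
  def chosenStrategy(options: Map[String, String]): Either[String, (String, String)] = {
    // Keep only entries whose name matches a strategy key, ignoring case.
    val present = options.collect {
      case (k, v) if StrategyKeys.contains(k.toLowerCase(Locale.ROOT)) => (k, v)
    }.toList

    present match {
      case single :: Nil => Right(single)
      case Nil           => Left("One of subscribe, subscribePattern, or assign must be set")
      case many =>
        Left(s"Only one strategy may be set, found: ${many.map(_._1).sorted.mkString(", ")}")
    }
  }
}
```

A `Left` result signals a configuration that mixes strategies or omits them all, which is cheaper to catch here than after the query has started.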

Details: connection-configuration.md

Offset Management

Comprehensive offset control for both batch and streaming operations.

// Offset specification options
.option("startingOffsets", "earliest")          // earliest, latest (streaming only), or JSON
.option("endingOffsets", "latest")              // latest or JSON (batch only)
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000}}""") // epoch milliseconds per partition
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000}}""")   // batch only
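
The JSON form maps each topic to an object of partition number to offset, where -2 stands for earliest and -1 for latest. Writing these strings by hand is error-prone, so here is a small sketch that renders them from a Scala map. `OffsetJson` and `render` are hypothetical helper names, and the code assumes topic names need no JSON escaping.

```scala
object OffsetJson {
  // Sentinel values accepted in per-partition offset JSON.
  val Earliest: Long = -2L
  val Latest: Long = -1L

  private def quote(s: String): String = "\"" + s + "\""

  /** Renders Map(topic -> Map(partition -> offset)) as {"topic":{"partition":offset}}. */
  def render(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      // Sort partitions so the output is deterministic.
      val inner = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => quote(partition.toString) + ":" + offset }
        .mkString("{", ",", "}")
      quote(topic) + ":" + inner
    }.mkString("{", ",", "}")
}
```

The result can be passed directly, for example `.option("startingOffsets", OffsetJson.render(Map("topic1" -> Map(0 -> 23L, 1 -> OffsetJson.Earliest))))`. The same shape serves the timestamp options when the values are epoch milliseconds.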

Details: offset-management.md

Schema and Data Format

Fixed schema for Kafka records with support for headers and type conversion.

// Standard Kafka schema (without headers)
val schemaWithoutHeaders = StructType(Array(
  StructField("key", BinaryType),
  StructField("value", BinaryType),
  StructField("topic", StringType),
  StructField("partition", IntegerType),
  StructField("offset", LongType),
  StructField("timestamp", TimestampType),
  StructField("timestampType", IntegerType)
))

// Schema with headers (when includeHeaders=true)
val headersType = ArrayType(StructType(Array(
  StructField("key", StringType),
  StructField("value", BinaryType)
)))

val schemaWithHeaders = StructType(
  schemaWithoutHeaders.fields :+ StructField("headers", headersType)
)

// Schema access function
def kafkaSchema(includeHeaders: Boolean): StructType = {
  if (includeHeaders) schemaWithHeaders else schemaWithoutHeaders
}
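
The `timestampType` column is an integer rather than a name. Assuming it carries the id of Kafka's `TimestampType` enum (-1, 0, or 1), a lookup such as the one sketched below makes query output easier to read. `TimestampTypes` is a hypothetical helper, not part of the connector.

```scala
object TimestampTypes {
  /** Maps the integer in the timestampType column to Kafka's name for it. */
  def name(code: Int): String = code match {
    case -1    => "NoTimestampType"
    case 0     => "CreateTime"     // timestamp set by the producer
    case 1     => "LogAppendTime"  // timestamp set by the broker on append
    case other => s"Unknown($other)"
  }
}
```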

Details: schema-data-format.md

Streaming Operations

Streaming source and sink functionality with advanced configuration options.

// Streaming-specific options
.option("maxOffsetsPerTrigger", "1000")         // Rate limit: max offsets processed per trigger
.option("failOnDataLoss", "true")               // Fail the query if data may have been lost
.option("minPartitions", "3")                   // Desired minimum number of Spark partitions
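
To illustrate what a per-trigger rate limit means when several partitions have a backlog, the sketch below shares a record budget across partitions in proportion to how far each one lags. It is a simplified model for intuition, not the connector's code. `TP` is a hypothetical stand-in for `TopicPartition`, and `cap` is an illustrative name.

```scala
object RateLimitSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TP(topic: String, partition: Int)

  /**
   * Caps the end offset of each partition so that roughly `limit` records
   * are read in total, sharing the budget in proportion to each backlog.
   */
  def cap(from: Map[TP, Long], until: Map[TP, Long], limit: Long): Map[TP, Long] = {
    // Backlog per partition; partitions with nothing new are left out.
    val backlog: Map[TP, Long] = until.collect {
      case (tp, end) if end - from.getOrElse(tp, end) > 0 => tp -> (end - from(tp))
    }
    val total = backlog.values.sum.toDouble

    if (total <= limit) until // everything fits within the budget
    else until.map { case (tp, end) =>
      backlog.get(tp) match {
        case Some(size) =>
          val share = limit * (size / total)
          // Give every lagging partition at least one record.
          val step = math.max(1L, math.floor(share).toLong)
          tp -> math.min(end, from(tp) + step)
        case None => tp -> end
      }
    }
  }
}
```

With two partitions lagging by 100 and 300 records and a limit of 100, this model reads 25 records from the first and 75 from the second.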

Details: streaming-operations.md

Batch Operations

Batch reading and writing with flexible offset ranges and performance tuning.

// Batch operations support
spark.read.format("kafka")                      // Batch read
dataFrame.write.format("kafka")                 // Batch write

Details: batch-operations.md

Error Handling and Reliability

Configuration options for handling data loss, connection failures, and retry policies.

// Reliability options
.option("failOnDataLoss", "false")              // Continue on data loss
.option("fetchOffset.numRetries", "5")          // Offset fetch retries
.option("fetchOffset.retryIntervalMs", "1000")  // Retry interval
.option("kafkaConsumer.pollTimeoutMs", "60000") // Consumer poll timeout
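
The two `fetchOffset` options describe a bounded retry loop: try a fixed number of times and wait between attempts. The sketch below shows that pattern in plain Scala. It is illustrative only: `RetrySketch` and `withRetries` are hypothetical names, and treating `numRetries` as the total number of attempts is an assumption of this example.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  /**
   * Runs `fetch` up to `numRetries` times, sleeping `retryIntervalMs`
   * between failed attempts, and rethrows the last error if all attempts fail.
   */
  def withRetries[T](numRetries: Int, retryIntervalMs: Long)(fetch: () => T): T = {
    require(numRetries >= 1, "numRetries must be at least 1")

    @tailrec
    def attempt(n: Int): T = Try(fetch()) match {
      case Success(value)                => value
      case Failure(e) if n >= numRetries => throw e // budget exhausted
      case Failure(_) =>
        Thread.sleep(retryIntervalMs)
        attempt(n + 1)
    }

    attempt(1)
  }
}
```

Raising the retry count or the interval trades slower failure detection for more tolerance of brief broker unavailability.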

Details: error-handling-reliability.md

Types

// Core types for offset management (connector internals, listed for reference)
type PartitionOffsetMap = Map[TopicPartition, Long]

case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit  
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit
case class SpecificTimestampRangeLimit(topicTimestamps: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

// Offset range limit constants
object KafkaOffsetRangeLimit {
  val LATEST: Long = -1L    // indicates resolution to the latest offset
  val EARLIEST: Long = -2L  // indicates resolution to the earliest offset
}

case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long, 
  untilOffset: Long,
  preferredLoc: Option[String]
)

// Consumer strategy types
sealed trait ConsumerStrategy {
  def createConsumer(kafkaParams: java.util.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
}

case class AssignStrategy(partitions: Array[TopicPartition]) extends ConsumerStrategy
case class SubscribeStrategy(topics: Seq[String]) extends ConsumerStrategy  
case class SubscribePatternStrategy(topicPattern: String) extends ConsumerStrategy

// Configuration option keys
object KafkaSourceProvider {
  val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
  val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
  val STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "startingoffsetsbytimestamp"
  val ENDING_OFFSETS_BY_TIMESTAMP_OPTION_KEY = "endingoffsetsbytimestamp"
  val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
  val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
  val FETCH_OFFSET_NUM_RETRY = "fetchoffset.numretries"
  val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchoffset.retryintervalms"
  val CONSUMER_POLL_TIMEOUT = "kafkaconsumer.polltimeoutms"
  val INCLUDE_HEADERS = "includeheaders"
  val TOPIC_OPTION_KEY = "topic"
}
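
The option keys in this listing are all lower case, while the usage examples write them in camel case (for example `maxOffsetsPerTrigger`). That pairing works if option names are matched without regard to case. The sketch below illustrates the idea under that assumption; `OptionLookup` and `normalize` are hypothetical names and not the connector's implementation.

```scala
import java.util.Locale

object OptionLookup {
  // Lower-case keys copied from the KafkaSourceProvider listing above.
  val MaxOffsetsPerTrigger = "maxoffsetspertrigger"
  val IncludeHeaders = "includeheaders"

  /** Lower-cases every option name so lookups ignore the caller's casing. */
  def normalize(options: Map[String, String]): Map[String, String] =
    options.map { case (key, value) => key.toLowerCase(Locale.ROOT) -> value }
}
```

After normalization, a caller's `"maxOffsetsPerTrigger"` entry is found under the `MaxOffsetsPerTrigger` key.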
