tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12

Kafka 0.10+ Source for Structured Streaming providing Kafka integration for Apache Spark's streaming and batch processing

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.spark/spark-sql-kafka-0-10_2.12@3.5.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-sql-kafka-0-10_2-12@3.5.0


Apache Spark Kafka Connector

The Apache Spark Kafka Connector (spark-sql-kafka-0-10_2.12) provides seamless integration between Apache Kafka and Apache Spark's Structured Streaming and SQL APIs. It enables both batch and streaming data processing from Kafka topics with exactly-once processing semantics, offset management, and fault tolerance.

Package Information

  • Package Name: spark-sql-kafka-0-10_2.12
  • Package Type: maven
  • Language: Scala
  • Installation: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.6 (or declare the coordinate in your build, as sketched below)
  • Dependencies: Requires Apache Spark 3.5.6 and the Kafka client libraries
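
If you manage dependencies with a build tool instead of spark-shell, the same artifact can be declared in build.sbt. A minimal sketch, assuming sbt and a cluster running Spark 3.5.6; adjust the version to match your deployment:

// build.sbt -- pull in the Kafka connector alongside the matching Spark SQL version
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.5.6" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.6"
)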

Core Usage Pattern

The connector is accessed through Spark SQL's DataSource API using the "kafka" format identifier:

// Reading from Kafka (streaming)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load()

// Writing to Kafka (streaming)
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // required unless spark.sql.streaming.checkpointLocation is set
  .outputMode("append")
  .start()

Basic Usage

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("KafkaExample")
  .getOrCreate()

// Stream from Kafka
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "earliest")
  .load()

// Extract and process the value
val processedDF = kafkaDF
  .select(col("value").cast("string").as("message"))
  .filter(col("message").isNotNull)

// Write back to Kafka
val query = processedDF
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/kafka-example-checkpoint") // required for streaming writes
  .outputMode("append")
  .start()

query.awaitTermination() // block until the query is stopped or fails

Architecture

The Spark Kafka connector is built around several key components:

  • KafkaSourceProvider: Main entry point that registers the "kafka" format with Spark SQL
  • Fixed Schema: Standardized schema for Kafka records (key, value, topic, partition, offset, timestamp, etc.); see the printSchema sketch after this list
  • Offset Management: Comprehensive offset tracking and recovery for exactly-once processing
  • Consumer Strategies: Flexible topic subscription patterns (subscribe, subscribePattern, assign)
  • Producer Integration: Seamless writing back to Kafka topics with proper serialization
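
As a quick way to see what KafkaSourceProvider registers, the fixed read schema mentioned above can be printed from any Kafka DataFrame. A minimal sketch; the broker address and topic are placeholders:

// Inspect the connector's fixed schema for a Kafka source
val rawDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "input-topic")                  // placeholder topic
  .load()

rawDF.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
// (a headers column is added when includeHeaders is set to "true")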

Capabilities

Streaming Data Reading

Read data from Kafka topics in real time using Spark Structured Streaming, in either micro-batch or continuous processing mode.

// Streaming read operation
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String) // or subscribepattern or assign
  .option("startingOffsets", offsets: String) // "earliest", "latest", or JSON
  .load(): DataFrame
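
The processing mode is selected when the query is started, via the trigger. A hedged sketch, assuming streamDF is the DataFrame returned by the read above and using the console sink purely for illustration:

import org.apache.spark.sql.streaming.Trigger

// Micro-batch mode: process whatever has arrived, every 10 seconds
streamDF.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous mode (experimental, low latency): checkpoint roughly every second
streamDF.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()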

Streaming Operations

Batch Data Reading

Read historical data from Kafka topics for batch processing and analysis.

// Batch read operation  
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String) // or subscribepattern or assign
  .option("startingOffsets", startOffsets: String)
  .option("endingOffsets", endOffsets: String)
  .load(): DataFrame
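
The result is an ordinary DataFrame, so a bounded read over an explicit offset range can be cast and aggregated like any other table. A sketch; the topic name and offsets are illustrative:

// Batch read over a fixed offset range, then regular DataFrame processing
val batchDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":0}}""")
  .option("endingOffsets", "latest")
  .load()

batchDF
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "partition")
  .groupBy("partition")
  .count()
  .show()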

Batch Operations

Data Writing

Write DataFrame data to Kafka topics with proper serialization and partitioning.

// Streaming write operation
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String) // optional if specified in data
  .outputMode("append")
  .start(): StreamingQuery

// Batch write operation
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)  
  .option("topic", topicName: String)
  .save()
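
Before writing, the DataFrame must expose the write schema described below: a required value column plus optional key, topic, partition, and headers columns. A sketch of a typical preparation step; ordersDF and orderId are hypothetical names:

// Shape an arbitrary DataFrame into the Kafka write schema
val toKafka = ordersDF.selectExpr(
  "CAST(orderId AS STRING) AS key", // optional key column
  "to_json(struct(*)) AS value"     // required value column
)

toKafka.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "orders-json")
  .save()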

Write Operations

Configuration Management

Comprehensive configuration options for connection, performance tuning, and reliability.

// Core configuration options
.option("kafka.bootstrap.servers", servers: String) // Required
.option("subscribe", topics: String) // Topic selection
.option("maxOffsetsPerTrigger", maxRecords: String) // Performance tuning
.option("failOnDataLoss", failOnLoss: String) // Reliability

Configuration Options

Data Schema

Read Schema (Fixed)

Every DataFrame read from Kafka, whether batch or streaming, has the following fixed schema:

// Fixed Kafka record schema
case class KafkaRecord(
  key: Array[Byte],              // Message key as byte array (nullable)
  value: Array[Byte],            // Message value as byte array  
  topic: String,                 // Topic name
  partition: Int,                // Partition number
  offset: Long,                  // Message offset within partition
  timestamp: java.sql.Timestamp, // Message timestamp
  timestampType: Int,            // 0=CreateTime, 1=LogAppendTime
  headers: Array[KafkaHeader]    // Optional headers (when includeHeaders=true)
)

case class KafkaHeader(
  key: String,
  value: Array[Byte]
)
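
Because key and value arrive as byte arrays, they are usually cast before use. A short sketch, reusing the kafkaDF from the Basic Usage example and assuming UTF-8 string payloads:

// Project the fixed columns into usable types
val decoded = kafkaDF.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic", "partition", "offset", "timestamp"
)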

Write Schema (Flexible)

For writing, DataFrames can contain any combination of these fields:

// Write schema fields (all optional except value)
case class KafkaWriteRecord(
  topic: String,              // Target topic (optional if set in options)
  key: Array[Byte],           // Message key; a string or binary column (optional)
  value: Array[Byte],         // Message value; a string or binary column (required)
  partition: Int,             // Specific partition (optional)
  headers: Array[KafkaHeader] // Message headers as key/value pairs (optional)
)
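
Because topic can be carried as a column, rows can be routed to different topics instead of using a single topic option. A sketch; eventsDF and its priority and id columns are hypothetical:

import org.apache.spark.sql.functions._

// Route each row to a topic derived from its data
val routed = eventsDF.select(
  when(col("priority") === "high", lit("alerts")).otherwise(lit("events")).as("topic"),
  col("id").cast("string").as("key"),
  to_json(struct("*")).as("value")
)

routed.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()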

Topic Selection Strategies

// Subscribe to specific topics by name
.option("subscribe", "topic1,topic2,topic3")

// Subscribe to topics matching a regex pattern  
.option("subscribepattern", "events-.*")

// Assign specific partitions
.option("assign", """{"topic1":[0,1],"topic2":[0]}""")

Offset Management Types

// Values accepted by startingOffsets / endingOffsets
.option("startingOffsets", "earliest") // start from the earliest available offsets
.option("startingOffsets", "latest")   // start from the latest offsets (streaming default)

// Specific offsets per partition (JSON; -2 = earliest, -1 = latest)
.option("startingOffsets", """{"topic1":{"0":23,"1":345},"topic2":{"0":0}}""")

// Global timestamp (milliseconds since epoch)
.option("startingTimestamp", "1609459200000")

// Per-partition timestamps (JSON format)
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1609459200000,"1":1609459300000}}""")

Error Handling

The connector provides robust error handling for common scenarios:

  • Data Loss Detection: Configurable behavior when data is no longer available
  • Offset Validation: Automatic validation of offset ranges and availability
  • Connection Failures: Retry logic and graceful degradation
  • Schema Validation: Input validation for write operations
  • Configuration Errors: Clear error messages for invalid options

Performance Considerations

  • Consumer Pooling: Efficient reuse of Kafka consumers across tasks
  • Producer Caching: Connection pooling for Kafka producers
  • Batch Size Control: Configurable limits on records per micro-batch via maxOffsetsPerTrigger
  • Parallel Processing: Automatic parallelization based on Kafka partitions, with minPartitions to request more input splits; both are shown in the sketch after this list
  • Memory Management: Optimized handling of large message batches
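
The batch-size and parallelism knobs mentioned above are exposed directly as source options. A sketch with illustrative values:

// Tune micro-batch size and read parallelism
val parallelStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("minPartitions", "32")           // request more Spark input partitions than Kafka partitions
  .option("maxOffsetsPerTrigger", "50000") // bound the number of records per micro-batch
  .load()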