Apache Spark Kafka Connector

The Apache Spark Kafka Connector (spark-sql-kafka-0-10_2.12) provides seamless integration between Apache Kafka and Apache Spark's Structured Streaming and SQL APIs. It enables both batch and streaming processing of Kafka topics, with offset management, fault tolerance, and exactly-once semantics on the read side (writes to Kafka are delivered at-least-once).

Package Information

  • Package Name: spark-sql-kafka-0-10_2.12
  • Package Type: maven
  • Language: Scala
  • Installation: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.6 (an sbt alternative is sketched after this list)
  • Dependencies: Requires Apache Spark 3.5.6 and Kafka client libraries
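
For applications built with sbt instead of launched through spark-shell, the same artifact can be declared as a regular dependency. A minimal sketch, assuming Scala 2.12 and Spark 3.5.6:

// build.sbt sketch: %% appends the Scala binary suffix (_2.12 here)
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.6"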

Core Usage Pattern

The connector is accessed through Spark SQL's DataSource API using the "kafka" format identifier:

// Reading from Kafka (streaming)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .load()

// Writing to Kafka (streaming)
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint") // placeholder path; required for the Kafka sink
  .outputMode("append")
  .start()

Basic Usage

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("KafkaExample")
  .getOrCreate()

// Stream from Kafka
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", "earliest")
  .load()

// Extract and process the value
val processedDF = kafkaDF
  .select(col("value").cast("string").as("message"))
  .filter(col("message").isNotNull)

// Write back to Kafka
val query = processedDF
  .select(to_json(struct("*")).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint") // placeholder path; required for recovery
  .outputMode("append")
  .start()

query.awaitTermination()

Architecture

The Spark Kafka connector is built around several key components (a short schema illustration follows the list):

  • KafkaSourceProvider: Main entry point that registers the "kafka" format with Spark SQL
  • Fixed Schema: Standardized schema for Kafka records (key, value, topic, partition, offset, timestamp, etc.)
  • Offset Management: Comprehensive offset tracking and recovery for exactly-once processing
  • Consumer Strategies: Flexible topic subscription patterns (subscribe, subscribePattern, assign)
  • Producer Integration: Seamless writing back to Kafka topics with proper serialization
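
As a quick illustration of the first two components, any topic loaded through the "kafka" format resolves to KafkaSourceProvider and comes back with the fixed record schema, regardless of the message contents. A minimal sketch (broker and topic names are placeholders):

// Sketch: the "kafka" format is resolved through KafkaSourceProvider and
// always yields the fixed record columns, independent of message contents
val raw = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "any-topic")                    // placeholder topic
  .load()

raw.printSchema()
// key: binary, value: binary, topic: string, partition: int,
// offset: bigint, timestamp: timestamp, timestampType: int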

Capabilities

Streaming Data Reading

Read data from Kafka topics in real-time using Spark Structured Streaming with micro-batch or continuous processing modes.

// Streaming read operation
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String) // or subscribepattern or assign
  .option("startingOffsets", offsets: String) // "earliest", "latest", or JSON
  .load(): DataFrame
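
The choice between micro-batch and continuous processing is made with a trigger on the write side. A minimal sketch, assuming the streaming DataFrame df from above and a console sink purely for illustration:

import org.apache.spark.sql.streaming.Trigger

// Micro-batch mode: plan a new batch roughly every 10 seconds (illustrative interval)
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Or: continuous mode (experimental), low-latency processing with 1-second checkpoints
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()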


Batch Data Reading

Read historical data from Kafka topics for batch processing and analysis.

// Batch read operation  
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribe", topics: String) // or subscribepattern or assign
  .option("startingOffsets", startOffsets: String)
  .option("endingOffsets", endOffsets: String)
  .load(): DataFrame
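
A concrete bounded read might look like the following; the topic name and offsets are illustrative only:

// Sketch: read a fixed slice of a topic and count records per partition
val batchDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("startingOffsets", """{"input-topic":{"0":0,"1":0}}""")
  .option("endingOffsets", "latest")
  .load()

batchDF.groupBy("partition").count().show()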


Data Writing

Write DataFrame data to Kafka topics with proper serialization and partitioning.

// Streaming write operation
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("topic", topicName: String) // optional if specified in data
  .option("checkpointLocation", path: String) // required for streaming writes
  .outputMode("append")
  .start(): StreamingQuery

// Batch write operation
df.write
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)  
  .option("topic", topicName: String)
  .save()
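
The DataFrame being written must expose a value column of string or binary type (key is optional). A minimal batch-write sketch, assuming a hypothetical events DataFrame with id and payload columns:

// Sketch: shape the columns into what the Kafka sink expects, then write
events
  .selectExpr("CAST(id AS STRING) AS key", "CAST(payload AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .save()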


Configuration Management

Comprehensive configuration options for connection, performance tuning, and reliability.

// Core configuration options
.option("kafka.bootstrap.servers", servers: String) // Required
.option("subscribe", topics: String) // Topic selection
.option("maxOffsetsPerTrigger", maxRecords: String) // Performance tuning
.option("failOnDataLoss", failOnLoss: String) // Reliability


Data Schema

Read Schema (Fixed)

DataFrames read from Kafka always expose the following fixed schema:

// Fixed Kafka record schema
case class KafkaRecord(
  key: Array[Byte],              // Message key as byte array (nullable)
  value: Array[Byte],            // Message value as byte array  
  topic: String,                 // Topic name
  partition: Int,                // Partition number
  offset: Long,                  // Message offset within partition
  timestamp: java.sql.Timestamp, // Message timestamp
  timestampType: Int,            // 0=CreateTime, 1=LogAppendTime
  headers: Array[KafkaHeader]    // Optional headers (when includeHeaders=true)
)

case class KafkaHeader(
  key: String,
  value: Array[Byte]
)
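
In practice the key and value bytes are decoded explicitly, and the headers column only appears when requested. A minimal sketch assuming string-encoded keys and values:

// Sketch: opt in to headers and decode key/value from bytes to strings
val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("includeHeaders", "true") // adds the headers column
  .load()
  .selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "topic", "partition", "offset", "timestamp", "headers")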

Write Schema (Flexible)

For writing, DataFrames can contain any combination of these fields:

// Write schema fields (all optional except value)
case class KafkaWriteRecord(
  topic: String,              // Target topic (optional if set in options)
  key: Array[Byte],           // Message key (optional; string or binary column)
  value: Array[Byte],         // Message value (required; string or binary column)
  partition: Int,             // Specific partition (optional)
  headers: Array[KafkaHeader] // Message headers (optional)
)
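
Because the topic can be carried in the data itself, a single write can fan rows out to multiple topics when no topic option is set. A sketch assuming a hypothetical routed DataFrame with category and payload columns:

// Sketch: derive the target topic per row; no "topic" option is set,
// so the topic column in the data decides where each row goes
routed
  .selectExpr(
    "concat('events-', category) AS topic", // hypothetical routing rule
    "CAST(payload AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()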

Topic Selection Strategies

// Subscribe to specific topics by name
.option("subscribe", "topic1,topic2,topic3")

// Subscribe to topics matching a regex pattern  
.option("subscribepattern", "events-.*")

// Assign specific partitions
.option("assign", """{"topic1":[0,1],"topic2":[0]}""")

Offset Management Types

// Offset specification options
"earliest"  // Start from earliest available offsets
"latest"    // Start from latest available offsets  

// Specific offsets per partition (JSON format)
"""{"topic1":{"0":23,"1":345},"topic2":{"0":0}}"""

// Global timestamp (milliseconds since epoch)
.option("startingTimestamp", "1609459200000")

// Per-partition timestamps via startingOffsetsByTimestamp (JSON format)
"""{"topic1":{"0":1609459200000,"1":1609459300000}}"""

Error Handling

The connector provides robust error handling for common scenarios (a configuration sketch follows the list):

  • Data Loss Detection: Configurable behavior when data is no longer available
  • Offset Validation: Automatic validation of offset ranges and availability
  • Connection Failures: Retry logic and graceful degradation
  • Schema Validation: Input validation for write operations
  • Configuration Errors: Clear error messages for invalid options
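
Data-loss behavior is the most commonly tuned of these. By default the query fails when expected offsets are no longer available; a minimal sketch of relaxing that (option values are illustrative):

// Sketch: log a warning and continue, instead of failing the query, when
// offsets have been aged out or a topic was deleted mid-stream
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("failOnDataLoss", "false") // default is "true"
  .load()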

Performance Considerations

  • Consumer Pooling: Efficient reuse of Kafka consumers across tasks
  • Producer Caching: Connection pooling for Kafka producers
  • Batch Size Control: Configurable limits on records per micro-batch
  • Parallel Processing: Automatic parallelization based on Kafka partitions
  • Memory Management: Optimized handling of large message batches
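
Several of these knobs are exposed directly as source options; a minimal sketch with illustrative values:

// Sketch: cap the records pulled per micro-batch and request more Spark
// partitions than Kafka partitions for extra read parallelism
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .option("maxOffsetsPerTrigger", "100000") // per-trigger record cap (illustrative)
  .option("minPartitions", "24")            // desired minimum Spark partitions (illustrative)
  .load()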