Streaming operations allow real-time processing of Kafka topics using Spark Structured Streaming. The connector supports both micro-batch and continuous processing modes; micro-batch processing provides exactly-once semantics, while continuous processing is experimental and provides at-least-once guarantees.
Create a streaming DataFrame from Kafka topics for real-time data processing.
/**
* Create a streaming DataFrame from Kafka topics
* Returns a DataFrame with the fixed Kafka schema
*/
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
.option("subscribe", topics: String) // Topic subscription (comma-separated)
.option("startingOffsets", offsets: String) // Starting position: "earliest", "latest", or JSON
.load(): DataFrame

Usage Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("KafkaStreaming")
.getOrCreate()
// Basic streaming read
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events,logs,metrics")
.option("startingOffsets", "latest")
.load()
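The returned DataFrame always exposes the connector's fixed Kafka schema; printing it shows the columns available to downstream queries (a headers column appears only when includeHeaders is enabled):
// Inspect the fixed Kafka source schema
kafkaStream.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)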
// With security configuration (SASL over SSL)
val configuredStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
.option("subscribe", "user-events")
.option("startingOffsets", "earliest")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.load()

Subscribe to topics using regex patterns for dynamic topic discovery.
/**
* Subscribe to topics matching a regex pattern
* Automatically includes new topics that match the pattern
*/
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("subscribepattern", pattern: String) // Regex pattern for topic names
.load(): DataFrame

Usage Examples:
// Subscribe to all topics starting with "events-"
val patternStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribepattern", "events-.*")
.option("startingOffsets", "latest")
.load()
// Subscribe to topics by environment
val envStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribepattern", s"${env}-.*") // prod-.*, dev-.*, etc.
.load()

Directly assign specific Kafka partitions for fine-grained control.
/**
* Assign specific Kafka partitions for reading
* Provides exact control over which partitions to consume
*/
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("assign", partitionsJson: String) // JSON specification of TopicPartitions
.load(): DataFrame

Usage Examples:
// Assign specific partitions
val assignedStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"events":[0,1,2],"logs":[0]}""")
.option("startingOffsets", "earliest")
.load()
// Assign partitions with specific offsets
val offsetStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"events":[0,1],"logs":[0,1]}""")
.option("startingOffsets", """{"events":{"0":100,"1":200},"logs":{"0":50,"1":75}}""")
.load()

Control exactly where streaming starts and how offsets are managed.
/**
* Offset specification options for streaming reads
*/
// Starting from specific positions
.option("startingOffsets", "earliest") // Start from earliest available
.option("startingOffsets", "latest") // Start from latest available
.option("startingOffsets", offsetJson) // Start from specific offsets (JSON)
// Timestamp-based offset resolution
.option("startingTimestamp", timestamp: String) // Global timestamp (ms since epoch)
.option("startingOffsetsByTimestamp", timestampJson) // Per-partition timestamps (JSON)Usage Examples:
// Start from earliest available offsets
val earliestStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.load()
// Start from specific timestamp
val timestampStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingTimestamp", "1640995200000") // Jan 1, 2022 UTC
.load()
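In the startingOffsets JSON, -2 refers to a partition's earliest offset and -1 to its latest, so concrete offsets and special values can be mixed; a minimal sketch (topic and offset values are illustrative):
// Partition 0 starts at offset 500, partition 1 at earliest (-2), partition 2 at latest (-1)
val mixedOffsetsStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":500,"1":-2,"2":-1}}""")
  .load()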
// Start from specific offsets per partition
val specificStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")
.load()

Configure streaming performance and resource usage.
/**
* Performance tuning options for streaming operations
*/
.option("maxOffsetsPerTrigger", maxRecords: String) // Max records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String) // Min records before triggering
.option("maxTriggerDelay", delay: String) // Max delay before triggering (e.g., "30s")Usage Examples:
// Control batch sizes
val tunedStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "high-volume-topic")
.option("maxOffsetsPerTrigger", "10000") // Max 10K records per batch
.option("minOffsetsPerTrigger", "1000") // Min 1K records before processing
.option("maxTriggerDelay", "30s") // Process every 30s regardless
.load()

Configure failure handling and data loss behavior.
/**
* Reliability and failure handling options
*/
.option("failOnDataLoss", failBehavior: String) // "true" or "false"
.option("groupIdPrefix", prefix: String) // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String) // "true" or "false"Usage Examples:
// Configure reliability
val reliableStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "critical-events")
.option("failOnDataLoss", "true") // Fail if data is lost
.option("groupIdPrefix", "my-app") // Custom consumer group prefix
.option("includeHeaders", "true") // Include message headers in schema
.load()
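With includeHeaders set to true, the schema gains a headers column of type array&lt;struct&lt;key: string, value: binary&gt;&gt;; a sketch of flattening it into one row per header entry:
import org.apache.spark.sql.functions._

// Flatten message headers; explode_outer keeps records that have no headers
val headerRows = reliableStream
  .select(
    col("key").cast("string").as("messageKey"),
    col("value").cast("string").as("messageValue"),
    explode_outer(col("headers")).as("header")
  )
  .select(
    col("messageKey"),
    col("messageValue"),
    col("header.key").as("headerKey"),
    col("header.value").cast("string").as("headerValue")
  )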
// Handle potential data loss gracefully
val gracefulStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("failOnDataLoss", "false") // Continue on data loss
.load()

Choose between micro-batch and continuous processing modes.
/**
* Stream processing with different execution modes
*/
// Micro-batch processing (default)
stream.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
// Continuous processing (experimental)
stream.writeStream.trigger(Trigger.Continuous("1 second"))

Usage Examples:
import org.apache.spark.sql.streaming.Trigger
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
// Micro-batch processing every 30 seconds
val microBatchQuery = stream
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
// Continuous processing (low-latency; supports only map-like operations, with at-least-once guarantees)
val continuousQuery = stream
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start()

Extract and transform the raw Kafka columns, casting keys and values and parsing JSON payloads.

import org.apache.spark.sql.functions._
// Extract and cast message values
val messages = kafkaStream
.select(
col("topic"),
col("partition"),
col("offset"),
col("timestamp"),
col("key").cast("string").as("messageKey"),
col("value").cast("string").as("messageValue")
)
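The schema passed to from_json below is not defined in this section; a minimal sketch assuming a simple JSON event payload (field names are placeholders):
import org.apache.spark.sql.types._

// Hypothetical payload schema; adjust fields to match the actual messages
val schema = StructType(Seq(
  StructField("userId", StringType),
  StructField("eventType", StringType),
  StructField("eventTime", TimestampType)
))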
// Parse JSON messages
val jsonMessages = kafkaStream
.select(
from_json(col("value").cast("string"), schema).as("data"),
col("topic"),
col("timestamp")
)
.select("data.*", "topic", "timestamp")import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
// Windowed count by topic
val windowedCounts = kafkaStream
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("topic")
)
.count()
.writeStream
.outputMode("update")
.format("console")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
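In update mode, the windowed aggregation above keeps state for every window it has ever seen; a hedged variant using withWatermark so Spark can drop state for windows older than the allowed lateness (the 15-minute threshold is an assumption):
// Sketch: tolerate 15 minutes of late data, then drop old window state
val watermarkedCounts = kafkaStream
  .withWatermark("timestamp", "15 minutes")
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("topic")
  )
  .count()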
// Maintain state across micro-batches with arbitrary stateful processing
import org.apache.spark.sql.streaming.GroupStateTimeout
import spark.implicits._

val statefulStream = kafkaStream
  .selectExpr("topic", "CAST(value AS STRING) AS value")
  .as[(String, String)]                  // groupByKey requires a typed Dataset
  .groupByKey(_._1)                      // key by topic
  .mapGroupsWithState[Long, (String, Long)](GroupStateTimeout.ProcessingTimeTimeout())(
    updateFunction                       // (key, records, state) => result
  )
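updateFunction is not defined in this section; a minimal sketch that keeps a running record count per topic, matching the typed Dataset above (the state type and timeout handling are assumptions):
import org.apache.spark.sql.streaming.GroupState

// Hypothetical state function: running record count per topic key
def updateFunction(
    topic: String,
    records: Iterator[(String, String)],
    state: GroupState[Long]): (String, Long) = {
  if (state.hasTimedOut) {
    val lastCount = state.getOption.getOrElse(0L)
    state.remove()                           // drop state for idle keys
    (topic, lastCount)
  } else {
    val newCount = state.getOption.getOrElse(0L) + records.size
    state.update(newCount)
    state.setTimeoutDuration("30 minutes")   // required for ProcessingTimeTimeout to fire
    (topic, newCount)
  }
}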
Common error scenarios and handling strategies:
// Handle deserialization errors
val safeMessages = kafkaStream
.select(
col("topic"),
col("offset"),
when(col("value").isNotNull,
col("value").cast("string")).as("messageValue")
)
.filter(col("messageValue").isNotNull)
import org.apache.spark.sql.DataFrame

// Process each micro-batch with custom error handling; checkpointing enables recovery after failures
kafkaStream.writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// Custom batch processing with error handling
try {
batchDF.show()
} catch {
case ex: Exception =>
println(s"Error processing batch $batchId: ${ex.getMessage}")
}
}
.start()
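A streaming query handle can also be kept to watch progress and block until termination; a hedged sketch (the sink, query, and checkpoint path here are placeholders):
// Keep a handle on the query to monitor progress and block until it stops
val query = kafkaStream.writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/monitor-checkpoint")
  .start()

// lastProgress is null until the first batch completes; afterwards it reports
// per-source metrics such as numInputRows, startOffset, and endOffset
println(query.status)
println(query.lastProgress)

query.awaitTermination()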