Batch reading and writing operations with flexible offset ranges and performance optimization for processing historical Kafka data.
Read historical data from Kafka topics with precise offset control and range specifications.
/**
* Basic batch read from Kafka topics
* Reads all available data within the specified offset range
*/
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "earliest") // Required for batch
.option("endingOffsets", "latest") // Optional, defaults to latest
.load()

Usage Example:
// Read all historical data from multiple topics
val historicalDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "orders,payments,shipments")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Process historical data
val summaryDF = historicalDF
.selectExpr("topic", "CAST(value AS STRING) as json_str")
.select(col("topic"), from_json(col("json_str"), messageSchema).as("data"))
.groupBy("topic", "data.date")
.count()
summaryDF.show()

Precise control over which messages to read using specific offsets.
/**
* Specific offset ranges for batch processing
* JSON format: {"topicName":{"partitionId":startOffset, ...}}, e.g. {"topic1":{"0":1000,"1":2000}}
*/
.option("startingOffsets", """{"orders":{"0":1000,"1":2000},"payments":{"0":500}}""")
.option("endingOffsets", """{"orders":{"0":5000,"1":8000},"payments":{"0":2000}}""")Usage Example:
// Read specific ranges for data reconciliation
val reconciliationDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "transactions")
.option("startingOffsets", """{"transactions":{"0":100000,"1":150000,"2":200000}}""")
.option("endingOffsets", """{"transactions":{"0":101000,"1":151000,"2":201000}}""")
.load()
// Verify data integrity for specific ranges
val integrityCheck = reconciliationDF
.select(
col("partition"),
col("offset"),
expr("CAST(value AS STRING)").as("message")
)
.filter("message RLIKE 'ERROR|FAILURE'")
integrityCheck.show()

Read data based on timestamp ranges rather than specific offsets.
/**
* Time-based offset specification for batch reads
* Timestamps in milliseconds since Unix epoch
*/
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000,"1":1640995200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000,"1":1640995300000}}""")Usage Example:
// Read data for specific time window (e.g., Jan 1, 2022 to Jan 2, 2022)
val timeWindowDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000,"1":1640995200000}}""") // Jan 1, 2022 00:00:00
.option("endingOffsetsByTimestamp", """{"events":{"0":1641081600000,"1":1641081600000}}""") // Jan 2, 2022 00:00:00
.load()
// Analyze daily patterns
val dailyPatterns = timeWindowDF
.selectExpr(
"hour(timestamp) as hour",
"CAST(value AS STRING) as message"
)
.groupBy("hour")
.count()
.orderBy("hour")
dailyPatterns.show(24)
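
Hard-coding epoch milliseconds is easy to get wrong. A minimal sketch of deriving the same option values with java.time instead (assuming the same "events" topic with partitions 0 and 1):

import java.time.{LocalDate, ZoneOffset}

// Midnight UTC for a given date, as milliseconds since the Unix epoch
def epochMillis(date: LocalDate): Long = date.atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli

val startMs = epochMillis(LocalDate.of(2022, 1, 1)) // 1640995200000
val endMs = epochMillis(LocalDate.of(2022, 1, 2)) // 1641081600000

val computedWindowDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsetsByTimestamp", s"""{"events":{"0":$startMs,"1":$startMs}}""")
.option("endingOffsetsByTimestamp", s"""{"events":{"0":$endMs,"1":$endMs}}""")
.load()
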
Write DataFrames to Kafka topics in batch mode with various output options.

/**
* Write DataFrame to Kafka in batch mode
* DataFrame must contain 'value' column (binary or string)
* Optional: 'key', 'topic', 'partition', 'headers' columns
*/
dataFrame.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "output-topic") // Default topic
.save()

Usage Example:
// Process batch data and write results
val processedDF = spark
.read
.format("parquet")
.load("/path/to/processed/data")
.select(
col("user_id").cast("string").as("key"),
to_json(struct("event_type", "timestamp", "data")).cast("binary").as("value")
)
processedDF.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-events")
.save()
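
The optional headers column mentioned above can be populated in the same way. A minimal sketch, reusing processedDF from the example above; the header names and values are illustrative, and it assumes Spark 3.0+ where the sink accepts an array of structs with a string key and binary value:

// Attach Kafka record headers: array of structs with fields "key" (string) and "value" (binary)
// (uses array/struct/lit from org.apache.spark.sql.functions)
val withHeadersDF = processedDF
.withColumn("headers", array(
struct(lit("source").as("key"), lit("batch-job").cast("binary").as("value")),
struct(lit("run_date").as("key"), lit("2022-01-01").cast("binary").as("value"))
))

withHeadersDF.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "processed-events")
.save()
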
Route different records to different topics based on data content.

/**
* Dynamic topic assignment per record
* Use 'topic' column to specify destination topic for each record
*/
dataFrame
.select(
expr("CASE WHEN priority = 'high' THEN 'urgent-events' ELSE 'normal-events' END").as("topic"),
expr("CAST(message AS BINARY)").as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save() // No default topic needed when topic column is present

Usage Example:
// Route events to different topics based on type
val routedEvents = spark
.read
.format("json")
.load("/path/to/events")
.select(
expr("CASE event_type " +
"WHEN 'user_action' THEN 'analytics-events' " +
"WHEN 'system_error' THEN 'error-events' " +
"WHEN 'performance' THEN 'metrics-events' " +
"ELSE 'general-events' END").as("topic"),
col("user_id").cast("binary").as("key"),
to_json(struct("*")).cast("binary").as("value")
)
routedEvents.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.save()

Control which Kafka partition receives each record.
/**
* Explicit partition assignment
* Use 'partition' column to specify target partition (integer)
*/
dataFrame
.select(
expr("hash(user_id) % 10").as("partition"), // Distribute across 10 partitions
col("user_id").cast("binary").as("key"),
expr("CAST(message AS BINARY)").as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "partitioned-events")
.save()
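
Usage Example (a minimal sketch; the session dataset, its columns, and the 10-partition layout are illustrative assumptions, and the computed partition must stay below the target topic's actual partition count):

// Keep all events for a user in the same partition to preserve per-user ordering
val sessionEvents = spark
.read
.format("parquet")
.load("/path/to/user/sessions")
.select(
expr("pmod(hash(user_id), 10)").as("partition"), // Must match the partition count of 'partitioned-events'
col("user_id").cast("string").as("key"),
to_json(struct("session_id", "event_type", "timestamp")).cast("binary").as("value")
)

sessionEvents.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "partitioned-events")
.save()
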
Configuration options for optimizing batch read and write performance.

/**
* Performance tuning for batch reads
*/
.option("minPartitions", "20") // Increase Spark parallelism
.option("kafka.fetch.min.bytes", "1048576") // 1MB minimum fetch size
.option("kafka.max.poll.records", "10000") // More records per poll
.option("kafka.receive.buffer.bytes", "1048576") // 1MB receive bufferUsage Example:
// Optimized batch read for large datasets
val largeBatchDF = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("minPartitions", "50") // High parallelism
.option("kafka.fetch.min.bytes", "2097152") // 2MB fetch size
.option("kafka.max.poll.records", "50000") // Large poll size
.load()
// Efficient processing with proper partitioning
val result = largeBatchDF
.repartition(200, col("topic"), col("partition")) // Redistribute for processing
.selectExpr("CAST(value AS STRING) as json_str")
.select(from_json(col("json_str"), schema).as("data")) // schema: user-defined StructType for the JSON payload
.select("data.*")
.filter("status = 'active'")
.cache() // Cache for multiple actions
result.write.mode("overwrite").parquet("/path/to/output")

/**
* Performance tuning for batch writes
*/
.option("kafka.batch.size", "65536") // 64KB batch size
.option("kafka.linger.ms", "100") // Wait 100ms for batching
.option("kafka.compression.type", "snappy") // Enable compression
.option("kafka.acks", "1") // Faster acknowledgment
.option("kafka.buffer.memory", "134217728") // 128MB bufferUsage Example:
// High-throughput batch write
val highThroughputWrite = processedDF
.coalesce(10) // Reduce number of tasks for efficiency
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "high-volume-output")
.option("kafka.batch.size", "131072") // 128KB batches
.option("kafka.linger.ms", "200") // Wait for full batches
.option("kafka.compression.type", "lz4") // Fast compression
.option("kafka.acks", "1") // Balance speed/reliability
.save()

Validation and constraints specific to batch operations.
/**
* Batch operation constraints and validations
*/
// Starting offsets cannot be "latest" for batch reads
// Ending offsets cannot be "earliest" for batch reads
// Both timestamp and offset specifications are supported
// maxOffsetsPerTrigger is ignored in batch mode (with warning)

Error Handling Examples:
// This will throw an error - invalid for batch
try {
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "latest") // ERROR: Cannot use "latest" for batch startingOffsets
.load()
} catch {
case e: IllegalArgumentException =>
println("Cannot use 'latest' for batch starting offsets")
}
// This will throw an error - invalid for batch
try {
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.option("endingOffsets", "earliest") // ERROR: Cannot use "earliest" for batch endingOffsets
.load()
} catch {
case e: IllegalArgumentException =>
println("Cannot use 'earliest' for batch ending offsets")
}
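
By contrast, maxOffsetsPerTrigger does not fail a batch query; as noted above it is simply ignored and Spark logs a warning. A minimal sketch:

// No exception: maxOffsetsPerTrigger only applies to streaming queries, so the
// batch read still covers the full earliest-to-latest range (a warning is logged)
val ignoredOptionDF = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", "1000") // Ignored in batch mode
.load()
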
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.write.DataWriterFactory
// Batch operation interfaces (simplified sketches of org.apache.spark.sql.DataFrameReader / DataFrameWriter)
trait DataFrameReader {
def format(source: String): DataFrameReader // "kafka"
def option(key: String, value: String): DataFrameReader
def load(): DataFrame
}
trait DataFrameWriter[T] {
def format(source: String): DataFrameWriter[T] // "kafka"
def option(key: String, value: String): DataFrameWriter[T]
def save(): Unit
def save(path: String): Unit // Not used for Kafka
}
// Batch-specific implementations (connector-internal classes, shown for reference)
class KafkaBatch {
def planInputPartitions(): Array[InputPartition]
def createReaderFactory(): PartitionReaderFactory
}
class KafkaBatchWrite {
def createBatchWriterFactory(): DataWriterFactory
}