Batch operations enable reading historical data from Kafka topics for analysis, data migration, and batch processing workflows. Unlike streaming, batch reads have defined start and end boundaries.
Read a specific range of data from Kafka topics for batch processing.
/**
* Create a batch DataFrame from Kafka topics
* Reads data between specified offset ranges
*/
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
.option("subscribe", topics: String) // Topic subscription (comma-separated)
.option("startingOffsets", startOffsets: String) // Starting position: "earliest" or JSON
.option("endingOffsets", endOffsets: String) // Ending position: "latest" or JSON
.load(): DataFrame
Usage Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("KafkaBatch")
.getOrCreate()
// Read all available data
val allData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events,logs")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
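The Kafka source always returns the same fixed columns: key and value (binary), plus topic, partition, offset, timestamp, and timestampType. A minimal decoding sketch, assuming the payloads are UTF-8 text:
// Cast the binary key/value columns to strings for downstream processing
val decoded = allData.selectExpr(
"CAST(key AS STRING) AS key",
"CAST(value AS STRING) AS value",
"topic", "partition", "offset", "timestamp"
)
decoded.show(5, truncate = false)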
// Read specific offset ranges
val rangeData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingOffsets", """{"user-events":{"0":1000,"1":2000}}""")
.option("endingOffsets", """{"user-events":{"0":5000,"1":6000}}""")
.load()
Read from multiple topics matching a pattern for batch analysis.
/**
* Batch read from topics matching a regex pattern
* Useful for reading from topic families or time-partitioned topics
*/
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("subscribepattern", pattern: String) // Regex pattern for topic names
.option("startingOffsets", startOffsets: String)
.option("endingOffsets", endOffsets: String)
.load(): DataFrame
Usage Examples:
// Read from all daily event topics
val dailyEvents = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribepattern", "events-2024-.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
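Because subscribePattern resolves topics at read time, a quick illustrative check is to list which topics the pattern actually matched:
// Topics picked up by the pattern in this batch read
dailyEvents.select("topic").distinct().orderBy("topic").show(truncate = false)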
// Read from environment-specific topics
val prodLogs = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "prod-kafka:9092")
.option("subscribepattern", "prod-.*-logs")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
Assign specific partitions for batch reading with precise control.
/**
* Assign specific Kafka partitions for batch reading
* Provides exact control over data ranges and partitions
*/
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("assign", partitionsJson: String) // JSON specification of TopicPartitions
.option("startingOffsets", startOffsets: String)
.option("endingOffsets", endOffsets: String)
.load(): DataFrame
Usage Examples:
// Read specific partitions for parallel processing
val partitionData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"events":[0,1,2],"metrics":[0,1]}""")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
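To verify that only the assigned partitions were read, one simple sketch is to count messages per topic and partition:
// Message counts per assigned partition
partitionData
.groupBy("topic", "partition")
.count()
.orderBy("topic", "partition")
.show()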
// Read with partition-specific offset ranges
val preciseData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"logs":[0,1,2,3]}""")
.option("startingOffsets", """{"logs":{"0":1000,"1":2000,"2":1500,"3":3000}}""")
.option("endingOffsets", """{"logs":{"0":5000,"1":6000,"2":4500,"3":7000}}""")
.load()
Read data based on message timestamps for time-range analysis.
/**
* Timestamp-based offset resolution for batch reads
* Automatically finds offsets corresponding to specific timestamps
*/
// Global timestamp range
.option("startingTimestamp", startTime: String) // Start timestamp (ms since epoch)
.option("endingTimestamp", endTime: String) // End timestamp (ms since epoch)
// Per-partition timestamp specification
.option("startingOffsetsByTimestamp", startTimestamps: String) // JSON timestamps per partition
.option("endingOffsetsByTimestamp", endTimestamps: String) // JSON timestamps per partitionUsage Examples:
import java.time.Instant
// Read data from last 24 hours
val yesterday = Instant.now().minusSeconds(86400).toEpochMilli.toString
val now = Instant.now().toEpochMilli.toString
val recentData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingTimestamp", yesterday)
.option("endingTimestamp", now)
.load()
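As a sanity check on a timestamp-bounded read, the record timestamps in the result should fall inside the requested window; a minimal sketch:
import org.apache.spark.sql.functions.{min, max}
// Earliest and latest record timestamps actually returned for the window
recentData
.agg(min("timestamp").as("earliest"), max("timestamp").as("latest"))
.show(truncate = false)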
// Read specific time ranges per partition
val timeRangeData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "metrics")
.option("startingOffsetsByTimestamp", s"""{"metrics":{"0":$yesterday,"1":$yesterday}}""")
.option("endingOffsetsByTimestamp", s"""{"metrics":{"0":$now,"1":$now}}""")
.load()
Optimize batch reading performance for large datasets.
/**
* Performance configuration for batch operations
*/
.option("minPartitions", partitionCount: String) // Minimum Spark partitions
.option("fetchOffset.numRetries", retries: String) // Offset fetch retry count
.option("fetchOffset.retryIntervalMs", interval: String) // Retry interval in ms
.option("kafkaConsumer.pollTimeoutMs", timeout: String) // Consumer poll timeoutUsage Examples:
// Optimize for large batch processing
val optimizedBatch = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("minPartitions", "100") // Force higher parallelism
.option("fetchOffset.numRetries", "5") // Retry offset fetching
.option("fetchOffset.retryIntervalMs", "200") // Wait 200ms between retries
.load()
Validate and filter batch data for quality assurance.
/**
* Data validation options for batch processing
*/
.option("failOnDataLoss", failBehavior: String) // "true" or "false"
.option("includeHeaders", includeHeaders: String) // "true" or "false"Usage Examples:
// Strict data validation
val validatedData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "critical-data")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("failOnDataLoss", "true") // Fail if any data is missing
.option("includeHeaders", "true") // Include headers for metadata
.load()
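With includeHeaders enabled, the result gains a headers column: an array of key/value pairs whose values are binary. A sketch of unpacking it, assuming header values are UTF-8 strings:
import org.apache.spark.sql.functions.{col, explode}
// Flatten record headers into one row per header entry
validatedData
.select(col("offset"), explode(col("headers")).as("header"))
.select(
col("offset"),
col("header.key").as("header_key"),
col("header.value").cast("string").as("header_value")
)
.show(5, truncate = false)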
// Permissive data reading
val permissiveData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("failOnDataLoss", "false") // Continue despite gaps
.load()
The following examples combine these options into common batch workflows: historical traffic analysis, cluster-to-cluster migration, data quality checks, offset gap detection, archival, incident replay, and compliance reporting.
import org.apache.spark.sql.functions._
val historicalData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Analyze message patterns by hour
val hourlyStats = historicalData
.select(
hour(col("timestamp")).as("hour"),
col("topic"),
col("partition")
)
.groupBy("hour", "topic")
.agg(
count("*").as("message_count"),
countDistinct("partition").as("active_partitions")
)
.orderBy("hour", "topic")
hourlyStats.show()
// Read from source Kafka cluster
val sourceData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "source-cluster:9092")
.option("subscribe", "legacy-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Transform and write to destination: the Kafka sink requires a "value" column
// (string or binary); "key" and "topic" columns are optional
sourceData
.select(
col("key"),
col("value").cast("string").as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "dest-cluster:9092")
.option("topic", "migrated-topic")
.save()
// Assess data quality across time ranges
val qualityReport = historicalData
.select(
date_trunc("day", col("timestamp")).as("day"),
col("topic"),
when(col("key").isNull, 1).otherwise(0).as("null_keys"),
when(col("value").isNull, 1).otherwise(0).as("null_values"),
length(col("value")).as("value_size")
)
.groupBy("day", "topic")
.agg(
count("*").as("total_messages"),
sum("null_keys").as("messages_with_null_keys"),
sum("null_values").as("messages_with_null_values"),
avg("value_size").as("avg_message_size"),
max("value_size").as("max_message_size")
)
.orderBy("day", "topic")// Analyze offset ranges and gaps
val offsetAnalysis = historicalData
.groupBy("topic", "partition")
.agg(
min("offset").as("min_offset"),
max("offset").as("max_offset"),
count("*").as("message_count"),
countDistinct("offset").as("unique_offsets")
)
.withColumn("expected_count", col("max_offset") - col("min_offset") + 1)
.withColumn("has_gaps", col("unique_offsets") =!= col("expected_count"))
.select(
col("topic"),
col("partition"),
col("min_offset"),
col("max_offset"),
col("message_count"),
col("has_gaps")
)
// Archive old Kafka data to long-term storage
val archiveData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "transaction-logs")
.option("startingTimestamp", "1609459200000") // Start of 2021
.option("endingTimestamp", "1640995200000") // End of 2021
.load()
// Save as Parquet with partitioning
archiveData
.withColumn("year", year(col("timestamp")))
.withColumn("month", month(col("timestamp")))
.write
.partitionBy("year", "month", "topic")
.parquet("s3://archive-bucket/kafka-data/")// Replay specific time period for debugging
val replayData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-actions")
.option("startingTimestamp", "1640995200000") // Specific incident time
.option("endingTimestamp", "1640998800000") // One hour later
.load()
// Reprocess the data with updated logic
// (updatedSchema stands in for the revised event schema, e.g. a StructType
// built from org.apache.spark.sql.types that matches the JSON payload)
val reprocessed = replayData
.select(col("value").cast("string").as("event"))
.select(from_json(col("event"), updatedSchema).as("data"))
.select("data.*")
// Extract data for compliance reporting
val complianceData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-data-changes")
.option("startingTimestamp", quarterStartTime)
.option("endingTimestamp", quarterEndTime)
.option("includeHeaders", "true")
.load()
// Generate audit trail
val auditTrail = complianceData
.select(
col("timestamp"),
col("headers"),
get_json_object(col("value").cast("string"), "$.userId").as("user_id"),
get_json_object(col("value").cast("string"), "$.action").as("action"),
get_json_object(col("value").cast("string"), "$.dataType").as("data_type")
)
.filter(col("data_type").isin("PII", "SENSITIVE"))