Kafka 0.10+ Source for Structured Streaming
Kafka integration for Apache Spark's streaming and batch processing.
Batch operations enable reading historical data from Kafka topics for analysis, data migration, and batch processing workflows. Unlike streaming, batch reads have defined start and end boundaries.
Read a specific range of data from Kafka topics for batch processing.
/**
* Create a batch DataFrame from Kafka topics
* Reads data between specified offset ranges
*/
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
.option("subscribe", topics: String) // Topic subscription (comma-separated)
.option("startingOffsets", startOffsets: String) // Starting position: "earliest" or JSON
.option("endingOffsets", endOffsets: String) // Ending position: "latest" or JSON
.load(): DataFrame
Usage Examples:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("KafkaBatch")
.getOrCreate()
// Read all available data
val allData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events,logs")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
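The Kafka source exposes key and value as binary columns; a minimal sketch of casting them to strings alongside the record metadata:
// Decode binary key/value to strings; topic, partition, offset and
// timestamp are the source's metadata columns
val decoded = allData.selectExpr(
  "CAST(key AS STRING) AS key",
  "CAST(value AS STRING) AS value",
  "topic", "partition", "offset", "timestamp"
)
decoded.show(5, truncate = false)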
// Read specific offset ranges
val rangeData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingOffsets", """{"user-events":{"0":1000,"1":2000}}""")
.option("endingOffsets", """{"user-events":{"0":5000,"1":6000}}""")
.load()
Read from multiple topics matching a pattern for batch analysis.
/**
* Batch read from topics matching a regex pattern
* Useful for reading from topic families or time-partitioned topics
*/
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("subscribepattern", pattern: String) // Regex pattern for topic names
.option("startingOffsets", startOffsets: String)
.option("endingOffsets", endOffsets: String)
.load(): DataFrame
Usage Examples:
// Read from all daily event topics
val dailyEvents = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribepattern", "events-2024-.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
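To confirm which topics the pattern actually matched, a quick check (a sketch):
// List the distinct topics pulled in by the regex subscription
dailyEvents.select("topic").distinct().orderBy("topic").show(false)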
// Read from environment-specific topics
val prodLogs = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "prod-kafka:9092")
.option("subscribepattern", "prod-.*-logs")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
Assign specific partitions for batch reading with precise control.
/**
* Assign specific Kafka partitions for batch reading
* Provides exact control over data ranges and partitions
*/
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", servers: String)
.option("assign", partitionsJson: String) // JSON specification of TopicPartitions
.option("startingOffsets", startOffsets: String)
.option("endingOffsets", endOffsets: String)
.load(): DataFrame
Usage Examples:
// Read specific partitions for parallel processing
val partitionData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"events":[0,1,2],"metrics":[0,1]}""")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
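A quick way to verify that only the assigned partitions were read (a sketch):
// Count records per topic-partition that was actually assigned
partitionData
  .groupBy("topic", "partition")
  .count()
  .orderBy("topic", "partition")
  .show()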
// Read with partition-specific offset ranges
val preciseData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("assign", """{"logs":[0,1,2,3]}""")
.option("startingOffsets", """{"logs":{"0":1000,"1":2000,"2":1500,"3":3000}}""")
.option("endingOffsets", """{"logs":{"0":5000,"1":6000,"2":4500,"3":7000}}""")
.load()
Read data based on message timestamps for time-range analysis.
/**
* Timestamp-based offset resolution for batch reads
* Automatically finds offsets corresponding to specific timestamps
*/
// Global timestamp range
.option("startingTimestamp", startTime: String) // Start timestamp (ms since epoch)
.option("endingTimestamp", endTime: String) // End timestamp (ms since epoch)
// Per-partition timestamp specification
.option("startingOffsetsByTimestamp", startTimestamps: String) // JSON timestamps per partition
.option("endingOffsetsByTimestamp", endTimestamps: String) // JSON timestamps per partitionUsage Examples:
import java.time.Instant
// Read data from last 24 hours
val yesterday = Instant.now().minusSeconds(86400).toEpochMilli.toString
val now = Instant.now().toEpochMilli.toString
val recentData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.option("startingTimestamp", yesterday)
.option("endingTimestamp", now)
.load()
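Since timestamps are resolved to offsets per partition, it can be worth verifying the window that was actually read; a minimal sketch:
import org.apache.spark.sql.functions.{min, max}
// Inspect the earliest and latest message timestamps in the batch
recentData
  .agg(min("timestamp").as("earliest"), max("timestamp").as("latest"))
  .show(false)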
// Read specific time ranges per partition
val timeRangeData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "metrics")
.option("startingOffsetsByTimestamp", s"""{"metrics":{"0":$yesterday,"1":$yesterday}}""")
.option("endingOffsetsByTimestamp", s"""{"metrics":{"0":$now,"1":$now}}""")
.load()
Optimize batch reading performance for large datasets.
/**
* Performance configuration for batch operations
*/
.option("minPartitions", partitionCount: String) // Minimum Spark partitions
.option("fetchOffset.numRetries", retries: String) // Offset fetch retry count
.option("fetchOffset.retryIntervalMs", interval: String) // Retry interval in ms
.option("kafkaConsumer.pollTimeoutMs", timeout: String) // Consumer poll timeoutUsage Examples:
// Optimize for large batch processing
val optimizedBatch = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "large-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("minPartitions", "100") // Force higher parallelism
.option("fetchOffset.numRetries", "5") // Retry offset fetching
.option("fetchOffset.retryIntervalMs", "200") // Wait 200ms between retries
.load()
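To confirm the effect of minPartitions, a quick check of the resulting parallelism (a sketch):
// Number of Spark partitions produced by the read;
// it should be at least the configured minPartitions
println(optimizedBatch.rdd.getNumPartitions)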
Validate and filter batch data for quality assurance.
/**
* Data validation options for batch processing
*/
.option("failOnDataLoss", failBehavior: String) // "true" or "false"
.option("includeHeaders", includeHeaders: String) // "true" or "false"Usage Examples:
// Strict data validation
val validatedData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "critical-data")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("failOnDataLoss", "true") // Fail if any data is missing
.option("includeHeaders", "true") // Include headers for metadata
.load()
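With includeHeaders enabled, each record carries a headers column (an array of structs with a string key and a binary value); a sketch of flattening it:
import org.apache.spark.sql.functions.{col, explode}
// One row per header, with the binary header value cast to a string
validatedData
  .select(col("offset"), explode(col("headers")).as("header"))
  .select(
    col("offset"),
    col("header.key").as("header_key"),
    col("header.value").cast("string").as("header_value")
  )
  .show(false)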
// Permissive data reading
val permissiveData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "logs")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.option("failOnDataLoss", "false") // Continue despite gaps
.load()
// Analyze historical data read from Kafka
import org.apache.spark.sql.functions._
val historicalData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-events")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Analyze message patterns by hour
val hourlyStats = historicalData
.select(
hour(col("timestamp")).as("hour"),
col("topic"),
col("partition")
)
.groupBy("hour", "topic")
.agg(
count("*").as("message_count"),
countDistinct("partition").as("active_partitions")
)
.orderBy("hour", "topic")
hourlyStats.show()
// Read from source Kafka cluster
val sourceData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "source-cluster:9092")
.option("subscribe", "legacy-topic")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
// Transform and write to destination; the Kafka sink requires a "value"
// column (and optionally "key"), so wrap the transformed fields as JSON
sourceData
.select(
col("key"),
to_json(struct(
col("value").cast("string").as("original_value"),
col("timestamp"),
col("partition").as("source_partition")
)).as("value")
)
.write
.format("kafka")
.option("kafka.bootstrap.servers", "dest-cluster:9092")
.option("topic", "migrated-topic")
.save()
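A quick sanity check after the copy (a sketch; it re-reads the destination topic and compares record counts):
// Compare source and destination record counts after migration
val migratedCount = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "dest-cluster:9092")
  .option("subscribe", "migrated-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .count()
println(s"source=${sourceData.count()} migrated=$migratedCount")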
// Assess data quality across time ranges
val qualityReport = historicalData
.select(
date_trunc("day", col("timestamp")).as("day"),
col("topic"),
when(col("key").isNull, 1).otherwise(0).as("null_keys"),
when(col("value").isNull, 1).otherwise(0).as("null_values"),
length(col("value")).as("value_size")
)
.groupBy("day", "topic")
.agg(
count("*").as("total_messages"),
sum("null_keys").as("messages_with_null_keys"),
sum("null_values").as("messages_with_null_values"),
avg("value_size").as("avg_message_size"),
max("value_size").as("max_message_size")
)
.orderBy("day", "topic")// Analyze offset ranges and gaps
val offsetAnalysis = historicalData
.groupBy("topic", "partition")
.agg(
min("offset").as("min_offset"),
max("offset").as("max_offset"),
count("*").as("message_count"),
countDistinct("offset").as("unique_offsets")
)
.withColumn("expected_count", col("max_offset") - col("min_offset") + 1)
.withColumn("has_gaps", col("unique_offsets") =!= col("expected_count"))
.select(
col("topic"),
col("partition"),
col("min_offset"),
col("max_offset"),
col("message_count"),
col("has_gaps")
)
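To surface only the partitions with missing offsets, a small follow-up (a sketch):
// Show just the topic-partitions where offsets are not contiguous
offsetAnalysis.filter(col("has_gaps")).show(false)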
// Archive old Kafka data to long-term storage
val archiveData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "transaction-logs")
.option("startingTimestamp", "1609459200000") // Start of 2021
.option("endingTimestamp", "1640995200000") // End of 2021
.load()
// Save as Parquet with partitioning
archiveData
.withColumn("year", year(col("timestamp")))
.withColumn("month", month(col("timestamp")))
.write
.partitionBy("year", "month", "topic")
.parquet("s3://archive-bucket/kafka-data/")// Replay specific time period for debugging
// Replay specific time period for debugging
val replayData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-actions")
.option("startingTimestamp", "1640995200000") // Specific incident time
.option("endingTimestamp", "1640998800000") // One hour later
.load()
// Reprocess the data with updated logic
val reprocessed = replayData
.select(col("value").cast("string").as("event"))
.select(from_json(col("event"), updatedSchema).as("data")) // updatedSchema: user-supplied StructType for the revised parsing logic
.select("data.*")// Extract data for compliance reporting
val complianceData = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "user-data-changes")
.option("startingTimestamp", quarterStartTime)
.option("endingTimestamp", quarterEndTime)
.option("includeHeaders", "true")
.load()
// Generate audit trail
val auditTrail = complianceData
.select(
col("timestamp"),
col("headers"),
get_json_object(col("value").cast("string"), "$.userId").as("user_id"),
get_json_object(col("value").cast("string"), "$.action").as("action"),
get_json_object(col("value").cast("string"), "$.dataType").as("data_type")
)
.filter(col("data_type").isin("PII", "SENSITIVE"))Install with Tessl CLI
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-sql-kafka-0-10-2-12