Batch Reading

The Kafka connector provides efficient batch reading of historical Kafka data, with offset-range optimization and partition-aware parallelism.

Capabilities

KafkaBatch

V2 DataSource batch implementation for reading Kafka data efficiently.

/**
 * V2 DataSource batch implementation for reading Kafka data
 * Provides efficient batch processing with partition optimization
 */
class KafkaBatch extends Batch {
  
  /** Plans input partitions for parallel processing */
  def planInputPartitions(): Array[InputPartition]
  
  /** Creates reader factory for partition processing */
  def createReaderFactory(): PartitionReaderFactory
}

Usage Examples:

// Basic batch reading
import org.apache.spark.sql.functions._

val kafkaBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "historical-data")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Process batch data
val processedBatch = kafkaBatch
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    expr("CAST(value AS STRING)").as("message")
  )
  .groupBy("topic", "partition")
  .agg(
    count("*").as("message_count"),
    min("offset").as("min_offset"),
    max("offset").as("max_offset")
  )

KafkaRelation

V1 DataSource relation for batch reading with TableScan support.

/**
 * V1 DataSource relation for batch reading from Kafka
 * Provides backward compatibility with DataSource V1 API
 */
class KafkaRelation extends BaseRelation with TableScan {
  
  /** Returns the schema for Kafka records */
  def schema: StructType
  
  /** Builds RDD for scanning all data */
  def buildScan(): RDD[Row]
  
  /** Returns SQL context */
  def sqlContext: SQLContext
}
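
Whether Spark plans a read through KafkaBatch (V2) or KafkaRelation (V1) is governed by the spark.sql.sources.useV1SourceList setting, which includes kafka by default in Spark 3.x. A minimal sketch, assuming those Spark 3.x defaults, of forcing the V2 path by removing kafka from the list:

// Sketch: exclude "kafka" from the V1 source list so batch reads plan through
// KafkaBatch instead of KafkaRelation. Verify the default list for your Spark version.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-batch-v2")
  .config("spark.sql.sources.useV1SourceList", "avro,csv,json,orc,parquet,text")
  .getOrCreate()

val v2Batch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "historical-data")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()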

Partition Input Planning

KafkaBatchInputPartition

Represents a single input partition for batch processing.

/**
 * Input partition for batch Kafka reading
 * Represents a range of offsets to be processed by a single task
 */
case class KafkaBatchInputPartition(
  offsetRange: KafkaOffsetRange,
  executorKafkaParams: ju.Map[String, Object],
  pollTimeoutMs: Long,
  failOnDataLoss: Boolean,
  includeHeaders: Boolean
) extends InputPartition {
  
  /** Returns preferred locations for this partition (empty for Kafka) */
  def preferredLocations(): Array[String] = Array.empty
}

KafkaOffsetRange

Defines the range of offsets to be processed by a partition reader.

/**
 * Represents a range of offsets for a specific topic partition
 * Used for planning batch processing tasks
 */
case class KafkaOffsetRange(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long,
  preferredLoc: Option[String]
) {
  /** Topic name */
  def topic: String = topicPartition.topic()
  
  /** Partition number */
  def partition: Int = topicPartition.partition()
  
  /** Estimated number of messages in this range */
  def size: Long = untilOffset - fromOffset
}
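
These are internal classes in Spark's org.apache.spark.sql.kafka010 package, so the sketch below uses a simplified stand-in (SimpleOffsetRange is hypothetical) just to illustrate how each range maps to one input partition and how size estimates the work per task:

// Simplified stand-in for KafkaOffsetRange (hypothetical; the real class is internal to Spark)
import org.apache.kafka.common.TopicPartition

case class SimpleOffsetRange(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long) {
  def size: Long = untilOffset - fromOffset  // estimated messages in the range
}

// One range per topic partition; the planner turns each range into one input partition
val ranges = Seq(
  SimpleOffsetRange(new TopicPartition("events", 0), 1000L, 5000L),
  SimpleOffsetRange(new TopicPartition("events", 1), 2000L, 6000L)
)

val estimatedMessages = ranges.map(_.size).sum  // 8000 messages across 2 tasks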

Partition Readers

KafkaBatchReaderFactory

Factory for creating partition readers.

/**
 * Factory for creating Kafka batch partition readers
 * Creates readers that can process KafkaBatchInputPartition instances
 */
object KafkaBatchReaderFactory extends PartitionReaderFactory {
  
  /** Creates a partition reader for the given input partition */
  def createReader(partition: InputPartition): PartitionReader[InternalRow]
}

KafkaBatchPartitionReader

Reads data from a specific Kafka partition range.

/**
 * Partition reader for batch Kafka data processing
 * Reads a specific range of offsets from a Kafka partition
 */
case class KafkaBatchPartitionReader(
  offsetRange: KafkaOffsetRange,
  executorKafkaParams: ju.Map[String, Object],
  pollTimeoutMs: Long,
  failOnDataLoss: Boolean,
  includeHeaders: Boolean
) extends PartitionReader[InternalRow] {
  
  /** Advances to next record */
  def next(): Boolean
  
  /** Gets current record as UnsafeRow */
  def get(): UnsafeRow
  
  /** Closes reader and releases resources */
  def close(): Unit
  
  /** Returns current metrics values */
  def currentMetricsValues(): Array[CustomTaskMetric]
}
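
The reader is driven by Spark's execution layer, but the contract is the standard PartitionReader loop. A minimal sketch of that loop (drainReader is an illustrative helper, not part of the connector):

// Sketch of the PartitionReader contract: next() until exhausted, get() per row, always close()
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader

def drainReader(reader: PartitionReader[InternalRow]): Long = {
  var rows = 0L
  try {
    while (reader.next()) {
      val row = reader.get()  // row is only valid until the next call to next()
      rows += 1
    }
  } finally {
    reader.close()            // releases the underlying Kafka consumer
  }
  rows
}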

Offset Range Configuration

Time-based Range Selection

// Read data from specific time range
val timeRangeBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1609459200000")  // Jan 1, 2021 00:00:00 UTC
  .option("endingTimestamp", "1609545600000")    // Jan 2, 2021 00:00:00 UTC
  .load()

Specific Offset Ranges

// Read specific offset ranges per partition
val specificOffsetsBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")
  .option("endingOffsets", """{"events":{"0":5000,"1":6000,"2":4500}}""")
  .load()
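
Hand-writing the offsets JSON becomes error-prone with many partitions; a small sketch (offsetsJson is an illustrative helper, and the topic and offsets are placeholders) that builds the same JSON from a Map:

// Build the per-partition offsets JSON from a Map instead of hand-writing it
def offsetsJson(topic: String, offsets: Map[Int, Long]): String = {
  val partitions = offsets.toSeq.sortBy(_._1)
    .map { case (partition, offset) => s""""$partition":$offset""" }
    .mkString(",")
  s"""{"$topic":{$partitions}}"""
}

val starting = offsetsJson("events", Map(0 -> 1000L, 1 -> 2000L, 2 -> 1500L))
// {"events":{"0":1000,"1":2000,"2":1500}}

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", starting)
  .option("endingOffsets", offsetsJson("events", Map(0 -> 5000L, 1 -> 6000L, 2 -> 4500L)))
  .load()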

Mixed Offset Strategies

// Mix earliest/latest with specific offsets
val mixedOffsetsBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":-2,"1":1000,"2":-2}}""")  // -2 = earliest
  .option("endingOffsets", """{"events":{"0":-1,"1":2000,"2":-1}}""")    // -1 = latest
  .load()

Partition Optimization

Minimum Partitions

Control the minimum number of Spark partitions for processing:

// Ensure sufficient parallelism for batch processing
val parallelBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("minPartitions", "50")  // Create at least 50 Spark partitions
  .load()

KafkaOffsetRangeCalculator

Internal calculator for optimizing offset ranges based on partition configuration.

/**
 * Calculates optimal offset ranges for processing
 * Splits large partition ranges to improve parallelism
 */
class KafkaOffsetRangeCalculator(minPartitions: Option[Int]) {
  
  /** 
   * Calculates offset ranges ensuring minimum partition count
   * Splits large ranges across multiple Spark partitions if needed
   */
  def getRanges(
    fromOffsets: Seq[KafkaOffsetRange],
    executorLocations: Seq[String]
  ): Seq[KafkaOffsetRange]
}

object KafkaOffsetRangeCalculator {
  /** Creates calculator from options */
  def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator
}

Range Splitting Logic:

// If a Kafka partition has 100,000 messages and minPartitions=10:
// Original: [topic-0: 0 -> 100000]
// Split into: 
//   [topic-0: 0 -> 10000]
//   [topic-0: 10000 -> 20000]  
//   [topic-0: 20000 -> 30000]
//   ... (10 total ranges)
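
A simplified sketch of the even-splitting arithmetic (the real KafkaOffsetRangeCalculator also weighs ranges by their size and accounts for executor locations):

// Split [from, until) into `parts` contiguous sub-ranges of roughly equal size
def splitRange(from: Long, until: Long, parts: Int): Seq[(Long, Long)] = {
  val total = until - from
  (0 until parts).map { i =>
    (from + total * i / parts, from + total * (i + 1) / parts)
  }
}

splitRange(0L, 100000L, 10)
// Seq((0,10000), (10000,20000), ..., (90000,100000))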

Performance Optimization

Consumer Configuration

// Optimize Kafka consumer for batch processing
val optimizedBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-dataset")
  .option("kafka.fetch.min.bytes", "1048576")      // 1MB minimum fetch
  .option("kafka.fetch.max.wait.ms", "500")        // 500ms max wait
  .option("kafka.max.poll.records", "50000")       // Large batch sizes
  .option("kafka.receive.buffer.bytes", "1048576") // 1MB receive buffer
  .load()

Parallel Processing

// Maximize parallelism for large datasets
val highParallelismBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "big-data-topic")
  .option("minPartitions", "200")  // Force high parallelism
  .load()
  .repartition(400)  // Further increase parallelism for processing

Caching Strategy

// Cache frequently accessed batch data
val cachedBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "reference-data")
  .load()
  .select(
    col("topic"),
    expr("CAST(key AS STRING)").as("key"),
    expr("CAST(value AS STRING)").as("value"),
    col("timestamp")
  )
  .cache()  // Cache in memory for multiple operations

// Use cached data multiple times
val summary = cachedBatch.groupBy("topic").count()
val sample = cachedBatch.sample(0.1)

Data Processing Patterns

Time Window Analysis

// Analyze data by time windows
val timeWindowAnalysis = kafkaBatch
  .select(
    col("topic"),
    col("timestamp"),
    expr("CAST(value AS STRING)").as("message")
  )
  .withColumn("hour", hour(col("timestamp")))
  .withColumn("day", date(col("timestamp")))
  .groupBy("topic", "day", "hour")
  .agg(
    count("*").as("message_count"),
    countDistinct("message").as("unique_messages")
  )
  .orderBy("topic", "day", "hour")

Cross-Topic Analysis

// Analyze data across multiple topics
val crossTopicBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .load()

val topicComparison = crossTopicBatch
  .groupBy("topic", date(col("timestamp")).as("date"))
  .agg(
    count("*").as("message_count"),
    avg(length(col("value"))).as("avg_message_size"),
    min("timestamp").as("first_message"),
    max("timestamp").as("last_message")
  )

Data Quality Analysis

// Analyze data quality across partitions
val qualityAnalysis = kafkaBatch
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    when(col("key").isNull, 1).otherwise(0).as("null_key"),
    when(col("value").isNull, 1).otherwise(0).as("null_value"),
    length(col("value")).as("value_size")
  )
  .groupBy("topic", "partition")
  .agg(
    count("*").as("total_messages"),
    sum("null_key").as("null_keys"),
    sum("null_value").as("null_values"),
    avg("value_size").as("avg_value_size"),
    min("offset").as("min_offset"),
    max("offset").as("max_offset")
  )

Error Handling

Data Loss Detection

// Enable data loss detection for critical batch processing
val strictBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-data")
  .option("failOnDataLoss", "true")  // Fail if data is missing
  .load()
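
With failOnDataLoss=true the failure only surfaces when an action runs; a minimal sketch of handling it explicitly (the exact exception type depends on the Spark version, so the handler below is deliberately generic):

import scala.util.{Failure, Success, Try}

Try(strictBatch.count()) match {
  case Success(rows) => println(s"Read $rows rows with no detected data loss")
  case Failure(e)    => println(s"Batch failed, possibly due to missing offsets: ${e.getMessage}")
}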

Timeout Configuration

// Configure timeouts for reliable batch processing
val timeoutConfiguredBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafkaConsumer.pollTimeoutMs", "30000")    // 30 second poll timeout
  .option("fetchOffset.numRetries", "10")           // 10 retry attempts  
  .option("fetchOffset.retryIntervalMs", "5000")    // 5 second retry interval
  .load()

Partial Processing

// Process data even if some partitions are unavailable
val resilientBatch = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("failOnDataLoss", "false")  // Continue despite missing data
  .load()
  .filter(col("value").isNotNull)     // Filter out null values

Monitoring and Metrics

Processing Metrics

// Monitor batch processing metrics
val batchJob = kafkaBatch.count()  // Trigger computation

// Access metrics through Spark UI or programmatically
val executorInfos = spark.sparkContext.statusTracker.getExecutorInfos
executorInfos.foreach { executor =>
  println(s"Executor ${executor.host}:${executor.port}: ${executor.numRunningTasks} running tasks")
}

Custom Metrics Collection

// Collect custom metrics during batch processing  
val metricsCollector = kafkaBatch
  .select(
    col("topic"),
    col("partition"),
    col("timestamp"),
    length(col("value")).as("message_size")
  )
  .groupBy("topic", "partition")
  .agg(
    count("*").as("message_count"),
    sum("message_size").as("total_bytes"),
    avg("message_size").as("avg_message_size"),
    min("timestamp").as("earliest_timestamp"),
    max("timestamp").as("latest_timestamp")
  )

// Write metrics for monitoring
metricsCollector
  .write
  .format("delta")
  .mode("overwrite")
  .save("/metrics/kafka_batch_processing")