or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

batch-processing.md
configuration.md
consumer-strategies.md
data-writing.md
index.md
offset-management.md
streaming-sources.md
tile.json

docs/batch-processing.md

Batch Processing

Batch data access capabilities for reading historical data from Kafka topics with configurable offset ranges, supporting large-scale data processing and analytics workloads.

Capabilities

Kafka Relation

Batch relation for reading historical data from Kafka topics with precise offset control.

/**
 * Batch relation for reading from Kafka topics.
 *
 * API outline: each member below is listed as a bare signature (no body), so this
 * snippet describes the surface of the class and is not meant to compile as written.
 * The relation mixes in BaseRelation, TableScan and Logging.
 */
class KafkaRelation extends BaseRelation with TableScan with Logging {
  /**
   * Returns the SQL context this relation is bound to.
   * @return SQLContext for this relation
   */
  def sqlContext: SQLContext
  
  /**
   * Returns the schema of the rows produced by this relation.
   * @return StructType defining record schema
   */
  def schema: StructType
  
  /**
   * Builds the RDD that is scanned for batch processing.
   * @return RDD[Row] containing Kafka records
   */
  def buildScan(): RDD[Row]
  
  /**
   * String representation of the relation.
   * @return String describing this relation
   */
  def toString: String
}

Kafka Source RDD

RDD implementation for reading Kafka data based on offset ranges with partition-aware processing.

/**
 * RDD for reading Kafka data based on offset ranges.
 *
 * API outline: members are listed as bare signatures (no bodies). Elements of the
 * RDD are ConsumerRecord values whose key and value are both raw byte arrays.
 */
class KafkaSourceRDD extends RDD[ConsumerRecord[Array[Byte], Array[Byte]]] {
  /**
   * Persistence is not supported for KafkaSourceRDD.
   * @param newLevel Storage level (ignored)
   * @return this RDD (logs error)
   */
  def persist(newLevel: StorageLevel): this.type
  
  /**
   * Gets the RDD partitions, derived from the offset ranges.
   * @return Array of Partition objects
   */
  def getPartitions: Array[Partition]
  
  /**
   * Gets preferred executor locations for data locality.
   * @param split Partition to get locations for
   * @return Sequence of preferred executor hostnames
   */
  def getPreferredLocations(split: Partition): Seq[String]
  
  /**
   * Computes the data of one partition by reading from Kafka.
   * @param thePart Partition to compute
   * @param context Task context for the computation
   * @return Iterator of Kafka ConsumerRecord objects (byte-array key and value)
   */
  def compute(
    thePart: Partition, 
    context: TaskContext
  ): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]
}

RDD Partition Types

Partition implementations for Kafka RDD processing.

/**
 * The span of offsets, within a single Kafka topic partition, that one RDD
 * partition is responsible for.
 *
 * @param topicPartition Kafka topic partition the offsets belong to
 * @param fromOffset     First offset of the span (inclusive)
 * @param untilOffset    End of the span (exclusive)
 * @param preferredLoc   Preferred executor location, if any
 */
case class KafkaSourceRDDOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {

  /** Topic name, read from `topicPartition`. */
  def topic: String = {
    topicPartition.topic()
  }

  /** Partition number, read from `topicPartition`. */
  def partition: Int = {
    topicPartition.partition()
  }

  /** Length of the span, i.e. `untilOffset - fromOffset`. */
  def size: Long = {
    untilOffset - fromOffset
  }
}

/**
 * RDD partition that carries the offset range it has to read.
 *
 * @param index       Position of this partition within the RDD
 * @param offsetRange Offsets covered by this partition
 */
case class KafkaSourceRDDPartition(index: Int, offsetRange: KafkaSourceRDDOffsetRange)
  extends Partition

Usage Examples

Basic Batch Reading

// Source settings: one topic, bounded by the earliest and latest offsets
val batchReadOptions = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "subscribe" -> "transactions",
  "startingOffsets" -> "earliest",
  "endingOffsets" -> "latest"
)

val batchDF = spark.read.format("kafka").options(batchReadOptions).load()

batchDF.show()

Reading Specific Offset Range

// Per-partition offset bounds for the "events" topic, expressed as JSON
val rangeStartJson = """{"events":{"0":1000,"1":2000}}"""
val rangeEndJson = """{"events":{"0":5000,"1":6000}}"""

val historicalDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", rangeStartJson)
  .option("endingOffsets", rangeEndJson)
  .load()

// Decode key/value to strings, keep the record metadata, then filter on timestamp
val decodedColumns = Seq(
  "CAST(key AS STRING)",
  "CAST(value AS STRING)",
  "topic",
  "partition",
  "offset",
  "timestamp"
)

val processedDF = historicalDF
  .selectExpr(decodedColumns: _*)
  .where($"timestamp" > "2023-01-01")

Multiple Topics Batch Processing

// "subscribe" takes a comma-separated list of topic names
val multiTopicOptions = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "subscribe" -> "topic1,topic2,topic3",
  "startingOffsets" -> "earliest",
  "endingOffsets" -> "latest"
)

val multiTopicDF = spark.read.format("kafka").options(multiTopicOptions).load()

// One output row per topic: record count plus first and last timestamp
val recordCount = count("*").as("record_count")
val firstSeen = min("timestamp").as("earliest_timestamp")
val lastSeen = max("timestamp").as("latest_timestamp")

val topicStats = multiTopicDF
  .groupBy("topic")
  .agg(recordCount, firstSeen, lastSeen)

topicStats.show()

Pattern-based Topic Reading

// "subscribePattern" selects topics by regular expression instead of by name
val patternReader = spark.read
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> "localhost:9092",
    "subscribePattern" -> "logs_.*",
    "startingOffsets" -> "earliest",
    "endingOffsets" -> "latest"
  ))

val patternDF = patternReader.load()

Partition Assignment Reading

// JSON map of topic name -> list of partition numbers to read
val partitionAssignment = """{"important_topic":[0,1,2],"critical_topic":[0]}"""

val assignedDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", partitionAssignment)
  .options(Map("startingOffsets" -> "earliest", "endingOffsets" -> "latest"))
  .load()

Performance Optimization

Partition Parallelism

val parallelReadOptions = Map(
  "kafka.bootstrap.servers" -> "localhost:9092",
  "subscribe" -> "large-topic",
  "startingOffsets" -> "earliest",
  "endingOffsets" -> "latest",
  // Increase parallelism
  "minPartitions" -> "20"
)

val optimizedDF = spark.read.format("kafka").options(parallelReadOptions).load()

Consumer Configuration

// Options prefixed with "kafka." carry Kafka consumer settings
val consumerTuning = Map(
  "kafka.fetch.max.bytes" -> "52428800",  // 50MB
  "kafka.max.poll.records" -> "1000",     // Records per poll
  "kafka.fetch.min.bytes" -> "1024",      // Min fetch size
  "kafka.fetch.max.wait.ms" -> "500"      // Max wait
)

val tunedDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-throughput-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .options(consumerTuning)
  .load()

Memory Management

// Socket buffer sizes handed to the Kafka client
val bufferSettings = Map(
  "kafka.receive.buffer.bytes" -> "262144",  // 256KB
  "kafka.send.buffer.bytes" -> "131072"      // 128KB
)

val memoryOptimizedDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "memory-intensive-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .options(bufferSettings)
  .load()

Data Processing Patterns

Time-based Processing

import org.apache.spark.sql.functions._

// The Kafka source already exposes `timestamp` as a TimestampType column, so it must
// not be treated as epoch milliseconds (no division by 1000, no from_unixtime).
val timeBasedDF = batchDF
  .withColumn("event_time", $"timestamp")
  .withColumn("date", to_date($"event_time"))
  .filter($"date" >= "2023-01-01" && $"date" <= "2023-12-31")

// Group by date and count records
val dailyStats = timeBasedDF
  .groupBy("date", "topic")
  .agg(count("*").as("daily_count"))
  .orderBy("date", "topic")

Message Content Processing

import org.apache.spark.sql.functions._

// `messageSchema` is not defined in this snippet; it must be a schema describing
// the JSON payload, supplied by the caller.
val contentDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) as message_key",
    "CAST(value AS STRING) as message_value",
    // Keep the metadata columns: the final select below refers to them
    "topic",
    "partition",
    "offset"
  )
  .withColumn("json_data", from_json($"message_value", messageSchema))
  .select("message_key", "json_data.*", "topic", "partition", "offset")

Deduplication

// List the remaining columns explicitly instead of "*": selecting "*" next to the
// casted `key`/`value` would produce two columns with each name and make
// dropDuplicates("key") ambiguous.
val deduplicatedDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) as key",
    "CAST(value AS STRING) as value",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType"
  )
  .dropDuplicates("key")  // Deduplicate by key
  .orderBy("timestamp")   // Sort the remaining rows by timestamp

Schema and Data Types

Kafka Record Schema

import org.apache.spark.sql.types._

// Column layout of a DataFrame loaded from the Kafka source. The source declares
// these fields without overriding nullability, so every column is reported as
// nullable (printSchema shows "nullable = true" for all of them).
val kafkaSchema = StructType(Seq(
  StructField("key", BinaryType, nullable = true),
  StructField("value", BinaryType, nullable = true),
  StructField("topic", StringType, nullable = true),
  StructField("partition", IntegerType, nullable = true),
  StructField("offset", LongType, nullable = true),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("timestampType", IntegerType, nullable = true)
))

Type Conversions

// Decode key/value to strings and turn the numeric timestampType into a label.
// Kafka uses 0 for CreateTime, 1 for LogAppendTime and -1 when no timestamp type
// is available, so each value is matched explicitly instead of defaulting to
// 'LogAppendTime'.
val convertedDF = batchDF
  .selectExpr(
    "CAST(key AS STRING) as key_str",
    "CAST(value AS STRING) as value_str",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "CASE timestampType WHEN 0 THEN 'CreateTime' WHEN 1 THEN 'LogAppendTime' ELSE 'NoTimestampType' END as timestamp_type"
  )

Error Handling

Offset Validation

try {
  // Explicit offset bounds for partition 0 of "topic"
  val offsetBounds = Map(
    "startingOffsets" -> """{"topic":{"0":1000}}""",
    "endingOffsets" -> """{"topic":{"0":5000}}"""
  )

  val df = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic")
    .options(offsetBounds)
    .load()

  df.count()
} catch {
  // Checked first so it is reported separately from every other Exception
  case badConfig: IllegalArgumentException =>
    println(s"Invalid offset configuration: ${badConfig.getMessage}")
  case other: Exception =>
    println(s"Error reading from Kafka: ${other.getMessage}")
}

Topic Existence Check

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConverters._

/**
 * Checks whether a topic name is among the topics listed by the Kafka cluster.
 *
 * @param brokers Bootstrap servers string passed to the admin client
 * @param topic   Topic name to look for
 * @return true if `topic` is contained in the names returned by `listTopics()`
 */
def checkTopicExists(brokers: String, topic: String): Boolean = {
  // AdminClient.create expects a java.util.Map[String, Object]. Java maps are
  // invariant, so the Scala map must be typed with AnyRef values up front;
  // a Map[String, String] converted with asJava does not type-check here.
  val props = Map[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers).asJava
  val adminClient = AdminClient.create(props)

  try {
    // names() yields a future; get() blocks until the result is available
    val topics = adminClient.listTopics().names().get()
    topics.contains(topic)
  } finally {
    // Always release the client, even when the lookup throws
    adminClient.close()
  }
}

// Only build the reader when the topic is reported by the cluster
val topicIsPresent = checkTopicExists("localhost:9092", "my-topic")

if (topicIsPresent) {
  val df = spark.read
    .format("kafka")
    .options(Map(
      "kafka.bootstrap.servers" -> "localhost:9092",
      "subscribe" -> "my-topic"
    ))
    .load()
}

Integration with Spark SQL

Creating Temporary Views

// Expose the DataFrame to SQL under the name used in the query below
batchDF.createOrReplaceTempView("kafka_messages")

// Per topic/partition: record count, offset bounds and timestamp bounds
val partitionSummaryQuery = """
  SELECT 
    topic,
    partition,
    COUNT(*) as message_count,
    MIN(offset) as min_offset,
    MAX(offset) as max_offset,
    MIN(timestamp) as earliest_time,
    MAX(timestamp) as latest_time
  FROM kafka_messages
  GROUP BY topic, partition
  ORDER BY topic, partition
"""

val sqlResult = spark.sql(partitionSummaryQuery)

sqlResult.show()

Complex Analytics

// Hourly record count and average decoded value length per topic.
// Reads from the "kafka_messages" view, which must already be registered.
val hourlyStatsQuery = """
  SELECT 
    topic,
    DATE(timestamp) as date,
    HOUR(timestamp) as hour,
    COUNT(*) as hourly_count,
    AVG(LENGTH(CAST(value AS STRING))) as avg_message_size
  FROM kafka_messages
  WHERE timestamp >= '2023-01-01'
  GROUP BY topic, DATE(timestamp), HOUR(timestamp)
  ORDER BY date, hour
"""

spark.sql(hourlyStatsQuery).show()

Best Practices

Offset Range Planning

  1. Use earliest/latest for full scans:

    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
  2. Use specific offsets for incremental processing:

    .option("startingOffsets", s"""{"$topic":{"0":$lastProcessedOffset}}""")
  3. Inspect the offset range read from each partition (compare it with the last processed offsets to estimate lag):

    // Lowest and highest offset present in the batch, per topic partition
    val lowestOffset = min("offset").as("min_offset")
    val highestOffset = max("offset").as("max_offset")
    val offsetInfo = batchDF.groupBy("topic", "partition").agg(lowestOffset, highestOffset)

Performance Guidelines

  1. Set appropriate minPartitions for large datasets
  2. Use partition assignment for known partition layouts
  3. Configure Kafka consumer buffers based on message size
  4. Cache DataFrames for multiple operations
  5. Use columnar formats for intermediate results

Resource Management

// Driver/executor memory and executor cores are launch-time settings: they are read
// when the application starts and cannot be changed with spark.conf.set on a running
// session. Supply them when submitting the application instead, for example:
//   spark-submit --driver-memory 4g --executor-memory 8g --executor-cores 4 ...
// (or the equivalent spark.driver.memory / spark.executor.memory /
// spark.executor.cores entries in spark-defaults.conf).

// Optimize shuffle partitions for Kafka data (a runtime SQL setting, so it can be
// set on the live session)
spark.conf.set("spark.sql.shuffle.partitions", "200")