Batch Operations

Batch reading and writing operations with flexible offset ranges and performance optimization for processing historical Kafka data.

Capabilities

Batch Reading

Read historical data from Kafka topics with precise offset control and range specifications.

Basic Batch Read

/**
 * Basic batch read from Kafka topics
 * Reads all available data between specified offset ranges
 */
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "earliest")    // Required for batch
  .option("endingOffsets", "latest")        // Optional, defaults to latest
  .load()

Usage Example:

// Read all historical data from multiple topics
import org.apache.spark.sql.functions.{col, from_json}

val historicalDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "orders,payments,shipments")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Process historical data (messageSchema is the application's message schema, defined elsewhere)
val summaryDF = historicalDF
  .selectExpr("topic", "CAST(value AS STRING) as json_str")
  .select(col("topic"), from_json(col("json_str"), messageSchema).as("data"))
  .groupBy("topic", "data.date")
  .count()

summaryDF.show()

Offset Range Specification

Precise control over which messages to read using specific offsets.

/**
 * Specific offset ranges for batch processing
 * JSON format: {"topicName":{"partitionNumber":offset, ...}}
 * Sentinel offsets: -2 refers to the earliest offset, -1 to the latest
 */
.option("startingOffsets", """{"orders":{"0":1000,"1":2000},"payments":{"0":500}}""")
.option("endingOffsets", """{"orders":{"0":5000,"1":8000},"payments":{"0":2000}}""")

Usage Example:

// Read specific ranges for data reconciliation
val reconciliationDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", """{"transactions":{"0":100000,"1":150000,"2":200000}}""")
  .option("endingOffsets", """{"transactions":{"0":101000,"1":151000,"2":201000}}""")
  .load()

// Verify data integrity for specific ranges
val integrityCheck = reconciliationDF
  .select(
    col("partition"),
    col("offset"),
    expr("CAST(value AS STRING)").as("message")
  )
  .filter("message RLIKE 'ERROR|FAILURE'")

integrityCheck.show()
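
As noted above, the offset JSON accepts -2 (earliest) and -1 (latest) as sentinel values. The sketch below mixes explicit offsets with sentinels; it lists all three partitions of the transactions topic, since the JSON must cover every partition when specific offsets are used, and -1 (latest) is only valid on the ending side of a batch read.

// Start partition 0 at an explicit offset and partitions 1 and 2 at the earliest
// available offset, then read all partitions up to the latest available offset
val sentinelRangeDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transactions")
  .option("startingOffsets", """{"transactions":{"0":100000,"1":-2,"2":-2}}""")
  .option("endingOffsets", """{"transactions":{"0":-1,"1":-1,"2":-1}}""")
  .load()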

Time-based Batch Reading

Read data based on timestamp ranges rather than specific offsets.

/**
 * Time-based offset specification for batch reads
 * Timestamps in milliseconds since Unix epoch
 */
.option("startingOffsetsByTimestamp", """{"topic1":{"0":1640995200000,"1":1640995200000}}""")
.option("endingOffsetsByTimestamp", """{"topic1":{"0":1640995300000,"1":1640995300000}}""")

Usage Example:

// Read data for specific time window (e.g., Jan 1, 2022 to Jan 2, 2022)
val timeWindowDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000,"1":1640995200000}}""")  // Jan 1, 2022 00:00:00
  .option("endingOffsetsByTimestamp", """{"events":{"0":1641081600000,"1":1641081600000}}""")    // Jan 2, 2022 00:00:00
  .load()

// Analyze the hourly distribution of messages within the day
val dailyPatterns = timeWindowDF
  .selectExpr(
    "hour(timestamp) as hour",
    "CAST(value AS STRING) as message"
  )
  .groupBy("hour")
  .count()
  .orderBy("hour")

dailyPatterns.show(24)
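
Rather than hard-coding millisecond values, the bounds can be derived from ISO-8601 timestamps; a minimal sketch (topic and partition numbers match the example above):

// Compute epoch-millisecond bounds for the time window
import java.time.Instant

val windowStart = Instant.parse("2022-01-01T00:00:00Z").toEpochMilli  // 1640995200000
val windowEnd   = Instant.parse("2022-01-02T00:00:00Z").toEpochMilli  // 1641081600000

val startJson = s"""{"events":{"0":$windowStart,"1":$windowStart}}"""
val endJson   = s"""{"events":{"0":$windowEnd,"1":$windowEnd}}"""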

Batch Writing

Write DataFrames to Kafka topics in batch mode with various output options.

Basic Batch Write

/**
 * Write DataFrame to Kafka in batch mode
 * DataFrame must contain 'value' column (binary or string)
 * Optional: 'key', 'topic', 'partition', 'headers' columns
 */
dataFrame.write
  .format("kafka") 
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")                    // Default topic
  .save()

Usage Example:

// Process batch data and write results
val processedDF = spark
  .read
  .format("parquet")
  .load("/path/to/processed/data")
  .select(
    col("user_id").cast("string").as("key"),
    to_json(struct("event_type", "timestamp", "data")).cast("binary").as("value")
  )

processedDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-events")
  .save()
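
The writer also accepts an optional headers column. A sketch, assuming the same array-of-struct headers layout used on the read side (fields key: string and value: binary); the header names and values here are illustrative:

import org.apache.spark.sql.functions.{array, col, lit, struct, to_json}

// Attach fixed Kafka record headers alongside key and value
val withHeadersDF = spark
  .read
  .format("parquet")
  .load("/path/to/processed/data")
  .select(
    col("user_id").cast("string").as("key"),
    to_json(struct("event_type", "timestamp", "data")).cast("binary").as("value"),
    array(
      struct(lit("source").as("key"), lit("batch-job").cast("binary").as("value")),
      struct(lit("schema-version").as("key"), lit("v1").cast("binary").as("value"))
    ).as("headers")
  )

withHeadersDF.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-events")
  .save()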

Dynamic Topic Routing

Route different records to different topics based on data content.

/**
 * Dynamic topic assignment per record
 * Use 'topic' column to specify destination topic for each record
 */
dataFrame
  .select(
    expr("CASE WHEN priority = 'high' THEN 'urgent-events' ELSE 'normal-events' END").as("topic"),
    expr("CAST(message AS BINARY)").as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()  // No default topic needed when topic column is present

Usage Example:

// Route events to different topics based on type
val routedEvents = spark
  .read
  .format("json")
  .load("/path/to/events")
  .select(
    expr("CASE event_type " +
          "WHEN 'user_action' THEN 'analytics-events' " +
          "WHEN 'system_error' THEN 'error-events' " +
          "WHEN 'performance' THEN 'metrics-events' " +
          "ELSE 'general-events' END").as("topic"),
    col("user_id").cast("binary").as("key"),
    to_json(struct("*")).cast("binary").as("value")
  )

routedEvents.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .save()

Explicit Partitioning

Control which Kafka partition receives each record.

/**
 * Explicit partition assignment
 * Use 'partition' column to specify target partition (integer)
 */
dataFrame
  .select(
    expr("hash(user_id) % 10").as("partition"),  // Distribute across 10 partitions
    col("user_id").cast("binary").as("key"),
    expr("CAST(message AS BINARY)").as("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "partitioned-events")
  .save()
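
A quick way to confirm the distribution is to read the topic back and count records per partition; this assumes the partitioned-events topic actually has at least 10 partitions, since an explicit partition number must exist on the target topic:

// Verify how records landed across partitions after the write
val partitionDistribution = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "partitioned-events")
  .option("startingOffsets", "earliest")
  .load()
  .groupBy("partition")
  .count()
  .orderBy("partition")

partitionDistribution.show()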

Batch Performance Optimization

Configuration options for optimizing batch read and write performance.

Read Optimization

/**
 * Performance tuning for batch reads
 */
.option("minPartitions", "20")                      // Increase Spark parallelism
.option("kafka.fetch.min.bytes", "1048576")        // 1MB minimum fetch size
.option("kafka.max.poll.records", "10000")         // More records per poll
.option("kafka.receive.buffer.bytes", "1048576")   // 1MB receive buffer

Usage Example:

// Optimized batch read for large datasets
val largeBatchDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "50")                     // High parallelism
  .option("kafka.fetch.min.bytes", "2097152")       // 2MB fetch size
  .option("kafka.max.poll.records", "50000")        // Large poll size
  .load()

// Efficient processing with proper partitioning
val result = largeBatchDF
  .repartition(200, col("topic"), col("partition"))  // Redistribute for processing
  .selectExpr("CAST(value AS STRING) as json_str")
  .select(from_json(col("json_str"), schema).as("data"))
  .select("data.*")
  .filter("status = 'active'")
  .cache()  // Cache for multiple actions

result.write.mode("overwrite").parquet("/path/to/output")

Write Optimization

/**
 * Performance tuning for batch writes
 */
.option("kafka.batch.size", "65536")                // 64KB batch size
.option("kafka.linger.ms", "100")                   // Wait 100ms for batching
.option("kafka.compression.type", "snappy")         // Enable compression
.option("kafka.acks", "1")                          // Faster acknowledgment
.option("kafka.buffer.memory", "134217728")         // 128MB buffer

Usage Example:

// High-throughput batch write
val highThroughputWrite = processedDF
  .coalesce(10)  // Reduce number of tasks for efficiency
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "high-volume-output")
  .option("kafka.batch.size", "131072")              // 128KB batches
  .option("kafka.linger.ms", "200")                  // Wait for full batches
  .option("kafka.compression.type", "lz4")           // Fast compression
  .option("kafka.acks", "1")                         // Balance speed/reliability
  .save()

Batch-Specific Validation

Validation and constraints specific to batch operations.

/**
 * Batch operation constraints and validations
 */
// Starting offsets cannot be "latest" for batch reads
// Ending offsets cannot be "earliest" for batch reads
// Both timestamp and offset specifications are supported
// maxOffsetsPerTrigger applies only to streaming queries; in batch mode it is ignored (a warning is logged)

Error Handling Examples:

// This will throw an error - invalid for batch
try {
  spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")  // ERROR: Cannot use "latest" for batch startingOffsets
    .load()
} catch {
  case e: IllegalArgumentException => 
    println("Cannot use 'latest' for batch starting offsets")
}

// This will throw an error - invalid for batch
try {
  spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "earliest")  // ERROR: Cannot use "earliest" for batch endingOffsets
    .load()
} catch {
  case e: IllegalArgumentException =>
    println("Cannot use 'earliest' for batch ending offsets")
}
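
Setting maxOffsetsPerTrigger, by contrast, does not fail a batch read; a small sketch of the behavior (the option is simply ignored and Spark logs a warning):

// maxOffsetsPerTrigger is a streaming option; it has no effect on batch reads
val ignoredOptionDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "1000")  // Ignored in batch mode
  .load()

// The full earliest-to-latest range is still read
println(ignoredOptionDF.count())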

Types

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}

// Simplified batch operation interfaces (illustrative sketches of the Spark API)
trait DataFrameReader {
  def format(source: String): DataFrameReader     // "kafka"
  def option(key: String, value: String): DataFrameReader
  def load(): DataFrame
}

trait DataFrameWriter[T] {
  def format(source: String): DataFrameWriter[T]  // "kafka"  
  def option(key: String, value: String): DataFrameWriter[T]
  def save(): Unit
  def save(path: String): Unit  // Not used for Kafka
}

// Batch-specific implementations (internal to spark-sql-kafka, not called directly by user code)
class KafkaBatch {
  def planInputPartitions(): Array[InputPartition]
  def createReaderFactory(): PartitionReaderFactory
}

class KafkaBatchWrite {
  def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory
}