Batch Operations

Batch operations enable reading historical data from Kafka topics for analysis, data migration, and batch processing workflows. Unlike streaming, batch reads have defined start and end boundaries.

Capabilities

Batch Reading

Read a specific range of data from Kafka topics for batch processing.

/**
 * Create a batch DataFrame from Kafka topics
 * Reads data between specified offset ranges
 */
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)  // Required: Kafka bootstrap servers
  .option("subscribe", topics: String)                 // Topic subscription (comma-separated)
  .option("startingOffsets", startOffsets: String)     // Starting position: "earliest" or JSON
  .option("endingOffsets", endOffsets: String)         // Ending position: "latest" or JSON
  .load(): DataFrame

Usage Examples:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaBatch")
  .getOrCreate()

// Read all available data
val allData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Read specific offset ranges
val rangeData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .option("startingOffsets", """{"user-events":{"0":1000,"1":2000}}""")
  .option("endingOffsets", """{"user-events":{"0":5000,"1":6000}}""")
  .load()
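
Regardless of how topics are selected, the DataFrame returned by load() always carries the fixed Kafka source columns: key and value (binary), topic, partition, offset, timestamp, and timestampType (plus headers when includeHeaders is enabled). A minimal sketch, continuing the allData example above, that decodes the binary columns for downstream processing:

import org.apache.spark.sql.functions.col

// Cast the binary key/value columns to strings and keep the Kafka metadata
val decoded = allData.select(
  col("key").cast("string").as("key"),
  col("value").cast("string").as("value"),
  col("topic"),
  col("partition"),
  col("offset"),
  col("timestamp")
)

decoded.show(5, truncate = false)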

Pattern-Based Batch Reading

Read from multiple topics matching a pattern for batch analysis.

/**
 * Batch read from topics matching a regex pattern
 * Useful for reading from topic families or time-partitioned topics
 */
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribepattern", pattern: String)    // Regex pattern for topic names
  .option("startingOffsets", startOffsets: String)
  .option("endingOffsets", endOffsets: String)
  .load(): DataFrame

Usage Examples:

// Read from all daily event topics
val dailyEvents = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", "events-2024-.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Read from environment-specific topics
val prodLogs = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "prod-kafka:9092")
  .option("subscribepattern", "prod-.*-logs")
  .option("startingOffsets", "earliest") 
  .option("endingOffsets", "latest")
  .load()
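
Because a pattern subscription resolves topics at read time, it can be worth confirming which topics were actually matched. A small check using the dailyEvents batch above:

// Count rows per matched topic to verify the pattern picked up what was expected
dailyEvents
  .groupBy("topic")
  .count()
  .orderBy("topic")
  .show(truncate = false)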

Partition Assignment for Batch

Assign specific partitions for batch reading with precise control.

/**
 * Assign specific Kafka partitions for batch reading
 * Provides exact control over data ranges and partitions
 */
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("assign", partitionsJson: String)       // JSON specification of TopicPartitions
  .option("startingOffsets", startOffsets: String)
  .option("endingOffsets", endOffsets: String)
  .load(): DataFrame

Usage Examples:

// Read specific partitions for parallel processing
val partitionData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"metrics":[0,1]}""")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Read with partition-specific offset ranges
val preciseData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"logs":[0,1,2,3]}""")
  .option("startingOffsets", """{"logs":{"0":1000,"1":2000,"2":1500,"3":3000}}""")
  .option("endingOffsets", """{"logs":{"0":5000,"1":6000,"2":4500,"3":7000}}""")
  .load()

Timestamp-Based Batch Reading

Read data based on message timestamps for time-range analysis.

/**
 * Timestamp-based offset resolution for batch reads
 * Automatically finds offsets corresponding to specific timestamps
 */
// Global timestamp range
.option("startingTimestamp", startTime: String)     // Start timestamp (ms since epoch)
.option("endingTimestamp", endTime: String)         // End timestamp (ms since epoch)

// Per-partition timestamp specification
.option("startingOffsetsByTimestamp", startTimestamps: String) // JSON timestamps per partition
.option("endingOffsetsByTimestamp", endTimestamps: String)     // JSON timestamps per partition

Usage Examples:

import java.time.Instant

// Read data from last 24 hours
val yesterday = Instant.now().minusSeconds(86400).toEpochMilli.toString
val now = Instant.now().toEpochMilli.toString

val recentData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", yesterday)
  .option("endingTimestamp", now)
  .load()

// Read specific time ranges per partition
val timeRangeData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") 
  .option("subscribe", "metrics")
  .option("startingOffsetsByTimestamp", s"""{"metrics":{"0":$yesterday,"1":$yesterday}}""")
  .option("endingOffsetsByTimestamp", s"""{"metrics":{"0":$now,"1":$now}}""")
  .load()
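
Timestamp options are resolved to offsets per partition via Kafka's timestamp index; they are not a row-level filter, so with non-monotonic producer timestamps a few records near the boundaries can fall outside the requested window. A sketch that tightens the range by additionally filtering the timestamp column of recentData:

import org.apache.spark.sql.functions.col

// The Kafka timestamp column casts to epoch seconds; compare against the millisecond bounds
val strictlyRecent = recentData
  .filter(col("timestamp").cast("long") * 1000L >= yesterday.toLong)
  .filter(col("timestamp").cast("long") * 1000L <= now.toLong)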

Batch Performance Configuration

Optimize batch reading performance for large datasets.

/**
 * Performance configuration for batch operations
 */
.option("minPartitions", partitionCount: String)    // Minimum Spark partitions
.option("fetchOffset.numRetries", retries: String)  // Offset fetch retry count
.option("fetchOffset.retryIntervalMs", interval: String) // Retry interval in ms
.option("kafkaConsumer.pollTimeoutMs", timeout: String)  // Consumer poll timeout

Usage Examples:

// Optimize for large batch processing
val optimizedBatch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("minPartitions", "100")              // Force higher parallelism
  .option("fetchOffset.numRetries", "5")       // Retry offset fetching
  .option("fetchOffset.retryIntervalMs", "200") // Wait 200ms between retries
  .load()
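
The effect of minPartitions can be verified by checking how many Spark partitions the batch read actually produced:

// Inspect how many Spark partitions the batch read was split into
println(s"Spark partitions: ${optimizedBatch.rdd.getNumPartitions}")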

Data Validation and Quality

Validate and filter batch data for quality assurance.

/**
 * Data validation options for batch processing
 */
.option("failOnDataLoss", failBehavior: String)  // "true" or "false"
.option("includeHeaders", includeHeaders: String) // "true" or "false"

Usage Examples:

// Strict data validation
val validatedData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-data")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("failOnDataLoss", "true")        // Fail if any data is missing
  .option("includeHeaders", "true")        // Include headers for metadata
  .load()
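
When includeHeaders is enabled, each row carries a headers column of type array<struct<key: string, value: binary>>. A small sketch, using the validatedData frame above, that flattens headers into one row per header entry:

import org.apache.spark.sql.functions.{col, explode_outer}

// One row per (message, header) pair; header values arrive as bytes
val headerRows = validatedData
  .select(col("topic"), col("partition"), col("offset"), explode_outer(col("headers")).as("header"))
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("header.key").as("header_key"),
    col("header.value").cast("string").as("header_value")
  )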

// Permissive data reading
val permissiveData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("failOnDataLoss", "false")       // Continue despite gaps
  .load()

Data Processing Patterns

Historical Analysis

import org.apache.spark.sql.functions._

val historicalData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Analyze message patterns by hour
val hourlyStats = historicalData
  .select(
    hour(col("timestamp")).as("hour"),
    col("topic"),
    col("partition")
  )
  .groupBy("hour", "topic")
  .agg(
    count("*").as("message_count"),
    countDistinct("partition").as("active_partitions")
  )
  .orderBy("hour", "topic")

hourlyStats.show()

Data Migration

// Read from source Kafka cluster
val sourceData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "source-cluster:9092")
  .option("subscribe", "legacy-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// Transform and write to the destination cluster
// (the Kafka sink requires a "value" column; "key" and "headers" are optional,
// so any transformation must keep or produce a column named "value")
sourceData
  .select(
    col("key"),
    col("value")
  )
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "dest-cluster:9092")
  .option("topic", "migrated-topic")
  .save()
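
To sanity-check a migration, the destination topic can be read back with the same batch API and compared against the source row count (assuming the dest-cluster address and migrated-topic name used above):

val migratedCount = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "dest-cluster:9092")
  .option("subscribe", "migrated-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .count()

// Row counts should match once the write has completed
assert(migratedCount == sourceData.count())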

Quality Assessment

// Assess data quality across time ranges
val qualityReport = historicalData
  .select(
    date_trunc("day", col("timestamp")).as("day"),
    col("topic"),
    when(col("key").isNull, 1).otherwise(0).as("null_keys"),
    when(col("value").isNull, 1).otherwise(0).as("null_values"),
    length(col("value")).as("value_size")
  )
  .groupBy("day", "topic")
  .agg(
    count("*").as("total_messages"),
    sum("null_keys").as("messages_with_null_keys"),
    sum("null_values").as("messages_with_null_values"),
    avg("value_size").as("avg_message_size"),
    max("value_size").as("max_message_size")
  )
  .orderBy("day", "topic")

Offset Analysis

// Analyze offset ranges and gaps
val offsetAnalysis = historicalData
  .groupBy("topic", "partition")
  .agg(
    min("offset").as("min_offset"),
    max("offset").as("max_offset"),
    count("*").as("message_count"),
    countDistinct("offset").as("unique_offsets")
  )
  .withColumn("expected_count", col("max_offset") - col("min_offset") + 1)
  .withColumn("has_gaps", col("unique_offsets") =!= col("expected_count"))
  .select(
    col("topic"),
    col("partition"),
    col("min_offset"),
    col("max_offset"),
    col("message_count"),
    col("has_gaps")
  )
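
Partitions flagged with gaps can then be surfaced directly:

// Show only topic-partitions where offsets are missing from the range
offsetAnalysis
  .filter(col("has_gaps"))
  .orderBy("topic", "partition")
  .show(truncate = false)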

Common Batch Use Cases

Data Archival

// Archive old Kafka data to long-term storage
val archiveData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "transaction-logs")
  .option("startingTimestamp", "1609459200000") // Start of 2021
  .option("endingTimestamp", "1640995200000")   // End of 2021
  .load()

// Save as Parquet with partitioning
archiveData
  .withColumn("year", year(col("timestamp")))
  .withColumn("month", month(col("timestamp")))
  .write
  .partitionBy("year", "month", "topic")
  .parquet("s3://archive-bucket/kafka-data/")

Replay Processing

// Replay specific time period for debugging
val replayData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-actions")
  .option("startingTimestamp", "1640995200000") // Specific incident time
  .option("endingTimestamp", "1640998800000")   // One hour later
  .load()

// Reprocess the data with updated logic
// (updatedSchema is a user-supplied StructType describing the event payload)
val reprocessed = replayData
  .select(col("value").cast("string").as("event"))
  .select(from_json(col("event"), updatedSchema).as("data"))
  .select("data.*")

Compliance and Auditing

// Extract data for compliance reporting
// Reporting-period boundaries as epoch-millisecond strings (e.g. Q1 2024)
val quarterStartTime = "1704067200000" // 2024-01-01 00:00:00 UTC
val quarterEndTime = "1711929600000"   // 2024-04-01 00:00:00 UTC

val complianceData = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "user-data-changes")
  .option("startingTimestamp", quarterStartTime)
  .option("endingTimestamp", quarterEndTime)
  .option("includeHeaders", "true")
  .load()

// Generate audit trail
val auditTrail = complianceData
  .select(
    col("timestamp"),
    col("headers"),
    get_json_object(col("value").cast("string"), "$.userId").as("user_id"),
    get_json_object(col("value").cast("string"), "$.action").as("action"),
    get_json_object(col("value").cast("string"), "$.dataType").as("data_type")
  )
  .filter(col("data_type").isin("PII", "SENSITIVE"))