Streaming Operations

Streaming operations allow real-time processing of Kafka topics using Spark Structured Streaming. The connector supports both micro-batch and continuous processing modes with exactly-once semantics.

Capabilities

Stream Reading

Create a streaming DataFrame from Kafka topics for real-time data processing.

/**
 * Create a streaming DataFrame from Kafka topics
 * Returns a DataFrame with the fixed Kafka schema
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
  .option("subscribe", topics: String)                // Topic subscription (comma-separated)
  .option("startingOffsets", offsets: String)         // Starting position: "earliest", "latest", or JSON
  .load(): DataFrame

Usage Examples:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaStreaming")
  .getOrCreate()

// Basic streaming read
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .option("startingOffsets", "latest")
  .load()

// With multiple brokers and SASL_SSL security configuration
val configuredStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
  .option("subscribe", "user-events")
  .option("startingOffsets", "earliest")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .load()
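
Regardless of how topics are selected, the source returns the fixed Kafka schema. Printing it on the kafkaStream defined above shows roughly:

// The returned streaming DataFrame always has this fixed schema
kafkaStream.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)

An additional headers column appears when includeHeaders is enabled (see Reliability Configuration below).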

Pattern-Based Subscription

Subscribe to topics using regex patterns for dynamic topic discovery.

/**
 * Subscribe to topics matching a regex pattern
 * Automatically includes new topics that match the pattern
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribepattern", pattern: String) // Regex pattern for topic names
  .load(): DataFrame

Usage Examples:

// Subscribe to all topics starting with "events-"
val patternStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", "events-.*")
  .option("startingOffsets", "latest")
  .load()

// Subscribe to topics by environment
val envStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", s"${env}-.*") // prod-.*, dev-.*, etc.
  .load()

Partition Assignment

Directly assign specific Kafka partitions for fine-grained control.

/**
 * Assign specific Kafka partitions for reading
 * Provides exact control over which partitions to consume
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("assign", partitionsJson: String) // JSON specification of TopicPartitions
  .load(): DataFrame

Usage Examples:

// Assign specific partitions
val assignedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"logs":[0]}""")
  .option("startingOffsets", "earliest")
  .load()

// Assign partitions with specific offsets
val offsetStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1],"logs":[0,1]}""")
  .option("startingOffsets", """{"events":{"0":100,"1":200},"logs":{"0":50,"1":75}}""")
  .load()

Offset Management

Control exactly where streaming starts and how offsets are managed.

/**
 * Offset specification options for streaming reads
 */
// Starting from specific positions
.option("startingOffsets", "earliest")  // Start from earliest available
.option("startingOffsets", "latest")    // Start from latest available
.option("startingOffsets", offsetJson)  // Start from specific offsets (JSON)

// Timestamp-based offset resolution
.option("startingTimestamp", timestamp: String)     // Global timestamp (ms since epoch)
.option("startingOffsetsByTimestamp", timestampJson) // Per-partition timestamps (JSON)

Usage Examples:

// Start from earliest available offsets
val earliestStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

// Start from specific timestamp
val timestampStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1640995200000") // Jan 1, 2022 UTC
  .load()

// Start from specific offsets per partition
val specificStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")
  .load()
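
The per-partition variant mentioned above, startingOffsetsByTimestamp, takes the same JSON shape as startingOffsets but with epoch-millisecond timestamps as values. A sketch (topic and partition numbers are illustrative):

// Start each partition from the first offset whose timestamp is >= the given epoch millis
val timestampOffsetsStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp", """{"events":{"0":1640995200000,"1":1640995200000}}""")
  .load()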

Performance Tuning

Configure streaming performance and resource usage.

/**
 * Performance tuning options for streaming operations
 */
.option("maxOffsetsPerTrigger", maxRecords: String)   // Max records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String)   // Min records before triggering
.option("maxTriggerDelay", delay: String)             // Max delay before triggering (e.g., "30s")

Usage Examples:

// Control batch sizes
val tunedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "10000")     // Max 10K records per batch
  .option("minOffsetsPerTrigger", "1000")      // Min 1K records before processing
  .option("maxTriggerDelay", "30s")            // Process every 30s regardless
  .load()

Reliability Configuration

Configure failure handling and data loss behavior.

/**
 * Reliability and failure handling options
 */
.option("failOnDataLoss", failBehavior: String)    // "true" or "false"
.option("groupIdPrefix", prefix: String)           // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String)  // "true" or "false"

Usage Examples:

// Configure reliability
val reliableStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true")         // Fail if data is lost
  .option("groupIdPrefix", "my-app")        // Custom consumer group prefix
  .option("includeHeaders", "true")         // Include message headers in schema
  .load()

// Handle potential data loss gracefully
val gracefulStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false")        // Continue on data loss
  .load()
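
When includeHeaders is enabled, the extra headers column (an array of key/binary-value structs) can be unpacked for inspection. A minimal sketch using the reliableStream above:

import org.apache.spark.sql.functions._

// Flatten Kafka record headers into one row per header
val headerView = reliableStream
  .select(col("key").cast("string").as("messageKey"), explode_outer(col("headers")).as("header"))
  .select(
    col("messageKey"),
    col("header.key").as("headerKey"),
    col("header.value").cast("string").as("headerValue")
  )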

Processing Modes

Choose between micro-batch and continuous processing modes.

/**
 * Stream processing with different execution modes
 */
// Micro-batch processing (default); the trigger is set on the DataStreamWriter before start()
stream.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))

// Continuous processing (experimental)
stream.writeStream.trigger(Trigger.Continuous("1 second"))

Usage Examples:

import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Micro-batch processing every 30 seconds
val microBatchQuery = stream
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Continuous processing (low-latency)
val continuousQuery = stream
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

Data Processing Patterns

Message Deserialization

import org.apache.spark.sql.functions._

// Extract and cast message values
val messages = kafkaStream
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    col("key").cast("string").as("messageKey"),
    col("value").cast("string").as("messageValue")
  )

// Parse JSON messages
val jsonMessages = kafkaStream
  .select(
    from_json(col("value").cast("string"), schema).as("data"),
    col("topic"),
    col("timestamp")
  )
  .select("data.*", "topic", "timestamp")

Windowed Aggregations

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._

// Windowed count by topic
val windowedCounts = kafkaStream
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("topic")
  )
  .count()
  .writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
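
Without a watermark, the window state above grows without bound. In long-running jobs the same aggregation is usually paired with withWatermark so expired windows can be dropped; a sketch:

// Same windowed count, but with a watermark so state for old windows can be evicted
val watermarkedCounts = kafkaStream
  .withWatermark("timestamp", "15 minutes")
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("topic")
  )
  .count()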

Stateful Processing

import org.apache.spark.sql.streaming.GroupStateTimeout
import spark.implicits._

// Maintain state across micro-batches (the typed API needs a Dataset, not a DataFrame)
val statefulStream = kafkaStream
  .selectExpr("topic", "CAST(value AS STRING) AS value")
  .as[(String, String)]
  .groupByKey { case (topic, _) => topic }
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(
    updateFunction // user-defined: (key, values, state) => result
  )
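
The updateFunction above is user-supplied. A hypothetical implementation that keeps a running count per topic, matching the types in the sketch above:

import org.apache.spark.sql.streaming.GroupState

// Hypothetical: running count of records seen per topic, kept in GroupState[Long]
def updateFunction(
    topic: String,
    rows: Iterator[(String, String)],
    state: GroupState[Long]): (String, Long) = {
  val updated = state.getOption.getOrElse(0L) + rows.size
  state.update(updated)
  (topic, updated)
}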

Error Handling

Common error scenarios and handling strategies:

// Drop null (tombstone) values before downstream parsing
val safeMessages = kafkaStream
  .select(
    col("topic"),
    col("offset"),
    when(col("value").isNotNull, 
         col("value").cast("string")).as("messageValue")
  )
  .filter(col("messageValue").isNotNull)

import org.apache.spark.sql.DataFrame

// Process each micro-batch with checkpointing and per-batch error handling
kafkaStream.writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Custom batch processing with error handling
    try {
      batchDF.show()
    } catch {
      case ex: Exception =>
        println(s"Error processing batch $batchId: ${ex.getMessage}")
    }
  }
  .start()
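
To actually observe problems such as data loss while the job runs, the StreamingQuery handle returned by start() can be polled for status and progress. A minimal sketch (the checkpoint path is a placeholder):

// Capture the query handle to poll runtime status and per-batch metrics
val query = kafkaStream.writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()

println(query.status)        // whether a trigger is active or the query is waiting for data
println(query.lastProgress)  // latest micro-batch metrics (offsets, rates) as JSON
query.awaitTermination()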