Streaming Sources

The Kafka connector supports both micro-batch and continuous processing modes, covers the standard Structured Streaming trigger types, and reports detailed source metrics.

Capabilities

KafkaMicroBatchStream

V2 DataSource micro-batch streaming implementation for Kafka with comprehensive trigger support.

/**
 * V2 DataSource micro-batch streaming implementation for Kafka
 * Provides exactly-once processing when combined with checkpointing and a fault-tolerant sink
 */
class KafkaMicroBatchStream extends MicroBatchStream 
    with SupportsTriggerAvailableNow 
    with ReportsSourceMetrics {
  
  /** Returns the initial offset for starting the stream */
  def initialOffset(): Offset
  
  /** Returns the latest available offset */
  def latestOffset(): Offset
  
  /** Returns latest offset with read limit applied */
  def latestOffset(startOffset: Offset, readLimit: ReadLimit): Offset
  
  /** Plans input partitions for the given offset range */
  def planInputPartitions(start: Offset, end: Offset): Array[InputPartition]
  
  /** Creates reader factory for reading partitions */
  def createReaderFactory(): PartitionReaderFactory
  
  /** Deserializes offset from checkpoint */
  def deserializeOffset(json: String): Offset
  
  /** Commits processed offset */
  def commit(end: Offset): Unit
  
  /** Stops the stream and releases resources */
  def stop(): Unit
  
  /** Returns stream metrics */
  def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String]
  
  /** Prepares stream for Trigger.AvailableNow mode */
  def prepareForTriggerAvailableNow(): Unit
}

Usage Examples:

// Basic micro-batch streaming
import org.apache.spark.sql.streaming.Trigger

val microBatchStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

val query = microBatchStream
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))  // Micro-batch every 30 seconds
  .start()

// Trigger.AvailableNow for processing all available data
val availableNowQuery = microBatchStream
  .writeStream
  .outputMode("append") 
  .format("parquet")
  .option("path", "/output/path")
  .trigger(Trigger.AvailableNow())  // Process all available data then stop
  .start()

KafkaContinuousStream

V2 DataSource continuous streaming implementation for low-latency processing.

/**
 * V2 DataSource continuous streaming implementation
 * Provides low-latency processing with at-least-once semantics
 */
class KafkaContinuousStream extends ContinuousStream {
  
  /** Merges partition offsets into a single offset */
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  
  /** Returns initial offset for continuous processing */
  def initialOffset(): Offset
  
  /** Deserializes offset from checkpoint */
  def deserializeOffset(json: String): Offset
  
  /** Plans continuous reader tasks */
  def planInputPartitions(start: Offset): Array[InputPartition]
  
  /** Creates continuous reader factory */
  def createContinuousReaderFactory(): ContinuousPartitionReaderFactory
  
  /** Commits processed offset */
  def commit(end: Offset): Unit
  
  /** Stops continuous processing */
  def stop(): Unit
}

Usage Examples:

// Continuous streaming for low-latency processing
val continuousStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "real-time-events")
  .load()

val continuousQuery = continuousStream
  .writeStream
  .outputMode("append")
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "processed-events")
  .trigger(Trigger.Continuous("1 second"))  // Continuous with 1-second checkpoint
  .start()
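
Continuous mode supports only stateless, map-like operations (projections and filters); aggregations, joins, and other stateful operators are not supported and require micro-batch mode. A minimal sketch of a continuous pipeline that stays within those limits (topic names and paths are illustrative):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Only row-at-a-time transformations (select, filter, map, flatMap) are allowed
val filteredQuery = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "real-time-events")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .filter(col("value").isNotNull)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "filtered-events")
  .option("checkpointLocation", "/checkpoint/continuous-filter")
  .trigger(Trigger.Continuous("1 second"))
  .start()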

KafkaSource (Legacy)

Legacy V1 DataSource streaming implementation, maintained for backward compatibility.

/**
 * Legacy streaming source for reading from Kafka (DataSource V1)
 * Maintained for backward compatibility
 */
class KafkaSource extends Source with SupportsTriggerAvailableNow {
  
  /** Returns the schema of the source */
  def schema: StructType
  
  /** Gets the current offset */
  def getOffset: Option[Offset]
  
  /** Gets batch DataFrame for the given offset range */
  def getBatch(start: Option[Offset], end: Offset): DataFrame
  
  /** Stops the source */
  def stop(): Unit
  
  /** Returns default read limit */
  def getDefaultReadLimit: ReadLimit
  
  /** Gets latest offset with read limit */
  def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset
  
  /** Reports latest offset for monitoring */
  def reportLatestOffset(): streaming.Offset
  
  /** Prepares for Trigger.AvailableNow */
  def prepareForTriggerAvailableNow(): Unit
}
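
Usage Example:

The legacy source is selected by Spark internally; the DataFrame-level read API is identical to the V2 path. A minimal sketch, assuming a Spark version that still honors the spark.sql.streaming.disabledV2MicroBatchReaders setting for forcing the V1 implementation:

// Assumption: this configuration exists in your Spark version; it lists provider
// classes whose V2 micro-batch reader is disabled, making Spark fall back to the
// legacy KafkaSource for those providers.
spark.conf.set(
  "spark.sql.streaming.disabledV2MicroBatchReaders",
  "org.apache.spark.sql.kafka010.KafkaSourceProvider")

// The read API itself is unchanged
val legacyStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()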

Trigger Support

Processing Time Triggers

Regular micro-batch processing at fixed intervals:

// Process every 30 seconds
.trigger(Trigger.ProcessingTime("30 seconds"))

// Process every 5 minutes
.trigger(Trigger.ProcessingTime("5 minutes"))

// Process as fast as possible
.trigger(Trigger.ProcessingTime("0 seconds"))

Available Now Trigger

Process all currently available data then stop:

// Process all available data once
val query = kafkaStream
  .writeStream
  .outputMode("append")
  .format("delta")
  .option("path", "/delta/table/path")
  .trigger(Trigger.AvailableNow())
  .start()

// Wait for completion
query.awaitTermination()

Continuous Triggers

Low-latency continuous processing:

// Continuous processing with 1-second checkpoints
.trigger(Trigger.Continuous("1 second"))

// Continuous processing with 10-second checkpoints
.trigger(Trigger.Continuous("10 seconds"))

Once Trigger

Process one micro-batch then stop (deprecated in newer Spark releases in favor of Trigger.AvailableNow):

// Process exactly one batch
.trigger(Trigger.Once())

Rate Limiting

Control the amount of data processed per trigger:

Max Offsets Per Trigger

// Limit to 1000 offsets per trigger across all partitions
val rateLimitedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "1000")
  .load()

Min Offsets Per Trigger

// Ensure at least 100 offsets are processed per trigger
val minRateStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "low-volume-topic")
  .option("minOffsetsPerTrigger", "100")
  .load()

Max Trigger Delay

// Maximum delay between triggers when minOffsetsPerTrigger is not met
val delayedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "variable-volume-topic")
  .option("minOffsetsPerTrigger", "500")
  .option("maxTriggerDelay", "60s")  // Wait max 60 seconds
  .load()

Metrics and Monitoring

Built-in Metrics

The streaming sources provide comprehensive metrics:

// Metrics reported through ReportsSourceMetrics
def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String, String] = {
  // Reports how far the stream lags behind the latest available Kafka offsets:
  // - minOffsetsBehindLatest: smallest lag across topic-partitions
  // - maxOffsetsBehindLatest: largest lag across topic-partitions
  // - avgOffsetsBehindLatest: average lag across topic-partitions
  // Custom metrics such as offsetOutOfRange and dataLoss are reported separately
  // (see Custom Metrics below)
}

Usage Examples:

// Access metrics through streaming query
val query = kafkaStream
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

// Get metrics
val progress = query.lastProgress
val inputMetrics = progress.sources(0)  // First source metrics
println(s"Records processed: ${inputMetrics.numInputRows}")
println(s"Processing rate: ${inputMetrics.inputRowsPerSecond}")

Custom Metrics

The connector exposes custom metrics for monitoring:

// OffsetOutOfRangeMetric
class OffsetOutOfRangeMetric extends CustomSumMetric {
  def name(): String = "offsetOutOfRange"
  def description(): String = "estimated number of fetched offsets out of range"
}

// DataLossMetric  
class DataLossMetric extends CustomSumMetric {
  def name(): String = "dataLoss"
  def description(): String = "number of data loss error"
}

Accessing Custom Metrics:

// Custom metrics are included in the source metrics map (a java.util.Map)
val customMetrics = query.lastProgress.sources(0).metrics
val offsetOutOfRange = customMetrics.getOrDefault("offsetOutOfRange", "0")
val dataLoss = customMetrics.getOrDefault("dataLoss", "0")
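
For continuous monitoring rather than point-in-time checks, these metrics can also be observed on every progress update with Spark's StreamingQueryListener. A minimal sketch (the logging and the choice of metrics are illustrative):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Log Kafka source metrics after each micro-batch
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      val dataLoss = source.metrics.getOrDefault("dataLoss", "0")
      if (dataLoss != "0") {
        println(s"Data loss reported by ${source.description}: $dataLoss")
      }
    }
  }
})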

Fault Tolerance

Checkpointing

Streaming queries automatically maintain checkpoints for fault tolerance:

val faultTolerantQuery = kafkaStream
  .writeStream
  .outputMode("append")
  .format("delta")
  .option("path", "/output/path")
  .option("checkpointLocation", "/checkpoint/path")  // Essential for production
  .start()
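
When a query is restarted with the same checkpoint location, it resumes from the offsets recorded in the checkpoint; options such as startingOffsets apply only to the very first run. A minimal sketch of the resume behavior (paths are the illustrative ones used above):

// First run: begins at startingOffsets and records its progress in the checkpoint
val firstRun = kafkaStream
  .writeStream
  .outputMode("append")
  .format("delta")
  .option("path", "/output/path")
  .option("checkpointLocation", "/checkpoint/path")
  .start()

firstRun.stop()

// Restart with the SAME checkpointLocation: resumes from the committed offsets
// (startingOffsets is ignored on restart)
val resumedRun = kafkaStream
  .writeStream
  .outputMode("append")
  .format("delta")
  .option("path", "/output/path")
  .option("checkpointLocation", "/checkpoint/path")
  .start()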

Data Loss Handling

Configure how to handle data loss scenarios:

// Fail query on data loss (default, recommended for critical data)
val strictStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true")  // Default
  .load()

// Continue processing despite data loss (for non-critical data)
val lenientStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false")  // Continue on data loss
  .load()

Partition Management

Min Partitions

Control the minimum number of Spark partitions:

// Ensure at least 10 Spark partitions for parallelism
val partitionedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "large-topic")
  .option("minPartitions", "10")
  .load()

Consumer Pool Configuration

Configure consumer pool behavior:

// Configure consumer cache settings
val configuredStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafkaConsumer.pollTimeoutMs", "5000")        // Poll timeout
  .option("fetchOffset.numRetries", "5")               // Retry attempts
  .option("fetchOffset.retryIntervalMs", "1000")       // Retry interval
  .load()

Performance Optimization

Batch Size Optimization

// Balance latency vs throughput
val optimizedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "50000")  // Larger batches for higher throughput
  .load()
  .repartition(20)  // Increase parallelism for processing

Consumer Configuration

// Optimize Kafka consumer settings
val tunedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("kafka.fetch.min.bytes", "1048576")          // 1MB min fetch
  .option("kafka.fetch.max.wait.ms", "500")            // 500ms max wait
  .option("kafka.max.poll.records", "10000")           // More records per poll
  .load()

Error Handling

Query Restart Behavior

// A checkpointed query can be restarted after failure and resumes from its checkpoint
val resilientQuery = kafkaStream
  .writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/checkpoint/path")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Monitor for exceptions
resilientQuery.exception.foreach { ex =>
  println(s"Query failed with exception: ${ex.getMessage}")
  // Implement custom restart logic (see the restart-loop sketch below)
}
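
For the custom restart logic mentioned above, one common pattern is to run the query in a loop and start a fresh instance from the same checkpoint whenever it fails. A minimal sketch (the retry limit and sink are illustrative):

import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}

// Rebuild and restart the query from its checkpoint when it fails,
// up to maxRestarts attempts; a clean stop exits the loop.
def startQuery() = kafkaStream
  .writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/checkpoint/path")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

val maxRestarts = 5
var restarts = 0
var keepRunning = true
while (keepRunning) {
  val running = startQuery()
  try {
    running.awaitTermination()  // returns when the query is stopped cleanly
    keepRunning = false
  } catch {
    case e: StreamingQueryException if restarts < maxRestarts =>
      restarts += 1
      println(s"Query failed (${e.getMessage}); restarting from checkpoint ($restarts/$maxRestarts)")
      // the next loop iteration starts a new query from the same checkpoint
  }
}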

Graceful Shutdown

// Graceful shutdown handling
sys.addShutdownHook {
  println("Shutting down streaming query...")
  query.stop()
  query.awaitTermination(30000)  // Wait up to 30 seconds
}