Streaming Sources

Streaming readers that support both micro-batch and continuous processing modes, offering fault tolerance, exactly-once semantics in micro-batch mode (at-least-once in continuous mode), and efficient partition-based data consumption from Kafka.

Capabilities

Micro-batch Reader

Micro-batch reader for structured streaming that processes data in discrete batches with configurable trigger intervals.

/**
 * Micro-batch reader for Kafka data in structured streaming
 */
class KafkaMicroBatchReader extends MicroBatchReader with Logging {
  /**
   * Sets offset range for the current micro-batch
   * @param start Starting offset (None for initial batch)
   * @param end Ending offset for this batch
   */
  def setOffsetRange(start: Option[Offset], end: Offset): Unit
  
  /**
   * Plans input partitions for the current batch
   * @return List of InputPartition objects for parallel processing
   */
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
  
  /**
   * Gets the start offset for the current batch
   * @return Starting offset or null if none
   */
  def getStartOffset: Offset
  
  /**
   * Gets the end offset for the current batch
   * @return Ending offset for this batch
   */
  def getEndOffset: Offset
  
  /**
   * Deserializes offset from JSON string
   * @param json JSON representation of offset
   * @return Deserialized Offset object
   */
  def deserializeOffset(json: String): Offset
  
  /**
   * Returns schema for Kafka records
   * @return StructType defining record schema
   */
  def readSchema(): StructType
  
  /**
   * Commits the processed offset
   * @param end Offset to commit
   */
  def commit(end: Offset): Unit
  
  /**
   * Stops the reader and releases resources
   */
  def stop(): Unit
}
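
Offsets for this reader travel as JSON (see deserializeOffset above). The user-facing startingOffsets/endingOffsets options use a per-topic, per-partition JSON shape of the same kind, where -2 means earliest and -1 means latest; the topic name and offset values below are illustrative.

val startingOffsetsJson = """{"events":{"0":23,"1":-2,"2":-1}}"""  // partition 0 at offset 23, 1 at earliest, 2 at latest

val boundedStartDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", startingOffsetsJson)
  .load()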

Micro-batch Input Partition

Input partition for micro-batch processing that handles a specific offset range.

/**
 * Input partition for micro-batch processing
 * @param offsetRange Range of offsets to process
 * @param executorKafkaParams Kafka parameters for executors
 * @param pollTimeoutMs Timeout for Kafka consumer polls
 * @param failOnDataLoss Whether to fail on data loss
 * @param reuseKafkaConsumer Whether to reuse consumer instances
 */
case class KafkaMicroBatchInputPartition(
  offsetRange: KafkaOffsetRange,
  executorKafkaParams: ju.Map[String, Object],
  pollTimeoutMs: Long,
  failOnDataLoss: Boolean,
  reuseKafkaConsumer: Boolean
) extends InputPartition[InternalRow] {
  /**
   * Gets preferred executor locations for data locality
   * @return Array of preferred executor hostnames
   */
  def preferredLocations(): Array[String]
  
  /**
   * Creates partition reader for this input partition
   * @return InputPartitionReader for processing records
   */
  def createPartitionReader(): InputPartitionReader[InternalRow]
}
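
The offsetRange parameter bounds exactly which records a task reads. The fields of KafkaOffsetRange are not listed here, so the following is only an illustrative stand-in for the shape of such a range.

import org.apache.kafka.common.TopicPartition

// Illustrative stand-in (not the real KafkaOffsetRange): a topic partition
// plus an inclusive start offset and an exclusive end offset.
case class OffsetRangeSketch(
  topicPartition: TopicPartition,
  fromOffset: Long,   // inclusive
  untilOffset: Long   // exclusive
) {
  def recordCount: Long = untilOffset - fromOffset
}

val range = OffsetRangeSketch(new TopicPartition("events", 0), fromOffset = 100L, untilOffset = 250L)
// range.recordCount == 150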

Micro-batch Partition Reader

Partition reader for micro-batch processing that reads records from a specific offset range.

/**
 * Partition reader for micro-batch processing
 * @param offsetRange Range of offsets to read
 * @param executorKafkaParams Kafka parameters for this executor
 * @param pollTimeoutMs Consumer poll timeout
 * @param failOnDataLoss Whether to fail on data loss
 * @param reuseKafkaConsumer Whether to reuse consumer
 */
case class KafkaMicroBatchInputPartitionReader(
  offsetRange: KafkaOffsetRange,
  executorKafkaParams: ju.Map[String, Object],
  pollTimeoutMs: Long,
  failOnDataLoss: Boolean,
  reuseKafkaConsumer: Boolean
) extends InputPartitionReader[InternalRow] with Logging {
  /**
   * Advances to the next record
   * @return true if next record is available
   */
  def next(): Boolean
  
  /**
   * Gets the current record as UnsafeRow
   * @return Current record as UnsafeRow
   */
  def get(): UnsafeRow
  
  /**
   * Closes the reader and releases resources
   */
  def close(): Unit
}
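
The next()/get()/close() contract is driven by the engine, not by user code. The sketch below shows the typical consumption loop an executor task would run, assuming the Spark 2.4-era DataSourceV2 interfaces referenced above (InputPartitionReader, InternalRow) and an already-constructed reader.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

// Sketch of the consume loop: advance, read, repeat, and always close.
def drainPartition(reader: InputPartitionReader[InternalRow]): Long = {
  var count = 0L
  try {
    while (reader.next()) {      // advance to the next record in the offset range
      val row = reader.get()     // current record (an UnsafeRow for this source)
      count += 1                 // a real task would forward `row` downstream
    }
  } finally {
    reader.close()               // release the underlying Kafka consumer
  }
  count
}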

Continuous Reader

Continuous reader for low-latency streaming with sub-second processing capabilities.

/**
 * Continuous reader for Kafka data in structured streaming
 */
class KafkaContinuousReader extends ContinuousReader with Logging {
  /**
   * Returns schema for Kafka records
   * @return StructType defining record schema
   */
  def readSchema: StructType
  
  /**
   * Sets starting offset for continuous processing
   * @param start Starting offset (None to start from beginning)
   */
  def setStartOffset(start: Option[Offset]): Unit
  
  /**
   * Gets the starting offset
   * @return Starting offset or null if none
   */
  def getStartOffset(): Offset
  
  /**
   * Deserializes offset from JSON string
   * @param json JSON representation of offset
   * @return Deserialized Offset object
   */
  def deserializeOffset(json: String): Offset
  
  /**
   * Plans input partitions for continuous processing
   * @return List of InputPartition objects
   */
  def planInputPartitions(): ju.List[InputPartition[InternalRow]]
  
  /**
   * Stops the reader and releases resources
   */
  def stop(): Unit
  
  /**
   * Commits processed offsets
   * @param end Offset to commit
   */
  def commit(end: Offset): Unit
  
  /**
   * Merges partition offsets into a single offset
   * @param offsets Array of partition offsets
   * @return Merged offset
   */
  def mergeOffsets(offsets: Array[PartitionOffset]): Offset
  
  /**
   * Checks if reader reconfiguration is needed
   * @return true if reconfiguration is required
   */
  def needsReconfiguration(): Boolean
}
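
mergeOffsets collapses the per-partition positions reported by individual partition readers into a single source offset. The real Offset and PartitionOffset types are internal, so the sketch below uses stand-in names purely to show the idea.

import org.apache.kafka.common.TopicPartition

// Conceptual sketch (stand-in types): merge per-partition positions into
// one map-shaped offset, one entry per topic partition.
final case class PartitionPosition(topicPartition: TopicPartition, offset: Long)

def mergePositions(positions: Seq[PartitionPosition]): Map[TopicPartition, Long] =
  positions.map(p => p.topicPartition -> p.offset).toMap

val merged = mergePositions(Seq(
  PartitionPosition(new TopicPartition("events", 0), 120L),
  PartitionPosition(new TopicPartition("events", 1), 98L)
))
// merged == Map(events-0 -> 120, events-1 -> 98)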

Continuous Input Partition

Input partition for continuous processing that handles a specific topic partition.

/**
 * Input partition for continuous processing
 * @param topicPartition Kafka topic partition
 * @param startOffset Starting offset for processing
 * @param kafkaParams Kafka consumer parameters
 * @param pollTimeoutMs Consumer poll timeout
 * @param failOnDataLoss Whether to fail on data loss
 */
case class KafkaContinuousInputPartition(
  topicPartition: TopicPartition,
  startOffset: Long,
  kafkaParams: ju.Map[String, Object],
  pollTimeoutMs: Long,
  failOnDataLoss: Boolean
) extends ContinuousInputPartition[InternalRow] {
  /**
   * Creates continuous reader for this partition
   * @param offset Starting partition offset
   * @return InputPartitionReader for continuous processing
   */
  def createContinuousReader(offset: PartitionOffset): InputPartitionReader[InternalRow]
  
  /**
   * Creates partition reader for this partition
   * @return KafkaContinuousInputPartitionReader instance
   */
  def createPartitionReader(): KafkaContinuousInputPartitionReader
}

Continuous Partition Reader

Partition reader for continuous processing that provides low-latency record consumption.

/**
 * Partition reader for continuous processing
 * @param topicPartition Kafka topic partition to read from
 * @param startOffset Starting offset for reading
 * @param kafkaParams Kafka consumer parameters
 * @param pollTimeoutMs Consumer poll timeout
 * @param failOnDataLoss Whether to fail on data loss
 */
class KafkaContinuousInputPartitionReader(
  topicPartition: TopicPartition,
  startOffset: Long,
  kafkaParams: ju.Map[String, Object],
  pollTimeoutMs: Long,
  failOnDataLoss: Boolean
) extends ContinuousInputPartitionReader[InternalRow] {
  /**
   * Advances to the next record
   * @return true if next record is available
   */
  def next(): Boolean
  
  /**
   * Gets the current record as UnsafeRow
   * @return Current record as UnsafeRow
   */
  def get(): UnsafeRow
  
  /**
   * Gets the current partition offset
   * @return Current offset for this partition
   */
  def getOffset(): KafkaSourcePartitionOffset
  
  /**
   * Closes the reader and releases resources
   */
  def close(): Unit
}

Legacy Streaming Source

Legacy streaming source for backward compatibility with DataSource V1 API.

/**
 * Legacy streaming source for reading from Kafka (DataSource V1)
 */
class KafkaSource extends Source with Logging {
  /**
   * Returns the schema of Kafka records
   * @return StructType defining record schema
   */
  def schema: StructType
  
  /**
   * Gets the maximum available offset
   * @return Optional offset representing latest available data
   */
  def getOffset: Option[Offset]
  
  /**
   * Gets batch data between specified offsets
   * @param start Starting offset (None for initial batch)
   * @param end Ending offset for this batch
   * @return DataFrame containing batch data
   */
  def getBatch(start: Option[Offset], end: Offset): DataFrame
  
  /**
   * Stops the source and releases resources
   */
  def stop(): Unit
}

/**
 * Companion object for KafkaSource
 */
object KafkaSource {
  /**
   * Gets sorted list of executor IDs for locality optimization
   * @param sc SparkContext for accessing executor information
   * @return Array of executor IDs sorted for consistent assignment
   */
  def getSortedExecutorList(sc: SparkContext): Array[String]
}
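
The V1 engine drives a Source by polling: check whether new data is available, then request the batch between the last processed offset and the new one. The sketch below shows a single poll using the signatures listed above, assuming the V1 Source and Offset types from org.apache.spark.sql.execution.streaming; the process callback is a placeholder.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{Offset, Source}

// One polling step of the V1 micro-batch loop (illustrative only).
def pollOnce(source: Source, lastEnd: Option[Offset])(process: DataFrame => Unit): Option[Offset] =
  source.getOffset match {
    case Some(newEnd) if !lastEnd.contains(newEnd) =>
      process(source.getBatch(lastEnd, newEnd))  // rows after lastEnd up to and including newEnd
      Some(newEnd)                               // becomes `lastEnd` for the next poll
    case _ =>
      lastEnd                                    // no new data; keep the previous offset
  }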

Usage Examples

Micro-batch Streaming

import org.apache.spark.sql.streaming.Trigger
import java.util.concurrent.TimeUnit

val microBatchDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", "10000")
  .load()

val query = microBatchDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime(30, TimeUnit.SECONDS))
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()

Continuous Streaming

val continuousDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "low-latency-events")
  .option("startingOffsets", "latest")  
  .load()

val continuousQuery = continuousDF
  .selectExpr("CAST(value AS STRING) as json")
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .option("checkpointLocation", "/tmp/continuous-checkpoint")
  .start()

continuousQuery.awaitTermination()

Legacy Source (DataSource V1)

val legacyDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .options(Map(
    "subscribe" -> "legacy-topic",
    "startingOffsets" -> "earliest"
  ))
  .load()

Performance Configuration

Consumer Tuning

.option("kafka.fetch.min.bytes", "1024")           // Minimum fetch size
.option("kafka.fetch.max.wait.ms", "500")          // Max wait for min bytes
.option("kafka.max.poll.records", "500")           // Records per poll
.option("kafka.session.timeout.ms", "30000")       // Session timeout
.option("kafka.heartbeat.interval.ms", "3000")     // Heartbeat interval

Partition Management

.option("minPartitions", "10")                     // Minimum Spark partitions
.option("maxOffsetsPerTrigger", "1000000")        // Rate limiting

Memory and Buffering

.option("kafka.receive.buffer.bytes", "65536")     // Receive buffer
.option("kafka.send.buffer.bytes", "131072")       // Send buffer
.option("kafka.fetch.max.bytes", "52428800")       // Max fetch size

Error Handling and Recovery

Data Loss Scenarios

The streaming sources can either fail fast or continue with a warning when data loss is detected, for example when requested offsets have been aged out by Kafka retention or a topic has been deleted:

// Strict error handling
.option("failOnDataLoss", "true")

// Lenient error handling with warnings
.option("failOnDataLoss", "false")

Timeout Configuration

// Consumer poll timeout
.option("kafkaConsumer.pollTimeoutMs", "10000")

// Connection timeout
.option("kafka.request.timeout.ms", "30000")

Fault Tolerance

// Checkpoint location for exactly-once processing
.option("checkpointLocation", "/path/to/checkpoint")

// Automatic retry configuration
.option("kafka.retry.backoff.ms", "100")
.option("kafka.reconnect.backoff.ms", "50")

Monitoring and Metrics

Progress Reporting

val query = df.writeStream
  .format("console")
  .start()

// Monitor progress
val progress = query.lastProgress
println(s"Input rows per second: ${progress.inputRowsPerSecond}")
println(s"Processing time: ${progress.durationMs}")

Offset Tracking

// Access current offsets
val currentOffsets = query.lastProgress.sources(0).endOffset
println(s"Current offsets: $currentOffsets")

Processing Modes Comparison

| Feature | Micro-batch | Continuous | Legacy |
| --- | --- | --- | --- |
| Latency | ~100ms+ | ~1ms | ~100ms+ |
| Throughput | High | Medium | High |
| Fault Tolerance | Full | Full | Full |
| Delivery Guarantee | Exactly-once | At-least-once | Exactly-once |
| State Management | Full | Limited | Full |
| Aggregations | All | Simple | All |
| API Version | DataSource V2 | DataSource V2 | DataSource V1 |