Configure where a Kinesis input stream starts reading: from the latest records, from the earliest available records, or from a specific timestamp onward.
```scala
package org.apache.spark.streaming.kinesis

trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest() extends KinesisInitialPosition
  class TrimHorizon() extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition

  def fromKinesisInitialPosition(
      initialPositionInStream: InitialPositionInStream
  ): KinesisInitialPosition
}
```

`Latest` starts reading from the latest (most recent) records in the stream. This is the default when no initial position is set.
```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// `ssc` is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new Latest())
  .build()
```

Use Case: When you only want to process new data that arrives after the application starts.
`TrimHorizon` starts reading from the oldest record still available in the stream, that is, the beginning of the stream's retention period (24 hours by default; the retention period can be extended per stream).
```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

// `ssc` is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new TrimHorizon())
  .build()
```

Use Case: When you want to process all available historical data in the stream.
`AtTimestamp` starts reading from the first record at or after a specific timestamp.
```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start from a specific time: here, one hour ago (3,600,000 ms)
val startTime = new Date(System.currentTimeMillis() - 3600000)

// `ssc` is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new AtTimestamp(startTime))
  .build()
```

Use Case: When you want to replay data from a specific point in time.
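From Java, the position classes are nested in `KinesisInitialPositions`:

```java
import java.util.Date;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

// `jssc` is an existing JavaStreamingContext; the three streams below are alternatives
// Latest position
KinesisInputDStream<byte[]> latest = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.Latest())
    .build();

// Trim horizon position
KinesisInputDStream<byte[]> fromOldest = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.TrimHorizon())
    .build();

// At timestamp position: one hour ago
Date timestamp = new Date(System.currentTimeMillis() - 3600000);
KinesisInputDStream<byte[]> fromTimestamp = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.AtTimestamp(timestamp))
    .build();
```

The legacy `initialPositionInStream` builder method, which takes KCL's `InitialPositionInStream` enum, is deprecated but still supported for backward compatibility.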
```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Deprecated: use initialPosition instead
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST) // Deprecated
  .build()
```

`KinesisInitialPositions.fromKinesisInitialPosition` converts KCL's `InitialPositionInStream` enum into a `KinesisInitialPosition` object.
```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Convert from the KCL enum
val position = KinesisInitialPositions.fromKinesisInitialPosition(
  InitialPositionInStream.TRIM_HORIZON
)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(position)
  .build()
```

Note: The conversion utility only supports `LATEST` and `TRIM_HORIZON`; passing `AT_TIMESTAMP` throws an `UnsupportedOperationException` because the enum value carries no timestamp. For timestamp-based positions, use the `AtTimestamp` class directly.
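Initial positions only apply when there are no existing checkpoints for the application. The Kinesis Client Library (KCL) stores per-shard checkpoints in a DynamoDB table named after `checkpointAppName`; when a checkpoint exists for a shard, reading resumes from that checkpoint and the configured initial position is ignored.

To force reading from a specific position, you must either:

- use a new `checkpointAppName`, so the KCL starts with an empty checkpoint table, or
- delete the application's existing DynamoDB checkpoint table before restarting.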
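The precedence rule can be modeled in a few lines. This is a hypothetical sketch, not KCL code: a `Map` keyed by shard ID stands in for the DynamoDB checkpoint table, the sequence-number values are made-up placeholders, and `resolveStart` returns a string describing where a shard would begin.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of "an existing checkpoint wins over the initial position"; not the KCL.
public class StartResolution {

    // Returns the shard's checkpoint if one exists, otherwise the configured initial position.
    static String resolveStart(Map<String, String> checkpoints, String shardId, String initialPosition) {
        return Optional.ofNullable(checkpoints.get(shardId))
            .map(seq -> "AFTER_SEQUENCE_NUMBER " + seq)
            .orElse(initialPosition);
    }

    public static void main(String[] args) {
        // Existing application: shard-0 was checkpointed on a previous run.
        Map<String, String> existingApp = new HashMap<>();
        existingApp.put("shard-0", "seq-123"); // placeholder sequence number

        // shard-0 resumes from its checkpoint; the configured TRIM_HORIZON is ignored.
        System.out.println(resolveStart(existingApp, "shard-0", "TRIM_HORIZON"));
        // shard-1 has no checkpoint, so the initial position applies.
        System.out.println(resolveStart(existingApp, "shard-1", "TRIM_HORIZON"));

        // A new checkpointAppName behaves like an empty table: every shard uses the initial position.
        Map<String, String> newApp = new HashMap<>();
        System.out.println(resolveStart(newApp, "shard-0", "TRIM_HORIZON"));
    }
}
```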
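Unsupported configurations fail fast: for example, `fromKinesisInitialPosition` (and therefore the deprecated `initialPositionInStream`) throws an `UnsupportedOperationException` for `AT_TIMESTAMP`. A timestamp older than the stream's retention period is not itself rejected by the Kinesis API: according to the AWS `GetShardIterator` documentation, an `AT_TIMESTAMP` iterator whose timestamp is older than the trim horizon starts at the oldest untrimmed record instead.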
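It can be useful to validate a requested start time against the stream's retention period before building the stream, so an out-of-range value is reported with a clear message. The sketch below assumes the caller supplies the retention period (for example, the 24-hour default); `RetentionCheck` and `isWithinRetention` are hypothetical names, not part of Spark or the KCL.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Date;

// Hypothetical pre-flight check for an AtTimestamp start time; not part of Spark or the KCL.
public class RetentionCheck {

    // True if `start` lies within [now - retention, now], the window Kinesis still retains.
    static boolean isWithinRetention(Date start, Instant now, Duration retention) {
        Instant s = start.toInstant();
        Instant oldest = now.minus(retention);
        return !s.isBefore(oldest) && !s.isAfter(now);
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Duration retention = Duration.ofHours(24); // Kinesis default; adjust for your stream

        Date oneHourAgo = Date.from(now.minus(Duration.ofHours(1)));
        Date twoDaysAgo = Date.from(now.minus(Duration.ofDays(2)));

        System.out.println(isWithinRetention(oneHourAgo, now, retention)); // true
        System.out.println(isWithinRetention(twoDaysAgo, now, retention)); // false
    }
}
```

A failed check could, for instance, log a warning or fall back to `TrimHorizon` explicitly, which makes the intended behavior visible in the application code.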