The initial position system controls where Spark Streaming starts reading from a Kinesis stream when no checkpoint data exists. This is crucial for determining whether to process historical data, start with new data, or begin from a specific point in time.
The base interface for initial position configurations.
interface KinesisInitialPosition {
  InitialPositionInStream getPosition();
}

public class KinesisInitialPositions {
  // Start reading from the latest records (most recent)
  public static class Latest implements KinesisInitialPosition, Serializable {
    public Latest();
    public InitialPositionInStream getPosition();
  }

  // Start reading from the oldest records still within the stream's retention period
  public static class TrimHorizon implements KinesisInitialPosition, Serializable {
    public TrimHorizon();
    public InitialPositionInStream getPosition();
  }

  // Start reading from records at or after a specific timestamp
  public static class AtTimestamp implements KinesisInitialPosition, Serializable {
    public AtTimestamp(Date timestamp);
    public InitialPositionInStream getPosition();
    public Date getTimestamp();
  }

  // Utility method for backward compatibility with the legacy
  // InitialPositionInStream enum (deprecated usage)
  public static KinesisInitialPosition fromKinesisInitialPosition(
      InitialPositionInStream initialPosition
  ) throws UnsupportedOperationException;
}

Starts consuming from the most recent records in the stream. This is the default behavior and is ideal for real-time processing where only new data matters.
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

Use cases:
- Real-time processing where only newly arriving data matters
- Applications that should skip any backlog of historical records on startup
Starts consuming from the oldest available records in the stream, within the Kinesis retention period (24 hours by default, configurable up to 365 days).

import org.apache.spark.streaming.kinesis.KinesisInitialPositions

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

Use cases:
- Backfilling or batch-style jobs that must process all available data
- First deployment of an application that should not miss existing records
Starts consuming from records at or after a specific timestamp. This provides precise control over the starting point.
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import java.util.Date

// Start from records after a specific time
val startTime = new Date(System.currentTimeMillis() - (2 * 60 * 60 * 1000)) // 2 hours ago
val timestamp = new KinesisInitialPositions.AtTimestamp(startTime)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(timestamp)
  .build()

Use cases:
- Point-in-time recovery, e.g. reprocessing from just before an incident
- Testing against a bounded, recent slice of the stream
For backward compatibility, the deprecated initialPositionInStream method is still available:
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST) // Deprecated in 2.3.0
  .build()

Note: this method only supports LATEST and TRIM_HORIZON. For timestamp-based positioning, use the newer initialPosition method.
Initial position is only used when no checkpoint data exists. If your application has previously checkpointed progress, it will resume from the checkpointed position regardless of the initial position setting.
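The precedence described above can be modeled in a few lines of plain Scala. This is a sketch of the semantics only; the type and function names are illustrative and not part of Spark's API:

```scala
// Illustrative model of the resume semantics: a stored checkpoint, when
// present, always takes precedence over the configured initial position.
// These names are hypothetical; Spark's real implementation differs.
sealed trait StartFrom
case object StartLatest extends StartFrom
case object StartTrimHorizon extends StartFrom
final case class ResumeFromCheckpoint(sequenceNumber: String) extends StartFrom

def resolveStart(checkpoint: Option[String], configured: StartFrom): StartFrom =
  checkpoint.map(seq => ResumeFromCheckpoint(seq)).getOrElse(configured)
```

Only when no checkpoint exists does the configured position take effect, which is exactly the behavior shown in the builder example below.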
// First run: starts from TRIM_HORIZON
// Subsequent runs: resume from the last checkpoint, ignoring TRIM_HORIZON
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("existing-app") // Has checkpoint data
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

To force a restart from the initial position, use a new checkpointAppName, or delete the existing checkpoint data (the DynamoDB table that the Kinesis Client Library creates under that application name).

Kinesis streams have a retention period (24 hours by default, configurable up to 365 days). Records older than the retention period are no longer available.
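One way to guard against requesting data that has already expired is to clamp a start time to the retention window before building an AtTimestamp position. The helper below is a plain-Scala sketch; the function name and the retention value are assumptions, not part of the connector's API:

```scala
import java.time.{Duration, Instant}

// Hypothetical helper: clamp a requested start time so it never falls
// outside the stream's retention window. The retention length must match
// your stream's actual configuration (Kinesis default: 24 hours).
def clampToRetention(requested: Instant, retentionHours: Long = 24): Instant = {
  val oldestAvailable = Instant.now().minus(Duration.ofHours(retentionHours))
  if (requested.isBefore(oldestAvailable)) oldestAvailable else requested
}
```

A timestamp 48 hours back on a 24-hour stream is clamped to roughly now minus 24 hours; a recent timestamp passes through unchanged.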
// Starts from the earliest record still available within retention
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

// If the timestamp falls outside the retention period, reading starts
// from the oldest record still available
val oldTimestamp = new Date(System.currentTimeMillis() - (48 * 60 * 60 * 1000)) // 48 hours ago
val stream2 = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app-2")
  .initialPosition(new KinesisInitialPositions.AtTimestamp(oldTimestamp))
  .build()

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
// Only process new data as it arrives
val realtimeStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("real-time-events")
  .checkpointAppName("realtime-processor")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

realtimeStream.foreachRDD { rdd =>
  println(s"Processing ${rdd.count()} new records")
  // Process only recent data
}

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
// Process all available historical data
val batchStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("historical-data")
  .checkpointAppName("batch-processor")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

batchStream.foreachRDD { rdd =>
  println(s"Processing batch of ${rdd.count()} records")
  // Process all available data
}

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.util.Date
import java.text.SimpleDateFormat

// Start processing from a specific incident time
val incidentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  .parse("2024-01-15 14:30:00")

val recoveryStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("application-logs")
  .checkpointAppName("incident-recovery") // Use a unique name to avoid existing checkpoints
  .initialPosition(new KinesisInitialPositions.AtTimestamp(incidentTime))
  .build()

recoveryStream.foreachRDD { rdd =>
  // Reprocess data from the incident onwards
  // (collect() is only appropriate for small demo volumes)
  rdd.collect().foreach { record =>
    println(s"Reprocessing: ${new String(record)}")
  }
}

import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
// For testing: start from a known point with limited data
val testStartTime = new Date(System.currentTimeMillis() - (30 * 60 * 1000)) // 30 minutes ago

val testStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("test-stream")
  .checkpointAppName(s"test-${System.currentTimeMillis()}") // Unique name for each test run
  .initialPosition(new KinesisInitialPositions.AtTimestamp(testStartTime))
  .build()

Choose Latest for applications that only need new data, TrimHorizon when you need all available data, and AtTimestamp for point-in-time recovery or testing.
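In practice the choice is often driven by configuration. The sketch below shows one way to parse such a setting in plain Scala; the string format and every name here are hypothetical, not part of the Kinesis connector:

```scala
import java.time.Instant

// Hypothetical config format: "latest", "trim_horizon", or
// "at_timestamp:<ISO-8601 instant>", e.g. "at_timestamp:2024-01-15T14:30:00Z".
sealed trait PositionChoice
case object ChooseLatest extends PositionChoice
case object ChooseTrimHorizon extends PositionChoice
final case class ChooseAtTimestamp(at: Instant) extends PositionChoice

def positionFromConfig(value: String): PositionChoice =
  value.split(":", 2) match {
    case Array("latest")            => ChooseLatest
    case Array("trim_horizon")      => ChooseTrimHorizon
    case Array("at_timestamp", iso) => ChooseAtTimestamp(Instant.parse(iso))
    case _ => throw new IllegalArgumentException(s"unknown initial position: $value")
  }
```

Each PositionChoice would then be mapped to the corresponding KinesisInitialPositions class when building the stream.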