# Initial Position Configuration

Configure where a Kinesis input stream starts reading: from the latest records, from the earliest records still retained in the stream, or from a specific timestamp.

## Initial Position Types

```scala { .api }
package org.apache.spark.streaming.kinesis

import java.util.Date
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest() extends KinesisInitialPosition
  class TrimHorizon() extends KinesisInitialPosition
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition

  // Accepts LATEST or TRIM_HORIZON; AT_TIMESTAMP throws UnsupportedOperationException
  def fromKinesisInitialPosition(
      initialPositionInStream: InitialPositionInStream
  ): KinesisInitialPosition
}
```

## Latest Position

Start reading from the tip of the stream, so that only records added after the receiver starts are processed. This is the default when no initial position is set.

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// `ssc` is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new Latest())
  .build()
```

**Use Case:** When you only want to process new data that arrives after the application starts.

## Trim Horizon Position

Start reading from the oldest records still available in the stream, that is, records within the stream's data retention period (24 hours by default; the retention period can be extended).

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

// `ssc` is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new TrimHorizon())
  .build()
```

**Use Case:** When you want to process all historical data still retained in the stream.
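
Because `TrimHorizon` can only return records that are still retained, the oldest readable arrival time is roughly the current time minus the retention period. A minimal Java sketch of that arithmetic follows; `RetentionWindow` is a hypothetical helper (not part of Spark or the AWS SDK), and the result is only an approximation of the real trim boundary.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Spark or the AWS SDK: approximates the
// oldest arrival time that TrimHorizon can still read, given the retention.
final class RetentionWindow {
    // Kinesis default retention; streams can be configured to retain longer
    static final Duration DEFAULT_RETENTION = Duration.ofHours(24);

    private RetentionWindow() {}

    // Approximate trim horizon: records that arrived earlier are gone
    static Instant approxOldestReadable(Instant now, Duration retention) {
        return now.minus(retention);
    }

    // True if a record that arrived at `arrival` should still be retained
    static boolean isRetained(Instant arrival, Instant now, Duration retention) {
        return !arrival.isBefore(approxOldestReadable(now, retention));
    }
}
```

Passing the clock in as `now` keeps the helper deterministic and easy to test.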

## At Timestamp Position

Start reading from records at or after a specific timestamp (records are matched by their approximate arrival time in Kinesis).

```scala
import java.util.Date
import java.util.concurrent.TimeUnit
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start from one hour ago
val startTime = new Date(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1))

// `ssc` is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new AtTimestamp(startTime))
  .build()
```

**Use Case:** When you want to replay data from a specific point in time.
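
## Java API Usage

```java
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

// `jssc` is an existing JavaStreamingContext. streamingContext, streamName and
// checkpointAppName are required; each builder below is an alternative.

// Latest position
KinesisInputDStream<byte[]> latest = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.Latest())
    .build();

// Trim horizon position
KinesisInputDStream<byte[]> trimHorizon = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.TrimHorizon())
    .build();

// At timestamp position (one hour ago)
Date timestamp = new Date(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
KinesisInputDStream<byte[]> atTimestamp = KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.AtTimestamp(timestamp))
    .build();
```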

## Legacy API (Deprecated)

The legacy builder method `initialPositionInStream`, which takes KCL's `InitialPositionInStream` enum, is deprecated in favor of `initialPosition` but is still supported for backward compatibility. It converts the enum with `KinesisInitialPositions.fromKinesisInitialPosition`, so it accepts only `LATEST` and `TRIM_HORIZON`.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Deprecated: use initialPosition instead
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST) // deprecated
  .build()
```

## Conversion Utility

`KinesisInitialPositions.fromKinesisInitialPosition` converts a value of KCL's `InitialPositionInStream` enum into the corresponding `KinesisInitialPosition` object.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Convert from the KCL enum (LATEST or TRIM_HORIZON only)
val position = KinesisInitialPositions.fromKinesisInitialPosition(
  InitialPositionInStream.TRIM_HORIZON
)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(position)
  .build()
```

**Note:** The conversion utility supports only `LATEST` and `TRIM_HORIZON`; passing `AT_TIMESTAMP` throws an `UnsupportedOperationException`. For `AT_TIMESTAMP`, construct an `AtTimestamp` directly.
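
To make the mapping concrete, here is a minimal self-contained Java sketch. It uses hypothetical stand-in types (`PositionConversion`, `KclPosition`, `Position`), not the real Spark or KCL classes, and only mirrors the behavior described in this section: `LATEST` and `TRIM_HORIZON` map to position objects, while `AT_TIMESTAMP` is rejected because an enum constant carries no timestamp.

```java
// Hypothetical stand-ins, NOT the Spark or KCL classes: they only mirror the
// LATEST / TRIM_HORIZON mapping and the AT_TIMESTAMP rejection.
final class PositionConversion {
    // Mirrors the constants of KCL's InitialPositionInStream enum
    enum KclPosition { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

    // Mirrors the KinesisInitialPosition interface and two implementations
    interface Position {}
    static final class Latest implements Position {}
    static final class TrimHorizon implements Position {}

    private PositionConversion() {}

    static Position fromKcl(KclPosition kclPosition) {
        switch (kclPosition) {
            case LATEST:
                return new Latest();
            case TRIM_HORIZON:
                return new TrimHorizon();
            default:
                // An enum constant has no timestamp, so AT_TIMESTAMP cannot be converted
                throw new UnsupportedOperationException(
                    "AT_TIMESTAMP is not supported; construct an AtTimestamp position directly");
        }
    }
}
```

In real code, the equivalent of the rejected case is to skip the conversion and pass `new KinesisInitialPositions.AtTimestamp(date)` to `initialPosition`.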

## Checkpointing Behavior

The initial position applies only when the application has no existing checkpoints:

- **No existing checkpoints**: reading starts at the specified initial position.
- **Existing checkpoints**: reading resumes from the last checkpointed position, and the initial position setting is ignored.

To force reading from a specific position, do one of the following:
1. Use a new checkpoint application name (`checkpointAppName`).
2. Delete the DynamoDB table that the Kinesis Client Library (KCL) uses to store the application's checkpoints; it is named after the checkpoint application name.

Do not rely on checkpoints expiring: KCL does not remove checkpoint entries from DynamoDB on its own, so an existing checkpoint keeps taking precedence over the initial position.
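
The resolution rule above can be modeled in a few lines. This is a hypothetical in-memory sketch (`CheckpointModel` is not a Spark or KCL class); real checkpoints are stored per application in DynamoDB by KCL, which is why a new application name or an emptied table both bring the initial position back into play.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory stand-in for KCL's per-application checkpoint tables.
final class CheckpointModel {
    // appName -> (shardId -> last checkpointed sequence number)
    private final Map<String, Map<String, String>> tables = new HashMap<>();

    void checkpoint(String appName, String shardId, String sequenceNumber) {
        tables.computeIfAbsent(appName, k -> new HashMap<>()).put(shardId, sequenceNumber);
    }

    // An existing checkpoint wins; the initial position is only a fallback
    // when this application has no checkpoint for the shard.
    String startingPoint(String appName, String shardId, String initialPosition) {
        return Optional.ofNullable(tables.get(appName))
            .map(shards -> shards.get(shardId))
            .map(seq -> "checkpoint:" + seq)
            .orElse(initialPosition);
    }

    // Models clearing the application's checkpoint table
    void deleteTable(String appName) {
        tables.remove(appName);
    }
}
```

For example, after `checkpoint("my-app", "shard-0", "42")`, asking for `"my-app"` returns the checkpoint even if the initial position is `"TRIM_HORIZON"`, while `"my-app-v2"` falls back to the initial position.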

## Best Practices

### Latest Position
- Use for real-time processing of new events
- Minimizes startup time and resource usage, since there is no backlog to process
- Good for monitoring and alerting use cases

### Trim Horizon Position
- Use when you need to backfill or reprocess data still retained in the stream
- Processes every record within the retention period, but records already trimmed from the stream cannot be recovered
- May require more time and resources to catch up on the backlog

### At Timestamp Position
- Use for precise replay scenarios
- Useful for debugging or reprocessing data from a known point in time
- `java.util.Date` is an absolute instant; be explicit about the time zone when building it from a calendar date or a time string
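
`AtTimestamp` takes a `java.util.Date`, which has no time zone of its own; the zone only matters when converting a wall-clock time into that instant. A minimal sketch using `java.time` (the `ReplayStart` helper is hypothetical, not part of Spark):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Date;

// Hypothetical helper, not part of Spark: converts a wall-clock time in an
// explicit time zone into the absolute instant that AtTimestamp expects.
final class ReplayStart {
    private ReplayStart() {}

    static Date at(LocalDateTime wallClock, ZoneId zone) {
        // atZone pins the wall-clock time to the zone (skipped or repeated local
        // times around DST changes follow java.time's default resolution);
        // toInstant then yields the absolute instant
        return Date.from(wallClock.atZone(zone).toInstant());
    }

    static Date atUtc(LocalDateTime wallClock) {
        return at(wallClock, ZoneOffset.UTC);
    }
}
```

For example, `new KinesisInitialPositions.AtTimestamp(ReplayStart.at(LocalDateTime.of(2024, 1, 15, 9, 0), ZoneId.of("Asia/Tokyo")))` starts from 00:00 UTC on that day, because Tokyo is nine hours ahead of UTC.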
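
### Error Handling
- `KinesisInitialPositions.fromKinesisInitialPosition`, and therefore the deprecated `initialPositionInStream` builder method, throws an `UnsupportedOperationException` for `InitialPositionInStream.AT_TIMESTAMP`.
- An `AtTimestamp` older than the stream's retention period is not rejected: according to the Kinesis `GetShardIterator` API reference, reading then starts from the oldest untrimmed record, as with `TrimHorizon`.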