# Initial Position Configuration

The initial position system controls where Spark Streaming starts reading from a Kinesis stream when no checkpoint data exists. This is crucial for determining whether to process historical data, start with new data, or begin from a specific point in time.

## Core API

### KinesisInitialPosition Interface

The base interface for initial position configurations.

```java { .api }
interface KinesisInitialPosition {
    InitialPositionInStream getPosition();
}
```

### Position Implementations

```java { .api }
public class KinesisInitialPositions {
    // Start reading from the latest records (most recent)
    public static class Latest implements KinesisInitialPosition, Serializable {
        public Latest();
        public InitialPositionInStream getPosition();
    }

    // Start reading from the oldest records still within the stream's retention period
    public static class TrimHorizon implements KinesisInitialPosition, Serializable {
        public TrimHorizon();
        public InitialPositionInStream getPosition();
    }

    // Start reading from a specific timestamp
    public static class AtTimestamp implements KinesisInitialPosition, Serializable {
        public AtTimestamp(Date timestamp);
        public InitialPositionInStream getPosition();
        public Date getTimestamp();
    }

    // Utility method for backward compatibility (deprecated usage)
    public static KinesisInitialPosition fromKinesisInitialPosition(
            InitialPositionInStream initialPosition
    ) throws UnsupportedOperationException;
}
```

## Latest Position

Starts consuming from the most recent records in the stream. This is the default behavior and is ideal for real-time processing where you only care about new data.

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()
```

**Use cases:**

- Real-time analytics where historical data is not needed
- Event-driven applications that only process new events
- High-throughput streams where catching up on old data would be overwhelming

## Trim Horizon Position

Starts consuming from the oldest records still available in the stream (within the Kinesis retention period: 24 hours by default, configurable up to 365 days).

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()
```

**Use cases:**

- Batch processing applications that need to process all available data
- Data migration or backfill scenarios
- Applications that cannot afford to miss any data

## At Timestamp Position

Starts consuming from records at or after a specific timestamp. This provides precise control over the starting point.

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.util.Date

// Start from records at or after a specific time
val startTime = new Date(System.currentTimeMillis() - (2 * 60 * 60 * 1000)) // 2 hours ago
val timestamp = new KinesisInitialPositions.AtTimestamp(startTime)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(timestamp)
  .build()
```

**Use cases:**

- Reprocessing data from a specific point in time
- Recovery scenarios where you know when issues occurred
- Testing with historical data from a known timeframe
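
When the starting point comes from a log line or a runbook, it can be safer to parse it with `java.time` rather than `SimpleDateFormat`, which silently interprets the string in the JVM's default time zone. A minimal sketch, assuming an ISO-8601 UTC timestamp string:

```scala
import java.time.Instant
import java.util.Date

// Parse an ISO-8601 UTC timestamp; Instant.parse is unambiguous about the
// zone, then convert to the java.util.Date that AtTimestamp expects.
val startInstant = Instant.parse("2024-01-15T14:30:00Z")
val startDate: Date = Date.from(startInstant)
// startDate can now be passed to new KinesisInitialPositions.AtTimestamp(startDate)
```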

## Deprecated API (Backward Compatibility)

For backward compatibility, the deprecated `initialPositionInStream` method is still available:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInputDStream

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST) // Deprecated in 2.3.0
  .build()
```

**Note**: This method only supports `LATEST` and `TRIM_HORIZON`. For timestamp-based positioning, use the new `initialPosition` method.

## Important Behavior Notes

### Checkpointing Takes Precedence

Initial position is only used when **no checkpoint data exists**. If your application has previously checkpointed progress, it will resume from the checkpointed position regardless of the initial position setting.

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// First run: starts from TRIM_HORIZON
// Subsequent runs: resume from the last checkpoint, ignoring TRIM_HORIZON
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("existing-app") // Has checkpoint data
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()
```

To force a restart from the initial position, you must:

1. Delete the DynamoDB checkpoint table, or
2. Use a different `checkpointAppName`
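
The second option is easy to script: deriving a throwaway checkpoint name gives each run a fresh DynamoDB lease table, so no prior checkpoint is found and the configured initial position takes effect. A sketch (the helper name is illustrative, not part of the API):

```scala
// Illustrative helper: a unique checkpoint app name forces the Kinesis
// Client Library to create a new DynamoDB lease table, so the stream
// starts from the configured initial position instead of a checkpoint.
def freshCheckpointAppName(base: String): String =
  s"$base-${System.currentTimeMillis()}"

// e.g. .checkpointAppName(freshCheckpointAppName("my-app"))
```

Remember to delete the abandoned DynamoDB tables afterwards; each one incurs provisioned-capacity cost.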

### Kinesis Retention Limits

Kinesis streams have a retention period (default 24 hours, configurable up to 365 days). Records older than the retention period are not available.

```scala
// Starts from the earliest record still within the retention window
val stream = KinesisInputDStream.builder
  // ... other required settings (streamingContext, streamName, checkpointAppName)
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

// If the timestamp falls outside the retention window, reading starts
// from the oldest record still available (equivalent to TRIM_HORIZON)
val oldTimestamp = new Date(System.currentTimeMillis() - (48 * 60 * 60 * 1000)) // 48 hours ago
val stream2 = KinesisInputDStream.builder
  // ... other required settings (streamingContext, streamName, checkpointAppName)
  .initialPosition(new KinesisInitialPositions.AtTimestamp(oldTimestamp))
  .build()
```
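
If you prefer that fallback to be explicit rather than silent, a small guard can clamp the requested start time to the retention window before building the stream. A sketch under the assumption that you know the stream's configured retention; `clampToRetention` is a hypothetical helper, not a Spark or Kinesis API:

```scala
import java.util.Date

// Hypothetical guard: never request a start time older than the stream's
// retention window; clamp it to the oldest instant that can still exist.
def clampToRetention(requested: Date, retentionHours: Long,
                     nowMillis: Long = System.currentTimeMillis()): Date = {
  val oldestAvailable = nowMillis - retentionHours * 60 * 60 * 1000L
  if (requested.getTime < oldestAvailable) new Date(oldestAvailable) else requested
}
```

The clamped `Date` can then be passed to `new KinesisInitialPositions.AtTimestamp(...)` exactly as in the examples above.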

## Complete Examples

### Real-time Processing

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// Only process new data as it arrives
val realtimeStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("real-time-events")
  .checkpointAppName("realtime-processor")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

realtimeStream.foreachRDD { rdd =>
  println(s"Processing ${rdd.count()} new records")
  // Process only recent data
}
```

### Historical Data Processing

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// Process all available historical data
val batchStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("historical-data")
  .checkpointAppName("batch-processor")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

batchStream.foreachRDD { rdd =>
  println(s"Processing batch of ${rdd.count()} records")
  // Process all available data
}
```

### Point-in-Time Recovery

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.text.SimpleDateFormat

// Start processing from a specific incident time
val incidentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  .parse("2024-01-15 14:30:00")

val recoveryStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("application-logs")
  .checkpointAppName("incident-recovery") // Use a unique name to avoid existing checkpoints
  .initialPosition(new KinesisInitialPositions.AtTimestamp(incidentTime))
  .build()

recoveryStream.foreachRDD { rdd =>
  // Reprocess data from the incident onwards
  // (collect() is for illustration only; avoid it on large RDDs)
  rdd.collect().foreach { record =>
    println(s"Reprocessing: ${new String(record)}")
  }
}
```

### Development and Testing

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.util.Date

// For testing: start from a known point with limited data
val testStartTime = new Date(System.currentTimeMillis() - (30 * 60 * 1000)) // 30 minutes ago

val testStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("test-stream")
  .checkpointAppName(s"test-${System.currentTimeMillis()}") // Unique name for each test run
  .initialPosition(new KinesisInitialPositions.AtTimestamp(testStartTime))
  .build()
```

## Best Practices

1. **Use Latest for real-time**: Choose `Latest` for applications that only need new data
2. **Use TrimHorizon for completeness**: Choose `TrimHorizon` when you need all available data
3. **Use AtTimestamp for precision**: Choose `AtTimestamp` for point-in-time recovery or testing
4. **Consider checkpoint behavior**: Remember that checkpoints override the initial position
5. **Plan for retention limits**: Ensure your timestamp is within the stream's retention period
6. **Test with different positions**: Verify your application works correctly with various starting points
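
Practices 1-3 can be condensed into a small selection helper. This sketch uses plain strings so it stays independent of the Kinesis classes; the function and its parameters are illustrative only:

```scala
import java.util.Date

// Illustrative chooser: map the two questions behind practices 1-3
// onto the name of the position implementation to use.
def recommendedPosition(startAt: Option[Date], needAllData: Boolean): String =
  startAt match {
    case Some(_)             => "AtTimestamp" // precise point-in-time start
    case None if needAllData => "TrimHorizon" // everything still retained
    case None                => "Latest"      // new records only
  }
```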