# Stream Positioning

Configuration options for specifying where to start reading from Kinesis streams, supporting latest records, trim horizon, and timestamp-based positioning.

## Capabilities

### KinesisInitialPosition Interface

Base interface for all initial position types used to specify stream reading start points.

```java { .api }
/**
 * Base interface for initial position specification in Kinesis streams
 */
interface KinesisInitialPosition {
    /** Returns the corresponding KCL InitialPositionInStream enum value */
    InitialPositionInStream getPosition();
}
```

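
To make the contract concrete, here is a stripped-down, self-contained model of how the built-in positions satisfy the interface. The enum and classes below are illustrative stand-ins for the real KCL and Spark types, not the actual library code:

```java
public class PositionModel {
    // Stand-in for the KCL enum InitialPositionInStream
    enum InitialPositionInStream { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

    // Stand-in for the base interface described above
    interface KinesisInitialPosition {
        InitialPositionInStream getPosition();
    }

    // Each concrete position simply reports its enum value
    static class Latest implements KinesisInitialPosition {
        public InitialPositionInStream getPosition() {
            return InitialPositionInStream.LATEST;
        }
    }

    static class TrimHorizon implements KinesisInitialPosition {
        public InitialPositionInStream getPosition() {
            return InitialPositionInStream.TRIM_HORIZON;
        }
    }

    public static void main(String[] args) {
        KinesisInitialPosition pos = new TrimHorizon();
        System.out.println(pos.getPosition()); // prints TRIM_HORIZON
    }
}
```

Because callers depend only on `getPosition()`, the builder can accept any of the position types interchangeably.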
### Latest Position

Starts reading from the latest (most recent) records in the stream, skipping all existing data.

```java { .api }
/**
 * Start reading from the latest records in the stream
 * Equivalent to InitialPositionInStream.LATEST
 */
public class Latest implements KinesisInitialPosition, Serializable {
    public Latest();

    @Override
    public InitialPositionInStream getPosition();
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// Start from latest records (default behavior)
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new Latest())
  .build()
```

### Trim Horizon Position

Starts reading from the trim horizon (oldest available records), processing all data within the retention period.

```java { .api }
/**
 * Start reading from the trim horizon (oldest available records)
 * Kinesis retains records for 24 hours by default; longer retention
 * can be configured on the stream
 * Equivalent to InitialPositionInStream.TRIM_HORIZON
 */
public class TrimHorizon implements KinesisInitialPosition, Serializable {
    public TrimHorizon();

    @Override
    public InitialPositionInStream getPosition();
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

// Start from oldest available records
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new TrimHorizon())
  .build()
```

### Timestamp Position

Starts reading from records at or after a specific timestamp, useful for reprocessing data from a known point in time.

```java { .api }
/**
 * Start reading from records at or after the specified timestamp
 * Equivalent to InitialPositionInStream.AT_TIMESTAMP
 */
public class AtTimestamp implements KinesisInitialPosition, Serializable {
    /**
     * Create a timestamp-based position
     * @param timestamp The timestamp to start reading from
     */
    public AtTimestamp(java.util.Date timestamp);

    @Override
    public InitialPositionInStream getPosition();

    /**
     * Get the configured timestamp
     * @return The timestamp for this position
     */
    public java.util.Date getTimestamp();
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp
import java.util.Date
import java.time.Instant

// Start from a specific timestamp
val startTime = Date.from(Instant.parse("2024-01-01T12:00:00Z"))
val timestampPosition = new AtTimestamp(startTime)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(timestampPosition)
  .build()

// Start from 1 hour ago (3,600,000 ms)
val oneHourAgo = new Date(System.currentTimeMillis() - 3600000L)
val recentPosition = new AtTimestamp(oneHourAgo)

// Reprocess data from 24 hours ago
val yesterday = Date.from(Instant.now().minusSeconds(86400))
val reprocessPosition = new AtTimestamp(yesterday)
```

### Legacy Position Conversion

Factory method for converting legacy KCL enum values to new position objects (used internally).

```java { .api }
public class KinesisInitialPositions {
    /**
     * Converts a legacy InitialPositionInStream enum value to a KinesisInitialPosition
     * Used internally for backward compatibility with deprecated APIs
     * @param initialPositionInStream Legacy enum value
     * @return Corresponding KinesisInitialPosition instance
     * @throws UnsupportedOperationException for AT_TIMESTAMP (use the AtTimestamp constructor instead)
     */
    public static KinesisInitialPosition fromKinesisInitialPosition(
        InitialPositionInStream initialPositionInStream
    ) throws UnsupportedOperationException;
}
```

**Usage Example (Internal):**

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Convert a legacy enum value (used internally by deprecated APIs)
val legacyLatest = InitialPositionInStream.LATEST
val position = KinesisInitialPositions.fromKinesisInitialPosition(legacyLatest)
// Returns a new Latest() instance

// AT_TIMESTAMP is not supported by the conversion and throws UnsupportedOperationException
val legacyTimestamp = InitialPositionInStream.AT_TIMESTAMP
// KinesisInitialPositions.fromKinesisInitialPosition(legacyTimestamp) // throws
```

### Position Selection Guidelines

**Use Latest when:**
- Processing real-time data only
- Historical data is not needed
- Starting a new application for the first time
- Processing high-volume streams where catching up is not feasible

**Use TrimHorizon when:**
- All available historical data must be processed
- Implementing data migration or backfill scenarios
- Ensuring no data is missed when a consumer starts without existing checkpoints
- Starting a new consumer that needs complete data

**Use AtTimestamp when:**
- Reprocessing data from a specific failure point
- Implementing data replay scenarios
- Starting consumption from a known business event time
- Debugging issues that occurred at specific times

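
These guidelines can be condensed into a small decision helper. The method and its boolean inputs are an illustrative simplification, not part of the Spark or KCL API; the returned strings mirror the KCL enum names:

```java
public class PositionSelector {
    /**
     * Pick an initial position name from two simplified requirements:
     * whether a known start time exists, and whether history is needed.
     */
    static String choosePosition(boolean hasKnownStartTime, boolean needsHistory) {
        if (hasKnownStartTime) {
            return "AT_TIMESTAMP";  // replay from a known point in time
        }
        if (needsHistory) {
            return "TRIM_HORIZON";  // process the full retained backlog
        }
        return "LATEST";            // real-time data only
    }

    public static void main(String[] args) {
        System.out.println(choosePosition(false, true)); // prints TRIM_HORIZON
    }
}
```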
### Checkpointing Behavior

Initial positions only apply when **no checkpoint data exists** in DynamoDB:

```scala
// First run: uses TrimHorizon, starts from the oldest records
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app") // No DynamoDB table exists yet
  .initialPosition(new TrimHorizon())
  .build()

// Subsequent runs: ignores TrimHorizon, resumes from the last checkpoint
// The same configuration will continue from where it left off
```

### Error Handling

Position configuration can fail with:

- `IllegalArgumentException` - For null timestamp values in AtTimestamp
- `UnsupportedOperationException` - When using the legacy conversion with AT_TIMESTAMP
- `NullPointerException` - For a null position object passed to the builder

**Error Handling Example:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.{AtTimestamp, Latest}
import java.util.Date

try {
  // A null timestamp throws IllegalArgumentException
  val invalidPosition = new AtTimestamp(null)

  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(invalidPosition)
    .build()

} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid position configuration: ${e.getMessage}")
    // Fall back to the default Latest position
    val fallbackStream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName("my-stream")
      .checkpointAppName("my-app")
      .initialPosition(new Latest())
      .build()
}
```

### Performance Considerations

- **Latest**: Fastest startup, minimal initial data processing
- **TrimHorizon**: Slower startup, may require significant time to process the backlog
- **AtTimestamp**: Startup time depends on the timestamp's age and data volume

Monitor KCL metrics to track catch-up progress when using TrimHorizon or historical AtTimestamp positions.
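
To gauge how long a TrimHorizon start might take, a back-of-the-envelope catch-up estimate can help. The helper and the throughput figures below are illustrative assumptions (Kinesis allows up to 2 MB/s of reads per shard for shared-throughput consumers), not measured values:

```java
public class CatchUpEstimate {
    /**
     * Seconds needed to drain a backlog when the consumer reads faster
     * than producers write; returns -1 if it can never catch up.
     */
    static double catchUpSeconds(double backlogMb, double readMbPerSec, double writeMbPerSec) {
        double drainRate = readMbPerSec - writeMbPerSec;
        if (drainRate <= 0) return -1;
        return backlogMb / drainRate;
    }

    public static void main(String[] args) {
        // 24 h of backlog written at 1 MB/s = 86400 MB, read back at the
        // assumed 2 MB/s per-shard limit while writes continue
        double secs = catchUpSeconds(86400.0, 2.0, 1.0);
        System.out.println("Estimated catch-up: " + secs + " s"); // 86400.0 s, i.e. another full day
    }
}
```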