# Stream Positioning

Configuration options for specifying where to start reading from Kinesis streams, supporting latest records, trim horizon, and timestamp-based positioning.

## Capabilities

### KinesisInitialPosition Interface

Base interface for all initial position types used to specify stream reading start points.

```java { .api }
/**
 * Base interface for initial position specification in Kinesis streams
 */
interface KinesisInitialPosition {
    /** Returns the corresponding KCL InitialPositionInStream enum value */
    InitialPositionInStream getPosition();
}
```

### Latest Position

Starts reading from the latest (most recent) records in the stream, skipping all existing data.

```java { .api }
/**
 * Start reading from the latest records in the stream
 * Equivalent to InitialPositionInStream.LATEST
 */
public class Latest implements KinesisInitialPosition, Serializable {
    public Latest();

    @Override
    public InitialPositionInStream getPosition();
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// Start from latest records (default behavior)
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new Latest())
  .build()
```

### Trim Horizon Position

Starts reading from the trim horizon (oldest available records), processing all data within the retention period.
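To make the retention window concrete: the oldest record that TrimHorizon can reach is roughly "now minus the stream's configured retention". A small illustration using plain `java.time` arithmetic (not part of the connector API; the 24-hour figure is the Kinesis default, and the class name here is just for the example):

```java
import java.time.Duration;
import java.time.Instant;

public class RetentionWindow {
    // Approximate timestamp of the oldest record TrimHorizon can reach,
    // given the stream's configured retention. The retention value is a
    // stream setting, not something the connector exposes.
    static Instant oldestAvailable(Instant now, Duration retention) {
        return now.minus(retention);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-02T00:00:00Z");
        // With the default 24-hour retention, trim horizon is one day back
        System.out.println(oldestAvailable(now, Duration.ofHours(24)));
    }
}
```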

```java { .api }
/**
 * Start reading from the trim horizon (oldest available records)
 * Kinesis retains data for 24 hours to 7 days depending on configuration
 * Equivalent to InitialPositionInStream.TRIM_HORIZON
 */
public class TrimHorizon implements KinesisInitialPosition, Serializable {
    public TrimHorizon();

    @Override
    public InitialPositionInStream getPosition();
}
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

// Start from oldest available records
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new TrimHorizon())
  .build()
```

### Timestamp Position

Starts reading from records at or after a specific timestamp, useful for reprocessing data from a known point in time.

```java { .api }
/**
 * Start reading from records at or after the specified timestamp
 * Equivalent to InitialPositionInStream.AT_TIMESTAMP
 */
public class AtTimestamp implements KinesisInitialPosition, Serializable {
    /**
     * Create timestamp-based position
     * @param timestamp The timestamp to start reading from
     */
    public AtTimestamp(java.util.Date timestamp);

    @Override
    public InitialPositionInStream getPosition();

    /**
     * Get the configured timestamp
     * @return The timestamp for this position
     */
    public java.util.Date getTimestamp();
}
```

**Usage Examples:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp
import java.util.Date
import java.time.Instant

// Start from a specific timestamp
val startTime = Date.from(Instant.parse("2024-01-01T12:00:00Z"))
val timestampPosition = new AtTimestamp(startTime)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(timestampPosition)
  .build()

// Start from 1 hour ago
val oneHourAgo = new Date(System.currentTimeMillis() - 3600000)
val recentPosition = new AtTimestamp(oneHourAgo)

// Reprocess data from yesterday
val yesterday = Date.from(Instant.now().minusSeconds(86400))
val reprocessPosition = new AtTimestamp(yesterday)
```

### Legacy Position Conversion

Factory method for converting legacy KCL enum values to new position objects (used internally).

```java { .api }
public class KinesisInitialPositions {
    /**
     * Converts legacy InitialPositionInStream enum to KinesisInitialPosition
     * Used internally for backward compatibility with deprecated APIs
     * @param initialPositionInStream Legacy enum value
     * @return Corresponding KinesisInitialPosition instance
     * @throws UnsupportedOperationException for AT_TIMESTAMP (use AtTimestamp constructor instead)
     */
    public static KinesisInitialPosition fromKinesisInitialPosition(
        InitialPositionInStream initialPositionInStream
    ) throws UnsupportedOperationException;
}
```

**Usage Example (Internal):**

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Convert legacy enum (used internally by deprecated APIs)
val legacyLatest = InitialPositionInStream.LATEST
val position = KinesisInitialPositions.fromKinesisInitialPosition(legacyLatest)
// Returns new Latest() instance

// AT_TIMESTAMP not supported in conversion - throws UnsupportedOperationException
val legacyTimestamp = InitialPositionInStream.AT_TIMESTAMP
// KinesisInitialPositions.fromKinesisInitialPosition(legacyTimestamp) // Throws exception
```
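The conversion is essentially a mapping over the enum values, with AT_TIMESTAMP rejected because a bare enum constant carries no timestamp to build an `AtTimestamp` from. A rough sketch of that logic, using a stand-in enum and string results so the example compiles without the KCL or Spark on the classpath (the real method returns `KinesisInitialPosition` instances):

```java
public class LegacyConversionSketch {
    // Stand-in for the KCL enum; the real one lives in
    // com.amazonaws.services.kinesis.clientlibrary.lib.worker
    enum InitialPositionInStream { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

    // Sketch of fromKinesisInitialPosition: map each legacy value to the
    // corresponding position type, rejecting AT_TIMESTAMP outright
    static String fromLegacy(InitialPositionInStream pos) {
        switch (pos) {
            case LATEST:       return "Latest";
            case TRIM_HORIZON: return "TrimHorizon";
            default:
                throw new UnsupportedOperationException(
                    "AT_TIMESTAMP carries no timestamp; use new AtTimestamp(date) instead");
        }
    }

    public static void main(String[] args) {
        System.out.println(fromLegacy(InitialPositionInStream.LATEST));
    }
}
```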

### Position Selection Guidelines

174

175

**Use Latest when:**

- Processing only real-time data
- Historical data is not needed
- Starting a new application for the first time
- Processing high-volume streams where catching up is not feasible

**Use TrimHorizon when:**

- All available historical data must be processed
- Implementing data migration or backfill scenarios
- Ensuring no data loss during application restarts
- Starting a new consumer that needs complete data

**Use AtTimestamp when:**

- Reprocessing data from a specific failure point
- Implementing data replay scenarios
- Starting consumption from a known business event time
- Debugging issues that occurred at specific times

### Checkpointing Behavior

Initial positions only apply when **no checkpoint data exists** in DynamoDB:

```scala
// First run: uses TrimHorizon and starts from the oldest records
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app") // No DynamoDB table exists yet
  .initialPosition(new TrimHorizon())
  .build()

// Subsequent runs: TrimHorizon is ignored and processing resumes
// from the last checkpoint, continuing where it left off
```

### Error Handling

Position configuration can fail with:

- `IllegalArgumentException` - for null timestamp values in AtTimestamp
- `UnsupportedOperationException` - when using legacy conversion with AT_TIMESTAMP
- `NullPointerException` - for null position objects passed to the builder

**Error Handling Example:**

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.{AtTimestamp, Latest}
import java.util.Date

try {
  // This will throw IllegalArgumentException
  val invalidPosition = new AtTimestamp(null)

  val stream = KinesisInputDStream.builder
    .streamingContext(ssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(invalidPosition)
    .build()
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid position configuration: ${e.getMessage}")
    // Fall back to the default Latest position
    val fallbackStream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName("my-stream")
      .checkpointAppName("my-app")
      .initialPosition(new Latest())
      .build()
}
```

### Performance Considerations

- **Latest**: Fastest startup, minimal initial data processing
- **TrimHorizon**: Slower startup; may require significant time to process the backlog
- **AtTimestamp**: Startup time depends on timestamp age and data volume

Monitor KCL metrics to track catch-up progress when using TrimHorizon or historical AtTimestamp positions.
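For capacity planning, a back-of-the-envelope catch-up estimate can help before choosing TrimHorizon or an old AtTimestamp. A minimal sketch, assuming you know the approximate backlog size, the incoming record rate, and a sustained processing rate (all hypothetical inputs, not values the connector reports):

```java
public class CatchUpEstimate {
    // Estimated seconds to drain a backlog while new records keep arriving.
    // The consumer only catches up if its processing rate exceeds the
    // incoming rate; otherwise the backlog grows without bound.
    static long catchUpSeconds(long backlogRecords,
                               double recordsPerSecIn,
                               double recordsPerSecProcessed) {
        if (recordsPerSecProcessed <= recordsPerSecIn) {
            throw new IllegalArgumentException(
                "processing rate must exceed incoming rate");
        }
        return (long) Math.ceil(backlogRecords / (recordsPerSecProcessed - recordsPerSecIn));
    }

    public static void main(String[] args) {
        // 3.6M backlog records, 1000/s arriving, 2000/s processed
        System.out.println(catchUpSeconds(3_600_000L, 1000.0, 2000.0) + " seconds to catch up");
    }
}
```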