
# Initial Position Configuration

The initial position system controls where Spark Streaming starts reading from a Kinesis stream when no checkpoint data exists. This is crucial for determining whether to process historical data, start with new data, or begin from a specific point in time.

## Core API

### KinesisInitialPosition Interface

The base interface for initial position configurations.

```java { .api }
interface KinesisInitialPosition {
    InitialPositionInStream getPosition();
}
```

### Position Implementations

```java { .api }
public class KinesisInitialPositions {
    // Start reading from the latest records (most recent)
    public static class Latest implements KinesisInitialPosition, Serializable {
        public Latest();
        public InitialPositionInStream getPosition();
    }

    // Start reading from the oldest available records (subject to the stream's retention period)
    public static class TrimHorizon implements KinesisInitialPosition, Serializable {
        public TrimHorizon();
        public InitialPositionInStream getPosition();
    }

    // Start reading from a specific timestamp
    public static class AtTimestamp implements KinesisInitialPosition, Serializable {
        public AtTimestamp(Date timestamp);
        public InitialPositionInStream getPosition();
        public Date getTimestamp();
    }

    // Utility method for backward compatibility (deprecated usage)
    public static KinesisInitialPosition fromKinesisInitialPosition(
            InitialPositionInStream initialPosition) throws UnsupportedOperationException;
}
```

## Latest Position

Starts consuming from the most recent records in the stream. This is the default behavior and is ideal for real-time processing where you only care about new data.

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()
```

**Use cases:**

- Real-time analytics where historical data is not needed
- Event-driven applications that only process new events
- High-throughput streams where catching up on old data would be overwhelming

## Trim Horizon Position

Starts consuming from the oldest available records in the stream, within the Kinesis retention period (24 hours by default).

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()
```

**Use cases:**

- Batch processing applications that need to process all available data
- Data migration or backfill scenarios
- Applications that cannot afford to miss any data

## At Timestamp Position

Starts consuming from records at or after a specific timestamp. This provides precise control over the starting point.

```scala
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import java.util.Date

// Start from records at or after a specific time
val startTime = new Date(System.currentTimeMillis() - (2 * 60 * 60 * 1000)) // 2 hours ago
val timestamp = new KinesisInitialPositions.AtTimestamp(startTime)

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(timestamp)
  .build()
```

**Use cases:**

- Reprocessing data from a specific point in time
- Recovery scenarios where you know when issues occurred
- Testing with historical data from a known timeframe

## Deprecated API (Backward Compatibility)

For backward compatibility, the deprecated `initialPositionInStream` method is still available:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST) // Deprecated in 2.3.0
  .build()
```

**Note**: This method only supports `LATEST` and `TRIM_HORIZON`. For timestamp-based positioning, use the new `initialPosition` method.
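
The same restriction is visible in the `fromKinesisInitialPosition` utility from the API section above: it maps the legacy enum values onto the new classes, and since the bare `AT_TIMESTAMP` enum value carries no timestamp, the conversion cannot succeed for it. A minimal sketch (assuming a Spark Streaming Kinesis classpath; not meant to run standalone):

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// LATEST and TRIM_HORIZON map cleanly onto the new position classes
val latest = KinesisInitialPositions.fromKinesisInitialPosition(InitialPositionInStream.LATEST)

// AT_TIMESTAMP has no timestamp value attached to the enum, so per the
// signature above this call would throw UnsupportedOperationException:
// KinesisInitialPositions.fromKinesisInitialPosition(InitialPositionInStream.AT_TIMESTAMP)
```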

## Important Behavior Notes

### Checkpointing Takes Precedence

Initial position is only used when **no checkpoint data exists**. If your application has previously checkpointed progress, it will resume from the checkpointed position regardless of the initial position setting.

```scala
// First run: starts from TRIM_HORIZON
// Subsequent runs: resumes from the last checkpoint, ignoring TRIM_HORIZON
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("existing-app") // Has checkpoint data
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()
```

To force a restart from the initial position, you must:

1. Delete the DynamoDB checkpoint table, or
2. Use a different `checkpointAppName`
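
For the first option, the Kinesis Client Library keeps its checkpoints in a DynamoDB table named after the `checkpointAppName`. As a hedged sketch using the AWS SDK for Java v1 (the SDK family the KCL builds on), deleting it might look like:

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

// The KCL stores checkpoints in a DynamoDB table named after checkpointAppName.
// Deleting it forces the next run to fall back to the configured initial position.
val dynamoDb = AmazonDynamoDBClientBuilder.defaultClient()
dynamoDb.deleteTable("my-app") // table name == checkpointAppName
```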

### Kinesis Retention Limits

Kinesis streams have a retention period (default 24 hours, configurable up to 365 days). Records older than the retention period are not available.

```scala
// This will start from the earliest available record within retention
val stream = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

// This may not find records if the timestamp is outside the retention period
val oldTimestamp = new Date(System.currentTimeMillis() - (48 * 60 * 60 * 1000)) // 48 hours ago
val stream2 = KinesisInputDStream.builder
  .initialPosition(new KinesisInitialPositions.AtTimestamp(oldTimestamp))
  .build()
```

## Complete Examples

### Real-time Processing

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// Only process new data as it arrives
val realtimeStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("real-time-events")
  .checkpointAppName("realtime-processor")
  .initialPosition(new KinesisInitialPositions.Latest())
  .build()

realtimeStream.foreachRDD { rdd =>
  println(s"Processing ${rdd.count()} new records")
  // Process only recent data
}
```

### Historical Data Processing

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}

// Process all available historical data
val batchStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("historical-data")
  .checkpointAppName("batch-processor")
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .build()

batchStream.foreachRDD { rdd =>
  println(s"Processing batch of ${rdd.count()} records")
  // Process all available data
}
```

### Point-in-Time Recovery

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.text.SimpleDateFormat

// Start processing from a specific incident time
val incidentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  .parse("2024-01-15 14:30:00")

val recoveryStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("application-logs")
  .checkpointAppName("incident-recovery") // Use a unique name to avoid existing checkpoints
  .initialPosition(new KinesisInitialPositions.AtTimestamp(incidentTime))
  .build()

recoveryStream.foreachRDD { rdd =>
  // Reprocess data from the incident onwards
  rdd.collect().foreach { record =>
    println(s"Reprocessing: ${new String(record)}")
  }
}
```

### Development and Testing

```scala
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, KinesisInitialPositions}
import java.util.Date

// For testing: start from a known point with limited data
val testStartTime = new Date(System.currentTimeMillis() - (30 * 60 * 1000)) // 30 minutes ago

val testStream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("test-stream")
  .checkpointAppName(s"test-${System.currentTimeMillis()}") // Unique name for each test
  .initialPosition(new KinesisInitialPositions.AtTimestamp(testStartTime))
  .build()
```

## Best Practices

1. **Use Latest for real-time**: Choose `Latest` for applications that only need new data
2. **Use TrimHorizon for completeness**: Choose `TrimHorizon` when you need all available data
3. **Use AtTimestamp for precision**: Choose `AtTimestamp` for point-in-time recovery or testing
4. **Consider checkpoint behavior**: Remember that checkpoints override the initial position
5. **Plan for retention limits**: Ensure your timestamp is within the stream's retention period
6. **Test with different positions**: Verify your application works correctly with various starting points