
# Initial Position Configuration

Configure where a Kinesis input stream starts reading when the application has no existing checkpoint. It can start from the latest records, from the earliest available records, or from a specific timestamp.

## Initial Position Types

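Each position type implements `KinesisInitialPosition`. Both `KinesisInitialPosition` and `KinesisInitialPositions` are declared in Java in Spark's `spark-streaming-kinesis-asl` module. The signatures below are written in Scala notation.

```scala { .api }
package org.apache.spark.streaming.kinesis

import java.util.Date
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

trait KinesisInitialPosition {
  def getPosition(): InitialPositionInStream
}

object KinesisInitialPositions {
  class Latest() extends KinesisInitialPosition        // getPosition() returns LATEST
  class TrimHorizon() extends KinesisInitialPosition   // getPosition() returns TRIM_HORIZON
  class AtTimestamp(timestamp: Date) extends KinesisInitialPosition {
    def getTimestamp(): Date                           // getPosition() returns AT_TIMESTAMP
  }

  // Supports LATEST and TRIM_HORIZON only; throws UnsupportedOperationException otherwise
  def fromKinesisInitialPosition(
    initialPositionInStream: InitialPositionInStream
  ): KinesisInitialPosition
}
```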

## Latest Position

Start reading just after the most recent record in each shard, so only records that arrive after the application starts are processed. This is the default when `initialPosition` is not set.

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest

// ssc is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new Latest())
  .build()
```

**Use Case:** When you only want to process new data that arrives after the application starts.

## Trim Horizon Position

Start reading from the oldest record still available in each shard. How far back this reaches depends on the stream's retention period, which is 24 hours by default and can be increased up to 365 days.

```scala
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon

// ssc is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new TrimHorizon())
  .build()
```

**Use Case:** When you want to process all historical data still available in the stream.

## At Timestamp Position

Start reading from the first record whose approximate arrival time is at or after a specific timestamp.

```scala
import java.util.Date
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp

// Start from one hour ago (java.util.Date is an absolute point in time)
val startTime = new Date(System.currentTimeMillis() - 60 * 60 * 1000L)

// ssc is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(new AtTimestamp(startTime))
  .build()
```

**Use Case:** When you want to replay data from a specific point in time.

## Java API Usage

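Java code uses the same classes. The position types are referenced as nested classes of `KinesisInitialPositions`, and `streamingContext` also accepts a `JavaStreamingContext`.

```java
import java.util.Date;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;

// jssc is an existing JavaStreamingContext. streamingContext, streamName and
// checkpointAppName are required before calling build().

// Latest position
KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.Latest())
    .build();

// Trim horizon position
KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.TrimHorizon())
    .build();

// At timestamp position (one hour ago)
Date timestamp = new Date(System.currentTimeMillis() - 3600000L);
KinesisInputDStream.builder()
    .streamingContext(jssc)
    .streamName("my-stream")
    .checkpointAppName("my-app")
    .initialPosition(new KinesisInitialPositions.AtTimestamp(timestamp))
    .build();
```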

## Legacy API (Deprecated)

The legacy `initialPositionInStream` builder method takes KCL's `InitialPositionInStream` enum. It is deprecated but still supported for backward compatibility. It accepts only `LATEST` and `TRIM_HORIZON`, and passing `AT_TIMESTAMP` throws an `UnsupportedOperationException`.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.KinesisInputDStream

// Deprecated: prefer initialPosition(new Latest()) instead
// ssc is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPositionInStream(InitialPositionInStream.LATEST) // deprecated
  .build()
```

## Conversion Utility

`KinesisInitialPositions.fromKinesisInitialPosition` converts a value of KCL's `InitialPositionInStream` enum into the corresponding `KinesisInitialPosition` object.

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Convert from the KCL enum
val position = KinesisInitialPositions.fromKinesisInitialPosition(
  InitialPositionInStream.TRIM_HORIZON
)

// ssc is an existing StreamingContext
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")
  .checkpointAppName("my-app")
  .initialPosition(position)
  .build()
```

**Note:** The conversion utility supports only `LATEST` and `TRIM_HORIZON`. It throws an `UnsupportedOperationException` for `AT_TIMESTAMP`, because the enum value carries no timestamp. For `AT_TIMESTAMP`, construct an `AtTimestamp` directly.

## Checkpointing Behavior

Initial positions apply only when the application has no existing checkpoints. The Kinesis Client Library (KCL) stores checkpoints in a DynamoDB table named after `checkpointAppName`:

- **No existing checkpoints**: Uses the specified initial position
- **Existing checkpoints**: Resumes each shard from its last checkpointed position and ignores the initial position setting

To force reading from a specific position, you must either:

1. Use a new checkpoint application name, or
2. Delete the existing DynamoDB checkpoint table for the application while the application is stopped.

Waiting does not reset the position. The KCL does not configure its checkpoint entries to expire, so they remain until the table is deleted or the application name changes.

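Option 1 can be automated by deriving a new application name for each deliberate replay. The sketch below is hypothetical: `CheckpointNames` and `freshAppName` are not part of Spark. It appends a UTC timestamp to a base name. Because the KCL uses the application name as its lease table name, the sketch also checks the result against DynamoDB's table-name rules (3 to 255 characters from `a-z`, `A-Z`, `0-9`, `_`, `-`, `.`).

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper (not a Spark API): builds a fresh checkpointAppName so
// the KCL starts with an empty lease table and applies the initial position.
object CheckpointNames {
  // DynamoDB table names: 3-255 characters from [a-zA-Z0-9_.-]
  private val ValidTableName = "[a-zA-Z0-9_.-]{3,255}".r

  // Formats instants in UTC so the suffix does not depend on the host time zone
  private val Suffix =
    DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(ZoneOffset.UTC)

  def freshAppName(base: String, at: Instant): String = {
    val name = s"$base-replay-${Suffix.format(at)}"
    require(
      ValidTableName.pattern.matcher(name).matches(),
      s"'$name' is not a valid DynamoDB table name")
    name
  }
}
```

For example, an instant of `2024-01-15T14:00:00Z` with base `my-app` yields `my-app-replay-20240115-140000`, which would then be passed to `.checkpointAppName(...)`. The previous table is left in place and must be deleted separately if it is no longer needed.
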
## Best Practices

### Latest Position

- Use for real-time processing of new events
- Avoids a startup backlog, because previously written records are not read
- Good for monitoring and alerting use cases

### Trim Horizon Position

- Use to process or reprocess all historical data still held by the stream
- Reads every record within the retention period; records that have already been trimmed cannot be recovered
- May require more time and resources to catch up

### At Timestamp Position

- Use for precise replay scenarios
- Useful for debugging or reprocessing from a specific point in time
- `java.util.Date` is an absolute instant, so convert wall-clock times using an explicit time zone

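`AtTimestamp` takes a `java.util.Date`, which represents an absolute instant, so a wall-clock replay time must first be resolved against an explicit time zone. The helper below is a hypothetical sketch: `ReplayStart` is not part of Spark. It uses `java.time` for the conversion, and the resulting `Date` would be passed to `new AtTimestamp(...)`.

```scala
import java.time.{LocalDateTime, ZoneId}
import java.util.Date

// Hypothetical helper (not a Spark API): turns a wall-clock time in a named
// zone into the absolute java.util.Date that AtTimestamp expects.
object ReplayStart {
  def fromLocal(local: LocalDateTime, zone: ZoneId): Date =
    Date.from(local.atZone(zone).toInstant)
}
```

With this helper, 09:00 on 15 January 2024 in `America/New_York` (UTC-5 in winter) resolves to `2024-01-15T14:00:00Z`. The same wall-clock time interpreted in UTC would start the replay five hours earlier.
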
### Error Handling

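Invalid configurations fail with exceptions. `KinesisInitialPositions.fromKinesisInitialPosition` throws `UnsupportedOperationException` for `AT_TIMESTAMP`, and so does the deprecated `initialPositionInStream` builder method. `build()` fails if `streamingContext`, `streamName`, or `checkpointAppName` has not been set. A timestamp outside the stream's retention period, however, is not rejected. According to the Kinesis `GetShardIterator` API documentation, a timestamp older than the trim horizon yields an iterator at the oldest untrimmed record, so reading effectively starts at `TRIM_HORIZON`.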
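
According to the Kinesis `GetShardIterator` documentation, an `AT_TIMESTAMP` value older than the trim horizon starts reading at the oldest untrimmed record rather than failing. You can estimate this behaviour before starting a replay. The sketch below is hypothetical: `ReplayWindow` is not part of Spark or the AWS SDK. It assumes the oldest available record is roughly `now - retention`. Kinesis trims data asynchronously, so the real boundary is approximate.

```scala
import java.time.{Duration, Instant}
import java.util.Date

// Hypothetical helper (not a Spark or AWS API): estimates where an
// AT_TIMESTAMP read will actually begin, assuming records older than
// `now - retention` have been trimmed.
object ReplayWindow {
  def effectiveStart(requested: Date, now: Instant, retention: Duration): Instant = {
    val oldestAvailable = now.minus(retention)
    val req = requested.toInstant
    // Older than the retention window: reading begins at the oldest record instead
    if (req.isBefore(oldestAvailable)) oldestAvailable else req
  }

  def isWithinRetention(requested: Date, now: Instant, retention: Duration): Boolean =
    !requested.toInstant.isBefore(now.minus(retention))
}
```

A caller could log a warning when `isWithinRetention` returns `false`, because the replay would then begin later than requested without any error being raised.
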