# Offset Management

Offset management provides position tracking for resumable queries and stream consumption. Different offset types support various backend implementations and consistency requirements.

## Capabilities

### Offset Base Class

Abstract base class for all offset implementations.

```scala { .api }
/**
 * Base class for query offsets that track position in event streams
 */
abstract class Offset
```

### Sequence Offset

Ordered sequence number-based offset for strictly ordered backends.

```scala { .api }
/**
 * Corresponds to an ordered sequence number for the events.
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
  def compare(that: Sequence): Int
}
```

**Usage Examples:**

```scala
import akka.persistence.query.Sequence

// Create sequence offset
val offset = Sequence(1000L)

// Use in query
readJournal.eventsByTag("user-events", offset)

// Ordering comparison
val offset1 = Sequence(100L)
val offset2 = Sequence(200L)
println(offset1 < offset2) // true
```
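To make the exclusive semantics concrete, here is a minimal sketch that runs without Akka on the classpath. `Sequence` is redefined locally as a stand-in for `akka.persistence.query.Sequence`, and `StoredEvent`, `ExclusiveOffsetSketch`, and `eventsAfter` are hypothetical names introduced only for illustration. A real journal would apply this filter inside the backend.

```scala
// Local stand-in for akka.persistence.query.Sequence (assumption: same ordering by value).
final case class Sequence(value: Long) extends Ordered[Sequence] {
  def compare(that: Sequence): Int = value.compare(that.value)
}

// Hypothetical stored event: a sequence number plus a payload.
final case class StoredEvent(seqNr: Long, payload: String)

object ExclusiveOffsetSketch {
  // Keep only events strictly after the offset; the event at the offset itself is skipped.
  def eventsAfter(log: Seq[StoredEvent], offset: Sequence): Seq[StoredEvent] =
    log.filter(event => Sequence(event.seqNr) > offset)

  val log: Seq[StoredEvent] =
    Seq(StoredEvent(1L, "a"), StoredEvent(2L, "b"), StoredEvent(3L, "c"))

  // Resuming from Sequence(2) yields only the event with sequence number 3.
  val resumed: Seq[StoredEvent] = eventsAfter(log, Sequence(2L))
}
```

Because the offset is exclusive, the offset of the last processed event can be stored as-is and passed back on resume without adding one.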

### Time-Based UUID Offset

UUID-based offset for time-ordered event storage systems.

```scala { .api }
/**
 * Corresponds to an ordered unique identifier of the events.
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
  def compare(other: TimeBasedUUID): Int
}
```

**Usage Examples:**

```scala
import akka.persistence.query.TimeBasedUUID
import java.util.UUID

// Create time-based UUID offset (must be version 1 UUID)
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val offset = TimeBasedUUID(timeUuid)

// Use in query
readJournal.eventsByTag("order-events", offset)

// Will throw IllegalArgumentException for non-time-based UUIDs
try {
  TimeBasedUUID(UUID.randomUUID()) // Random UUID (version 4)
} catch {
  case _: IllegalArgumentException => println("Must be time-based UUID")
}
```
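Since the constructor rejects UUIDs that are not time-based, it can be convenient to check the version before building the offset. The sketch below uses only `java.util.UUID`. `UuidVersionCheck` and `validate` are hypothetical helper names, not part of Akka.

```scala
import java.util.UUID

object UuidVersionCheck {
  // Right(uuid) for a version 1 (time-based) UUID, Left(reason) for any other version.
  def validate(uuid: UUID): Either[String, UUID] =
    if (uuid.version() == 1) Right(uuid)
    else Left(s"expected a version 1 UUID but got version ${uuid.version()}")

  // Version 1 UUID taken from the usage example above.
  val timeBased: UUID = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")

  // UUID.randomUUID() produces version 4 UUIDs.
  val random: UUID = UUID.randomUUID()
}
```

Here `validate(timeBased)` yields a `Right` and `validate(random)` yields a `Left`, so a caller can handle the failure as a value rather than catching an exception.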

### Timestamp Offset

Timestamp-based offset with seen sequence number tracking for handling concurrent events.

```scala { .api }
/**
 * Timestamp based offset. Since there can be several events for the same timestamp it keeps
 * track of what sequence nrs for every persistence id that have been seen at this specific timestamp.
 *
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class TimestampOffset(
  /** Time when the event was stored, microsecond granularity database timestamp */
  timestamp: Instant,
  /** Time when the event was read, microsecond granularity database timestamp */
  readTimestamp: Instant,
  /** List of sequence nrs for every persistence id seen at this timestamp */
  seen: Map[String, Long]
) extends Offset {
  /** Java API */
  def getSeen(): java.util.Map[String, java.lang.Long]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.TimestampOffset
import java.time.Instant

// Create timestamp offset
val timestamp = Instant.now()
val seen = Map("user-123" -> 5L, "order-456" -> 12L)
val offset = TimestampOffset(timestamp, timestamp, seen)

// Use in query
readJournal.eventsByTag("payment-events", offset)

// Java API usage
val javaSeenMap = offset.getSeen()
```
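The role of the `seen` map is easier to see in isolation. The following sketch shows one plausible way a consumer could use it to decide whether an event is new relative to an offset. It is an illustration under assumptions, not how any particular plugin implements this: `TsOffset` is a local stand-in for `TimestampOffset`, and `Row`, `SeenFilterSketch`, and `isNew` are hypothetical names.

```scala
import java.time.Instant

// Local stand-in mirroring two fields of TimestampOffset (not the Akka class itself).
final case class TsOffset(timestamp: Instant, seen: Map[String, Long])

// Hypothetical event row as it might come back from a store.
final case class Row(persistenceId: String, seqNr: Long, timestamp: Instant)

object SeenFilterSketch {
  // An event counts as new if it is later than the offset timestamp, or if it has the
  // same timestamp and a sequence number beyond what `seen` records for its persistence id.
  def isNew(offset: TsOffset, row: Row): Boolean = {
    val cmp = row.timestamp.compareTo(offset.timestamp)
    if (cmp > 0) true
    else if (cmp < 0) false
    else offset.seen.get(row.persistenceId).forall(seenSeqNr => row.seqNr > seenSeqNr)
  }
}
```

With an offset whose `seen` map is `Map("user-123" -> 5L)`, an event for `user-123` with sequence number 5 at the same timestamp is skipped, while sequence number 6, or any event for a persistence id missing from the map, is treated as new.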

### No Offset

Marker object for retrieving all events from the beginning.

```scala { .api }
/**
 * Used when retrieving all events.
 */
case object NoOffset extends Offset {
  /** Java API */
  def getInstance: Offset
}
```

**Usage Examples:**

```scala
import akka.persistence.query.NoOffset

// Start from beginning
readJournal.eventsByTag("all-events", NoOffset)

// Java API: from Java code, obtain the same instance via getInstance()
// readJournal.eventsByTag("all-events", NoOffset.getInstance());
```

### Offset Factory

Factory methods for creating offset instances.

```scala { .api }
object Offset {
  /** Get NoOffset instance */
  def noOffset: Offset

  /** Create sequence offset from long value */
  def sequence(value: Long): Offset

  /** Create time-based UUID offset */
  def timeBasedUUID(uuid: UUID): Offset

  /** Create timestamp offset with empty seen map */
  def timestamp(instant: Instant): TimestampOffset
}
```

**Usage Examples:**

```scala
import akka.persistence.query.Offset
import java.time.Instant
import java.util.UUID

// Factory methods
val noOffset = Offset.noOffset
val seqOffset = Offset.sequence(1000L)
val timeOffset = Offset.timestamp(Instant.now())

// Time-based UUID
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val uuidOffset = Offset.timeBasedUUID(timeUuid)
```

### TimestampOffset Utilities

Utility methods and constants for timestamp offset handling.

```scala { .api }
object TimestampOffset {
  /** Zero timestamp offset representing epoch */
  val Zero: TimestampOffset

  /** Create timestamp offset with given timestamp and seen map */
  def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset

  /** Try to convert any Offset to TimestampOffset. Epoch timestamp is used for NoOffset. */
  def toTimestampOffset(offset: Offset): TimestampOffset
}
```

**Usage Examples:**

```scala
import akka.persistence.query.{TimestampOffset, NoOffset, Sequence}
import java.time.Instant

// Zero offset
val zeroOffset = TimestampOffset.Zero

// Convert various offsets
val timestampFromNo = TimestampOffset.toTimestampOffset(NoOffset) // Returns Zero
val timestampFromSeq = try {
  TimestampOffset.toTimestampOffset(Sequence(100L))
} catch {
  case _: IllegalArgumentException =>
    println("Cannot convert Sequence to TimestampOffset")
}

// Create with seen map
val seen = Map("entity-1" -> 10L, "entity-2" -> 5L)
val offset = TimestampOffset(Instant.now(), seen)
```

## Offset Usage Patterns

### Query Resumption

Store and restore offsets for resumable event processing:

```scala
import akka.persistence.query.{NoOffset, Offset}

// Assumes `readJournal`, `processEvent`, and an implicit materializer (or ActorSystem) are in scope.

// Store offset from last processed event
var lastOffset: Offset = NoOffset

readJournal
  .eventsByTag("user-events", lastOffset)
  .runForeach { envelope =>
    // Process event
    processEvent(envelope.event)

    // Update stored offset
    lastOffset = envelope.offset
  }

// Later, resume from stored offset
readJournal
  .eventsByTag("user-events", lastOffset)
  .runForeach(envelope => processEvent(envelope.event))
```
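Resuming after a restart means the offset has to be written somewhere durable. One possible approach, sketched here under assumptions, is to encode it as a tagged string and decode it on startup. The `OffsetLike` hierarchy is a local stand-in for `NoOffset`, `Sequence`, and `TimeBasedUUID` so the example runs without Akka, and the `kind:value` format is hypothetical rather than anything Akka defines.

```scala
import java.util.UUID
import scala.util.Try

// Local stand-ins for the Akka offset types (assumption: same shape, no Akka dependency).
sealed trait OffsetLike
case object NoOffsetLike extends OffsetLike
final case class SequenceLike(value: Long) extends OffsetLike
final case class TimeUuidLike(value: UUID) extends OffsetLike

object OffsetCodec {
  // Encode as "<kind>:<value>"; the format is illustrative only.
  def encode(offset: OffsetLike): String = offset match {
    case NoOffsetLike       => "none:"
    case SequenceLike(v)    => s"seq:$v"
    case TimeUuidLike(uuid) => s"uuid:$uuid"
  }

  // Decode a stored string; None signals an unknown kind or a malformed value.
  def decode(stored: String): Option[OffsetLike] = stored.split(":", 2) match {
    case Array("none", _)  => Some(NoOffsetLike)
    case Array("seq", v)   => Try(v.toLong).toOption.map(SequenceLike(_))
    case Array("uuid", v)  => Try(UUID.fromString(v)).toOption.map(TimeUuidLike(_))
    case _                 => None
  }
}
```

Returning `Option` from `decode` lets the caller fall back to starting from the beginning when the stored value cannot be read, instead of failing at startup.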

### Offset Comparison

Compare offsets for ordering (where supported):

```scala
import akka.persistence.query.{Sequence, TimeBasedUUID}

val offset1 = Sequence(100L)
val offset2 = Sequence(200L)

if (offset1 < offset2) {
  println("offset1 comes before offset2")
}

// Time-based UUID comparison
// (timeUuid1 and timeUuid2 are assumed to be version 1 UUIDs defined elsewhere)
val uuid1 = TimeBasedUUID(timeUuid1)
val uuid2 = TimeBasedUUID(timeUuid2)
val ordered = List(uuid2, uuid1).sorted // Uses UUID ordering
```

### Backend-Specific Offset Selection

Choose appropriate offset type based on backend capabilities:

```scala
// For sequence-based backends (like LevelDB)
val sequenceOffset = Sequence(lastProcessedSeqNr)

// For timestamp-based backends (like R2DBC)
val timestampOffset = TimestampOffset(lastProcessedTime, seenSequenceNrs)

// For time-based UUID backends (like Cassandra)
val uuidOffset = TimeBasedUUID(lastProcessedUuid)
```

## Error Handling

### Offset Validation

```scala
// TimeBasedUUID validation (someUuid is any java.util.UUID supplied by the caller)
try {
  val offset = TimeBasedUUID(someUuid)
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid UUID: ${e.getMessage}")
}

// TimestampOffset conversion (someOffset is any Offset supplied by the caller)
try {
  val converted = TimestampOffset.toTimestampOffset(someOffset)
} catch {
  case e: IllegalArgumentException =>
    println(s"Cannot convert offset: ${e.getMessage}")
}
```

### Offset Compatibility

Different backends support different offset types:
- **LevelDB**: Primarily `Sequence` offsets
- **Cassandra**: `TimeBasedUUID` offsets
- **R2DBC**: `TimestampOffset` with microsecond precision
- **Custom backends**: May implement any offset type
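
The microsecond precision noted for `TimestampOffset` matters when a timestamp created in application code is compared with one read back from a store, since `Instant` can carry nanoseconds. As a minimal sketch using only `java.time`, the helper below truncates an `Instant` to microseconds. `MicrosTruncation` and `toMicros` are hypothetical names, and whether truncation is needed depends on the backend in use.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object MicrosTruncation {
  // Drop sub-microsecond digits so the value fits a microsecond-granularity timestamp.
  def toMicros(instant: Instant): Instant = instant.truncatedTo(ChronoUnit.MICROS)

  // An Instant with nanosecond digits, and the same Instant cut to microseconds.
  val precise: Instant = Instant.parse("2024-01-01T00:00:00.123456789Z")
  val truncated: Instant = toMicros(precise) // 2024-01-01T00:00:00.123456Z
}
```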

## Types

```scala { .api }
trait Ordered[A] {
  def compare(that: A): Int
}

case class Range(start: Int, end: Int)

sealed trait UUIDComparator {
  def compare(uuid1: UUID, uuid2: UUID): Int
}
```