# Offset Management

Offset management provides position tracking for resumable queries and stream consumption. Different offset types support various backend implementations and consistency requirements.

## Capabilities

### Offset Base Class

Abstract base class for all offset implementations.

```scala { .api }
/**
 * Base class for query offsets that track position in event streams
 */
abstract class Offset
```

### Sequence Offset

Ordered sequence number-based offset for strictly ordered backends.

```scala { .api }
/**
 * Corresponds to an ordered sequence number for the events.
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
  def compare(that: Sequence): Int
}
```

**Usage Examples:**

```scala
import akka.persistence.query.Sequence

// Create sequence offset
val offset = Sequence(1000L)

// Use in query
readJournal.eventsByTag("user-events", offset)

// Ordering comparison
val offset1 = Sequence(100L)
val offset2 = Sequence(200L)
println(offset1 < offset2) // true
```
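The "exclusive" wording in the API comment above can be illustrated with a small model. This is only a sketch: `StoredEvent` and `eventsAfter` are hypothetical names made up for the example, not Akka types, and a real journal does this filtering in the backend.

```scala
// Hypothetical stand-in for an event with a global sequence number; not an Akka type.
final case class StoredEvent(sequenceNr: Long, payload: String)

// Exclusive resume: keep only events strictly after the stored offset value.
def eventsAfter(log: Seq[StoredEvent], offsetValue: Long): Seq[StoredEvent] =
  log.filter(_.sequenceNr > offsetValue)

val log = Seq(
  StoredEvent(1L, "created"),
  StoredEvent(2L, "renamed"),
  StoredEvent(3L, "deleted")
)

// Resuming from offset 2 skips events 1 and 2 and yields only event 3.
val resumed = eventsAfter(log, 2L)
```

Because the event at the stored offset itself is skipped, a consumer can persist the offset of the last event it handled and pass that same value back when it resumes.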

### Time-Based UUID Offset

UUID-based offset for time-ordered event storage systems.

```scala { .api }
/**
 * Corresponds to an ordered unique identifier of the events.
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
  def compare(other: TimeBasedUUID): Int
}
```

**Usage Examples:**

```scala
import akka.persistence.query.TimeBasedUUID
import java.util.UUID

// Create time-based UUID offset (must be version 1 UUID)
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val offset = TimeBasedUUID(timeUuid)

// Use in query
readJournal.eventsByTag("order-events", offset)

// Will throw IllegalArgumentException for non-time-based UUIDs
try {
  TimeBasedUUID(UUID.randomUUID()) // Random UUID (version 4)
} catch {
  case _: IllegalArgumentException => println("Must be time-based UUID")
}
```
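Since a non-time-based UUID is rejected with an exception, a caller may prefer to check the UUID first. The sketch below uses only `java.util.UUID` from the JDK. The helper names `isTimeBased` and `embeddedTimestamp` are assumptions for this example and are not part of the offset API.

```scala
import java.util.UUID

// Version 1 UUIDs embed a 60-bit timestamp; other versions do not.
def isTimeBased(uuid: UUID): Boolean = uuid.version() == 1

// Embedded timestamp in 100-nanosecond units since 1582-10-15 UTC, when available.
// UUID.timestamp() throws for non-version-1 UUIDs, hence the guard.
def embeddedTimestamp(uuid: UUID): Option[Long] =
  if (isTimeBased(uuid)) Some(uuid.timestamp()) else None

val v1 = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val v4 = UUID.randomUUID()
```

Here `isTimeBased(v1)` holds and `isTimeBased(v4)` does not, so only `v1` would be a candidate for `TimeBasedUUID`.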

### Timestamp Offset

Timestamp-based offset with seen sequence number tracking for handling concurrent events.

```scala { .api }
/**
 * Timestamp based offset. Since there can be several events for the same timestamp it keeps
 * track of what sequence nrs for every persistence id that have been seen at this specific timestamp.
 *
 * The offset is exclusive, i.e. the event with the exact same sequence number will not be included
 * in the returned stream.
 */
final case class TimestampOffset(
  /** Time when the event was stored, microsecond granularity database timestamp */
  timestamp: Instant,
  /** Time when the event was read, microsecond granularity database timestamp */
  readTimestamp: Instant,
  /** List of sequence nrs for every persistence id seen at this timestamp */
  seen: Map[String, Long]
) extends Offset {
  /** Java API */
  def getSeen(): java.util.Map[String, java.lang.Long]
}
```

**Usage Examples:**

```scala
import akka.persistence.query.TimestampOffset
import java.time.Instant

// Create timestamp offset
val timestamp = Instant.now()
val seen = Map("user-123" -> 5L, "order-456" -> 12L)
val offset = TimestampOffset(timestamp, timestamp, seen)

// Use in query
readJournal.eventsByTag("payment-events", offset)

// Java API usage
val javaSeenMap = offset.getSeen()
```
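To see why the `seen` map matters, consider several events that share one timestamp. The sketch below is one plausible way a consumer could decide whether an event is already covered by a stored position. `Event`, `Position`, and `alreadySeen` are hypothetical stand-ins for illustration, not the Akka classes or the logic of any particular backend.

```scala
import java.time.Instant

// Hypothetical stand-ins for illustration; these are not the Akka classes.
final case class Event(persistenceId: String, sequenceNr: Long, timestamp: Instant)
final case class Position(timestamp: Instant, seen: Map[String, Long])

// An event counts as already delivered if it is older than the position,
// or shares its timestamp and its sequence number is covered by `seen`.
def alreadySeen(pos: Position, e: Event): Boolean =
  e.timestamp.isBefore(pos.timestamp) ||
    (e.timestamp == pos.timestamp &&
      pos.seen.get(e.persistenceId).exists(_ >= e.sequenceNr))

val t = Instant.parse("2024-01-01T00:00:00Z")
val pos = Position(t, Map("user-123" -> 5L))

val replayed = Event("user-123", 5L, t)   // same timestamp, recorded in `seen`
val sibling  = Event("order-456", 12L, t) // same timestamp, different persistence id
val later    = Event("user-123", 6L, t.plusMillis(1))
```

With a timestamp alone, `replayed` and `sibling` would be indistinguishable. The `seen` entries let the consumer skip the first and still deliver the second.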

### No Offset

Marker object for retrieving all events from the beginning.

```scala { .api }
/**
 * Used when retrieving all events.
 */
case object NoOffset extends Offset {
  /** Java API */
  def getInstance: Offset
}
```

**Usage Examples:**

```scala
import akka.persistence.query.NoOffset

// Start from beginning
readJournal.eventsByTag("all-events", NoOffset)

// Java API (shown as comments because this block is Scala):
// import akka.persistence.query.NoOffset;
// readJournal.eventsByTag("all-events", NoOffset.getInstance());
```

### Offset Factory

Factory methods for creating offset instances.

```scala { .api }
object Offset {
  /** Get NoOffset instance */
  def noOffset: Offset

  /** Create sequence offset from long value */
  def sequence(value: Long): Offset

  /** Create time-based UUID offset */
  def timeBasedUUID(uuid: UUID): Offset

  /** Create timestamp offset with empty seen map */
  def timestamp(instant: Instant): TimestampOffset
}
```

**Usage Examples:**

```scala
import akka.persistence.query.Offset
import java.time.Instant
import java.util.UUID

// Factory methods
val noOffset = Offset.noOffset
val seqOffset = Offset.sequence(1000L)
val timeOffset = Offset.timestamp(Instant.now())

// Time-based UUID
val timeUuid = UUID.fromString("550e8400-e29b-11d4-a716-446655440000")
val uuidOffset = Offset.timeBasedUUID(timeUuid)
```

### TimestampOffset Utilities

Utility methods and constants for timestamp offset handling.

```scala { .api }
object TimestampOffset {
  /** Zero timestamp offset representing epoch */
  val Zero: TimestampOffset

  /** Create timestamp offset with given timestamp and seen map */
  def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset

  /** Try to convert any Offset to TimestampOffset. Epoch timestamp is used for NoOffset. */
  def toTimestampOffset(offset: Offset): TimestampOffset
}
```

**Usage Examples:**

```scala
import akka.persistence.query.{TimestampOffset, NoOffset, Sequence}
import java.time.Instant

// Zero offset
val zeroOffset = TimestampOffset.Zero

// Convert various offsets
val timestampFromNo = TimestampOffset.toTimestampOffset(NoOffset) // Returns Zero
val timestampFromSeq = try {
  TimestampOffset.toTimestampOffset(Sequence(100L))
} catch {
  case _: IllegalArgumentException =>
    println("Cannot convert Sequence to TimestampOffset")
}

// Create with seen map
val seen = Map("entity-1" -> 10L, "entity-2" -> 5L)
val offset = TimestampOffset(Instant.now(), seen)
```

## Offset Usage Patterns

### Query Resumption

Store and restore offsets for resumable event processing:

```scala
import akka.persistence.query.{NoOffset, Offset}

// `readJournal` and `processEvent` are assumed to be defined elsewhere.
// Store offset from last processed event
var lastOffset: Offset = NoOffset

readJournal
  .eventsByTag("user-events", lastOffset)
  .runForeach { envelope =>
    // Process event
    processEvent(envelope.event)

    // Update stored offset
    lastOffset = envelope.offset
  }

// Later, resume from stored offset
readJournal
  .eventsByTag("user-events", lastOffset)
  .runForeach(envelope => processEvent(envelope.event))
```
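The example above keeps the offset in memory, so it is lost on restart. To resume across restarts the offset has to be written somewhere durable, which requires an encoding. The sketch below shows one possible string encoding, using hypothetical stand-in types (`Checkpoint`, `FromStart`, `AtSequence`) that mirror `NoOffset` and `Sequence`. The format itself is an assumption for this example, not something the library defines.

```scala
import scala.util.Try

// Hypothetical stand-ins mirroring two of the offset types above; not the Akka classes.
sealed trait Checkpoint
case object FromStart extends Checkpoint
final case class AtSequence(value: Long) extends Checkpoint

// Encode a checkpoint as a string that could be written to a file or table.
def encode(c: Checkpoint): String = c match {
  case FromStart     => "none"
  case AtSequence(v) => s"seq:$v"
}

// Decode it again; unreadable input falls back to reading from the start.
def decode(s: String): Checkpoint =
  if (s.startsWith("seq:"))
    Try(s.stripPrefix("seq:").toLong).map(AtSequence(_)).getOrElse(FromStart)
  else FromStart
```

Falling back to `FromStart` on unreadable input replays events rather than skipping them, which is only safe if event processing is idempotent.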

### Offset Comparison

Compare offsets for ordering (where supported):

```scala
val offset1 = Sequence(100L)
val offset2 = Sequence(200L)

if (offset1 < offset2) {
  println("offset1 comes before offset2")
}

// Time-based UUID comparison
// timeUuid1 and timeUuid2 are assumed to be version 1 java.util.UUID values defined elsewhere
val uuid1 = TimeBasedUUID(timeUuid1)
val uuid2 = TimeBasedUUID(timeUuid2)
val ordered = List(uuid2, uuid1).sorted // Uses UUID ordering
```

### Backend-Specific Offset Selection

Choose appropriate offset type based on backend capabilities:

```scala
// For sequence-based backends (like LevelDB)
val sequenceOffset = Sequence(lastProcessedSeqNr)

// For timestamp-based backends (like R2DBC)
val timestampOffset = TimestampOffset(lastProcessedTime, seenSequenceNrs)

// For UUID-based backends (like Cassandra)
val uuidOffset = TimeBasedUUID(lastProcessedUuid)
```

## Error Handling

### Offset Validation

```scala
// TimeBasedUUID validation
try {
  val offset = TimeBasedUUID(someUuid)
} catch {
  case e: IllegalArgumentException =>
    println(s"Invalid UUID: ${e.getMessage}")
}

// TimestampOffset conversion
try {
  val converted = TimestampOffset.toTimestampOffset(someOffset)
} catch {
  case e: IllegalArgumentException =>
    println(s"Cannot convert offset: ${e.getMessage}")
}
```

### Offset Compatibility

Different backends support different offset types:
- **LevelDB**: Primarily `Sequence` offsets
- **Cassandra**: `TimeBasedUUID` offsets
- **R2DBC**: `TimestampOffset` with microsecond precision
- **Custom backends**: May implement any offset type

## Types

```scala { .api }
trait Ordered[A] {
  def compare(that: A): Int
}

case class Range(start: Int, end: Int)

sealed trait UUIDComparator {
  def compare(uuid1: UUID, uuid2: UUID): Int
}
```