0
# Event Envelopes
1
2
Event envelopes provide metadata wrappers for events in query result streams. They contain the event payload along with persistence information, timestamps, and offset data needed for stream processing and resumption.
3
4
## Capabilities
5
6
### Standard Event Envelope
7
8
Standard event envelope used by untyped query APIs.
9
10
```scala { .api }
11
/**
12
* Event wrapper adding meta data for the events in the result stream of
13
* EventsByTagQuery query, or similar queries.
14
*
15
* The timestamp is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC
16
* (same as System.currentTimeMillis).
17
*/
18
final class EventEnvelope(
19
/** The offset that can be used in next query */
20
val offset: Offset,
21
/** The persistence id of the PersistentActor */
22
val persistenceId: String,
23
/** The sequence number of the event */
24
val sequenceNr: Long,
25
/** The event payload */
26
val event: Any,
27
/** Time when the event was stored (milliseconds since epoch) */
28
val timestamp: Long,
29
/** Optional event metadata */
30
val eventMetadata: Option[Any]
31
) extends Product4[Offset, String, Long, Any] with Serializable {
32
33
/** Java API for accessing event metadata */
34
def getEventMetaData(): Optional[Any]
35
36
/** Create copy with modified fields */
37
def copy(
38
offset: Offset = this.offset,
39
persistenceId: String = this.persistenceId,
40
sequenceNr: Long = this.sequenceNr,
41
event: Any = this.event
42
): EventEnvelope
43
}
44
```
45
46
**Usage Examples:**
47
48
```scala
49
import akka.persistence.query.{EventEnvelope, Sequence}
50
51
// Access envelope properties
52
readJournal
53
.eventsByPersistenceId("user-123", 0L, Long.MaxValue)
54
.runForeach { envelope =>
55
println(s"Persistence ID: ${envelope.persistenceId}")
56
println(s"Sequence Nr: ${envelope.sequenceNr}")
57
println(s"Event: ${envelope.event}")
58
println(s"Offset: ${envelope.offset}")
59
println(s"Timestamp: ${envelope.timestamp}")
60
61
// Check for metadata
62
envelope.eventMetadata.foreach { meta =>
63
println(s"Metadata: $meta")
64
}
65
}
66
67
// Pattern matching
68
readJournal
69
.eventsByTag("user-events", Sequence(0L))
70
.runForeach {
71
case EventEnvelope(offset, persistenceId, seqNr, event) =>
72
println(s"Event $event at $seqNr from $persistenceId")
73
}
74
```
75
76
Java API usage:
77
```java
78
import akka.persistence.query.EventEnvelope;
79
import java.util.Optional;
80
81
readJournal
82
.eventsByPersistenceId("user-123", 0L, Long.MAX_VALUE)
83
.runForeach(envelope -> {
84
System.out.println("Event: " + envelope.event());
85
86
Optional<Object> metadata = envelope.getEventMetaData();
87
metadata.ifPresent(meta ->
88
System.out.println("Metadata: " + meta));
89
}, system);
90
```
91
92
### Event Envelope Factory
93
94
Factory methods for creating event envelope instances.
95
96
```scala { .api }
97
object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] {
98
/** Create envelope with timestamp and metadata */
99
def apply(
100
offset: Offset,
101
persistenceId: String,
102
sequenceNr: Long,
103
event: Any,
104
timestamp: Long,
105
meta: Option[Any]
106
): EventEnvelope
107
108
/** Create envelope with timestamp */
109
def apply(
110
offset: Offset,
111
persistenceId: String,
112
sequenceNr: Long,
113
event: Any,
114
timestamp: Long
115
): EventEnvelope
116
117
/** Pattern matching extractor */
118
def unapply(envelope: EventEnvelope): Option[(Offset, String, Long, Any)]
119
}
120
```
121
122
**Usage Examples:**
123
124
```scala
125
import akka.persistence.query.{EventEnvelope, Sequence}
126
127
// Create envelope with metadata
128
val envelope = EventEnvelope(
129
offset = Sequence(100L),
130
persistenceId = "user-123",
131
sequenceNr = 5L,
132
event = UserCreated("John", "john@example.com"),
133
timestamp = System.currentTimeMillis(),
134
meta = Some(Map("source" -> "registration"))
135
)
136
137
// Create envelope without metadata
138
val simpleEnvelope = EventEnvelope(
139
Sequence(101L),
140
"user-124",
141
1L,
142
UserUpdated("Jane", "jane@example.com"),
143
System.currentTimeMillis()
144
)
145
```
146
147
### Typed Event Envelope
148
149
Enhanced event envelope for typed query APIs with additional metadata fields.
150
151
```scala { .api }
152
/**
153
* Event wrapper adding meta data for the events in the result stream of
154
* EventsBySliceQuery query, or similar queries.
155
*
156
* If the event is not defined it has not been loaded yet. It can be loaded with LoadEventQuery.
157
* It is an improved EventEnvelope compared to the untyped version.
158
*/
159
final class EventEnvelope[Event](
160
/** The offset that can be used in next query */
161
val offset: Offset,
162
/** The persistence id of the entity */
163
val persistenceId: String,
164
/** The sequence number of the event */
165
val sequenceNr: Long,
166
/** The event payload (may be empty if not loaded or filtered) */
167
val eventOption: Option[Event],
168
/** Time when the event was stored (milliseconds since epoch) */
169
val timestamp: Long,
170
/** Optional event metadata */
171
val eventMetadata: Option[Any],
172
/** The entity type for slice-based queries */
173
val entityType: String,
174
/** The slice number for horizontal partitioning */
175
val slice: Int,
176
/** Whether the event was filtered out */
177
val filtered: Boolean,
178
/** Source of the event (e.g., journal identifier) */
179
val source: String,
180
/** Set of tags associated with the event */
181
val tags: Set[String]
182
) {
183
184
/** Get event payload, throwing exception if not loaded or filtered */
185
def event: Event
186
187
/** Java API: Get event payload, throwing exception if not loaded or filtered */
188
def getEvent(): Event
189
190
/** Java API: Get optional event payload */
191
def getOptionalEvent(): Optional[Event]
192
193
/** Java API: Get event metadata */
194
def getEventMetaData(): Optional[AnyRef]
195
196
/** Java API: Get event tags */
197
def getTags(): JSet[String]
198
}
199
```
200
201
**Usage Examples:**
202
203
```scala
204
import akka.persistence.query.typed.EventEnvelope
205
206
// Process typed envelope
207
readJournal
208
.eventsBySlices[UserEvent]("User", 0, 1023, offset)
209
.runForeach { envelope =>
210
println(s"Entity Type: ${envelope.entityType}")
211
println(s"Slice: ${envelope.slice}")
212
println(s"Tags: ${envelope.tags}")
213
214
// Check if event is loaded
215
envelope.eventOption match {
216
case Some(event) => processEvent(event)
217
case None if envelope.filtered => println("Event was filtered")
218
case None => println("Event not loaded, use LoadEventQuery")
219
}
220
}
221
222
// Safe event access
223
def processEnvelope[Event](envelope: EventEnvelope[Event]): Unit = {
224
try {
225
val event = envelope.event
226
println(s"Processing event: $event")
227
} catch {
228
case _: IllegalStateException if envelope.filtered =>
229
println("Event was filtered, payload not available")
230
case _: IllegalStateException =>
231
println("Event not loaded, use LoadEventQuery to load on demand")
232
}
233
}
234
```
235
236
### Typed Event Envelope Factory
237
238
Factory methods for creating typed event envelope instances.
239
240
```scala { .api }
241
object EventEnvelope {
242
/** Scala API: Create typed envelope with all fields */
243
def apply[Event](
244
offset: Offset,
245
persistenceId: String,
246
sequenceNr: Long,
247
event: Event,
248
timestamp: Long,
249
entityType: String,
250
slice: Int,
251
filtered: Boolean,
252
source: String,
253
tags: Set[String]
254
): EventEnvelope[Event]
255
256
/** Scala API: Create typed envelope with minimal fields */
257
def apply[Event](
258
offset: Offset,
259
persistenceId: String,
260
sequenceNr: Long,
261
event: Event,
262
timestamp: Long,
263
entityType: String,
264
slice: Int
265
): EventEnvelope[Event]
266
267
/** Java API: Create typed envelope with all fields */
268
def create[Event](
269
offset: Offset,
270
persistenceId: String,
271
sequenceNr: Long,
272
event: Event,
273
timestamp: Long,
274
entityType: String,
275
slice: Int,
276
filtered: Boolean,
277
source: String,
278
tags: JSet[String]
279
): EventEnvelope[Event]
280
281
/** Pattern matching extractor */
282
def unapply[Event](envelope: EventEnvelope[Event]): Option[(Offset, String, Long, Option[Event], Long)]
283
}
284
```
285
286
**Usage Examples:**
287
288
```scala
289
import akka.persistence.query.typed.EventEnvelope
290
import akka.persistence.query.Sequence
291
292
// Create typed envelope
293
val typedEnvelope = EventEnvelope(
294
offset = Sequence(200L),
295
persistenceId = "user-123",
296
sequenceNr = 10L,
297
event = UserLoggedIn("user-123", System.currentTimeMillis()),
298
timestamp = System.currentTimeMillis(),
299
entityType = "User",
300
slice = 42,
301
filtered = false,
302
source = "journal-1",
303
tags = Set("login", "user-event")
304
)
305
306
// Pattern matching
307
typedEnvelope match {
308
case EventEnvelope(offset, persistenceId, seqNr, Some(event), timestamp) =>
309
println(s"Loaded event: $event")
310
case EventEnvelope(offset, persistenceId, seqNr, None, timestamp) =>
311
println("Event not loaded")
312
}
313
```
314
315
## Event Processing Patterns
316
317
### Offset-Based Stream Resumption
318
319
Store envelope offsets for resumable stream processing:
320
321
```scala
322
var lastOffset: Offset = NoOffset
323
324
readJournal
325
.eventsByTag("user-events", lastOffset)
326
.runForeach { envelope =>
327
try {
328
processEvent(envelope.event)
329
lastOffset = envelope.offset
330
saveCheckpoint(lastOffset)
331
} catch {
332
case ex: Exception =>
333
println(s"Failed to process event at ${envelope.offset}: $ex")
334
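// The offset is not advanced for this event, but the next successful event will checkpoint past it; rethrow here to stop the stream if events must not be skipped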
335
}
336
}
337
```
338
339
### Metadata-Based Event Routing
340
341
Use envelope metadata for routing and processing decisions:
342
343
```scala
344
readJournal
345
.eventsByTag("order-events", offset)
346
.runForeach { envelope =>
347
val event = envelope.event
348
val metadata = envelope.eventMetadata
349
350
metadata match {
351
case Some(meta: Map[String, Any] @unchecked) => // type arguments are erased at runtime, so only Map-ness is actually checked
352
meta.get("priority") match {
353
case Some("high") => priorityQueue.offer(envelope)
354
case _ => standardQueue.offer(envelope)
355
}
356
case _ => standardQueue.offer(envelope)
357
}
358
}
359
```
360
361
### Typed Event Loading
362
363
Load events on demand with typed envelopes:
364
365
```scala
366
def loadAndProcess[Event](envelope: EventEnvelope[Event]): Future[Unit] = {
367
envelope.eventOption match {
368
case Some(event) =>
369
Future.successful(processEvent(event))
370
case None if !envelope.filtered =>
371
// Load event on demand
372
readJournal
373
.asInstanceOf[LoadEventQuery]
374
.loadEnvelope[Event](envelope.persistenceId, envelope.sequenceNr)
375
.map(loadedEnvelope => processEvent(loadedEnvelope.event))
376
case None =>
377
Future.successful(()) // Skip filtered events
378
}
379
}
380
```
381
382
### Slice-Based Processing
383
384
Use slice information for distributed processing:
385
386
```scala
387
def processSliceEvents(slice: Int): Unit = {
388
readJournal
389
.eventsBySlices[MyEvent]("MyEntity", slice, slice, offset)
390
.runForeach { envelope =>
391
println(s"Processing event from slice ${envelope.slice}")
392
println(s"Entity type: ${envelope.entityType}")
393
println(s"Tags: ${envelope.tags.mkString(", ")}")
394
395
processEvent(envelope.event)
396
}
397
}
398
399
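// Process events from multiple slices concurrently: this starts one stream per individual slice; to run fewer queries, issue one eventsBySlices query per range using range.min and range.max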
400
val slices = readJournal.sliceRanges(4) // Get 4 slice ranges
401
slices.foreach { range =>
402
range.foreach(slice => processSliceEvents(slice))
403
}
404
```
405
406
## Error Handling
407
408
### Event Loading Errors
409
410
Handle cases where events are not loaded or filtered:
411
412
```scala
413
def safeProcessEvent[Event](envelope: EventEnvelope[Event]): Unit = {
414
try {
415
val event = envelope.event
416
processEvent(event)
417
} catch {
418
case ex: IllegalStateException if envelope.filtered =>
419
println(s"Event ${envelope.sequenceNr} was filtered: ${ex.getMessage}")
420
case ex: IllegalStateException =>
421
println(s"Event ${envelope.sequenceNr} not loaded: ${ex.getMessage}")
422
loadEventAsync(envelope.persistenceId, envelope.sequenceNr)
423
}
424
}
425
```
426
427
### Envelope Validation
428
429
Validate envelope contents before processing:
430
431
```scala
432
def validateEnvelope(envelope: EventEnvelope): Boolean = {
433
envelope.persistenceId.nonEmpty &&
434
envelope.sequenceNr > 0 &&
435
envelope.timestamp > 0 &&
436
envelope.offset != null
437
}
438
439
// Filter valid envelopes
440
readJournal
441
.eventsByTag("validated-events", offset)
442
.filter(validateEnvelope)
443
.runForeach(envelope => processEvent(envelope.event))
444
```
445
446
## Types
447
448
```scala { .api }
449
trait Product4[+T1, +T2, +T3, +T4] {
450
def _1: T1
451
def _2: T2
452
def _3: T3
453
def _4: T4
454
def productPrefix: String
455
def canEqual(that: Any): Boolean
456
}
457
458
trait Serializable
459
460
case class UserCreated(name: String, email: String)
461
case class UserUpdated(name: String, email: String)
462
case class UserLoggedIn(userId: String, timestamp: Long)
463
464
type JSet[T] = java.util.Set[T]
465
```