or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

durable-state.mdevent-envelopes.mdextension.mdindex.mdjournal-implementations.mdoffsets.mdtyped-queries.mduntyped-queries.md

event-envelopes.mddocs/

0

# Event Envelopes

1

2

Event envelopes provide metadata wrappers for events in query result streams. They contain the event payload along with persistence information, timestamps, and offset data needed for stream processing and resumption.

3

4

## Capabilities

5

6

### Standard Event Envelope

7

8

Standard event envelope used by untyped query APIs.

9

10

```scala { .api }

11

/**

12

* Event wrapper adding meta data for the events in the result stream of

13

* EventsByTagQuery query, or similar queries.

14

*

15

* The timestamp is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC

16

* (same as System.currentTimeMillis).

17

*/

18

final class EventEnvelope(

19

/** The offset that can be used in next query */

20

val offset: Offset,

21

/** The persistence id of the PersistentActor */

22

val persistenceId: String,

23

/** The sequence number of the event */

24

val sequenceNr: Long,

25

/** The event payload */

26

val event: Any,

27

/** Time when the event was stored (milliseconds since epoch) */

28

val timestamp: Long,

29

/** Optional event metadata */

30

val eventMetadata: Option[Any]

31

) extends Product4[Offset, String, Long, Any] with Serializable {

32

33

/** Java API for accessing event metadata */

34

def getEventMetaData(): Optional[Any]

35

36

/** Create copy with modified fields */

37

def copy(

38

offset: Offset = this.offset,

39

persistenceId: String = this.persistenceId,

40

sequenceNr: Long = this.sequenceNr,

41

event: Any = this.event

42

): EventEnvelope

43

}

44

```

45

46

**Usage Examples:**

47

48

```scala

49

import akka.persistence.query.{EventEnvelope, Sequence}

50

51

// Access envelope properties

52

readJournal

53

.eventsByPersistenceId("user-123", 0L, Long.MaxValue)

54

.runForeach { envelope =>

55

println(s"Persistence ID: ${envelope.persistenceId}")

56

println(s"Sequence Nr: ${envelope.sequenceNr}")

57

println(s"Event: ${envelope.event}")

58

println(s"Offset: ${envelope.offset}")

59

println(s"Timestamp: ${envelope.timestamp}")

60

61

// Check for metadata

62

envelope.eventMetadata.foreach { meta =>

63

println(s"Metadata: $meta")

64

}

65

}

66

67

// Pattern matching

68

readJournal

69

.eventsByTag("user-events", Sequence(0L))

70

.runForeach {

71

case EventEnvelope(offset, persistenceId, seqNr, event) =>

72

println(s"Event $event at $seqNr from $persistenceId")

73

}

74

```

75

76

Java API usage:

77

```java

78

import akka.persistence.query.EventEnvelope;

79

import java.util.Optional;

80

81

readJournal

82

.eventsByPersistenceId("user-123", 0L, Long.MAX_VALUE)

83

.runForeach(envelope -> {

84

System.out.println("Event: " + envelope.event());

85

86

Optional<Object> metadata = envelope.getEventMetaData();

87

metadata.ifPresent(meta ->

88

System.out.println("Metadata: " + meta));

89

}, system);

90

```

91

92

### Event Envelope Factory

93

94

Factory methods for creating event envelope instances.

95

96

```scala { .api }

97

object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] {

98

/** Create envelope with timestamp and metadata */

99

def apply(

100

offset: Offset,

101

persistenceId: String,

102

sequenceNr: Long,

103

event: Any,

104

timestamp: Long,

105

meta: Option[Any]

106

): EventEnvelope

107

108

/** Create envelope with timestamp */

109

def apply(

110

offset: Offset,

111

persistenceId: String,

112

sequenceNr: Long,

113

event: Any,

114

timestamp: Long

115

): EventEnvelope

116

117

/** Pattern matching extractor */

118

def unapply(envelope: EventEnvelope): Option[(Offset, String, Long, Any)]

119

}

120

```

121

122

**Usage Examples:**

123

124

```scala

125

import akka.persistence.query.{EventEnvelope, Sequence}

126

127

// Create envelope with metadata

128

val envelope = EventEnvelope(

129

offset = Sequence(100L),

130

persistenceId = "user-123",

131

sequenceNr = 5L,

132

event = UserCreated("John", "john@example.com"),

133

timestamp = System.currentTimeMillis(),

134

meta = Some(Map("source" -> "registration"))

135

)

136

137

// Create envelope without metadata

138

val simpleEnvelope = EventEnvelope(

139

Sequence(101L),

140

"user-124",

141

1L,

142

UserUpdated("Jane", "jane@example.com"),

143

System.currentTimeMillis()

144

)

145

```

146

147

### Typed Event Envelope

148

149

Enhanced event envelope for typed query APIs with additional metadata fields.

150

151

```scala { .api }

152

/**

153

* Event wrapper adding meta data for the events in the result stream of

154

* EventsBySliceQuery query, or similar queries.

155

*

156

* If the event is not defined it has not been loaded yet. It can be loaded with LoadEventQuery.

157

* It is an improved EventEnvelope compared to the untyped version.

158

*/

159

final class EventEnvelope[Event](

160

/** The offset that can be used in next query */

161

val offset: Offset,

162

/** The persistence id of the entity */

163

val persistenceId: String,

164

/** The sequence number of the event */

165

val sequenceNr: Long,

166

/** The event payload (may be empty if not loaded or filtered) */

167

val eventOption: Option[Event],

168

/** Time when the event was stored (milliseconds since epoch) */

169

val timestamp: Long,

170

/** Optional event metadata */

171

val eventMetadata: Option[Any],

172

/** The entity type for slice-based queries */

173

val entityType: String,

174

/** The slice number for horizontal partitioning */

175

val slice: Int,

176

/** Whether the event was filtered out */

177

val filtered: Boolean,

178

/** Source of the event (e.g., journal identifier) */

179

val source: String,

180

/** Set of tags associated with the event */

181

val tags: Set[String]

182

) {

183

184

/** Get event payload, throwing exception if not loaded or filtered */

185

def event: Event

186

187

/** Java API: Get event payload, throwing exception if not loaded or filtered */

188

def getEvent(): Event

189

190

/** Java API: Get optional event payload */

191

def getOptionalEvent(): Optional[Event]

192

193

/** Java API: Get event metadata */

194

def getEventMetaData(): Optional[AnyRef]

195

196

/** Java API: Get event tags */

197

def getTags(): JSet[String]

198

}

199

```

200

201

**Usage Examples:**

202

203

```scala

204

import akka.persistence.query.typed.EventEnvelope

205

206

// Process typed envelope

207

readJournal

208

.eventsBySlices[UserEvent]("User", 0, 1023, offset)

209

.runForeach { envelope =>

210

println(s"Entity Type: ${envelope.entityType}")

211

println(s"Slice: ${envelope.slice}")

212

println(s"Tags: ${envelope.tags}")

213

214

// Check if event is loaded

215

envelope.eventOption match {

216

case Some(event) => processEvent(event)

217

case None if envelope.filtered => println("Event was filtered")

218

case None => println("Event not loaded, use LoadEventQuery")

219

}

220

}

221

222

// Safe event access

223

def processEnvelope[Event](envelope: EventEnvelope[Event]): Unit = {

224

try {

225

val event = envelope.event

226

println(s"Processing event: $event")

227

} catch {

228

case _: IllegalStateException if envelope.filtered =>

229

println("Event was filtered, payload not available")

230

case _: IllegalStateException =>

231

println("Event not loaded, use LoadEventQuery to load on demand")

232

}

233

}

234

```

235

236

### Typed Event Envelope Factory

237

238

Factory methods for creating typed event envelope instances.

239

240

```scala { .api }

241

object EventEnvelope {

242

/** Scala API: Create typed envelope with all fields */

243

def apply[Event](

244

offset: Offset,

245

persistenceId: String,

246

sequenceNr: Long,

247

event: Event,

248

timestamp: Long,

249

entityType: String,

250

slice: Int,

251

filtered: Boolean,

252

source: String,

253

tags: Set[String]

254

): EventEnvelope[Event]

255

256

/** Scala API: Create typed envelope with minimal fields */

257

def apply[Event](

258

offset: Offset,

259

persistenceId: String,

260

sequenceNr: Long,

261

event: Event,

262

timestamp: Long,

263

entityType: String,

264

slice: Int

265

): EventEnvelope[Event]

266

267

/** Java API: Create typed envelope with all fields */

268

def create[Event](

269

offset: Offset,

270

persistenceId: String,

271

sequenceNr: Long,

272

event: Event,

273

timestamp: Long,

274

entityType: String,

275

slice: Int,

276

filtered: Boolean,

277

source: String,

278

tags: JSet[String]

279

): EventEnvelope[Event]

280

281

/** Pattern matching extractor */

282

def unapply[Event](envelope: EventEnvelope[Event]): Option[(Offset, String, Long, Option[Event], Long)]

283

}

284

```

285

286

**Usage Examples:**

287

288

```scala

289

import akka.persistence.query.typed.EventEnvelope

290

import akka.persistence.query.Sequence

291

292

// Create typed envelope

293

val typedEnvelope = EventEnvelope(

294

offset = Sequence(200L),

295

persistenceId = "user-123",

296

sequenceNr = 10L,

297

event = UserLoggedIn("user-123", timestamp),

298

timestamp = System.currentTimeMillis(),

299

entityType = "User",

300

slice = 42,

301

filtered = false,

302

source = "journal-1",

303

tags = Set("login", "user-event")

304

)

305

306

// Pattern matching

307

typedEnvelope match {

308

case EventEnvelope(offset, persistenceId, seqNr, Some(event), timestamp) =>

309

println(s"Loaded event: $event")

310

case EventEnvelope(offset, persistenceId, seqNr, None, timestamp) =>

311

println("Event not loaded")

312

}

313

```

314

315

## Event Processing Patterns

316

317

### Offset-Based Stream Resumption

318

319

Store envelope offsets for resumable stream processing:

320

321

```scala

322

var lastOffset: Offset = NoOffset

323

324

readJournal

325

.eventsByTag("user-events", lastOffset)

326

.runForeach { envelope =>

327

try {

328

processEvent(envelope.event)

329

lastOffset = envelope.offset

330

saveCheckpoint(lastOffset)

331

} catch {

332

case ex: Exception =>

333

println(s"Failed to process event at ${envelope.offset}: $ex")

334

// Don't update offset on failure

335

}

336

}

337

```

338

339

### Metadata-Based Event Routing

340

341

Use envelope metadata for routing and processing decisions:

342

343

```scala

344

readJournal

345

.eventsByTag("order-events", offset)

346

.runForeach { envelope =>

347

val event = envelope.event

348

val metadata = envelope.eventMetadata

349

350

metadata match {

351

case Some(meta: Map[String, Any]) =>

352

meta.get("priority") match {

353

case Some("high") => priorityQueue.offer(envelope)

354

case _ => standardQueue.offer(envelope)

355

}

356

case _ => standardQueue.offer(envelope)

357

}

358

}

359

```

360

361

### Typed Event Loading

362

363

Load events on demand with typed envelopes:

364

365

```scala

366

def loadAndProcess[Event](envelope: EventEnvelope[Event]): Future[Unit] = {

367

envelope.eventOption match {

368

case Some(event) =>

369

Future.successful(processEvent(event))

370

case None if !envelope.filtered =>

371

// Load event on demand

372

readJournal

373

.asInstanceOf[LoadEventQuery]

374

.loadEnvelope[Event](envelope.persistenceId, envelope.sequenceNr)

375

.map(loadedEnvelope => processEvent(loadedEnvelope.event))

376

case None =>

377

Future.successful(()) // Skip filtered events

378

}

379

}

380

```

381

382

### Slice-Based Processing

383

384

Use slice information for distributed processing:

385

386

```scala

387

def processSliceEvents(slice: Int): Unit = {

388

readJournal

389

.eventsBySlices[MyEvent]("MyEntity", slice, slice, offset)

390

.runForeach { envelope =>

391

println(s"Processing event from slice ${envelope.slice}")

392

println(s"Entity type: ${envelope.entityType}")

393

println(s"Tags: ${envelope.tags.mkString(", ")}")

394

395

processEvent(envelope.event)

396

}

397

}

398

399

// Process events from multiple slices in parallel

400

val slices = readJournal.sliceRanges(4) // Get 4 slice ranges

401

slices.foreach { range =>

402

range.foreach(slice => processSliceEvents(slice))

403

}

404

```

405

406

## Error Handling

407

408

### Event Loading Errors

409

410

Handle cases where events are not loaded or filtered:

411

412

```scala

413

def safeProcessEvent[Event](envelope: EventEnvelope[Event]): Unit = {

414

try {

415

val event = envelope.event

416

processEvent(event)

417

} catch {

418

case ex: IllegalStateException if envelope.filtered =>

419

println(s"Event ${envelope.sequenceNr} was filtered: ${ex.getMessage}")

420

case ex: IllegalStateException =>

421

println(s"Event ${envelope.sequenceNr} not loaded: ${ex.getMessage}")

422

loadEventAsync(envelope.persistenceId, envelope.sequenceNr)

423

}

424

}

425

```

426

427

### Envelope Validation

428

429

Validate envelope contents before processing:

430

431

```scala

432

def validateEnvelope(envelope: EventEnvelope): Boolean = {

433

envelope.persistenceId.nonEmpty &&

434

envelope.sequenceNr > 0 &&

435

envelope.timestamp > 0 &&

436

envelope.offset != null

437

}

438

439

// Filter valid envelopes

440

readJournal

441

.eventsByTag("validated-events", offset)

442

.filter(validateEnvelope)

443

.runForeach(processEvent)

444

```

445

446

## Types

447

448

```scala { .api }

449

trait Product4[+T1, +T2, +T3, +T4] {

450

def _1: T1

451

def _2: T2

452

def _3: T3

453

def _4: T4

454

def productPrefix: String

455

def canEqual(that: Any): Boolean

456

}

457

458

trait Serializable

459

460

case class UserCreated(name: String, email: String)

461

case class UserUpdated(name: String, email: String)

462

case class UserLoggedIn(userId: String, timestamp: Long)

463

464

type JSet[T] = java.util.Set[T]

465

```