Event envelopes provide metadata wrappers for events in query result streams. They contain the event payload along with persistence information, timestamps, and offset data needed for stream processing and resumption.
Standard event envelope used by untyped query APIs.
/**
* Event wrapper adding metadata for the events in the result stream of
* EventsByTagQuery query, or similar queries.
*
* The timestamp is the time the event was stored, in milliseconds since midnight, January 1, 1970 UTC
* (same as System.currentTimeMillis).
*
* This listing is a signature summary: member bodies are not shown.
*
* The class extends `Product4[Offset, String, Long, Any]`; the four product element types line up
* with `offset`, `persistenceId`, `sequenceNr` and `event`. `timestamp` and `eventMetadata` are
* constructor fields but have no product element.
*/
final class EventEnvelope(
/** The offset that can be used in the next query */
val offset: Offset,
/** The persistence id of the PersistentActor that stored the event */
val persistenceId: String,
/** The sequence number of the event */
val sequenceNr: Long,
/** The event payload (untyped) */
val event: Any,
/** Time when the event was stored (milliseconds since epoch) */
val timestamp: Long,
/** Optional event metadata; `None` when the event carries no metadata */
val eventMetadata: Option[Any]
) extends Product4[Offset, String, Long, Any] with Serializable {
/** Java API: event metadata as a `java.util.Optional` (presumably the Java view of `eventMetadata`) */
def getEventMetaData(): Optional[Any]
/**
* Create a copy with modified fields.
*
* Only `offset`, `persistenceId`, `sequenceNr` and `event` can be overridden; the signature has
* no `timestamp` or `eventMetadata` parameter.
* NOTE(review): whether the copy keeps this envelope's timestamp and metadata is not visible
* in this listing - confirm against the implementation.
*/
def copy(
offset: Offset = this.offset,
persistenceId: String = this.persistenceId,
sequenceNr: Long = this.sequenceNr,
event: Any = this.event
): EventEnvelope
}Usage Examples:
import akka.persistence.query.{EventEnvelope, Sequence}
// Dump every field of each envelope stored for one persistence id
readJournal.eventsByPersistenceId("user-123", 0L, Long.MaxValue).runForeach { env =>
  println(s"Persistence ID: ${env.persistenceId}")
  println(s"Sequence Nr: ${env.sequenceNr}")
  println(s"Event: ${env.event}")
  println(s"Offset: ${env.offset}")
  println(s"Timestamp: ${env.timestamp}")
  // Metadata is optional, so this prints only when it is present
  for (meta <- env.eventMetadata) println(s"Metadata: $meta")
}
// Pattern matching: destructure each envelope; the offset is not needed for the log line
readJournal
  .eventsByTag("user-events", Sequence(0L))
  .runForeach { case EventEnvelope(_, pid, seqNo, payload) =>
    println(s"Event $payload at $seqNo from $pid")
  }
Java API usage:
import akka.persistence.query.EventEnvelope;
import java.util.Optional;
// Print each event, followed by its metadata when the envelope carries any
readJournal
    .eventsByPersistenceId("user-123", 0L, Long.MAX_VALUE)
    .runForeach(
        envelope -> {
          System.out.println("Event: " + envelope.event());
          // getEventMetaData() is the Java accessor; it returns an Optional
          envelope
              .getEventMetaData()
              .ifPresent(meta -> System.out.println("Metadata: " + meta));
        },
        system);
Factory methods for creating event envelope instances.
/**
* Companion of the untyped `EventEnvelope`: factory methods and the pattern-matching extractor.
*
* This listing is a signature summary: member bodies are not shown.
*
* The object extends `AbstractFunction4[Offset, String, Long, Any, EventEnvelope]`, i.e. it is
* also typed as a four-argument function producing an envelope; that four-argument `apply` is
* not listed here.
*/
object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] {
/**
* Create an envelope with timestamp and metadata.
*
* @param offset        offset that can be used in the next query
* @param persistenceId persistence id of the actor that stored the event
* @param sequenceNr    sequence number of the event
* @param event         the event payload
* @param timestamp     time the event was stored, in milliseconds since epoch
* @param meta          optional event metadata
*/
def apply(
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Any,
timestamp: Long,
meta: Option[Any]
): EventEnvelope
/**
* Create an envelope with timestamp; this overload takes no metadata argument.
* NOTE(review): presumably the resulting envelope has no metadata - confirm.
*/
def apply(
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Any,
timestamp: Long
): EventEnvelope
/**
* Pattern matching extractor.
*
* Yields a 4-tuple typed (Offset, String, Long, Any); `timestamp` and `eventMetadata` are not
* part of the extracted tuple.
*/
def unapply(envelope: EventEnvelope): Option[(Offset, String, Long, Any)]
}Usage Examples:
import akka.persistence.query.{EventEnvelope, Sequence}
// Envelope that carries metadata, built through the six-argument factory (positional form)
val envelope = EventEnvelope(
  Sequence(100L),
  "user-123",
  5L,
  UserCreated("John", "john@example.com"),
  System.currentTimeMillis(),
  Some(Map("source" -> "registration"))
)
// Envelope without metadata, built through the five-argument factory (named form)
val simpleEnvelope = EventEnvelope(
  offset = Sequence(101L),
  persistenceId = "user-124",
  sequenceNr = 1L,
  event = UserUpdated("Jane", "jane@example.com"),
  timestamp = System.currentTimeMillis()
)
Enhanced event envelope for typed query APIs with additional metadata fields.
/**
* Event wrapper adding metadata for the events in the result stream of
* EventsBySliceQuery query, or similar queries.
*
* If the event is not defined it has either not been loaded yet or it was filtered out
* (see `filtered`). An event that has not been loaded can be loaded with LoadEventQuery.
* It is an improved EventEnvelope compared to the untyped version.
*
* This listing is a signature summary: member bodies are not shown.
*
* @tparam Event the type of the event payload
*/
final class EventEnvelope[Event](
/** The offset that can be used in the next query */
val offset: Offset,
/** The persistence id of the entity */
val persistenceId: String,
/** The sequence number of the event */
val sequenceNr: Long,
/** The event payload (may be empty if not loaded or filtered) */
val eventOption: Option[Event],
/** Time when the event was stored (milliseconds since epoch) */
val timestamp: Long,
/** Optional event metadata */
val eventMetadata: Option[Any],
/** The entity type for slice-based queries */
val entityType: String,
/** The slice number for horizontal partitioning */
val slice: Int,
/** Whether the event was filtered out */
val filtered: Boolean,
/** Source of the event (e.g., journal identifier) */
val source: String,
/** Set of tags associated with the event */
val tags: Set[String]
) {
/**
* Get the event payload, throwing an exception if it was not loaded or was filtered.
* Use `eventOption` to inspect the payload without risking the exception.
*/
def event: Event
/** Java API: Get the event payload, throwing an exception if it was not loaded or was filtered */
def getEvent(): Event
/** Java API: Get the optional event payload (the non-throwing counterpart of `getEvent`) */
def getOptionalEvent(): Optional[Event]
/** Java API: Get the event metadata as an `Optional` */
def getEventMetaData(): Optional[AnyRef]
/** Java API: Get the event tags as a Java set */
def getTags(): JSet[String]
}Usage Examples:
import akka.persistence.query.typed.EventEnvelope
// Walk the typed envelopes of every "User" slice and report their slice-related fields
readJournal
  .eventsBySlices[UserEvent]("User", 0, 1023, offset)
  .runForeach { env =>
    println(s"Entity Type: ${env.entityType}")
    println(s"Slice: ${env.slice}")
    println(s"Tags: ${env.tags}")
    // A missing payload is either filtered out or simply not loaded yet
    env.eventOption match {
      case Some(payload) => processEvent(payload)
      case None =>
        val reason =
          if (env.filtered) "Event was filtered"
          else "Event not loaded, use LoadEventQuery"
        println(reason)
    }
  }
// Safe event access: `envelope.event` throws when the payload is absent, so guard the read
def processEnvelope[Event](envelope: EventEnvelope[Event]): Unit =
  try println(s"Processing event: ${envelope.event}")
  catch {
    case _: IllegalStateException =>
      // The `filtered` flag tells the two "no payload" situations apart
      val explanation =
        if (envelope.filtered) "Event was filtered, payload not available"
        else "Event not loaded, use LoadEventQuery to load on demand"
      println(explanation)
  }
Factory methods for creating typed event envelope instances.
/**
* Companion of the typed `EventEnvelope[Event]`: Scala and Java factory methods plus the
* pattern-matching extractor.
*
* This listing is a signature summary: member bodies are not shown.
* NOTE(review): none of the listed factories takes an `eventMetadata` argument or an
* `Option[Event]`; how `eventMetadata` and an empty `eventOption` are populated is not
* visible here - confirm against the implementation.
*/
object EventEnvelope {
/**
* Scala API: Create a typed envelope with all listed fields.
*
* @param offset        offset that can be used in the next query
* @param persistenceId persistence id of the entity
* @param sequenceNr    sequence number of the event
* @param event         the event payload
* @param timestamp     time the event was stored, in milliseconds since epoch
* @param entityType    entity type for slice-based queries
* @param slice         slice number
* @param filtered      whether the event was filtered out
* @param source        source of the event
* @param tags          tags associated with the event
*/
def apply[Event](
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Event,
timestamp: Long,
entityType: String,
slice: Int,
filtered: Boolean,
source: String,
tags: Set[String]
): EventEnvelope[Event]
/**
* Scala API: Create a typed envelope with minimal fields.
* This overload has no `filtered`, `source` or `tags` parameters.
* NOTE(review): the values used for those fields are not visible here - confirm.
*/
def apply[Event](
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Event,
timestamp: Long,
entityType: String,
slice: Int
): EventEnvelope[Event]
/**
* Java API: Create a typed envelope with all listed fields.
* Same parameters as the full Scala `apply`, except that `tags` is a Java set.
*/
def create[Event](
offset: Offset,
persistenceId: String,
sequenceNr: Long,
event: Event,
timestamp: Long,
entityType: String,
slice: Int,
filtered: Boolean,
source: String,
tags: JSet[String]
): EventEnvelope[Event]
/**
* Pattern matching extractor.
*
* Yields a 5-tuple typed (Offset, String, Long, Option[Event], Long); by type these line up with
* offset, persistenceId, sequenceNr, eventOption and timestamp. The remaining fields are not
* part of the extracted tuple.
*/
def unapply[Event](envelope: EventEnvelope[Event]): Option[(Offset, String, Long, Option[Event], Long)]
}Usage Examples:
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.Sequence
// Create typed envelope.
// The login time is captured once and used both inside the event payload and as the
// envelope timestamp; a bare `timestamp` is not a value in scope at this point (it is
// only the name of a factory parameter), so it cannot be passed to UserLoggedIn.
val loginTime = System.currentTimeMillis()
val typedEnvelope = EventEnvelope(
  offset = Sequence(200L),
  persistenceId = "user-123",
  sequenceNr = 10L,
  event = UserLoggedIn("user-123", loginTime),
  timestamp = loginTime,
  entityType = "User",
  slice = 42,
  filtered = false,
  source = "journal-1",
  tags = Set("login", "user-event")
)
// Pattern matching: only the optional payload (4th extracted element) matters here
typedEnvelope match {
  case EventEnvelope(_, _, _, maybeEvent, _) =>
    println(maybeEvent.fold("Event not loaded")(loaded => s"Loaded event: $loaded"))
}
Store envelope offsets for resumable stream processing:
// Offset of the last envelope that was fully processed and checkpointed.
// Kept as a var because it is advanced from inside the stream callback.
var lastOffset: Offset = NoOffset
readJournal
  .eventsByTag("user-events", lastOffset)
  .runForeach { envelope =>
    try {
      processEvent(envelope.event)
      // Advance and persist the offset only after the event was processed successfully
      lastOffset = envelope.offset
      saveCheckpoint(lastOffset)
    } catch {
      // NonFatal lets fatal errors and interrupts propagate untouched
      case scala.util.control.NonFatal(ex) =>
        println(s"Failed to process event at ${envelope.offset}: $ex")
        // Don't update offset on failure, and stop the stream by rethrowing: if the
        // failure were swallowed, the next successful event would checkpoint an offset
        // beyond the failed one and a resumed stream would silently skip it.
        throw ex
    }
  }
Use envelope metadata for routing and processing decisions:
// Route each envelope to the priority queue when its metadata is a map containing
// "priority" -> "high"; everything else goes to the standard queue.
readJournal
  .eventsByTag("order-events", offset)
  .runForeach { envelope =>
    val isHighPriority = envelope.eventMetadata.exists {
      // The metadata is untyped. Match on Map[_, _] because key/value types are erased at
      // runtime, and compare entries with == instead of assuming String keys.
      case meta: Map[_, _] =>
        meta.exists { case (key, value) => key == "priority" && value == "high" }
      case _ => false
    }
    if (isHighPriority) priorityQueue.offer(envelope)
    else standardQueue.offer(envelope)
  }
Load events on demand with typed envelopes:
/**
 * Processes the envelope's event, loading the payload on demand when it is missing.
 *
 *  - payload present: processed immediately
 *  - payload absent and filtered: skipped
 *  - payload absent and not filtered: loaded through LoadEventQuery, then processed
 *
 * @return a Future completed when processing is done; it is failed when the read journal
 *         does not support LoadEventQuery or when loading/processing fails
 */
def loadAndProcess[Event](envelope: EventEnvelope[Event]): Future[Unit] = {
  envelope.eventOption match {
    case Some(event) =>
      Future.successful(processEvent(event))
    case None if envelope.filtered =>
      Future.successful(()) // Skip filtered events
    case None =>
      // Load event on demand. Match on the capability instead of casting, so that an
      // unsupported journal surfaces as a failed Future rather than a ClassCastException
      // thrown before any Future is returned.
      readJournal match {
        case loader: LoadEventQuery =>
          loader
            .loadEnvelope[Event](envelope.persistenceId, envelope.sequenceNr)
            .map(loadedEnvelope => processEvent(loadedEnvelope.event))
        case _ =>
          Future.failed(
            new UnsupportedOperationException("Read journal does not implement LoadEventQuery"))
      }
  }
}
Use slice information for distributed processing:
/**
 * Starts one query over the slices `minSlice` to `maxSlice` (both bounds are passed to
 * `eventsBySlices`) and processes every envelope it emits.
 */
def processSliceRange(minSlice: Int, maxSlice: Int): Unit = {
  readJournal
    .eventsBySlices[MyEvent]("MyEntity", minSlice, maxSlice, offset)
    .runForeach { envelope =>
      println(s"Processing event from slice ${envelope.slice}")
      println(s"Entity type: ${envelope.entityType}")
      println(s"Tags: ${envelope.tags.mkString(", ")}")
      processEvent(envelope.event)
    }
}
/** Processes the events of a single slice; kept for callers that work slice by slice. */
def processSliceEvents(slice: Int): Unit = processSliceRange(slice, slice)
// Process events from multiple slice ranges in parallel: one query per range.
// Iterating the individual slices of each range would start a separate query for every
// single slice, which defeats the purpose of asking for 4 ranges.
// Assumes every returned range is non-empty - TODO confirm.
val slices = readJournal.sliceRanges(4) // Get 4 slice ranges
slices.foreach(range => processSliceRange(range.min, range.max))
Handle cases where events are not loaded or filtered:
/**
 * Processes the envelope's event, reporting (instead of failing) when the payload is absent.
 *
 * Only the read of `envelope.event` is guarded. An IllegalStateException thrown by
 * `processEvent` itself propagates to the caller instead of being misreported as
 * "filtered" / "not loaded" and triggering a pointless reload.
 */
def safeProcessEvent[Event](envelope: EventEnvelope[Event]): Unit = {
  import scala.util.{Failure, Success, Try}
  Try(envelope.event) match {
    case Success(event) =>
      processEvent(event)
    case Failure(ex: IllegalStateException) if envelope.filtered =>
      println(s"Event ${envelope.sequenceNr} was filtered: ${ex.getMessage}")
    case Failure(ex: IllegalStateException) =>
      println(s"Event ${envelope.sequenceNr} not loaded: ${ex.getMessage}")
      loadEventAsync(envelope.persistenceId, envelope.sequenceNr)
    case Failure(other) =>
      // Anything else is not ours to handle; surface it unchanged
      throw other
  }
}
Validate envelope contents before processing:
/**
 * Returns true when the envelope has a non-empty persistence id, a positive sequence
 * number, a positive timestamp and a non-null offset.
 */
def validateEnvelope(envelope: EventEnvelope): Boolean = {
  // Both helpers are defs, so the checks still run lazily and in the same order
  def identified = envelope.persistenceId.nonEmpty && envelope.sequenceNr > 0
  def positioned = envelope.timestamp > 0 && envelope.offset != null
  identified && positioned
}
// Filter valid envelopes: only envelopes accepted by validateEnvelope reach processEvent.
// NOTE(review): processEvent receives the whole envelope here, while the other examples
// pass envelope.event - confirm this is the intended argument.
// The last line of the statement is fused with the header of the Product4 type stub that
// follows; the stub lists the members of a four-element product.
readJournal
.eventsByTag("validated-events", offset)
.filter(validateEnvelope)
.runForeach(processEvent)trait Product4[+T1, +T2, +T3, +T4] {
/** First element of the product */
def _1: T1
/** Second element of the product */
def _2: T2
/** Third element of the product */
def _3: T3
/** Fourth element of the product */
def _4: T4
/** Prefix string of the product (signature only; its use is not shown here) */
def productPrefix: String
/** Whether `that` can be compared for equality with this product (signature only) */
def canEqual(that: Any): Boolean
}
/** Marker trait stub, listed here because the untyped EventEnvelope mixes it in. */
trait Serializable
// Example event types used by the snippets in this document.
// Case classes are declared final: they are plain data carriers and are not meant to be
// extended.
/** Example event: a user was created. */
final case class UserCreated(name: String, email: String)
/** Example event: a user's details were updated. */
final case class UserUpdated(name: String, email: String)
/** Example event: a user logged in at `timestamp`. */
final case class UserLoggedIn(userId: String, timestamp: Long)
/** Alias for the Java set type used by the Java-facing envelope APIs. */
type JSet[T] = java.util.Set[T]