npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-query_2-13@2.8.00
# Akka Persistence Query

Akka Persistence Query provides a universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends. It enables building reactive applications, CQRS systems, and event processors that can process persistent event streams with high throughput and reliability.

## Package Information

- **Package Name**: akka-persistence-query_2.13
- **Package Type**: maven
- **Language**: Scala
- **Installation**: `libraryDependencies += "com.typesafe.akka" %% "akka-persistence-query" % "2.8.8"`

## Core Imports

```scala
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.ReadJournal
import akka.persistence.query.{EventEnvelope, Offset}
```

Java API:

```java
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.ReadJournal;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.Offset;
```

## Basic Usage

```scala
import akka.actor.ActorSystem
import akka.persistence.query.{Offset, PersistenceQuery}
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.scaladsl.Sink

// The implicit ActorSystem also provides the stream materializer
implicit val system: ActorSystem = ActorSystem("example")

// Get read journal for querying
val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Query events by persistence ID (live query: the stream stays open for new events)
readJournal
  .eventsByPersistenceId("user-123", 0L, Long.MaxValue)
  .runWith(Sink.foreach { envelope =>
    println(s"Event: ${envelope.event} at ${envelope.sequenceNr}")
  })

// Query events by tag, starting from the beginning
readJournal
  .eventsByTag("user-events", Offset.noOffset)
  .runWith(Sink.foreach { envelope =>
    println(s"Tagged event: ${envelope.event}")
  })
```

## Architecture

Akka Persistence Query is built around several key components:

- **Read Journal Interface**: Pluggable backend implementations for different storage systems
- **Query API**: Standardized interface for event and state queries across different journal implementations
- **Offset System**: Position tracking mechanism for resumable stream consumption
- **Event Envelopes**: Metadata wrappers containing events, offsets, timestamps, and persistence information
- **Streaming Integration**: Built on Akka Streams for backpressure-aware event processing
- **Dual API**: Both untyped (original) and typed APIs with Scala and Java variants

## Capabilities

### Extension and Configuration

Main extension entry point for obtaining read journal instances from configured plugins.

```scala { .api }
object PersistenceQuery extends ExtensionId[PersistenceQuery] {
  def get(system: ActorSystem): PersistenceQuery
  def get(system: ClassicActorSystemProvider): PersistenceQuery
}

class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T
}
```

[Extension and Configuration](./extension.md)

### Offset Management

Position tracking system for resumable queries and stream consumption.

```scala { .api }
abstract class Offset

case class Sequence(value: Long) extends Offset with Ordered[Sequence]
case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID]
case class TimestampOffset(timestamp: Instant, readTimestamp: Instant, seen: Map[String, Long]) extends Offset
case object NoOffset extends Offset

object Offset {
  def noOffset: Offset
  def sequence(value: Long): Offset
  def timeBasedUUID(uuid: UUID): Offset
  def timestamp(instant: Instant): TimestampOffset
}
```

[Offset Management](./offsets.md)
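
As a minimal sketch of how the offset variants above might be told apart, for example when logging or restoring a consumer position, the helper below pattern matches on an `Offset`. The names `OffsetExample` and `describe` are hypothetical and not part of the library. Because `Offset` is an abstract class rather than a sealed one, the sketch keeps a fallback case for plugin-specific offsets.

```scala
import akka.persistence.query.{NoOffset, Offset, Sequence, TimeBasedUUID, TimestampOffset}

object OffsetExample {
  // Hypothetical helper: turns any offset into a short human-readable label.
  def describe(offset: Offset): String = offset match {
    case NoOffset            => "start of stream"
    case Sequence(n)         => s"sequence $n"
    case TimeBasedUUID(uuid) => s"time-based UUID $uuid"
    case t: TimestampOffset  => s"timestamp ${t.timestamp}"
    case other               => s"plugin-specific offset $other"
  }
}
```

With this helper, `OffsetExample.describe(Offset.sequence(42L))` yields `"sequence 42"` and `OffsetExample.describe(Offset.noOffset)` yields `"start of stream"`.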

### Event Envelopes

Event wrapper classes providing metadata for streamed events.

```scala { .api }
final class EventEnvelope(
  val offset: Offset,
  val persistenceId: String,
  val sequenceNr: Long,
  val event: Any,
  val timestamp: Long,
  val eventMetadata: Option[Any]
) extends Product4[Offset, String, Long, Any]
```

[Event Envelopes](./event-envelopes.md)

### Untyped Query API

Original query interface for event and persistence ID queries.

```scala { .api }
trait ReadJournal

trait EventsByPersistenceIdQuery extends ReadJournal {
  def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]
}

trait EventsByTagQuery extends ReadJournal {
  def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]
}

trait PersistenceIdsQuery extends ReadJournal {
  def persistenceIds(): Source[String, NotUsed]
}
```

[Untyped Query API](./untyped-queries.md)

### Typed Query API

Enhanced type-safe query interface with improved event envelopes and slice-based querying.

```scala { .api }
final class EventEnvelope[Event](
  val offset: Offset,
  val persistenceId: String,
  val sequenceNr: Long,
  val eventOption: Option[Event],
  val timestamp: Long,
  val eventMetadata: Option[Any],
  val entityType: String,
  val slice: Int,
  val filtered: Boolean,
  val source: String,
  val tags: Set[String]
) {
  /** Get the event value, throwing exception if not present or filtered */
  def event: Event

  /** Java API: Get the event value, throwing exception if not present or filtered */
  def getEvent(): Event

  /** Java API: Get the optional event value */
  def getOptionalEvent(): Optional[Event]

  /** Java API: Get the event metadata */
  def getEventMetaData(): Optional[AnyRef]

  /** Java API: Get the tags */
  def getTags(): java.util.Set[String]
}

trait EventsBySliceQuery extends ReadJournal {
  def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]
  def sliceForPersistenceId(persistenceId: String): Int
  def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]
}
```

[Typed Query API](./typed-queries.md)
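
The slice-based signatures above can be combined roughly as follows. This is a sketch only: `SliceConsumers` and `startAll` are hypothetical names, every stream starts from `Offset.noOffset` rather than a stored position, and all ranges run in a single process, whereas a real deployment would more likely assign each range to a different node.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventsBySliceQuery

import scala.collection.immutable
import scala.concurrent.Future

object SliceConsumers {
  // Hypothetical helper: starts one stream per slice range and returns
  // one completion future per stream.
  def startAll[Event](
      query: EventsBySliceQuery,
      entityType: String,
      numberOfRanges: Int)(handle: EventEnvelope[Event] => Unit)(
      implicit system: ActorSystem): immutable.Seq[Future[Done]] =
    query.sliceRanges(numberOfRanges).map { range =>
      query
        .eventsBySlices[Event](entityType, range.min, range.max, Offset.noOffset)
        .runForeach { envelope =>
          // `event` throws when the payload is absent or filtered,
          // so only envelopes that carry a payload are handed on.
          if (envelope.eventOption.isDefined) handle(envelope)
        }
    }
}
```

Checking `eventOption` before calling the handler keeps the handler free to use `envelope.event` without risking the exception documented on that accessor.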

### Durable State Queries

Query interface for durable state changes and persistence.

```scala { .api }
sealed trait DurableStateChange[A] {
  def persistenceId: String
  def offset: Offset
}

final class UpdatedDurableState[A](
  val persistenceId: String,
  val revision: Long,
  val value: A,
  override val offset: Offset,
  val timestamp: Long
) extends DurableStateChange[A]

trait DurableStateStoreQuery[A] extends DurableStateStore[A] {
  def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
  def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]
}
```

[Durable State Queries](./durable-state.md)
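
A consumer of `changes` or `currentChanges` typically has to distinguish updates from deletions. The sketch below shows one way to do that with type patterns. `DurableStateChanges` and `describe` are hypothetical names, and `DeletedDurableState` is the second `DurableStateChange` subtype, listed under Types.

```scala
import akka.persistence.query.{DeletedDurableState, DurableStateChange, UpdatedDurableState}

object DurableStateChanges {
  // Hypothetical helper: summarizes a single change element from the stream.
  def describe[A](change: DurableStateChange[A]): String = change match {
    case u: UpdatedDurableState[A] =>
      s"${u.persistenceId} updated to revision ${u.revision}: ${u.value}"
    case d: DeletedDurableState[A] =>
      s"${d.persistenceId} deleted at revision ${d.revision}"
  }
}
```

Such a function could be passed to `map` on the `Source` returned by `changes(tag, offset)` to turn raw change elements into log lines or read-model updates.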

### Journal Implementations

Concrete read journal implementations for specific storage backends.

```scala { .api }
// LevelDB Implementation (deprecated)
class LeveldbReadJournal extends ReadJournal
  with PersistenceIdsQuery
  with CurrentPersistenceIdsQuery
  with EventsByPersistenceIdQuery
  with CurrentEventsByPersistenceIdQuery
  with EventsByTagQuery
  with CurrentEventsByTagQuery

object LeveldbReadJournal {
  val Identifier = "akka.persistence.query.journal.leveldb"
}

// Firehose Implementation
class EventsBySliceFirehoseQuery extends ReadJournal
  with EventsBySliceQuery
  with EventsBySliceStartingFromSnapshotsQuery
  with EventTimestampQuery
  with LoadEventQuery

object EventsBySliceFirehoseQuery {
  val Identifier = "akka.persistence.query.events-by-slice-firehose"
}
```

[Journal Implementations](./journal-implementations.md)

## Common Query Patterns

- **Live Streaming**: Use `eventsByPersistenceId` and `eventsByTag` for continuous event processing
- **Finite Queries**: Use `currentEventsByPersistenceId` and `currentEventsByTag` for batch processing
- **Horizontal Scaling**: Use typed slice-based queries for distributed event processing
- **State Reconstruction**: Combine snapshot and event queries for efficient state rebuilding
- **CQRS Read Models**: Build projections using event streams with offset tracking
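
The read-model pattern from the list above can be sketched as a tagged stream that saves the offset of each handled envelope so a restart can resume from it. Everything named here is an assumption for illustration: `TagProjection`, `OffsetStore`, and `updateReadModel` are hypothetical, and the in-memory store stands in for whatever durable storage a real projection would use.

```scala
import java.util.concurrent.atomic.AtomicReference

import akka.Done
import akka.actor.ActorSystem
import akka.persistence.query.{EventEnvelope, Offset}
import akka.persistence.query.scaladsl.EventsByTagQuery

import scala.concurrent.Future

object TagProjection {
  // Hypothetical in-memory offset store; a real projection would persist
  // the offset, ideally together with the read-model update.
  final class OffsetStore {
    private val saved = new AtomicReference[Offset](Offset.noOffset)
    def load(): Offset = saved.get()
    def save(offset: Offset): Unit = saved.set(offset)
  }

  // Streams events for `tag` from the last saved offset, applies each one
  // to the read model, then records how far processing has progressed.
  def run(journal: EventsByTagQuery, tag: String, store: OffsetStore)(
      updateReadModel: EventEnvelope => Unit)(
      implicit system: ActorSystem): Future[Done] =
    journal
      .eventsByTag(tag, store.load())
      .runForeach { envelope =>
        updateReadModel(envelope)
        store.save(envelope.offset)
      }
}
```

Because `eventsByTag` is a live query, the returned future normally completes only when the stream fails or is cancelled. For batch-style processing, the same shape should work with `currentEventsByTag` from `CurrentEventsByTagQuery`.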

## Types

```scala { .api }
import java.util.Optional
import java.time.Instant
import scala.collection.immutable

/**
 * A query plugin must implement a class that implements this trait.
 * It provides the concrete implementations for the Java and Scala APIs.
 *
 * A read journal plugin must provide implementations for both
 * scaladsl.ReadJournal and javadsl.ReadJournal.
 */
trait ReadJournalProvider {
  /**
   * The ReadJournal implementation for the Scala API.
   * This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.
   */
  def scaladslReadJournal(): scaladsl.ReadJournal

  /**
   * The ReadJournal implementation for the Java API.
   * This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.
   */
  def javadslReadJournal(): javadsl.ReadJournal
}

final class DeletedDurableState[A](
  val persistenceId: String,
  val revision: Long,
  override val offset: Offset,
  val timestamp: Long
) extends DurableStateChange[A]

object TimestampOffset {
  val Zero: TimestampOffset
  def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset
  def toTimestampOffset(offset: Offset): TimestampOffset
}
```