or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-com-typesafe-akka--akka-persistence-query_2-13

Universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends with support for CQRS and event sourcing patterns.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/com.typesafe.akka/akka-persistence-query_2.13@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-persistence-query_2-13@2.8.0

0

# Akka Persistence Query

1

2

Akka Persistence Query provides a universal asynchronous stream-based query interface for querying persisted events and state changes from various journal backends. It enables building reactive applications, CQRS systems, and event processors that can process persistent event streams with high throughput and reliability.

3

4

## Package Information

5

6

- **Package Name**: akka-persistence-query_2.13

7

- **Package Type**: maven

8

- **Language**: Scala

9

- **Installation**: `libraryDependencies += "com.typesafe.akka" %% "akka-persistence-query" % "2.8.8"`

10

11

## Core Imports

12

13

```scala

14

import akka.persistence.query.PersistenceQuery

15

import akka.persistence.query.scaladsl.ReadJournal

16

import akka.persistence.query.{EventEnvelope, Offset}

17

```

18

19

Java API:

20

```java

21

import akka.persistence.query.PersistenceQuery;

22

import akka.persistence.query.javadsl.ReadJournal;

23

import akka.persistence.query.EventEnvelope;

24

import akka.persistence.query.Offset;

25

```

26

27

## Basic Usage

28

29

```scala

30

import akka.actor.ActorSystem

31

import akka.persistence.query.PersistenceQuery

32

import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

33

import akka.stream.scaladsl.Sink

34

35

implicit val system: ActorSystem = ActorSystem("example")

36

37

// Get read journal for querying

38

val readJournal = PersistenceQuery(system)

39

.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

40

41

// Query events by persistence ID

42

readJournal

43

.eventsByPersistenceId("user-123", 0L, Long.MaxValue)

44

.runWith(Sink.foreach { envelope =>

45

println(s"Event: ${envelope.event} at ${envelope.sequenceNr}")

46

})

47

48

// Query events by tag

49

readJournal

50

.eventsByTag("user-events", Offset.noOffset)

51

.runWith(Sink.foreach { envelope =>

52

println(s"Tagged event: ${envelope.event}")

53

})

54

```

55

56

## Architecture

57

58

Akka Persistence Query is built around several key components:

59

60

- **Read Journal Interface**: Pluggable backend implementations for different storage systems

61

- **Query API**: Standardized interface for event and state queries across different journal implementations

62

- **Offset System**: Position tracking mechanism for resumable stream consumption

63

- **Event Envelopes**: Metadata wrappers containing events, offsets, timestamps, and persistence information

64

- **Streaming Integration**: Built on Akka Streams for backpressure-aware event processing

65

- **Dual API**: Both untyped (original) and typed APIs with Scala and Java variants

66

67

## Capabilities

68

69

### Extension and Configuration

70

71

Main extension entry point for obtaining read journal instances from configured plugins.

72

73

```scala { .api }

74

object PersistenceQuery extends ExtensionId[PersistenceQuery] {

75

def get(system: ActorSystem): PersistenceQuery

76

def get(system: ClassicActorSystemProvider): PersistenceQuery

77

}

78

79

class PersistenceQuery(system: ExtendedActorSystem) extends Extension {

80

def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T

81

def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T

82

def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T

83

def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T

84

}

85

```

86

87

[Extension and Configuration](./extension.md)

88

89

### Offset Management

90

91

Position tracking system for resumable queries and stream consumption.

92

93

```scala { .api }

94

abstract class Offset

95

96

case class Sequence(value: Long) extends Offset with Ordered[Sequence]

97

case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID]

98

case class TimestampOffset(timestamp: Instant, readTimestamp: Instant, seen: Map[String, Long]) extends Offset

99

case object NoOffset extends Offset

100

101

object Offset {

102

def noOffset: Offset

103

def sequence(value: Long): Offset

104

def timeBasedUUID(uuid: UUID): Offset

105

def timestamp(instant: Instant): TimestampOffset

106

}

107

```

108

109

[Offset Management](./offsets.md)

110

111

### Event Envelopes

112

113

Event wrapper classes providing metadata for streamed events.

114

115

```scala { .api }

116

final class EventEnvelope(

117

val offset: Offset,

118

val persistenceId: String,

119

val sequenceNr: Long,

120

val event: Any,

121

val timestamp: Long,

122

val eventMetadata: Option[Any]

123

) extends Product4[Offset, String, Long, Any]

124

```

125

126

[Event Envelopes](./event-envelopes.md)

127

128

### Untyped Query API

129

130

Original query interface for event and persistence ID queries.

131

132

```scala { .api }

133

trait ReadJournal

134

135

trait EventsByPersistenceIdQuery extends ReadJournal {

136

def eventsByPersistenceId(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed]

137

}

138

139

trait EventsByTagQuery extends ReadJournal {

140

def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed]

141

}

142

143

trait PersistenceIdsQuery extends ReadJournal {

144

def persistenceIds(): Source[String, NotUsed]

145

}

146

```

147

148

[Untyped Query API](./untyped-queries.md)

149

150

### Typed Query API

151

152

Enhanced type-safe query interface with improved event envelopes and slice-based querying.

153

154

```scala { .api }

155

final class EventEnvelope[Event](

156

val offset: Offset,

157

val persistenceId: String,

158

val sequenceNr: Long,

159

val eventOption: Option[Event],

160

val timestamp: Long,

161

val eventMetadata: Option[Any],

162

val entityType: String,

163

val slice: Int,

164

val filtered: Boolean,

165

val source: String,

166

val tags: Set[String]

167

) {

168

/** Get the event value, throwing exception if not present or filtered */

169

def event: Event

170

171

/** Java API: Get the event value, throwing exception if not present or filtered */

172

def getEvent(): Event

173

174

/** Java API: Get the optional event value */

175

def getOptionalEvent(): Optional[Event]

176

177

/** Java API: Get the event metadata */

178

def getEventMetaData(): Optional[AnyRef]

179

180

/** Java API: Get the tags */

181

def getTags(): java.util.Set[String]

182

}

183

184

trait EventsBySliceQuery extends ReadJournal {

185

def eventsBySlices[Event](entityType: String, minSlice: Int, maxSlice: Int, offset: Offset): Source[EventEnvelope[Event], NotUsed]

186

def sliceForPersistenceId(persistenceId: String): Int

187

def sliceRanges(numberOfRanges: Int): immutable.Seq[Range]

188

}

189

```

190

191

[Typed Query API](./typed-queries.md)

192

193

### Durable State Queries

194

195

Query interface for durable state changes and persistence.

196

197

```scala { .api }

198

sealed trait DurableStateChange[A] {

199

def persistenceId: String

200

def offset: Offset

201

}

202

203

final class UpdatedDurableState[A](

204

val persistenceId: String,

205

val revision: Long,

206

val value: A,

207

override val offset: Offset,

208

val timestamp: Long

209

) extends DurableStateChange[A]

210

211

trait DurableStateStoreQuery[A] extends DurableStateStore[A] {

212

def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]

213

def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed]

214

}

215

```

216

217

[Durable State Queries](./durable-state.md)

218

219

### Journal Implementations

220

221

Concrete read journal implementations for specific storage backends.

222

223

```scala { .api }

224

// LevelDB Implementation (deprecated)

225

class LeveldbReadJournal extends ReadJournal

226

with PersistenceIdsQuery

227

with CurrentPersistenceIdsQuery

228

with EventsByPersistenceIdQuery

229

with CurrentEventsByPersistenceIdQuery

230

with EventsByTagQuery

231

with CurrentEventsByTagQuery

232

233

object LeveldbReadJournal {

234

val Identifier = "akka.persistence.query.journal.leveldb"

235

}

236

237

// Firehose Implementation

238

class EventsBySliceFirehoseQuery extends ReadJournal

239

with EventsBySliceQuery

240

with EventsBySliceStartingFromSnapshotsQuery

241

with EventTimestampQuery

242

with LoadEventQuery

243

244

object EventsBySliceFirehoseQuery {

245

val Identifier = "akka.persistence.query.events-by-slice-firehose"

246

}

247

```

248

249

[Journal Implementations](./journal-implementations.md)

250

251

## Common Query Patterns

252

253

- **Live Streaming**: Use `eventsByPersistenceId` and `eventsByTag` for continuous event processing

254

- **Finite Queries**: Use `currentEventsByPersistenceId` and `currentEventsByTag` for batch processing

255

- **Horizontal Scaling**: Use typed slice-based queries for distributed event processing

256

- **State Reconstruction**: Combine snapshot and event queries for efficient state rebuilding

257

- **CQRS Read Models**: Build projections using event streams with offset tracking

258

259

## Types

260

261

```scala { .api }

262

/**

263

* A query plugin must implement a class that implements this trait.

264

* It provides the concrete implementations for the Java and Scala APIs.

265

*

266

* A read journal plugin must provide implementations for both

267

* scaladsl.ReadJournal and javadsl.ReadJournal.

268

*/

269

trait ReadJournalProvider {

270

/**

271

* The ReadJournal implementation for the Scala API.

272

* This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.

273

*/

274

def scaladslReadJournal(): scaladsl.ReadJournal

275

276

/**

277

* The ReadJournal implementation for the Java API.

278

* This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.

279

*/

280

def javadslReadJournal(): javadsl.ReadJournal

281

}

282

283

final class DeletedDurableState[A](

284

val persistenceId: String,

285

val revision: Long,

286

override val offset: Offset,

287

val timestamp: Long

288

) extends DurableStateChange[A]

289

290

object TimestampOffset {

291

val Zero: TimestampOffset

292

def apply(timestamp: Instant, seen: Map[String, Long]): TimestampOffset

293

def toTimestampOffset(offset: Offset): TimestampOffset

294

}

295

296

import java.util.Optional

297

import java.time.Instant

298

import scala.collection.immutable

299

```