# Extension and Configuration

The PersistenceQuery extension provides the main entry point for obtaining read journal instances from configured plugins. It manages plugin lifecycle and provides both Scala and Java APIs.

## Capabilities

### PersistenceQuery Extension

Main actor system extension for persistence query functionality.

```scala { .api }
/**
 * Persistence extension for queries.
 */
object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider {
  /** Get the extension instance for the given actor system */
  def get(system: ActorSystem): PersistenceQuery
  /** Get the extension instance for the given classic actor system provider */
  def get(system: ClassicActorSystemProvider): PersistenceQuery
  /** Lookup method for extension resolution */
  def lookup: PersistenceQuery.type
}

class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
  /**
   * Scala API: Returns the ReadJournal specified by the given read journal configuration entry
   * @param readJournalPluginId Plugin identifier from configuration
   * @return Configured read journal instance
   */
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T

  /**
   * Scala API: Returns the ReadJournal with custom configuration
   * @param readJournalPluginId Plugin identifier
   * @param readJournalPluginConfig Custom plugin configuration
   * @return Configured read journal instance
   */
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T

  /**
   * Java API: Returns the ReadJournal specified by the given read journal configuration entry
   * @param clazz Class of the read journal implementation
   * @param readJournalPluginId Plugin identifier from configuration
   * @return Configured read journal instance
   */
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T

  /**
   * Java API: Returns the ReadJournal with custom configuration
   * @param clazz Class of the read journal implementation
   * @param readJournalPluginId Plugin identifier
   * @param readJournalPluginConfig Custom plugin configuration
   * @return Configured read journal instance
   */
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T
}
```

**Usage Examples:**

```scala
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

implicit val system: ActorSystem = ActorSystem("example")

// Get read journal using plugin identifier
val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Get read journal with custom configuration
import com.typesafe.config.ConfigFactory
val customConfig = ConfigFactory.parseString("""
  akka.persistence.query.journal.leveldb {
    class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
    write-plugin = "akka.persistence.journal.leveldb"
    dir = "target/custom-journal"
  }
""")

val customReadJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier, customConfig)
```

Java API usage:

```java
import akka.actor.ActorSystem;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;

ActorSystem system = ActorSystem.create("example");

// Get read journal using plugin identifier
LeveldbReadJournal readJournal = PersistenceQuery.get(system)
    .getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier);
```

### ReadJournalProvider Interface

Plugin provider interface that read journal implementations must implement.

```scala { .api }
/**
 * A query plugin must implement a class that implements this trait.
 * It provides the concrete implementations for the Java and Scala APIs.
 */
trait ReadJournalProvider {
  /**
   * The ReadJournal implementation for the Scala API.
   * This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.
   */
  def scaladslReadJournal(): scaladsl.ReadJournal

  /**
   * The ReadJournal implementation for the Java API.
   * This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.
   */
  def javadslReadJournal(): javadsl.ReadJournal
}
```

**Implementation Example:**

```scala
import akka.persistence.query.{ javadsl, scaladsl, ReadJournalProvider }

// MyScalaReadJournal and MyJavaReadJournal stand for the plugin's own
// scaladsl.ReadJournal and javadsl.ReadJournal implementations.
class MyReadJournalProvider extends ReadJournalProvider {
  override def scaladslReadJournal(): scaladsl.ReadJournal = {
    new MyScalaReadJournal()
  }

  override def javadslReadJournal(): javadsl.ReadJournal = {
    new MyJavaReadJournal()
  }
}
```

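A provider often needs its own settings. As a minimal sketch, assuming the provider is instantiated through a constructor taking `(ExtendedActorSystem, Config)` (one of the constructor shapes the Akka documentation lists for read journal providers), the plugin's configuration section can be read when the provider is created. The class names and the `batch-size` key below are hypothetical and chosen only for illustration.

```scala
import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ javadsl, scaladsl, ReadJournalProvider }
import com.typesafe.config.Config

// Hypothetical journal implementations; real plugins would also mix in query traits.
final class ConfiguredScalaReadJournal(val batchSize: Int) extends scaladsl.ReadJournal
final class ConfiguredJavaReadJournal(val delegate: ConfiguredScalaReadJournal) extends javadsl.ReadJournal

// `config` is assumed to be the plugin's own configuration section.
class ConfiguredReadJournalProvider(system: ExtendedActorSystem, config: Config)
    extends ReadJournalProvider {

  // Read the (hypothetical) plugin-specific setting once, at construction time.
  private val scalaJournal = new ConfiguredScalaReadJournal(config.getInt("batch-size"))
  private val javaJournal = new ConfiguredJavaReadJournal(scalaJournal)

  override def scaladslReadJournal(): scaladsl.ReadJournal = scalaJournal

  override def javadslReadJournal(): javadsl.ReadJournal = javaJournal
}
```

Building both journals once and letting the Java variant delegate to the Scala one keeps the two APIs backed by the same state; this is a design choice of the sketch, not a requirement of the trait.
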
## Configuration

### Plugin Configuration

Read journal plugins are configured in `application.conf`:

```hocon
# Example LevelDB read journal configuration
akka.persistence.query.journal.leveldb {
  # Implementation class for the plugin
  class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"

  # Reference to the write journal plugin
  write-plugin = "akka.persistence.journal.leveldb"

  # Directory where journal files are stored
  dir = "target/journal"

  # Number of events fetched per query (replay) and buffered until delivered downstream
  max-buffer-size = 100
}
```

### Plugin Identifier Usage

Each read journal plugin is identified by the absolute configuration path of its settings block:

```scala
// Built-in plugin identifiers
val leveldbId = "akka.persistence.query.journal.leveldb"
val firehoseId = "akka.persistence.query.events-by-slice-firehose"

// Using with PersistenceQuery; the type parameter must match the plugin behind the identifier
val journal = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](leveldbId)
```

### Custom Plugin Registration

Register custom plugins in configuration:

```hocon
my-read-journal {
  class = "com.example.MyReadJournalProvider"
  # Additional plugin-specific configuration
  connection-timeout = 5s
  batch-size = 1000
}
```

Then use with PersistenceQuery:

```scala
val customJournal = PersistenceQuery(system)
  .readJournalFor[MyReadJournal]("my-read-journal")
```

## Error Handling

### Common Configuration Errors

- **ClassNotFoundException**: Plugin class not found in classpath
- **ConfigurationException**: Missing or invalid plugin configuration
- **IllegalArgumentException**: Invalid plugin identifier

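These errors surface as exceptions thrown from the lookup call, so one way to handle them is to wrap the call in `scala.util.Try`. The following is a sketch under that assumption; `SafeReadJournalLookup` and its methods are hypothetical helpers, and the messages are illustrative rather than what Akka itself reports.

```scala
import akka.ConfigurationException
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.ReadJournal

import scala.util.{ Failure, Success, Try }

object SafeReadJournalLookup {
  // Captures any exception thrown while resolving or instantiating the plugin.
  def lookup[T <: ReadJournal](system: ActorSystem, pluginId: String): Try[T] =
    Try(PersistenceQuery(system).readJournalFor[T](pluginId))

  // Maps the outcome to a short diagnostic, keyed on the error kinds listed above.
  def describe(result: Try[ReadJournal]): String = result match {
    case Success(journal)                     => s"loaded ${journal.getClass.getName}"
    case Failure(e: ClassNotFoundException)   => s"provider class not on classpath: ${e.getMessage}"
    case Failure(e: ConfigurationException)   => s"invalid plugin configuration: ${e.getMessage}"
    case Failure(e: IllegalArgumentException) => s"invalid plugin identifier: ${e.getMessage}"
    case Failure(e)                           => s"plugin could not be created: ${e.getMessage}"
  }
}
```

Returning a `Try` keeps the failure visible in the type, so callers decide whether to fall back to another plugin, log the diagnostic, or abort startup.
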
### Plugin Resolution

The extension uses the following resolution strategy:

1. Look up plugin configuration by identifier
2. Load and instantiate the provider class
3. Cache the instance for future requests
4. Return the appropriate Scala or Java DSL implementation

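The four steps above can be modelled with a small, dependency-free sketch. It is not Akka's implementation: all names here (`Provider`, `PluginRegistry`, `DemoProvider`) are hypothetical stand-ins, and reflective class loading is replaced by a factory function so that the example runs without a real plugin on the classpath.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-ins for scaladsl.ReadJournal, javadsl.ReadJournal and ReadJournalProvider.
trait ScalaJournal
trait JavaJournal
trait Provider {
  def scaladslReadJournal(): ScalaJournal
  def javadslReadJournal(): JavaJournal
}

final class DemoProvider extends Provider {
  private val scalaJournal = new ScalaJournal {}
  private val javaJournal = new JavaJournal {}
  def scaladslReadJournal(): ScalaJournal = scalaJournal
  def javadslReadJournal(): JavaJournal = javaJournal
}

// `factories` models the configured plugins, keyed by plugin identifier.
final class PluginRegistry(factories: Map[String, () => Provider]) {
  private val providers = TrieMap.empty[String, Provider]

  private def providerFor(pluginId: String): Provider =
    // Step 3: reuse a cached provider, creating it only on the first request.
    providers.getOrElseUpdate(pluginId, {
      // Step 1: look up the plugin entry by identifier.
      val factory = factories.getOrElse(
        pluginId,
        throw new IllegalArgumentException(s"Missing plugin config path: '$pluginId'"))
      // Step 2: instantiate the provider (a factory call stands in for class loading).
      factory()
    })

  // Step 4: return the DSL-specific journal from the shared provider.
  def readJournalFor(pluginId: String): ScalaJournal = providerFor(pluginId).scaladslReadJournal()
  def getReadJournalFor(pluginId: String): JavaJournal = providerFor(pluginId).javadslReadJournal()
}
```

With a registry built as `new PluginRegistry(Map("my-read-journal" -> (() => new DemoProvider)))`, repeated calls for the same identifier reuse one provider, while an unknown identifier fails before anything is cached.
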
## Types

```scala { .api }
trait ExtensionId[T <: Extension] {
  def createExtension(system: ExtendedActorSystem): T
}

trait ExtensionIdProvider {
  def lookup: ExtensionId[_ <: Extension]
}

trait Extension
```