# Extension and Configuration

The PersistenceQuery extension provides the main entry point for obtaining read journal instances from configured plugins. It manages plugin lifecycle and provides both Scala and Java APIs.

## Capabilities

### PersistenceQuery Extension

Main actor system extension for persistence query functionality.

```scala { .api }
/**
 * Persistence extension for queries.
 */
object PersistenceQuery extends ExtensionId[PersistenceQuery] with ExtensionIdProvider {
  /** Get the extension instance for the given actor system */
  def get(system: ActorSystem): PersistenceQuery
  /** Get the extension instance for the given classic actor system provider */
  def get(system: ClassicActorSystemProvider): PersistenceQuery
  /** Lookup method for extension resolution */
  def lookup: PersistenceQuery.type
}

class PersistenceQuery(system: ExtendedActorSystem) extends Extension {
  /**
   * Scala API: Returns the ReadJournal specified by the given read journal configuration entry
   * @param readJournalPluginId Plugin identifier from configuration
   * @return Configured read journal instance
   */
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String): T

  /**
   * Scala API: Returns the ReadJournal with custom configuration
   * @param readJournalPluginId Plugin identifier
   * @param readJournalPluginConfig Custom plugin configuration
   * @return Configured read journal instance
   */
  def readJournalFor[T <: scaladsl.ReadJournal](readJournalPluginId: String, readJournalPluginConfig: Config): T

  /**
   * Java API: Returns the ReadJournal specified by the given read journal configuration entry
   * @param clazz Class of the read journal implementation
   * @param readJournalPluginId Plugin identifier from configuration
   * @return Configured read journal instance
   */
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T

  /**
   * Java API: Returns the ReadJournal with custom configuration
   * @param clazz Class of the read journal implementation
   * @param readJournalPluginId Plugin identifier
   * @param readJournalPluginConfig Custom plugin configuration
   * @return Configured read journal instance
   */
  def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String, readJournalPluginConfig: Config): T
}
```

**Usage Examples:**

```scala
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal

implicit val system: ActorSystem = ActorSystem("example")

// Get read journal using plugin identifier
val readJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

// Get read journal with custom configuration
import com.typesafe.config.ConfigFactory
val customConfig = ConfigFactory.parseString("""
  akka.persistence.query.journal.leveldb {
    class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"
    write-plugin = "akka.persistence.journal.leveldb"
    dir = "target/custom-journal"
  }
""")

val customReadJournal = PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier, customConfig)
```

Java API usage:

```java
import akka.actor.ActorSystem;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;

ActorSystem system = ActorSystem.create("example");

// Get read journal using plugin identifier.
// Identifier is a Scala val, so Java calls it as a method: Identifier()
LeveldbReadJournal readJournal = PersistenceQuery.get(system)
    .getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
```

### ReadJournalProvider Interface

Plugin provider interface that read journal implementations must implement.

```scala { .api }
/**
 * A query plugin must implement a class that implements this trait.
 * It provides the concrete implementations for the Java and Scala APIs.
 */
trait ReadJournalProvider {
  /**
   * The ReadJournal implementation for the Scala API.
   * This corresponds to the instance that is returned by PersistenceQuery#readJournalFor.
   */
  def scaladslReadJournal(): scaladsl.ReadJournal

  /**
   * The ReadJournal implementation for the Java API.
   * This corresponds to the instance that is returned by PersistenceQuery#getReadJournalFor.
   */
  def javadslReadJournal(): javadsl.ReadJournal
}
```

**Implementation Example:**

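```scala
import akka.persistence.query.{ javadsl, scaladsl, ReadJournalProvider }

// MyScalaReadJournal and MyJavaReadJournal are placeholders for the plugin's own
// scaladsl.ReadJournal and javadsl.ReadJournal implementations.
class MyReadJournalProvider extends ReadJournalProvider {
  override def scaladslReadJournal(): scaladsl.ReadJournal = {
    new MyScalaReadJournal()
  }

  override def javadslReadJournal(): javadsl.ReadJournal = {
    new MyJavaReadJournal()
  }
}
```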

## Configuration

### Plugin Configuration

Read journal plugins are configured in `application.conf`:

```hocon
# Example LevelDB read journal configuration
akka.persistence.query.journal.leveldb {
  # Implementation class for the plugin
  class = "akka.persistence.query.journal.leveldb.LeveldbReadJournalProvider"

  # Reference to the write journal plugin
  write-plugin = "akka.persistence.journal.leveldb"

  # Directory where journal files are stored
  dir = "target/journal"

  # Number of events fetched per query (replay) and kept buffered
  # until they are delivered downstream
  max-buffer-size = 100
}
```

### Plugin Identifier Usage

Each read journal plugin has a unique identifier:

```scala
// Built-in plugin identifiers
val leveldbId = "akka.persistence.query.journal.leveldb"
val firehoseId = "akka.persistence.query.events-by-slice-firehose"

// Using with PersistenceQuery: the type parameter must match the journal
// that the identified plugin actually provides
val journal = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](leveldbId)
```

### Custom Plugin Registration

Register custom plugins in configuration:

```hocon
my-read-journal {
  class = "com.example.MyReadJournalProvider"
  # Additional plugin-specific configuration
  connection-timeout = 5s
  batch-size = 1000
}
```

Then use with PersistenceQuery:

```scala
val customJournal = PersistenceQuery(system)
  .readJournalFor[MyReadJournal]("my-read-journal")
```
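
The plugin-specific settings above only matter if the provider can read them. As far as I know, Akka instantiates a provider through one of several constructor shapes, including `(ExtendedActorSystem, Config)`, where the `Config` is the plugin's own configuration section; confirm this against the Akka version in use. The sketch below assumes that shape. `MyReadJournal` and `MyJavaReadJournal` are hypothetical classes, and the setting names are taken from the `my-read-journal` example.

```scala
import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ javadsl, scaladsl, ReadJournalProvider }
import com.typesafe.config.Config
import scala.concurrent.duration._

// Hypothetical journal implementations; a real plugin would add query traits here.
class MyReadJournal(val connectionTimeout: FiniteDuration, val batchSize: Int)
    extends scaladsl.ReadJournal

class MyJavaReadJournal(val underlying: MyReadJournal) extends javadsl.ReadJournal

// Assumed constructor shape: (ExtendedActorSystem, Config), where `config`
// is the `my-read-journal` section of the configuration.
class MyReadJournalProvider(system: ExtendedActorSystem, config: Config)
    extends ReadJournalProvider {

  // Read the plugin-specific settings once, when the provider is created.
  private val connectionTimeout: FiniteDuration =
    config.getDuration("connection-timeout").toMillis.millis
  private val batchSize: Int = config.getInt("batch-size")

  // Create each journal once so repeated lookups return the same instances.
  private val scalaJournal = new MyReadJournal(connectionTimeout, batchSize)
  private val javaJournal = new MyJavaReadJournal(scalaJournal)

  override def scaladslReadJournal(): scaladsl.ReadJournal = scalaJournal
  override def javadslReadJournal(): javadsl.ReadJournal = javaJournal
}
```

Having the Java journal wrap the Scala one keeps both DSLs backed by the same state, which is a common arrangement rather than a requirement.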

## Error Handling

### Common Configuration Errors

- **ClassNotFoundException**: Plugin class not found on the classpath
- **ConfigurationException**: Missing or invalid plugin configuration
- **IllegalArgumentException**: Invalid plugin identifier
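
Because these failures surface as exceptions when the journal is looked up, it can help to do the lookup once at startup and handle the failure explicitly. The following is a minimal sketch; `SafeJournalLookup` is a hypothetical helper, not part of the library, and it catches any non-fatal exception rather than relying on the exact types listed above.

```scala
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.ReadJournal
import scala.util.control.NonFatal

// Hypothetical helper: turns a failed plugin lookup into a Left instead of a thrown exception.
object SafeJournalLookup {
  def lookup(system: ActorSystem, pluginId: String): Either[Throwable, ReadJournal] =
    try Right(PersistenceQuery(system).readJournalFor[ReadJournal](pluginId))
    catch { case NonFatal(e) => Left(e) }

  // Example caller: log the failure and continue without the journal.
  def lookupOrLog(system: ActorSystem, pluginId: String): Option[ReadJournal] =
    lookup(system, pluginId) match {
      case Right(journal) => Some(journal)
      case Left(error) =>
        system.log.error(error, "Could not load read journal plugin [{}]", pluginId)
        None
    }
}
```

Returning `Either` keeps the original exception available to the caller, so the cause (missing class, missing configuration, bad identifier) can still be inspected.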

### Plugin Resolution

The extension uses the following resolution strategy:

1. Look up plugin configuration by identifier
2. Load and instantiate the provider class
3. Cache the instance for future requests
4. Return the appropriate Scala or Java DSL implementation
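
The four steps can be modelled without Akka. The sketch below is a simplified illustration, not the extension's actual implementation: every type in it is a hypothetical stand-in, a plain `Map` replaces the real configuration, and a factory map replaces reflective class loading.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-ins for the real read journal and provider types.
trait ScalaJournal
trait JavaJournal
trait Provider {
  def scaladslReadJournal(): ScalaJournal
  def javadslReadJournal(): JavaJournal
}

// `config` maps "<pluginId>.class" to a provider class name; `factories`
// stands in for loading that class reflectively.
final class Resolver(config: Map[String, String], factories: Map[String, () => Provider]) {
  private val cache = TrieMap.empty[String, Provider]

  private def providerFor(pluginId: String): Provider =
    // 3. Cache the instance for future requests
    cache.getOrElseUpdate(pluginId, {
      // 1. Look up plugin configuration by identifier
      val className = config.getOrElse(
        s"$pluginId.class",
        throw new IllegalArgumentException(s"no configuration for plugin '$pluginId'"))
      // 2. Load and instantiate the provider class
      val factory = factories.getOrElse(className, throw new ClassNotFoundException(className))
      factory()
    })

  // 4. Return the appropriate Scala or Java DSL implementation
  def readJournalFor(pluginId: String): ScalaJournal = providerFor(pluginId).scaladslReadJournal()
  def getReadJournalFor(pluginId: String): JavaJournal = providerFor(pluginId).javadslReadJournal()
}

object ResolverDemo {
  final class DemoProvider extends Provider {
    private val scalaJournal = new ScalaJournal {}
    private val javaJournal = new JavaJournal {}
    def scaladslReadJournal(): ScalaJournal = scalaJournal
    def javadslReadJournal(): JavaJournal = javaJournal
  }

  def main(args: Array[String]): Unit = {
    val resolver = new Resolver(
      config = Map("my-read-journal.class" -> "com.example.MyReadJournalProvider"),
      factories = Map("com.example.MyReadJournalProvider" -> (() => new DemoProvider)))
    // Both lookups go through the same cached provider.
    val first = resolver.readJournalFor("my-read-journal")
    val second = resolver.readJournalFor("my-read-journal")
    println(first eq second)
  }
}
```

Caching per identifier means the provider, and any resources it holds, is created at most once per plugin in this model, which is why repeated lookups are cheap.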

## Types

```scala { .api }
trait ExtensionId[T <: Extension] {
  def createExtension(system: ExtendedActorSystem): T
}

trait ExtensionIdProvider {
  def lookup: ExtensionId[_ <: Extension]
}

trait Extension
```