or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

at-least-once-delivery.md · durable-state.md · event-adapters.md · index.md · journal-api.md · persistent-actors.md · plugin-development.md · snapshots.md

docs/event-adapters.md

0

# Event Adapters

1

2

Event transformation system for schema evolution and event format conversion between domain and journal representations.

3

4

## Capabilities

5

6

### EventAdapter Trait

7

8

Combined read and write event adapter interface.

9

10

```scala { .api }

11

/**
 * Two-way event adapter.
 *
 * Mixes the write-side contract ([[WriteEventAdapter]]) and the read-side
 * contract ([[ReadEventAdapter]]) into one type, so a single class can
 * handle both directions.
 */
trait EventAdapter extends WriteEventAdapter with ReadEventAdapter

15

```

16

17

### WriteEventAdapter

18

19

Converts domain events to journal format for persistence.

20

21

```scala { .api }

22

/**
 * Write-side adapter: turns a domain event into the representation that is
 * handed to the journal for storage.
 */
trait WriteEventAdapter {

  /**
   * Manifest string describing the type of the given event.
   *
   * @param event the domain event being persisted
   * @return the manifest string for `event`
   */
  def manifest(event: Any): String

  /**
   * Converts a domain event into its journal representation.
   *
   * @param event the domain event being persisted
   * @return the object to store in the journal
   */
  def toJournal(event: Any): Any
}

32

```

33

34

**Usage Examples:**

35

36

```scala

37

import akka.persistence.journal.WriteEventAdapter

38

39

/**
 * Write-side adapter for order events.
 *
 * `OrderPlaced` is stored as a `JournalOrderPlaced` that additionally carries
 * the wall-clock time of the conversion; every other event is stored as-is.
 */
class OrderEventAdapter extends WriteEventAdapter {

  /**
   * Manifest for the supported order events.
   *
   * @return "order-placed-v2" for `OrderPlaced`, "order-cancelled-v1" for
   *         `OrderCancelled`, and the empty string for anything else
   */
  override def manifest(event: Any): String =
    event match {
      case _: OrderPlaced    => "order-placed-v2"
      case _: OrderCancelled => "order-cancelled-v1"
      case _                 => ""
    }

  /**
   * Journal representation of `event`.
   *
   * @return a `JournalOrderPlaced` for `OrderPlaced`, otherwise `event` itself
   */
  override def toJournal(event: Any): Any =
    event match {
      case placed: OrderPlaced =>
        // Journal format = domain fields plus the conversion timestamp.
        val convertedAt = System.currentTimeMillis()
        JournalOrderPlaced(placed.orderId, placed.items, placed.total, convertedAt)

      case unchanged =>
        unchanged
    }
}

53

```

54

55

### ReadEventAdapter

56

57

Converts journal events back to domain format during recovery.

58

59

```scala { .api }

60

/**
 * Read-side adapter: turns an event read from the journal back into its
 * domain representation during recovery.
 */
trait ReadEventAdapter {

  /**
   * Converts a journal event into zero, one or several domain events.
   *
   * @param event    the event as read from the journal
   * @param manifest the manifest string associated with `event`
   * @return an [[EventSeq]] holding the adapted domain event(s)
   */
  def fromJournal(event: Any, manifest: String): EventSeq
}

67

```

68

69

**Usage Examples:**

70

71

```scala

72

import akka.persistence.journal.{ReadEventAdapter, EventSeq}

73

74

/**
 * Read-side adapter that migrates stored order events to the current domain
 * model during recovery.
 *
 * Events this adapter does not recognise are passed through unchanged rather
 * than dropped: returning `EventSeq.empty` for them would silently remove
 * events from the recovered stream (for example events that a write adapter
 * stored as-is with an empty manifest).
 */
class OrderEventReadAdapter extends ReadEventAdapter {

  /**
   * Adapts one journal event.
   *
   * The manifest and the payload type are matched together, so no unchecked
   * `asInstanceOf` cast is needed; a payload that does not have the type its
   * manifest announces is passed through unchanged instead of raising a
   * `ClassCastException`.
   *
   * @param event    the event as read from the journal
   * @param manifest the manifest stored with the event
   * @return the adapted domain event, or `event` itself when no migration applies
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    (manifest, event) match {
      case ("order-placed-v1", legacy: OldOrderPlaced) =>
        // Migrate old format to new format: the total is recomputed from the items.
        EventSeq.single(OrderPlaced(legacy.id, legacy.items, calculateTotal(legacy.items)))

      case ("order-placed-v2", stored: JournalOrderPlaced) =>
        EventSeq.single(stored.toDomainEvent)

      case ("order-cancelled-v1", _) =>
        // Stored in domain format already.
        EventSeq.single(event)

      case _ =>
        // Unknown manifest or unexpected payload type: never drop data on recovery.
        EventSeq.single(event)
    }
}

92

```

93

94

### EventSeq Container

95

96

Container for adapted events supporting single or multiple event results.

97

98

```scala { .api }

99

/**
 * Result type of [[ReadEventAdapter]]'s `fromJournal`: a container for the
 * adapted events produced from one journal event.
 */
sealed abstract class EventSeq {

  /** The adapted events held by this container. */
  def events: immutable.Seq[Any]
}

/** Factory methods for [[EventSeq]] (signatures only; bodies are omitted in this API listing). */
object EventSeq {

  /** An event sequence that contains no events. */
  def empty: EventSeq

  /** An event sequence that contains exactly `event`. */
  def single(event: Any): EventSeq

  /** An event sequence that contains all of `events`. */
  def apply(events: Any*): EventSeq

  /** Java API: creates an event sequence from `events`. */
  def create(events: Any*): EventSeq
}

120

```

121

122

#### EventSeq Implementations

123

124

```scala { .api }

125

/**
 * [[EventSeq]] holding exactly one event.
 *
 * @param event the single adapted event
 */
case class SingleEventSeq(event: Any) extends EventSeq {

  /** A one-element sequence wrapping `event`. */
  override def events: immutable.Seq[Any] =
    Vector(event)
}

131

132

/**
 * [[EventSeq]] holding several events.
 *
 * @param events the adapted events
 * @tparam E the element type of `events`
 */
case class EventsSeq[E](events: immutable.Seq[E]) extends EventSeq

136

```

137

138

### IdentityEventAdapter

139

140

No-operation adapter that passes events through unchanged.

141

142

```scala { .api }

143

/**
 * Adapter that changes nothing: events are written and read back exactly as
 * they are, and the manifest is always the empty string.
 */
case object IdentityEventAdapter extends EventAdapter {

  /** Write side: the event itself is the journal representation. */
  override def toJournal(event: Any): Any =
    event

  /** Read side: the journal event is returned as a single, unmodified event. */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    EventSeq.single(event)

  /** No manifest is assigned. */
  override def manifest(event: Any): String =
    ""
}

151

```

152

153

### Event Tagging

154

155

Support for tagging events for journal implementations that support it.

156

157

```scala { .api }

158

/**
 * Envelope that attaches a set of tags to an event, for journal
 * implementations that support tagging.
 *
 * @param payload the event being tagged
 * @param tags    the tags attached to `payload`
 */
case class Tagged(payload: Any, tags: Set[String]) {

  /**
   * Java API: builds a `Tagged` from a `java.util.Set`, copying its
   * elements into an immutable Scala set.
   */
  def this(payload: Any, tags: java.util.Set[String]) =
    this(payload, tags.asScala.toSet)
}

166

```

167

168

**Usage Examples:**

169

170

```scala

171

import akka.persistence.journal.Tagged

172

173

/**
 * Write-side adapter that wraps order events in [[Tagged]] so that a
 * tag-aware journal can index them.
 */
class TaggingEventAdapter extends WriteEventAdapter {

  /** The simple class name of the event is used as its manifest. */
  override def manifest(event: Any): String =
    event.getClass.getSimpleName

  /**
   * Tags order events; any other event is returned untouched.
   *
   *  - `OrderPlaced`: "order", "placed", "high-value" when the total exceeds
   *    1000, plus the category of every item.
   *  - `OrderCancelled`: "order", "cancelled" and the cancellation reason.
   */
  override def toJournal(event: Any): Any =
    event match {
      case placed: OrderPlaced =>
        val fixedTags: Set[String] = Set("order", "placed")
        val valueTags: Set[String] =
          if (placed.total > 1000) Set("high-value") else Set.empty
        val categoryTags = placed.items.map(_.category).toSet
        Tagged(placed, fixedTags ++ valueTags ++ categoryTags)

      case cancelled: OrderCancelled =>
        Tagged(cancelled, Set("order", "cancelled", cancelled.reason))

      case untagged =>
        untagged
    }
}

189

```

190

191

### Example: Complete Event Adapter Implementation

192

193

```scala

194

import akka.persistence.journal._

195

196

// Domain events

/** Base type of all order domain events. */
sealed trait OrderEvent

/** An order was placed for `items` with the given `total`. */
case class OrderPlaced(orderId: String, items: List[OrderItem], total: BigDecimal) extends OrderEvent

/** An order was cancelled for the given `reason`. */
case class OrderCancelled(orderId: String, reason: String) extends OrderEvent

/** An order was shipped under the given `trackingNumber`. */
case class OrderShipped(orderId: String, trackingNumber: String) extends OrderEvent

// Journal representations

/**
 * Stored form of [[OrderPlaced]].
 *
 * @param timestamp time of conversion, in milliseconds as returned by `System.currentTimeMillis()`
 * @param version   schema version of the stored form (defaults to 2)
 */
case class JournalOrderPlaced(
  orderId: String,
  items: List[OrderItem],
  total: BigDecimal,
  timestamp: Long,
  version: Int = 2
) {

  /** Converts this stored form back into the domain event; journal-only fields are not carried over. */
  def toDomainEvent: OrderPlaced = OrderPlaced(orderId, items, total)
}

/**
 * Stored form of [[OrderCancelled]].
 *
 * @param timestamp time of conversion, in milliseconds as returned by `System.currentTimeMillis()`
 */
case class JournalOrderCancelled(
  orderId: String,
  reason: String,
  timestamp: Long
)

216

217

/**
 * Complete two-way adapter for order events.
 *
 * Write side: domain events are converted to their journal form (where one
 * exists) and wrapped in [[Tagged]].
 * Read side: current (v2) journal events are converted back to domain events
 * and legacy (v1) events are migrated.
 *
 * Every manifest produced by `manifest` is handled by `fromJournal`, and
 * events this adapter does not recognise are passed through unchanged, so
 * nothing that was written is silently dropped during recovery.
 */
class OrderEventAdapter extends EventAdapter {

  /**
   * Manifest for the supported order events.
   *
   * @return a versioned manifest for order events, the empty string otherwise
   */
  override def manifest(event: Any): String = event match {
    case _: OrderPlaced    => "order-placed-v2"
    case _: OrderCancelled => "order-cancelled-v2"
    case _: OrderShipped   => "order-shipped-v1"
    case _                 => ""
  }

  /**
   * Journal representation of `event`.
   *
   * @return a [[Tagged]] envelope for order events, `event` itself otherwise
   */
  override def toJournal(event: Any): Any = event match {
    case OrderPlaced(orderId, items, total) =>
      val tags = Set("order", "placed") ++
        (if (total > 500) Set("high-value") else Set.empty[String])
      Tagged(
        JournalOrderPlaced(orderId, items, total, System.currentTimeMillis()),
        tags
      )

    case OrderCancelled(orderId, reason) =>
      Tagged(
        JournalOrderCancelled(orderId, reason, System.currentTimeMillis()),
        Set("order", "cancelled")
      )

    case shipped: OrderShipped =>
      // No dedicated journal form: the domain event itself is stored.
      Tagged(shipped, Set("order", "shipped"))

    case other => other
  }

  /**
   * Adapts one journal event back to the domain model.
   *
   * Manifest and payload type are matched together, which replaces the
   * unchecked `asInstanceOf` casts; a payload whose type does not match its
   * manifest is passed through unchanged.
   *
   * @param event    the event as read from the journal
   * @param manifest the manifest stored with the event
   * @return the adapted domain event(s)
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    (manifest, event) match {
      // Handle version 2 events (current)
      case ("order-placed-v2", stored: JournalOrderPlaced) =>
        EventSeq.single(OrderPlaced(stored.orderId, stored.items, stored.total))

      case ("order-cancelled-v2", stored: JournalOrderCancelled) =>
        EventSeq.single(OrderCancelled(stored.orderId, stored.reason))

      // Written by toJournal in domain form; must be handled here or it
      // would be lost on recovery.
      case ("order-shipped-v1", shipped: OrderShipped) =>
        EventSeq.single(shipped)

      // Handle legacy version 1 events (migration)
      case ("order-placed-v1", legacy: OldOrderPlaced) =>
        val orderPlaced = OrderPlaced(legacy.id, legacy.items, legacy.total)
        if (legacy.wasExpedited) {
          // Split expedited orders into placement + shipping events
          EventSeq(orderPlaced, OrderShipped(legacy.id, s"EXPEDITED-${legacy.id}"))
        } else {
          EventSeq.single(orderPlaced)
        }

      case ("order-cancelled-v1", legacy: OldOrderCancelled) =>
        EventSeq.single(OrderCancelled(legacy.orderId, legacy.cancellationReason))

      // Unknown manifest (including the "" used for pass-through writes):
      // keep the event instead of discarding it.
      case _ =>
        EventSeq.single(event)
    }
}

280

```

281

282

### Adapter Configuration

283

284

Event adapters are configured in application.conf:

285

286

```hocon

287

akka.persistence.journal {
  plugin = "akka.persistence.journal.leveldb"

  # Event adapter configuration
  leveldb {
    # Adapter name -> fully qualified adapter class
    event-adapters {
      order-adapter = "com.example.OrderEventAdapter"
      user-adapter = "com.example.UserEventAdapter"
    }

    # Event class -> adapter name
    event-adapter-bindings {
      # To journal: the adapter is selected by the class of the domain event
      "com.example.OrderEvent" = order-adapter
      "com.example.UserEvent" = user-adapter

      # From journal: the adapter is selected by the class of the STORED
      # payload, so journal representations that do not extend a bound domain
      # type need their own binding, otherwise fromJournal is never called
      # for them. Legacy payload classes (e.g. OldOrderPlaced) need the same
      # treatment; adjust the package names to where your classes live.
      "com.example.JournalOrderPlaced" = order-adapter
      "com.example.JournalOrderCancelled" = order-adapter
    }
  }
}

303

```

304

305

### Advanced Patterns

306

307

#### Conditional Event Processing

308

309

```scala

310

/**
 * Adapter that decides per event, during recovery, whether to skip it,
 * upgrade it, split it, or pass it through.
 *
 * The write side is a pass-through: `EventAdapter` also requires `manifest`
 * and `toJournal`, so they are implemented here as no-ops (empty manifest,
 * event stored unchanged) to make the class concrete.
 */
class ConditionalEventAdapter extends EventAdapter {

  /** No manifest is assigned on the write side. */
  override def manifest(event: Any): String = ""

  /** Events are stored unchanged. */
  override def toJournal(event: Any): Any = event

  /**
   * Adapts one journal event.
   *
   * @param event    the event as read from the journal
   * @param manifest the manifest stored with the event (not used)
   * @return no events, one (possibly upgraded) event, or the elements of a batch
   */
  override def fromJournal(event: Any, manifest: String): EventSeq = {
    event match {
      case problematicEvent: ProblematicEvent if shouldSkip(problematicEvent) =>
        // Skip problematic events during recovery
        EventSeq.empty

      case validEvent: ValidEvent if shouldUpgrade(validEvent) =>
        // Upgrade event to newer version
        EventSeq.single(upgradeEvent(validEvent))

      case batchEvent: BatchEvent =>
        // Split batch events into individual events
        EventSeq(batchEvent.events: _*)

      case other =>
        EventSeq.single(other)
    }
  }
}

330

```

331

332

#### Event Filtering and Transformation

333

334

```scala

335

/**
 * Read-side adapter that redacts, converts or removes events while they are
 * replayed from the journal.
 */
class FilteringEventAdapter extends ReadEventAdapter {

  /**
   * Adapts one journal event.
   *
   *  - `SensitiveEvent` whose data should be redacted: replayed with redacted data.
   *  - `DeprecatedEvent`: replayed as the new-format event.
   *  - `InvalidEvent`: removed from the replayed stream.
   *  - anything else: replayed unchanged.
   *
   * @param event    the event as read from the journal
   * @param manifest the manifest stored with the event (not used)
   */
  override def fromJournal(event: Any, manifest: String): EventSeq = {
    // First decide what (if anything) is replayed, then wrap the decision.
    val replayed: Option[Any] = event match {
      case SensitiveEvent(payload) if shouldRedact(payload) =>
        Some(SensitiveEvent(redact(payload)))

      case DeprecatedEvent(legacyInfo) =>
        Some(NewFormatEvent.fromDeprecated(legacyInfo))

      case InvalidEvent(_) =>
        None

      case untouched =>
        Some(untouched)
    }

    replayed match {
      case Some(adapted) => EventSeq.single(adapted)
      case None          => EventSeq.empty
    }
  }
}