0
# Event Adapters
1
2
Event transformation system for schema evolution and event format conversion between domain and journal representations.
3
4
## Capabilities
5
6
### EventAdapter Trait
7
8
Combined read and write event adapter interface.
9
10
```scala { .api }
11
/**
12
* Combined read and write event adapter interface
13
*/
14
trait EventAdapter extends WriteEventAdapter with ReadEventAdapter
15
```
16
17
### WriteEventAdapter
18
19
Converts domain events to journal format for persistence.
20
21
```scala { .api }
22
/**
23
* Converts domain events to journal format for storage
24
*/
25
trait WriteEventAdapter {
26
/** Return manifest string for the event type */
27
def manifest(event: Any): String
28
29
/** Convert domain event to journal representation */
30
def toJournal(event: Any): Any
31
}
32
```
33
34
**Usage Examples:**
35
36
```scala
37
import akka.persistence.journal.WriteEventAdapter
38
39
class OrderEventAdapter extends WriteEventAdapter {

  /** Picks the manifest stored with each event; unrecognised events get an empty manifest. */
  override def manifest(event: Any): String =
    event match {
      case _: OrderPlaced    => "order-placed-v2"
      case _: OrderCancelled => "order-cancelled-v1"
      case _                 => ""
    }

  /** Maps a domain event to the value handed to the journal. */
  override def toJournal(event: Any): Any =
    event match {
      case OrderPlaced(id, lineItems, amount) =>
        // Journal form carries the order data plus the wall-clock time of the write.
        val writtenAt = System.currentTimeMillis()
        JournalOrderPlaced(id, lineItems, amount, writtenAt)

      case passThrough =>
        // Anything else is persisted as-is.
        passThrough
    }
}
53
```
54
55
### ReadEventAdapter
56
57
Converts journal events back to domain format during recovery.
58
59
```scala { .api }
60
/**
61
* Converts journal events to domain format during recovery
62
*/
63
trait ReadEventAdapter {
64
/** Convert journal event to domain representation(s) */
65
def fromJournal(event: Any, manifest: String): EventSeq
66
}
67
```
68
69
**Usage Examples:**
70
71
```scala
72
import akka.persistence.journal.{ReadEventAdapter, EventSeq}
73
74
class OrderEventReadAdapter extends ReadEventAdapter {

  /**
   * Rebuilds domain events from their journal form, selecting the conversion by manifest.
   *
   * @param event    payload read from the journal
   * @param manifest manifest string that was stored alongside the payload
   * @return the domain event(s) to replay for this journal entry
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    manifest match {
      case "order-placed-v1" =>
        // Migrate old format to new format, recomputing the total from the items.
        // NOTE(review): calculateTotal is assumed to be a helper in scope — confirm.
        val old = event.asInstanceOf[OldOrderPlaced]
        EventSeq.single(OrderPlaced(old.id, old.items, calculateTotal(old.items)))

      case "order-placed-v2" =>
        // Build the domain event directly from the journal fields.
        val stored = event.asInstanceOf[JournalOrderPlaced]
        EventSeq.single(OrderPlaced(stored.orderId, stored.items, stored.total))

      case "order-cancelled-v1" =>
        EventSeq.single(event)

      case _ =>
        // Unknown or empty manifest: replay the payload unchanged. Returning
        // EventSeq.empty here would silently discard every event this adapter
        // does not know about, including events written without a manifest.
        EventSeq.single(event)
    }
}
92
```
93
94
### EventSeq Container
95
96
Container for the events produced by a read adapter: a single journal entry can yield zero, one, or several domain events.
97
98
```scala { .api }
99
/**
100
* Container for adapted events returned from ReadEventAdapter
101
*/
102
sealed abstract class EventSeq {
103
/** Sequence of adapted events */
104
def events: immutable.Seq[Any]
105
}
106
107
object EventSeq {
108
/** Empty event sequence */
109
def empty: EventSeq
110
111
/** Single event sequence */
112
def single(event: Any): EventSeq
113
114
/** Multiple event sequence */
115
def apply(events: Any*): EventSeq
116
117
/** Java API for creating event sequences */
118
def create(events: Any*): EventSeq
119
}
120
```
121
122
#### EventSeq Implementations
123
124
```scala { .api }
125
/**
126
* Event sequence containing a single event
127
*/
128
case class SingleEventSeq(event: Any) extends EventSeq {
129
override def events: immutable.Seq[Any] = Vector(event)
130
}
131
132
/**
133
* Event sequence containing multiple events
134
*/
135
case class EventsSeq[E](events: immutable.Seq[E]) extends EventSeq
136
```
137
138
### IdentityEventAdapter
139
140
No-operation adapter that passes events through unchanged.
141
142
```scala { .api }
143
/**
144
* No-op adapter that passes events through unchanged
145
*/
146
case object IdentityEventAdapter extends EventAdapter {
147
override def manifest(event: Any): String = ""
148
override def toJournal(event: Any): Any = event
149
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)
150
}
151
```
152
153
### Event Tagging
154
155
Wrapper for attaching tags to an event before it is written, for use with journal implementations that support tagging.
156
157
```scala { .api }
158
/**
159
* Wraps events with tags for journal implementations that support tagging
160
*/
161
case class Tagged(payload: Any, tags: Set[String]) {
162
/** Java constructor */
163
def this(payload: Any, tags: java.util.Set[String]) =
164
this(payload, tags.asScala.toSet)
165
}
166
```
167
168
**Usage Examples:**
169
170
```scala
171
import akka.persistence.journal.Tagged
172
173
class TaggingEventAdapter extends WriteEventAdapter {

  /** Uses the event's simple class name as its manifest. */
  override def manifest(event: Any): String =
    event.getClass.getSimpleName

  /** Wraps order events in `Tagged`; every other event is returned untouched. */
  override def toJournal(event: Any): Any =
    event match {
      case placed @ OrderPlaced(_, items, total) =>
        // Fixed tags, an optional value tag, and one tag per item category.
        val valueTags: Set[String] = if (total > 1000) Set("high-value") else Set.empty
        val categoryTags = items.map(_.category).toSet
        Tagged(placed, Set("order", "placed") ++ valueTags ++ categoryTags)

      case cancelled @ OrderCancelled(_, reason) =>
        // The cancellation reason itself becomes a tag.
        Tagged(cancelled, Set("order", "cancelled", reason))

      case untagged =>
        untagged
    }
}
189
```
190
191
### Example: Complete Event Adapter Implementation
192
193
```scala
194
import akka.persistence.journal._
195
196
// Domain events
197
sealed trait OrderEvent
final case class OrderPlaced(orderId: String, items: List[OrderItem], total: BigDecimal) extends OrderEvent
final case class OrderCancelled(orderId: String, reason: String) extends OrderEvent
final case class OrderShipped(orderId: String, trackingNumber: String) extends OrderEvent

// Journal representations
final case class JournalOrderPlaced(
  orderId: String,
  items: List[OrderItem],
  total: BigDecimal,
  timestamp: Long,
  version: Int = 2
)

final case class JournalOrderCancelled(
  orderId: String,
  reason: String,
  timestamp: Long
)

// Legacy (version 1) journal representations that the adapter below still reads.
// Field names and types follow how the adapter uses them.
// NOTE(review): OrderItem is assumed to be defined elsewhere — confirm.
final case class OldOrderPlaced(
  id: String,
  items: List[OrderItem],
  total: BigDecimal,
  wasExpedited: Boolean
)

final case class OldOrderCancelled(
  orderId: String,
  cancellationReason: String
)
216
217
class OrderEventAdapter extends EventAdapter {

  // Manifest strings shared by the write side and the read side, so the two cannot drift apart.
  private val PlacedV1    = "order-placed-v1"
  private val PlacedV2    = "order-placed-v2"
  private val CancelledV1 = "order-cancelled-v1"
  private val CancelledV2 = "order-cancelled-v2"
  private val ShippedV1   = "order-shipped-v1"

  // Orders above this total are additionally tagged "high-value".
  private val HighValueThreshold = BigDecimal(500)

  /** Manifest written with each event; events this adapter does not know get an empty manifest. */
  override def manifest(event: Any): String = event match {
    case _: OrderPlaced    => PlacedV2
    case _: OrderCancelled => CancelledV2
    case _: OrderShipped   => ShippedV1
    case _                 => ""
  }

  /**
   * Converts a domain event to its journal form and wraps it in `Tagged`.
   * Events that are not order events are returned unchanged.
   */
  override def toJournal(event: Any): Any = event match {
    case OrderPlaced(orderId, items, total) =>
      val valueTags: Set[String] =
        if (total > HighValueThreshold) Set("high-value") else Set.empty
      Tagged(
        JournalOrderPlaced(orderId, items, total, System.currentTimeMillis()),
        Set("order", "placed") ++ valueTags
      )

    case OrderCancelled(orderId, reason) =>
      Tagged(
        JournalOrderCancelled(orderId, reason, System.currentTimeMillis()),
        Set("order", "cancelled")
      )

    case shipped: OrderShipped =>
      // No separate journal representation: the domain event itself is stored.
      Tagged(shipped, Set("order", "shipped"))

    case other => other
  }

  /**
   * Converts a journal payload back to domain event(s), selecting the conversion by manifest.
   *
   * @param event    payload read from the journal
   * @param manifest manifest string that was stored alongside the payload
   * @return the domain event(s) to replay; a legacy expedited order yields two events
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    manifest match {
      // Handle version 2 events (current)
      case PlacedV2 =>
        val journal = event.asInstanceOf[JournalOrderPlaced]
        EventSeq.single(OrderPlaced(journal.orderId, journal.items, journal.total))

      case CancelledV2 =>
        val journal = event.asInstanceOf[JournalOrderCancelled]
        EventSeq.single(OrderCancelled(journal.orderId, journal.reason))

      case ShippedV1 =>
        // toJournal stores OrderShipped as-is, so it is replayed as-is. Without this
        // case, shipped events would be dropped on recovery.
        EventSeq.single(event)

      // Handle legacy version 1 events (migration)
      case PlacedV1 =>
        val old = event.asInstanceOf[OldOrderPlaced]
        val orderPlaced = OrderPlaced(old.id, old.items, old.total)
        if (old.wasExpedited) {
          // Split expedited orders into placement + shipping events
          EventSeq(orderPlaced, OrderShipped(old.id, s"EXPEDITED-${old.id}"))
        } else {
          EventSeq.single(orderPlaced)
        }

      case CancelledV1 =>
        val old = event.asInstanceOf[OldOrderCancelled]
        EventSeq.single(OrderCancelled(old.orderId, old.cancellationReason))

      case _ =>
        // Mirrors the pass-through branch of toJournal: events written unchanged with
        // an empty manifest are replayed unchanged instead of being discarded.
        EventSeq.single(event)
    }
}
280
```
281
282
### Adapter Configuration
283
284
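Event adapters are configured in `application.conf`, under the journal plugin's own section: `event-adapters` registers each adapter class under a short name, and `event-adapter-bindings` maps event classes to those names: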
285
286
```hocon
287
akka.persistence.journal {
288
plugin = "akka.persistence.journal.leveldb"
289
290
# Event adapter configuration
291
leveldb {
292
event-adapters {
293
order-adapter = "com.example.OrderEventAdapter"
294
user-adapter = "com.example.UserEventAdapter"
295
}
296
297
event-adapter-bindings {
298
"com.example.OrderEvent" = order-adapter
299
"com.example.UserEvent" = user-adapter
300
}
301
}
302
}
303
```
304
305
### Advanced Patterns
306
307
#### Conditional Event Processing
308
309
```scala
310
class ConditionalEventAdapter extends EventAdapter {

  // EventAdapter also requires the write-side methods. This adapter only changes
  // what happens on recovery, so the write side is a pass-through.
  override def manifest(event: Any): String = ""

  override def toJournal(event: Any): Any = event

  /**
   * Decides per event whether to skip it, upgrade it, split it, or replay it unchanged.
   *
   * @param event    payload read from the journal
   * @param manifest manifest stored with the payload (not used by this adapter)
   * @return zero, one, or several events to replay
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    event match {
      case problematicEvent: ProblematicEvent if shouldSkip(problematicEvent) =>
        // Skip problematic events during recovery
        EventSeq.empty

      case validEvent: ValidEvent if shouldUpgrade(validEvent) =>
        // Upgrade event to newer version
        EventSeq.single(upgradeEvent(validEvent))

      case batchEvent: BatchEvent =>
        // Split batch events into individual events
        EventSeq(batchEvent.events: _*)

      case other =>
        EventSeq.single(other)
    }
}
330
```
331
332
#### Event Filtering and Transformation
333
334
```scala
335
class FilteringEventAdapter extends ReadEventAdapter {

  /** Redacts, converts, or drops events as they are read back; everything else is replayed as-is. */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    event match {
      case SensitiveEvent(payload) if shouldRedact(payload) =>
        // Replay a copy whose data has been redacted.
        val scrubbed = SensitiveEvent(redact(payload))
        EventSeq.single(scrubbed)

      case DeprecatedEvent(legacyInfo) =>
        // Deprecated events are replayed in the new format.
        val converted = NewFormatEvent.fromDeprecated(legacyInfo)
        EventSeq.single(converted)

      case InvalidEvent(_) =>
        // Invalid events produce nothing on replay.
        EventSeq.empty

      case unchanged =>
        EventSeq.single(unchanged)
    }
}