Event sourcing library for Akka persistence providing comprehensive event sourcing capabilities for building persistent, fault-tolerant applications using the actor model.
—
Event transformation system for schema evolution and event format conversion between domain and journal representations.
Combined read and write event adapter interface.
/**
* Combined read and write event adapter interface
*/
trait EventAdapter extends WriteEventAdapter with ReadEventAdapter

Converts domain events to journal format for persistence.
/**
* Converts domain events to journal format for storage
*/
trait WriteEventAdapter {
/** Return manifest string for the event type */
def manifest(event: Any): String
/** Convert domain event to journal representation */
def toJournal(event: Any): Any
}

Usage Examples:
import akka.persistence.journal.WriteEventAdapter
/**
 * Example write-side adapter for order events.
 *
 * Supplies a manifest string per event type and turns `OrderPlaced` into its
 * journal representation; every other event is written unchanged.
 */
class OrderEventAdapter extends WriteEventAdapter {

  /** Manifest for the given event; the empty string for any event not listed here. */
  override def manifest(event: Any): String =
    event match {
      case _: OrderPlaced    => "order-placed-v2"
      case _: OrderCancelled => "order-cancelled-v1"
      case _                 => ""
    }

  /** Journal form of the given event. */
  override def toJournal(event: Any): Any =
    event match {
      case OrderPlaced(id, lineItems, amount) =>
        // The journal record carries the wall-clock time of the conversion as extra metadata.
        val writtenAt = System.currentTimeMillis()
        JournalOrderPlaced(id, lineItems, amount, writtenAt)
      case unchanged =>
        unchanged
    }
}

Converts journal events back to domain format during recovery.
/**
* Converts journal events to domain format during recovery
*/
trait ReadEventAdapter {
/** Convert journal event to domain representation(s) */
def fromJournal(event: Any, manifest: String): EventSeq
}

Usage Examples:
import akka.persistence.journal.{ReadEventAdapter, EventSeq}
/**
 * Example read-side adapter for order events; the conversion is chosen by manifest string.
 */
class OrderEventReadAdapter extends ReadEventAdapter {

  /** Domain event(s) for a journal event; an empty sequence for unrecognised manifests. */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    manifest match {
      case "order-placed-v1" =>
        // Legacy payload: rebuild the current OrderPlaced, recomputing the total from the items.
        val legacy = event.asInstanceOf[OldOrderPlaced]
        val migrated = OrderPlaced(legacy.id, legacy.items, calculateTotal(legacy.items))
        EventSeq.single(migrated)

      case "order-placed-v2" =>
        val stored = event.asInstanceOf[JournalOrderPlaced]
        EventSeq.single(stored.toDomainEvent)

      case "order-cancelled-v1" =>
        // Returned exactly as read from the journal.
        EventSeq.single(event)

      case _ =>
        EventSeq.empty
    }
}

Container for adapted events supporting single or multiple event results.
/**
* Container for adapted events returned from ReadEventAdapter
*/
sealed abstract class EventSeq {
/** Sequence of adapted events */
def events: immutable.Seq[Any]
}
object EventSeq {
/** Empty event sequence */
def empty: EventSeq
/** Single event sequence */
def single(event: Any): EventSeq
/** Multiple event sequence */
def apply(events: Any*): EventSeq
/** Java API for creating event sequences */
def create(events: Any*): EventSeq
}

/**
* Event sequence containing a single event
*/
case class SingleEventSeq(event: Any) extends EventSeq {
override def events: immutable.Seq[Any] = Vector(event)
}
/**
* Event sequence containing multiple events
*/
case class EventsSeq[E](events: immutable.Seq[E]) extends EventSeq

No-operation adapter that passes events through unchanged.
/**
* No-op adapter that passes events through unchanged
*/
case object IdentityEventAdapter extends EventAdapter {
override def manifest(event: Any): String = ""
override def toJournal(event: Any): Any = event
override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)
}

Support for tagging events for journal implementations that support it.
/**
* Wraps events with tags for journal implementations that support tagging
*/
case class Tagged(payload: Any, tags: Set[String]) {
/** Java constructor */
def this(payload: Any, tags: java.util.Set[String]) =
this(payload, tags.asScala.toSet)
}

Usage Examples:
import akka.persistence.journal.Tagged
/**
 * Example write-side adapter that wraps order events in `Tagged`.
 *
 * The manifest is the simple class name of the event. Events other than
 * `OrderPlaced` and `OrderCancelled` are written untagged and unchanged.
 */
class TaggingEventAdapter extends WriteEventAdapter {

  override def manifest(event: Any): String =
    event.getClass.getSimpleName

  override def toJournal(event: Any): Any =
    event match {
      case OrderPlaced(_, lineItems, amount) =>
        val baseTags = Set("order", "placed")
        // Orders above 1000 additionally get the "high-value" tag.
        val valueTags = if (amount > 1000) Set("high-value") else Set.empty[String]
        // One tag per distinct item category.
        val categoryTags = lineItems.map(_.category).toSet
        Tagged(event, baseTags ++ valueTags ++ categoryTags)

      case OrderCancelled(_, why) =>
        // The cancellation reason itself is used as a tag.
        Tagged(event, Set("order", "cancelled", why))

      case untagged =>
        untagged
    }
}

import akka.persistence.journal._
// Domain events

/** Closed family of order domain events. */
sealed trait OrderEvent

/** An order was placed with the given items and total. */
final case class OrderPlaced(orderId: String, items: List[OrderItem], total: BigDecimal) extends OrderEvent

/** An order was cancelled for the given reason. */
final case class OrderCancelled(orderId: String, reason: String) extends OrderEvent

/** An order was shipped under the given tracking number. */
final case class OrderShipped(orderId: String, trackingNumber: String) extends OrderEvent

// Journal representations

/**
 * Journal form of [[OrderPlaced]]: the same fields plus a timestamp and a
 * schema version, which defaults to 2.
 */
final case class JournalOrderPlaced(
  orderId: String,
  items: List[OrderItem],
  total: BigDecimal,
  timestamp: Long,
  version: Int = 2
)

/** Journal form of [[OrderCancelled]]: the same fields plus a timestamp. */
final case class JournalOrderCancelled(
  orderId: String,
  reason: String,
  timestamp: Long
)
/**
 * Bidirectional adapter between order domain events and their journal form.
 *
 * Write side: `OrderPlaced` and `OrderCancelled` are converted to
 * `JournalOrderPlaced` / `JournalOrderCancelled` stamped with the current time,
 * `OrderShipped` is stored as-is, and all three are wrapped in `Tagged`.
 * Any other event is passed through untouched with an empty manifest.
 *
 * Read side: dispatches on the manifest string. Current (v2 / shipped-v1)
 * manifests are mapped back to domain events, legacy v1 manifests are migrated,
 * and unrecognised manifests yield an empty sequence.
 */
class OrderEventAdapter extends EventAdapter {

  /** Versioned manifest per order event type; the empty string for anything else. */
  override def manifest(event: Any): String = event match {
    case _: OrderPlaced    => "order-placed-v2"
    case _: OrderCancelled => "order-cancelled-v2"
    case _: OrderShipped   => "order-shipped-v1"
    case _                 => ""
  }

  /** Journal representation of a domain event, tagged where applicable. */
  override def toJournal(event: Any): Any = event match {
    case OrderPlaced(orderId, items, total) =>
      // Orders above 500 additionally get the "high-value" tag.
      val valueTags = if (total > 500) Set("high-value") else Set.empty[String]
      Tagged(
        JournalOrderPlaced(orderId, items, total, System.currentTimeMillis()),
        Set("order", "placed") ++ valueTags
      )
    case OrderCancelled(orderId, reason) =>
      Tagged(
        JournalOrderCancelled(orderId, reason, System.currentTimeMillis()),
        Set("order", "cancelled")
      )
    case _: OrderShipped =>
      // No dedicated journal class: the domain event itself is the payload.
      Tagged(event, Set("order", "shipped"))
    case other => other
  }

  /** Domain event(s) for a journal event, selected by manifest. */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    manifest match {
      // Handle version 2 events (current)
      case "order-placed-v2" =>
        val journal = event.asInstanceOf[JournalOrderPlaced]
        EventSeq.single(OrderPlaced(journal.orderId, journal.items, journal.total))
      case "order-cancelled-v2" =>
        val journal = event.asInstanceOf[JournalOrderCancelled]
        EventSeq.single(OrderCancelled(journal.orderId, journal.reason))
      // toJournal stores OrderShipped unchanged under this manifest, so it is
      // handed back as-is. Without this case shipped events fell through to the
      // default branch and were dropped during recovery.
      case "order-shipped-v1" =>
        EventSeq.single(event)
      // Handle legacy version 1 events (migration)
      case "order-placed-v1" =>
        val old = event.asInstanceOf[OldOrderPlaced]
        // Migrate old format and potentially split into multiple events
        val orderPlaced = OrderPlaced(old.id, old.items, old.total)
        if (old.wasExpedited) {
          // Split expedited orders into placement + shipping events
          EventSeq(
            orderPlaced,
            OrderShipped(old.id, s"EXPEDITED-${old.id}")
          )
        } else {
          EventSeq.single(orderPlaced)
        }
      case "order-cancelled-v1" =>
        val old = event.asInstanceOf[OldOrderCancelled]
        EventSeq.single(OrderCancelled(old.orderId, old.cancellationReason))
      case _ => EventSeq.empty
    }
}

Event adapters are configured in application.conf:
akka.persistence.journal {
plugin = "akka.persistence.journal.leveldb"
# Event adapter configuration
leveldb {
event-adapters {
order-adapter = "com.example.OrderEventAdapter"
user-adapter = "com.example.UserEventAdapter"
}
event-adapter-bindings {
"com.example.OrderEvent" = order-adapter
"com.example.UserEvent" = user-adapter
}
}
}

/**
 * Adapter whose read side decides per event whether to skip it, upgrade it,
 * split it into several events, or deliver it unchanged. The write side is a
 * pass-through.
 */
class ConditionalEventAdapter extends EventAdapter {

  // EventAdapter also mixes in WriteEventAdapter, whose manifest and toJournal
  // are abstract; they must be implemented for this class to compile.
  // Events are written unchanged with an empty manifest.
  override def manifest(event: Any): String = ""

  override def toJournal(event: Any): Any = event

  /** Domain event(s) for a journal event, selected by the event's type and guard. */
  override def fromJournal(event: Any, manifest: String): EventSeq = {
    event match {
      case problematicEvent: ProblematicEvent if shouldSkip(problematicEvent) =>
        // Skip problematic events during recovery
        EventSeq.empty
      case validEvent: ValidEvent if shouldUpgrade(validEvent) =>
        // Upgrade event to newer version
        EventSeq.single(upgradeEvent(validEvent))
      case batchEvent: BatchEvent =>
        // Split batch events into individual events
        EventSeq(batchEvent.events: _*)
      case other =>
        EventSeq.single(other)
    }
  }
}

/**
 * Read-side adapter that redacts, converts, or drops events by type and
 * delivers everything else unchanged.
 */
class FilteringEventAdapter extends ReadEventAdapter {

  /** Domain event(s) for a journal event, selected by the event's type. */
  override def fromJournal(event: Any, manifest: String): EventSeq = {
    event match {
      case SensitiveEvent(data) if shouldRedact(data) =>
        // Redact sensitive information
        EventSeq.single(SensitiveEvent(redact(data)))
      case DeprecatedEvent(info) =>
        // Transform deprecated events to new format
        EventSeq.single(NewFormatEvent.fromDeprecated(info))
      case InvalidEvent(_) =>
        // Skip invalid events entirely
        EventSeq.empty
      case other => EventSeq.single(other)
    }
  }
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13