CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-persistence-2-13

Akka Persistence is an event sourcing library for building persistent, fault-tolerant applications using the actor model.

Pending
Overview
Eval results
Files

docs/event-adapters.md

Event Adapters

Event transformation system for schema evolution and event format conversion between domain and journal representations.

Capabilities

EventAdapter Trait

Combined read and write event adapter interface.

/**
 * Combined read and write event adapter interface.
 *
 * Mixes [[WriteEventAdapter]] and [[ReadEventAdapter]] together and declares no
 * members of its own, so a concrete implementation has to provide `manifest`,
 * `toJournal` and `fromJournal`.
 */
trait EventAdapter extends WriteEventAdapter with ReadEventAdapter

WriteEventAdapter

Converts domain events to journal format for persistence.

/**
 * Converts domain events to journal format for storage.
 *
 * Write-side half of an event adapter; [[EventAdapter]] combines it with
 * [[ReadEventAdapter]].
 */
trait WriteEventAdapter {
  /**
   * Return manifest string for the event type.
   *
   * @param event the event to describe
   * @return the manifest string; the examples on this page use it as a version
   *         label (e.g. "order-placed-v2") and return "" when none applies
   */
  def manifest(event: Any): String
  
  /**
   * Convert domain event to journal representation.
   *
   * @param event the domain event about to be stored
   * @return the value to store; an implementation may return `event` unchanged
   */
  def toJournal(event: Any): Any
}

Usage Examples:

import akka.persistence.journal.WriteEventAdapter

/**
 * Write-side adapter for order events.
 *
 * Labels `OrderPlaced` and `OrderCancelled` with a versioned manifest and stores
 * `OrderPlaced` as a timestamped `JournalOrderPlaced`. Every other event is
 * written unchanged with an empty manifest.
 */
class OrderEventAdapter extends WriteEventAdapter {
  /** Versioned manifest for the known order events, "" for anything else. */
  override def manifest(event: Any): String =
    event match {
      case _: OrderPlaced    => "order-placed-v2"
      case _: OrderCancelled => "order-cancelled-v1"
      case _                 => ""
    }

  /** Journal form of `event`; only `OrderPlaced` is actually converted. */
  override def toJournal(event: Any): Any =
    event match {
      case OrderPlaced(id, lineItems, amount) =>
        // The journal record carries the write time in addition to the order data.
        val writtenAt = System.currentTimeMillis()
        JournalOrderPlaced(id, lineItems, amount, writtenAt)
      case unchanged =>
        unchanged
    }
}

ReadEventAdapter

Converts journal events back to domain format during recovery.

/**
 * Converts journal events to domain format during recovery.
 *
 * Read-side half of an event adapter; [[EventAdapter]] combines it with
 * [[WriteEventAdapter]].
 */
trait ReadEventAdapter {
  /**
   * Convert journal event to domain representation(s).
   *
   * @param event    the event as read from the journal
   * @param manifest the manifest string associated with the event
   * @return an [[EventSeq]] holding the adapted domain event(s)
   */
  def fromJournal(event: Any, manifest: String): EventSeq
}

Usage Examples:

import akka.persistence.journal.{ReadEventAdapter, EventSeq}

/**
 * Read-side adapter that turns stored order records back into domain events.
 *
 * Dispatches on the manifest together with the payload's runtime type, so a
 * record is only converted when both agree. No unchecked casts are needed.
 */
class OrderEventReadAdapter extends ReadEventAdapter {
  /**
   * @param event    the payload read from the journal
   * @param manifest the manifest stored with the payload
   * @return the domain event(s) to replay for this record
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    (manifest, event) match {
      case ("order-placed-v1", old: OldOrderPlaced) =>
        // Migrate old format to new format
        EventSeq.single(OrderPlaced(old.id, old.items, calculateTotal(old.items)))

      case ("order-placed-v2", stored: JournalOrderPlaced) =>
        EventSeq.single(stored.toDomainEvent)

      case ("order-cancelled-v1", _) =>
        // Stored in its domain form already, so it is replayed as it is.
        EventSeq.single(event)

      case _ =>
        // Anything not converted above, including events written with an empty
        // manifest, is replayed unchanged. Returning EventSeq.empty here would
        // silently remove those events from recovery.
        EventSeq.single(event)
    }
}

EventSeq Container

Container for adapted events supporting single or multiple event results.

/**
 * Container for adapted events returned from ReadEventAdapter.
 *
 * Declared `sealed`, so all of its subclasses live in the same source file.
 */
sealed abstract class EventSeq {
  /** Sequence of adapted events, in the order they should be delivered. */
  def events: immutable.Seq[Any]
}

// NOTE(review): signature listing only; the method bodies are not shown here.
object EventSeq {
  /** Empty event sequence; the examples on this page return it to skip a record. */
  def empty: EventSeq
  
  /**
   * Single event sequence.
   *
   * @param event the one adapted event
   */
  def single(event: Any): EventSeq
  
  /**
   * Multiple event sequence; the examples on this page use it to split one
   * stored record into several events.
   *
   * @param events the adapted events
   */
  def apply(events: Any*): EventSeq
  
  /**
   * Java API for creating event sequences.
   *
   * @param events the adapted events
   */
  def create(events: Any*): EventSeq
}

EventSeq Implementations

/**
 * Event sequence containing a single event.
 *
 * Declared `final`: a case class is a closed value type and should not be
 * subclassed.
 *
 * @param event the one adapted event
 */
final case class SingleEventSeq(event: Any) extends EventSeq {
  /** The wrapped event as a one-element sequence. */
  override def events: immutable.Seq[Any] = Vector(event)
}

/**
 * Event sequence containing multiple events.
 *
 * Declared `final`: a case class is a closed value type and should not be
 * subclassed.
 *
 * @tparam E element type of the wrapped sequence
 * @param events the adapted events
 */
final case class EventsSeq[E](events: immutable.Seq[E]) extends EventSeq

IdentityEventAdapter

No-operation adapter that passes events through unchanged.

/**
 * Pass-through adapter: nothing is transformed in either direction.
 */
case object IdentityEventAdapter extends EventAdapter {
  /** Stores the event exactly as it was handed in. */
  override def toJournal(event: Any): Any = {
    event
  }

  /** No manifest is assigned. */
  override def manifest(event: Any): String = {
    ""
  }

  /** Replays the stored event as a single, unmodified event. */
  override def fromJournal(event: Any, manifest: String): EventSeq = {
    EventSeq.single(event)
  }
}

Event Tagging

Events can be wrapped with tags; the tags only take effect with journal implementations that support tagging.

/**
 * Wraps events with tags for journal implementations that support tagging.
 *
 * @param payload the event being tagged
 * @param tags    the tags attached to the payload
 */
case class Tagged(payload: Any, tags: Set[String]) {
  /**
   * Java constructor: copies the given `java.util.Set` into an immutable Scala set.
   *
   * The converter is referenced by its full name, so this definition does not
   * depend on a `CollectionConverters` import being in scope.
   */
  def this(payload: Any, tags: java.util.Set[String]) =
    this(payload, scala.jdk.javaapi.CollectionConverters.asScala(tags).toSet)
}

Usage Examples:

import akka.persistence.journal.Tagged

/**
 * Write-side adapter that attaches tags to order events.
 *
 * The manifest is the simple class name of the event. Events other than
 * `OrderPlaced` and `OrderCancelled` are written without tags.
 */
class TaggingEventAdapter extends WriteEventAdapter {
  override def manifest(event: Any): String = {
    val eventClass = event.getClass
    eventClass.getSimpleName
  }

  override def toJournal(event: Any): Any =
    event match {
      case OrderPlaced(_, lineItems, amount) =>
        // Tags are assembled in three steps: fixed tags, the value marker, item categories.
        val fixedTags = Set("order", "placed")
        val valueTags: Set[String] = if (amount > 1000) Set("high-value") else Set.empty
        val categoryTags = lineItems.map(_.category).toSet
        Tagged(event, fixedTags ++ valueTags ++ categoryTags)

      case OrderCancelled(_, why) =>
        // The cancellation reason itself becomes a tag.
        Tagged(event, Set("order", "cancelled", why))

      case untagged =>
        untagged
    }
}

Example: Complete Event Adapter Implementation

import akka.persistence.journal._

// Domain events
/** Closed set of order domain events; the leaves are `final` so the hierarchy cannot be extended. */
sealed trait OrderEvent

/** An order was placed with the given items and total. */
final case class OrderPlaced(orderId: String, items: List[OrderItem], total: BigDecimal) extends OrderEvent

/** An order was cancelled for the given reason. */
final case class OrderCancelled(orderId: String, reason: String) extends OrderEvent

/** An order was shipped under the given tracking number. */
final case class OrderShipped(orderId: String, trackingNumber: String) extends OrderEvent

// Journal representations
/**
 * Journal record for a placed order.
 *
 * @param orderId   identifier of the order
 * @param items     ordered items
 * @param total     order total
 * @param timestamp time the record was written, in epoch milliseconds
 * @param version   record format version, 2 unless stated otherwise
 */
final case class JournalOrderPlaced(
  orderId: String,
  items: List[OrderItem],
  total: BigDecimal,
  timestamp: Long,
  version: Int = 2
) {
  /** Domain event for this record; `timestamp` and `version` are journal-only and dropped. */
  def toDomainEvent: OrderPlaced = OrderPlaced(orderId, items, total)
}

/**
 * Journal record for a cancelled order.
 *
 * @param orderId   identifier of the order
 * @param reason    cancellation reason
 * @param timestamp time the record was written, in epoch milliseconds
 */
final case class JournalOrderCancelled(
  orderId: String,
  reason: String,
  timestamp: Long
) {
  /** Domain event for this record; `timestamp` is journal-only and dropped. */
  def toDomainEvent: OrderCancelled = OrderCancelled(orderId, reason)
}

/**
 * Two-way adapter between order domain events and their journal records.
 *
 * Write side: assigns a versioned manifest, converts placed and cancelled orders
 * to their journal records and wraps every order event in `Tagged`.
 * Read side: converts current (v2) records back, migrates legacy (v1) records,
 * and replays everything it does not recognise unchanged.
 */
class OrderEventAdapter extends EventAdapter {
  /** Versioned manifest stored with each order event; "" for anything else. */
  override def manifest(event: Any): String = event match {
    case _: OrderPlaced    => "order-placed-v2"
    case _: OrderCancelled => "order-cancelled-v2"
    case _: OrderShipped   => "order-shipped-v1"
    case _                 => ""
  }

  /** Journal form of `event`; non-order events are stored as they are. */
  override def toJournal(event: Any): Any = event match {
    case OrderPlaced(orderId, items, total) =>
      val valueTags: Set[String] = if (total > 500) Set("high-value") else Set.empty
      Tagged(
        JournalOrderPlaced(orderId, items, total, System.currentTimeMillis()),
        Set("order", "placed") ++ valueTags
      )

    case OrderCancelled(orderId, reason) =>
      Tagged(
        JournalOrderCancelled(orderId, reason, System.currentTimeMillis()),
        Set("order", "cancelled")
      )

    case shipped: OrderShipped =>
      // Shipped orders have no journal-specific record; the domain event is stored directly.
      Tagged(shipped, Set("order", "shipped"))

    case other => other
  }

  /**
   * @param event    the payload read from the journal
   * @param manifest the manifest stored with the payload
   * @return the domain event(s) to replay for this record
   */
  override def fromJournal(event: Any, manifest: String): EventSeq =
    (manifest, event) match {
      // Handle version 2 events (current)
      case ("order-placed-v2", journal: JournalOrderPlaced) =>
        EventSeq.single(OrderPlaced(journal.orderId, journal.items, journal.total))

      case ("order-cancelled-v2", journal: JournalOrderCancelled) =>
        EventSeq.single(OrderCancelled(journal.orderId, journal.reason))

      // toJournal stores OrderShipped in its domain form, so it is replayed as it is.
      // Without this case shipped orders would disappear during recovery.
      case ("order-shipped-v1", shipped: OrderShipped) =>
        EventSeq.single(shipped)

      // Handle legacy version 1 events (migration)
      case ("order-placed-v1", old: OldOrderPlaced) =>
        val orderPlaced = OrderPlaced(old.id, old.items, old.total)
        if (old.wasExpedited) {
          // Split expedited orders into placement + shipping events
          EventSeq(orderPlaced, OrderShipped(old.id, s"EXPEDITED-${old.id}"))
        } else {
          EventSeq.single(orderPlaced)
        }

      case ("order-cancelled-v1", old: OldOrderCancelled) =>
        EventSeq.single(OrderCancelled(old.orderId, old.cancellationReason))

      case _ =>
        // Events toJournal passed through untouched (manifest "") and anything
        // else unrecognised are replayed unchanged instead of being dropped.
        EventSeq.single(event)
    }
}

Adapter Configuration

Event adapters are configured in application.conf:

# Journal settings
akka.persistence.journal {
  # Journal plugin to use, given as the configuration path of its section (the leveldb block below)
  plugin = "akka.persistence.journal.leveldb"
  
  # Event adapter configuration
  # (placed inside the section of the plugin selected above)
  leveldb {
    # Adapter name = adapter class
    event-adapters {
      order-adapter = "com.example.OrderEventAdapter"
      user-adapter = "com.example.UserEventAdapter"
    }
    
    # Event class = name of an adapter declared in event-adapters
    event-adapter-bindings {
      "com.example.OrderEvent" = order-adapter
      "com.example.UserEvent" = user-adapter
    }
  }
}

Advanced Patterns

Conditional Event Processing

/**
 * Adapter that decides per stored event whether to skip, upgrade, split or
 * replay it unchanged during recovery.
 *
 * Only the read side does any work. `EventAdapter` also requires the two
 * write-side members, so they are implemented as pass-throughs here; without
 * them this concrete class would not compile.
 */
class ConditionalEventAdapter extends EventAdapter {
  /** Write side is not adapted: no manifest is assigned. */
  override def manifest(event: Any): String = ""

  /** Write side is not adapted: events are stored as they are. */
  override def toJournal(event: Any): Any = event

  /**
   * @param event    the payload read from the journal
   * @param manifest the manifest stored with the payload (not consulted here)
   * @return zero, one or several events to replay
   */
  override def fromJournal(event: Any, manifest: String): EventSeq = {
    event match {
      case problematicEvent: ProblematicEvent if shouldSkip(problematicEvent) =>
        // Skip problematic events during recovery
        EventSeq.empty

      case validEvent: ValidEvent if shouldUpgrade(validEvent) =>
        // Upgrade event to newer version
        EventSeq.single(upgradeEvent(validEvent))

      case batchEvent: BatchEvent =>
        // Split batch events into individual events
        EventSeq(batchEvent.events: _*)

      case other =>
        EventSeq.single(other)
    }
  }
}

Event Filtering and Transformation

/**
 * Read-side adapter that redacts, converts or drops stored events before they
 * are replayed.
 */
class FilteringEventAdapter extends ReadEventAdapter {
  /**
   * @param event    the payload read from the journal
   * @param manifest the manifest stored with the payload (not consulted here)
   * @return the event to replay, or an empty sequence when it is filtered out
   */
  override def fromJournal(event: Any, manifest: String): EventSeq = {
    // First decide what, if anything, should be replayed ...
    val toReplay: Option[Any] = event match {
      case SensitiveEvent(data) if shouldRedact(data) =>
        // Sensitive payloads are replayed in redacted form.
        Some(SensitiveEvent(redact(data)))

      case DeprecatedEvent(info) =>
        // Deprecated events are replayed in the new format.
        Some(NewFormatEvent.fromDeprecated(info))

      case InvalidEvent(_) =>
        // Invalid events are not replayed at all.
        None

      case unchanged =>
        Some(unchanged)
    }

    // ... then wrap that decision in the matching EventSeq.
    toReplay.fold(EventSeq.empty)(replayed => EventSeq.single(replayed))
  }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-persistence-2-13

docs

at-least-once-delivery.md

durable-state.md

event-adapters.md

index.md

journal-api.md

persistent-actors.md

plugin-development.md

snapshots.md

tile.json