This document covers event streaming, logging, and the event bus used for system-wide communication and monitoring.
The event stream is the system-wide mechanism for publishing events and subscribing to them.
/**
 * System-wide event stream: an [[ActorEventBus]] on which events are
 * published and to which actors subscribe by event class.
 *
 * NOTE(review): upstream Akka appears to declare `akka.event.EventStream` as
 * `extends LoggingBus with SubchannelClassification`; confirm the parent
 * types listed here against the Akka version being documented.
 */
class EventStream extends ActorEventBus with LookupClassification {

  /**
   * Subscribes `subscriber` to the events classified under `channel`.
   *
   * @param subscriber actor that should receive the events
   * @param channel    event class used as the classifier
   * @return outcome flag; presumably `true` when the subscription was
   *         added (TODO confirm against the Akka API docs)
   */
  def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean

  /**
   * Removes the subscription of `subscriber` for `channel` only.
   *
   * @param subscriber actor whose subscription is removed
   * @param channel    event class the actor was subscribed to
   * @return outcome flag; presumably `true` when a subscription was
   *         removed (TODO confirm against the Akka API docs)
   */
  def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean

  /**
   * Removes `subscriber` without naming a channel, i.e. from every channel.
   *
   * @param subscriber actor whose subscriptions are removed
   */
  def unsubscribe(subscriber: ActorRef): Unit

  /**
   * Publishes `event` on this stream.
   *
   * @param event the event to distribute
   */
  def publish(event: AnyRef): Unit

  /**
   * Starts the unsubscriber for this stream.
   *
   * @return an `ActorRef`; presumably the actor that cleans up
   *         subscriptions (TODO confirm, upstream may return `Unit`)
   */
  def startUnsubscriber(): ActorRef
}
/**
 * Base abstraction for an event bus.
 *
 * Concrete buses fix the three abstract type members and implement the
 * subscribe / unsubscribe / publish operations declared here.
 */
abstract class EventBus {

  /** Type of the events published on this bus. */
  type Event

  /** Type of the value a subscription is registered against. */
  type Classifier

  /** Type of the entities that can subscribe to this bus. */
  type Subscriber

  /**
   * Registers `subscriber` for the classifier `to`.
   *
   * @param subscriber the subscriber to register
   * @param to         the classifier to subscribe to
   * @return outcome flag; presumably `true` when the subscription was
   *         added (TODO confirm against the Akka API docs)
   */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean

  /**
   * Removes the registration of `subscriber` for the classifier `from`.
   *
   * @param subscriber the subscriber to remove
   * @param from       the classifier to unsubscribe from
   * @return outcome flag; presumably `true` when a subscription was
   *         removed (TODO confirm against the Akka API docs)
   */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean

  /**
   * Removes `subscriber` without naming a classifier, i.e. from all of them.
   *
   * @param subscriber the subscriber to remove
   */
  def unsubscribe(subscriber: Subscriber): Unit

  /**
   * Publishes `event` on this bus.
   *
   * @param event the event to publish
   */
  def publish(event: Event): Unit
}
/**
 * An [[EventBus]] whose subscribers are actors: it fixes the abstract
 * `Subscriber` type member to `ActorRef` and leaves `Event` and
 * `Classifier` open.
 */
trait ActorEventBus extends EventBus {

  /** Subscribers of this bus are actor references. */
  type Subscriber = ActorRef
}

Logging interfaces and implementations for different logging backends.
/**
 * Core logging interface.
 *
 * Offers one logging method per level (error, warning, info, debug) and a
 * matching `isXxxEnabled` query for each level.
 */
trait LoggingAdapter {

  // --- level checks ---------------------------------------------------

  /** Whether logging at error level is enabled. */
  def isErrorEnabled: Boolean

  /** Whether logging at warning level is enabled. */
  def isWarningEnabled: Boolean

  /** Whether logging at info level is enabled. */
  def isInfoEnabled: Boolean

  /** Whether logging at debug level is enabled. */
  def isDebugEnabled: Boolean

  // --- log statements -------------------------------------------------

  /**
   * Logs `message` at error level.
   *
   * @param message the text to log
   */
  def error(message: String): Unit

  /**
   * Logs `message` at error level together with its `cause`.
   *
   * @param cause   the throwable associated with the error
   * @param message the text to log
   */
  def error(cause: Throwable, message: String): Unit

  /**
   * Logs `message` at warning level.
   *
   * @param message the text to log
   */
  def warning(message: String): Unit

  /**
   * Logs `message` at info level.
   *
   * @param message the text to log
   */
  def info(message: String): Unit

  /**
   * Logs `message` at debug level.
   *
   * @param message the text to log
   */
  def debug(message: String): Unit
}
/**
 * Bus-based [[LoggingAdapter]]: a logging adapter that is constructed
 * around an [[EventStream]].
 *
 * @param bus           the event stream this adapter is bound to;
 *                      presumably the destination of its log events
 *                      (TODO confirm against the Akka API docs)
 * @param logSource     name identifying the source of the log statements
 * @param logClass      class associated with the log source
 * @param loggingFilter filter attached to this adapter; presumably decides
 *                      which levels are enabled (TODO confirm)
 */
class BusLogging(
    val bus: EventStream,
    val logSource: String,
    val logClass: Class[_],
    val loggingFilter: LoggingFilter
) extends LoggingAdapter
/**
 * Logging mixin for actors.
 *
 * The self-type restricts this trait to classes that are also an `Actor`,
 * which is what makes `context` available in the initializer below.
 */
trait ActorLogging { this: Actor =>

  /** Logging adapter built from the actor's system with this actor as log source. */
  val log: LoggingAdapter =
    akka.event.Logging(context.system, this)
}
/**
 * Diagnostic logging mixin for actors, with MDC support.
 *
 * The self-type restricts this trait to classes that are also an `Actor`.
 */
trait DiagnosticActorLogging { this: Actor =>

  /**
   * Diagnostic logging adapter for this actor.
   *
   * Built with the `Logging(logSource: Actor)` factory, which is the
   * overload that yields a `DiagnosticLoggingAdapter`. The two-argument
   * `Logging(system, logSource)` form yields a plain `LoggingAdapter` and
   * therefore does not fit the declared type.
   */
  val log: DiagnosticLoggingAdapter = akka.event.Logging(this)
}

Usage Examples:
import akka.actor.{Actor, ActorLogging}
import akka.event.Logging
/**
 * Example actor that obtains its logger from the `ActorLogging` mixin and
 * logs at a level chosen by the incoming message.
 */
class LoggingActor extends Actor with ActorLogging {

  /**
   * Maps the strings "info", "warn" and "error" to a log statement at the
   * corresponding level, and logs any received `Exception` at error level
   * together with the exception itself. No other message is matched.
   */
  override def receive: Receive = {
    case "info" =>
      log.info("This is an info message")
    case "warn" =>
      log.warning("This is a warning")
    case "error" =>
      log.error("This is an error")
    case failure: Exception =>
      log.error(failure, "Exception occurred")
  }
}
// Manual logging: the actor creates its own logger instead of mixing in ActorLogging.
class ManualLoggingActor extends Actor {

  // Logger obtained explicitly from the actor's system, with this actor as log source.
  val log = Logging(context.system, this)

  /** Logs every received message at info level, prefixed with "Received: ". */
  override def receive: Receive = {
    case incoming =>
      log.info(s"Received: $incoming")
  }
}

Standard event types and dead letter handling.
/**
 * Dead letter event.
 *
 * Presumably published when `message` could not be delivered to
 * `recipient` (TODO confirm against the Akka dead-letter docs).
 *
 * @param message   the message carried by this dead letter
 * @param sender    the actor the message came from
 * @param recipient the actor the message was addressed to
 */
final case class DeadLetter(
    message: Any,
    sender: ActorRef,
    recipient: ActorRef
)
/**
 * Dead letter variant whose message is a `DeadLetterSuppression`.
 *
 * @param message   the suppression-marked message carried by this event
 * @param sender    the actor the message came from
 * @param recipient the actor the message was addressed to
 */
final case class SuppressedDeadLetter(
    message: DeadLetterSuppression,
    sender: ActorRef,
    recipient: ActorRef
)
/**
 * Event describing a dropped message together with the reason it was dropped.
 *
 * @param message   the message that was dropped
 * @param reason    textual explanation of why it was dropped
 * @param sender    the actor the message came from
 * @param recipient the actor the message was addressed to
 */
final case class Dropped(
    message: Any,
    reason: String,
    sender: ActorRef,
    recipient: ActorRef
)
/**
 * Example dead letter listener: prints every `DeadLetter` it receives to
 * standard output.
 *
 * The class performs no subscription itself; it only reacts to
 * `DeadLetter` messages that are delivered to it, and matches no other
 * message.
 */
class DeadLetterListener extends Actor {

  /** Prints the message, sender and recipient of each received dead letter. */
  override def receive: Receive = {
    case DeadLetter(payload, origin, target) =>
      println(s"Dead letter: $payload from $origin to $target")
  }
}