Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The Message Passing system provides Scala-based message definitions for actor-based communication within the Flink runtime cluster. These message classes define the communication protocols between JobManagers, TaskManagers, and clients, enabling distributed coordination and control operations across the cluster.
Scala object containing message definitions for JobManager communication in the actor system.
/** Message definitions for JobManager communication in the actor system.
 *
 * NOTE(review): the source declared `object JobManagerMessages` twice at the
 * top level, which is illegal in Scala; the two bodies are merged here.
 * NOTE(review): several case classes extend RequiresLeaderSessionID without
 * implementing its abstract `leaderSessionID` member — in upstream Flink that
 * trait is a pure marker; confirm the intended contract.
 */
object JobManagerMessages {

  // --- Job submission and lifecycle ---------------------------------------
  case class SubmitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour) extends RequiresLeaderSessionID
  case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
  case class StopJob(jobID: JobID) extends RequiresLeaderSessionID
  case class RescaleJob(jobID: JobID, newParallelism: Int) extends RequiresLeaderSessionID

  // --- Job status and monitoring ------------------------------------------
  case class RequestJobStatus(jobID: JobID)
  case class JobStatusResponse(jobID: JobID, status: JobStatus, timestamp: Long)
  case class RequestRunningJobs()
  case class RunningJobs(runningJobs: Iterable[ExecutionGraph])
  case class RequestJobDetails(jobID: JobID)
  case class JobDetails(
      jobID: JobID,
      jobName: String,
      startTime: Long,
      endTime: Long,
      status: JobStatus,
      lastModification: Long)

  // --- Execution graph access ---------------------------------------------
  case class RequestExecutionGraph(jobID: JobID)
  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph)
  case class ExecutionGraphNotFound(jobID: JobID)

  // --- Savepoint and checkpoint messages ----------------------------------
  case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String]) extends RequiresLeaderSessionID
  case class SavepointSuccess(jobID: JobID, savepointPath: String, timestamp: Long)
  case class SavepointFailure(jobID: JobID, cause: Throwable)
  case class DisposeSavepoint(savepointPath: String) extends RequiresLeaderSessionID
  case class DisposeSavepointSuccess(savepointPath: String)
  case class DisposeSavepointFailure(savepointPath: String, cause: Throwable)

  // --- Accumulator and metrics messages ------------------------------------
  case class RequestAccumulators(jobID: JobID)
  case class AccumulatorResultsFound(jobID: JobID, userAccumulators: Map[String, OptionalFailure[AnyRef]])
  case class AccumulatorResultsNotFound(jobID: JobID)

  // --- Configuration and cluster information --------------------------------
  case object RequestClusterStatus
  case class ClusterStatusWithTaskManagerInfo(
      numTaskManagers: Int,
      numSlotsTotal: Int,
      numSlotsAvailable: Int,
      taskManagerInfos: Iterable[TaskManagerInfo])
  case object RequestClusterConfiguration
  case class ClusterConfiguration(config: Configuration)

  // --- Submission / result responses -----------------------------------------
  case class JobSubmitSuccess(jobID: JobID)
  case class JobResultSuccess(result: JobExecutionResult)
  case class JobResultFailure(cause: Throwable)

  /** How a submitting client wants to be notified about its job. */
  sealed trait ListeningBehaviour
  // Plain companion object (was `case object`, which adds no value for a
  // pure namespace of variants).
  object ListeningBehaviour {
    case object DETACHED extends ListeningBehaviour
    case object EXECUTION_RESULT extends ListeningBehaviour
    case object EXECUTION_RESULT_AND_STATE_CHANGES extends ListeningBehaviour
  }

  // --- Leader election and coordination ---------------------------------------
  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID
  case object RequestLeaderSessionID
  case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])

  // --- TaskManager registration ------------------------------------------------
  case class RegisterTaskManager(
      taskManagerLocation: TaskManagerLocation,
      resourceID: ResourceID,
      dataPort: Int,
      hardwareDescription: HardwareDescription)
  case class TaskManagerRegistrationSuccess(resourceID: ResourceID, blobPort: Int)
  case class TaskManagerRegistrationRejection(resourceID: ResourceID, reason: String)

  // --- Heartbeat messages -------------------------------------------------------
  case class Heartbeat(from: ResourceID, heartbeatPayload: HeartbeatPayload)
  case class HeartbeatResponse(from: ResourceID)

  // --- Shutdown and cleanup -----------------------------------------------------
  case object RequestShutdown
  case class Acknowledge() extends AcknowledgeMessage
}
// Message definitions for TaskManager communication follow.
/** Message definitions for TaskManager communication.
 *
 * NOTE(review): the source declared `object TaskManagerMessages` twice at the
 * top level, which is illegal in Scala; the two bodies are merged here.
 */
object TaskManagerMessages {

  // --- Task deployment and lifecycle --------------------------------------
  case class SubmitTask(tdd: TaskDeploymentDescriptor) extends RequiresLeaderSessionID
  case class TaskSubmitted(executionAttemptID: ExecutionAttemptID)
  case class TaskFailed(executionAttemptID: ExecutionAttemptID, cause: Throwable)
  case class TaskFinished(executionAttemptID: ExecutionAttemptID, executionResult: ExecutionResult)
  case class CancelTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID
  case class TaskCanceled(executionAttemptID: ExecutionAttemptID)
  case class StopTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID
  case class TaskStopped(executionAttemptID: ExecutionAttemptID)

  // --- Task status and monitoring ------------------------------------------
  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) extends RequiresLeaderSessionID
  case class RequestTaskManagerStatus()
  case class TaskManagerStatusResponse(
      resourceID: ResourceID,
      taskManagerLocation: TaskManagerLocation,
      numberOfSlots: Int,
      numberOfAvailableSlots: Int,
      hardwareDescription: HardwareDescription)
  case class RequestRunningTasks()
  case class RunningTasks(runningTasks: Collection[ExecutionAttemptID])

  // --- Resource and slot management ----------------------------------------
  case class RequestSlot(
      slotID: SlotID,
      jobID: JobID,
      allocationID: AllocationID,
      resourceProfile: ResourceProfile,
      targetAddress: String) extends RequiresLeaderSessionID
  case class SlotOffer(
      allocationID: AllocationID,
      slotID: SlotID,
      resourceProfile: ResourceProfile)
  case class FreeSlot(allocationID: AllocationID) extends RequiresLeaderSessionID
  case class SlotFreed(allocationID: AllocationID)

  // --- Checkpoint coordination ----------------------------------------------
  case class TriggerCheckpoint(
      executionAttemptID: ExecutionAttemptID,
      checkpointID: Long,
      timestamp: Long,
      checkpointOptions: CheckpointOptions) extends RequiresLeaderSessionID
  case class ConfirmCheckpoint(
      executionAttemptID: ExecutionAttemptID,
      checkpointID: Long,
      checkpointMetrics: CheckpointMetrics,
      subtaskState: TaskStateSnapshot) extends RequiresLeaderSessionID
  case class DeclineCheckpoint(
      executionAttemptID: ExecutionAttemptID,
      checkpointID: Long,
      reason: CheckpointDeclineReason) extends RequiresLeaderSessionID

  // --- Registration and heartbeat --------------------------------------------
  case class RegisterAtJobManager(
      resourceID: ResourceID,
      taskManagerLocation: TaskManagerLocation,
      hardwareDescription: HardwareDescription)
  case class SendHeartbeat(resourceID: ResourceID, heartbeatPayload: AccumulatorSnapshot)

  // --- Error handling and failure ----------------------------------------------
  case class TaskInFinalState(executionAttemptID: ExecutionAttemptID)
  case class FailTask(executionAttemptID: ExecutionAttemptID, cause: Throwable)

  // --- Stack trace and debugging -------------------------------------------------
  case class SendStackTrace()
  case class StackTrace(stackTrace: Array[StackTraceElement])
  case class RequestStackTrace(executionAttemptID: ExecutionAttemptID)
  case class StackTraceResponse(executionAttemptID: ExecutionAttemptID, stackTrace: Array[StackTraceElement])

  // --- Memory and resource management ----------------------------------------------
  case object RequestMemoryReport
  case class MemoryReport(
      managedMemory: Long,
      totalMemory: Long,
      availableMemory: Long,
      gcStats: Array[GarbageCollectorStats])
  case object RequestIOReport
  case class IOReport(
      diskSpaceInfo: Array[DiskSpaceInfo],
      networkIOMetrics: NetworkIOMetrics)

  // --- Configuration and environment --------------------------------------------------
  case object RequestTaskManagerConfiguration
  case class TaskManagerConfiguration(config: Configuration, tempDirectories: Array[File])
  case object RequestEnvironmentInformation
  case class EnvironmentInformation(
      hostname: String,
      taskManagerAddress: String,
      dataPort: Int,
      numberOfSlots: Int)

  // --- Disconnect and shutdown -----------------------------------------------------------
  case class Disconnect(message: String)
  case object NotifyWhenRegisteredAtJobManager
  case object RegisteredAtJobManager
  case class TaskManagerShutdown()
}
// Message definitions for job client communication follow.
/** Message definitions for job client communication.
 *
 * NOTE(review): prose was fused directly after this object's closing brace
 * in the source, which is not valid Scala; it is preserved as a comment.
 */
object JobClientMessages {

  // --- Job submission from client ------------------------------------------
  case class SubmitJobAndWait(
      jobGraph: JobGraph,
      listenToExecutionResult: Boolean,
      leaderSessionTimeout: FiniteDuration)
  case class SubmitJobDetached(jobGraph: JobGraph)

  // --- Job control operations ------------------------------------------------
  case class CancelJob(jobID: JobID)
  case class CancelJobWithSavepoint(jobID: JobID, savepointDirectory: String)
  case class StopJob(jobID: JobID)

  // --- Status and monitoring requests -----------------------------------------
  case class RequestJobStatus(jobID: JobID)
  case class RequestBlobManagerPort()
  case class BlobManagerPort(port: Int)
  case class RequestExecutionResult(jobID: JobID)

  // --- Savepoint operations from client ------------------------------------------
  case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String])
  case class RequestSavepoint(jobID: JobID, savepointDirectory: String)
  case class DisposeSavepoint(savepointPath: String)

  // --- Configuration queries --------------------------------------------------------
  case object RequestClusterConfiguration
  case object RequestClusterStatus

  // --- Response messages ----------------------------------------------------------------
  case class JobExecutionResult(result: SerializedJobExecutionResult)
  case class JobExecutionException(exception: SerializedThrowable)
  case class JobStatusAnswer(jobID: JobID, status: JobStatus)
  case class JobNotFound(jobID: JobID)
  case class SavepointSuccess(jobID: JobID, savepointPath: String, triggerTime: Long)
  case class SavepointFailure(jobID: JobID, cause: SerializedThrowable)

  // --- Connection and session management ----------------------------------------------------
  case object GetLeaderSessionID
  case class LeaderSessionID(leaderSessionID: Option[UUID])
  case class JobClientActorConnectionTimeoutException(message: String) extends Exception(message)
}
// Messages for ResourceManager communication and coordination follow.
/** Messages exchanged with the ResourceManager for registration, slot
 * allocation, heartbeats, and lifecycle control.
 */
object ResourceManagerMessages {

  /** A TaskExecutor announces itself to the ResourceManager. */
  case class RegisterTaskExecutor(
      resourceID: ResourceID,
      taskExecutorAddress: String,
      dataPort: Int,
      numberOfSlots: Int,
      hardwareDescription: HardwareDescription)

  /** Positive registration reply carrying the RM identity and heartbeat period. */
  case class TaskExecutorRegistrationSuccess(resourceManagerId: ResourceManagerId, heartbeatInterval: Time)

  /** Registration refused; `reason` is a human-readable explanation. */
  case class TaskExecutorRegistrationRejection(reason: String)

  /** A JobMaster asks for a slot matching `slotRequest`. */
  case class RequestSlot(jobMasterId: JobMasterId, slotRequest: SlotRequest)
  case class SlotRequestRegistered(slotRequestId: SlotRequestId, targetAddress: String)
  case class SlotRequestFailed(slotRequestId: SlotRequestId, cause: Throwable)
  case class CancelSlotRequest(slotRequestId: SlotRequestId)

  /** A TaskExecutor reports newly freed slot capacity. */
  case class NotifyResourceAvailable(resourceID: ResourceID, availableSlots: Int)
  case class RequestResourceOverview()
  case class ResourceOverview(numberTaskManagers: Int, numberRegisteredSlots: Int, numberFreeSlots: Int)

  /** A JobManager registers itself as responsible for a particular job. */
  case class RegisterJobManager(
      jobMasterId: JobMasterId,
      resourceID: ResourceID,
      jobManagerAddress: String,
      jobID: JobID)
  case class JobManagerRegistrationSuccess(heartbeatInterval: Time)
  case class JobManagerRegistrationRejection(reason: String)

  // Heartbeats from both kinds of peers.
  case class TaskManagerHeartbeat(resourceID: ResourceID, slotReport: SlotReport)
  case class JobManagerHeartbeat(resourceID: ResourceID)

  // Connection teardown.
  case class DisconnectTaskManager(resourceID: ResourceID, cause: Exception)
  case class DisconnectJobManager(jobID: JobID, cause: Exception)

  // Requests to scale the cluster up or down.
  case class StartNewTaskManager(resourceProfile: ResourceProfile)
  case class StopTaskManager(resourceID: ResourceID)
}
// Base traits for message classification
/** Marker for messages that are only valid under a specific leader session.
 *
 * NOTE(review): this trait declares an abstract `leaderSessionID`, but most
 * case classes extending it in this file do not implement the member (in
 * upstream Flink, RequiresLeaderSessionID is a pure marker trait with no
 * members). Confirm which contract is intended.
 */
trait RequiresLeaderSessionID {
  def leaderSessionID: UUID
}

/** Marker for acknowledgement replies. */
trait AcknowledgeMessage

sealed trait JobManagerMessage
sealed trait TaskManagerMessage
sealed trait ClientMessage

// Message decorators and wrappers.

/** Envelope binding a payload to the leader session it was issued under. */
case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID

/** Pairs a message with its target actor and an ask timeout. */
case class MessageDecorator[T](
    target: ActorRef,
    message: T,
    timeout: FiniteDuration)

// Generic response handling.
// NOTE(review): Success/Failure shadow scala.util.Success/Failure when this
// scope is imported unqualified; prefer qualified access at use sites.
sealed trait ResponseMessage
case class Success[T](result: T) extends ResponseMessage
// BUGFIX: the source fused `extends ResponseMessage` with the following
// `object` keyword into the nonexistent type `ResponseMessageobject`.
case class Failure(cause: Throwable) extends ResponseMessage

/** Helpers for (de)serializing complex message payloads. */
object MessageSerializationUtils {

  /** Serializes a job execution result, resolving accumulators under the
   * result's own class loader so user-defined types resolve correctly. */
  def serializeJobExecutionResult(result: JobExecutionResult): SerializedJobExecutionResult = {
    new SerializedJobExecutionResult(
      result.getJobID,
      result.getNetRuntime,
      ClassLoaderUtils.withContextClassLoader(
        result.getClass.getClassLoader,
        () => result.getAllAccumulatorResults))
  }

  /** Wraps a throwable so it can cross serialization / class-loader
   * boundaries. */
  def serializeThrowable(throwable: Throwable): SerializedThrowable = {
    new SerializedThrowable(throwable, throwable.getClass.getClassLoader)
  }

  /** Reconstructs a JobExecutionResult, deserializing the accumulators with
   * the supplied class loader. */
  def deserializeJobExecutionResult(
      serialized: SerializedJobExecutionResult,
      classLoader: ClassLoader): JobExecutionResult = {
    new JobExecutionResult(
      serialized.getJobId,
      serialized.getNetRuntime,
      ClassLoaderUtils.withContextClassLoader(classLoader, serialized.getSerializedAccumulators))
  }
}

import akka.actor.{Actor, ActorLogging, ActorRef}
import org.apache.flink.runtime.messages.JobManagerMessages._
import java.util.UUID
/** JobManager actor: handles job submission, cancellation, status queries,
 * savepoint triggering, and leader-session bookkeeping.
 */
class JobManagerActor extends Actor with ActorLogging {

  // Active leader session ID; None while this instance is not the leader.
  private var leaderSessionID: Option[UUID] = None

  // Execution graphs of jobs currently managed here, keyed by job ID.
  private val runningJobs = mutable.Map[JobID, ExecutionGraph]()

  override def receive: Receive = {
    case SubmitJob(jobGraph, listeningBehaviour) =>
      log.info(s"Received job submission: ${jobGraph.getJobID}")
      try {
        // Fails fast when no leader session is active.
        validateLeaderSession(sender())
        val executionGraph = createExecutionGraph(jobGraph)
        runningJobs += jobGraph.getJobID -> executionGraph
        executionGraph.scheduleForExecution()
        sender() ! JobSubmitSuccess(jobGraph.getJobID)
        // Register the submitting client for follow-up notifications.
        listeningBehaviour match {
          case ListeningBehaviour.EXECUTION_RESULT =>
            registerForJobResult(jobGraph.getJobID, sender())
          case ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES =>
            registerForJobUpdates(jobGraph.getJobID, sender())
          case ListeningBehaviour.DETACHED =>
          // Detached submission: no further notifications.
        }
      } catch {
        case ex: Exception =>
          log.error(ex, s"Failed to submit job ${jobGraph.getJobID}")
          sender() ! JobResultFailure(ex)
      }

    case CancelJob(jobID) =>
      log.info(s"Received cancel request for job: $jobID")
      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          try {
            executionGraph.cancel()
            sender() ! Acknowledge()
          } catch {
            case ex: Exception =>
              sender() ! JobResultFailure(ex)
          }
        case None =>
          sender() ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestJobStatus(jobID) =>
      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          val status = executionGraph.getState
          val timestamp = executionGraph.getStatusTimestamp(status)
          sender() ! JobStatusResponse(jobID, status, timestamp)
        case None =>
          sender() ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestRunningJobs() =>
      sender() ! RunningJobs(runningJobs.values)

    case TriggerSavepoint(jobID, savepointDirectory) =>
      log.info(s"Triggering savepoint for job $jobID")
      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          // BUGFIX: capture the sender before going asynchronous. sender()
          // is only valid while the current message is being processed;
          // calling it inside the whenComplete callback would read whatever
          // sender belongs to the message in flight when the future
          // completes, replying to the wrong actor.
          val requester = sender()
          val savepointFuture = executionGraph.triggerSavepoint(savepointDirectory.orNull)
          savepointFuture.whenComplete { (savepointPath, throwable) =>
            if (throwable != null) {
              requester ! SavepointFailure(jobID, throwable)
            } else {
              requester ! SavepointSuccess(jobID, savepointPath, System.currentTimeMillis())
            }
          }
        case None =>
          sender() ! SavepointFailure(jobID, new JobNotFoundException(jobID))
      }

    case RequestLeaderSessionID =>
      sender() ! ResponseLeaderSessionID(leaderSessionID)

    case LeaderSessionMessage(sessionID, message) =>
      if (leaderSessionID.contains(sessionID)) {
        // Re-dispatch the payload to ourselves, preserving the original sender.
        self.tell(message, sender())
      } else {
        log.warning(s"Received message with invalid leader session ID: $sessionID")
        sender() ! JobResultFailure(new LeaderSessionIDException("Invalid leader session ID"))
      }
  }

  /** Throws when no leader session is active.
   * NOTE(review): `requester` is currently unused — confirm intent. */
  private def validateLeaderSession(requester: ActorRef): Unit = {
    if (leaderSessionID.isEmpty) {
      throw new LeaderSessionIDException("No active leader session")
    }
  }

  /** Builds the execution graph for a job graph (placeholder). */
  private def createExecutionGraph(jobGraph: JobGraph): ExecutionGraph = {
    // This would involve setting up vertices, scheduling, etc.
    new ExecutionGraph(/* parameters */)
  }

  /** Registers a client to receive the job's execution result (placeholder). */
  private def registerForJobResult(jobID: JobID, client: ActorRef): Unit = {
  }

  /** Registers a client for state changes and results (placeholder). */
  private def registerForJobUpdates(jobID: JobID, client: ActorRef): Unit = {
  }
}

import akka.actor.{Actor, ActorLogging}
import org.apache.flink.runtime.messages.TaskManagerMessages._
/** TaskManager actor: executes tasks on behalf of the JobManager and reports
 * task state, checkpoints, and heartbeats back to it.
 */
class TaskManagerActor extends Actor with ActorLogging {

  // Tasks currently executing on this TaskManager, keyed by attempt ID.
  private val runningTasks = mutable.Map[ExecutionAttemptID, Task]()
  // JobManager reference, captured when registration is initiated.
  private var jobManagerRef: Option[ActorRef] = None
  // NOTE(review): never written in the visible code — confirm whether
  // registration success is meant to set this.
  private var isRegistered = false

  override def receive: Receive = {
    case RegisterAtJobManager(resourceID, taskManagerLocation, hardwareDescription) =>
      log.info(s"Registering at JobManager: ${sender()}")
      jobManagerRef = Some(sender())
      sender() ! RegisterTaskManager(
        taskManagerLocation,
        resourceID,
        taskManagerLocation.getDataPort,
        hardwareDescription)

    case SubmitTask(taskDeploymentDescriptor) =>
      val executionAttemptID = taskDeploymentDescriptor.getExecutionAttemptId
      log.info(s"Received task submission: $executionAttemptID")
      try {
        val task = createTask(taskDeploymentDescriptor)
        runningTasks += executionAttemptID -> task
        task.startTaskThread()
        sender() ! TaskSubmitted(executionAttemptID)
      } catch {
        case ex: Exception =>
          log.error(ex, s"Failed to submit task $executionAttemptID")
          sender() ! TaskFailed(executionAttemptID, ex)
      }

    case CancelTask(executionAttemptID) =>
      log.info(s"Cancelling task: $executionAttemptID")
      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          try {
            task.cancelExecution()
            sender() ! TaskCanceled(executionAttemptID)
          } catch {
            case ex: Exception =>
              log.error(ex, s"Failed to cancel task $executionAttemptID")
              sender() ! TaskFailed(executionAttemptID, ex)
          }
        case None =>
          // Cancelling an unknown task is treated as already-cancelled.
          log.warning(s"Attempted to cancel unknown task: $executionAttemptID")
          sender() ! TaskCanceled(executionAttemptID)
      }

    case TriggerCheckpoint(executionAttemptID, checkpointID, timestamp, checkpointOptions) =>
      log.debug(s"Triggering checkpoint $checkpointID for task $executionAttemptID")
      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          task.triggerCheckpointBarrier(checkpointID, timestamp, checkpointOptions)
        case None =>
          log.warning(s"Checkpoint triggered for unknown task: $executionAttemptID")
          sender() ! DeclineCheckpoint(
            executionAttemptID,
            checkpointID,
            CheckpointDeclineReason.TASK_NOT_RUNNING)
      }

    case RequestTaskManagerStatus() =>
      val numberOfSlots = getNumberOfSlots()
      val availableSlots = getAvailableSlots()
      sender() ! TaskManagerStatusResponse(
        getResourceID(),
        getTaskManagerLocation(),
        numberOfSlots,
        availableSlots,
        getHardwareDescription())

    case RequestRunningTasks() =>
      // NOTE(review): relies on asJavaCollection being in scope
      // (scala.jdk.CollectionConverters); the import is not visible here.
      sender() ! RunningTasks(runningTasks.keys.toList.asJavaCollection)

    case SendHeartbeat(resourceID, accumulatorSnapshot) =>
      jobManagerRef.foreach { jm =>
        jm ! Heartbeat(resourceID, accumulatorSnapshot)
      }

    case TaskInFinalState(executionAttemptID) =>
      log.info(s"Task finished: $executionAttemptID")
      runningTasks.remove(executionAttemptID)
      // Notify the JobManager about the terminal task state.
      jobManagerRef.foreach { jm =>
        jm ! UpdateTaskExecutionState(createTaskExecutionState(executionAttemptID))
      }
  }

  /** Builds a Task from its deployment descriptor (placeholder). */
  private def createTask(tdd: TaskDeploymentDescriptor): Task = {
    new Task(/* parameters from tdd */)
  }

  /** Total number of task slots on this TaskManager (hard-coded example). */
  private def getNumberOfSlots(): Int = {
    4 // Example
  }

  /** Slots not currently occupied by a running task. */
  private def getAvailableSlots(): Int = {
    getNumberOfSlots() - runningTasks.size
  }

  /** Builds a FINISHED execution-state report for the given attempt. */
  private def createTaskExecutionState(executionAttemptID: ExecutionAttemptID): TaskExecutionState = {
    new TaskExecutionState(
      executionAttemptID.getJobId,
      executionAttemptID,
      ExecutionState.FINISHED,
      null // no error
    )
  }
}

import akka.actor.{Actor, ActorRef, Props, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.flink.runtime.messages.JobClientMessages._
import scala.concurrent.duration._
import scala.util.{Success, Failure}
/** Client-side facade for talking to the JobManager via ask-pattern calls.
 *
 * NOTE(review): several handlers match on `Acknowledge`/`JobResultFailure`,
 * which are defined in JobManagerMessages — only JobClientMessages is
 * imported above; confirm those names are in scope.
 */
class FlinkJobClient(jobManagerRef: ActorRef)(implicit system: ActorSystem) {

  implicit val timeout: Timeout = 30.seconds
  implicit val ec = system.dispatcher

  /** Submits `jobGraph`; in non-detached mode waits for and prints the result. */
  def submitJob(jobGraph: JobGraph, detached: Boolean = false): Unit = {
    val message = if (detached) {
      SubmitJobDetached(jobGraph)
    } else {
      SubmitJobAndWait(jobGraph, listenToExecutionResult = true, 30.seconds)
    }
    val resultFuture = jobManagerRef ? message
    resultFuture.onComplete {
      case Success(JobExecutionResult(result)) =>
        println(s"Job ${jobGraph.getJobID} completed successfully")
        println(s"Execution time: ${result.getNetRuntime} ms")
      case Success(JobExecutionException(exception)) =>
        println(s"Job ${jobGraph.getJobID} failed: ${exception.getMessage}")
      // BUGFIX: without a fallback, any other reply raised a MatchError that
      // was silently swallowed inside the Future callback.
      case Success(other) =>
        println(s"Unexpected response: $other")
      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /** Requests cancellation of the given job and prints the outcome. */
  def cancelJob(jobID: JobID): Unit = {
    val resultFuture = jobManagerRef ? CancelJob(jobID)
    resultFuture.onComplete {
      case Success(Acknowledge()) =>
        println(s"Job $jobID cancelled successfully")
      case Success(JobResultFailure(cause)) =>
        println(s"Failed to cancel job $jobID: ${cause.getMessage}")
      case Success(other) =>
        println(s"Unexpected response: $other")
      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /** Triggers a savepoint, optionally into `savepointDirectory`. */
  def triggerSavepoint(jobID: JobID, savepointDirectory: Option[String]): Unit = {
    val resultFuture = jobManagerRef ? TriggerSavepoint(jobID, savepointDirectory)
    resultFuture.onComplete {
      case Success(SavepointSuccess(jobId, savepointPath, triggerTime)) =>
        println(s"Savepoint created for job $jobId at: $savepointPath")
      case Success(SavepointFailure(jobId, cause)) =>
        println(s"Savepoint failed for job $jobId: ${cause.getMessage}")
      case Success(other) =>
        println(s"Unexpected response: $other")
      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /** Queries and prints the current status of a job. */
  def getJobStatus(jobID: JobID): Unit = {
    val resultFuture = jobManagerRef ? RequestJobStatus(jobID)
    resultFuture.onComplete {
      case Success(JobStatusAnswer(jobId, status)) =>
        println(s"Job $jobId status: $status")
      case Success(JobNotFound(jobId)) =>
        println(s"Job $jobId not found")
      case Success(other) =>
        println(s"Unexpected response: $other")
      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /** Queries and prints the overall cluster status. */
  def getClusterStatus(): Unit = {
    val resultFuture = jobManagerRef ? RequestClusterStatus
    resultFuture.onComplete {
      case Success(status) =>
        println(s"Cluster status: $status")
      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }
}

trait MessageRouter {
// --- MessageRouter members ---------------------------------------------------

  /** Dispatches an incoming message to the appropriate typed handler. */
  def route(message: Any, sender: ActorRef): Unit = message match {
    // Job management messages
    case msg: JobManagerMessages.SubmitJob => handleJobSubmission(msg, sender)
    case msg: JobManagerMessages.CancelJob => handleJobCancellation(msg, sender)
    case msg: JobManagerMessages.RequestJobStatus => handleJobStatusRequest(msg, sender)
    // Task management messages
    case msg: TaskManagerMessages.SubmitTask => handleTaskSubmission(msg, sender)
    case msg: TaskManagerMessages.CancelTask => handleTaskCancellation(msg, sender)
    case msg: TaskManagerMessages.UpdateTaskExecutionState => handleTaskStateUpdate(msg, sender)
    // Resource management messages
    case msg: ResourceManagerMessages.RequestSlot => handleSlotRequest(msg, sender)
    case msg: ResourceManagerMessages.RegisterTaskExecutor => handleTaskManagerRegistration(msg, sender)
    // Anything unrecognized is a programming error by default.
    case unknown => handleUnknownMessage(unknown, sender)
  }

  protected def handleJobSubmission(msg: JobManagerMessages.SubmitJob, sender: ActorRef): Unit
  protected def handleJobCancellation(msg: JobManagerMessages.CancelJob, sender: ActorRef): Unit
  protected def handleJobStatusRequest(msg: JobManagerMessages.RequestJobStatus, sender: ActorRef): Unit
  protected def handleTaskSubmission(msg: TaskManagerMessages.SubmitTask, sender: ActorRef): Unit
  protected def handleTaskCancellation(msg: TaskManagerMessages.CancelTask, sender: ActorRef): Unit
  protected def handleTaskStateUpdate(msg: TaskManagerMessages.UpdateTaskExecutionState, sender: ActorRef): Unit
  protected def handleSlotRequest(msg: ResourceManagerMessages.RequestSlot, sender: ActorRef): Unit
  protected def handleTaskManagerRegistration(msg: ResourceManagerMessages.RegisterTaskExecutor, sender: ActorRef): Unit

  /** Default reaction to unroutable messages: fail loudly. */
  protected def handleUnknownMessage(message: Any, sender: ActorRef): Unit = {
    throw new IllegalArgumentException(s"Unknown message type: ${message.getClass}")
  }
}

/** Mixin that tracks the current leader session ID and gates incoming
 * messages on it.
 */
trait LeaderSessionManager {

  // Session ID granted by leader election; None while not leading.
  private var currentLeaderSessionID: Option[UUID] = None

  /** Grants leadership under `sessionID` and notifies the hook. */
  def becomeLeader(sessionID: UUID): Unit = {
    currentLeaderSessionID = Some(sessionID)
    onBecomeLeader(sessionID)
  }

  /** Revokes leadership and notifies the hook. */
  def revokeLeadership(): Unit = {
    currentLeaderSessionID = None
    onRevokeLeadership()
  }

  /** True iff the message carries the currently active session ID. */
  def validateLeaderSession(message: RequiresLeaderSessionID): Boolean = {
    currentLeaderSessionID.contains(message.leaderSessionID)
  }

  /** Unwraps and validates session-scoped messages before delegating to
   * handleMessage; replies with a failure on session mismatch.
   * NOTE(review): JobResultFailure is defined in JobManagerMessages; no
   * import is visible in this chunk — confirm it is in scope.
   */
  def handleLeaderSessionMessage(message: Any, sender: ActorRef): Unit = message match {
    case LeaderSessionMessage(sessionID, wrappedMessage) =>
      if (currentLeaderSessionID.contains(sessionID)) {
        handleMessage(wrappedMessage, sender)
      } else {
        sender ! JobResultFailure(new LeaderSessionIDException("Invalid leader session"))
      }
    case msg: RequiresLeaderSessionID =>
      if (validateLeaderSession(msg)) {
        handleMessage(msg, sender)
      } else {
        sender ! JobResultFailure(new LeaderSessionIDException("Invalid leader session"))
      }
    case other =>
      handleMessage(other, sender)
  }

  /** Hook invoked after leadership is acquired. */
  protected def onBecomeLeader(sessionID: UUID): Unit = {}

  /** Hook invoked after leadership is revoked. */
  protected def onRevokeLeadership(): Unit = {}

  /** Concrete message handling supplied by the implementor. */
  protected def handleMessage(message: Any, sender: ActorRef): Unit
}

// Install with Tessl CLI:
//   npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10