CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

docs/message-passing.md

Message Passing (Scala APIs)

The Message Passing system provides Scala-based message definitions for actor-based communication within the Flink runtime cluster. These message classes define the communication protocols between JobManagers, TaskManagers, and clients, enabling distributed coordination and control operations across the cluster.

JobManager Messages

JobManagerMessages

Scala object containing message definitions for JobManager communication in the actor system.

/**
 * Message definitions exchanged with the JobManager actor, grouped by purpose.
 *
 * Several request messages extend the `RequiresLeaderSessionID` trait; the
 * remaining case classes and case objects carry no such marker.
 *
 * NOTE(review): if `RequiresLeaderSessionID` declares an abstract
 * `leaderSessionID` member, the case classes extending it here do not define
 * one — confirm against the trait's definition.
 */
object JobManagerMessages {
  // Job submission and lifecycle messages
  case class SubmitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour) extends RequiresLeaderSessionID
  case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
  case class StopJob(jobID: JobID) extends RequiresLeaderSessionID
  case class RescaleJob(jobID: JobID, newParallelism: Int) extends RequiresLeaderSessionID
  
  // Job status and monitoring messages
  // (request/response pairs; `timestamp` and the other Long time fields are
  // presumably epoch milliseconds — TODO confirm)
  case class RequestJobStatus(jobID: JobID)
  case class JobStatusResponse(jobID: JobID, status: JobStatus, timestamp: Long)
  
  case class RequestRunningJobs()
  case class RunningJobs(runningJobs: Iterable[ExecutionGraph])
  
  case class RequestJobDetails(jobID: JobID)
  case class JobDetails(jobID: JobID, jobName: String, startTime: Long, endTime: Long, 
                       status: JobStatus, lastModification: Long)
  
  // Execution graph access messages
  // (one request with a "found" and a "not found" answer)
  case class RequestExecutionGraph(jobID: JobID)
  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph)
  case class ExecutionGraphNotFound(jobID: JobID)
  
  // Savepoint and checkpoint messages
  // `savepointDirectory` is optional; what happens for `None` is decided by the receiver.
  case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String]) extends RequiresLeaderSessionID
  case class SavepointSuccess(jobID: JobID, savepointPath: String, timestamp: Long)
  case class SavepointFailure(jobID: JobID, cause: Throwable)
  
  case class DisposeSavepoint(savepointPath: String) extends RequiresLeaderSessionID
  case class DisposeSavepointSuccess(savepointPath: String)
  case class DisposeSavepointFailure(savepointPath: String, cause: Throwable)
  
  // Accumulator and metrics messages
  // Accumulator values are keyed by name and wrapped in OptionalFailure.
  case class RequestAccumulators(jobID: JobID)
  case class AccumulatorResultsFound(jobID: JobID, userAccumulators: Map[String, OptionalFailure[AnyRef]])
  case class AccumulatorResultsNotFound(jobID: JobID)
  
  // Configuration and cluster information
  case object RequestClusterStatus
  case class ClusterStatusWithTaskManagerInfo(
    numTaskManagers: Int,
    numSlotsTotal: Int,
    numSlotsAvailable: Int,
    taskManagerInfos: Iterable[TaskManagerInfo]
  )
  
  case object RequestClusterConfiguration  
  case class ClusterConfiguration(config: Configuration)
  
  // Response messages
  case class JobSubmitSuccess(jobID: JobID)
  case class JobResultSuccess(result: JobExecutionResult)
  case class JobResultFailure(cause: Throwable)
  
  // Listening behaviour for job submissions
  // Sealed, so the three values below are the only ones. Their names suggest how
  // much feedback the submitter wants (none / final result / result plus state
  // changes) — semantics are defined by the receiving actor, TODO confirm.
  sealed trait ListeningBehaviour
  case object ListeningBehaviour {
    case object DETACHED extends ListeningBehaviour
    case object EXECUTION_RESULT extends ListeningBehaviour  
    case object EXECUTION_RESULT_AND_STATE_CHANGES extends ListeningBehaviour
  }
}

JobManager Coordination Messages

/**
 * JobManager coordination messages: leader-session handling, TaskManager
 * registration, heartbeats and shutdown.
 */
object JobManagerMessages {
  // Leader election and coordination
  // Envelope pairing an arbitrary message with the leader session ID it was sent under.
  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID
  
  case object RequestLeaderSessionID
  // The session ID is optional, so a reply may carry `None`.
  case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])
  
  // Resource management coordination
  // Registration request plus its two possible answers (success / rejection).
  case class RegisterTaskManager(
    taskManagerLocation: TaskManagerLocation,
    resourceID: ResourceID,
    dataPort: Int,
    hardwareDescription: HardwareDescription
  )
  
  case class TaskManagerRegistrationSuccess(
    resourceID: ResourceID, 
    blobPort: Int
  )
  
  case class TaskManagerRegistrationRejection(
    resourceID: ResourceID, 
    reason: String
  )
  
  // Heartbeat messages
  // `from` identifies the sending resource.
  case class Heartbeat(from: ResourceID, heartbeatPayload: HeartbeatPayload)
  case class HeartbeatResponse(from: ResourceID)
  
  // Shutdown and cleanup
  case object RequestShutdown
  // Parameterless acknowledgement; declared as a case class, so it is built and
  // matched as `Acknowledge()`.
  case class Acknowledge() extends AcknowledgeMessage
}

TaskManager Messages

TaskManagerMessages

Scala object containing message definitions for TaskManager communication.

/**
 * Message definitions exchanged with the TaskManager actor, grouped by purpose:
 * task lifecycle, status queries, slot management, checkpoint coordination,
 * registration/heartbeat, failure reporting and stack-trace debugging.
 *
 * Several messages extend the `RequiresLeaderSessionID` trait.
 */
object TaskManagerMessages {
  // Task deployment and lifecycle
  // Tasks are identified by their ExecutionAttemptID throughout.
  case class SubmitTask(tdd: TaskDeploymentDescriptor) extends RequiresLeaderSessionID
  case class TaskSubmitted(executionAttemptID: ExecutionAttemptID)
  case class TaskFailed(executionAttemptID: ExecutionAttemptID, cause: Throwable)
  case class TaskFinished(executionAttemptID: ExecutionAttemptID, executionResult: ExecutionResult)
  
  case class CancelTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID
  case class TaskCanceled(executionAttemptID: ExecutionAttemptID)
  
  case class StopTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID
  case class TaskStopped(executionAttemptID: ExecutionAttemptID)
  
  // Task status and monitoring
  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) extends RequiresLeaderSessionID
  
  case class RequestTaskManagerStatus()
  case class TaskManagerStatusResponse(
    resourceID: ResourceID,
    taskManagerLocation: TaskManagerLocation,
    numberOfSlots: Int,
    numberOfAvailableSlots: Int,
    hardwareDescription: HardwareDescription
  )
  
  case class RequestRunningTasks()
  // NOTE(review): `Collection` is presumably java.util.Collection — confirm the import.
  case class RunningTasks(runningTasks: Collection[ExecutionAttemptID])
  
  // Resource and slot management  
  case class RequestSlot(
    slotID: SlotID,
    jobID: JobID,
    allocationID: AllocationID,
    resourceProfile: ResourceProfile,
    targetAddress: String
  ) extends RequiresLeaderSessionID
  
  case class SlotOffer(
    allocationID: AllocationID,
    slotID: SlotID,
    resourceProfile: ResourceProfile
  )
  
  // Slots are released by allocation ID, not by slot ID.
  case class FreeSlot(allocationID: AllocationID) extends RequiresLeaderSessionID
  case class SlotFreed(allocationID: AllocationID)
  
  // Checkpoint coordination
  // A checkpoint is addressed by (executionAttemptID, checkpointID); a trigger
  // can be answered with a confirmation or a decline carrying a reason.
  case class TriggerCheckpoint(
    executionAttemptID: ExecutionAttemptID,
    checkpointID: Long,
    timestamp: Long,
    checkpointOptions: CheckpointOptions
  ) extends RequiresLeaderSessionID
  
  case class ConfirmCheckpoint(
    executionAttemptID: ExecutionAttemptID,
    checkpointID: Long,
    checkpointMetrics: CheckpointMetrics,
    subtaskState: TaskStateSnapshot
  ) extends RequiresLeaderSessionID
  
  case class DeclineCheckpoint(
    executionAttemptID: ExecutionAttemptID,
    checkpointID: Long,
    reason: CheckpointDeclineReason
  ) extends RequiresLeaderSessionID
  
  // Registration and heartbeat
  case class RegisterAtJobManager(
    resourceID: ResourceID,
    taskManagerLocation: TaskManagerLocation,
    hardwareDescription: HardwareDescription
  )
  
  // The heartbeat payload is typed as an AccumulatorSnapshot here.
  case class SendHeartbeat(resourceID: ResourceID, heartbeatPayload: AccumulatorSnapshot)
  
  // Error handling and failure
  case class TaskInFinalState(executionAttemptID: ExecutionAttemptID)
  case class FailTask(executionAttemptID: ExecutionAttemptID, cause: Throwable)
  
  // Stack trace and debugging
  // Two request/response pairs: one without arguments and one for a specific
  // execution attempt.
  case class SendStackTrace()
  case class StackTrace(stackTrace: Array[StackTraceElement])
  
  case class RequestStackTrace(executionAttemptID: ExecutionAttemptID)
  case class StackTraceResponse(executionAttemptID: ExecutionAttemptID, stackTrace: Array[StackTraceElement])
}

TaskManager Coordination Messages

/**
 * TaskManager coordination messages: resource reports, configuration and
 * environment queries, and disconnect/shutdown notifications.
 */
object TaskManagerMessages {
  // Memory and resource management
  // Memory figures are Longs — presumably byte counts, TODO confirm units.
  case object RequestMemoryReport
  case class MemoryReport(
    managedMemory: Long,
    totalMemory: Long,
    availableMemory: Long,
    gcStats: Array[GarbageCollectorStats]
  )
  
  case object RequestIOReport  
  case class IOReport(
    diskSpaceInfo: Array[DiskSpaceInfo],
    networkIOMetrics: NetworkIOMetrics
  )
  
  // Configuration and environment
  case object RequestTaskManagerConfiguration
  case class TaskManagerConfiguration(config: Configuration, tempDirectories: Array[File])
  
  case object RequestEnvironmentInformation
  case class EnvironmentInformation(
    hostname: String,
    taskManagerAddress: String,
    dataPort: Int,
    numberOfSlots: Int
  )
  
  // Disconnect and shutdown
  // `Disconnect` carries a free-text message; the remaining entries are
  // payload-free signals.
  case class Disconnect(message: String)
  case object NotifyWhenRegisteredAtJobManager
  case object RegisteredAtJobManager
  
  case class TaskManagerShutdown()
}

JobClient Messages

JobClientMessages

Scala object containing message definitions for job client communication.

/**
 * Message definitions used by the job client: submission, job control, status
 * queries, savepoint operations and the corresponding responses.
 *
 * Failure causes in the response messages are typed as `SerializedThrowable`
 * and job results as `SerializedJobExecutionResult`.
 */
object JobClientMessages {
  // Job submission from client
  case class SubmitJobAndWait(
    jobGraph: JobGraph,
    listenToExecutionResult: Boolean,
    leaderSessionTimeout: FiniteDuration
  )
  
  case class SubmitJobDetached(jobGraph: JobGraph)
  
  // Job control operations
  case class CancelJob(jobID: JobID)
  // Unlike TriggerSavepoint below, the savepoint directory is mandatory here.
  case class CancelJobWithSavepoint(jobID: JobID, savepointDirectory: String)
  case class StopJob(jobID: JobID)
  
  // Status and monitoring requests
  case class RequestJobStatus(jobID: JobID)
  case class RequestBlobManagerPort()
  case class BlobManagerPort(port: Int)
  
  case class RequestExecutionResult(jobID: JobID)
  
  // Savepoint operations from client
  case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String])
  case class RequestSavepoint(jobID: JobID, savepointDirectory: String)
  
  case class DisposeSavepoint(savepointPath: String)
  
  // Configuration queries
  case object RequestClusterConfiguration
  case object RequestClusterStatus
  
  // Response messages
  // NOTE(review): this wrapper is named `JobExecutionResult`, the same simple
  // name other snippets use for the unserialized result type — confirm that
  // call sites import the one they intend.
  case class JobExecutionResult(result: SerializedJobExecutionResult)
  case class JobExecutionException(exception: SerializedThrowable)
  
  case class JobStatusAnswer(jobID: JobID, status: JobStatus)
  case class JobNotFound(jobID: JobID)
  
  case class SavepointSuccess(jobID: JobID, savepointPath: String, triggerTime: Long)
  case class SavepointFailure(jobID: JobID, cause: SerializedThrowable)
  
  // Connection and session management
  case object GetLeaderSessionID
  case class LeaderSessionID(leaderSessionID: Option[UUID])
  
  // An exception type (extends Exception) rather than a plain message; its
  // `message` is passed through to the Exception constructor.
  case class JobClientActorConnectionTimeoutException(message: String) extends Exception(message)
}

Resource Manager Messages

ResourceManagerMessages

Messages for ResourceManager communication and coordination.

/**
 * Message definitions for communication with the ResourceManager: TaskExecutor
 * and JobManager registration, slot requests, resource overviews, heartbeats,
 * disconnection and TaskManager start/stop requests.
 */
object ResourceManagerMessages {
  // Task Manager registration
  // Registration request plus its two possible answers (success / rejection).
  case class RegisterTaskExecutor(
    resourceID: ResourceID,
    taskExecutorAddress: String,
    dataPort: Int,
    numberOfSlots: Int,
    hardwareDescription: HardwareDescription
  )
  
  case class TaskExecutorRegistrationSuccess(
    resourceManagerId: ResourceManagerId,
    heartbeatInterval: Time
  )
  
  case class TaskExecutorRegistrationRejection(reason: String)
  
  // Slot requests and allocation
  // A request is tracked by its SlotRequestId in the follow-up messages.
  case class RequestSlot(
    jobMasterId: JobMasterId,
    slotRequest: SlotRequest
  )
  
  case class SlotRequestRegistered(
    slotRequestId: SlotRequestId,
    targetAddress: String
  )
  
  case class SlotRequestFailed(
    slotRequestId: SlotRequestId,
    cause: Throwable
  )
  
  case class CancelSlotRequest(slotRequestId: SlotRequestId)
  
  // Resource allocation and management
  case class NotifyResourceAvailable(
    resourceID: ResourceID,
    availableSlots: Int
  )
  
  case class RequestResourceOverview()
  case class ResourceOverview(
    numberTaskManagers: Int,
    numberRegisteredSlots: Int,
    numberFreeSlots: Int
  )
  
  // JobManager coordination
  case class RegisterJobManager(
    jobMasterId: JobMasterId,
    resourceID: ResourceID,
    jobManagerAddress: String,
    jobID: JobID
  )
  
  case class JobManagerRegistrationSuccess(heartbeatInterval: Time)
  case class JobManagerRegistrationRejection(reason: String)
  
  // Heartbeat and monitoring
  // The TaskManager heartbeat carries a SlotReport; the JobManager heartbeat
  // carries only the resource ID.
  case class TaskManagerHeartbeat(
    resourceID: ResourceID,
    slotReport: SlotReport
  )
  
  case class JobManagerHeartbeat(resourceID: ResourceID)
  
  // Disconnection and cleanup
  // TaskManagers are addressed by ResourceID, JobManagers by JobID.
  case class DisconnectTaskManager(resourceID: ResourceID, cause: Exception)
  case class DisconnectJobManager(jobID: JobID, cause: Exception)
  
  // Resource allocation requests
  case class StartNewTaskManager(resourceProfile: ResourceProfile)
  case class StopTaskManager(resourceID: ResourceID)
}

Common Message Traits and Utilities

Base Message Traits

// Base traits for message classification

/**
 * Implemented by messages that expose the leader session ID they belong to.
 *
 * NOTE(review): `leaderSessionID` is abstract, so every concrete message that
 * extends this trait has to define it — confirm that the message case classes
 * extending it actually do.
 */
trait RequiresLeaderSessionID {
  def leaderSessionID: UUID
}

/** Marker trait (no members) for acknowledgement messages. */
trait AcknowledgeMessage

// Sealed marker traits: subtypes can only be declared in the same source file.
sealed trait JobManagerMessage
sealed trait TaskManagerMessage  
sealed trait ClientMessage

// Message decorators and wrappers

/** Envelope pairing an arbitrary `message` with a leader session ID. */
case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID

/** Bundles a message with the actor it is addressed to and a timeout. */
case class MessageDecorator[T](
  target: ActorRef,
  message: T,
  timeout: FiniteDuration
)

// Response handling
// NOTE(review): `Success` and `Failure` share their simple names with
// scala.util.Success / scala.util.Failure; code that imports both needs to
// disambiguate.
sealed trait ResponseMessage
case class Success[T](result: T) extends ResponseMessage
case class Failure(cause: Throwable) extends ResponseMessage

Message Serialization Support

/**
 * Helpers converting job results and throwables to and from their serialized
 * message representations.
 */
object MessageSerializationUtils {
  // Serialization helpers for complex message payloads

  /**
   * Builds a `SerializedJobExecutionResult` from the job ID, net runtime and
   * accumulator results of `result`.
   *
   * The accumulator lookup is handed to `ClassLoaderUtils.withContextClassLoader`
   * as a function together with the class loader of `result`'s class —
   * presumably so it runs with that loader as the context class loader
   * (TODO confirm against ClassLoaderUtils).
   */
  def serializeJobExecutionResult(result: JobExecutionResult): SerializedJobExecutionResult = {
    new SerializedJobExecutionResult(
      result.getJobID,
      result.getNetRuntime,
      ClassLoaderUtils.withContextClassLoader(
        result.getClass.getClassLoader,
        () => result.getAllAccumulatorResults
      )
    )
  }
  
  /**
   * Wraps `throwable` in a `SerializedThrowable`, passing along the class loader
   * of the throwable's class.
   *
   * NOTE(review): `getClassLoader` returns null for classes loaded by the
   * bootstrap loader (e.g. JDK exception types) — confirm that the
   * SerializedThrowable constructor accepts a null loader.
   */
  def serializeThrowable(throwable: Throwable): SerializedThrowable = {
    new SerializedThrowable(throwable, throwable.getClass.getClassLoader)
  }
  
  /**
   * Rebuilds a `JobExecutionResult` from its serialized form using `classLoader`.
   *
   * NOTE(review): unlike serializeJobExecutionResult, the second argument to
   * `withContextClassLoader` is passed here as a plain expression, not as a
   * `() => ...` function — confirm which form the helper expects.
   */
  def deserializeJobExecutionResult(
    serialized: SerializedJobExecutionResult,
    classLoader: ClassLoader
  ): JobExecutionResult = {
    new JobExecutionResult(
      serialized.getJobId,
      serialized.getNetRuntime,
      ClassLoaderUtils.withContextClassLoader(classLoader, serialized.getSerializedAccumulators)
    )
  }
}

Usage Examples

JobManager Message Handling

import akka.actor.{Actor, ActorLogging, ActorRef}
import org.apache.flink.runtime.messages.JobManagerMessages._
import java.util.UUID

/**
 * Example JobManager actor showing how the `JobManagerMessages` protocol is
 * handled: job submission and cancellation, status queries, savepoint
 * triggering and leader-session validation.
 *
 * All state (`leaderSessionID`, `runningJobs`) is touched only from `receive`.
 * Asynchronous callbacks must not call `sender()` or mutate this state; they
 * use a reference captured while the message was being processed.
 */
class JobManagerActor extends Actor with ActorLogging {
  // Imported locally: the snippet's import list does not bring `mutable` into scope.
  import scala.collection.mutable

  private var leaderSessionID: Option[UUID] = None
  private val runningJobs = mutable.Map[JobID, ExecutionGraph]()

  override def receive: Receive = {
    case SubmitJob(jobGraph, listeningBehaviour) =>
      log.info(s"Received job submission: ${jobGraph.getJobID}")

      try {
        // Validate leader session
        validateLeaderSession(sender())

        // Create execution graph
        val executionGraph = createExecutionGraph(jobGraph)
        runningJobs += jobGraph.getJobID -> executionGraph

        // Start job execution
        executionGraph.scheduleForExecution()

        sender() ! JobSubmitSuccess(jobGraph.getJobID)

        // Handle listening behaviour
        listeningBehaviour match {
          case ListeningBehaviour.EXECUTION_RESULT =>
            // Register for execution result notifications
            registerForJobResult(jobGraph.getJobID, sender())
          case ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES =>
            // Register for both result and state change notifications
            registerForJobUpdates(jobGraph.getJobID, sender())
          case ListeningBehaviour.DETACHED =>
            // No further notifications needed
        }

      } catch {
        case ex: Exception =>
          log.error(ex, s"Failed to submit job ${jobGraph.getJobID}")
          sender() ! JobResultFailure(ex)
      }

    case CancelJob(jobID) =>
      log.info(s"Received cancel request for job: $jobID")

      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          try {
            executionGraph.cancel()
            sender() ! Acknowledge()
          } catch {
            case ex: Exception =>
              sender() ! JobResultFailure(ex)
          }
        case None =>
          sender() ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestJobStatus(jobID) =>
      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          val status = executionGraph.getState
          val timestamp = executionGraph.getStatusTimestamp(status)
          sender() ! JobStatusResponse(jobID, status, timestamp)
        case None =>
          sender() ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestRunningJobs() =>
      sender() ! RunningJobs(runningJobs.values)

    case TriggerSavepoint(jobID, savepointDirectory) =>
      log.info(s"Triggering savepoint for job $jobID")

      // Capture the requester now. `sender()` is only valid while this message
      // is being processed; the completion callback below runs later, possibly
      // on another thread, where `sender()` may refer to a different actor.
      val replyTo = sender()

      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          val savepointFuture = executionGraph.triggerSavepoint(savepointDirectory.orNull)

          savepointFuture.whenComplete { (savepointPath, throwable) =>
            if (throwable != null) {
              replyTo ! SavepointFailure(jobID, throwable)
            } else {
              replyTo ! SavepointSuccess(jobID, savepointPath, System.currentTimeMillis())
            }
          }
        case None =>
          replyTo ! SavepointFailure(jobID, new JobNotFoundException(jobID))
      }

    case RequestLeaderSessionID =>
      sender() ! ResponseLeaderSessionID(leaderSessionID)

    case LeaderSessionMessage(sessionID, message) =>
      if (leaderSessionID.contains(sessionID)) {
        // Unwrap and re-dispatch to ourselves, preserving the original sender.
        self.tell(message, sender())
      } else {
        log.warning(s"Received message with invalid leader session ID: $sessionID")
        sender() ! JobResultFailure(new LeaderSessionIDException("Invalid leader session ID"))
      }
  }

  /**
   * Ensures this actor currently holds a leader session.
   *
   * @param requester the actor that sent the request (currently unused)
   * @throws LeaderSessionIDException if no leader session is active
   */
  private def validateLeaderSession(requester: ActorRef): Unit = {
    if (leaderSessionID.isEmpty) {
      throw new LeaderSessionIDException("No active leader session")
    }
  }

  /** Placeholder: builds the execution graph for `jobGraph`. */
  private def createExecutionGraph(jobGraph: JobGraph): ExecutionGraph = {
    // Implementation for creating execution graph from job graph
    // This would involve setting up vertices, scheduling, etc.
    new ExecutionGraph(/* parameters */)
  }

  /** Placeholder: registers `client` to receive the execution result of `jobID`. */
  private def registerForJobResult(jobID: JobID, client: ActorRef): Unit = {
    // Register client to receive job execution results
  }

  /** Placeholder: registers `client` to receive state changes and the result of `jobID`. */
  private def registerForJobUpdates(jobID: JobID, client: ActorRef): Unit = {
    // Register client to receive job state changes and results
  }
}

TaskManager Message Handling

import akka.actor.{Actor, ActorLogging}
import org.apache.flink.runtime.messages.TaskManagerMessages._

/**
 * Example TaskManager actor showing how the `TaskManagerMessages` protocol is
 * handled: registration, task submission and cancellation, checkpoint
 * triggering, status queries, heartbeats and final-state reporting.
 *
 * NOTE(review): `getResourceID()`, `getTaskManagerLocation()` and
 * `getHardwareDescription()` are called below but not defined in this snippet,
 * and `RegisterTaskManager` / `Heartbeat` are declared in `JobManagerMessages`
 * — confirm they are provided/imported by the surrounding code.
 */
class TaskManagerActor extends Actor with ActorLogging {
  // Imported locally: the snippet's import list only brings in Actor,
  // ActorLogging and the TaskManager messages.
  import akka.actor.ActorRef
  import scala.collection.JavaConverters._
  import scala.collection.mutable

  private val runningTasks = mutable.Map[ExecutionAttemptID, Task]()
  private var jobManagerRef: Option[ActorRef] = None
  // NOTE(review): never updated by any handler in this snippet.
  private var isRegistered = false

  override def receive: Receive = {
    case RegisterAtJobManager(resourceID, taskManagerLocation, hardwareDescription) =>
      log.info(s"Registering at JobManager: ${sender()}")
      jobManagerRef = Some(sender())

      // Send registration message to JobManager
      sender() ! RegisterTaskManager(
        taskManagerLocation,
        resourceID,
        taskManagerLocation.getDataPort,
        hardwareDescription
      )

    case SubmitTask(taskDeploymentDescriptor) =>
      val executionAttemptID = taskDeploymentDescriptor.getExecutionAttemptId
      log.info(s"Received task submission: $executionAttemptID")

      try {
        // Create and start task
        val task = createTask(taskDeploymentDescriptor)
        runningTasks += executionAttemptID -> task

        // Start task execution
        task.startTaskThread()

        sender() ! TaskSubmitted(executionAttemptID)

      } catch {
        case ex: Exception =>
          log.error(ex, s"Failed to submit task $executionAttemptID")
          // A task that failed to start must not stay registered: it would be
          // reported as running and keep occupying a slot.
          runningTasks -= executionAttemptID
          sender() ! TaskFailed(executionAttemptID, ex)
      }

    case CancelTask(executionAttemptID) =>
      log.info(s"Cancelling task: $executionAttemptID")

      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          try {
            task.cancelExecution()
            sender() ! TaskCanceled(executionAttemptID)
          } catch {
            case ex: Exception =>
              log.error(ex, s"Failed to cancel task $executionAttemptID")
              sender() ! TaskFailed(executionAttemptID, ex)
          }
        case None =>
          log.warning(s"Attempted to cancel unknown task: $executionAttemptID")
          sender() ! TaskCanceled(executionAttemptID) // Already not running
      }

    case TriggerCheckpoint(executionAttemptID, checkpointID, timestamp, checkpointOptions) =>
      log.debug(s"Triggering checkpoint $checkpointID for task $executionAttemptID")

      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          task.triggerCheckpointBarrier(checkpointID, timestamp, checkpointOptions)
        case None =>
          log.warning(s"Checkpoint triggered for unknown task: $executionAttemptID")
          sender() ! DeclineCheckpoint(
            executionAttemptID,
            checkpointID,
            CheckpointDeclineReason.TASK_NOT_RUNNING
          )
      }

    case RequestTaskManagerStatus() =>
      val numberOfSlots = getNumberOfSlots()
      val availableSlots = getAvailableSlots()

      sender() ! TaskManagerStatusResponse(
        getResourceID(),
        getTaskManagerLocation(),
        numberOfSlots,
        availableSlots,
        getHardwareDescription()
      )

    case RequestRunningTasks() =>
      sender() ! RunningTasks(runningTasks.keys.toList.asJavaCollection)

    case SendHeartbeat(resourceID, accumulatorSnapshot) =>
      // Forwarded only once a JobManager is known; otherwise silently skipped.
      jobManagerRef.foreach { jm =>
        jm ! Heartbeat(resourceID, accumulatorSnapshot)
      }

    case TaskInFinalState(executionAttemptID) =>
      log.info(s"Task finished: $executionAttemptID")
      runningTasks.remove(executionAttemptID)

      // Notify JobManager about task completion
      jobManagerRef.foreach { jm =>
        jm ! UpdateTaskExecutionState(createTaskExecutionState(executionAttemptID))
      }
  }

  /** Placeholder: builds a task from its deployment descriptor. */
  private def createTask(tdd: TaskDeploymentDescriptor): Task = {
    // Implementation for creating a task from deployment descriptor
    new Task(/* parameters from tdd */)
  }

  /** Total number of task slots (fixed example value). */
  private def getNumberOfSlots(): Int = {
    // Return total number of task slots
    4 // Example
  }

  /**
   * Number of free task slots: total slots minus registered tasks, never
   * below zero even if more tasks than slots are registered.
   */
  private def getAvailableSlots(): Int = {
    // Return number of available task slots
    math.max(0, getNumberOfSlots() - runningTasks.size)
  }

  /**
   * Builds the state update reported for a task that reached its final state.
   * Always reports `ExecutionState.FINISHED` with no error.
   */
  private def createTaskExecutionState(executionAttemptID: ExecutionAttemptID): TaskExecutionState = {
    // Create task execution state for reporting
    new TaskExecutionState(
      executionAttemptID.getJobId,
      executionAttemptID,
      ExecutionState.FINISHED,
      null // no error
    )
  }
}

Client Message Communication

import akka.actor.{Actor, ActorRef, Props, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.flink.runtime.messages.JobClientMessages._
import scala.concurrent.duration._
import scala.util.{Success, Failure}

/**
 * Example client that talks to a JobManager actor using the ask pattern and
 * prints the outcome of each request.
 *
 * Every callback handles three outcomes: the expected replies, any other
 * (unexpected) reply, and a failed ask (e.g. timeout). Without the catch-all
 * branch an unexpected reply would raise a `MatchError` inside the callback.
 *
 * NOTE(review): `cancelJob` matches on `Acknowledge` and `JobResultFailure`,
 * which are declared in `JobManagerMessages`, not `JobClientMessages` — confirm
 * they are imported by the surrounding code.
 *
 * @param jobManagerRef actor that receives the requests
 * @param system        actor system whose dispatcher runs the callbacks
 */
class FlinkJobClient(jobManagerRef: ActorRef)(implicit system: ActorSystem) {
  implicit val timeout: Timeout = 30.seconds
  // Explicit type: implicit definitions should not rely on inference.
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  /**
   * Submits `jobGraph`, either detached or waiting for the execution result,
   * and prints the outcome.
   */
  def submitJob(jobGraph: JobGraph, detached: Boolean = false): Unit = {
    val message = if (detached) {
      SubmitJobDetached(jobGraph)
    } else {
      SubmitJobAndWait(jobGraph, listenToExecutionResult = true, 30.seconds)
    }

    val resultFuture = jobManagerRef ? message

    resultFuture.onComplete {
      case Success(JobExecutionResult(result)) =>
        println(s"Job ${jobGraph.getJobID} completed successfully")
        println(s"Execution time: ${result.getNetRuntime} ms")

      case Success(JobExecutionException(exception)) =>
        println(s"Job ${jobGraph.getJobID} failed: ${exception.getMessage}")

      case Success(other) =>
        // e.g. a plain submission acknowledgement for a detached job
        println(s"Unexpected response for job ${jobGraph.getJobID}: $other")

      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /** Requests cancellation of `jobID` and prints the outcome. */
  def cancelJob(jobID: JobID): Unit = {
    val resultFuture = jobManagerRef ? CancelJob(jobID)

    resultFuture.onComplete {
      case Success(Acknowledge()) =>
        println(s"Job $jobID cancelled successfully")

      case Success(JobResultFailure(cause)) =>
        println(s"Failed to cancel job $jobID: ${cause.getMessage}")

      case Success(other) =>
        println(s"Unexpected response to cancel request for job $jobID: $other")

      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /**
   * Triggers a savepoint for `jobID` and prints the outcome.
   *
   * @param savepointDirectory optional target directory, passed through unchanged
   */
  def triggerSavepoint(jobID: JobID, savepointDirectory: Option[String]): Unit = {
    val resultFuture = jobManagerRef ? TriggerSavepoint(jobID, savepointDirectory)

    resultFuture.onComplete {
      case Success(SavepointSuccess(jobId, savepointPath, _)) =>
        println(s"Savepoint created for job $jobId at: $savepointPath")

      case Success(SavepointFailure(jobId, cause)) =>
        println(s"Savepoint failed for job $jobId: ${cause.getMessage}")

      case Success(other) =>
        println(s"Unexpected response to savepoint request for job $jobID: $other")

      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /** Queries the status of `jobID` and prints it. */
  def getJobStatus(jobID: JobID): Unit = {
    val resultFuture = jobManagerRef ? RequestJobStatus(jobID)

    resultFuture.onComplete {
      case Success(JobStatusAnswer(jobId, status)) =>
        println(s"Job $jobId status: $status")

      case Success(JobNotFound(jobId)) =>
        println(s"Job $jobId not found")

      case Success(other) =>
        println(s"Unexpected response to status request for job $jobID: $other")

      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }

  /** Queries the cluster status and prints whatever reply arrives. */
  def getClusterStatus(): Unit = {
    val resultFuture = jobManagerRef ? RequestClusterStatus

    resultFuture.onComplete {
      case Success(status) =>
        println(s"Cluster status: $status")

      case Failure(throwable) =>
        println(s"Communication error: ${throwable.getMessage}")
    }
  }
}

Common Patterns

Message Routing and Dispatch

/**
 * Type-based dispatcher: inspects an incoming message and forwards it, together
 * with its sender, to the handler declared for that message type.
 *
 * Implementors supply the typed handlers; messages of any other type go to
 * `handleUnknownMessage`, which throws by default.
 */
trait MessageRouter {

  /**
   * Forwards `message` to the matching handler.
   *
   * @param message the received message
   * @param sender  the actor the message came from
   */
  def route(message: Any, sender: ActorRef): Unit = {
    message match {
      // JobManager requests
      case submit: JobManagerMessages.SubmitJob =>
        handleJobSubmission(submit, sender)
      case cancel: JobManagerMessages.CancelJob =>
        handleJobCancellation(cancel, sender)
      case statusRequest: JobManagerMessages.RequestJobStatus =>
        handleJobStatusRequest(statusRequest, sender)

      // TaskManager requests
      case submit: TaskManagerMessages.SubmitTask =>
        handleTaskSubmission(submit, sender)
      case cancel: TaskManagerMessages.CancelTask =>
        handleTaskCancellation(cancel, sender)
      case stateUpdate: TaskManagerMessages.UpdateTaskExecutionState =>
        handleTaskStateUpdate(stateUpdate, sender)

      // ResourceManager requests
      case slotRequest: ResourceManagerMessages.RequestSlot =>
        handleSlotRequest(slotRequest, sender)
      case registration: ResourceManagerMessages.RegisterTaskExecutor =>
        handleTaskManagerRegistration(registration, sender)

      // Anything else
      case _ =>
        handleUnknownMessage(message, sender)
    }
  }

  // Handlers for JobManager requests
  protected def handleJobSubmission(msg: JobManagerMessages.SubmitJob, sender: ActorRef): Unit
  protected def handleJobCancellation(msg: JobManagerMessages.CancelJob, sender: ActorRef): Unit
  protected def handleJobStatusRequest(msg: JobManagerMessages.RequestJobStatus, sender: ActorRef): Unit

  // Handlers for TaskManager requests
  protected def handleTaskSubmission(msg: TaskManagerMessages.SubmitTask, sender: ActorRef): Unit
  protected def handleTaskCancellation(msg: TaskManagerMessages.CancelTask, sender: ActorRef): Unit
  protected def handleTaskStateUpdate(msg: TaskManagerMessages.UpdateTaskExecutionState, sender: ActorRef): Unit

  // Handlers for ResourceManager requests
  protected def handleSlotRequest(msg: ResourceManagerMessages.RequestSlot, sender: ActorRef): Unit
  protected def handleTaskManagerRegistration(msg: ResourceManagerMessages.RegisterTaskExecutor, sender: ActorRef): Unit

  /**
   * Fallback for messages no handler is declared for.
   *
   * @throws IllegalArgumentException always, naming the message's class
   */
  protected def handleUnknownMessage(message: Any, sender: ActorRef): Unit =
    throw new IllegalArgumentException(s"Unknown message type: ${message.getClass}")
}

Leader Session Management

/**
 * Tracks the current leader session ID and gates incoming messages on it.
 *
 * Messages bound to a leader session are passed to `handleMessage` only when
 * their session ID equals the current one; otherwise the sender receives a
 * `JobResultFailure`. Messages without a session ID are passed through as-is.
 */
trait LeaderSessionManager {
  private var currentLeaderSessionID: Option[UUID] = None

  /** Records `sessionID` as the active leader session, then fires `onBecomeLeader`. */
  def becomeLeader(sessionID: UUID): Unit = {
    currentLeaderSessionID = Some(sessionID)
    onBecomeLeader(sessionID)
  }

  /** Clears the active leader session, then fires `onRevokeLeadership`. */
  def revokeLeadership(): Unit = {
    currentLeaderSessionID = None
    onRevokeLeadership()
  }

  /** @return true iff a leader session is active and `message` carries its ID */
  def validateLeaderSession(message: RequiresLeaderSessionID): Boolean =
    currentLeaderSessionID.exists(_ == message.leaderSessionID)

  /**
   * Checks the leader session of `message` (if it has one) and either forwards
   * it to `handleMessage` or rejects it.
   *
   * A `LeaderSessionMessage` envelope is unwrapped: on success the wrapped
   * message, not the envelope, is forwarded.
   */
  def handleLeaderSessionMessage(message: Any, sender: ActorRef): Unit = {
    // Single place for the rejection reply shared by both session-bound cases.
    def rejectInvalidSession(): Unit =
      sender ! JobResultFailure(new LeaderSessionIDException("Invalid leader session"))

    message match {
      case LeaderSessionMessage(sessionID, wrappedMessage)
          if currentLeaderSessionID.contains(sessionID) =>
        handleMessage(wrappedMessage, sender)

      case _: LeaderSessionMessage =>
        rejectInvalidSession()

      case sessionBound: RequiresLeaderSessionID if validateLeaderSession(sessionBound) =>
        handleMessage(sessionBound, sender)

      case _: RequiresLeaderSessionID =>
        rejectInvalidSession()

      case _ =>
        handleMessage(message, sender)
    }
  }

  /** Hook invoked after leadership was granted; no-op by default. */
  protected def onBecomeLeader(sessionID: UUID): Unit = {}

  /** Hook invoked after leadership was revoked; no-op by default. */
  protected def onRevokeLeadership(): Unit = {}

  /** Processes a message that passed (or did not need) the session check. */
  protected def handleMessage(message: Any, sender: ActorRef): Unit
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json