# Message Passing (Scala APIs)

The Message Passing system provides Scala-based message definitions for actor-based communication within the Flink runtime cluster. These message classes define the communication protocols between JobManagers, TaskManagers, and clients, enabling distributed coordination and control operations across the cluster. Each message is a Scala case class or case object; actors exchange them with Akka's `!` (tell) and `?` (ask) operators, as shown in the usage examples below.

## JobManager Messages

### JobManagerMessages

Scala object containing message definitions for JobManager communication in the actor system.

```scala { .api }
/**
 * Messages for job-level interaction with the JobManager actor: submission, cancellation,
 * status queries, execution-graph access, savepoints, accumulators and cluster information.
 *
 * Messages extending [[RequiresLeaderSessionID]] are meant to be acted on only by the current
 * leader. NOTE(review): the trait as declared under "Base Message Traits" has an abstract
 * `leaderSessionID` member, which none of these case classes define — confirm whether it is
 * intended as a marker trait (with the session ID carried by `LeaderSessionMessage`).
 */
object JobManagerMessages {
  // Job submission and lifecycle messages

  /** Submits `jobGraph`; `listeningBehaviour` selects which follow-up notifications the sender receives. */
  case class SubmitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour) extends RequiresLeaderSessionID
  case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
  case class StopJob(jobID: JobID) extends RequiresLeaderSessionID
  case class RescaleJob(jobID: JobID, newParallelism: Int) extends RequiresLeaderSessionID

  // Job status and monitoring messages

  case class RequestJobStatus(jobID: JobID)
  /** Reply to [[RequestJobStatus]]; `timestamp` is the time at which the job entered `status`. */
  case class JobStatusResponse(jobID: JobID, status: JobStatus, timestamp: Long)

  case class RequestRunningJobs()
  case class RunningJobs(runningJobs: Iterable[ExecutionGraph])

  case class RequestJobDetails(jobID: JobID)
  case class JobDetails(jobID: JobID, jobName: String, startTime: Long, endTime: Long,
                        status: JobStatus, lastModification: Long)

  // Execution graph access messages

  case class RequestExecutionGraph(jobID: JobID)
  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph)
  case class ExecutionGraphNotFound(jobID: JobID)

  // Savepoint and checkpoint messages

  case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String]) extends RequiresLeaderSessionID
  case class SavepointSuccess(jobID: JobID, savepointPath: String, timestamp: Long)
  case class SavepointFailure(jobID: JobID, cause: Throwable)

  case class DisposeSavepoint(savepointPath: String) extends RequiresLeaderSessionID
  case class DisposeSavepointSuccess(savepointPath: String)
  case class DisposeSavepointFailure(savepointPath: String, cause: Throwable)

  // Accumulator and metrics messages

  case class RequestAccumulators(jobID: JobID)
  case class AccumulatorResultsFound(jobID: JobID, userAccumulators: Map[String, OptionalFailure[AnyRef]])
  case class AccumulatorResultsNotFound(jobID: JobID)

  // Configuration and cluster information

  case object RequestClusterStatus
  case class ClusterStatusWithTaskManagerInfo(
    numTaskManagers: Int,
    numSlotsTotal: Int,
    numSlotsAvailable: Int,
    taskManagerInfos: Iterable[TaskManagerInfo]
  )

  case object RequestClusterConfiguration
  case class ClusterConfiguration(config: Configuration)

  // Response messages

  case class JobSubmitSuccess(jobID: JobID)
  case class JobResultSuccess(result: JobExecutionResult)
  /** Generic failure reply; the usage example also sends it for unknown jobs and invalid leader sessions. */
  case class JobResultFailure(cause: Throwable)

  // Listening behaviour for job submissions

  /** How a job submitter wants to be kept informed after [[SubmitJob]]. */
  sealed trait ListeningBehaviour

  // A plain `object` is the idiomatic companion/namespace for a sealed trait's cases;
  // `case object` only added synthetic Product/equality members that are never used here.
  object ListeningBehaviour {
    /** No notifications beyond the submission acknowledgement. */
    case object DETACHED extends ListeningBehaviour
    /** The submitter is registered for the job's execution result. */
    case object EXECUTION_RESULT extends ListeningBehaviour
    /** The submitter is registered for state changes as well as the execution result. */
    case object EXECUTION_RESULT_AND_STATE_CHANGES extends ListeningBehaviour
  }
}
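```

### JobManager Coordination Messages

Further `JobManagerMessages` definitions (continuing the object above) for leader-session handling, TaskManager registration, heartbeats, and shutdown.

```scala { .api }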
/**
 * JobManager messages for leader-session handling, TaskManager registration, heartbeats and
 * shutdown. (These continue the `JobManagerMessages` object listed in the previous section.)
 */
object JobManagerMessages {
  // Leader election and coordination

  /**
   * Carries `message` together with the leader session ID it was sent under. In the usage
   * example the receiver only processes `message` when that ID matches its current session.
   */
  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID

  /** Queries the current leader session; answered with [[ResponseLeaderSessionID]]. */
  case object RequestLeaderSessionID
  /** `None` when the receiver has no active leader session. */
  case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])

  // Resource management coordination

  /** Sent by a TaskManager to register itself with the JobManager. */
  case class RegisterTaskManager(
    taskManagerLocation: TaskManagerLocation,
    resourceID: ResourceID,
    dataPort: Int,
    hardwareDescription: HardwareDescription
  )

  case class TaskManagerRegistrationSuccess(
    resourceID: ResourceID,
    blobPort: Int
  )

  case class TaskManagerRegistrationRejection(
    resourceID: ResourceID,
    reason: String
  )

  // Heartbeat messages

  // NOTE(review): the TaskManager usage example passes an AccumulatorSnapshot as the payload;
  // confirm that AccumulatorSnapshot is a HeartbeatPayload.
  case class Heartbeat(from: ResourceID, heartbeatPayload: HeartbeatPayload)
  case class HeartbeatResponse(from: ResourceID)

  // Shutdown and cleanup

  case object RequestShutdown
  /** Generic positive acknowledgement (e.g. the reply to a successful `CancelJob`). */
  case class Acknowledge() extends AcknowledgeMessage
}
```

## TaskManager Messages

### TaskManagerMessages

Scala object containing message definitions for TaskManager communication.

```scala { .api }
/**
 * Messages exchanged with TaskManager actors: task deployment and lifecycle, status queries,
 * slot management, checkpoint coordination, registration, heartbeats and diagnostics.
 */
object TaskManagerMessages {
  // Task deployment and lifecycle

  /** Deploys the task described by `tdd`; answered with [[TaskSubmitted]] or [[TaskFailed]]. */
  case class SubmitTask(tdd: TaskDeploymentDescriptor) extends RequiresLeaderSessionID
  case class TaskSubmitted(executionAttemptID: ExecutionAttemptID)
  case class TaskFailed(executionAttemptID: ExecutionAttemptID, cause: Throwable)
  case class TaskFinished(executionAttemptID: ExecutionAttemptID, executionResult: ExecutionResult)

  /** Cancels a task; the usage example answers [[TaskCanceled]] even when the task is unknown. */
  case class CancelTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID
  case class TaskCanceled(executionAttemptID: ExecutionAttemptID)

  case class StopTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID
  case class TaskStopped(executionAttemptID: ExecutionAttemptID)

  // Task status and monitoring

  /** Reports a task's execution state to the JobManager (sent when a task reaches a final state). */
  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) extends RequiresLeaderSessionID

  case class RequestTaskManagerStatus()
  case class TaskManagerStatusResponse(
    resourceID: ResourceID,
    taskManagerLocation: TaskManagerLocation,
    numberOfSlots: Int,
    numberOfAvailableSlots: Int,
    hardwareDescription: HardwareDescription
  )

  case class RequestRunningTasks()
  /** `runningTasks` is a `java.util.Collection` (the usage example builds it with `asJavaCollection`). */
  case class RunningTasks(runningTasks: Collection[ExecutionAttemptID])

  // Resource and slot management

  case class RequestSlot(
    slotID: SlotID,
    jobID: JobID,
    allocationID: AllocationID,
    resourceProfile: ResourceProfile,
    targetAddress: String
  ) extends RequiresLeaderSessionID

  case class SlotOffer(
    allocationID: AllocationID,
    slotID: SlotID,
    resourceProfile: ResourceProfile
  )

  case class FreeSlot(allocationID: AllocationID) extends RequiresLeaderSessionID
  case class SlotFreed(allocationID: AllocationID)

  // Checkpoint coordination

  /** Asks a task to start checkpoint `checkpointID`; an unknown task answers with [[DeclineCheckpoint]]. */
  case class TriggerCheckpoint(
    executionAttemptID: ExecutionAttemptID,
    checkpointID: Long,
    timestamp: Long,
    checkpointOptions: CheckpointOptions
  ) extends RequiresLeaderSessionID

  case class ConfirmCheckpoint(
    executionAttemptID: ExecutionAttemptID,
    checkpointID: Long,
    checkpointMetrics: CheckpointMetrics,
    subtaskState: TaskStateSnapshot
  ) extends RequiresLeaderSessionID

  case class DeclineCheckpoint(
    executionAttemptID: ExecutionAttemptID,
    checkpointID: Long,
    reason: CheckpointDeclineReason
  ) extends RequiresLeaderSessionID

  // Registration and heartbeat

  case class RegisterAtJobManager(
    resourceID: ResourceID,
    taskManagerLocation: TaskManagerLocation,
    hardwareDescription: HardwareDescription
  )

  case class SendHeartbeat(resourceID: ResourceID, heartbeatPayload: AccumulatorSnapshot)

  // Error handling and failure

  case class TaskInFinalState(executionAttemptID: ExecutionAttemptID)
  case class FailTask(executionAttemptID: ExecutionAttemptID, cause: Throwable)

  // Stack trace and debugging
  // Arrays inside case classes compare by reference: `==`/`hashCode` on the two messages
  // below do not look at the stack-trace contents.

  case class SendStackTrace()
  case class StackTrace(stackTrace: Array[StackTraceElement])

  case class RequestStackTrace(executionAttemptID: ExecutionAttemptID)
  case class StackTraceResponse(executionAttemptID: ExecutionAttemptID, stackTrace: Array[StackTraceElement])
}
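```

### TaskManager Coordination Messages

Further `TaskManagerMessages` definitions (continuing the object above) for memory and I/O reports, configuration and environment queries, and registration and shutdown notifications.

```scala { .api }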
/**
 * TaskManager messages for resource reports, configuration/environment queries and
 * registration/shutdown notifications. (These continue the `TaskManagerMessages` object
 * listed in the previous section.)
 */
object TaskManagerMessages {
  // Memory and resource management

  /** Queries memory usage; answered with [[MemoryReport]]. */
  case object RequestMemoryReport
  case class MemoryReport(
    managedMemory: Long,
    totalMemory: Long,
    availableMemory: Long,
    gcStats: Array[GarbageCollectorStats]
  )

  /** Queries disk and network I/O information; answered with [[IOReport]]. */
  case object RequestIOReport
  case class IOReport(
    diskSpaceInfo: Array[DiskSpaceInfo],
    networkIOMetrics: NetworkIOMetrics
  )

  // Configuration and environment

  case object RequestTaskManagerConfiguration
  case class TaskManagerConfiguration(config: Configuration, tempDirectories: Array[File])

  case object RequestEnvironmentInformation
  case class EnvironmentInformation(
    hostname: String,
    taskManagerAddress: String,
    dataPort: Int,
    numberOfSlots: Int
  )

  // Disconnect and shutdown

  case class Disconnect(message: String)
  case object NotifyWhenRegisteredAtJobManager
  case object RegisteredAtJobManager

  case class TaskManagerShutdown()
}
```

## JobClient Messages

### JobClientMessages

Scala object containing message definitions for job client communication.

```scala { .api }
/**
 * Messages used by job clients: submission, job control, status and savepoint requests, and
 * the replies a client receives.
 *
 * NOTE: several names here (`CancelJob`, `StopJob`, `RequestJobStatus`, `TriggerSavepoint`,
 * `DisposeSavepoint`, `SavepointSuccess`, `SavepointFailure`, `RequestClusterStatus`,
 * `RequestClusterConfiguration`) also exist in `JobManagerMessages`, some with different field
 * types. `JobExecutionResult` here also shadows Flink's `JobExecutionResult` class. When
 * wildcard-importing this object, import other message objects selectively to avoid ambiguity.
 */
object JobClientMessages {
  // Job submission from client

  /** Attached submission: the client's reply is the job's execution result. */
  case class SubmitJobAndWait(
    jobGraph: JobGraph,
    listenToExecutionResult: Boolean,
    leaderSessionTimeout: FiniteDuration
  )

  case class SubmitJobDetached(jobGraph: JobGraph)

  // Job control operations

  case class CancelJob(jobID: JobID)
  case class CancelJobWithSavepoint(jobID: JobID, savepointDirectory: String)
  case class StopJob(jobID: JobID)

  // Status and monitoring requests

  case class RequestJobStatus(jobID: JobID)
  case class RequestBlobManagerPort()
  case class BlobManagerPort(port: Int)

  case class RequestExecutionResult(jobID: JobID)

  // Savepoint operations from client

  case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String])
  case class RequestSavepoint(jobID: JobID, savepointDirectory: String)

  case class DisposeSavepoint(savepointPath: String)

  // Configuration queries

  case object RequestClusterConfiguration
  case object RequestClusterStatus

  // Response messages (failures travel as SerializedThrowable / serialized results)

  case class JobExecutionResult(result: SerializedJobExecutionResult)
  case class JobExecutionException(exception: SerializedThrowable)

  case class JobStatusAnswer(jobID: JobID, status: JobStatus)
  case class JobNotFound(jobID: JobID)

  case class SavepointSuccess(jobID: JobID, savepointPath: String, triggerTime: Long)
  case class SavepointFailure(jobID: JobID, cause: SerializedThrowable)

  // Connection and session management

  case object GetLeaderSessionID
  case class LeaderSessionID(leaderSessionID: Option[UUID])

  case class JobClientActorConnectionTimeoutException(message: String) extends Exception(message)
}
```

## Resource Manager Messages

### ResourceManagerMessages

Messages for ResourceManager communication and coordination.

```scala { .api }
/**
 * Messages exchanged with the ResourceManager: TaskExecutor and JobManager registration, slot
 * requests, resource overviews, heartbeats, disconnects and TaskManager start/stop requests.
 *
 * NOTE: `RequestSlot` here differs from `TaskManagerMessages.RequestSlot`; refer to it qualified
 * when both objects are in scope.
 */
object ResourceManagerMessages {
  // Task Manager registration

  case class RegisterTaskExecutor(
    resourceID: ResourceID,
    taskExecutorAddress: String,
    dataPort: Int,
    numberOfSlots: Int,
    hardwareDescription: HardwareDescription
  )

  case class TaskExecutorRegistrationSuccess(
    resourceManagerId: ResourceManagerId,
    heartbeatInterval: Time
  )

  case class TaskExecutorRegistrationRejection(reason: String)

  // Slot requests and allocation

  case class RequestSlot(
    jobMasterId: JobMasterId,
    slotRequest: SlotRequest
  )

  case class SlotRequestRegistered(
    slotRequestId: SlotRequestId,
    targetAddress: String
  )

  case class SlotRequestFailed(
    slotRequestId: SlotRequestId,
    cause: Throwable
  )

  case class CancelSlotRequest(slotRequestId: SlotRequestId)

  // Resource allocation and management

  case class NotifyResourceAvailable(
    resourceID: ResourceID,
    availableSlots: Int
  )

  case class RequestResourceOverview()
  case class ResourceOverview(
    numberTaskManagers: Int,
    numberRegisteredSlots: Int,
    numberFreeSlots: Int
  )

  // JobManager coordination

  case class RegisterJobManager(
    jobMasterId: JobMasterId,
    resourceID: ResourceID,
    jobManagerAddress: String,
    jobID: JobID
  )

  case class JobManagerRegistrationSuccess(heartbeatInterval: Time)
  case class JobManagerRegistrationRejection(reason: String)

  // Heartbeat and monitoring

  case class TaskManagerHeartbeat(
    resourceID: ResourceID,
    slotReport: SlotReport
  )

  case class JobManagerHeartbeat(resourceID: ResourceID)

  // Disconnection and cleanup

  case class DisconnectTaskManager(resourceID: ResourceID, cause: Exception)
  case class DisconnectJobManager(jobID: JobID, cause: Exception)

  // Resource allocation requests

  case class StartNewTaskManager(resourceProfile: ResourceProfile)
  case class StopTaskManager(resourceID: ResourceID)
}
```

## Common Message Traits and Utilities

### Base Message Traits

```scala { .api }
// Base traits for message classification

/**
 * Marks messages that only the current leader should act on.
 *
 * NOTE(review): this declares an abstract `leaderSessionID`, but most case classes extending it
 * (e.g. `JobManagerMessages.SubmitJob`) do not define that member. They would not compile
 * against this definition. `LeaderSessionManager.validateLeaderSession` reads the member, while
 * `LeaderSessionMessage` carries the ID explicitly. Confirm which design is intended.
 */
trait RequiresLeaderSessionID {
  def leaderSessionID: UUID
}

/** Marker for acknowledgement replies (e.g. `JobManagerMessages.Acknowledge`). */
trait AcknowledgeMessage

// Category markers; being sealed, they can only be extended within the same source file.
sealed trait JobManagerMessage
sealed trait TaskManagerMessage
sealed trait ClientMessage

// Message decorators and wrappers

/** Carries `message` together with the leader session ID it was sent under. */
case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID

/** Bundles a message with its target actor and a timeout. */
case class MessageDecorator[T](
  target: ActorRef,
  message: T,
  timeout: FiniteDuration
)

// Response handling

/**
 * Generic reply wrapper.
 *
 * NOTE: `Success` and `Failure` shadow `scala.util.Success` / `scala.util.Failure` wherever both
 * are visible. The client example imports the `scala.util` versions explicitly for `Future`
 * callbacks.
 */
sealed trait ResponseMessage
case class Success[T](result: T) extends ResponseMessage
case class Failure(cause: Throwable) extends ResponseMessage
```

### Message Serialization Support

```scala { .api }
/**
 * Helpers that convert job results and errors into the serialized payloads carried by client
 * messages (e.g. `JobClientMessages.JobExecutionResult`, `JobClientMessages.SavepointFailure`)
 * and back.
 */
object MessageSerializationUtils {
  /**
   * Serializes every accumulator value of `result` so the result can be shipped in a message.
   *
   * The previous version passed the raw (unserialized) accumulator map through a class-loader
   * helper, so nothing was actually serialized.
   *
   * @param result the job result to convert
   * @return the result with each accumulator wrapped in a `SerializedValue`
   * @throws java.io.IOException if an accumulator value cannot be serialized
   */
  def serializeJobExecutionResult(result: JobExecutionResult): SerializedJobExecutionResult = {
    val serializedAccumulators = new java.util.HashMap[
      String,
      org.apache.flink.util.SerializedValue[org.apache.flink.util.OptionalFailure[AnyRef]]]()

    result.getAllAccumulatorResults.forEach { (name: String, value: AnyRef) =>
      serializedAccumulators.put(
        name,
        new org.apache.flink.util.SerializedValue(org.apache.flink.util.OptionalFailure.of(value)))
    }

    new SerializedJobExecutionResult(result.getJobID, result.getNetRuntime, serializedAccumulators)
  }

  /**
   * Wraps `throwable` for transport in a message.
   *
   * `SerializedThrowable`'s public constructor takes only the throwable. The class loader is
   * supplied when the error is deserialized on the receiving side, not here.
   */
  def serializeThrowable(throwable: Throwable): SerializedThrowable = {
    new SerializedThrowable(throwable)
  }

  /**
   * Restores a `JobExecutionResult`, deserializing its accumulators with `classLoader`.
   *
   * The previous version handed the still-serialized accumulator map to `JobExecutionResult`.
   * Delegating to `toJobExecutionResult` deserializes each value with the given loader.
   *
   * @param serialized the serialized result received in a message
   * @param classLoader loader able to resolve the user accumulator classes
   * @return the deserialized result
   * @throws java.io.IOException if an accumulator cannot be read
   * @throws ClassNotFoundException if an accumulator class is not visible to `classLoader`
   */
  def deserializeJobExecutionResult(
    serialized: SerializedJobExecutionResult,
    classLoader: ClassLoader
  ): JobExecutionResult = {
    serialized.toJobExecutionResult(classLoader)
  }
}
```

## Usage Examples

### JobManager Message Handling

```scala
import akka.actor.{Actor, ActorLogging, ActorRef}
import org.apache.flink.runtime.messages.JobManagerMessages._
import java.util.UUID

import scala.collection.mutable

/**
 * Example JobManager actor: accepts job submissions, cancellation, status, running-job and
 * savepoint requests, and unwraps leader-session messages.
 */
class JobManagerActor extends Actor with ActorLogging {
  /** Current leader session; `None` until leadership has been granted. */
  private var leaderSessionID: Option[UUID] = None
  /** Execution graphs of jobs whose scheduling succeeded, keyed by job ID. */
  private val runningJobs = mutable.Map[JobID, ExecutionGraph]()

  override def receive: Receive = {
    case SubmitJob(jobGraph, listeningBehaviour) =>
      val jobID = jobGraph.getJobID
      val client = sender()
      log.info(s"Received job submission: $jobID")

      try {
        // Validate leader session
        validateLeaderSession(client)

        // Create the execution graph and start job execution
        val executionGraph = createExecutionGraph(jobGraph)
        executionGraph.scheduleForExecution()

        // Track the graph only after scheduling succeeded, so a failed submission does not
        // leave a stale entry visible to RequestJobStatus / RequestRunningJobs / CancelJob.
        runningJobs += jobID -> executionGraph

        client ! JobSubmitSuccess(jobID)

        // Handle listening behaviour
        listeningBehaviour match {
          case ListeningBehaviour.EXECUTION_RESULT =>
            // Register for execution result notifications
            registerForJobResult(jobID, client)
          case ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES =>
            // Register for both result and state change notifications
            registerForJobUpdates(jobID, client)
          case ListeningBehaviour.DETACHED =>
            // No further notifications needed
        }
      } catch {
        case ex: Exception =>
          log.error(ex, s"Failed to submit job $jobID")
          client ! JobResultFailure(ex)
      }

    case CancelJob(jobID) =>
      log.info(s"Received cancel request for job: $jobID")

      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          try {
            executionGraph.cancel()
            sender() ! Acknowledge()
          } catch {
            case ex: Exception =>
              sender() ! JobResultFailure(ex)
          }
        case None =>
          sender() ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestJobStatus(jobID) =>
      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          val status = executionGraph.getState
          val timestamp = executionGraph.getStatusTimestamp(status)
          sender() ! JobStatusResponse(jobID, status, timestamp)
        case None =>
          sender() ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestRunningJobs() =>
      // `values` of a mutable map is a live view; send an immutable snapshot so the receiving
      // actor never observes (or races with) this actor's mutable state.
      sender() ! RunningJobs(runningJobs.values.toList)

    case TriggerSavepoint(jobID, savepointDirectory) =>
      log.info(s"Triggering savepoint for job $jobID")
      // The completion callback below runs outside this actor's message processing, where
      // `sender()` no longer refers to the requester, so capture the requester now.
      val requester = sender()

      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          val savepointFuture = executionGraph.triggerSavepoint(savepointDirectory.orNull)

          savepointFuture.whenComplete { (savepointPath, throwable) =>
            if (throwable != null) {
              requester ! SavepointFailure(jobID, throwable)
            } else {
              requester ! SavepointSuccess(jobID, savepointPath, System.currentTimeMillis())
            }
          }
        case None =>
          requester ! SavepointFailure(jobID, new JobNotFoundException(jobID))
      }

    case RequestLeaderSessionID =>
      sender() ! ResponseLeaderSessionID(leaderSessionID)

    case LeaderSessionMessage(sessionID, message) =>
      if (leaderSessionID.contains(sessionID)) {
        // Re-deliver the unwrapped message to ourselves, preserving the original sender.
        self.tell(message, sender())
      } else {
        log.warning(s"Received message with invalid leader session ID: $sessionID")
        sender() ! JobResultFailure(new LeaderSessionIDException("Invalid leader session ID"))
      }
  }

  /** Throws LeaderSessionIDException if this actor has no active leader session. */
  private def validateLeaderSession(requester: ActorRef): Unit = {
    if (leaderSessionID.isEmpty) {
      throw new LeaderSessionIDException("No active leader session")
    }
  }

  private def createExecutionGraph(jobGraph: JobGraph): ExecutionGraph = {
    // Implementation for creating execution graph from job graph
    // This would involve setting up vertices, scheduling, etc.
    new ExecutionGraph(/* parameters */)
  }

  private def registerForJobResult(jobID: JobID, client: ActorRef): Unit = {
    // Register client to receive job execution results
  }

  private def registerForJobUpdates(jobID: JobID, client: ActorRef): Unit = {
    // Register client to receive job state changes and results
  }
}
```

### TaskManager Message Handling

```scala
import akka.actor.{Actor, ActorLogging, ActorRef}
import org.apache.flink.runtime.messages.JobManagerMessages.{Heartbeat, RegisterTaskManager}
import org.apache.flink.runtime.messages.TaskManagerMessages._

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
 * Example TaskManager actor. It:
 *  - deploys and cancels tasks;
 *  - forwards checkpoint triggers;
 *  - answers status queries;
 *  - forwards heartbeats and final task states to the JobManager it registered with.
 */
class TaskManagerActor extends Actor with ActorLogging {
  /** Tasks whose thread was started successfully, keyed by execution attempt. */
  private val runningTasks = mutable.Map[ExecutionAttemptID, Task]()
  /** JobManager to report to; set when RegisterAtJobManager is received. */
  private var jobManagerRef: Option[ActorRef] = None

  override def receive: Receive = {
    case RegisterAtJobManager(resourceID, taskManagerLocation, hardwareDescription) =>
      log.info(s"Registering at JobManager: ${sender()}")
      jobManagerRef = Some(sender())

      // Send registration message to JobManager
      sender() ! RegisterTaskManager(
        taskManagerLocation,
        resourceID,
        taskManagerLocation.getDataPort,
        hardwareDescription
      )

    case SubmitTask(taskDeploymentDescriptor) =>
      val executionAttemptID = taskDeploymentDescriptor.getExecutionAttemptId
      log.info(s"Received task submission: $executionAttemptID")

      try {
        // Create and start task
        val task = createTask(taskDeploymentDescriptor)
        task.startTaskThread()
        // Track the task only once its thread has started, so a failed start does not keep
        // occupying a slot in getAvailableSlots().
        runningTasks += executionAttemptID -> task

        sender() ! TaskSubmitted(executionAttemptID)
      } catch {
        case ex: Exception =>
          log.error(ex, s"Failed to submit task $executionAttemptID")
          sender() ! TaskFailed(executionAttemptID, ex)
      }

    case CancelTask(executionAttemptID) =>
      log.info(s"Cancelling task: $executionAttemptID")

      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          try {
            task.cancelExecution()
            sender() ! TaskCanceled(executionAttemptID)
          } catch {
            case ex: Exception =>
              log.error(ex, s"Failed to cancel task $executionAttemptID")
              sender() ! TaskFailed(executionAttemptID, ex)
          }
        case None =>
          log.warning(s"Attempted to cancel unknown task: $executionAttemptID")
          sender() ! TaskCanceled(executionAttemptID) // Already not running
      }

    case TriggerCheckpoint(executionAttemptID, checkpointID, timestamp, checkpointOptions) =>
      log.debug(s"Triggering checkpoint $checkpointID for task $executionAttemptID")

      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          task.triggerCheckpointBarrier(checkpointID, timestamp, checkpointOptions)
        case None =>
          log.warning(s"Checkpoint triggered for unknown task: $executionAttemptID")
          sender() ! DeclineCheckpoint(
            executionAttemptID,
            checkpointID,
            CheckpointDeclineReason.TASK_NOT_RUNNING
          )
      }

    case RequestTaskManagerStatus() =>
      val numberOfSlots = getNumberOfSlots()
      val availableSlots = getAvailableSlots()

      sender() ! TaskManagerStatusResponse(
        getResourceID(),
        getTaskManagerLocation(),
        numberOfSlots,
        availableSlots,
        getHardwareDescription()
      )

    case RequestRunningTasks() =>
      // toList takes a snapshot before converting to a java.util.Collection
      sender() ! RunningTasks(runningTasks.keys.toList.asJavaCollection)

    case SendHeartbeat(resourceID, accumulatorSnapshot) =>
      jobManagerRef.foreach { jm =>
        jm ! Heartbeat(resourceID, accumulatorSnapshot)
      }

    case TaskInFinalState(executionAttemptID) =>
      log.info(s"Task finished: $executionAttemptID")
      val finishedTask = runningTasks.remove(executionAttemptID)

      // Report the state the task actually ended in (it may have failed or been canceled).
      // FINISHED is only assumed when the task is no longer tracked here.
      val finalState = finishedTask.map(_.getExecutionState).getOrElse(ExecutionState.FINISHED)
      val failureCause = finishedTask.flatMap(task => Option(task.getFailureCause))

      // Notify JobManager about task completion
      jobManagerRef.foreach { jm =>
        jm ! UpdateTaskExecutionState(
          createTaskExecutionState(executionAttemptID, finalState, failureCause))
      }
  }

  private def createTask(tdd: TaskDeploymentDescriptor): Task = {
    // Implementation for creating a task from deployment descriptor
    new Task(/* parameters from tdd */)
  }

  private def getNumberOfSlots(): Int = {
    // Return total number of task slots
    4 // Example
  }

  /** Free slots; never negative, even if more tasks are tracked than there are slots. */
  private def getAvailableSlots(): Int = {
    math.max(0, getNumberOfSlots() - runningTasks.size)
  }

  /**
   * Builds the state report sent to the JobManager.
   *
   * @param executionAttemptID the task attempt being reported
   * @param executionState final state to report; defaults to FINISHED
   * @param error failure cause, if any
   */
  private def createTaskExecutionState(
      executionAttemptID: ExecutionAttemptID,
      executionState: ExecutionState = ExecutionState.FINISHED,
      error: Option[Throwable] = None): TaskExecutionState = {
    new TaskExecutionState(
      executionAttemptID.getJobId,
      executionAttemptID,
      executionState,
      error.orNull // TaskExecutionState takes a nullable error; null means "no error"
    )
  }
}
```

### Client Message Communication

```scala
import akka.actor.{Actor, ActorRef, Props, ActorSystem}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.flink.runtime.messages.JobClientMessages._
import org.apache.flink.runtime.messages.JobManagerMessages.{Acknowledge, JobResultFailure, JobSubmitSuccess}
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Success, Failure, Try}

/**
 * Example client that sends requests to the JobManager with Akka `ask` and prints the replies.
 *
 * Every reply callback also covers unexpected replies and ask failures. An unanticipated
 * message is therefore reported instead of raising a MatchError inside the callback, where
 * it would otherwise go unnoticed.
 *
 * @param jobManagerRef the JobManager actor
 * @param system actor system whose dispatcher runs the reply callbacks
 */
class FlinkJobClient(jobManagerRef: ActorRef)(implicit system: ActorSystem) {
  /** Ask timeout for ordinary request/response exchanges. */
  implicit val timeout: Timeout = 30.seconds
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  /** Replies shared by every request: anything unexpected, and ask failures (e.g. timeouts). */
  private val commonReplies: PartialFunction[Try[Any], Unit] = {
    case Success(other) =>
      println(s"Unexpected response: $other")

    case Failure(throwable) =>
      println(s"Communication error: ${throwable.getMessage}")
  }

  /** Completes a request-specific handler with [[commonReplies]] so it is total. */
  private def withFallback(expected: PartialFunction[Try[Any], Unit]): Try[Any] => Unit =
    expected.orElse(commonReplies)

  /**
   * Submits `jobGraph` and prints the outcome.
   *
   * @param jobGraph the job to submit
   * @param detached if true, sends SubmitJobDetached; otherwise SubmitJobAndWait, whose expected
   *                 reply is the job's execution result
   * @param resultTimeout how long an attached submission waits for that result. The result can
   *                 only arrive after the job has terminated, so raise this for jobs expected to
   *                 run longer. The default matches the general ask timeout.
   */
  def submitJob(
      jobGraph: JobGraph,
      detached: Boolean = false,
      resultTimeout: FiniteDuration = 30.seconds): Unit = {
    val message: Any =
      if (detached) SubmitJobDetached(jobGraph)
      else SubmitJobAndWait(jobGraph, listenToExecutionResult = true, 30.seconds)
    val askTimeout = if (detached) timeout else Timeout(resultTimeout)

    val resultFuture = jobManagerRef.ask(message)(askTimeout)

    resultFuture.onComplete(withFallback {
      case Success(JobExecutionResult(result)) =>
        println(s"Job ${jobGraph.getJobID} completed successfully")
        println(s"Execution time: ${result.getNetRuntime} ms")

      case Success(JobExecutionException(exception)) =>
        println(s"Job ${jobGraph.getJobID} failed: ${exception.getMessage}")

      case Success(JobSubmitSuccess(jobId)) =>
        println(s"Job $jobId submitted")
    })
  }

  /** Requests cancellation of `jobID` and prints the outcome. */
  def cancelJob(jobID: JobID): Unit = {
    val resultFuture = jobManagerRef ? CancelJob(jobID)

    resultFuture.onComplete(withFallback {
      case Success(Acknowledge()) =>
        println(s"Job $jobID cancelled successfully")

      case Success(JobResultFailure(cause)) =>
        println(s"Failed to cancel job $jobID: ${cause.getMessage}")
    })
  }

  /** Triggers a savepoint for `jobID` and prints where it was written, or why it failed. */
  def triggerSavepoint(jobID: JobID, savepointDirectory: Option[String]): Unit = {
    val resultFuture = jobManagerRef ? TriggerSavepoint(jobID, savepointDirectory)

    resultFuture.onComplete(withFallback {
      case Success(SavepointSuccess(jobId, savepointPath, _)) =>
        println(s"Savepoint created for job $jobId at: $savepointPath")

      case Success(SavepointFailure(jobId, cause)) =>
        println(s"Savepoint failed for job $jobId: ${cause.getMessage}")
    })
  }

  /** Queries and prints the status of `jobID`. */
  def getJobStatus(jobID: JobID): Unit = {
    val resultFuture = jobManagerRef ? RequestJobStatus(jobID)

    resultFuture.onComplete(withFallback {
      case Success(JobStatusAnswer(jobId, status)) =>
        println(s"Job $jobId status: $status")

      case Success(JobNotFound(jobId)) =>
        println(s"Job $jobId not found")
    })
  }

  /** Queries and prints whatever the JobManager returns for RequestClusterStatus. */
  def getClusterStatus(): Unit = {
    val resultFuture = jobManagerRef ? RequestClusterStatus

    resultFuture.onComplete(withFallback {
      case Success(status) =>
        println(s"Cluster status: $status")
    })
  }
}
```

## Common Patterns

### Message Routing and Dispatch

```scala
/**
 * Dispatches incoming messages to typed handler methods.
 *
 * Implementors supply one handler per supported message type. Anything else, including
 * `JobClientMessages` variants that share a simple name with a routed type, is passed to
 * [[handleUnknownMessage]], which throws by default.
 */
trait MessageRouter {
  /**
   * Routes `message` to the handler for its type.
   *
   * @param message the received message
   * @param sender the actor that sent it (passed through to the handler for replies)
   */
  def route(message: Any, sender: ActorRef): Unit = message match {
    // Job management messages
    case msg: JobManagerMessages.SubmitJob => handleJobSubmission(msg, sender)
    case msg: JobManagerMessages.CancelJob => handleJobCancellation(msg, sender)
    case msg: JobManagerMessages.RequestJobStatus => handleJobStatusRequest(msg, sender)

    // Task management messages
    case msg: TaskManagerMessages.SubmitTask => handleTaskSubmission(msg, sender)
    case msg: TaskManagerMessages.CancelTask => handleTaskCancellation(msg, sender)
    case msg: TaskManagerMessages.UpdateTaskExecutionState => handleTaskStateUpdate(msg, sender)

    // Resource management messages
    case msg: ResourceManagerMessages.RequestSlot => handleSlotRequest(msg, sender)
    case msg: ResourceManagerMessages.RegisterTaskExecutor => handleTaskManagerRegistration(msg, sender)

    // Unknown message
    case unknown => handleUnknownMessage(unknown, sender)
  }

  protected def handleJobSubmission(msg: JobManagerMessages.SubmitJob, sender: ActorRef): Unit
  protected def handleJobCancellation(msg: JobManagerMessages.CancelJob, sender: ActorRef): Unit
  protected def handleJobStatusRequest(msg: JobManagerMessages.RequestJobStatus, sender: ActorRef): Unit

  protected def handleTaskSubmission(msg: TaskManagerMessages.SubmitTask, sender: ActorRef): Unit
  protected def handleTaskCancellation(msg: TaskManagerMessages.CancelTask, sender: ActorRef): Unit
  protected def handleTaskStateUpdate(msg: TaskManagerMessages.UpdateTaskExecutionState, sender: ActorRef): Unit

  protected def handleSlotRequest(msg: ResourceManagerMessages.RequestSlot, sender: ActorRef): Unit
  protected def handleTaskManagerRegistration(msg: ResourceManagerMessages.RegisterTaskExecutor, sender: ActorRef): Unit

  /**
   * Fallback for messages no handler accepts.
   *
   * @throws IllegalArgumentException always
   */
  protected def handleUnknownMessage(message: Any, sender: ActorRef): Unit = {
    // A null message would otherwise fail with a NullPointerException from getClass
    // instead of the intended IllegalArgumentException.
    val messageType = Option(message).map(_.getClass.toString).getOrElse("null")
    throw new IllegalArgumentException(s"Unknown message type: $messageType")
  }
}
```

### Leader Session Management

```scala
/**
 * Tracks the current leader session and filters messages that must only be processed under it.
 * Implementors provide [[handleMessage]] and may hook into leadership changes.
 */
trait LeaderSessionManager {
  /** Session ID from the most recent becomeLeader call; None when not (or no longer) leader. */
  private var currentLeaderSessionID: Option[UUID] = None

  /** Records `sessionID` as the active leader session, then calls [[onBecomeLeader]]. */
  def becomeLeader(sessionID: UUID): Unit = {
    currentLeaderSessionID = Some(sessionID)
    onBecomeLeader(sessionID)
  }

  /** Clears the active leader session, then calls [[onRevokeLeadership]]. */
  def revokeLeadership(): Unit = {
    currentLeaderSessionID = None
    onRevokeLeadership()
  }

  /** True iff `message` carries the currently active leader session ID. */
  def validateLeaderSession(message: RequiresLeaderSessionID): Boolean = {
    currentLeaderSessionID.contains(message.leaderSessionID)
  }

  /**
   * Passes `message` to [[handleMessage]] if it is allowed under the current leader session:
   *  - `LeaderSessionMessage`: unwrapped and forwarded if its session ID is current;
   *  - other `RequiresLeaderSessionID` messages: forwarded if [[validateLeaderSession]] accepts them;
   *  - everything else: forwarded unconditionally.
   * Rejected messages are answered with `JobResultFailure`.
   */
  def handleLeaderSessionMessage(message: Any, sender: ActorRef): Unit = message match {
    // Must precede the RequiresLeaderSessionID case: LeaderSessionMessage also extends that
    // trait, but here the wrapped payload (not the wrapper) is what gets handled.
    case LeaderSessionMessage(sessionID, wrappedMessage) =>
      if (currentLeaderSessionID.contains(sessionID)) {
        handleMessage(wrappedMessage, sender)
      } else {
        rejectInvalidSession(sender)
      }

    case msg: RequiresLeaderSessionID =>
      if (validateLeaderSession(msg)) {
        handleMessage(msg, sender)
      } else {
        rejectInvalidSession(sender)
      }

    case other =>
      handleMessage(other, sender)
  }

  /** Tells `sender` that its message was sent under a stale or unknown leader session. */
  private def rejectInvalidSession(sender: ActorRef): Unit = {
    sender ! JobResultFailure(new LeaderSessionIDException("Invalid leader session"))
  }

  protected def onBecomeLeader(sessionID: UUID): Unit = {}
  protected def onRevokeLeadership(): Unit = {}
  protected def handleMessage(message: Any, sender: ActorRef): Unit
}
```