or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md · execution-graph.md · high-availability.md · index.md · job-management.md · message-passing.md · metrics.md · mini-cluster.md · rpc-framework.md · state-management.md · task-execution.md

message-passing.md (in docs/)

0

# Message Passing (Scala APIs)

1

2

The Message Passing system provides Scala-based message definitions for actor-based communication within the Flink runtime cluster. These message classes define the communication protocols between JobManagers, TaskManagers, and clients, enabling distributed coordination and control operations across the cluster.

3

4

## JobManager Messages

5

6

### JobManagerMessages

7

8

Scala object containing message definitions for JobManager communication in the actor system.

9

10

```scala { .api }

11

/**
 * Message definitions for JobManager communication, grouped by purpose.
 *
 * Messages declared with `extends RequiresLeaderSessionID` carry that trait;
 * the remaining messages are plain case classes / case objects.
 */
object JobManagerMessages {

12

// Job submission and lifecycle messages (every message in this group extends RequiresLeaderSessionID)

13

case class SubmitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour) extends RequiresLeaderSessionID

14

case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID

15

case class StopJob(jobID: JobID) extends RequiresLeaderSessionID

16

case class RescaleJob(jobID: JobID, newParallelism: Int) extends RequiresLeaderSessionID

17

18

// Job status and monitoring messages (request types followed by the types that carry the answer)

19

case class RequestJobStatus(jobID: JobID)

20

case class JobStatusResponse(jobID: JobID, status: JobStatus, timestamp: Long)

21

22

// Declared as a case class with an empty parameter list, so it is constructed and matched as RequestRunningJobs()
case class RequestRunningJobs()

23

case class RunningJobs(runningJobs: Iterable[ExecutionGraph])

24

25

case class RequestJobDetails(jobID: JobID)

26

// The JobDetails parameter list continues on the following source line
case class JobDetails(jobID: JobID, jobName: String, startTime: Long, endTime: Long,

27

status: JobStatus, lastModification: Long)

28

29

// Execution graph access messages: one request plus a found / not-found pair

30

case class RequestExecutionGraph(jobID: JobID)

31

case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph)

32

case class ExecutionGraphNotFound(jobID: JobID)

33

34

// Savepoint messages: trigger and dispose requests, each with a success and a failure type

35

// savepointDirectory is optional (Option[String])
case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String]) extends RequiresLeaderSessionID

36

case class SavepointSuccess(jobID: JobID, savepointPath: String, timestamp: Long)

37

case class SavepointFailure(jobID: JobID, cause: Throwable)

38

39

case class DisposeSavepoint(savepointPath: String) extends RequiresLeaderSessionID

40

case class DisposeSavepointSuccess(savepointPath: String)

41

case class DisposeSavepointFailure(savepointPath: String, cause: Throwable)

42

43

// Accumulator messages: one request plus a found / not-found pair

44

case class RequestAccumulators(jobID: JobID)

45

case class AccumulatorResultsFound(jobID: JobID, userAccumulators: Map[String, OptionalFailure[AnyRef]])

46

case class AccumulatorResultsNotFound(jobID: JobID)

47

48

// Configuration and cluster information (the request types are parameterless case objects)

49

case object RequestClusterStatus

50

case class ClusterStatusWithTaskManagerInfo(

51

numTaskManagers: Int,

52

numSlotsTotal: Int,

53

numSlotsAvailable: Int,

54

taskManagerInfos: Iterable[TaskManagerInfo]

55

)

56

57

case object RequestClusterConfiguration

58

case class ClusterConfiguration(config: Configuration)

59

60

// Response messages for job submission and job results

61

case class JobSubmitSuccess(jobID: JobID)

62

case class JobResultSuccess(result: JobExecutionResult)

63

case class JobResultFailure(cause: Throwable)

64

65

// Listening behaviour for job submissions: a sealed trait whose three values are
// case objects nested inside the object of the same name

66

sealed trait ListeningBehaviour

67

case object ListeningBehaviour {

68

case object DETACHED extends ListeningBehaviour

69

case object EXECUTION_RESULT extends ListeningBehaviour

70

case object EXECUTION_RESULT_AND_STATE_CHANGES extends ListeningBehaviour

71

}

72

}

73

```

74

75

### JobManager Coordination Messages

76

77

```scala { .api }

78

/**
 * Coordination messages listed under the same `JobManagerMessages` object name:
 * leader session handling, TaskManager registration, heartbeats and shutdown.
 */
object JobManagerMessages {

79

// Leader election and coordination

80

// Wraps an arbitrary message (typed Any) together with a leader session ID
case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID

81

82

case object RequestLeaderSessionID

83

// leaderSessionID is an Option, so the response can express that no session ID is available
case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])

84

85

// Resource management coordination: TaskManager registration request and its two outcomes

86

case class RegisterTaskManager(

87

taskManagerLocation: TaskManagerLocation,

88

resourceID: ResourceID,

89

dataPort: Int,

90

hardwareDescription: HardwareDescription

91

)

92

93

case class TaskManagerRegistrationSuccess(

94

resourceID: ResourceID,

95

blobPort: Int

96

)

97

98

case class TaskManagerRegistrationRejection(

99

resourceID: ResourceID,

100

reason: String

101

)

102

103

// Heartbeat messages

104

case class Heartbeat(from: ResourceID, heartbeatPayload: HeartbeatPayload)

105

case class HeartbeatResponse(from: ResourceID)

106

107

// Shutdown and cleanup

108

case object RequestShutdown

109

// Empty-parameter case class; constructed and matched as Acknowledge()
case class Acknowledge() extends AcknowledgeMessage

110

}

111

```

112

113

## TaskManager Messages

114

115

### TaskManagerMessages

116

117

Scala object containing message definitions for TaskManager communication.

118

119

```scala { .api }

120

/**
 * Message definitions for TaskManager communication, grouped by purpose.
 *
 * Messages declared with `extends RequiresLeaderSessionID` carry that trait;
 * the remaining messages are plain case classes.
 */
object TaskManagerMessages {

121

// Task deployment and lifecycle: each command (submit / cancel / stop) is followed by its notification types

122

case class SubmitTask(tdd: TaskDeploymentDescriptor) extends RequiresLeaderSessionID

123

case class TaskSubmitted(executionAttemptID: ExecutionAttemptID)

124

case class TaskFailed(executionAttemptID: ExecutionAttemptID, cause: Throwable)

125

case class TaskFinished(executionAttemptID: ExecutionAttemptID, executionResult: ExecutionResult)

126

127

case class CancelTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID

128

case class TaskCanceled(executionAttemptID: ExecutionAttemptID)

129

130

case class StopTask(executionAttemptID: ExecutionAttemptID) extends RequiresLeaderSessionID

131

case class TaskStopped(executionAttemptID: ExecutionAttemptID)

132

133

// Task status and monitoring

134

case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) extends RequiresLeaderSessionID

135

136

// Empty-parameter case class; constructed and matched as RequestTaskManagerStatus()
case class RequestTaskManagerStatus()

137

case class TaskManagerStatusResponse(

138

resourceID: ResourceID,

139

taskManagerLocation: TaskManagerLocation,

140

numberOfSlots: Int,

141

numberOfAvailableSlots: Int,

142

hardwareDescription: HardwareDescription

143

)

144

145

case class RequestRunningTasks()

146

// NOTE(review): `Collection` is not a Scala standard-library type; presumably java.util.Collection — confirm
case class RunningTasks(runningTasks: Collection[ExecutionAttemptID])

147

148

// Resource and slot management

149

case class RequestSlot(

150

slotID: SlotID,

151

jobID: JobID,

152

allocationID: AllocationID,

153

resourceProfile: ResourceProfile,

154

targetAddress: String

155

) extends RequiresLeaderSessionID

156

157

case class SlotOffer(

158

allocationID: AllocationID,

159

slotID: SlotID,

160

resourceProfile: ResourceProfile

161

)

162

163

case class FreeSlot(allocationID: AllocationID) extends RequiresLeaderSessionID

164

case class SlotFreed(allocationID: AllocationID)

165

166

// Checkpoint coordination: trigger, confirm and decline, all keyed by execution attempt and checkpoint ID

167

case class TriggerCheckpoint(

168

executionAttemptID: ExecutionAttemptID,

169

checkpointID: Long,

170

timestamp: Long,

171

checkpointOptions: CheckpointOptions

172

) extends RequiresLeaderSessionID

173

174

case class ConfirmCheckpoint(

175

executionAttemptID: ExecutionAttemptID,

176

checkpointID: Long,

177

checkpointMetrics: CheckpointMetrics,

178

subtaskState: TaskStateSnapshot

179

) extends RequiresLeaderSessionID

180

181

case class DeclineCheckpoint(

182

executionAttemptID: ExecutionAttemptID,

183

checkpointID: Long,

184

reason: CheckpointDeclineReason

185

) extends RequiresLeaderSessionID

186

187

// Registration and heartbeat

188

case class RegisterAtJobManager(

189

resourceID: ResourceID,

190

taskManagerLocation: TaskManagerLocation,

191

hardwareDescription: HardwareDescription

192

)

193

194

// The heartbeat payload is typed as an AccumulatorSnapshot
case class SendHeartbeat(resourceID: ResourceID, heartbeatPayload: AccumulatorSnapshot)

195

196

// Error handling and failure

197

case class TaskInFinalState(executionAttemptID: ExecutionAttemptID)

198

case class FailTask(executionAttemptID: ExecutionAttemptID, cause: Throwable)

199

200

// Stack trace and debugging: one request/response pair without an attempt ID and one keyed by execution attempt

201

case class SendStackTrace()

202

case class StackTrace(stackTrace: Array[StackTraceElement])

203

204

case class RequestStackTrace(executionAttemptID: ExecutionAttemptID)

205

case class StackTraceResponse(executionAttemptID: ExecutionAttemptID, stackTrace: Array[StackTraceElement])

206

}

207

```

208

209

### TaskManager Coordination Messages

210

211

```scala { .api }

212

/**
 * Coordination messages listed under the same `TaskManagerMessages` object name:
 * memory and I/O reports, configuration and environment queries, and disconnect/shutdown.
 * Request types are parameterless case objects; each is followed by its report type.
 */
object TaskManagerMessages {

213

// Memory and resource management

214

case object RequestMemoryReport

215

// NOTE(review): the units of the Long memory fields are not stated here — confirm before relying on them
case class MemoryReport(

216

managedMemory: Long,

217

totalMemory: Long,

218

availableMemory: Long,

219

gcStats: Array[GarbageCollectorStats]

220

)

221

222

case object RequestIOReport

223

case class IOReport(

224

diskSpaceInfo: Array[DiskSpaceInfo],

225

networkIOMetrics: NetworkIOMetrics

226

)

227

228

// Configuration and environment

229

case object RequestTaskManagerConfiguration

230

case class TaskManagerConfiguration(config: Configuration, tempDirectories: Array[File])

231

232

case object RequestEnvironmentInformation

233

case class EnvironmentInformation(

234

hostname: String,

235

taskManagerAddress: String,

236

dataPort: Int,

237

numberOfSlots: Int

238

)

239

240

// Disconnect and shutdown

241

case class Disconnect(message: String)

242

case object NotifyWhenRegisteredAtJobManager

243

case object RegisteredAtJobManager

244

245

// Empty-parameter case class; constructed and matched as TaskManagerShutdown()
case class TaskManagerShutdown()

246

}

247

```

248

249

## JobClient Messages

250

251

### JobClientMessages

252

253

Scala object containing message definitions for job client communication.

254

255

```scala { .api }

256

/**
 * Message definitions for job client communication, grouped by purpose.
 *
 * Several names here (CancelJob, StopJob, RequestJobStatus, TriggerSavepoint, DisposeSavepoint,
 * SavepointSuccess, SavepointFailure, RequestClusterStatus, RequestClusterConfiguration) also
 * appear in the JobManagerMessages listing with different declarations, so wildcard-importing
 * both objects into one scope makes those simple names ambiguous.
 */
object JobClientMessages {

257

// Job submission from client: a waiting variant and a detached variant

258

case class SubmitJobAndWait(

259

jobGraph: JobGraph,

260

listenToExecutionResult: Boolean,

261

leaderSessionTimeout: FiniteDuration

262

)

263

264

case class SubmitJobDetached(jobGraph: JobGraph)

265

266

// Job control operations (unlike the JobManagerMessages versions, these do not extend RequiresLeaderSessionID)

267

case class CancelJob(jobID: JobID)

268

case class CancelJobWithSavepoint(jobID: JobID, savepointDirectory: String)

269

case class StopJob(jobID: JobID)

270

271

// Status and monitoring requests

272

case class RequestJobStatus(jobID: JobID)

273

case class RequestBlobManagerPort()

274

case class BlobManagerPort(port: Int)

275

276

case class RequestExecutionResult(jobID: JobID)

277

278

// Savepoint operations from client: TriggerSavepoint takes an optional directory, RequestSavepoint a mandatory one

279

case class TriggerSavepoint(jobID: JobID, savepointDirectory: Option[String])

280

case class RequestSavepoint(jobID: JobID, savepointDirectory: String)

281

282

case class DisposeSavepoint(savepointPath: String)

283

284

// Configuration queries

285

case object RequestClusterConfiguration

286

case object RequestClusterStatus

287

288

// Response messages; results and failure causes are carried in serialized form
// (SerializedJobExecutionResult / SerializedThrowable)

289

case class JobExecutionResult(result: SerializedJobExecutionResult)

290

case class JobExecutionException(exception: SerializedThrowable)

291

292

case class JobStatusAnswer(jobID: JobID, status: JobStatus)

293

case class JobNotFound(jobID: JobID)

294

295

case class SavepointSuccess(jobID: JobID, savepointPath: String, triggerTime: Long)

296

case class SavepointFailure(jobID: JobID, cause: SerializedThrowable)

297

298

// Connection and session management

299

case object GetLeaderSessionID

300

case class LeaderSessionID(leaderSessionID: Option[UUID])

301

302

// An exception type (extends Exception) declared as a case class, so it can be thrown as well as pattern-matched
case class JobClientActorConnectionTimeoutException(message: String) extends Exception(message)

303

}

304

```

305

306

## Resource Manager Messages

307

308

### ResourceManagerMessages

309

310

Messages for ResourceManager communication and coordination.

311

312

```scala { .api }

313

/**
 * Messages for ResourceManager communication and coordination, grouped by purpose.
 * None of the messages in this listing extends RequiresLeaderSessionID.
 */
object ResourceManagerMessages {

314

// Task Manager registration: the request plus its success and rejection outcomes

315

case class RegisterTaskExecutor(

316

resourceID: ResourceID,

317

taskExecutorAddress: String,

318

dataPort: Int,

319

numberOfSlots: Int,

320

hardwareDescription: HardwareDescription

321

)

322

323

case class TaskExecutorRegistrationSuccess(

324

resourceManagerId: ResourceManagerId,

325

heartbeatInterval: Time

326

)

327

328

case class TaskExecutorRegistrationRejection(reason: String)

329

330

// Slot requests and allocation

331

// Shares its simple name with TaskManagerMessages.RequestSlot but takes different parameters
case class RequestSlot(

332

jobMasterId: JobMasterId,

333

slotRequest: SlotRequest

334

)

335

336

case class SlotRequestRegistered(

337

slotRequestId: SlotRequestId,

338

targetAddress: String

339

)

340

341

case class SlotRequestFailed(

342

slotRequestId: SlotRequestId,

343

cause: Throwable

344

)

345

346

case class CancelSlotRequest(slotRequestId: SlotRequestId)

347

348

// Resource allocation and management

349

case class NotifyResourceAvailable(

350

resourceID: ResourceID,

351

availableSlots: Int

352

)

353

354

// Empty-parameter case class; constructed and matched as RequestResourceOverview()
case class RequestResourceOverview()

355

case class ResourceOverview(

356

numberTaskManagers: Int,

357

numberRegisteredSlots: Int,

358

numberFreeSlots: Int

359

)

360

361

// JobManager coordination: the registration request plus its success and rejection outcomes

362

case class RegisterJobManager(

363

jobMasterId: JobMasterId,

364

resourceID: ResourceID,

365

jobManagerAddress: String,

366

jobID: JobID

367

)

368

369

case class JobManagerRegistrationSuccess(heartbeatInterval: Time)

370

case class JobManagerRegistrationRejection(reason: String)

371

372

// Heartbeat and monitoring: the TaskManager heartbeat additionally carries a SlotReport

373

case class TaskManagerHeartbeat(

374

resourceID: ResourceID,

375

slotReport: SlotReport

376

)

377

378

case class JobManagerHeartbeat(resourceID: ResourceID)

379

380

// Disconnection and cleanup (the cause is typed as Exception, not Throwable)

381

case class DisconnectTaskManager(resourceID: ResourceID, cause: Exception)

382

case class DisconnectJobManager(jobID: JobID, cause: Exception)

383

384

// Resource allocation requests

385

case class StartNewTaskManager(resourceProfile: ResourceProfile)

386

case class StopTaskManager(resourceID: ResourceID)

387

}

388

```

389

390

## Common Message Traits and Utilities

391

392

### Base Message Traits

393

394

```scala { .api }

395

// Base traits for message classification

396

// Trait for messages associated with a leader session; declares an abstract leaderSessionID accessor.
// NOTE(review): message classes in the listings that extend this trait (for example SubmitJob or
// CancelTask) declare no leaderSessionID member, so as written they would not satisfy this
// abstract definition — confirm whether the trait is meant to be a pure marker.
trait RequiresLeaderSessionID {

397

def leaderSessionID: UUID

398

}

399

400

// Marker trait with no members; extended by Acknowledge in the JobManager coordination listing
trait AcknowledgeMessage

401

402

// Sealed classification traits. No message in the listings shown extends them.
sealed trait JobManagerMessage

403

sealed trait TaskManagerMessage

404

sealed trait ClientMessage

405

406

// Message decorators and wrappers

407

// Same declaration as the LeaderSessionMessage shown in the JobManager coordination listing.
// Its leaderSessionID constructor parameter implements the abstract member of RequiresLeaderSessionID.
case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) extends RequiresLeaderSessionID

408

409

// Bundles a message of type T with a target actor reference and a timeout
case class MessageDecorator[T](

410

target: ActorRef,

411

message: T,

412

timeout: FiniteDuration

413

)

414

415

// Response handling: a sealed two-case hierarchy.
// NOTE(review): Success and Failure share their simple names with scala.util.Success and
// scala.util.Failure, which the client usage example imports; qualify one of them when both
// are in scope.

416

sealed trait ResponseMessage

417

case class Success[T](result: T) extends ResponseMessage

418

case class Failure(cause: Throwable) extends ResponseMessage

419

```

420

421

### Message Serialization Support

422

423

```scala { .api }

424

/**
 * Helpers that convert job results and throwables to and from their serialized message forms.
 */
object MessageSerializationUtils {

425

// Serialization helpers for complex message payloads

426

// Builds a SerializedJobExecutionResult from the result's job ID, net runtime and accumulator results.
// The accumulator results are obtained through ClassLoaderUtils.withContextClassLoader, using the
// class loader of the result's own class.
def serializeJobExecutionResult(result: JobExecutionResult): SerializedJobExecutionResult = {

427

new SerializedJobExecutionResult(

428

result.getJobID,

429

result.getNetRuntime,

430

ClassLoaderUtils.withContextClassLoader(

431

// NOTE(review): Class.getClassLoader may return null for classes loaded by the bootstrap loader — confirm the helper accepts that
result.getClass.getClassLoader,

432

// Passed as a zero-argument function rather than as an already-evaluated value
() => result.getAllAccumulatorResults

433

)

434

)

435

}

436

437

// Wraps a throwable in a SerializedThrowable, passing along the class loader of the throwable's class.
// NOTE(review): that class loader is null for JDK exception types loaded by the bootstrap loader — confirm this is handled.
def serializeThrowable(throwable: Throwable): SerializedThrowable = {

438

new SerializedThrowable(throwable, throwable.getClass.getClassLoader)

439

}

440

441

// Rebuilds a JobExecutionResult from its serialized form using the supplied class loader.
def deserializeJobExecutionResult(

442

serialized: SerializedJobExecutionResult,

443

classLoader: ClassLoader

444

): JobExecutionResult = {

445

new JobExecutionResult(

446

serialized.getJobId,

447

serialized.getNetRuntime,

448

// NOTE(review): here the second argument is a plain value, whereas serializeJobExecutionResult
// passes a `() => ...` function to the same helper — confirm which form withContextClassLoader expects.
ClassLoaderUtils.withContextClassLoader(classLoader, serialized.getSerializedAccumulators)

449

)

450

}

451

}

452

```

453

454

## Usage Examples

455

456

### JobManager Message Handling

457

458

```scala

459

import akka.actor.{Actor, ActorLogging, ActorRef}

460

import org.apache.flink.runtime.messages.JobManagerMessages._

461

import java.util.UUID

462

463

/**
 * Example JobManager actor showing how the `JobManagerMessages` protocol is handled.
 *
 * It keeps submitted jobs in an in-memory map keyed by job ID and answers submission,
 * cancellation, status, savepoint and leader-session messages. Every reply goes to the
 * actor that sent the request.
 */
class JobManagerActor extends Actor with ActorLogging {
  // Local imports keep the example self-contained; the snippet's header does not import these.
  import scala.collection.mutable
  import scala.util.control.NonFatal

  private var leaderSessionID: Option[UUID] = None
  private val runningJobs = mutable.Map.empty[JobID, ExecutionGraph]

  override def receive: Receive = {
    case SubmitJob(jobGraph, listeningBehaviour) =>
      val client = sender()
      val jobID = jobGraph.getJobID
      log.info(s"Received job submission: $jobID")

      try {
        // Validate leader session
        validateLeaderSession(client)

        // Create execution graph and start job execution
        val executionGraph = createExecutionGraph(jobGraph)
        runningJobs += jobID -> executionGraph
        executionGraph.scheduleForExecution()

        client ! JobSubmitSuccess(jobID)

        // Register the client for follow-up notifications according to the requested behaviour
        listeningBehaviour match {
          case ListeningBehaviour.EXECUTION_RESULT =>
            registerForJobResult(jobID, client)
          case ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES =>
            registerForJobUpdates(jobID, client)
          case ListeningBehaviour.DETACHED =>
            () // No further notifications needed
        }
      } catch {
        // NonFatal lets fatal JVM errors and interrupts propagate to the supervisor
        case NonFatal(ex) =>
          log.error(ex, s"Failed to submit job $jobID")
          client ! JobResultFailure(ex)
      }

    case CancelJob(jobID) =>
      log.info(s"Received cancel request for job: $jobID")
      val client = sender()

      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          try {
            executionGraph.cancel()
            client ! Acknowledge()
          } catch {
            case NonFatal(ex) => client ! JobResultFailure(ex)
          }
        case None =>
          client ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestJobStatus(jobID) =>
      val client = sender()
      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          val status = executionGraph.getState
          client ! JobStatusResponse(jobID, status, executionGraph.getStatusTimestamp(status))
        case None =>
          client ! JobResultFailure(new JobNotFoundException(jobID))
      }

    case RequestRunningJobs() =>
      // Send an immutable snapshot: a live view of the mutable map must not leave the actor
      sender() ! RunningJobs(runningJobs.values.toList)

    case TriggerSavepoint(jobID, savepointDirectory) =>
      log.info(s"Triggering savepoint for job $jobID")
      // Capture the requester now. The completion callback runs later on another thread,
      // where sender() may refer to a different message's sender.
      val client = sender()

      runningJobs.get(jobID) match {
        case Some(executionGraph) =>
          val savepointFuture = executionGraph.triggerSavepoint(savepointDirectory.orNull)

          savepointFuture.whenComplete { (savepointPath, throwable) =>
            if (throwable != null) {
              client ! SavepointFailure(jobID, throwable)
            } else {
              client ! SavepointSuccess(jobID, savepointPath, System.currentTimeMillis())
            }
          }
        case None =>
          client ! SavepointFailure(jobID, new JobNotFoundException(jobID))
      }

    case RequestLeaderSessionID =>
      sender() ! ResponseLeaderSessionID(leaderSessionID)

    case LeaderSessionMessage(sessionID, message) =>
      if (leaderSessionID.contains(sessionID)) {
        // Unwrap and re-deliver to this actor, preserving the original sender
        self.tell(message, sender())
      } else {
        log.warning(s"Received message with invalid leader session ID: $sessionID")
        sender() ! JobResultFailure(new LeaderSessionIDException("Invalid leader session ID"))
      }
  }

  /** Throws LeaderSessionIDException when this actor currently holds no leader session. */
  private def validateLeaderSession(requester: ActorRef): Unit = {
    if (leaderSessionID.isEmpty) {
      throw new LeaderSessionIDException("No active leader session")
    }
  }

  /** Placeholder: builds the execution graph for a job graph (constructor arguments elided). */
  private def createExecutionGraph(jobGraph: JobGraph): ExecutionGraph = {
    // Implementation for creating execution graph from job graph
    // This would involve setting up vertices, scheduling, etc.
    new ExecutionGraph(/* parameters */)
  }

  /** Placeholder: register the client to receive job execution results. */
  private def registerForJobResult(jobID: JobID, client: ActorRef): Unit = {
    // Register client to receive job execution results
  }

  /** Placeholder: register the client to receive job state changes and results. */
  private def registerForJobUpdates(jobID: JobID, client: ActorRef): Unit = {
    // Register client to receive job state changes and results
  }
}

581

```

582

583

### TaskManager Message Handling

584

585

```scala

586

import akka.actor.{Actor, ActorLogging}

587

import org.apache.flink.runtime.messages.TaskManagerMessages._

588

589

/**
 * Example TaskManager actor showing how the `TaskManagerMessages` protocol is handled.
 *
 * It tracks running tasks in an in-memory map keyed by execution attempt ID, remembers the
 * JobManager it registered with, and answers task, checkpoint, status and heartbeat messages.
 *
 * NOTE: getResourceID(), getTaskManagerLocation() and getHardwareDescription() are called below
 * but not defined in this example; they must be supplied by the surrounding code.
 */
class TaskManagerActor extends Actor with ActorLogging {
  // Local imports keep the example self-contained; the snippet's header does not import these.
  import akka.actor.ActorRef
  import org.apache.flink.runtime.messages.JobManagerMessages.{Heartbeat, RegisterTaskManager}
  import scala.collection.JavaConverters._
  import scala.collection.mutable
  import scala.util.control.NonFatal

  private val runningTasks = mutable.Map.empty[ExecutionAttemptID, Task]
  private var jobManagerRef: Option[ActorRef] = None
  // Declared for registration tracking; no handler in this example reads or updates it.
  private var isRegistered = false

  override def receive: Receive = {
    case RegisterAtJobManager(resourceID, taskManagerLocation, hardwareDescription) =>
      val jobManager = sender()
      log.info(s"Registering at JobManager: $jobManager")
      jobManagerRef = Some(jobManager)

      // Send registration message to JobManager
      jobManager ! RegisterTaskManager(
        taskManagerLocation,
        resourceID,
        taskManagerLocation.getDataPort,
        hardwareDescription
      )

    case SubmitTask(taskDeploymentDescriptor) =>
      val executionAttemptID = taskDeploymentDescriptor.getExecutionAttemptId
      val requester = sender()
      log.info(s"Received task submission: $executionAttemptID")

      try {
        val task = createTask(taskDeploymentDescriptor)
        task.startTaskThread()
        // Register only after a successful start, so a task that failed to start does not
        // stay in the map and keep occupying a slot.
        runningTasks += executionAttemptID -> task

        requester ! TaskSubmitted(executionAttemptID)
      } catch {
        // NonFatal lets fatal JVM errors and interrupts propagate to the supervisor
        case NonFatal(ex) =>
          log.error(ex, s"Failed to submit task $executionAttemptID")
          requester ! TaskFailed(executionAttemptID, ex)
      }

    case CancelTask(executionAttemptID) =>
      log.info(s"Cancelling task: $executionAttemptID")
      val requester = sender()

      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          try {
            task.cancelExecution()
            requester ! TaskCanceled(executionAttemptID)
          } catch {
            case NonFatal(ex) =>
              log.error(ex, s"Failed to cancel task $executionAttemptID")
              requester ! TaskFailed(executionAttemptID, ex)
          }
        case None =>
          log.warning(s"Attempted to cancel unknown task: $executionAttemptID")
          requester ! TaskCanceled(executionAttemptID) // Already not running
      }

    case TriggerCheckpoint(executionAttemptID, checkpointID, timestamp, checkpointOptions) =>
      log.debug(s"Triggering checkpoint $checkpointID for task $executionAttemptID")

      runningTasks.get(executionAttemptID) match {
        case Some(task) =>
          task.triggerCheckpointBarrier(checkpointID, timestamp, checkpointOptions)
        case None =>
          log.warning(s"Checkpoint triggered for unknown task: $executionAttemptID")
          sender() ! DeclineCheckpoint(
            executionAttemptID,
            checkpointID,
            CheckpointDeclineReason.TASK_NOT_RUNNING
          )
      }

    case RequestTaskManagerStatus() =>
      sender() ! TaskManagerStatusResponse(
        getResourceID(),
        getTaskManagerLocation(),
        getNumberOfSlots(),
        getAvailableSlots(),
        getHardwareDescription()
      )

    case RequestRunningTasks() =>
      // toList takes a snapshot, so no live view of the mutable map leaves the actor
      sender() ! RunningTasks(runningTasks.keys.toList.asJavaCollection)

    case SendHeartbeat(resourceID, accumulatorSnapshot) =>
      // Silently skipped while no JobManager is known
      jobManagerRef.foreach(_ ! Heartbeat(resourceID, accumulatorSnapshot))

    case TaskInFinalState(executionAttemptID) =>
      log.info(s"Task finished: $executionAttemptID")
      runningTasks.remove(executionAttemptID)

      // Notify JobManager about task completion
      jobManagerRef.foreach { jm =>
        jm ! UpdateTaskExecutionState(createTaskExecutionState(executionAttemptID))
      }
  }

  /** Placeholder: creates a task from its deployment descriptor (constructor arguments elided). */
  private def createTask(tdd: TaskDeploymentDescriptor): Task = {
    // Implementation for creating a task from deployment descriptor
    new Task(/* parameters from tdd */)
  }

  /** Total number of task slots; fixed example value. */
  private def getNumberOfSlots(): Int = {
    4 // Example
  }

  /** Slots not occupied by a running task; never negative, even if more tasks run than slots exist. */
  private def getAvailableSlots(): Int = {
    math.max(0, getNumberOfSlots() - runningTasks.size)
  }

  /**
   * Builds the state update reported to the JobManager for a task in a final state.
   * NOTE(review): this always reports ExecutionState.FINISHED with no error, even though a
   * final state may also be a cancellation or failure — confirm whether the real state should
   * be passed in.
   */
  private def createTaskExecutionState(executionAttemptID: ExecutionAttemptID): TaskExecutionState = {
    new TaskExecutionState(
      executionAttemptID.getJobId,
      executionAttemptID,
      ExecutionState.FINISHED,
      null // no error
    )
  }
}

715

```

716

717

### Client Message Communication

718

719

```scala

720

import akka.actor.{Actor, ActorRef, Props, ActorSystem}

721

import akka.pattern.ask

722

import akka.util.Timeout

723

import org.apache.flink.runtime.messages.JobClientMessages._

724

import scala.concurrent.duration._

725

import scala.util.{Success, Failure}

726

727

/**
 * Example client that talks to a JobManager actor with the ask pattern and prints the outcome
 * of each request.
 *
 * Every request uses a 30 second ask timeout. Each callback handles the expected replies, any
 * unexpected reply, and a failed ask, so an unknown reply is reported rather than surfacing as
 * a MatchError inside the future callback.
 *
 * @param jobManagerRef actor reference that all requests are sent to
 * @param system        actor system whose dispatcher runs the future callbacks
 */
class FlinkJobClient(jobManagerRef: ActorRef)(implicit system: ActorSystem) {
  // Acknowledge and JobResultFailure are declared in JobManagerMessages, not JobClientMessages.
  import org.apache.flink.runtime.messages.JobManagerMessages.{Acknowledge, JobResultFailure}

  implicit val timeout: Timeout = 30.seconds
  implicit val ec: scala.concurrent.ExecutionContextExecutor = system.dispatcher

  /**
   * Submits a job and prints the result once a reply arrives.
   *
   * @param jobGraph job to submit
   * @param detached when true sends SubmitJobDetached, otherwise SubmitJobAndWait
   */
  def submitJob(jobGraph: JobGraph, detached: Boolean = false): Unit = {
    val message =
      if (detached) {
        SubmitJobDetached(jobGraph)
      } else {
        SubmitJobAndWait(jobGraph, listenToExecutionResult = true, leaderSessionTimeout = 30.seconds)
      }

    (jobManagerRef ? message).onComplete {
      case Success(JobExecutionResult(result)) =>
        println(s"Job ${jobGraph.getJobID} completed successfully")
        println(s"Execution time: ${result.getNetRuntime} ms")

      case Success(JobExecutionException(exception)) =>
        println(s"Job ${jobGraph.getJobID} failed: ${exception.getMessage}")

      case Success(other) =>
        reportUnexpectedReply(s"submission of job ${jobGraph.getJobID}", other)

      case Failure(throwable) =>
        reportCommunicationError(throwable)
    }
  }

  /** Requests cancellation of the given job and prints whether it was acknowledged. */
  def cancelJob(jobID: JobID): Unit = {
    (jobManagerRef ? CancelJob(jobID)).onComplete {
      case Success(Acknowledge()) =>
        println(s"Job $jobID cancelled successfully")

      case Success(JobResultFailure(cause)) =>
        println(s"Failed to cancel job $jobID: ${cause.getMessage}")

      case Success(other) =>
        reportUnexpectedReply(s"cancellation of job $jobID", other)

      case Failure(throwable) =>
        reportCommunicationError(throwable)
    }
  }

  /**
   * Triggers a savepoint for the given job and prints where it was written.
   *
   * @param savepointDirectory optional target directory, forwarded unchanged
   */
  def triggerSavepoint(jobID: JobID, savepointDirectory: Option[String]): Unit = {
    (jobManagerRef ? TriggerSavepoint(jobID, savepointDirectory)).onComplete {
      case Success(SavepointSuccess(jobId, savepointPath, _)) =>
        println(s"Savepoint created for job $jobId at: $savepointPath")

      case Success(SavepointFailure(jobId, cause)) =>
        println(s"Savepoint failed for job $jobId: ${cause.getMessage}")

      case Success(other) =>
        reportUnexpectedReply(s"savepoint of job $jobID", other)

      case Failure(throwable) =>
        reportCommunicationError(throwable)
    }
  }

  /** Requests the status of the given job and prints it. */
  def getJobStatus(jobID: JobID): Unit = {
    (jobManagerRef ? RequestJobStatus(jobID)).onComplete {
      case Success(JobStatusAnswer(jobId, status)) =>
        println(s"Job $jobId status: $status")

      case Success(JobNotFound(jobId)) =>
        println(s"Job $jobId not found")

      case Success(other) =>
        reportUnexpectedReply(s"status request for job $jobID", other)

      case Failure(throwable) =>
        reportCommunicationError(throwable)
    }
  }

  /** Requests the cluster status and prints whatever reply arrives. */
  def getClusterStatus(): Unit = {
    (jobManagerRef ? RequestClusterStatus).onComplete {
      case Success(status) =>
        println(s"Cluster status: $status")

      case Failure(throwable) =>
        reportCommunicationError(throwable)
    }
  }

  /** Prints a failed ask, for example a timeout. */
  private def reportCommunicationError(throwable: Throwable): Unit =
    println(s"Communication error: ${throwable.getMessage}")

  /** Prints a reply of a type the caller did not expect. */
  private def reportUnexpectedReply(request: String, reply: Any): Unit =
    println(s"Unexpected reply to $request: $reply")
}

810

```

811

812

## Common Patterns

813

814

### Message Routing and Dispatch

815

816

```scala

817

/**
 * Dispatches an incoming message to a type-specific handler.
 *
 * Implementations supply the job, task and resource handlers. Anything that matches none of
 * the routed types goes to `handleUnknownMessage`, which rejects it by default.
 */
trait MessageRouter {

  /**
   * Routes `message` to the handler for its type.
   *
   * @param message the received message
   * @param sender  the actor the message came from, passed through to the handler
   */
  def route(message: Any, sender: ActorRef): Unit = message match {
    // Job management messages
    case msg: JobManagerMessages.SubmitJob => handleJobSubmission(msg, sender)
    case msg: JobManagerMessages.CancelJob => handleJobCancellation(msg, sender)
    case msg: JobManagerMessages.RequestJobStatus => handleJobStatusRequest(msg, sender)

    // Task management messages
    case msg: TaskManagerMessages.SubmitTask => handleTaskSubmission(msg, sender)
    case msg: TaskManagerMessages.CancelTask => handleTaskCancellation(msg, sender)
    case msg: TaskManagerMessages.UpdateTaskExecutionState => handleTaskStateUpdate(msg, sender)

    // Resource management messages
    case msg: ResourceManagerMessages.RequestSlot => handleSlotRequest(msg, sender)
    case msg: ResourceManagerMessages.RegisterTaskExecutor => handleTaskManagerRegistration(msg, sender)

    // Unknown message (type patterns never match null, so null also ends up here)
    case unknown => handleUnknownMessage(unknown, sender)
  }

  protected def handleJobSubmission(msg: JobManagerMessages.SubmitJob, sender: ActorRef): Unit
  protected def handleJobCancellation(msg: JobManagerMessages.CancelJob, sender: ActorRef): Unit
  protected def handleJobStatusRequest(msg: JobManagerMessages.RequestJobStatus, sender: ActorRef): Unit

  protected def handleTaskSubmission(msg: TaskManagerMessages.SubmitTask, sender: ActorRef): Unit
  protected def handleTaskCancellation(msg: TaskManagerMessages.CancelTask, sender: ActorRef): Unit
  protected def handleTaskStateUpdate(msg: TaskManagerMessages.UpdateTaskExecutionState, sender: ActorRef): Unit

  protected def handleSlotRequest(msg: ResourceManagerMessages.RequestSlot, sender: ActorRef): Unit
  protected def handleTaskManagerRegistration(msg: ResourceManagerMessages.RegisterTaskExecutor, sender: ActorRef): Unit

  /**
   * Default handling for messages no route matches: always throws IllegalArgumentException.
   * A null message is reported as "null" rather than failing with a NullPointerException on getClass.
   */
  protected def handleUnknownMessage(message: Any, sender: ActorRef): Unit = {
    val description = Option(message).fold("null")(_.getClass.toString)
    throw new IllegalArgumentException(s"Unknown message type: $description")
  }
}

852

```

853

854

### Leader Session Management

855

856

```scala

857

/**
 * Tracks the current leader session and filters incoming messages against it.
 *
 * Messages bound to a leader session are passed to `handleMessage` only when their session ID
 * equals the current one; otherwise the sender receives a JobResultFailure. All other messages
 * are passed through unchecked.
 */
trait LeaderSessionManager {
  private var currentLeaderSessionID: Option[UUID] = None

  /** Records `sessionID` as the current leader session, then invokes the `onBecomeLeader` hook. */
  def becomeLeader(sessionID: UUID): Unit = {
    currentLeaderSessionID = Some(sessionID)
    onBecomeLeader(sessionID)
  }

  /** Clears the current leader session, then invokes the `onRevokeLeadership` hook. */
  def revokeLeadership(): Unit = {
    currentLeaderSessionID = None
    onRevokeLeadership()
  }

  /** True when a leader session is active and equals the session ID carried by `message`. */
  def validateLeaderSession(message: RequiresLeaderSessionID): Boolean =
    currentLeaderSessionID.contains(message.leaderSessionID)

  /**
   * Applies the leader session check to `message` and forwards accepted messages.
   *
   * A LeaderSessionMessage with a valid session is unwrapped before being handled. Any other
   * session-bound message with a valid session is handled as-is.
   */
  def handleLeaderSessionMessage(message: Any, sender: ActorRef): Unit = message match {
    case LeaderSessionMessage(sessionID, wrappedMessage) if currentLeaderSessionID.contains(sessionID) =>
      handleMessage(wrappedMessage, sender)

    case _: LeaderSessionMessage =>
      rejectInvalidSession(sender)

    case sessionBound: RequiresLeaderSessionID if validateLeaderSession(sessionBound) =>
      handleMessage(sessionBound, sender)

    case _: RequiresLeaderSessionID =>
      rejectInvalidSession(sender)

    case unrestricted =>
      handleMessage(unrestricted, sender)
  }

  /** Tells `requester` that its message carried a session ID that is not the current one. */
  private def rejectInvalidSession(requester: ActorRef): Unit =
    requester ! JobResultFailure(new LeaderSessionIDException("Invalid leader session"))

  /** Hook invoked after leadership is granted; no-op by default. */
  protected def onBecomeLeader(sessionID: UUID): Unit = {}

  /** Hook invoked after leadership is revoked; no-op by default. */
  protected def onRevokeLeadership(): Unit = {}

  /** Handles a message that passed the leader session check. */
  protected def handleMessage(message: Any, sender: ActorRef): Unit
}

897

```