or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actor-system.mdconcurrent-utilities.mdexceptions.mdindex.mdrpc-configuration.mdrpc-system.md

exceptions.mddocs/

0

# Exception Handling

1

2

Specialized exception classes for RPC-specific error conditions, state management, and message handling in the Pekko RPC system.

3

4

## Capabilities

5

6

### RpcInvalidStateException

7

8

Exception indicating that an RPC endpoint or service is in an invalid state for the requested operation.

9

10

```java { .api }

11

/**

12

* Exception indicating invalid RPC state.

13

* Thrown when RPC operations are attempted on endpoints or services

14

* that are not in the appropriate state (e.g., stopped, terminated, or not started).

15

*/

16

public class RpcInvalidStateException extends FlinkRuntimeException {

17

18

/**

19

* Constructor with descriptive message.

20

* @param message Description of the invalid state condition

21

*/

22

public RpcInvalidStateException(String message);

23

24

/**

25

* Constructor with underlying cause.

26

* @param cause The underlying exception that led to the invalid state

27

*/

28

public RpcInvalidStateException(Throwable cause);

29

30

/**

31

* Constructor with both message and cause.

32

* @param message Description of the invalid state condition

33

* @param cause The underlying exception that led to the invalid state

34

*/

35

public RpcInvalidStateException(String message, Throwable cause);

36

}

37

```

38

39

**Usage Examples:**

40

41

```java

42

import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;

43

44

public class RpcEndpointImpl extends RpcEndpoint {

45

private volatile boolean isStarted = false;

46

private volatile boolean isTerminated = false;

47

48

public void performOperation() throws RpcInvalidStateException {

49

if (!isStarted) {

50

throw new RpcInvalidStateException(

51

"Cannot perform operation: RPC endpoint has not been started"

52

);

53

}

54

55

if (isTerminated) {

56

throw new RpcInvalidStateException(

57

"Cannot perform operation: RPC endpoint has been terminated"

58

);

59

}

60

61

// Perform the operation

62

doActualWork();

63

}

64

65

public void connectToRemote(String address) throws RpcInvalidStateException {

66

try {

67

if (getActorSystem().whenTerminated().isCompleted()) {

68

throw new RpcInvalidStateException(

69

"Cannot connect to remote: Actor system has been terminated"

70

);

71

}

72

73

// Attempt connection

74

establishConnection(address);

75

76

} catch (Exception e) {

77

throw new RpcInvalidStateException(

78

"Failed to connect to remote endpoint due to invalid state",

79

e

80

);

81

}

82

}

83

84

// Error handling in service lifecycle

85

public void handleStateTransition() {

86

try {

87

transitionToNextState();

88

} catch (IllegalStateException e) {

89

// Convert to RPC-specific exception

90

throw new RpcInvalidStateException(

91

"Invalid state transition in RPC service",

92

e

93

);

94

}

95

}

96

}

97

```

98

99

### UnknownMessageException

100

101

Exception for handling unknown or unsupported message types in RPC communication.

102

103

```java { .api }

104

/**

105

* Exception for unknown message types.

106

* Thrown when an RPC actor receives a message it cannot handle or recognize.

107

* Extends RpcRuntimeException to indicate RPC-specific runtime errors.

108

*/

109

public class UnknownMessageException extends RpcRuntimeException {

110

111

/**

112

* Constructor with descriptive message.

113

* @param message Description of the unknown message condition

114

*/

115

public UnknownMessageException(String message);

116

117

/**

118

* Constructor with message and underlying cause.

119

* @param message Description of the unknown message condition

120

* @param cause The underlying exception that occurred while processing

121

*/

122

public UnknownMessageException(String message, Throwable cause);

123

124

/**

125

* Constructor with underlying cause only.

126

* @param cause The underlying exception that occurred while processing

127

*/

128

public UnknownMessageException(Throwable cause);

129

}

130

```

131

132

**Usage Examples:**

133

134

```java

135

import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;

136

import org.apache.pekko.actor.AbstractActor;

137

138

public class RpcActorImpl extends AbstractActor {

139

140

@Override

141

public Receive createReceive() {

142

return receiveBuilder()

143

.match(String.class, this::handleStringMessage)

144

.match(Integer.class, this::handleIntegerMessage)

145

.match(StartMessage.class, this::handleStartMessage)

146

.match(StopMessage.class, this::handleStopMessage)

147

.matchAny(this::handleUnknownMessage)

148

.build();

149

}

150

151

private void handleStringMessage(String message) {

152

// Handle string messages

153

logger.debug("Received string message: {}", message);

154

}

155

156

private void handleIntegerMessage(Integer message) {

157

// Handle integer messages

158

logger.debug("Received integer message: {}", message);

159

}

160

161

private void handleStartMessage(StartMessage message) {

162

// Handle start control message

163

logger.info("Starting RPC actor");

164

}

165

166

private void handleStopMessage(StopMessage message) {

167

// Handle stop control message

168

logger.info("Stopping RPC actor");

169

}

170

171

private void handleUnknownMessage(Object message) {

172

String messageType = message != null ? message.getClass().getSimpleName() : "null";

173

String errorMsg = String.format(

174

"Received unknown message type '%s': %s",

175

messageType,

176

message

177

);

178

179

logger.warn(errorMsg);

180

181

// Send error response back to sender

182

getSender().tell(

183

new UnknownMessageException(errorMsg),

184

getSelf()

185

);

186

}

187

}

188

189

// Message handler with exception propagation

190

public class MessageProcessor {

191

192

public void processMessage(Object message) throws UnknownMessageException {

193

if (message instanceof CommandMessage) {

194

processCommand((CommandMessage) message);

195

} else if (message instanceof QueryMessage) {

196

processQuery((QueryMessage) message);

197

} else if (message instanceof EventMessage) {

198

processEvent((EventMessage) message);

199

} else {

200

throw new UnknownMessageException(

201

"Cannot process message of type: " + message.getClass().getName()

202

);

203

}

204

}

205

206

public void handleMessageWithRecovery(Object message) {

207

try {

208

processMessage(message);

209

} catch (UnknownMessageException e) {

210

logger.error("Failed to process unknown message", e);

211

212

// Attempt fallback processing

213

try {

214

processFallback(message);

215

} catch (Exception fallbackException) {

216

throw new UnknownMessageException(

217

"Message processing failed completely",

218

fallbackException

219

);

220

}

221

}

222

}

223

224

private void processCommand(CommandMessage cmd) { /* implementation */ }

225

private void processQuery(QueryMessage query) { /* implementation */ }

226

private void processEvent(EventMessage event) { /* implementation */ }

227

private void processFallback(Object message) { /* fallback implementation */ }

228

}

229

```

230

231

### Exception Handling Patterns

232

233

Common patterns for handling RPC exceptions in distributed environments.

234

235

```java { .api }

236

/**

237

* Utility methods for common RPC exception handling patterns.

238

*/

239

public class RpcExceptionHandling {

240

241

/**

242

* Determines if an exception indicates a recoverable RPC error.

243

* @param exception Exception to analyze

244

* @return true if the error might be recoverable with retry

245

*/

246

public static boolean isRecoverableException(Throwable exception);

247

248

/**

249

* Determines if an exception indicates RPC endpoint termination.

250

* @param exception Exception to analyze

251

* @return true if the exception indicates endpoint termination

252

*/

253

public static boolean isEndpointTerminatedException(Throwable exception);

254

255

/**

256

* Extracts the root cause from a chain of RPC exceptions.

257

* @param exception Exception to analyze

258

* @return Root cause exception

259

*/

260

public static Throwable getRootCause(Throwable exception);

261

}

262

```

263

264

**Comprehensive Error Handling Examples:**

265

266

```java

267

import org.apache.flink.runtime.rpc.pekko.exceptions.*;

268

import java.util.concurrent.CompletableFuture;

269

import java.util.concurrent.CompletionException;

270

271

public class RobustRpcClient {

272

private static final int MAX_RETRIES = 3;

273

private static final long RETRY_DELAY_MS = 1000;

274

275

public <T> CompletableFuture<T> callWithRetry(

276

String endpoint,

277

String methodName,

278

Object... args) {

279

280

return callWithRetryInternal(endpoint, methodName, 0, args);

281

}

282

283

private <T> CompletableFuture<T> callWithRetryInternal(

284

String endpoint,

285

String methodName,

286

int attempt,

287

Object... args) {

288

289

return makeRpcCall(endpoint, methodName, args)

290

.handle((result, throwable) -> {

291

if (throwable == null) {

292

return CompletableFuture.completedFuture(result);

293

}

294

295

// Unwrap CompletionException

296

Throwable actualException = throwable instanceof CompletionException

297

? throwable.getCause() : throwable;

298

299

if (shouldRetry(actualException, attempt)) {

300

logger.warn("RPC call failed (attempt {}), retrying: {}",

301

attempt + 1, actualException.getMessage());

302

303

return CompletableFuture

304

.delayedExecutor(RETRY_DELAY_MS, TimeUnit.MILLISECONDS)

305

.execute(() -> callWithRetryInternal(endpoint, methodName, attempt + 1, args));

306

} else {

307

logger.error("RPC call failed permanently after {} attempts", attempt + 1, actualException);

308

return CompletableFuture.<T>failedFuture(actualException);

309

}

310

})

311

.thenCompose(future -> future);

312

}

313

314

private boolean shouldRetry(Throwable exception, int attempt) {

315

if (attempt >= MAX_RETRIES) {

316

return false;

317

}

318

319

// Don't retry on invalid state - usually permanent

320

if (exception instanceof RpcInvalidStateException) {

321

return false;

322

}

323

324

// Don't retry on unknown message - usually a programming error

325

if (exception instanceof UnknownMessageException) {

326

return false;

327

}

328

329

// Retry on network issues, timeouts, etc.

330

return isRetryableException(exception);

331

}

332

333

private boolean isRetryableException(Throwable exception) {

334

return exception instanceof java.net.ConnectException ||

335

exception instanceof java.util.concurrent.TimeoutException ||

336

exception instanceof org.apache.pekko.actor.ActorNotFound ||

337

(exception.getMessage() != null &&

338

exception.getMessage().contains("connection refused"));

339

}

340

341

// Circuit breaker pattern for RPC calls

342

public class RpcCircuitBreaker {

343

private volatile State state = State.CLOSED;

344

private volatile int failureCount = 0;

345

private volatile long lastFailureTime = 0;

346

347

private static final int FAILURE_THRESHOLD = 5;

348

private static final long TIMEOUT_MS = 60000; // 1 minute

349

350

enum State { CLOSED, OPEN, HALF_OPEN }

351

352

public <T> CompletableFuture<T> execute(CompletableFuture<T> operation) {

353

if (state == State.OPEN) {

354

if (System.currentTimeMillis() - lastFailureTime > TIMEOUT_MS) {

355

state = State.HALF_OPEN;

356

} else {

357

return CompletableFuture.failedFuture(

358

new RpcInvalidStateException("Circuit breaker is OPEN")

359

);

360

}

361

}

362

363

return operation

364

.whenComplete((result, throwable) -> {

365

if (throwable == null) {

366

onSuccess();

367

} else {

368

onFailure();

369

}

370

});

371

}

372

373

private void onSuccess() {

374

failureCount = 0;

375

state = State.CLOSED;

376

}

377

378

private void onFailure() {

379

failureCount++;

380

lastFailureTime = System.currentTimeMillis();

381

382

if (failureCount >= FAILURE_THRESHOLD) {

383

state = State.OPEN;

384

}

385

}

386

}

387

}

388

```

389

390

**Error Recovery Strategies:**

391

392

```java

393

public class RpcErrorRecovery {

394

395

// Graceful degradation on RPC failures

396

public String getJobStatus(String jobId) {

397

try {

398

return jobManagerGateway.getJobStatus(jobId).get();

399

} catch (RpcInvalidStateException e) {

400

logger.warn("JobManager not available, returning cached status", e);

401

return getCachedJobStatus(jobId);

402

} catch (UnknownMessageException e) {

403

logger.error("JobManager doesn't support getJobStatus operation", e);

404

return "UNKNOWN";

405

} catch (Exception e) {

406

logger.error("Failed to get job status", e);

407

throw new RuntimeException("Job status unavailable", e);

408

}

409

}

410

411

// Failover to backup endpoints

412

public CompletableFuture<String> callWithFailover(List<String> endpoints, String method) {

413

if (endpoints.isEmpty()) {

414

return CompletableFuture.failedFuture(

415

new RpcInvalidStateException("No endpoints available")

416

);

417

}

418

419

return callEndpoint(endpoints.get(0), method)

420

.handle((result, throwable) -> {

421

if (throwable == null) {

422

return CompletableFuture.completedFuture(result);

423

}

424

425

if (endpoints.size() > 1) {

426

logger.warn("Primary endpoint failed, trying backup: {}", throwable.getMessage());

427

return callWithFailover(endpoints.subList(1, endpoints.size()), method);

428

} else {

429

return CompletableFuture.<String>failedFuture(throwable);

430

}

431

})

432

.thenCompose(future -> future);

433

}

434

435

private CompletableFuture<String> callEndpoint(String endpoint, String method) {

436

// Implementation for calling specific endpoint

437

return CompletableFuture.completedFuture("result");

438

}

439

440

private String getCachedJobStatus(String jobId) {

441

// Implementation for getting cached status

442

return "CACHED_STATUS";

443

}

444

}

445

```