or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actor-system.md concurrent-utilities.md exceptions.md index.md rpc-configuration.md rpc-system.md

docs/actor-system.md

0

# Actor System Management

1

2

Tools and utilities for starting, configuring, and managing Pekko actor systems with proper thread pool configuration, SSL support, and Flink-specific optimizations.

3

4

## Capabilities

5

6

### ActorSystemBootstrapTools

7

8

Provides factory methods for creating and configuring Pekko actor systems in various deployment scenarios.

9

10

```java { .api }

11

/**

12

* Tools for starting and configuring Pekko actor systems with Flink-specific settings.

13

*/

14

public class ActorSystemBootstrapTools {

15

16

/**

17

* Starts remote actor system with external address binding.

18

* @param configuration Flink configuration object

19

* @param externalAddress External address to bind to

20

* @param externalPortRange Port range specification (e.g., "6123-6130")

21

* @param logger Logger instance for bootstrap messages

22

* @return Configured ActorSystem instance

23

* @throws Exception if actor system creation fails

24

*/

25

public static ActorSystem startRemoteActorSystem(

26

Configuration configuration,

27

String externalAddress,

28

String externalPortRange,

29

Logger logger

30

) throws Exception;

31

32

/**

33

* Starts remote actor system with full configuration options.

34

* @param configuration Flink configuration object

35

* @param actorSystemName Name for the actor system

36

* @param externalAddress External address to bind to

37

* @param externalPortRange Port range specification

38

* @param bindAddress Internal bind address (can be different from external)

39

* @param bindPort Optional specific bind port

40

* @param logger Logger instance

41

* @param actorSystemExecutorConfiguration Executor configuration

42

* @param customConfig Additional Pekko configuration

43

* @return Configured ActorSystem instance

44

* @throws Exception if actor system creation fails

45

*/

46

public static ActorSystem startRemoteActorSystem(

47

Configuration configuration,

48

String actorSystemName,

49

String externalAddress,

50

String externalPortRange,

51

String bindAddress,

52

Optional<Integer> bindPort,

53

Logger logger,

54

Config actorSystemExecutorConfiguration,

55

Config customConfig

56

) throws Exception;

57

58

/**

59

* Starts local actor system for single-node scenarios.

60

* @param configuration Flink configuration object

61

* @param actorSystemName Name for the actor system

62

* @param logger Logger instance

63

* @param actorSystemExecutorConfiguration Executor configuration

64

* @param customConfig Additional Pekko configuration

65

* @return Configured ActorSystem instance

66

* @throws Exception if actor system creation fails

67

*/

68

public static ActorSystem startLocalActorSystem(

69

Configuration configuration,

70

String actorSystemName,

71

Logger logger,

72

Config actorSystemExecutorConfiguration,

73

Config customConfig

74

) throws Exception;

75

76

/**

77

* Gets fork-join executor configuration from Flink configuration.

78

* @param configuration Flink configuration object

79

* @return ForkJoinExecutorConfiguration for actor system

80

*/

81

public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(

82

Configuration configuration

83

);

84

85

/**

86

* Gets fork-join executor configuration optimized for remote communication.

87

* @param configuration Flink configuration object

88

* @return ForkJoinExecutorConfiguration for remote actor system

89

*/

90

public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration(

91

Configuration configuration

92

);

93

}

94

```

95

96

**Usage Examples:**

97

98

```java

99

import org.apache.flink.configuration.Configuration;

100

import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;

101

import org.apache.pekko.actor.ActorSystem;

102

import org.slf4j.Logger;

103

import org.slf4j.LoggerFactory;

104

105

Logger logger = LoggerFactory.getLogger(MyClass.class);

106

Configuration config = new Configuration();

107

108

// Start local actor system for development

109

ActorSystem localSystem = ActorSystemBootstrapTools.startLocalActorSystem(

110

config,

111

"flink-local",

112

logger,

113

null, // default executor config

114

null // default Pekko config

115

);

116

117

// Start remote actor system for cluster deployment

118

ActorSystem remoteSystem = ActorSystemBootstrapTools.startRemoteActorSystem(

119

config,

120

"flink-cluster", "6123-6130", // external address, external port range

121

logger

122

);

123

124

// Start remote system with specific configuration

125

RpcSystem.ForkJoinExecutorConfiguration executorConfig =

126

ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(config);

127

128

ActorSystem configuredSystem = ActorSystemBootstrapTools.startRemoteActorSystem(

129

config,

130

"flink-production",

131

"192.168.1.100",

132

"6123-6130",

133

"0.0.0.0",

134

Optional.empty(),

135

logger,

136

PekkoUtils.getForkJoinExecutorConfig(executorConfig),

137

null

138

);

139

```

140

141

### PekkoUtils

142

143

Comprehensive utility class for Pekko configuration, actor system management, and URL handling.

144

145

```java { .api }

146

/**

147

* Utility methods for Pekko actor system configuration and management.

148

*/

149

public class PekkoUtils {

150

151

/**

152

* Gets the standard Flink actor system name.

153

* @return Standard actor system name used by Flink

154

*/

155

public static String getFlinkActorSystemName();

156

157

/**

158

* Gets thread pool executor configuration from settings.

159

* @param configuration Thread pool executor configuration

160

* @return Pekko Config object for thread pool executor

161

*/

162

public static Config getThreadPoolExecutorConfig(

163

RpcSystem.FixedThreadPoolExecutorConfiguration configuration

164

);

165

166

/**

167

* Gets fork-join executor configuration from settings.

168

* @param configuration Fork-join executor configuration

169

* @return Pekko Config object for fork-join executor

170

*/

171

public static Config getForkJoinExecutorConfig(

172

RpcSystem.ForkJoinExecutorConfiguration configuration

173

);

174

175

/**

176

* Creates local actor system with Flink configuration.

177

* @param configuration Flink configuration object

178

* @return Local ActorSystem instance

179

*/

180

public static ActorSystem createLocalActorSystem(Configuration configuration);

181

182

/**

183

* Creates actor system with custom name and configuration.

184

* @param actorSystemName Name for the actor system

185

* @param config Pekko configuration object

186

* @return ActorSystem instance

187

*/

188

public static ActorSystem createActorSystem(String actorSystemName, Config config);

189

190

/**

191

* Creates default actor system with standard settings.

192

* @return Default ActorSystem instance

193

*/

194

public static ActorSystem createDefaultActorSystem();

195

196

/**

197

* Gets Pekko configuration for external address binding.

198

* @param configuration Flink configuration object

199

* @param externalAddress External address to bind to

200

* @return Pekko Config object

201

*/

202

public static Config getConfig(Configuration configuration, HostAndPort externalAddress);

203

204

/**

205

* Gets Pekko configuration with separate bind address.

206

* @param configuration Flink configuration object

207

* @param externalAddress External address for external communication

208

* @param bindAddress Internal bind address

209

* @param executorConfig Executor configuration

210

* @return Pekko Config object

211

*/

212

public static Config getConfig(

213

Configuration configuration,

214

HostAndPort externalAddress,

215

HostAndPort bindAddress,

216

Config executorConfig

217

);

218

219

/**

220

* Gets address from running actor system.

221

* @param system ActorSystem instance

222

* @return Address object representing the system's address

223

*/

224

public static Address getAddress(ActorSystem system);

225

226

/**

227

* Gets RPC URL for a specific actor.

228

* @param system ActorSystem containing the actor

229

* @param actor ActorRef to generate URL for

230

* @return RPC URL string for the actor

231

*/

232

public static String getRpcURL(ActorSystem system, ActorRef actor);

233

234

/**

235

* Extracts address from RPC URL string.

236

* @param rpcURL RPC URL to parse

237

* @return Address object extracted from URL

238

* @throws MalformedURLException if URL is malformed

239

*/

240

public static Address getAddressFromRpcURL(String rpcURL) throws MalformedURLException;

241

242

/**

243

* Extracts InetSocketAddress from RPC URL string.

244

* @param rpcURL RPC URL to parse

245

* @return InetSocketAddress extracted from URL

246

* @throws Exception if URL cannot be parsed

247

*/

248

public static InetSocketAddress getInetSocketAddressFromRpcURL(String rpcURL) throws Exception;

249

250

/**

251

* Terminates actor system gracefully.

252

* @param actorSystem ActorSystem to terminate

253

* @return CompletableFuture indicating termination completion

254

*/

255

public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem);

256

}

257

```

258

259

**Advanced Configuration Examples:**

260

261

```java

262

import org.apache.flink.runtime.rpc.pekko.PekkoUtils;

263

import org.apache.flink.runtime.rpc.pekko.HostAndPort;

264

import org.apache.pekko.actor.Address;

265

import com.typesafe.config.Config;

266

267

// Create actor system with custom executor configuration

268

RpcSystem.ForkJoinExecutorConfiguration executorConfig =

269

new RpcSystem.ForkJoinExecutorConfiguration(2.0, 8, 64); // parallelismFactor, minParallelism, maxParallelism

270

Config pekkoExecutorConfig = PekkoUtils.getForkJoinExecutorConfig(executorConfig);

271

272

ActorSystem customSystem = PekkoUtils.createActorSystem(

273

"flink-custom",

274

pekkoExecutorConfig

275

);

276

277

// Configure for external access with separate bind address

278

HostAndPort externalAddress = new HostAndPort("public.example.com", 6123);

279

HostAndPort bindAddress = new HostAndPort("0.0.0.0", 6123);

280

281

Config clusterConfig = PekkoUtils.getConfig(

282

flinkConfig, // an existing org.apache.flink.configuration.Configuration instance

283

externalAddress,

284

bindAddress,

285

pekkoExecutorConfig

286

);

287

288

// Extract network information from running systems

289

Address systemAddress = PekkoUtils.getAddress(customSystem);

290

String actorUrl = PekkoUtils.getRpcURL(customSystem, someActor);

291

292

// Parse URLs for connection information

293

InetSocketAddress socketAddress = PekkoUtils.getInetSocketAddressFromRpcURL(

294

"pekko://flink@cluster-node:6123/user/jobmanager"

295

);

296

297

// Graceful shutdown

298

CompletableFuture<Void> termination = PekkoUtils.terminateActorSystem(customSystem);

299

termination.thenRun(() -> logger.info("Actor system terminated"));

300

```

301

302

### RobustActorSystem

303

304

Enhanced ActorSystem implementation with configurable exception handling for production environments.

305

306

```java { .api }

307

/**

308

* ActorSystem with configurable UncaughtExceptionHandler for robust error handling.

309

*/

310

public abstract class RobustActorSystem extends ActorSystemImpl {

311

312

/**

313

* Constructor for RobustActorSystem.

314

* @param name Name of the actor system

315

* @param applicationConfig Application configuration

316

* @param classLoader ClassLoader for the system

317

* @param defaultExecutionContext Default execution context

318

* @param setup ActorSystemSetup configuration

319

*/

320

public RobustActorSystem(

321

String name,

322

Config applicationConfig,

323

ClassLoader classLoader,

324

Option<ExecutionContext> defaultExecutionContext,

325

ActorSystemSetup setup

326

);

327

328

/**

329

* Factory method to create RobustActorSystem.

330

* @param name Name of the actor system

331

* @param applicationConfig Application configuration

332

* @return RobustActorSystem instance

333

*/

334

public static RobustActorSystem create(String name, Config applicationConfig);

335

}

336

```

337

338

### Support Classes

339

340

Additional classes that provide specialized functionality for actor system management.

341

342

```java { .api }

343

/**

344

* Actor for handling dead letters in the actor system.

345

*/

346

public class DeadLettersActor extends AbstractActor {

347

/**

348

* Gets Props for creating DeadLettersActor instances.

349

* @return Props configuration for the actor

350

*/

351

public static Props getProps();

352

}

353

354

/**

355

* Supervisor actor for managing child actors with escalation strategy.

356

*/

357

public class SupervisorActor extends AbstractActor {

358

// Supervisor implementation for actor lifecycle management

359

}

360

361

/**

362

* Supervisor strategy that escalates all exceptions to parent actors.

363

*/

364

public class EscalatingSupervisorStrategy implements SupervisorStrategyConfigurator {

365

// Strategy implementation for exception handling

366

}

367

368

/**

369

* Custom SSL engine provider for secure Pekko communication.

370

*/

371

public class CustomSSLEngineProvider extends ConfigSSLEngineProvider {

372

// SSL engine configuration for secure RPC communication

373

}

374

375

/**

376

* Thread factory that sets thread priority for actor system threads.

377

*/

378

public class PrioritySettingThreadFactory implements ThreadFactory {

379

// Thread factory for priority-based thread management

380

}

381

382

/**

383

* Dispatcher configurator for priority-based thread scheduling.

384

*/

385

public class PriorityThreadsDispatcher extends DispatcherConfigurator {

386

// Dispatcher configuration for priority threads

387

}

388

389

/**

390

* Pekko extension for remote address handling and resolution.

391

*/

392

public class RemoteAddressExtension extends AbstractExtension {

393

// Extension for remote address management

394

}

395

```

396

397

**Production Configuration Example:**

398

399

```java

400

import org.apache.flink.runtime.rpc.pekko.RobustActorSystem;

401

import org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider;

402

import com.typesafe.config.ConfigFactory;

403

import com.typesafe.config.Config;

404

405

// Create production-ready actor system with SSL and robust error handling

406

Config productionConfig = ConfigFactory.parseString("""

407

pekko {

408

remote.artery {

409

transport = tls-tcp

410

canonical.hostname = "production-node.example.com"

411

canonical.port = 6123

412

}

413

414

actor {

415

provider = remote

416

serialization-bindings {

417

"java.io.Serializable" = java

418

}

419

}

420

421

ssl-config {

422

trustManager = "org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider"

423

}

424

}

425

""");

426

427

RobustActorSystem productionSystem = RobustActorSystem.create(

428

"flink-production",

429

productionConfig

430

);

431

```