0
# Actor System Management
1
2
Tools and utilities for starting, configuring, and managing Pekko actor systems with proper thread pool configuration, SSL support, and Flink-specific optimizations.
3
4
## Capabilities
5
6
### ActorSystemBootstrapTools
7
8
Provides factory methods for creating and configuring Pekko actor systems in various deployment scenarios.
9
10
```java { .api }
11
/**
12
* Tools for starting and configuring Pekko actor systems with Flink-specific settings.
13
*/
14
public class ActorSystemBootstrapTools {
15
16
/**
17
* Starts remote actor system with external address binding.
18
* @param configuration Flink configuration object
19
* @param externalAddress External address under which the actor system is reachable by other systems
20
* @param externalPortRange Port range specification (e.g., "6123-6130")
21
* @param logger Logger instance for bootstrap messages
22
* @return Configured ActorSystem instance
23
* @throws Exception if actor system creation fails
24
*/
25
public static ActorSystem startRemoteActorSystem(
26
Configuration configuration,
27
String externalAddress,
28
String externalPortRange,
29
Logger logger
30
) throws Exception;
31
32
/**
33
* Starts remote actor system with full configuration options.
34
* @param configuration Flink configuration object
35
* @param actorSystemName Name for the actor system
36
* @param externalAddress External address under which the actor system is reachable by other systems
37
* @param externalPortRange Port range specification
38
* @param bindAddress Internal bind address (can be different from external)
39
* @param bindPort Optional specific bind port
40
* @param logger Logger instance
41
* @param actorSystemExecutorConfiguration Executor configuration
42
* @param customConfig Additional Pekko configuration
43
* @return Configured ActorSystem instance
44
* @throws Exception if actor system creation fails
45
*/
46
public static ActorSystem startRemoteActorSystem(
47
Configuration configuration,
48
String actorSystemName,
49
String externalAddress,
50
String externalPortRange,
51
String bindAddress,
52
Optional<Integer> bindPort,
53
Logger logger,
54
Config actorSystemExecutorConfiguration,
55
Config customConfig
56
) throws Exception;
57
58
/**
59
* Starts local actor system for single-node scenarios.
60
* @param configuration Flink configuration object
61
* @param actorSystemName Name for the actor system
62
* @param logger Logger instance
63
* @param actorSystemExecutorConfiguration Executor configuration
64
* @param customConfig Additional Pekko configuration
65
* @return Configured ActorSystem instance
66
* @throws Exception if actor system creation fails
67
*/
68
public static ActorSystem startLocalActorSystem(
69
Configuration configuration,
70
String actorSystemName,
71
Logger logger,
72
Config actorSystemExecutorConfiguration,
73
Config customConfig
74
) throws Exception;
75
76
/**
77
* Gets fork-join executor configuration from Flink configuration.
78
* @param configuration Flink configuration object
79
* @return ForkJoinExecutorConfiguration for actor system
80
*/
81
public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(
82
Configuration configuration
83
);
84
85
/**
86
* Gets fork-join executor configuration optimized for remote communication.
87
* @param configuration Flink configuration object
88
* @return ForkJoinExecutorConfiguration for remote actor system
89
*/
90
public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration(
91
Configuration configuration
92
);
93
}
94
```
95
96
**Usage Examples:**
97
98
```java
99
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.pekko.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;

import java.util.Optional;

Logger logger = LoggerFactory.getLogger(MyClass.class);
Configuration config = new Configuration();

// Start local actor system for development
ActorSystem localSystem = ActorSystemBootstrapTools.startLocalActorSystem(
    config,
    "flink-local",
    logger,
    // executor config derived from the Flink configuration
    PekkoUtils.getForkJoinExecutorConfig(
        ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(config)),
    null  // no additional Pekko config
);

// Start remote actor system for cluster deployment
ActorSystem remoteSystem = ActorSystemBootstrapTools.startRemoteActorSystem(
    config,
    "192.168.1.100",  // external address
    "6123-6130",      // external port range
    logger
);

// Start remote system with specific configuration
RpcSystem.ForkJoinExecutorConfiguration executorConfig =
    ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(config);
// The start methods expect a Pekko Config, so convert the executor settings first
Config pekkoExecutorConfig = PekkoUtils.getForkJoinExecutorConfig(executorConfig);

ActorSystem configuredSystem = ActorSystemBootstrapTools.startRemoteActorSystem(
    config,
    "flink-production",
    "192.168.1.100",
    "6123-6130",
    "0.0.0.0",
    Optional.empty(),
    logger,
    pekkoExecutorConfig,
    null  // no additional Pekko config
);
139
```
140
141
### PekkoUtils
142
143
Comprehensive utility class for Pekko configuration, actor system management, and URL handling.
144
145
```java { .api }
146
/**
147
* Utility methods for Pekko actor system configuration and management.
148
*/
149
public class PekkoUtils {
150
151
/**
152
* Gets the standard Flink actor system name.
153
* @return Standard actor system name used by Flink
154
*/
155
public static String getFlinkActorSystemName();
156
157
/**
158
* Gets thread pool executor configuration from settings.
159
* @param configuration Thread pool executor configuration
160
* @return Pekko Config object for thread pool executor
161
*/
162
public static Config getThreadPoolExecutorConfig(
163
RpcSystem.FixedThreadPoolExecutorConfiguration configuration
164
);
165
166
/**
167
* Gets fork-join executor configuration from settings.
168
* @param configuration Fork-join executor configuration
169
* @return Pekko Config object for fork-join executor
170
*/
171
public static Config getForkJoinExecutorConfig(
172
RpcSystem.ForkJoinExecutorConfiguration configuration
173
);
174
175
/**
176
* Creates local actor system with Flink configuration.
177
* @param configuration Flink configuration object
178
* @return Local ActorSystem instance
179
*/
180
public static ActorSystem createLocalActorSystem(Configuration configuration);
181
182
/**
183
* Creates actor system with custom name and configuration.
184
* @param actorSystemName Name for the actor system
185
* @param config Pekko configuration object
186
* @return ActorSystem instance
187
*/
188
public static ActorSystem createActorSystem(String actorSystemName, Config config);
189
190
/**
191
* Creates default actor system with standard settings.
192
* @return Default ActorSystem instance
193
*/
194
public static ActorSystem createDefaultActorSystem();
195
196
/**
197
* Gets Pekko configuration for external address binding.
198
* @param configuration Flink configuration object
199
* @param externalAddress External address (host and port) under which the actor system is reachable
200
* @return Pekko Config object
201
*/
202
public static Config getConfig(Configuration configuration, HostAndPort externalAddress);
203
204
/**
205
* Gets Pekko configuration with separate bind address.
206
* @param configuration Flink configuration object
207
* @param externalAddress External address for external communication
208
* @param bindAddress Internal bind address
209
* @param executorConfig Executor configuration
210
* @return Pekko Config object
211
*/
212
public static Config getConfig(
213
Configuration configuration,
214
HostAndPort externalAddress,
215
HostAndPort bindAddress,
216
Config executorConfig
217
);
218
219
/**
220
* Gets address from running actor system.
221
* @param system ActorSystem instance
222
* @return Address object representing the system's address
223
*/
224
public static Address getAddress(ActorSystem system);
225
226
/**
227
* Gets RPC URL for a specific actor.
228
* @param system ActorSystem containing the actor
229
* @param actor ActorRef to generate URL for
230
* @return RPC URL string for the actor
231
*/
232
public static String getRpcURL(ActorSystem system, ActorRef actor);
233
234
/**
235
* Extracts address from RPC URL string.
236
* @param rpcURL RPC URL to parse
237
* @return Address object extracted from URL
238
* @throws MalformedURLException if URL is malformed
239
*/
240
public static Address getAddressFromRpcURL(String rpcURL) throws MalformedURLException;
241
242
/**
243
* Extracts InetSocketAddress from RPC URL string.
244
* @param rpcURL RPC URL to parse
245
* @return InetSocketAddress extracted from URL
246
* @throws Exception if URL cannot be parsed
247
*/
248
public static InetSocketAddress getInetSocketAddressFromRpcURL(String rpcURL) throws Exception;
249
250
/**
251
* Terminates actor system gracefully.
252
* @param actorSystem ActorSystem to terminate
253
* @return CompletableFuture indicating termination completion
254
*/
255
public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem);
256
}
257
```
258
259
**Advanced Configuration Examples:**
260
261
```java
262
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

Logger logger = LoggerFactory.getLogger(MyClass.class);
Configuration flinkConfig = new Configuration();

// Create actor system with custom executor configuration
// (arguments: parallelism factor, minimum parallelism, maximum parallelism)
RpcSystem.ForkJoinExecutorConfiguration executorConfig =
    new RpcSystem.ForkJoinExecutorConfiguration(2.0, 8, 64);
Config pekkoExecutorConfig = PekkoUtils.getForkJoinExecutorConfig(executorConfig);

ActorSystem customSystem = PekkoUtils.createActorSystem(
    "flink-custom",
    pekkoExecutorConfig
);

// Configure for external access with separate bind address
HostAndPort externalAddress = new HostAndPort("public.example.com", 6123);
HostAndPort bindAddress = new HostAndPort("0.0.0.0", 6123);

Config clusterConfig = PekkoUtils.getConfig(
    flinkConfig,
    externalAddress,
    bindAddress,
    pekkoExecutorConfig
);

// Extract network information from running systems
// (someActor is an ActorRef that was previously created in customSystem)
Address systemAddress = PekkoUtils.getAddress(customSystem);
String actorUrl = PekkoUtils.getRpcURL(customSystem, someActor);

// Parse URLs for connection information
InetSocketAddress socketAddress = PekkoUtils.getInetSocketAddressFromRpcURL(
    "pekko://flink@cluster-node:6123/user/jobmanager"
);

// Graceful shutdown
CompletableFuture<Void> termination = PekkoUtils.terminateActorSystem(customSystem);
termination.thenRun(() -> logger.info("Actor system terminated"));
300
```
301
302
### RobustActorSystem
303
304
Enhanced ActorSystem implementation with configurable exception handling for production environments.
305
306
```java { .api }
307
/**
308
* ActorSystem with configurable UncaughtExceptionHandler for robust error handling.
309
*/
310
public abstract class RobustActorSystem extends ActorSystemImpl {
311
312
/**
313
* Constructor for RobustActorSystem.
314
* @param name Name of the actor system
315
* @param applicationConfig Application configuration
316
* @param classLoader ClassLoader for the system
317
* @param defaultExecutionContext Default execution context
318
* @param setup ActorSystemSetup configuration
319
*/
320
public RobustActorSystem(
321
String name,
322
Config applicationConfig,
323
ClassLoader classLoader,
324
Option<ExecutionContext> defaultExecutionContext,
325
ActorSystemSetup setup
326
);
327
328
/**
329
* Factory method to create RobustActorSystem.
330
* @param name Name of the actor system
331
* @param applicationConfig Application configuration
332
* @return RobustActorSystem instance
333
*/
334
public static RobustActorSystem create(String name, Config applicationConfig);
335
}
336
```
337
338
### Support Classes
339
340
Additional classes that provide specialized functionality for actor system management.
341
342
```java { .api }
343
/**
344
* Actor for handling dead letters in the actor system.
345
*/
346
public class DeadLettersActor extends AbstractActor {
347
/**
348
* Gets Props for creating DeadLettersActor instances.
349
* @return Props configuration for the actor
350
*/
351
public static Props getProps();
352
}
353
354
/**
355
* Supervisor actor for managing child actors with escalation strategy.
356
*/
357
public class SupervisorActor extends AbstractActor {
358
// Supervisor implementation for actor lifecycle management
359
}
360
361
/**
362
* Supervisor strategy that escalates all exceptions to parent actors.
363
*/
364
public class EscalatingSupervisorStrategy implements SupervisorStrategyConfigurator {
365
// Strategy implementation for exception handling
366
}
367
368
/**
369
* Custom SSL engine provider for secure Pekko communication.
370
*/
371
public class CustomSSLEngineProvider extends ConfigSSLEngineProvider {
372
// SSL engine configuration for secure RPC communication
373
}
374
375
/**
376
* Thread factory that sets thread priority for actor system threads.
377
*/
378
public class PrioritySettingThreadFactory implements ThreadFactory {
379
// Thread factory for priority-based thread management
380
}
381
382
/**
383
* Dispatcher configurator for priority-based thread scheduling.
384
*/
385
public class PriorityThreadsDispatcher extends DispatcherConfigurator {
386
// Dispatcher configuration for priority threads
387
}
388
389
/**
390
* Pekko extension for remote address handling and resolution.
391
*/
392
public class RemoteAddressExtension extends AbstractExtension {
393
// Extension for remote address management
394
}
395
```
396
397
**Production Configuration Example:**
398
399
```java
400
import org.apache.flink.runtime.rpc.pekko.RobustActorSystem;
401
import org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider;
402
import com.typesafe.config.ConfigFactory;
403
import com.typesafe.config.Config;
404
405
// Create production-ready actor system with SSL and robust error handling
406
Config productionConfig = ConfigFactory.parseString("""
407
pekko {
  remote {
    # CustomSSLEngineProvider plugs into the classic Netty SSL transport, so Artery is disabled
    artery.enabled = off
    classic {
      enabled-transports = ["pekko.remote.classic.netty.ssl"]
      netty.ssl {
        hostname = "production-node.example.com"
        port = 6123
        enable-ssl = true
        ssl-engine-provider = "org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider"
        # NOTE: key-store / trust-store settings under "security { ... }" must also be provided
      }
    }
  }

  actor {
    provider = remote
    serialization-bindings {
      "java.io.Serializable" = java
    }
  }
}
425
""");
426
427
RobustActorSystem productionSystem = RobustActorSystem.create(
428
"flink-production",
429
productionConfig
430
);
431
```