0
# Concurrent Utilities
1
2
Utilities for integrating Pekko actor systems with Java's concurrency APIs, converting between Scala and Java futures, and providing scheduled execution capabilities.
3
4
## Capabilities
5
6
### ScalaFutureUtils
7
8
Utilities for converting between Scala Future types (used by Pekko) and Java CompletableFuture types.
9
10
```java { .api }
11
/**
12
* Utilities to convert Scala types into Java types, particularly for Future interoperability.
13
*/
14
public class ScalaFutureUtils {
15
16
/**
17
* Converts a Scala Future to a Java CompletableFuture.
18
* This is essential for integrating Pekko's Scala-based async operations
19
* with Java's CompletableFuture-based async APIs.
20
*
21
* @param scalaFuture The Scala Future to convert
22
* @return CompletableFuture that completes with the same result
23
*/
24
public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture);
25
}
26
```
27
28
**Usage Examples:**
29
30
```java
31
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
32
import org.apache.pekko.actor.ActorSelection;
33
import org.apache.pekko.pattern.Patterns;
34
import org.apache.pekko.util.Timeout;
35
import scala.concurrent.Future;
36
import java.util.concurrent.CompletableFuture;
37
import java.time.Duration;
38
39
// Convert Pekko ask pattern result to CompletableFuture
40
ActorSelection actorSelection = actorSystem.actorSelection("/user/someActor");
41
Timeout timeout = Timeout.create(Duration.ofSeconds(10));
42
43
// Pekko ask returns Scala Future
44
Future<Object> scalaFuture = Patterns.ask(actorSelection, "someMessage", timeout);
45
46
// Convert to Java CompletableFuture for easier Java integration
47
CompletableFuture<Object> javaFuture = ScalaFutureUtils.toJava(scalaFuture);
48
49
// Now you can use standard Java CompletableFuture operations
50
javaFuture
51
.thenApply(result -> processResult(result))
52
.thenAccept(processedResult -> logger.info("Received: {}", processedResult))
53
.exceptionally(throwable -> {
54
logger.error("RPC call failed", throwable);
55
return null;
56
});
57
```
58
59
### ActorSystemScheduledExecutorAdapter
60
61
Adapter that allows using a Pekko ActorSystem as a Java ScheduledExecutor, enabling integration with Java's scheduled execution APIs.
62
63
```java { .api }
64
/**
65
* Adapter to use ActorSystem as ScheduledExecutor.
66
* Provides Java ScheduledExecutorService-compatible interface backed by Pekko's scheduler.
67
*/
68
public class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
69
70
/**
71
* Constructor for ActorSystemScheduledExecutorAdapter.
72
* @param actorSystem Pekko ActorSystem to use for scheduling
73
* @param flinkClassLoader ClassLoader for task execution context
74
*/
75
public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem, ClassLoader flinkClassLoader);
76
77
/**
78
* Schedules a Runnable task for execution after a delay.
79
* @param command Task to execute
80
* @param delay Delay before execution
81
* @param unit Time unit for the delay
82
* @return ScheduledFuture representing the scheduled task
83
*/
84
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
85
86
/**
87
* Schedules a Callable task for execution after a delay.
88
* @param callable Task to execute that returns a value
89
* @param delay Delay before execution
90
* @param unit Time unit for the delay
91
* @return ScheduledFuture representing the scheduled task and its result
92
*/
93
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
94
95
/**
96
* Schedules a task to run repeatedly at fixed rate.
97
* @param command Task to execute repeatedly
98
* @param initialDelay Delay before first execution
99
* @param period Period between successive executions
100
* @param unit Time unit for delays and period
101
* @return ScheduledFuture representing the scheduled repeating task
102
*/
103
public ScheduledFuture<?> scheduleAtFixedRate(
104
Runnable command,
105
long initialDelay,
106
long period,
107
TimeUnit unit
108
);
109
110
/**
111
* Schedules a task to run repeatedly with fixed delay between executions.
112
* @param command Task to execute repeatedly
113
* @param initialDelay Delay before first execution
114
* @param delay Delay between end of one execution and start of next
115
* @param unit Time unit for delays
116
* @return ScheduledFuture representing the scheduled repeating task
117
*/
118
public ScheduledFuture<?> scheduleWithFixedDelay(
119
Runnable command,
120
long initialDelay,
121
long delay,
122
TimeUnit unit
123
);
124
125
/**
126
* Executes a command immediately (implements Executor interface).
127
* @param command Task to execute
128
*/
129
public void execute(Runnable command);
130
}
131
```
132
133
**Usage Examples:**
134
135
```java
136
import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;
137
import org.apache.pekko.actor.ActorSystem;
138
import java.util.concurrent.ScheduledFuture;
139
import java.util.concurrent.TimeUnit;
140
import java.util.concurrent.Callable;
141
142
// Create scheduled executor adapter
143
ActorSystem actorSystem = // ... get actor system
144
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
145
ActorSystemScheduledExecutorAdapter scheduler =
146
new ActorSystemScheduledExecutorAdapter(actorSystem, classLoader);
147
148
// One-time delayed execution
149
ScheduledFuture<?> delayedTask = scheduler.schedule(() -> {
150
logger.info("Delayed task executed");
151
performMaintenanceTask();
152
}, 30, TimeUnit.SECONDS);
153
154
// Scheduled task with return value
155
ScheduledFuture<String> valuedTask = scheduler.schedule(() -> {
156
return "Task completed at " + System.currentTimeMillis();
157
}, 10, TimeUnit.SECONDS);
158
159
String result = valuedTask.get(); // Blocks until completion
160
161
// Periodic execution at fixed rate (heartbeat)
162
ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> {
163
sendHeartbeat();
164
}, 0, 5, TimeUnit.SECONDS); // Start immediately, repeat every 5 seconds
165
166
// Periodic execution with fixed delay (cleanup)
167
ScheduledFuture<?> cleanup = scheduler.scheduleWithFixedDelay(() -> {
168
performCleanup();
169
}, 60, 30, TimeUnit.SECONDS); // Start after 60s, repeat with 30s gap
170
171
// Immediate execution
172
scheduler.execute(() -> {
173
logger.info("Immediate task executed");
174
});
175
176
// Cancel scheduled tasks when done
177
heartbeat.cancel(false);
178
cleanup.cancel(true);
179
```
180
181
**Integration with Flink Components:**
182
183
```java
184
import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;
185
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
186
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
187
188
// Use scheduler in Flink components
189
public class JobManagerServices {
190
private final ActorSystemScheduledExecutorAdapter scheduler;
191
private final ScheduledFuture<?> checkpointScheduler;
192
193
public JobManagerServices(ActorSystem actorSystem) {
194
this.scheduler = new ActorSystemScheduledExecutorAdapter(
195
actorSystem,
196
JobManagerServices.class.getClassLoader()
197
);
198
199
// Schedule checkpoint triggering
200
this.checkpointScheduler = scheduler.scheduleAtFixedRate(
201
this::triggerCheckpoint,
202
10, // initial delay
203
30, // checkpoint interval
204
TimeUnit.SECONDS
205
);
206
}
207
208
private void triggerCheckpoint() {
209
// Checkpoint triggering logic
210
logger.debug("Triggering periodic checkpoint");
211
}
212
213
public void shutdown() {
214
checkpointScheduler.cancel(false);
215
}
216
}
217
218
// Resource cleanup with scheduled executor
219
public class ResourceManager {
220
private final ActorSystemScheduledExecutorAdapter scheduler;
221
222
public void scheduleResourceCleanup() {
223
// Clean up unused resources every 5 minutes
224
scheduler.scheduleWithFixedDelay(() -> {
225
cleanupUnusedResources();
226
}, 5, 5, TimeUnit.MINUTES);
227
}
228
229
public void scheduleTaskManagerHeartbeat() {
230
// Check TaskManager heartbeats every 10 seconds
231
scheduler.scheduleAtFixedRate(() -> {
232
checkTaskManagerHeartbeats();
233
}, 0, 10, TimeUnit.SECONDS);
234
}
235
236
private void cleanupUnusedResources() {
237
// Resource cleanup implementation
238
}
239
240
private void checkTaskManagerHeartbeats() {
241
// Heartbeat checking implementation
242
}
243
}
244
```
245
246
**Error Handling and Best Practices:**
247
248
```java
249
import java.util.concurrent.CompletionException;
250
import java.util.concurrent.ExecutionException;
251
252
// Proper error handling with scheduled tasks
253
public class RobustScheduledTasks {
254
private final ActorSystemScheduledExecutorAdapter scheduler;
255
256
public void setupRobustScheduling() {
257
// Task with proper error handling
258
scheduler.scheduleAtFixedRate(() -> {
259
try {
260
performRiskyOperation();
261
} catch (Exception e) {
262
logger.error("Scheduled task failed, but continuing", e);
263
// Don't rethrow - would stop the scheduled execution
264
}
265
}, 0, 30, TimeUnit.SECONDS);
266
267
// Task with future completion handling
268
ScheduledFuture<String> task = scheduler.schedule(() -> {
269
return performLongRunningOperation();
270
}, 10, TimeUnit.SECONDS);
271
272
// Handle completion asynchronously
273
CompletableFuture<String> futureResult = CompletableFuture.supplyAsync(() -> {
274
try {
275
return task.get();
276
} catch (ExecutionException e) {
277
throw new CompletionException(e.getCause());
278
} catch (InterruptedException e) {
279
Thread.currentThread().interrupt();
280
throw new CompletionException(e);
281
}
282
});
283
284
futureResult
285
.thenAccept(result -> logger.info("Operation completed: {}", result))
286
.exceptionally(throwable -> {
287
logger.error("Scheduled operation failed", throwable);
288
return null;
289
});
290
}
291
292
private void performRiskyOperation() throws Exception {
293
// Implementation that might throw exceptions
294
}
295
296
private String performLongRunningOperation() {
297
// Implementation that takes time to complete
298
return "operation result";
299
}
300
}
301
```