or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actor-system.md · concurrent-utilities.md · exceptions.md · index.md · rpc-configuration.md · rpc-system.md

docs/concurrent-utilities.md

0

# Concurrent Utilities

1

2

Utilities for integrating Pekko actor systems with Java's concurrency APIs, converting between Scala and Java futures, and providing scheduled execution capabilities.

3

4

## Capabilities

5

6

### ScalaFutureUtils

7

8

Utilities for converting Scala `Future` instances (as returned by Pekko) into Java `CompletableFuture` instances. The conversion is one-way: only Scala-to-Java is provided, via `toJava`.

9

10

```java { .api }

11

/**

12

* Utilities to convert Scala types into Java types, particularly for Future interoperability.

13

*/

14

public class ScalaFutureUtils {

15

16

/**

17

* Converts a Scala Future to a Java CompletableFuture.

18

* This is essential for integrating Pekko's Scala-based async operations

19

* with Java's CompletableFuture-based async APIs.

20

*

21

* @param scalaFuture The Scala Future to convert

22

* @return CompletableFuture that completes with the same result

23

*/

24

public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture);

25

}

26

```

27

28

**Usage Examples:**

29

30

```java

31

import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;

32

import org.apache.pekko.actor.ActorSelection;

33

import org.apache.pekko.pattern.Patterns;

34

import org.apache.pekko.util.Timeout;

35

import scala.concurrent.Future;

36

import java.util.concurrent.CompletableFuture;

37

import java.time.Duration;

38

39

// Convert Pekko ask pattern result to CompletableFuture

40

ActorSelection actorSelection = actorSystem.actorSelection("/user/someActor");

41

Timeout timeout = Timeout.create(Duration.ofSeconds(10));

42

43

// Pekko ask returns Scala Future

44

Future<Object> scalaFuture = Patterns.ask(actorSelection, "someMessage", timeout);

45

46

// Convert to Java CompletableFuture for easier Java integration

47

CompletableFuture<Object> javaFuture = ScalaFutureUtils.toJava(scalaFuture);

48

49

// Now you can use standard Java CompletableFuture operations

50

javaFuture

51

.thenApply(result -> processResult(result))

52

.thenAccept(processedResult -> logger.info("Received: {}", processedResult))

53

.exceptionally(throwable -> {

54

logger.error("RPC call failed", throwable);

55

return null;

56

});

57

```

58

59

### ActorSystemScheduledExecutorAdapter

60

61

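Adapter that exposes a Pekko `ActorSystem` through the `ScheduledExecutor` interface, so tasks can be scheduled with the same method shapes as Java's `ScheduledExecutorService` (`schedule`, `scheduleAtFixedRate`, `scheduleWithFixedDelay`) as well as run immediately via `execute`.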

62

63

```java { .api }

64

/**

65

* Adapter to use ActorSystem as ScheduledExecutor.

66

* Provides Java ScheduledExecutorService-compatible interface backed by Pekko's scheduler.

67

*/

68

public class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

69

70

/**

71

* Constructor for ActorSystemScheduledExecutorAdapter.

72

* @param actorSystem Pekko ActorSystem to use for scheduling

73

* @param flinkClassLoader ClassLoader for task execution context

74

*/

75

public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem, ClassLoader flinkClassLoader);

76

77

/**

78

* Schedules a Runnable task for execution after a delay.

79

* @param command Task to execute

80

* @param delay Delay before execution

81

* @param unit Time unit for the delay

82

* @return ScheduledFuture representing the scheduled task

83

*/

84

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

85

86

/**

87

* Schedules a Callable task for execution after a delay.

88

* @param callable Task to execute that returns a value

89

* @param delay Delay before execution

90

* @param unit Time unit for the delay

91

* @return ScheduledFuture representing the scheduled task and its result

92

*/

93

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

94

95

/**

96

* Schedules a task to run repeatedly at fixed rate.

97

* @param command Task to execute repeatedly

98

* @param initialDelay Delay before first execution

99

* @param period Period between successive executions

100

* @param unit Time unit for delays and period

101

* @return ScheduledFuture representing the scheduled repeating task

102

*/

103

public ScheduledFuture<?> scheduleAtFixedRate(

104

Runnable command,

105

long initialDelay,

106

long period,

107

TimeUnit unit

108

);

109

110

/**

111

* Schedules a task to run repeatedly with fixed delay between executions.

112

* @param command Task to execute repeatedly

113

* @param initialDelay Delay before first execution

114

* @param delay Delay between end of one execution and start of next

115

* @param unit Time unit for delays

116

* @return ScheduledFuture representing the scheduled repeating task

117

*/

118

public ScheduledFuture<?> scheduleWithFixedDelay(

119

Runnable command,

120

long initialDelay,

121

long delay,

122

TimeUnit unit

123

);

124

125

/**

126

* Executes a command immediately (implements Executor interface).

127

* @param command Task to execute

128

*/

129

public void execute(Runnable command);

130

}

131

```

132

133

**Usage Examples:**

134

135

```java

136

import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;

137

import org.apache.pekko.actor.ActorSystem;

138

import java.util.concurrent.ScheduledFuture;

139

import java.util.concurrent.TimeUnit;

140

import java.util.concurrent.Callable;

141

142

// Create scheduled executor adapter

143

ActorSystem actorSystem = // ... get actor system

144

ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

145

ActorSystemScheduledExecutorAdapter scheduler =

146

new ActorSystemScheduledExecutorAdapter(actorSystem, classLoader);

147

148

// One-time delayed execution

149

ScheduledFuture<?> delayedTask = scheduler.schedule(() -> {

150

logger.info("Delayed task executed");

151

performMaintenanceTask();

152

}, 30, TimeUnit.SECONDS);

153

154

// Scheduled task with return value

155

ScheduledFuture<String> valuedTask = scheduler.schedule(() -> {

156

return "Task completed at " + System.currentTimeMillis();

157

}, 10, TimeUnit.SECONDS);

158

159

String result = valuedTask.get(); // Blocks until completion

160

161

// Periodic execution at fixed rate (heartbeat)

162

ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> {

163

sendHeartbeat();

164

}, 0, 5, TimeUnit.SECONDS); // Start immediately, repeat every 5 seconds

165

166

// Periodic execution with fixed delay (cleanup)

167

ScheduledFuture<?> cleanup = scheduler.scheduleWithFixedDelay(() -> {

168

performCleanup();

169

}, 60, 30, TimeUnit.SECONDS); // Start after 60s, repeat with 30s gap

170

171

// Immediate execution

172

scheduler.execute(() -> {

173

logger.info("Immediate task executed");

174

});

175

176

// Cancel scheduled tasks when done

177

heartbeat.cancel(false);

178

cleanup.cancel(true);

179

```

180

181

**Integration with Flink Components:**

182

183

```java

184

import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;

185

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;

186

import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;

187

188

// Use scheduler in Flink components

189

public class JobManagerServices {

190

private final ActorSystemScheduledExecutorAdapter scheduler;

191

private final ScheduledFuture<?> checkpointScheduler;

192

193

public JobManagerServices(ActorSystem actorSystem) {

194

this.scheduler = new ActorSystemScheduledExecutorAdapter(

195

actorSystem,

196

JobManagerServices.class.getClassLoader()

197

);

198

199

// Schedule checkpoint triggering

200

this.checkpointScheduler = scheduler.scheduleAtFixedRate(

201

this::triggerCheckpoint,

202

10, // initial delay

203

30, // checkpoint interval

204

TimeUnit.SECONDS

205

);

206

}

207

208

private void triggerCheckpoint() {

209

// Checkpoint triggering logic

210

logger.debug("Triggering periodic checkpoint");

211

}

212

213

public void shutdown() {

214

checkpointScheduler.cancel(false);

215

}

216

}

217

218

// Resource cleanup with scheduled executor

219

public class ResourceManager {

220

private final ActorSystemScheduledExecutorAdapter scheduler;

221

222

public void scheduleResourceCleanup() {

223

// Clean up unused resources every 5 minutes

224

scheduler.scheduleWithFixedDelay(() -> {

225

cleanupUnusedResources();

226

}, 5, 5, TimeUnit.MINUTES);

227

}

228

229

public void scheduleTaskManagerHeartbeat() {

230

// Check TaskManager heartbeats every 10 seconds

231

scheduler.scheduleAtFixedRate(() -> {

232

checkTaskManagerHeartbeats();

233

}, 0, 10, TimeUnit.SECONDS);

234

}

235

236

private void cleanupUnusedResources() {

237

// Resource cleanup implementation

238

}

239

240

private void checkTaskManagerHeartbeats() {

241

// Heartbeat checking implementation

242

}

243

}

244

```

245

246

**Error Handling and Best Practices:**

247

248

```java

249

import java.util.concurrent.CompletionException;

250

import java.util.concurrent.ExecutionException;

251

252

// Proper error handling with scheduled tasks

253

public class RobustScheduledTasks {

254

private final ActorSystemScheduledExecutorAdapter scheduler;

255

256

public void setupRobustScheduling() {

257

// Task with proper error handling

258

scheduler.scheduleAtFixedRate(() -> {

259

try {

260

performRiskyOperation();

261

} catch (Exception e) {

262

logger.error("Scheduled task failed, but continuing", e);

263

// Don't rethrow - would stop the scheduled execution

264

}

265

}, 0, 30, TimeUnit.SECONDS);

266

267

// Task with future completion handling

268

ScheduledFuture<String> task = scheduler.schedule(() -> {

269

return performLongRunningOperation();

270

}, 10, TimeUnit.SECONDS);

271

272

// Handle completion asynchronously

273

CompletableFuture<String> futureResult = CompletableFuture.supplyAsync(() -> {

274

try {

275

return task.get();

276

} catch (ExecutionException e) {

277

throw new CompletionException(e.getCause());

278

} catch (InterruptedException e) {

279

Thread.currentThread().interrupt();

280

throw new CompletionException(e);

281

}

282

});

283

284

futureResult

285

.thenAccept(result -> logger.info("Operation completed: {}", result))

286

.exceptionally(throwable -> {

287

logger.error("Scheduled operation failed", throwable);

288

return null;

289

});

290

}

291

292

private void performRiskyOperation() throws Exception {

293

// Implementation that might throw exceptions

294

}

295

296

private String performLongRunningOperation() {

297

// Implementation that takes time to complete

298

return "operation result";

299

}

300

}

301

```