0
# Concurrent Programming
1
2
Asynchronous programming utilities with Futures, Promises, and lazy Tasks, using functional composition to build concurrent and reactive applications.
3
4
## Capabilities
5
6
### Future - Asynchronous Computations
7
8
Represents the result of an asynchronous computation that may complete with a value, fail with an exception, or be cancelled.
9
10
```java { .api }
11
/**
12
* Asynchronous computation result that may complete with success or failure
13
*/
14
interface Future<T> extends Value<T> {
15
// Factory methods
16
static <T> Future<T> of(Executor executor, CheckedFunction0<? extends T> computation);
17
static <T> Future<T> of(CheckedFunction0<? extends T> computation); // Uses ForkJoinPool
18
static <T> Future<T> successful(T value); // Immediately completed Future
19
static <T> Future<T> failed(Throwable exception); // Immediately failed Future
20
static <T> Future<T> fromTry(Try<? extends T> result); // Create from Try
21
static <T> Future<T> fromCompletableFuture(CompletableFuture<T> future);
22
23
// Completion state
24
boolean isCompleted(); // true if computation finished (success or failure)
25
boolean isSuccess(); // true if completed successfully
26
boolean isFailure(); // true if completed with exception
27
boolean isCancelled(); // true if computation was cancelled
28
29
// Value access (blocking operations)
30
T get(); // Block until completion, return value or throw
31
T get(long timeout, TimeUnit unit); // Block with timeout
32
Option<Try<T>> getValue(); // Non-blocking: get current value if completed
33
34
// Transformation operations
35
<U> Future<U> map(Function<? super T, ? extends U> mapper);
36
<U> Future<U> mapTry(CheckedFunction1<? super T, ? extends U> mapper);
37
<U> Future<U> flatMap(Function<? super T, ? extends Future<? extends U>> mapper);
38
39
// Error handling
40
Future<T> recover(Function<? super Throwable, ? extends T> recovery);
41
Future<T> recoverWith(Function<? super Throwable, ? extends Future<? extends T>> recovery);
42
43
// Filtering
44
Future<T> filter(Predicate<? super T> predicate);
45
Future<T> filterTry(CheckedPredicate<? super T> predicate);
46
47
// Side effects (non-blocking callbacks)
48
Future<T> onComplete(Consumer<? super Try<T>> action);
49
Future<T> onSuccess(Consumer<? super T> action); // Called if successful
50
Future<T> onFailure(Consumer<? super Throwable> action); // Called if failed
51
52
// Combining Futures
53
<U> Future<Tuple2<T, U>> zip(Future<? extends U> that);
54
<U, R> Future<R> zipWith(Future<? extends U> that, BiFunction<? super T, ? super U, ? extends R> mapper);
55
Future<T> fallbackTo(Future<? extends T> that); // Use that if this fails
56
57
// Conversion operations
58
CompletableFuture<T> toCompletableFuture();
59
Try<T> toTry(); // Blocks until completion
60
61
// Executor information
62
Executor executor(); // Get the executor used for this Future
63
64
// Cancellation
65
void cancel(); // Cancel the computation if not completed
66
void cancel(boolean mayInterruptIfRunning);
67
}
68
```
69
70
**Usage Examples:**
71
72
```java
73
import io.vavr.concurrent.Future;
74
import io.vavr.control.Try;
75
import java.util.concurrent.Executors;
76
77
// Creating Futures
78
Future<String> immediate = Future.successful("Hello");
79
Future<Integer> computation = Future.of(() -> expensiveComputation());
80
Future<String> withExecutor = Future.of(
81
Executors.newCachedThreadPool(),
82
() -> downloadData("http://example.com")
83
);
84
85
// Transforming Futures
86
Future<String> processed = computation
87
.map(i -> "Result: " + i)
88
.recover(ex -> "Error: " + ex.getMessage());
89
90
// Chaining asynchronous operations
91
Future<String> chained = Future.of(() -> getUserId())
92
.flatMap(id -> Future.of(() -> fetchUserName(id)))
93
.flatMap(name -> Future.of(() -> fetchUserEmail(name)));
94
95
// Combining multiple Futures
96
Future<String> name = Future.of(() -> "John");
97
Future<Integer> age = Future.of(() -> 30);
98
Future<String> person = name.zipWith(age, (n, a) -> n + " is " + a + " years old");
99
100
// Error handling
101
Future<String> safe = Future.of(() -> riskyOperation())
102
.recover(throwable -> "Default value")
103
.onFailure(ex -> System.err.println("Operation failed: " + ex));
104
105
// Non-blocking callbacks
106
computation
107
.onSuccess(result -> System.out.println("Got: " + result))
108
.onFailure(error -> System.err.println("Failed: " + error))
109
.onComplete(tryResult -> System.out.println("Completed: " + tryResult));
110
111
// Fallback pattern
112
Future<String> primary = Future.of(() -> primaryService());
113
Future<String> backup = Future.of(() -> backupService());
114
Future<String> resilient = primary.fallbackTo(backup);
115
116
// Converting to Java types
117
CompletableFuture<Integer> javaFuture = computation.toCompletableFuture();
118
119
// Helper methods
120
static int expensiveComputation() throws InterruptedException {
121
Thread.sleep(1000);
122
return 42;
123
}
124
125
static String downloadData(String url) throws IOException {
126
// Simulate download
127
return "Downloaded from " + url;
128
}
129
```
130
131
### Promise - Completable Future
132
133
A Promise is a writable, single-assignment container that completes a Future. It is typically used to bridge callback-based APIs to Future-based APIs.
134
135
```java { .api }
136
/**
137
* Completable container that provides a Future and allows completing it exactly once
138
*/
139
interface Promise<T> {
140
// Factory methods
141
static <T> Promise<T> make(); // Create new Promise
142
static <T> Promise<T> make(Executor executor); // Create with specific executor
143
static <T> Promise<T> successful(T value); // Create already completed Promise
144
static <T> Promise<T> failed(Throwable exception); // Create already failed Promise
145
146
// Completion operations (a Promise can be completed only once; a second attempt throws IllegalStateException)
147
Promise<T> success(T value); // Complete with success value
148
Promise<T> failure(Throwable exception); // Complete with failure
149
Promise<T> complete(Try<? extends T> result); // Complete with Try result
150
Promise<T> completeWith(Future<? extends T> other); // Complete when other completes
151
152
// Try completion operations (return false if already completed)
153
boolean trySuccess(T value); // Try to complete with success
154
boolean tryFailure(Throwable exception); // Try to complete with failure
155
boolean tryComplete(Try<? extends T> result); // Try to complete with Try
156
boolean tryCompleteWith(Future<? extends T> other); // Try to complete with other Future
157
158
// State checking
159
boolean isCompleted(); // Check if Promise has been completed
160
161
// Future access
162
Future<T> future(); // Get the Future backed by this Promise
163
}
164
```
165
166
**Usage Examples:**
167
168
```java
169
import io.vavr.concurrent.Promise;
170
import io.vavr.concurrent.Future;
171
import io.vavr.control.Try;
172
173
// Basic Promise usage
174
Promise<String> promise = Promise.make();
175
Future<String> future = promise.future();
176
177
// Complete the promise in another thread
178
new Thread(() -> {
179
try {
180
Thread.sleep(1000);
181
promise.success("Hello from another thread!");
182
} catch (InterruptedException e) {
183
promise.failure(e);
184
}
185
}).start();
186
187
// Use the future
188
future.onSuccess(result -> System.out.println("Got: " + result));
189
190
// Bridging callback API to Future API
191
public Future<String> callbackToFuture(CallbackAPI api) {
192
Promise<String> promise = Promise.make();
193
194
api.doSomethingAsync(new Callback<String>() {
195
@Override
196
public void onSuccess(String result) {
197
promise.success(result);
198
}
199
200
@Override
201
public void onFailure(Exception error) {
202
promise.failure(error);
203
}
204
});
205
206
return promise.future();
207
}
208
209
// Try completion (safe for multiple calls)
210
Promise<Integer> safePromise = Promise.make();
211
boolean completed1 = safePromise.trySuccess(42); // true
212
boolean completed2 = safePromise.trySuccess(24); // false - already completed
213
214
// Completing with Try
215
Try<String> result = Try.of(() -> someOperation());
216
Promise<String> promiseFromTry = Promise.make();
217
promiseFromTry.complete(result);
218
219
// Promise chaining
220
Promise<String> step1 = Promise.make();
221
Future<Integer> pipeline = step1.future()
222
.map(String::length)
223
.filter(len -> len > 5);
224
225
step1.success("Hello World");
226
```
227
228
### Task - Lazy Asynchronous Computation
229
230
Represents a lazy asynchronous computation that is not started until explicitly run, allowing for composable and reusable async operations.
231
232
```java { .api }
233
/**
234
* Lazy asynchronous computation that starts only when run
235
*/
236
class Task<T> implements Value<T> {
237
// Factory methods
238
static <T> Task<T> of(CheckedFunction0<? extends T> computation);
239
static <T> Task<T> of(Executor executor, CheckedFunction0<? extends T> computation);
240
static <T> Task<T> successful(T value); // Task that immediately succeeds
241
static <T> Task<T> failed(Throwable exception); // Task that immediately fails
242
static <T> Task<T> fromTry(Try<? extends T> result);
243
static <T> Task<T> async(CheckedFunction0<? extends T> computation); // Always async
244
245
// Execution
246
Future<T> run(); // Execute the task and return Future
247
Future<T> run(Executor executor); // Execute with specific executor
248
249
// Transformation operations (lazy)
250
<U> Task<U> map(Function<? super T, ? extends U> mapper);
251
<U> Task<U> mapTry(CheckedFunction1<? super T, ? extends U> mapper);
252
<U> Task<U> flatMap(Function<? super T, ? extends Task<? extends U>> mapper);
253
254
// Error handling (lazy)
255
Task<T> recover(Function<? super Throwable, ? extends T> recovery);
256
Task<T> recoverWith(Function<? super Throwable, ? extends Task<? extends T>> recovery);
257
258
// Filtering (lazy)
259
Task<T> filter(Predicate<? super T> predicate);
260
Task<T> filterTry(CheckedPredicate<? super T> predicate);
261
262
// Combining tasks (lazy)
263
<U> Task<Tuple2<T, U>> zip(Task<? extends U> that);
264
<U, R> Task<R> zipWith(Task<? extends U> that, BiFunction<? super T, ? super U, ? extends R> mapper);
265
Task<T> fallbackTo(Task<? extends T> that);
266
267
// Side effects (applied when task runs)
268
Task<T> peek(Consumer<? super T> action);
269
Task<T> onFailure(Consumer<? super Throwable> action);
270
271
// Conversion operations
272
Future<T> toFuture(); // Same as run()
273
Try<T> toTry(); // Run synchronously
274
275
// Utility operations
276
Task<T> timeout(long timeout, TimeUnit unit);
277
Task<T> timeout(long timeout, TimeUnit unit, Supplier<? extends T> fallback);
278
Task<T> delay(long delay, TimeUnit unit); // Delay execution
279
}
280
```
281
282
**Usage Examples:**
283
284
```java
285
import io.vavr.concurrent.Task;
286
import io.vavr.concurrent.Future;
287
import java.util.concurrent.TimeUnit;
288
289
// Creating Tasks (lazy - not executed yet)
290
Task<String> fetchData = Task.of(() -> downloadData("http://api.example.com"));
291
Task<Integer> compute = Task.of(() -> heavyComputation());
292
Task<String> immediate = Task.successful("Already computed");
293
294
// Transforming Tasks (still lazy)
295
Task<String> processed = compute
296
.map(result -> "Result: " + result)
297
.recover(ex -> "Error: " + ex.getMessage());
298
299
// Chaining Tasks (lazy composition)
300
Task<String> pipeline = Task.of(() -> getUserId())
301
.flatMap(id -> Task.of(() -> fetchUserData(id)))
302
.flatMap(data -> Task.of(() -> processUserData(data)));
303
304
// Combining multiple Tasks
305
Task<String> name = Task.of(() -> fetchUserName());
306
Task<Integer> age = Task.of(() -> fetchUserAge());
307
Task<String> profile = name.zipWith(age, (n, a) -> n + " (" + a + " years old)");
308
309
// Adding delays and timeouts
310
Task<String> delayed = fetchData.delay(5, TimeUnit.SECONDS);
311
Task<String> withTimeout = fetchData.timeout(10, TimeUnit.SECONDS, () -> "Timeout fallback");
312
313
// Fallback Tasks
314
Task<String> primary = Task.of(() -> primaryService());
315
Task<String> backup = Task.of(() -> backupService());
316
Task<String> resilient = primary.fallbackTo(backup);
317
318
// Execution (this is when computation actually starts)
319
Future<String> future1 = processed.run(); // Run with default executor
320
Future<String> future2 = processed.run(customExecutor); // Run with custom executor
321
Try<String> syncResult = processed.toTry(); // Run synchronously
322
323
// Reusable Tasks
324
Task<List<String>> reusableTask = Task.of(() -> fetchDataList());
325
Future<List<String>> execution1 = reusableTask.run(); // First execution
326
Future<List<String>> execution2 = reusableTask.run(); // Second execution (independent)
327
328
// Error handling in lazy context
329
Task<String> safeTask = Task.of(() -> riskyOperation())
330
.recover(throwable -> "Fallback value")
331
.onFailure(ex -> logger.error("Task failed", ex));
332
333
// Helper methods
334
static String downloadData(String url) throws IOException, InterruptedException {
335
// Simulate download
336
Thread.sleep(2000);
337
return "Data from " + url;
338
}
339
340
static Integer heavyComputation() throws InterruptedException {
341
Thread.sleep(5000);
342
return 42;
343
}
344
```
345
346
### Utility Classes and Static Methods
347
348
Helper methods for working with concurrent operations and multiple Futures.
349
350
```java { .api }
351
/**
352
* Utility methods for Future operations
353
*/
354
class Future {
355
// Sequence operations - convert List<Future<T>> to Future<List<T>>
356
static <T> Future<Seq<T>> sequence(Iterable<? extends Future<? extends T>> futures);
357
static <T> Future<List<T>> sequence(List<? extends Future<? extends T>> futures);
358
359
// Traverse operations - map and sequence combined
360
static <T, U> Future<Seq<U>> traverse(Iterable<? extends T> values,
361
Function<? super T, ? extends Future<? extends U>> mapper);
362
363
// Reduce operations
364
static <T> Future<T> reduce(Iterable<? extends Future<? extends T>> futures,
365
BinaryOperator<T> op);
366
static <T> Future<Option<T>> reduceOption(Iterable<? extends Future<? extends T>> futures,
367
BinaryOperator<T> op);
368
369
// Find operations
370
static <T> Future<Option<T>> find(Iterable<? extends Future<? extends T>> futures,
371
Predicate<? super T> predicate);
372
373
// Race operations - return first completed Future
374
static <T> Future<T> firstCompletedOf(Iterable<? extends Future<? extends T>> futures);
375
376
// Timing operations
377
static Future<Void> delay(long delay, TimeUnit unit);
378
static <T> Future<T> timeout(Future<T> future, long timeout, TimeUnit unit);
379
static <T> Future<T> timeout(Future<T> future, long timeout, TimeUnit unit,
380
Supplier<? extends T> fallback);
381
}
382
383
/**
384
* Utility methods for Promise operations
385
*/
386
class Promise {
387
// Create Promise from callback-style API
388
static <T> Promise<T> fromCallback(Consumer<Consumer<T>> callbackConsumer);
389
static <T> Promise<T> fromCallback(Consumer<BiConsumer<T, Throwable>> callbackConsumer);
390
}
391
```
392
393
**Usage Examples:**
394
395
```java
396
import io.vavr.concurrent.Future;
397
import io.vavr.collection.List;
398
import java.util.concurrent.TimeUnit;
399
400
// Sequence operations - wait for all Futures to complete
401
List<Future<String>> futures = List.of(
402
Future.of(() -> fetchData("url1")),
403
Future.of(() -> fetchData("url2")),
404
Future.of(() -> fetchData("url3"))
405
);
406
407
Future<List<String>> allResults = Future.sequence(futures);
408
allResults.onSuccess(results ->
409
System.out.println("All completed: " + results));
410
411
// Traverse - transform and collect
412
List<String> urls = List.of("url1", "url2", "url3");
413
Future<Seq<String>> traverseResult = Future.traverse(urls, url ->
414
Future.of(() -> fetchData(url)));
415
416
// Race - get first completed
417
Future<String> fastest = Future.firstCompletedOf(futures);
418
fastest.onSuccess(result ->
419
System.out.println("First completed: " + result));
420
421
// Reduce - combine all results
422
Future<String> combined = Future.reduce(futures, (a, b) -> a + ", " + b);
423
424
// Find - get first result matching predicate
425
Future<Option<String>> found = Future.find(futures,
426
result -> result.contains("important"));
427
428
// Timing operations
429
Future<Void> delayed = Future.delay(5, TimeUnit.SECONDS);
430
Future<String> withDeadline = Future.timeout(
431
Future.of(() -> slowOperation()),
432
10, TimeUnit.SECONDS,
433
() -> "Timeout occurred"
434
);
435
436
// Promise from callback API
437
Promise<String> callbackPromise = Promise.fromCallback(callback -> {
438
legacyAsyncAPI("param", new LegacyCallback() {
439
@Override
440
public void onResult(String result) {
441
callback.accept(result);
442
}
443
444
@Override
445
public void onError(Exception error) {
446
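// Handle error case: this single-argument callback cannot report the failure, so the Promise stays incomplete (consider the BiConsumer<T, Throwable> overload)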
447
}
448
});
449
});
450
```