0
# Concurrency
1
2
Enhanced concurrency utilities including listenable futures, rate limiting, improved executors, and synchronization primitives that extend Java's concurrent programming capabilities.
3
4
## Package: com.google.common.util.concurrent
5
6
### ListenableFuture
7
8
Enhanced Future that allows listeners to be attached for completion notification, enabling reactive programming patterns.
9
10
```java { .api }
11
import com.google.common.util.concurrent.ListenableFuture;
12
import com.google.common.util.concurrent.Futures;
13
import com.google.common.util.concurrent.FutureCallback;
14
import com.google.common.util.concurrent.MoreExecutors;
15
16
// Creating immediate futures
17
ListenableFuture<String> successful = Futures.immediateFuture("result");
18
ListenableFuture<String> failed = Futures.immediateFailedFuture(new RuntimeException("error"));
19
20
// Adding callbacks for completion notification
21
ListenableFuture<String> future = asyncOperation();
22
Futures.addCallback(future, new FutureCallback<String>() {
23
@Override
24
public void onSuccess(String result) {
25
System.out.println("Operation succeeded: " + result);
26
}
27
28
@Override
29
public void onFailure(Throwable t) {
30
System.err.println("Operation failed: " + t.getMessage());
31
}
32
}, MoreExecutors.directExecutor());
33
34
// Chaining operations with transformation
35
ListenableFuture<String> input = getStringAsync();
36
ListenableFuture<Integer> length = Futures.transform(input,
37
new Function<String, Integer>() {
38
@Override
39
public Integer apply(String s) {
40
return s.length();
41
}
42
}, executor);
43
44
// Asynchronous transformation (returns another ListenableFuture)
45
ListenableFuture<String> input2 = getKeyAsync();
46
ListenableFuture<String> result = Futures.transformAsync(input2,
47
new AsyncFunction<String, String>() {
48
@Override
49
public ListenableFuture<String> apply(String key) throws Exception {
50
return lookupValueAsync(key); // Returns ListenableFuture<String>
51
}
52
}, executor);
53
```
54
55
### Combining Futures
56
57
Utilities for combining multiple futures into aggregate operations.
58
59
```java { .api }
60
import com.google.common.collect.ImmutableList;
61
62
// Combine multiple futures into a list
63
ListenableFuture<String> future1 = getAsync("key1");
64
ListenableFuture<String> future2 = getAsync("key2");
65
ListenableFuture<String> future3 = getAsync("key3");
66
67
ListenableFuture<List<String>> combined = Futures.allAsList(future1, future2, future3);
68
// Result list contains values in same order as input futures
69
70
// Tolerate failures: collect results even when some futures fail or are cancelled
71
ListenableFuture<List<String>> successful = Futures.successfulAsList(future1, future2, future3);
72
// Failed futures contribute null to result list
73
74
// Combine futures supplied as a collection (Iterable overload)
75
List<ListenableFuture<String>> futures = ImmutableList.of(future1, future2, future3);
76
ListenableFuture<List<String>> fromList = Futures.allAsList(futures);
77
78
// Combining with custom logic
79
ListenableFuture<String> combined2 = Futures.whenAllSucceed(future1, future2, future3)
80
.call(new Callable<String>() {
81
@Override
82
public String call() throws Exception {
83
// All futures completed successfully
84
String result1 = Futures.getDone(future1);
85
String result2 = Futures.getDone(future2);
86
String result3 = Futures.getDone(future3);
87
return result1 + result2 + result3;
88
}
89
}, executor);
90
91
// First successful non-null result in input order (evaluated only after all futures complete)
92
ListenableFuture<String> firstSuccessful = Futures.whenAllComplete(future1, future2, future3)
93
.call(new Callable<String>() {
94
@Override
95
public String call() throws Exception {
96
// Return first non-null result
97
for (ListenableFuture<String> future : Arrays.asList(future1, future2, future3)) {
98
try {
99
String result = Futures.getDone(future);
100
if (result != null) {
101
return result;
102
}
103
} catch (Exception ignored) {
104
// Continue to next future
105
}
106
}
107
throw new RuntimeException("All futures failed or returned null");
108
}
109
}, executor);
110
```
111
112
### Exception Handling with Futures
113
114
Robust error handling and recovery patterns for asynchronous operations.
115
116
```java { .api }
117
// Catching specific exceptions
118
ListenableFuture<String> risky = riskyOperation();
119
ListenableFuture<String> withFallback = Futures.catching(risky,
120
IOException.class,
121
new Function<IOException, String>() {
122
@Override
123
public String apply(IOException e) {
124
return "fallback-value"; // Recover from IOException
125
}
126
}, executor);
127
128
// Asynchronous exception handling
129
ListenableFuture<String> withAsyncFallback = Futures.catchingAsync(risky,
130
IOException.class,
131
new AsyncFunction<IOException, String>() {
132
@Override
133
public ListenableFuture<String> apply(IOException e) throws Exception {
134
return getFallbackValueAsync(); // Async recovery
135
}
136
}, executor);
137
138
// Multiple exception types
139
ListenableFuture<String> multiCatch = Futures.catching(
140
Futures.catching(risky, IOException.class, ioFallback, executor),
141
TimeoutException.class, timeoutFallback, executor);
142
143
// Throwing from a transform function: the returned future fails with that exception
144
ListenableFuture<String> transformed = Futures.transform(risky,
145
new Function<String, String>() {
146
@Override
147
public String apply(String input) {
148
if (input == null) {
149
throw new IllegalStateException("Unexpected null result");
150
}
151
return input.toUpperCase();
152
}
153
}, executor);
154
```
155
156
### Timeouts and Scheduling
157
158
Adding timeouts and scheduling capabilities to futures.
159
160
```java { .api }
161
import java.util.concurrent.ScheduledExecutorService;
162
import java.util.concurrent.Executors;
163
164
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
165
166
// Add timeout to future
167
ListenableFuture<String> slow = slowOperation();
168
ListenableFuture<String> withTimeout = Futures.withTimeout(slow, 30, TimeUnit.SECONDS, scheduler);
169
170
// Schedule async operation
171
ListenableFuture<String> scheduled = Futures.scheduleAsync(
172
new AsyncCallable<String>() {
173
@Override
174
public ListenableFuture<String> call() throws Exception {
175
return performScheduledTask();
176
}
177
}, 5, TimeUnit.SECONDS, scheduler);
178
179
// Submit callable to executor
180
ListeningExecutorService executor = MoreExecutors.listeningDecorator(
181
Executors.newFixedThreadPool(10));
182
183
ListenableFuture<String> submitted = Futures.submit(new Callable<String>() {
184
@Override
185
public String call() throws Exception {
186
return "computed result";
187
}
188
}, executor);
189
```
190
191
### ListeningExecutorService
192
193
ExecutorService that returns ListenableFuture instances instead of regular Futures.
194
195
```java { .api }
196
import com.google.common.util.concurrent.ListeningExecutorService;
197
import com.google.common.util.concurrent.MoreExecutors;
198
import java.util.concurrent.Executors;
199
200
// Create ListeningExecutorService
201
ExecutorService threadPool = Executors.newFixedThreadPool(10);
202
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(threadPool);
203
204
// Submit tasks that return ListenableFuture
205
ListenableFuture<String> future = listeningExecutor.submit(new Callable<String>() {
206
@Override
207
public String call() throws Exception {
208
return "task result";
209
}
210
});
211
212
ListenableFuture<Void> voidFuture = listeningExecutor.submit(new Runnable() {
213
@Override
214
public void run() {
215
performTask();
216
}
217
}, null);
218
219
// Direct executor (executes immediately on calling thread)
220
ListeningExecutorService directExecutor = MoreExecutors.newDirectExecutorService();
221
222
// Plain Executor that runs each task in the thread that calls execute() (handy for lightweight listeners and tests)
223
Executor sameThreadExecutor = MoreExecutors.directExecutor();
224
225
// Graceful shutdown utility
226
MoreExecutors.shutdownAndAwaitTermination(threadPool, 60, TimeUnit.SECONDS);
227
```
228
229
### RateLimiter
230
231
Controls the rate at which operations can be performed, useful for throttling API calls or resource access.
232
233
```java { .api }
234
import com.google.common.util.concurrent.RateLimiter;
235
236
// Create rate limiter (5 permits per second)
237
RateLimiter rateLimiter = RateLimiter.create(5.0);
238
239
// Acquire permits (blocking)
240
rateLimiter.acquire(); // Acquire 1 permit
241
rateLimiter.acquire(3); // Acquire 3 permits
242
243
// Try to acquire permits (non-blocking)
244
boolean acquired = rateLimiter.tryAcquire(); // Try to acquire 1 permit
245
boolean acquired3 = rateLimiter.tryAcquire(3); // Try to acquire 3 permits
246
boolean acquiredWithTimeout = rateLimiter.tryAcquire(2, TimeUnit.SECONDS); // With timeout
247
248
// Dynamic rate adjustment
249
rateLimiter.setRate(10.0); // Change to 10 permits per second
250
double currentRate = rateLimiter.getRate();
251
252
// Warm-up rate limiter (ramps up gradually to the stable rate over the warm-up period)
253
RateLimiter bursty = RateLimiter.create(2.0, 5, TimeUnit.SECONDS);
254
// Stable rate of 2 permits/second, reached after a 5-second warm-up; permits are issued more slowly while the limiter is "cold"
255
256
// Practical usage example
257
public class ApiClient {
258
private final RateLimiter rateLimiter = RateLimiter.create(10.0); // 10 requests/second
259
260
public String makeApiCall(String endpoint) throws Exception {
261
rateLimiter.acquire(); // Wait for permit
262
return httpClient.get(endpoint);
263
}
264
265
public Optional<String> tryApiCall(String endpoint) throws Exception {
266
if (rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
267
return Optional.of(httpClient.get(endpoint));
268
} else {
269
return Optional.absent(); // Rate limit exceeded
270
}
271
}
272
}
273
```
274
275
### Monitor
276
277
Synchronization primitive with boolean conditions, providing more flexible locking than traditional synchronized blocks.
278
279
```java { .api }
280
import com.google.common.util.concurrent.Monitor;
281
282
public class BoundedBuffer<T> {
283
private final Monitor monitor = new Monitor();
284
private final Monitor.Guard notEmpty = new Monitor.Guard(monitor) {
285
@Override
286
public boolean isSatisfied() {
287
return size > 0;
288
}
289
};
290
private final Monitor.Guard notFull = new Monitor.Guard(monitor) {
291
@Override
292
public boolean isSatisfied() {
293
return size < capacity;
294
}
295
};
296
297
private final T[] buffer;
298
private final int capacity;
299
private int size = 0;
300
private int head = 0;
301
private int tail = 0;
302
303
@SuppressWarnings("unchecked")
304
public BoundedBuffer(int capacity) {
305
this.capacity = capacity;
306
this.buffer = (T[]) new Object[capacity];
307
}
308
309
public void put(T item) throws InterruptedException {
310
monitor.enterWhen(notFull); // Wait until buffer not full
311
try {
312
buffer[tail] = item;
313
tail = (tail + 1) % capacity;
314
size++;
315
} finally {
316
monitor.leave();
317
}
318
}
319
320
public T take() throws InterruptedException {
321
monitor.enterWhen(notEmpty); // Wait until buffer not empty
322
try {
323
T item = buffer[head];
324
buffer[head] = null;
325
head = (head + 1) % capacity;
326
size--;
327
return item;
328
} finally {
329
monitor.leave();
330
}
331
}
332
333
public boolean tryPut(T item, long timeout, TimeUnit unit) throws InterruptedException {
334
if (monitor.enterWhen(notFull, timeout, unit)) {
335
try {
336
buffer[tail] = item;
337
tail = (tail + 1) % capacity;
338
size++;
339
return true;
340
} finally {
341
monitor.leave();
342
}
343
}
344
return false; // Timeout
345
}
346
347
public T tryTake(long timeout, TimeUnit unit) throws InterruptedException {
348
if (monitor.enterWhen(notEmpty, timeout, unit)) {
349
try {
350
T item = buffer[head];
351
buffer[head] = null;
352
head = (head + 1) % capacity;
353
size--;
354
return item;
355
} finally {
356
monitor.leave();
357
}
358
}
359
return null; // Timeout
360
}
361
}
362
```
363
364
### Service Framework
365
366
Framework for managing application services with lifecycle management and state transitions.
367
368
```java { .api }
369
import com.google.common.util.concurrent.Service;
370
import com.google.common.util.concurrent.AbstractService;
371
import com.google.common.util.concurrent.ServiceManager;
372
373
// Custom service implementation
374
public class DatabaseService extends AbstractService {
375
private DatabaseConnection connection;
376
377
@Override
378
protected void doStart() {
379
try {
380
connection = new DatabaseConnection();
381
connection.connect();
382
notifyStarted(); // Signal successful start
383
} catch (Exception e) {
384
notifyFailed(e); // Signal start failure
385
}
386
}
387
388
@Override
389
protected void doStop() {
390
try {
391
if (connection != null) {
392
connection.close();
393
}
394
notifyStopped(); // Signal successful stop
395
} catch (Exception e) {
396
notifyFailed(e); // Signal stop failure
397
}
398
}
399
400
public void executeQuery(String sql) {
401
// Service must be running to execute operations
402
checkRunning();
403
connection.execute(sql);
404
}
405
406
private void checkRunning() {
407
if (state() != State.RUNNING) {
408
throw new IllegalStateException("Service is not running: " + state());
409
}
410
}
411
}
412
413
// Service management
414
Service databaseService = new DatabaseService();
415
Service webService = new WebService();
416
Service cacheService = new CacheService();
417
418
// Individual service management
419
databaseService.startAsync();
420
databaseService.awaitRunning(30, TimeUnit.SECONDS); // Wait for start
421
422
// Service state monitoring
423
Service.State state = databaseService.state();
424
boolean isRunning = (state == Service.State.RUNNING);
425
426
// Add state listeners
427
databaseService.addListener(new Service.Listener() {
428
@Override
429
public void starting() {
430
System.out.println("Database service starting...");
431
}
432
433
@Override
434
public void running() {
435
System.out.println("Database service is running");
436
}
437
438
@Override
439
public void stopping(Service.State from) {
440
System.out.println("Database service stopping from state: " + from);
441
}
442
443
@Override
444
public void terminated(Service.State from) {
445
System.out.println("Database service terminated from state: " + from);
446
}
447
448
@Override
449
public void failed(Service.State from, Throwable failure) {
450
System.err.println("Database service failed: " + failure.getMessage());
451
}
452
}, MoreExecutors.directExecutor());
453
454
// Managing multiple services
455
ServiceManager serviceManager = new ServiceManager(
456
Arrays.asList(databaseService, webService, cacheService));
457
458
// Start all services
459
serviceManager.startAsync();
460
serviceManager.awaitHealthy(60, TimeUnit.SECONDS); // Wait for all to be healthy
461
462
// Stop all services
463
serviceManager.stopAsync();
464
serviceManager.awaitStopped(30, TimeUnit.SECONDS);
465
466
// Inspect startup times (milliseconds, for services that have finished starting) and current service states
467
Map<Service, Long> startupTimes = serviceManager.startupTimes();
468
ImmutableMultimap<Service.State, Service> servicesByState = serviceManager.servicesByState();
469
```
470
471
### Striped Locks
472
473
Provides a set of locks that can be used to stripe synchronization across multiple objects.
474
475
```java { .api }
476
import com.google.common.util.concurrent.Striped;
477
import java.util.concurrent.locks.Lock;
478
import java.util.concurrent.locks.ReadWriteLock;
479
480
// Striped locks for better concurrency
481
Striped<Lock> striped = Striped.lock(16); // 16 locks in the stripe
482
Striped<ReadWriteLock> readWriteStriped = Striped.readWriteLock(16);
483
484
// Use with object keys
485
public class StripedCounter {
486
private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
487
private final Striped<Lock> striped = Striped.lock(32);
488
489
public void increment(String key) {
490
Lock lock = striped.get(key); // Get lock for this key
491
lock.lock();
492
try {
493
AtomicInteger counter = counters.get(key);
494
if (counter == null) {
495
counter = new AtomicInteger(0);
496
counters.put(key, counter);
497
}
498
counter.incrementAndGet();
499
} finally {
500
lock.unlock();
501
}
502
}
503
504
public int get(String key) {
505
Lock lock = striped.get(key);
506
lock.lock();
507
try {
508
AtomicInteger counter = counters.get(key);
509
return counter != null ? counter.get() : 0;
510
} finally {
511
lock.unlock();
512
}
513
}
514
}
515
516
// Bulk operations with ordered locking (avoids deadlocks)
517
public void transfer(String fromKey, String toKey, int amount) {
518
Iterable<Lock> locks = striped.bulkGet(Arrays.asList(fromKey, toKey));
519
for (Lock lock : locks) {
520
lock.lock();
521
}
522
try {
523
// Perform transfer operation
524
AtomicInteger from = counters.get(fromKey);
525
AtomicInteger to = counters.get(toKey);
526
if (from != null && from.get() >= amount) {
527
from.addAndGet(-amount);
528
if (to == null) {
529
counters.put(toKey, new AtomicInteger(amount));
530
} else {
531
to.addAndGet(amount);
532
}
533
}
534
} finally {
535
// Release in reverse order
536
List<Lock> locksList = Lists.newArrayList(locks);
537
Collections.reverse(locksList);
538
for (Lock lock : locksList) {
539
lock.unlock();
540
}
541
}
542
}
543
```
544
545
### Atomic Operations and Utilities
546
547
Enhanced atomic operations and utilities for concurrent programming.
548
549
```java { .api }
550
import com.google.common.util.concurrent.AtomicDouble;
551
import com.google.common.util.concurrent.AtomicLongMap;
552
import com.google.common.util.concurrent.Uninterruptibles;
553
554
// Atomic operations on doubles
555
AtomicDouble atomicDouble = new AtomicDouble(0.0);
556
double result = atomicDouble.addAndGet(3.14);
557
atomicDouble.compareAndSet(3.14, 2.71);
558
559
// Atomic map for counters
560
AtomicLongMap<String> counters = AtomicLongMap.create();
561
long count = counters.incrementAndGet("requests"); // Atomic increment
562
counters.addAndGet("bytes", 1024); // Atomic add
563
long total = counters.get("requests"); // Get current value
564
Map<String, Long> snapshot = counters.asMap(); // Live, read-only view of the backing map (not a point-in-time copy)
565
566
// Uninterruptible operations
567
public void robustOperation() {
568
// Sleep that can't be interrupted
569
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
570
571
// Join that can't be interrupted
572
Thread workerThread = new Thread(task);
573
workerThread.start();
574
Uninterruptibles.joinUninterruptibly(workerThread);
575
576
// Future get that can't be interrupted
577
Future<String> future = executor.submit(callable);
578
String result = Uninterruptibles.getUninterruptibly(future);
579
580
// Future get with timeout that can't be interrupted
581
String resultWithTimeout = Uninterruptibles.getUninterruptibly(
582
future, 30, TimeUnit.SECONDS);
583
}
584
```
585
586
### Testing Concurrent Code
587
588
Utilities for testing concurrent code and timing behavior.
589
590
```java { .api }
591
import com.google.common.testing.FakeTicker;
592
import com.google.common.base.Ticker;
593
594
// Testing with fake time
595
public class ConcurrentServiceTest {
596
597
@Test
598
public void testRateLimiting() {
599
FakeTicker ticker = new FakeTicker();
600
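RateLimiter rateLimiter = RateLimiter.create(1.0, ticker); // 1 permit/second. NOTE(review): RateLimiter has no public create(double, Ticker) overload; confirm how time is injected before relying on this example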
601
602
// Should allow first request immediately
603
assertTrue(rateLimiter.tryAcquire());
604
605
// Should deny second request immediately
606
assertFalse(rateLimiter.tryAcquire());
607
608
// Advance time by 1 second
609
ticker.advance(1, TimeUnit.SECONDS);
610
611
// Should allow request after time advance
612
assertTrue(rateLimiter.tryAcquire());
613
}
614
615
@Test
616
public void testServiceLifecycle() throws Exception {
617
TestService service = new TestService();
618
619
// Test initial state
620
assertEquals(Service.State.NEW, service.state());
621
622
// Test successful start
623
service.startAsync();
624
service.awaitRunning(1, TimeUnit.SECONDS);
625
assertEquals(Service.State.RUNNING, service.state());
626
627
// Test successful stop
628
service.stopAsync();
629
service.awaitTerminated(1, TimeUnit.SECONDS);
630
assertEquals(Service.State.TERMINATED, service.state());
631
}
632
}
633
```
634
635
Guava's concurrency utilities provide powerful abstractions for asynchronous programming, resource management, and thread coordination that go well beyond Java's standard concurrency libraries while maintaining compatibility and ease of use.