0
# Concurrent Utilities
1
2
Apache Commons Lang provides robust concurrency utilities for thread-safe initialization, background processing, circuit breakers, and fault tolerance. These utilities simplify concurrent programming patterns while providing proper error handling and resource management.
3
4
## Core Concurrent Classes
5
6
### ConcurrentUtils - General Concurrency Utilities
7
8
Provides utility methods for safe concurrent operations and exception handling:
9
10
```java { .api }
11
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
12
```
13
14
#### Safe Initialization Patterns
15
16
```java { .api }
17
// Safe initialization with exception handling
18
public static <T> T initialize(ConcurrentInitializer<T> initializer) throws ConcurrentException
19
public static <T> T initializeUnchecked(ConcurrentInitializer<T> initializer)
20
21
// Exception handling utilities
22
public static ConcurrentException extractCause(ExecutionException ex)
23
public static ConcurrentRuntimeException extractCauseUnchecked(ExecutionException ex)
24
public static void handleCause(ExecutionException ex) throws ConcurrentException
25
public static void handleCauseUnchecked(ExecutionException ex)
26
27
// Map operations
28
public static <K, V> V createIfAbsent(ConcurrentMap<K, V> map, K key, ConcurrentInitializer<V> init) throws ConcurrentException
29
public static <K, V> V createIfAbsentUnchecked(ConcurrentMap<K, V> map, K key, ConcurrentInitializer<V> init)
30
public static <K, V> V putIfAbsent(ConcurrentMap<K, V> map, K key, V value)
31
```
32
33
**Usage Examples:**
34
```java { .api }
35
public class ConcurrentUtilsExamples {
36
37
private final ConcurrentMap<String, DatabaseConnection> connections = new ConcurrentHashMap<>();
38
39
// Safe map operations
40
public DatabaseConnection getConnection(String name) throws ConcurrentException {
41
return ConcurrentUtils.createIfAbsent(connections, name, () -> {
42
// This initialization happens only once per key
43
return DatabaseConnectionFactory.create(name);
44
});
45
}
46
47
// Unchecked version (wraps exceptions in runtime exceptions)
48
public DatabaseConnection getConnectionUnchecked(String name) {
49
return ConcurrentUtils.createIfAbsentUnchecked(connections, name, () -> {
50
return DatabaseConnectionFactory.create(name);
51
});
52
}
53
54
// Safe Future handling
55
public <T> T getResultSafely(Future<T> future) throws ConcurrentException {
56
try {
57
return future.get();
58
} catch (ExecutionException e) {
59
ConcurrentUtils.handleCause(e);
60
return null; // Never reached
61
} catch (InterruptedException e) {
62
Thread.currentThread().interrupt();
63
throw new ConcurrentException("Thread interrupted", e);
64
}
65
}
66
}
67
```
68
69
### Initializer Classes - Thread-Safe Lazy Initialization
70
71
#### AtomicInitializer - Single-Use Atomic Initialization
72
73
```java { .api }
74
import org.apache.commons.lang3.concurrent.AtomicInitializer;
75
```
76
77
```java { .api }
78
public class AtomicInitializerExample {
79
80
// Expensive object that should be created only once
81
private final AtomicInitializer<DatabaseConnectionPool> poolInitializer =
82
new AtomicInitializer<DatabaseConnectionPool>() {
83
@Override
84
protected DatabaseConnectionPool initialize() throws ConcurrentException {
85
// This method is called exactly once
86
return new DatabaseConnectionPool(
87
"jdbc:postgresql://localhost/mydb",
88
"user", "password", 10
89
);
90
}
91
};
92
93
public DatabaseConnectionPool getConnectionPool() throws ConcurrentException {
94
return poolInitializer.get(); // Thread-safe, initializes only once
95
}
96
97
// Alternative using builder pattern (Java 8+)
98
private final AtomicInitializer<ConfigurationManager> configInitializer =
99
AtomicInitializer.<ConfigurationManager>builder()
100
.setInitializer(() -> ConfigurationManager.loadFromFile("config.properties"))
101
.get();
102
}
103
```
104
105
#### LazyInitializer - Thread-Safe Lazy Initialization
106
107
```java { .api }
108
import org.apache.commons.lang3.concurrent.LazyInitializer;
109
```
110
111
```java { .api }
112
public class LazyInitializerExample {
113
114
// Cache that's initialized on first access
115
private final LazyInitializer<Cache<String, Object>> cacheInitializer =
116
new LazyInitializer<Cache<String, Object>>() {
117
@Override
118
protected Cache<String, Object> initialize() throws ConcurrentException {
119
return CacheBuilder.newBuilder()
120
.maximumSize(1000)
121
.expireAfterWrite(30, TimeUnit.MINUTES)
122
.build();
123
}
124
};
125
126
public Cache<String, Object> getCache() throws ConcurrentException {
127
return cacheInitializer.get();
128
}
129
130
// Service registry example
131
private final LazyInitializer<ServiceRegistry> registryInitializer =
132
LazyInitializer.<ServiceRegistry>builder()
133
.setInitializer(() -> {
134
ServiceRegistry registry = new ServiceRegistry();
135
registry.registerService("userService", new UserServiceImpl());
136
registry.registerService("orderService", new OrderServiceImpl());
137
return registry;
138
})
139
.get();
140
}
141
```
142
143
#### ConstantInitializer - Simple Constant Value Holder
144
145
```java { .api }
146
import org.apache.commons.lang3.concurrent.ConstantInitializer;
147
```
148
149
```java { .api }
150
public class ConstantInitializerExample {
151
152
// For pre-computed values
153
private final ConcurrentInitializer<String> appNameInitializer =
154
new ConstantInitializer<>("MyApplication v1.0");
155
156
// For configuration values
157
private final ConcurrentInitializer<Integer> maxUsersInitializer =
158
new ConstantInitializer<>(Integer.parseInt(System.getProperty("max.users", "1000")));
159
160
public String getApplicationName() throws ConcurrentException {
161
return appNameInitializer.get();
162
}
163
164
public int getMaxUsers() throws ConcurrentException {
165
return maxUsersInitializer.get();
166
}
167
}
168
```
169
170
### Background Processing
171
172
#### BackgroundInitializer - Asynchronous Initialization
173
174
```java { .api }
175
import org.apache.commons.lang3.concurrent.BackgroundInitializer;
176
```
177
178
```java { .api }
179
public class BackgroundProcessingExample {
180
181
// Initialize expensive resources in background
182
private final BackgroundInitializer<SearchIndex> searchIndexInitializer =
183
new BackgroundInitializer<SearchIndex>() {
184
@Override
185
protected SearchIndex initialize() throws Exception {
186
// This runs in a background thread
187
SearchIndex index = new SearchIndex();
188
index.loadFromDatabase(); // Expensive operation
189
return index;
190
}
191
};
192
193
public void startBackgroundInitialization() {
194
// Start background initialization immediately
195
searchIndexInitializer.start();
196
}
197
198
public SearchIndex getSearchIndex() throws ConcurrentException {
199
// This will block until background initialization is complete
200
return searchIndexInitializer.get();
201
}
202
203
// Custom executor example
204
private final ExecutorService customExecutor = Executors.newFixedThreadPool(2);
205
206
private final BackgroundInitializer<ReportGenerator> reportInitializer =
207
BackgroundInitializer.<ReportGenerator>builder()
208
.setExecutor(customExecutor)
209
.setInitializer(() -> {
210
ReportGenerator generator = new ReportGenerator();
211
generator.preloadTemplates();
212
return generator;
213
})
214
.get();
215
}
216
```
217
218
#### MultiBackgroundInitializer - Multiple Background Tasks
219
220
```java { .api }
221
import org.apache.commons.lang3.concurrent.MultiBackgroundInitializer;
222
```
223
224
```java { .api }
225
public class MultiBackgroundExample {
226
227
public void initializeApplication() throws ConcurrentException {
228
MultiBackgroundInitializer initializer = new MultiBackgroundInitializer();
229
230
// Add multiple background tasks
231
initializer.addInitializer("database", new BackgroundInitializer<DataSource>() {
232
@Override
233
protected DataSource initialize() throws Exception {
234
return createDataSource();
235
}
236
});
237
238
initializer.addInitializer("cache", new BackgroundInitializer<CacheManager>() {
239
@Override
240
protected CacheManager initialize() throws Exception {
241
return createCacheManager();
242
}
243
});
244
245
initializer.addInitializer("search", new BackgroundInitializer<SearchEngine>() {
246
@Override
247
protected SearchEngine initialize() throws Exception {
248
return createSearchEngine();
249
}
250
});
251
252
// Start all background tasks
253
MultiBackgroundInitializer.MultiBackgroundInitializerResults results = initializer.start();
254
255
// Get results (blocks until all complete)
256
DataSource dataSource = (DataSource) results.getInitializer("database").get();
257
CacheManager cacheManager = (CacheManager) results.getInitializer("cache").get();
258
SearchEngine searchEngine = (SearchEngine) results.getInitializer("search").get();
259
260
// Check for exceptions
261
if (results.isException("database")) {
262
Exception dbException = results.getException("database");
263
log.error("Database initialization failed", dbException);
264
}
265
}
266
}
267
```
268
269
### Circuit Breaker Pattern
270
271
#### EventCountCircuitBreaker - Failure Rate Protection
272
273
```java { .api }
274
import org.apache.commons.lang3.concurrent.EventCountCircuitBreaker;
275
```
276
277
```java { .api }
278
public class CircuitBreakerExample {
279
280
// Circuit breaker: max 5 failures in 1 minute, then open for 30 seconds
281
private final EventCountCircuitBreaker circuitBreaker =
282
new EventCountCircuitBreaker(5, 1, TimeUnit.MINUTES, 3, 30, TimeUnit.SECONDS);
283
284
public String callExternalService(String request) throws ServiceException {
285
// Check if circuit is open (too many recent failures)
286
if (!circuitBreaker.checkState()) {
287
throw new ServiceException("Circuit breaker is OPEN - service temporarily unavailable");
288
}
289
290
try {
291
// Attempt the potentially failing operation
292
String response = externalServiceClient.call(request);
293
294
// Reset failure count on success
295
circuitBreaker.close();
296
297
return response;
298
299
} catch (Exception e) {
300
// Record failure
301
circuitBreaker.incrementAndCheckState();
302
303
throw new ServiceException("External service call failed", e);
304
}
305
}
306
307
// Monitor circuit breaker state
308
public CircuitBreakerStatus getCircuitBreakerStatus() {
309
return new CircuitBreakerStatus(
310
circuitBreaker.getState().name(),
311
circuitBreaker.getCheckInterval(),
312
circuitBreaker.getCheckIntervalUnit(),
313
circuitBreaker.getOpeningThreshold(),
314
circuitBreaker.getClosingThreshold()
315
);
316
}
317
}
318
```
319
320
#### ThresholdCircuitBreaker - Simple Threshold Protection
321
322
```java { .api }
323
import org.apache.commons.lang3.concurrent.ThresholdCircuitBreaker;
324
```
325
326
```java { .api }
327
public class ThresholdCircuitBreakerExample {
328
329
// Circuit breaker that opens after 10 failures
330
private final ThresholdCircuitBreaker circuitBreaker = new ThresholdCircuitBreaker(10);
331
332
public void processMessage(Message message) throws ProcessingException {
333
if (!circuitBreaker.checkState()) {
334
throw new ProcessingException("Message processing circuit breaker is OPEN");
335
}
336
337
try {
338
messageProcessor.process(message);
339
// Reset on successful processing
340
// Note: ThresholdCircuitBreaker doesn't auto-reset, manual reset needed
341
342
} catch (Exception e) {
343
circuitBreaker.incrementAndCheckState();
344
throw new ProcessingException("Message processing failed", e);
345
}
346
}
347
348
// Manual reset method (could be called by admin endpoint)
349
public void resetCircuitBreaker() {
350
circuitBreaker.close();
351
}
352
}
353
```
354
355
### Thread Factory and Management
356
357
#### BasicThreadFactory - Customizable Thread Creation
358
359
```java { .api }
360
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
361
```
362
363
```java { .api }
364
public class ThreadFactoryExample {
365
366
// Custom thread factory with naming and daemon settings
367
private final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
368
.namingPattern("worker-thread-%d")
369
.daemon(true)
370
.priority(Thread.NORM_PRIORITY)
371
.uncaughtExceptionHandler((thread, exception) -> {
372
log.error("Uncaught exception in thread {}: {}", thread.getName(), exception.getMessage(), exception);
373
})
374
.build();
375
376
private final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
377
378
// Background task processing
379
public void submitBackgroundTask(Runnable task) {
380
executorService.submit(() -> {
381
try {
382
task.run();
383
} catch (Exception e) {
384
log.error("Background task failed", e);
385
// Exception is also handled by uncaught exception handler
386
}
387
});
388
}
389
390
// Scheduled task executor with custom threads
391
private final ScheduledExecutorService scheduledExecutor =
392
Executors.newScheduledThreadPool(5, new BasicThreadFactory.Builder()
393
.namingPattern("scheduled-task-%d")
394
.daemon(false) // Keep JVM alive
395
.priority(Thread.MAX_PRIORITY)
396
.build());
397
398
public void scheduleMaintenanceTask() {
399
scheduledExecutor.scheduleAtFixedRate(() -> {
400
performMaintenance();
401
}, 0, 1, TimeUnit.HOURS);
402
}
403
}
404
```
405
406
### Advanced Concurrent Patterns
407
408
#### Memoizer - Thread-Safe Caching
409
410
```java { .api }
411
import org.apache.commons.lang3.concurrent.Memoizer;
412
```
413
414
```java { .api }
415
public class MemoizerExample {
416
417
// Cache expensive computations
418
private final Memoizer<String, UserProfile> userProfileCache =
419
new Memoizer<>(userId -> {
420
// This computation happens only once per userId
421
return loadUserProfileFromDatabase(userId);
422
});
423
424
public UserProfile getUserProfile(String userId) throws InterruptedException, ExecutionException {
425
return userProfileCache.compute(userId);
426
}
427
428
// With custom cache implementation
429
private final Memoizer<CacheKey, Report> reportCache =
430
new Memoizer<>(
431
key -> generateReport(key),
432
new ConcurrentHashMap<>(), // Custom cache implementation
433
true // Recalculate on InterruptedException
434
);
435
436
public Report getReport(String reportType, Date startDate, Date endDate)
437
throws InterruptedException, ExecutionException {
438
CacheKey key = new CacheKey(reportType, startDate, endDate);
439
return reportCache.compute(key);
440
}
441
}
442
```
443
444
#### TimedSemaphore - Rate Limiting
445
446
```java { .api }
447
import org.apache.commons.lang3.concurrent.TimedSemaphore;
448
```
449
450
```java { .api }
451
public class RateLimitingExample {
452
453
// Allow maximum 100 operations per second
454
private final TimedSemaphore rateLimiter = new TimedSemaphore(1, TimeUnit.SECONDS, 100);
455
456
public void callRateLimitedAPI(String request) throws InterruptedException {
457
// This will block if rate limit is exceeded
458
rateLimiter.acquire();
459
460
try {
461
apiClient.makeCall(request);
462
} finally {
463
// Permits are automatically released after the time period
464
}
465
}
466
467
// Dynamic rate adjustment
468
public void adjustRateLimit(int newLimit) {
469
rateLimiter.setLimit(newLimit);
470
}
471
472
// Monitor rate limiter
473
public RateLimiterStatus getRateLimiterStatus() {
474
return new RateLimiterStatus(
475
rateLimiter.getLimit(),
476
rateLimiter.getAvailablePermits(),
477
rateLimiter.getAcquireCount(),
478
rateLimiter.getPeriod(),
479
rateLimiter.getUnit()
480
);
481
}
482
483
// Shutdown
484
public void shutdown() {
485
rateLimiter.shutdown();
486
}
487
}
488
```
489
490
## Real-World Integration Examples
491
492
### Spring Boot Integration
493
494
```java { .api }
495
@Configuration
496
@EnableAsync
497
public class ConcurrentConfiguration {
498
499
@Bean
500
public ThreadFactory taskThreadFactory() {
501
return new BasicThreadFactory.Builder()
502
.namingPattern("async-task-%d")
503
.daemon(true)
504
.priority(Thread.NORM_PRIORITY)
505
.uncaughtExceptionHandler((thread, exception) -> {
506
log.error("Uncaught exception in async task thread {}", thread.getName(), exception);
507
})
508
.build();
509
}
510
511
@Bean
512
public TaskExecutor taskExecutor(ThreadFactory taskThreadFactory) {
513
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
514
executor.setCorePoolSize(10);
515
executor.setMaxPoolSize(20);
516
executor.setQueueCapacity(100);
517
executor.setThreadFactory(taskThreadFactory);
518
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
519
executor.initialize();
520
return executor;
521
}
522
523
@Bean
524
public EventCountCircuitBreaker externalServiceCircuitBreaker() {
525
// 5 failures per minute opens circuit for 30 seconds
526
return new EventCountCircuitBreaker(5, 1, TimeUnit.MINUTES, 3, 30, TimeUnit.SECONDS);
527
}
528
}
529
530
@Service
531
public class ExternalServiceClient {
532
533
private final EventCountCircuitBreaker circuitBreaker;
534
private final RestTemplate restTemplate;
535
536
public ExternalServiceClient(EventCountCircuitBreaker circuitBreaker, RestTemplate restTemplate) {
537
this.circuitBreaker = circuitBreaker;
538
this.restTemplate = restTemplate;
539
}
540
541
public CompletableFuture<String> callExternalServiceAsync(String request) {
542
return CompletableFuture.supplyAsync(() -> {
543
if (!circuitBreaker.checkState()) {
544
throw new ServiceUnavailableException("External service circuit breaker is OPEN");
545
}
546
547
try {
548
String response = restTemplate.getForObject("/external/api?q=" + request, String.class);
549
circuitBreaker.close(); // Reset on success
550
return response;
551
} catch (Exception e) {
552
circuitBreaker.incrementAndCheckState();
553
throw new ExternalServiceException("External service call failed", e);
554
}
555
});
556
}
557
}
558
```
559
560
### Microservice Resilience Pattern
561
562
```java { .api }
563
@Component
564
public class ResilientServiceClient {
565
566
private final TimedSemaphore rateLimiter;
567
private final EventCountCircuitBreaker circuitBreaker;
568
private final Memoizer<String, ServiceResponse> cache;
569
570
public ResilientServiceClient() {
571
// Rate limiting: 50 requests per second
572
this.rateLimiter = new TimedSemaphore(1, TimeUnit.SECONDS, 50);
573
574
// Circuit breaker: 10 failures in 2 minutes opens for 60 seconds
575
this.circuitBreaker = new EventCountCircuitBreaker(10, 2, TimeUnit.MINUTES, 5, 60, TimeUnit.SECONDS);
576
577
// Response caching
578
this.cache = new Memoizer<>(this::callServiceWithoutProtection);
579
}
580
581
public ServiceResponse callService(String request) throws ServiceException {
582
try {
583
// Apply rate limiting
584
rateLimiter.acquire();
585
586
// Check circuit breaker
587
if (!circuitBreaker.checkState()) {
588
return getFallbackResponse(request);
589
}
590
591
// Use cached response if available
592
return cache.compute(request);
593
594
} catch (InterruptedException e) {
595
Thread.currentThread().interrupt();
596
throw new ServiceException("Service call interrupted", e);
597
} catch (ExecutionException e) {
598
circuitBreaker.incrementAndCheckState();
599
throw new ServiceException("Service call failed", e.getCause());
600
}
601
}
602
603
private ServiceResponse callServiceWithoutProtection(String request) {
604
try {
605
ServiceResponse response = httpClient.call(request);
606
circuitBreaker.close(); // Reset on success
607
return response;
608
} catch (Exception e) {
609
throw new RuntimeException("Service call failed", e);
610
}
611
}
612
613
private ServiceResponse getFallbackResponse(String request) {
614
return ServiceResponse.fallback("Service temporarily unavailable");
615
}
616
617
@PreDestroy
618
public void shutdown() {
619
rateLimiter.shutdown();
620
}
621
}
622
```
623
624
### Application Startup Coordinator
625
626
```java { .api }
627
@Component
628
public class ApplicationStartupCoordinator {
629
630
private final MultiBackgroundInitializer startupInitializer;
631
private volatile boolean applicationReady = false;
632
633
@EventListener(ApplicationStartedEvent.class)
634
public void onApplicationStarted() throws ConcurrentException {
635
log.info("Starting background initialization...");
636
637
MultiBackgroundInitializer initializer = new MultiBackgroundInitializer();
638
639
// Database initialization
640
initializer.addInitializer("database", new BackgroundInitializer<DataSource>() {
641
@Override
642
protected DataSource initialize() throws Exception {
643
log.info("Initializing database connections...");
644
return dataSourceFactory.createDataSource();
645
}
646
});
647
648
// Cache warming
649
initializer.addInitializer("cache", new BackgroundInitializer<CacheManager>() {
650
@Override
651
protected CacheManager initialize() throws Exception {
652
log.info("Warming up caches...");
653
CacheManager cacheManager = createCacheManager();
654
cacheManager.preloadData();
655
return cacheManager;
656
}
657
});
658
659
// External service health check
660
initializer.addInitializer("external-services", new BackgroundInitializer<HealthCheckResults>() {
661
@Override
662
protected HealthCheckResults initialize() throws Exception {
663
log.info("Checking external service health...");
664
return healthChecker.checkAllServices();
665
}
666
});
667
668
// Start all background tasks
669
MultiBackgroundInitializer.MultiBackgroundInitializerResults results = initializer.start();
670
671
// Wait for completion and handle results
672
handleInitializationResults(results);
673
674
applicationReady = true;
675
log.info("Application startup completed successfully");
676
}
677
678
private void handleInitializationResults(MultiBackgroundInitializer.MultiBackgroundInitializerResults results) {
679
for (String taskName : Arrays.asList("database", "cache", "external-services")) {
680
if (results.isException(taskName)) {
681
Exception exception = results.getException(taskName);
682
log.error("Initialization failed for {}: {}", taskName, exception.getMessage(), exception);
683
684
// Decide whether to fail startup or continue
685
if ("database".equals(taskName)) {
686
throw new ApplicationStartupException("Critical component failed: " + taskName, exception);
687
} else {
688
log.warn("Non-critical component failed, continuing startup: {}", taskName);
689
}
690
} else {
691
log.info("Successfully initialized: {}", taskName);
692
}
693
}
694
}
695
696
@EventListener(ContextClosedEvent.class)
697
public void onApplicationShutdown() {
698
if (startupInitializer != null) {
699
// Graceful shutdown of background tasks
700
try {
701
startupInitializer.shutdown();
702
} catch (Exception e) {
703
log.warn("Error during shutdown: {}", e.getMessage(), e);
704
}
705
}
706
}
707
708
public boolean isApplicationReady() {
709
return applicationReady;
710
}
711
}
712
```
713
714
The concurrent utilities in Apache Commons Lang provide essential building blocks for creating robust, thread-safe applications with proper initialization patterns, fault tolerance, and resource management that scale well under concurrent load.