Apache Commons Lang provides essential Java utility classes for string manipulation, object operations, array handling, date/time processing, reflection utilities, and more.
—
Apache Commons Lang provides robust concurrency utilities for thread-safe initialization, background processing, circuit breakers, and fault tolerance. These utilities simplify concurrent programming patterns while providing proper error handling and resource management.
Provides utility methods for safe concurrent operations and exception handling:
import org.apache.commons.lang3.concurrent.ConcurrentUtils;

// Safe initialization with exception handling
public static <T> T initialize(ConcurrentInitializer<T> initializer) throws ConcurrentException
public static <T> T initializeUnchecked(ConcurrentInitializer<T> initializer)
// Exception handling utilities
public static ConcurrentException extractCause(ExecutionException ex)
public static ConcurrentRuntimeException extractCauseUnchecked(ExecutionException ex)
public static void handleCause(ExecutionException ex) throws ConcurrentException
public static void handleCauseUnchecked(ExecutionException ex)
// Map operations
public static <K, V> V createIfAbsent(ConcurrentMap<K, V> map, K key, ConcurrentInitializer<V> init) throws ConcurrentException
public static <K, V> V createIfAbsentUnchecked(ConcurrentMap<K, V> map, K key, ConcurrentInitializer<V> init)
public static <K, V> V putIfAbsent(ConcurrentMap<K, V> map, K key, V value)

Usage Examples:
public class ConcurrentUtilsExamples {
private final ConcurrentMap<String, DatabaseConnection> connections = new ConcurrentHashMap<>();
// Safe map operations
/**
 * Looks up the connection stored under {@code name}; when the map has no entry yet,
 * one is created via {@code DatabaseConnectionFactory} and stored.
 *
 * @param name key identifying the connection
 * @return the connection held by the map for {@code name}
 * @throws ConcurrentException if creating the connection fails with a checked exception
 */
public DatabaseConnection getConnection(String name) throws ConcurrentException {
    // The initializer is consulted only when the key is not yet present in the map
    return ConcurrentUtils.createIfAbsent(
            connections,
            name,
            () -> DatabaseConnectionFactory.create(name));
}
// Unchecked version (wraps exceptions in runtime exceptions)
/**
 * Same lookup-or-create behaviour as the checked variant, but failures surface as
 * runtime exceptions so callers need no {@code throws} clause.
 *
 * @param name key identifying the connection
 * @return the connection held by the map for {@code name}
 */
public DatabaseConnection getConnectionUnchecked(String name) {
    return ConcurrentUtils.createIfAbsentUnchecked(
            connections,
            name,
            () -> DatabaseConnectionFactory.create(name));
}
// Safe Future handling
/**
 * Waits for {@code future} and returns its result, translating failures into the
 * exception types used by the concurrent utilities.
 *
 * @param future the future to wait for
 * @param <T> the result type
 * @return the value produced by the future
 * @throws ConcurrentException if the task failed with a checked exception or the
 *         waiting thread was interrupted
 * @throws IllegalStateException if the task failed but reported no cause
 */
public <T> T getResultSafely(Future<T> future) throws ConcurrentException {
    try {
        return future.get();
    } catch (ExecutionException e) {
        // Rethrows unchecked causes as-is and wraps checked causes in ConcurrentException
        ConcurrentUtils.handleCause(e);
        // handleCause() returns normally when the ExecutionException carries no cause;
        // fail loudly instead of handing the caller a misleading null result
        throw new IllegalStateException("ExecutionException without a cause", e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can observe it
        Thread.currentThread().interrupt();
        throw new ConcurrentException("Thread interrupted", e);
    }
}
}

import org.apache.commons.lang3.concurrent.AtomicInitializer;

public class AtomicInitializerExample {
// Expensive object of which a single shared instance should be handed out
private final AtomicInitializer<DatabaseConnectionPool> poolInitializer =
new AtomicInitializer<DatabaseConnectionPool>() {
@Override
protected DatabaseConnectionPool initialize() throws ConcurrentException {
// NOTE: AtomicInitializer does not lock. Under contention this method may run
// more than once; only one of the created objects is kept and returned to all
// callers. Prefer LazyInitializer when initialize() must run at most once.
return new DatabaseConnectionPool(
"jdbc:postgresql://localhost/mydb",
"user", "password", 10
);
}
};
// Returns the shared pool, triggering initialization on the first call.
public DatabaseConnectionPool getConnectionPool() throws ConcurrentException {
return poolInitializer.get(); // Thread-safe; every caller receives the same instance
}
// Alternative using the builder with a lambda initializer (Java 8+)
private final AtomicInitializer<ConfigurationManager> configInitializer =
AtomicInitializer.<ConfigurationManager>builder()
.setInitializer(() -> ConfigurationManager.loadFromFile("config.properties"))
.get();
}

import org.apache.commons.lang3.concurrent.LazyInitializer;

public class LazyInitializerExample {
// Cache that's initialized on first access
// NOTE(review): Cache/CacheBuilder look like Guava types — confirm the dependency is available
private final LazyInitializer<Cache<String, Object>> cacheInitializer =
new LazyInitializer<Cache<String, Object>>() {
@Override
protected Cache<String, Object> initialize() throws ConcurrentException {
// Bounded cache: at most 1000 entries, each expiring 30 minutes after it was written
return CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();
}
};
// Returns the cache, creating it on the first call.
public Cache<String, Object> getCache() throws ConcurrentException {
return cacheInitializer.get();
}
// Service registry example: the same lazy pattern expressed through the builder API
private final LazyInitializer<ServiceRegistry> registryInitializer =
LazyInitializer.<ServiceRegistry>builder()
.setInitializer(() -> {
// Registry is populated before being handed to the initializer as its result
ServiceRegistry registry = new ServiceRegistry();
registry.registerService("userService", new UserServiceImpl());
registry.registerService("orderService", new OrderServiceImpl());
return registry;
})
.get();
}

import org.apache.commons.lang3.concurrent.ConstantInitializer;

public class ConstantInitializerExample {
// Wraps a value that is already known when the object is constructed
private final ConcurrentInitializer<String> appNameInitializer =
        new ConstantInitializer<>("MyApplication v1.0");

// Wraps the "max.users" system property (default "1000"), parsed once at construction
private final ConcurrentInitializer<Integer> maxUsersInitializer =
        new ConstantInitializer<>(readMaxUsers());

/** Returns the application name held by the constant initializer. */
public String getApplicationName() throws ConcurrentException {
    final String applicationName = appNameInitializer.get();
    return applicationName;
}

/** Returns the configured user limit held by the constant initializer. */
public int getMaxUsers() throws ConcurrentException {
    final Integer configuredMax = maxUsersInitializer.get();
    return configuredMax.intValue();
}

/** Reads and parses the "max.users" system property, falling back to 1000 when unset. */
private static int readMaxUsers() {
    final String rawValue = System.getProperty("max.users", "1000");
    return Integer.parseInt(rawValue);
}
}

import org.apache.commons.lang3.concurrent.BackgroundInitializer;

public class BackgroundProcessingExample {
// Initialize expensive resources in background
private final BackgroundInitializer<SearchIndex> searchIndexInitializer =
new BackgroundInitializer<SearchIndex>() {
@Override
protected SearchIndex initialize() throws Exception {
// This runs in a background thread
SearchIndex index = new SearchIndex();
index.loadFromDatabase(); // Expensive operation
return index;
}
};
// Kicks off the search-index initialization; call this before getSearchIndex().
public void startBackgroundInitialization() {
// Start background initialization immediately
searchIndexInitializer.start();
}
// Returns the initialized search index, waiting for the background task if necessary.
public SearchIndex getSearchIndex() throws ConcurrentException {
// This will block until background initialization is complete
return searchIndexInitializer.get();
}
// Custom executor example
// NOTE(review): this pool is never shut down in the visible code — confirm the owner
// calls customExecutor.shutdown(), otherwise its threads may keep the JVM alive
private final ExecutorService customExecutor = Executors.newFixedThreadPool(2);
// NOTE(review): start() is never called on reportInitializer in the visible code —
// confirm it is started elsewhere before anything calls get() on it
private final BackgroundInitializer<ReportGenerator> reportInitializer =
BackgroundInitializer.<ReportGenerator>builder()
.setExecutor(customExecutor)
.setInitializer(() -> {
ReportGenerator generator = new ReportGenerator();
generator.preloadTemplates();
return generator;
})
.get();
}

import org.apache.commons.lang3.concurrent.MultiBackgroundInitializer;

public class MultiBackgroundExample {
/**
 * Runs the database, cache and search initializers in parallel and collects their
 * results once all of them have finished.
 *
 * @throws ConcurrentException if waiting for the combined result fails
 */
public void initializeApplication() throws ConcurrentException {
    MultiBackgroundInitializer initializer = new MultiBackgroundInitializer();
    // Add multiple background tasks
    initializer.addInitializer("database", new BackgroundInitializer<DataSource>() {
        @Override
        protected DataSource initialize() throws Exception {
            return createDataSource();
        }
    });
    initializer.addInitializer("cache", new BackgroundInitializer<CacheManager>() {
        @Override
        protected CacheManager initialize() throws Exception {
            return createCacheManager();
        }
    });
    initializer.addInitializer("search", new BackgroundInitializer<SearchEngine>() {
        @Override
        protected SearchEngine initialize() throws Exception {
            return createSearchEngine();
        }
    });
    // start() only reports (as a boolean) whether this call triggered the run;
    // the results object is obtained from get()
    initializer.start();
    // Blocks until every child initializer has completed
    MultiBackgroundInitializer.MultiBackgroundInitializerResults results = initializer.get();
    // Check for exceptions before touching the results: a failed child has no result object
    if (results.isException("database")) {
        Exception dbException = results.getException("database");
        log.error("Database initialization failed", dbException);
    }
    // getResultObject() yields null for a child whose initialization failed
    DataSource dataSource = (DataSource) results.getResultObject("database");
    CacheManager cacheManager = (CacheManager) results.getResultObject("cache");
    SearchEngine searchEngine = (SearchEngine) results.getResultObject("search");
}
}

import org.apache.commons.lang3.concurrent.EventCountCircuitBreaker;

public class CircuitBreakerExample {
// Circuit breaker: opens when more than 5 events are counted within 1 minute;
// closes again once a 30-second check interval ends with fewer than 3 events
private final EventCountCircuitBreaker circuitBreaker =
new EventCountCircuitBreaker(5, 1, TimeUnit.MINUTES, 3, 30, TimeUnit.SECONDS);
/**
 * Calls the external service unless the circuit breaker currently rejects requests.
 *
 * @param request payload forwarded to the external service
 * @return the service response
 * @throws ServiceException if the breaker is open or the call fails
 */
public String callExternalService(String request) throws ServiceException {
    // Guard: refuse immediately while the breaker reports an open circuit
    final boolean requestsAllowed = circuitBreaker.checkState();
    if (!requestsAllowed) {
        throw new ServiceException("Circuit breaker is OPEN - service temporarily unavailable");
    }
    try {
        final String serviceReply = externalServiceClient.call(request);
        // A successful call puts the breaker back into the closed state
        circuitBreaker.close();
        return serviceReply;
    } catch (Exception failure) {
        // Count the failure so repeated errors can trip the breaker
        circuitBreaker.incrementAndCheckState();
        throw new ServiceException("External service call failed", failure);
    }
}
/**
 * Builds a snapshot of the breaker's state and configuration for monitoring.
 *
 * <p>EventCountCircuitBreaker exposes no public getState(), getCheckInterval() or
 * getCheckIntervalUnit(); the state is derived from isOpen() and the opening
 * interval is reported by getOpeningInterval() in nanoseconds.
 *
 * @return the current status snapshot
 */
public CircuitBreakerStatus getCircuitBreakerStatus() {
    return new CircuitBreakerStatus(
            circuitBreaker.isOpen() ? "OPEN" : "CLOSED",
            circuitBreaker.getOpeningInterval(),
            TimeUnit.NANOSECONDS, // unit of getOpeningInterval()
            circuitBreaker.getOpeningThreshold(),
            circuitBreaker.getClosingThreshold()
    );
}
}

import org.apache.commons.lang3.concurrent.ThresholdCircuitBreaker;

public class ThresholdCircuitBreakerExample {
// Circuit breaker that opens once the accumulated failure count exceeds 10
private final ThresholdCircuitBreaker circuitBreaker = new ThresholdCircuitBreaker(10);

/**
 * Processes a message unless the breaker is open, recording every failure.
 *
 * @param message the message to process
 * @throws ProcessingException if the breaker is open or processing fails
 */
public void processMessage(Message message) throws ProcessingException {
    if (!circuitBreaker.checkState()) {
        throw new ProcessingException("Message processing circuit breaker is OPEN");
    }
    try {
        messageProcessor.process(message);
        // Deliberately no reset on success: ThresholdCircuitBreaker never closes by
        // itself, the counter is only cleared through resetCircuitBreaker()
    } catch (Exception e) {
        // ThresholdCircuitBreaker only offers incrementAndCheckState(Long); count one failure
        circuitBreaker.incrementAndCheckState(1L);
        throw new ProcessingException("Message processing failed", e);
    }
}

/** Manual reset (could be called by an admin endpoint): closes the breaker and clears the count. */
public void resetCircuitBreaker() {
    circuitBreaker.close();
}
}

import org.apache.commons.lang3.concurrent.BasicThreadFactory;

public class ThreadFactoryExample {
// Custom thread factory with naming and daemon settings
private final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
        .namingPattern("worker-thread-%d")
        .daemon(true)
        .priority(Thread.NORM_PRIORITY)
        .uncaughtExceptionHandler((thread, exception) -> {
            log.error("Uncaught exception in thread {}: {}", thread.getName(), exception.getMessage(), exception);
        })
        .build();

private final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

/**
 * Runs {@code task} on the worker pool, logging any exception it throws.
 *
 * @param task the work to run in the background
 */
public void submitBackgroundTask(Runnable task) {
    // execute() instead of submit(): submit() stores any Throwable in the returned
    // Future (discarded here), so the uncaught exception handler would never see it
    executorService.execute(() -> {
        try {
            task.run();
        } catch (Exception e) {
            // Handled here, so this exception does NOT reach the uncaught exception
            // handler; only Throwables that escape this block (e.g. Errors) do
            log.error("Background task failed", e);
        }
    });
}

// Scheduled task executor with custom threads
private final ScheduledExecutorService scheduledExecutor =
        Executors.newScheduledThreadPool(5, new BasicThreadFactory.Builder()
                .namingPattern("scheduled-task-%d")
                .daemon(false) // Keep JVM alive
                .priority(Thread.MAX_PRIORITY)
                .build());

/** Schedules the maintenance job to run immediately and then once every hour. */
public void scheduleMaintenanceTask() {
    scheduledExecutor.scheduleAtFixedRate(() -> {
        try {
            performMaintenance();
        } catch (Exception e) {
            // An exception escaping a fixed-rate task suppresses all subsequent
            // executions; log it so the hourly schedule keeps running
            log.error("Scheduled maintenance failed", e);
        }
    }, 0, 1, TimeUnit.HOURS);
}
}

import org.apache.commons.lang3.concurrent.Memoizer;

public class MemoizerExample {
// Cache expensive computations.
// The cast selects the Computable overload: Memoizer also has a java.util.function.Function
// constructor, and a bare lambda is ambiguous between the two.
private final Memoizer<String, UserProfile> userProfileCache =
        new Memoizer<>((org.apache.commons.lang3.concurrent.Computable<String, UserProfile>) userId -> {
            // The result is cached per userId; later calls reuse it
            return loadUserProfileFromDatabase(userId);
        });

/**
 * Returns the (possibly cached) profile for {@code userId}.
 *
 * @param userId identifier of the user
 * @return the user's profile
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException kept in the signature for existing callers; Memoizer itself
 *         rethrows computation failures as unchecked exceptions
 */
public UserProfile getUserProfile(String userId) throws InterruptedException, ExecutionException {
    return userProfileCache.compute(userId);
}

// With recalculation enabled. Memoizer has no constructor taking a custom cache map;
// the only option is the boolean 'recalculate' flag: when true, a failed computation
// is removed from the cache so that a later call retries it.
private final Memoizer<CacheKey, Report> reportCache =
        new Memoizer<>(
                (org.apache.commons.lang3.concurrent.Computable<CacheKey, Report>) key -> generateReport(key),
                true // Recalculate after a failed computation
        );

/**
 * Returns the (possibly cached) report for the given type and date range.
 *
 * @param reportType kind of report to produce
 * @param startDate start of the reporting period
 * @param endDate end of the reporting period
 * @return the generated or cached report
 * @throws InterruptedException if the calling thread is interrupted while waiting
 * @throws ExecutionException kept in the signature for existing callers
 */
public Report getReport(String reportType, Date startDate, Date endDate)
        throws InterruptedException, ExecutionException {
    CacheKey key = new CacheKey(reportType, startDate, endDate);
    return reportCache.compute(key);
}
}

import org.apache.commons.lang3.concurrent.TimedSemaphore;

public class RateLimitingExample {
// Allow maximum 100 operations per second
private final TimedSemaphore rateLimiter = new TimedSemaphore(1, TimeUnit.SECONDS, 100);

/**
 * Performs the API call once a permit for the current period is available.
 *
 * @param request the request to send
 * @throws InterruptedException if the thread is interrupted while waiting for a permit
 */
public void callRateLimitedAPI(String request) throws InterruptedException {
    // This will block if rate limit is exceeded
    rateLimiter.acquire();
    // No release needed: permits are automatically released after the time period,
    // so the former empty try/finally served no purpose
    apiClient.makeCall(request);
}

/** Dynamic rate adjustment: changes the number of permits granted per period. */
public void adjustRateLimit(int newLimit) {
    rateLimiter.setLimit(newLimit);
}

/** Monitor rate limiter: snapshots its configuration and counters. */
public RateLimiterStatus getRateLimiterStatus() {
    return new RateLimiterStatus(
            rateLimiter.getLimit(),
            rateLimiter.getAvailablePermits(),
            rateLimiter.getAcquireCount(),
            rateLimiter.getPeriod(),
            rateLimiter.getUnit()
    );
}

/** Shuts the semaphore down; call when the rate limiter is no longer needed. */
public void shutdown() {
    rateLimiter.shutdown();
}
}

@Configuration
@EnableAsync
public class ConcurrentConfiguration {
/**
 * Thread factory for async tasks: daemon threads named {@code async-task-N} at normal
 * priority, with uncaught exceptions routed to the logger.
 */
@Bean
public ThreadFactory taskThreadFactory() {
    final BasicThreadFactory.Builder factoryBuilder = new BasicThreadFactory.Builder();
    factoryBuilder.namingPattern("async-task-%d");
    factoryBuilder.daemon(true);
    factoryBuilder.priority(Thread.NORM_PRIORITY);
    factoryBuilder.uncaughtExceptionHandler((thread, exception) ->
            log.error("Uncaught exception in async task thread {}", thread.getName(), exception));
    return factoryBuilder.build();
}
/**
 * Task executor backed by the custom thread factory: 10 core threads, up to 20 in total,
 * a queue of 100 entries, and caller-runs as the rejection policy.
 *
 * @param taskThreadFactory factory used to create the pool's threads
 */
@Bean
public TaskExecutor taskExecutor(ThreadFactory taskThreadFactory) {
    final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Thread creation and overflow behaviour
    pool.setThreadFactory(taskThreadFactory);
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    // Sizing
    pool.setCorePoolSize(10);
    pool.setMaxPoolSize(20);
    pool.setQueueCapacity(100);
    pool.initialize();
    return pool;
}
@Bean
public EventCountCircuitBreaker externalServiceCircuitBreaker() {
// Opens when more than 5 events are counted within 1 minute; closes again once a
// 30-second check interval ends with fewer than 3 events
return new EventCountCircuitBreaker(5, 1, TimeUnit.MINUTES, 3, 30, TimeUnit.SECONDS);
}
}
@Service
public class ExternalServiceClient {
private final EventCountCircuitBreaker circuitBreaker;
private final RestTemplate restTemplate;

/**
 * @param circuitBreaker breaker guarding calls to the external service
 * @param restTemplate HTTP client used for the calls
 */
public ExternalServiceClient(EventCountCircuitBreaker circuitBreaker, RestTemplate restTemplate) {
    this.circuitBreaker = circuitBreaker;
    this.restTemplate = restTemplate;
}

/**
 * Calls the external API asynchronously, guarded by the circuit breaker.
 *
 * @param request value sent as the {@code q} query parameter
 * @return a future completing with the response body, or exceptionally when the breaker
 *         is open or the call fails
 */
public CompletableFuture<String> callExternalServiceAsync(String request) {
    return CompletableFuture.supplyAsync(() -> {
        if (!circuitBreaker.checkState()) {
            throw new ServiceUnavailableException("External service circuit breaker is OPEN");
        }
        try {
            // URI template variable: RestTemplate encodes the value, so characters such as
            // '&', '#' or spaces in the request cannot alter or break the query string
            String response = restTemplate.getForObject("/external/api?q={q}", String.class, request);
            circuitBreaker.close(); // Reset on success
            return response;
        } catch (Exception e) {
            // Count the failure so repeated errors can open the breaker
            circuitBreaker.incrementAndCheckState();
            throw new ExternalServiceException("External service call failed", e);
        }
    });
}
}

@Component
public class ResilientServiceClient {
private final TimedSemaphore rateLimiter;
private final EventCountCircuitBreaker circuitBreaker;
private final Memoizer<String, ServiceResponse> cache;

public ResilientServiceClient() {
    // Rate limiting: 50 requests per second
    this.rateLimiter = new TimedSemaphore(1, TimeUnit.SECONDS, 50);
    // Circuit breaker: opens when more than 10 events occur within 2 minutes; closes again
    // once a 60-second check interval ends with fewer than 5 events
    this.circuitBreaker = new EventCountCircuitBreaker(10, 2, TimeUnit.MINUTES, 5, 60, TimeUnit.SECONDS);
    // Response caching. The cast selects the Computable overload (a bare method reference
    // is ambiguous with the java.util.function.Function overload); recalculate = true
    // evicts failed computations so one failed call is not cached forever.
    this.cache = new Memoizer<>(
            (org.apache.commons.lang3.concurrent.Computable<String, ServiceResponse>)
                    this::callServiceWithoutProtection,
            true);
}

/**
 * Calls the service with rate limiting, circuit breaking and response caching applied.
 *
 * @param request the request to send
 * @return the cached or fresh response, or a fallback while the breaker is open
 * @throws ServiceException if the call fails or the thread is interrupted
 */
public ServiceResponse callService(String request) throws ServiceException {
    try {
        // Apply rate limiting
        rateLimiter.acquire();
        // Check circuit breaker
        if (!circuitBreaker.checkState()) {
            return getFallbackResponse(request);
        }
        try {
            // Use cached response if available
            return cache.compute(request);
        } catch (RuntimeException e) {
            // Memoizer.compute() declares only InterruptedException; computation
            // failures are rethrown as unchecked exceptions, never ExecutionException
            circuitBreaker.incrementAndCheckState();
            Throwable rootCause = e.getCause() != null ? e.getCause() : e;
            throw new ServiceException("Service call failed", rootCause);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new ServiceException("Service call interrupted", e);
    }
}

/** Performs the raw call; failures are wrapped so they can pass through the Memoizer. */
private ServiceResponse callServiceWithoutProtection(String request) {
    try {
        ServiceResponse response = httpClient.call(request);
        circuitBreaker.close(); // Reset on success
        return response;
    } catch (Exception e) {
        throw new RuntimeException("Service call failed", e);
    }
}

/** Response returned while the circuit breaker is open. */
private ServiceResponse getFallbackResponse(String request) {
    return ServiceResponse.fallback("Service temporarily unavailable");
}

@PreDestroy
public void shutdown() {
    rateLimiter.shutdown();
}
}

@Component
public class ApplicationStartupCoordinator {
// Assigned when startup begins. It cannot be final because it is created in the event
// listener, not in a constructor; volatile so the shutdown listener sees the assignment.
private volatile MultiBackgroundInitializer startupInitializer;
private volatile boolean applicationReady = false;

/**
 * Starts database, cache and health-check initialization in parallel once the
 * application has started, then evaluates the outcome.
 *
 * @throws ConcurrentException if waiting for the background tasks fails
 */
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() throws ConcurrentException {
    log.info("Starting background initialization...");
    MultiBackgroundInitializer initializer = new MultiBackgroundInitializer();
    // Database initialization
    initializer.addInitializer("database", new BackgroundInitializer<DataSource>() {
        @Override
        protected DataSource initialize() throws Exception {
            log.info("Initializing database connections...");
            return dataSourceFactory.createDataSource();
        }
    });
    // Cache warming
    initializer.addInitializer("cache", new BackgroundInitializer<CacheManager>() {
        @Override
        protected CacheManager initialize() throws Exception {
            log.info("Warming up caches...");
            CacheManager cacheManager = createCacheManager();
            cacheManager.preloadData();
            return cacheManager;
        }
    });
    // External service health check
    initializer.addInitializer("external-services", new BackgroundInitializer<HealthCheckResults>() {
        @Override
        protected HealthCheckResults initialize() throws Exception {
            log.info("Checking external service health...");
            return healthChecker.checkAllServices();
        }
    });
    // Keep a reference so the shutdown listener can release it later
    this.startupInitializer = initializer;
    // start() only returns a boolean; the results object is obtained from get()
    initializer.start();
    // Blocks until all background tasks have completed
    MultiBackgroundInitializer.MultiBackgroundInitializerResults results = initializer.get();
    handleInitializationResults(results);
    applicationReady = true;
    log.info("Application startup completed successfully");
}

/**
 * Logs the outcome of every initializer and aborts startup when the database,
 * the only critical component, failed.
 */
private void handleInitializationResults(MultiBackgroundInitializer.MultiBackgroundInitializerResults results) {
    // Iterate over the names registered with the initializer instead of a hard-coded list
    for (String taskName : results.initializerNames()) {
        if (results.isException(taskName)) {
            Exception exception = results.getException(taskName);
            log.error("Initialization failed for {}: {}", taskName, exception.getMessage(), exception);
            // Decide whether to fail startup or continue
            if ("database".equals(taskName)) {
                throw new ApplicationStartupException("Critical component failed: " + taskName, exception);
            } else {
                log.warn("Non-critical component failed, continuing startup: {}", taskName);
            }
        } else {
            log.info("Successfully initialized: {}", taskName);
        }
    }
}

/** Releases the startup initializer when the application context closes. */
@EventListener(ContextClosedEvent.class)
public void onApplicationShutdown() {
    // Read the volatile field once so the null check and the call use the same instance
    MultiBackgroundInitializer initializer = startupInitializer;
    if (initializer != null) {
        try {
            // MultiBackgroundInitializer has no shutdown(); close() (3.14+) invokes the
            // closers of all child initializers
            initializer.close();
        } catch (Exception e) {
            log.warn("Error during shutdown: {}", e.getMessage(), e);
        }
    }
}

/** Reports whether background startup has completed. */
public boolean isApplicationReady() {
    return applicationReady;
}
}

The concurrent utilities in Apache Commons Lang provide essential building blocks for creating robust, thread-safe applications with proper initialization patterns, fault tolerance, and resource management that scale well under concurrent load.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-commons--commons-lang3