or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

array-utilities.md builders.md concurrent-utilities.md date-time-utilities.md exception-utilities.md index.md math-utilities.md object-utilities.md string-utilities.md validation-utilities.md

concurrent-utilities.mddocs/

0

# Concurrent Utilities

1

2

Apache Commons Lang provides robust concurrency utilities for thread-safe initialization, background processing, circuit breakers, and fault tolerance. These utilities simplify concurrent programming patterns while providing proper error handling and resource management.

3

4

## Core Concurrent Classes

5

6

### ConcurrentUtils - General Concurrency Utilities

7

8

Provides utility methods for safe concurrent operations and exception handling:

9

10

```java { .api }

11

import org.apache.commons.lang3.concurrent.ConcurrentUtils;

12

```

13

14

#### Safe Initialization Patterns

15

16

```java { .api }

17

// Safe initialization with exception handling

18

public static <T> T initialize(ConcurrentInitializer<T> initializer) throws ConcurrentException

19

public static <T> T initializeUnchecked(ConcurrentInitializer<T> initializer)

20

21

// Exception handling utilities

22

public static ConcurrentException extractCause(ExecutionException ex)

23

public static ConcurrentRuntimeException extractCauseUnchecked(ExecutionException ex)

24

public static void handleCause(ExecutionException ex) throws ConcurrentException

25

public static void handleCauseUnchecked(ExecutionException ex)

26

27

// Map operations

28

public static <K, V> V createIfAbsent(ConcurrentMap<K, V> map, K key, ConcurrentInitializer<V> init) throws ConcurrentException

29

public static <K, V> V createIfAbsentUnchecked(ConcurrentMap<K, V> map, K key, ConcurrentInitializer<V> init)

30

public static <K, V> V putIfAbsent(ConcurrentMap<K, V> map, K key, V value)

31

```

32

33

**Usage Examples:**

34

```java { .api }

35

public class ConcurrentUtilsExamples {

36

37

private final ConcurrentMap<String, DatabaseConnection> connections = new ConcurrentHashMap<>();

38

39

// Safe map operations

40

public DatabaseConnection getConnection(String name) throws ConcurrentException {

41

return ConcurrentUtils.createIfAbsent(connections, name, () -> {

42

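// The initializer runs only when the key is absent. Under a race it may run in more than one thread, but only the first value stored in the map is kept and returned to every caller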

43

return DatabaseConnectionFactory.create(name);

44

});

45

}

46

47

// Unchecked version (wraps exceptions in runtime exceptions)

48

public DatabaseConnection getConnectionUnchecked(String name) {

49

return ConcurrentUtils.createIfAbsentUnchecked(connections, name, () -> {

50

return DatabaseConnectionFactory.create(name);

51

});

52

}

53

54

// Safe Future handling

55

public <T> T getResultSafely(Future<T> future) throws ConcurrentException {

56

try {

57

return future.get();

58

} catch (ExecutionException e) {

59

ConcurrentUtils.handleCause(e);

60

return null; // Reached only if the ExecutionException has no cause; otherwise handleCause throws

61

} catch (InterruptedException e) {

62

Thread.currentThread().interrupt();

63

throw new ConcurrentException("Thread interrupted", e);

64

}

65

}

66

}

67

```

68

69

### Initializer Classes - Thread-Safe Lazy Initialization

70

71

#### AtomicInitializer - Single-Use Atomic Initialization

72

73

```java { .api }

74

import org.apache.commons.lang3.concurrent.AtomicInitializer;

75

```

76

77

```java { .api }

78

public class AtomicInitializerExample {

79

80

// Expensive object that should be created only once

81

private final AtomicInitializer<DatabaseConnectionPool> poolInitializer =

82

new AtomicInitializer<DatabaseConnectionPool>() {

83

@Override

84

protected DatabaseConnectionPool initialize() throws ConcurrentException {

85

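// May be called more than once if several threads race on first access; only one result is kept and all callers receive that same instance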

86

return new DatabaseConnectionPool(

87

"jdbc:postgresql://localhost/mydb",

88

"user", "password", 10

89

);

90

}

91

};

92

93

public DatabaseConnectionPool getConnectionPool() throws ConcurrentException {

94

return poolInitializer.get(); // Thread-safe; every caller receives the same instance

95

}

96

97

// Alternative using builder pattern (requires Commons Lang 3.14.0+)

98

private final AtomicInitializer<ConfigurationManager> configInitializer =

99

AtomicInitializer.<ConfigurationManager>builder()

100

.setInitializer(() -> ConfigurationManager.loadFromFile("config.properties"))

101

.get();

102

}

103

```

104

105

#### LazyInitializer - Thread-Safe Lazy Initialization

106

107

```java { .api }

108

import org.apache.commons.lang3.concurrent.LazyInitializer;

109

```

110

111

```java { .api }

112

public class LazyInitializerExample {

113

114

// Cache that's initialized on first access

115

private final LazyInitializer<Cache<String, Object>> cacheInitializer =

116

new LazyInitializer<Cache<String, Object>>() {

117

@Override

118

protected Cache<String, Object> initialize() throws ConcurrentException {

119

return CacheBuilder.newBuilder()

120

.maximumSize(1000)

121

.expireAfterWrite(30, TimeUnit.MINUTES)

122

.build();

123

}

124

};

125

126

public Cache<String, Object> getCache() throws ConcurrentException {

127

return cacheInitializer.get();

128

}

129

130

// Service registry example

131

private final LazyInitializer<ServiceRegistry> registryInitializer =

132

LazyInitializer.<ServiceRegistry>builder()

133

.setInitializer(() -> {

134

ServiceRegistry registry = new ServiceRegistry();

135

registry.registerService("userService", new UserServiceImpl());

136

registry.registerService("orderService", new OrderServiceImpl());

137

return registry;

138

})

139

.get();

140

}

141

```

142

143

#### ConstantInitializer - Simple Constant Value Holder

144

145

```java { .api }

146

import org.apache.commons.lang3.concurrent.ConstantInitializer;

147

```

148

149

```java { .api }

150

public class ConstantInitializerExample {

151

152

// For pre-computed values

153

private final ConcurrentInitializer<String> appNameInitializer =

154

new ConstantInitializer<>("MyApplication v1.0");

155

156

// For configuration values

157

private final ConcurrentInitializer<Integer> maxUsersInitializer =

158

new ConstantInitializer<>(Integer.parseInt(System.getProperty("max.users", "1000")));

159

160

public String getApplicationName() throws ConcurrentException {

161

return appNameInitializer.get();

162

}

163

164

public int getMaxUsers() throws ConcurrentException {

165

return maxUsersInitializer.get();

166

}

167

}

168

```

169

170

### Background Processing

171

172

#### BackgroundInitializer - Asynchronous Initialization

173

174

```java { .api }

175

import org.apache.commons.lang3.concurrent.BackgroundInitializer;

176

```

177

178

```java { .api }

179

public class BackgroundProcessingExample {

180

181

// Initialize expensive resources in background

182

private final BackgroundInitializer<SearchIndex> searchIndexInitializer =

183

new BackgroundInitializer<SearchIndex>() {

184

@Override

185

protected SearchIndex initialize() throws Exception {

186

// This runs in a background thread

187

SearchIndex index = new SearchIndex();

188

index.loadFromDatabase(); // Expensive operation

189

return index;

190

}

191

};

192

193

public void startBackgroundInitialization() {

194

// Start background initialization immediately

195

searchIndexInitializer.start();

196

}

197

198

public SearchIndex getSearchIndex() throws ConcurrentException {

199

// This will block until background initialization is complete

200

return searchIndexInitializer.get();

201

}

202

203

// Custom executor example

204

private final ExecutorService customExecutor = Executors.newFixedThreadPool(2);

205

206

private final BackgroundInitializer<ReportGenerator> reportInitializer =

207

BackgroundInitializer.<ReportGenerator>builder()

208

.setExecutor(customExecutor)

209

.setInitializer(() -> {

210

ReportGenerator generator = new ReportGenerator();

211

generator.preloadTemplates();

212

return generator;

213

})

214

.get();

215

}

216

```

217

218

#### MultiBackgroundInitializer - Multiple Background Tasks

219

220

```java { .api }

221

import org.apache.commons.lang3.concurrent.MultiBackgroundInitializer;

222

```

223

224

```java { .api }

225

public class MultiBackgroundExample {

226

227

public void initializeApplication() throws ConcurrentException {

228

MultiBackgroundInitializer initializer = new MultiBackgroundInitializer();

229

230

// Add multiple background tasks

231

initializer.addInitializer("database", new BackgroundInitializer<DataSource>() {

232

@Override

233

protected DataSource initialize() throws Exception {

234

return createDataSource();

235

}

236

});

237

238

initializer.addInitializer("cache", new BackgroundInitializer<CacheManager>() {

239

@Override

240

protected CacheManager initialize() throws Exception {

241

return createCacheManager();

242

}

243

});

244

245

initializer.addInitializer("search", new BackgroundInitializer<SearchEngine>() {

246

@Override

247

protected SearchEngine initialize() throws Exception {

248

return createSearchEngine();

249

}

250

});

251

252

// Start all background tasks

253

MultiBackgroundInitializer.MultiBackgroundInitializerResults results = initializer.start();

254

255

// Get results (blocks until all complete)

256

DataSource dataSource = (DataSource) results.getInitializer("database").get();

257

CacheManager cacheManager = (CacheManager) results.getInitializer("cache").get();

258

SearchEngine searchEngine = (SearchEngine) results.getInitializer("search").get();

259

260

// Check for exceptions

261

if (results.isException("database")) {

262

Exception dbException = results.getException("database");

263

log.error("Database initialization failed", dbException);

264

}

265

}

266

}

267

```

268

269

### Circuit Breaker Pattern

270

271

#### EventCountCircuitBreaker - Failure Rate Protection

272

273

```java { .api }

274

import org.apache.commons.lang3.concurrent.EventCountCircuitBreaker;

275

```

276

277

```java { .api }

278

public class CircuitBreakerExample {

279

280

// Circuit breaker: opens when more than 5 events occur within 1 minute; closes again once fewer than 3 events occur within a 30-second interval

281

private final EventCountCircuitBreaker circuitBreaker =

282

new EventCountCircuitBreaker(5, 1, TimeUnit.MINUTES, 3, 30, TimeUnit.SECONDS);

283

284

public String callExternalService(String request) throws ServiceException {

285

// Check if circuit is open (too many recent failures)

286

if (!circuitBreaker.checkState()) {

287

throw new ServiceException("Circuit breaker is OPEN - service temporarily unavailable");

288

}

289

290

try {

291

// Attempt the potentially failing operation

292

String response = externalServiceClient.call(request);

293

294

// Reset failure count on success

295

circuitBreaker.close();

296

297

return response;

298

299

} catch (Exception e) {

300

// Record failure

301

circuitBreaker.incrementAndCheckState();

302

303

throw new ServiceException("External service call failed", e);

304

}

305

}

306

307

// Monitor circuit breaker state

308

public CircuitBreakerStatus getCircuitBreakerStatus() {

309

return new CircuitBreakerStatus(

310

circuitBreaker.getState().name(),

311

circuitBreaker.getCheckInterval(),

312

circuitBreaker.getCheckIntervalUnit(),

313

circuitBreaker.getOpeningThreshold(),

314

circuitBreaker.getClosingThreshold()

315

);

316

}

317

}

318

```

319

320

#### ThresholdCircuitBreaker - Simple Threshold Protection

321

322

```java { .api }

323

import org.apache.commons.lang3.concurrent.ThresholdCircuitBreaker;

324

```

325

326

```java { .api }

327

public class ThresholdCircuitBreakerExample {

328

329

// Circuit breaker that opens after 10 failures

330

private final ThresholdCircuitBreaker circuitBreaker = new ThresholdCircuitBreaker(10);

331

332

public void processMessage(Message message) throws ProcessingException {

333

if (!circuitBreaker.checkState()) {

334

throw new ProcessingException("Message processing circuit breaker is OPEN");

335

}

336

337

try {

338

messageProcessor.process(message);

339

// Reset on successful processing

340

// Note: ThresholdCircuitBreaker doesn't auto-reset, manual reset needed

341

342

} catch (Exception e) {

343

circuitBreaker.incrementAndCheckState();

344

throw new ProcessingException("Message processing failed", e);

345

}

346

}

347

348

// Manual reset method (could be called by admin endpoint)

349

public void resetCircuitBreaker() {

350

circuitBreaker.close();

351

}

352

}

353

```

354

355

### Thread Factory and Management

356

357

#### BasicThreadFactory - Customizable Thread Creation

358

359

```java { .api }

360

import org.apache.commons.lang3.concurrent.BasicThreadFactory;

361

```

362

363

```java { .api }

364

public class ThreadFactoryExample {

365

366

// Custom thread factory with naming and daemon settings

367

private final ThreadFactory threadFactory = new BasicThreadFactory.Builder()

368

.namingPattern("worker-thread-%d")

369

.daemon(true)

370

.priority(Thread.NORM_PRIORITY)

371

.uncaughtExceptionHandler((thread, exception) -> {

372

log.error("Uncaught exception in thread {}: {}", thread.getName(), exception.getMessage(), exception);

373

})

374

.build();

375

376

private final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

377

378

// Background task processing

379

public void submitBackgroundTask(Runnable task) {

380

executorService.submit(() -> {

381

try {

382

task.run();

383

} catch (Exception e) {

384

log.error("Background task failed", e);

385

// The exception is caught here, so the thread's uncaught exception handler is not invoked for it

386

}

387

});

388

}

389

390

// Scheduled task executor with custom threads

391

private final ScheduledExecutorService scheduledExecutor =

392

Executors.newScheduledThreadPool(5, new BasicThreadFactory.Builder()

393

.namingPattern("scheduled-task-%d")

394

.daemon(false) // Keep JVM alive

395

.priority(Thread.MAX_PRIORITY)

396

.build());

397

398

public void scheduleMaintenanceTask() {

399

scheduledExecutor.scheduleAtFixedRate(() -> {

400

performMaintenance();

401

}, 0, 1, TimeUnit.HOURS);

402

}

403

}

404

```

405

406

### Advanced Concurrent Patterns

407

408

#### Memoizer - Thread-Safe Caching

409

410

```java { .api }

411

import org.apache.commons.lang3.concurrent.Memoizer;

412

```

413

414

```java { .api }

415

public class MemoizerExample {

416

417

// Cache expensive computations

418

private final Memoizer<String, UserProfile> userProfileCache =

419

new Memoizer<>(userId -> {

420

// This computation happens only once per userId

421

return loadUserProfileFromDatabase(userId);

422

});

423

424

public UserProfile getUserProfile(String userId) throws InterruptedException, ExecutionException {

425

return userProfileCache.compute(userId);

426

}

427

428

// With custom cache implementation

429

private final Memoizer<CacheKey, Report> reportCache =

430

new Memoizer<>(

431

key -> generateReport(key),

432

new ConcurrentHashMap<>(), // Custom cache implementation

433

true // Recalculate on subsequent calls if a previous computation failed

434

);

435

436

public Report getReport(String reportType, Date startDate, Date endDate)

437

throws InterruptedException, ExecutionException {

438

CacheKey key = new CacheKey(reportType, startDate, endDate);

439

return reportCache.compute(key);

440

}

441

}

442

```

443

444

#### TimedSemaphore - Rate Limiting

445

446

```java { .api }

447

import org.apache.commons.lang3.concurrent.TimedSemaphore;

448

```

449

450

```java { .api }

451

public class RateLimitingExample {

452

453

// Allow maximum 100 operations per second

454

private final TimedSemaphore rateLimiter = new TimedSemaphore(1, TimeUnit.SECONDS, 100);

455

456

public void callRateLimitedAPI(String request) throws InterruptedException {

457

// This will block if rate limit is exceeded

458

rateLimiter.acquire();

459

460

try {

461

apiClient.makeCall(request);

462

} finally {

463

// Permits are automatically released after the time period

464

}

465

}

466

467

// Dynamic rate adjustment

468

public void adjustRateLimit(int newLimit) {

469

rateLimiter.setLimit(newLimit);

470

}

471

472

// Monitor rate limiter

473

public RateLimiterStatus getRateLimiterStatus() {

474

return new RateLimiterStatus(

475

rateLimiter.getLimit(),

476

rateLimiter.getAvailablePermits(),

477

rateLimiter.getAcquireCount(),

478

rateLimiter.getPeriod(),

479

rateLimiter.getUnit()

480

);

481

}

482

483

// Shutdown

484

public void shutdown() {

485

rateLimiter.shutdown();

486

}

487

}

488

```

489

490

## Real-World Integration Examples

491

492

### Spring Boot Integration

493

494

```java { .api }

495

@Configuration

496

@EnableAsync

497

public class ConcurrentConfiguration {

498

499

@Bean

500

public ThreadFactory taskThreadFactory() {

501

return new BasicThreadFactory.Builder()

502

.namingPattern("async-task-%d")

503

.daemon(true)

504

.priority(Thread.NORM_PRIORITY)

505

.uncaughtExceptionHandler((thread, exception) -> {

506

log.error("Uncaught exception in async task thread {}", thread.getName(), exception);

507

})

508

.build();

509

}

510

511

@Bean

512

public TaskExecutor taskExecutor(ThreadFactory taskThreadFactory) {

513

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

514

executor.setCorePoolSize(10);

515

executor.setMaxPoolSize(20);

516

executor.setQueueCapacity(100);

517

executor.setThreadFactory(taskThreadFactory);

518

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

519

executor.initialize();

520

return executor;

521

}

522

523

@Bean

524

public EventCountCircuitBreaker externalServiceCircuitBreaker() {

525

// Opens when more than 5 events occur within 1 minute; closes again once fewer than 3 events occur within a 30-second interval

526

return new EventCountCircuitBreaker(5, 1, TimeUnit.MINUTES, 3, 30, TimeUnit.SECONDS);

527

}

528

}

529

530

@Service

531

public class ExternalServiceClient {

532

533

private final EventCountCircuitBreaker circuitBreaker;

534

private final RestTemplate restTemplate;

535

536

public ExternalServiceClient(EventCountCircuitBreaker circuitBreaker, RestTemplate restTemplate) {

537

this.circuitBreaker = circuitBreaker;

538

this.restTemplate = restTemplate;

539

}

540

541

public CompletableFuture<String> callExternalServiceAsync(String request) {

542

return CompletableFuture.supplyAsync(() -> {

543

if (!circuitBreaker.checkState()) {

544

throw new ServiceUnavailableException("External service circuit breaker is OPEN");

545

}

546

547

try {

548

String response = restTemplate.getForObject("/external/api?q=" + request, String.class);

549

circuitBreaker.close(); // Reset on success

550

return response;

551

} catch (Exception e) {

552

circuitBreaker.incrementAndCheckState();

553

throw new ExternalServiceException("External service call failed", e);

554

}

555

});

556

}

557

}

558

```

559

560

### Microservice Resilience Pattern

561

562

```java { .api }

563

@Component

564

public class ResilientServiceClient {

565

566

private final TimedSemaphore rateLimiter;

567

private final EventCountCircuitBreaker circuitBreaker;

568

private final Memoizer<String, ServiceResponse> cache;

569

570

public ResilientServiceClient() {

571

// Rate limiting: 50 requests per second

572

this.rateLimiter = new TimedSemaphore(1, TimeUnit.SECONDS, 50);

573

574

// Circuit breaker: opens when more than 10 events occur within 2 minutes; closes again once fewer than 5 events occur within a 60-second interval

575

this.circuitBreaker = new EventCountCircuitBreaker(10, 2, TimeUnit.MINUTES, 5, 60, TimeUnit.SECONDS);

576

577

// Response caching

578

this.cache = new Memoizer<>(this::callServiceWithoutProtection);

579

}

580

581

public ServiceResponse callService(String request) throws ServiceException {

582

try {

583

// Apply rate limiting

584

rateLimiter.acquire();

585

586

// Check circuit breaker

587

if (!circuitBreaker.checkState()) {

588

return getFallbackResponse(request);

589

}

590

591

// Use cached response if available

592

return cache.compute(request);

593

594

} catch (InterruptedException e) {

595

Thread.currentThread().interrupt();

596

throw new ServiceException("Service call interrupted", e);

597

} catch (ExecutionException e) {

598

circuitBreaker.incrementAndCheckState();

599

throw new ServiceException("Service call failed", e.getCause());

600

}

601

}

602

603

private ServiceResponse callServiceWithoutProtection(String request) {

604

try {

605

ServiceResponse response = httpClient.call(request);

606

circuitBreaker.close(); // Reset on success

607

return response;

608

} catch (Exception e) {

609

throw new RuntimeException("Service call failed", e);

610

}

611

}

612

613

private ServiceResponse getFallbackResponse(String request) {

614

return ServiceResponse.fallback("Service temporarily unavailable");

615

}

616

617

@PreDestroy

618

public void shutdown() {

619

rateLimiter.shutdown();

620

}

621

}

622

```

623

624

### Application Startup Coordinator

625

626

```java { .api }

627

@Component

628

public class ApplicationStartupCoordinator {

629

630

private final MultiBackgroundInitializer startupInitializer;

631

private volatile boolean applicationReady = false;

632

633

@EventListener(ApplicationStartedEvent.class)

634

public void onApplicationStarted() throws ConcurrentException {

635

log.info("Starting background initialization...");

636

637

MultiBackgroundInitializer initializer = new MultiBackgroundInitializer();

638

639

// Database initialization

640

initializer.addInitializer("database", new BackgroundInitializer<DataSource>() {

641

@Override

642

protected DataSource initialize() throws Exception {

643

log.info("Initializing database connections...");

644

return dataSourceFactory.createDataSource();

645

}

646

});

647

648

// Cache warming

649

initializer.addInitializer("cache", new BackgroundInitializer<CacheManager>() {

650

@Override

651

protected CacheManager initialize() throws Exception {

652

log.info("Warming up caches...");

653

CacheManager cacheManager = createCacheManager();

654

cacheManager.preloadData();

655

return cacheManager;

656

}

657

});

658

659

// External service health check

660

initializer.addInitializer("external-services", new BackgroundInitializer<HealthCheckResults>() {

661

@Override

662

protected HealthCheckResults initialize() throws Exception {

663

log.info("Checking external service health...");

664

return healthChecker.checkAllServices();

665

}

666

});

667

668

// Start all background tasks

669

MultiBackgroundInitializer.MultiBackgroundInitializerResults results = initializer.start();

670

671

// Wait for completion and handle results

672

handleInitializationResults(results);

673

674

applicationReady = true;

675

log.info("Application startup completed successfully");

676

}

677

678

private void handleInitializationResults(MultiBackgroundInitializer.MultiBackgroundInitializerResults results) {

679

for (String taskName : Arrays.asList("database", "cache", "external-services")) {

680

if (results.isException(taskName)) {

681

Exception exception = results.getException(taskName);

682

log.error("Initialization failed for {}: {}", taskName, exception.getMessage(), exception);

683

684

// Decide whether to fail startup or continue

685

if ("database".equals(taskName)) {

686

throw new ApplicationStartupException("Critical component failed: " + taskName, exception);

687

} else {

688

log.warn("Non-critical component failed, continuing startup: {}", taskName);

689

}

690

} else {

691

log.info("Successfully initialized: {}", taskName);

692

}

693

}

694

}

695

696

@EventListener(ContextClosedEvent.class)

697

public void onApplicationShutdown() {

698

if (startupInitializer != null) {

699

// Graceful shutdown of background tasks

700

try {

701

startupInitializer.shutdown();

702

} catch (Exception e) {

703

log.warn("Error during shutdown: {}", e.getMessage(), e);

704

}

705

}

706

}

707

708

public boolean isApplicationReady() {

709

return applicationReady;

710

}

711

}

712

```

713

714

The concurrent utilities in Apache Commons Lang provide essential building blocks for creating robust, thread-safe applications with proper initialization patterns, fault tolerance, and resource management that scale well under concurrent load.