or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

basic-utilities.md caching.md collections.md concurrency.md graph-api.md hash-math.md immutable-collections.md index.md io-utilities.md other-utilities.md

concurrency.mddocs/

0

# Concurrency

1

2

Enhanced concurrency utilities including listenable futures, rate limiting, improved executors, and synchronization primitives that extend Java's concurrent programming capabilities.

3

4

## Package: com.google.common.util.concurrent

5

6

### ListenableFuture

7

8

Enhanced Future that allows listeners to be attached for completion notification, enabling reactive programming patterns.

9

10

```java { .api }

11

import com.google.common.util.concurrent.ListenableFuture;

12

import com.google.common.util.concurrent.Futures;

13

import com.google.common.util.concurrent.FutureCallback;

14

import com.google.common.util.concurrent.MoreExecutors;

15

16

// Creating immediate futures

17

ListenableFuture<String> successful = Futures.immediateFuture("result");

18

ListenableFuture<String> failed = Futures.immediateFailedFuture(new RuntimeException("error"));

19

20

// Adding callbacks for completion notification

21

ListenableFuture<String> future = asyncOperation();

22

Futures.addCallback(future, new FutureCallback<String>() {

23

@Override

24

public void onSuccess(String result) {

25

System.out.println("Operation succeeded: " + result);

26

}

27

28

@Override

29

public void onFailure(Throwable t) {

30

System.err.println("Operation failed: " + t.getMessage());

31

}

32

}, MoreExecutors.directExecutor());

33

34

// Chaining operations with transformation

35

ListenableFuture<String> input = getStringAsync();

36

ListenableFuture<Integer> length = Futures.transform(input,

37

new Function<String, Integer>() {

38

@Override

39

public Integer apply(String s) {

40

return s.length();

41

}

42

}, executor);

43

44

// Asynchronous transformation (returns another ListenableFuture)

45

ListenableFuture<String> input2 = getKeyAsync();

46

ListenableFuture<String> result = Futures.transformAsync(input2,

47

new AsyncFunction<String, String>() {

48

@Override

49

public ListenableFuture<String> apply(String key) throws Exception {

50

return lookupValueAsync(key); // Returns ListenableFuture<String>

51

}

52

}, executor);

53

```

54

55

### Combining Futures

56

57

Utilities for combining multiple futures into aggregate operations.

58

59

```java { .api }

60

import com.google.common.collect.ImmutableList;

61

62

// Combine multiple futures into a list

63

ListenableFuture<String> future1 = getAsync("key1");

64

ListenableFuture<String> future2 = getAsync("key2");

65

ListenableFuture<String> future3 = getAsync("key3");

66

67

ListenableFuture<List<String>> combined = Futures.allAsList(future1, future2, future3);

68

// Result list contains values in same order as input futures

69

70

// Successful futures only (failures ignored)

71

ListenableFuture<List<String>> successful = Futures.successfulAsList(future1, future2, future3);

72

// Failed futures contribute null to result list

73

74

// Combine futures supplied as a collection instead of varargs

75

List<ListenableFuture<String>> futures = ImmutableList.of(future1, future2, future3);

76

ListenableFuture<List<String>> fromList = Futures.allAsList(futures);

77

78

// Combining with custom logic

79

ListenableFuture<String> combined2 = Futures.whenAllSucceed(future1, future2, future3)

80

.call(new Callable<String>() {

81

@Override

82

public String call() throws Exception {

83

// All futures completed successfully

84

String result1 = Futures.getDone(future1);

85

String result2 = Futures.getDone(future2);

86

String result3 = Futures.getDone(future3);

87

return result1 + result2 + result3;

88

}

89

}, executor);

90

91

// First successful result

92

ListenableFuture<String> firstSuccessful = Futures.whenAllComplete(future1, future2, future3)

93

.call(new Callable<String>() {

94

@Override

95

public String call() throws Exception {

96

// Return first non-null result

97

for (ListenableFuture<String> future : Arrays.asList(future1, future2, future3)) {

98

try {

99

String result = Futures.getDone(future);

100

if (result != null) {

101

return result;

102

}

103

} catch (Exception ignored) {

104

// Continue to next future

105

}

106

}

107

throw new RuntimeException("All futures failed or returned null");

108

}

109

}, executor);

110

```

111

112

### Exception Handling with Futures

113

114

Robust error handling and recovery patterns for asynchronous operations.

115

116

```java { .api }

117

// Catching specific exceptions

118

ListenableFuture<String> risky = riskyOperation();

119

ListenableFuture<String> withFallback = Futures.catching(risky,

120

IOException.class,

121

new Function<IOException, String>() {

122

@Override

123

public String apply(IOException e) {

124

return "fallback-value"; // Recover from IOException

125

}

126

}, executor);

127

128

// Asynchronous exception handling

129

ListenableFuture<String> withAsyncFallback = Futures.catchingAsync(risky,

130

IOException.class,

131

new AsyncFunction<IOException, String>() {

132

@Override

133

public ListenableFuture<String> apply(IOException e) throws Exception {

134

return getFallbackValueAsync(); // Async recovery

135

}

136

}, executor);

137

138

// Multiple exception types

139

ListenableFuture<String> multiCatch = Futures.catching(

140

Futures.catching(risky, IOException.class, ioFallback, executor),

141

TimeoutException.class, timeoutFallback, executor);

142

143

// Validate results during transformation (an exception thrown by the function fails the returned future)

144

ListenableFuture<String> transformed = Futures.transform(risky,

145

new Function<String, String>() {

146

@Override

147

public String apply(String input) {

148

if (input == null) {

149

throw new IllegalStateException("Unexpected null result");

150

}

151

return input.toUpperCase();

152

}

153

}, executor);

154

```

155

156

### Timeouts and Scheduling

157

158

Adding timeouts and scheduling capabilities to futures.

159

160

```java { .api }

161

import java.util.concurrent.ScheduledExecutorService;

162

import java.util.concurrent.Executors;

163

164

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

165

166

// Add timeout to future

167

ListenableFuture<String> slow = slowOperation();

168

ListenableFuture<String> withTimeout = Futures.withTimeout(slow, 30, TimeUnit.SECONDS, scheduler);

169

170

// Schedule async operation

171

ListenableFuture<String> scheduled = Futures.scheduleAsync(

172

new AsyncCallable<String>() {

173

@Override

174

public ListenableFuture<String> call() throws Exception {

175

return performScheduledTask();

176

}

177

}, 5, TimeUnit.SECONDS, scheduler);

178

179

// Submit callable to executor

180

ListeningExecutorService executor = MoreExecutors.listeningDecorator(

181

Executors.newFixedThreadPool(10));

182

183

ListenableFuture<String> submitted = Futures.submit(new Callable<String>() {

184

@Override

185

public String call() throws Exception {

186

return "computed result";

187

}

188

}, executor);

189

```

190

191

### ListeningExecutorService

192

193

ExecutorService that returns ListenableFuture instances instead of regular Futures.

194

195

```java { .api }

196

import com.google.common.util.concurrent.ListeningExecutorService;

197

import com.google.common.util.concurrent.MoreExecutors;

198

import java.util.concurrent.Executors;

199

200

// Create ListeningExecutorService

201

ExecutorService threadPool = Executors.newFixedThreadPool(10);

202

ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(threadPool);

203

204

// Submit tasks that return ListenableFuture

205

ListenableFuture<String> future = listeningExecutor.submit(new Callable<String>() {

206

@Override

207

public String call() throws Exception {

208

return "task result";

209

}

210

});

211

212

ListenableFuture<Void> voidFuture = listeningExecutor.submit(new Runnable() {

213

@Override

214

public void run() {

215

performTask();

216

}

217

}, null);

218

219

// Direct executor (executes immediately on calling thread)

220

ListeningExecutorService directExecutor = MoreExecutors.newDirectExecutorService();

221

222

// Same-thread executor (for testing)

223

Executor sameThreadExecutor = MoreExecutors.directExecutor();

224

225

// Graceful shutdown utility

226

MoreExecutors.shutdownAndAwaitTermination(threadPool, 60, TimeUnit.SECONDS);

227

```

228

229

### RateLimiter

230

231

Controls the rate at which operations can be performed, useful for throttling API calls or resource access.

232

233

```java { .api }

234

import com.google.common.util.concurrent.RateLimiter;

235

236

// Create rate limiter (5 permits per second)

237

RateLimiter rateLimiter = RateLimiter.create(5.0);

238

239

// Acquire permits (blocking)

240

rateLimiter.acquire(); // Acquire 1 permit

241

rateLimiter.acquire(3); // Acquire 3 permits

242

243

// Try to acquire permits (non-blocking)

244

boolean acquired = rateLimiter.tryAcquire(); // Try to acquire 1 permit

245

boolean acquired3 = rateLimiter.tryAcquire(3); // Try to acquire 3 permits

246

boolean acquiredWithTimeout = rateLimiter.tryAcquire(2, TimeUnit.SECONDS); // With timeout

247

248

// Dynamic rate adjustment

249

rateLimiter.setRate(10.0); // Change to 10 permits per second

250

double currentRate = rateLimiter.getRate();

251

252

// Warming-up rate limiter (ramps up gradually to the stable rate)

253

RateLimiter bursty = RateLimiter.create(2.0, 5, TimeUnit.SECONDS);

254

// Stable rate of 2 permits/second, reached after a 5-second warmup period

255

256

// Practical usage example

257

public class ApiClient {

258

private final RateLimiter rateLimiter = RateLimiter.create(10.0); // 10 requests/second

259

260

public String makeApiCall(String endpoint) throws Exception {

261

rateLimiter.acquire(); // Wait for permit

262

return httpClient.get(endpoint);

263

}

264

265

public Optional<String> tryApiCall(String endpoint) throws Exception {

266

if (rateLimiter.tryAcquire(1, TimeUnit.SECONDS)) {

267

return Optional.of(httpClient.get(endpoint));

268

} else {

269

return Optional.absent(); // Rate limit exceeded

270

}

271

}

272

}

273

```

274

275

### Monitor

276

277

Synchronization primitive with boolean conditions, providing more flexible locking than traditional synchronized blocks.

278

279

```java { .api }

280

import com.google.common.util.concurrent.Monitor;

281

282

public class BoundedBuffer<T> {

283

private final Monitor monitor = new Monitor();

284

private final Monitor.Guard notEmpty = new Monitor.Guard(monitor) {

285

@Override

286

public boolean isSatisfied() {

287

return size > 0;

288

}

289

};

290

private final Monitor.Guard notFull = new Monitor.Guard(monitor) {

291

@Override

292

public boolean isSatisfied() {

293

return size < capacity;

294

}

295

};

296

297

private final T[] buffer;

298

private final int capacity;

299

private int size = 0;

300

private int head = 0;

301

private int tail = 0;

302

303

@SuppressWarnings("unchecked")

304

public BoundedBuffer(int capacity) {

305

this.capacity = capacity;

306

this.buffer = (T[]) new Object[capacity];

307

}

308

309

public void put(T item) throws InterruptedException {

310

monitor.enterWhen(notFull); // Wait until buffer not full

311

try {

312

buffer[tail] = item;

313

tail = (tail + 1) % capacity;

314

size++;

315

} finally {

316

monitor.leave();

317

}

318

}

319

320

public T take() throws InterruptedException {

321

monitor.enterWhen(notEmpty); // Wait until buffer not empty

322

try {

323

T item = buffer[head];

324

buffer[head] = null;

325

head = (head + 1) % capacity;

326

size--;

327

return item;

328

} finally {

329

monitor.leave();

330

}

331

}

332

333

public boolean tryPut(T item, long timeout, TimeUnit unit) throws InterruptedException {

334

if (monitor.enterWhen(notFull, timeout, unit)) {

335

try {

336

buffer[tail] = item;

337

tail = (tail + 1) % capacity;

338

size++;

339

return true;

340

} finally {

341

monitor.leave();

342

}

343

}

344

return false; // Timeout

345

}

346

347

public T tryTake(long timeout, TimeUnit unit) throws InterruptedException {

348

if (monitor.enterWhen(notEmpty, timeout, unit)) {

349

try {

350

T item = buffer[head];

351

buffer[head] = null;

352

head = (head + 1) % capacity;

353

size--;

354

return item;

355

} finally {

356

monitor.leave();

357

}

358

}

359

return null; // Timeout

360

}

361

}

362

```

363

364

### Service Framework

365

366

Framework for managing application services with lifecycle management and state transitions.

367

368

```java { .api }

369

import com.google.common.util.concurrent.Service;

370

import com.google.common.util.concurrent.AbstractService;

371

import com.google.common.util.concurrent.ServiceManager;

372

373

// Custom service implementation

374

public class DatabaseService extends AbstractService {

375

private DatabaseConnection connection;

376

377

@Override

378

protected void doStart() {

379

try {

380

connection = new DatabaseConnection();

381

connection.connect();

382

notifyStarted(); // Signal successful start

383

} catch (Exception e) {

384

notifyFailed(e); // Signal start failure

385

}

386

}

387

388

@Override

389

protected void doStop() {

390

try {

391

if (connection != null) {

392

connection.close();

393

}

394

notifyStopped(); // Signal successful stop

395

} catch (Exception e) {

396

notifyFailed(e); // Signal stop failure

397

}

398

}

399

400

public void executeQuery(String sql) {

401

// Service must be running to execute operations

402

checkRunning();

403

connection.execute(sql);

404

}

405

406

private void checkRunning() {

407

if (state() != State.RUNNING) {

408

throw new IllegalStateException("Service is not running: " + state());

409

}

410

}

411

}

412

413

// Service management

414

Service databaseService = new DatabaseService();

415

Service webService = new WebService();

416

Service cacheService = new CacheService();

417

418

// Individual service management

419

databaseService.startAsync();

420

databaseService.awaitRunning(30, TimeUnit.SECONDS); // Wait for start

421

422

// Service state monitoring

423

Service.State state = databaseService.state();

424

boolean isRunning = (state == Service.State.RUNNING);

425

426

// Add state listeners

427

databaseService.addListener(new Service.Listener() {

428

@Override

429

public void starting() {

430

System.out.println("Database service starting...");

431

}

432

433

@Override

434

public void running() {

435

System.out.println("Database service is running");

436

}

437

438

@Override

439

public void stopping(Service.State from) {

440

System.out.println("Database service stopping from state: " + from);

441

}

442

443

@Override

444

public void terminated(Service.State from) {

445

System.out.println("Database service terminated from state: " + from);

446

}

447

448

@Override

449

public void failed(Service.State from, Throwable failure) {

450

System.err.println("Database service failed: " + failure.getMessage());

451

}

452

}, MoreExecutors.directExecutor());

453

454

// Managing multiple services

455

ServiceManager serviceManager = new ServiceManager(

456

Arrays.asList(databaseService, webService, cacheService));

457

458

// Start all services

459

serviceManager.startAsync();

460

serviceManager.awaitHealthy(60, TimeUnit.SECONDS); // Wait for all to be healthy

461

462

// Stop all services

463

serviceManager.stopAsync();

464

serviceManager.awaitStopped(30, TimeUnit.SECONDS);

465

466

// Startup times (in milliseconds) and services grouped by current state

467

Map<Service, Long> startupTimes = serviceManager.startupTimes();

468

ImmutableMultimap<Service.State, Service> servicesByState = serviceManager.servicesByState();

469

```

470

471

### Striped Locks

472

473

Provides a set of locks that can be used to stripe synchronization across multiple objects.

474

475

```java { .api }

476

import com.google.common.util.concurrent.Striped;

477

import java.util.concurrent.locks.Lock;

478

import java.util.concurrent.locks.ReadWriteLock;

479

480

// Striped locks for better concurrency

481

Striped<Lock> striped = Striped.lock(16); // 16 locks in the stripe

482

Striped<ReadWriteLock> readWriteStriped = Striped.readWriteLock(16);

483

484

// Use with object keys

485

public class StripedCounter {

486

private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

487

private final Striped<Lock> striped = Striped.lock(32);

488

489

public void increment(String key) {

490

Lock lock = striped.get(key); // Get lock for this key

491

lock.lock();

492

try {

493

AtomicInteger counter = counters.get(key);

494

if (counter == null) {

495

counter = new AtomicInteger(0);

496

counters.put(key, counter);

497

}

498

counter.incrementAndGet();

499

} finally {

500

lock.unlock();

501

}

502

}

503

504

public int get(String key) {

505

Lock lock = striped.get(key);

506

lock.lock();

507

try {

508

AtomicInteger counter = counters.get(key);

509

return counter != null ? counter.get() : 0;

510

} finally {

511

lock.unlock();

512

}

513

}

514

}

515

516

// Bulk operations with ordered locking (avoids deadlocks)

517

public void transfer(String fromKey, String toKey, int amount) {

518

Iterable<Lock> locks = striped.bulkGet(Arrays.asList(fromKey, toKey));

519

for (Lock lock : locks) {

520

lock.lock();

521

}

522

try {

523

// Perform transfer operation

524

AtomicInteger from = counters.get(fromKey);

525

AtomicInteger to = counters.get(toKey);

526

if (from != null && from.get() >= amount) {

527

from.addAndGet(-amount);

528

if (to == null) {

529

counters.put(toKey, new AtomicInteger(amount));

530

} else {

531

to.addAndGet(amount);

532

}

533

}

534

} finally {

535

// Release in reverse order

536

List<Lock> locksList = Lists.newArrayList(locks);

537

Collections.reverse(locksList);

538

for (Lock lock : locksList) {

539

lock.unlock();

540

}

541

}

542

}

543

```

544

545

### Atomic Operations and Utilities

546

547

Enhanced atomic operations and utilities for concurrent programming.

548

549

```java { .api }

550

import com.google.common.util.concurrent.AtomicDouble;

551

import com.google.common.util.concurrent.AtomicLongMap;

552

import com.google.common.util.concurrent.Uninterruptibles;

553

554

// Atomic operations on doubles

555

AtomicDouble atomicDouble = new AtomicDouble(0.0);

556

double result = atomicDouble.addAndGet(3.14);

557

atomicDouble.compareAndSet(3.14, 2.71);

558

559

// Atomic map for counters

560

AtomicLongMap<String> counters = AtomicLongMap.create();

561

long count = counters.incrementAndGet("requests"); // Atomic increment

562

counters.addAndGet("bytes", 1024); // Atomic add

563

long total = counters.get("requests"); // Get current value

564

Map<String, Long> snapshot = counters.asMap(); // Live, read-only view of the current values (not a copied snapshot)

565

566

// Uninterruptible operations

567

public void robustOperation() {

568

// Sleep that can't be interrupted

569

Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);

570

571

// Join that can't be interrupted

572

Thread workerThread = new Thread(task);

573

workerThread.start();

574

Uninterruptibles.joinUninterruptibly(workerThread);

575

576

// Future get that can't be interrupted

577

Future<String> future = executor.submit(callable);

578

String result = Uninterruptibles.getUninterruptibly(future);

579

580

// Future get with timeout that can't be interrupted

581

String resultWithTimeout = Uninterruptibles.getUninterruptibly(

582

future, 30, TimeUnit.SECONDS);

583

}

584

```

585

586

### Testing Concurrent Code

587

588

Utilities for testing concurrent code and timing behavior.

589

590

```java { .api }

591

import com.google.common.testing.FakeTicker;

592

import com.google.common.base.Ticker;

593

594

// Testing with fake time

595

public class ConcurrentServiceTest {

596

597

@Test

598

public void testRateLimiting() {

599

FakeTicker ticker = new FakeTicker();

600

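RateLimiter rateLimiter = RateLimiter.create(1.0, ticker); // 1 permit/second. NOTE: RateLimiter exposes no public create(double, Ticker) factory; verify this call against the Guava version in use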

601

602

// Should allow first request immediately

603

assertTrue(rateLimiter.tryAcquire());

604

605

// Should deny second request immediately

606

assertFalse(rateLimiter.tryAcquire());

607

608

// Advance time by 1 second

609

ticker.advance(1, TimeUnit.SECONDS);

610

611

// Should allow request after time advance

612

assertTrue(rateLimiter.tryAcquire());

613

}

614

615

@Test

616

public void testServiceLifecycle() throws Exception {

617

TestService service = new TestService();

618

619

// Test initial state

620

assertEquals(Service.State.NEW, service.state());

621

622

// Test successful start

623

service.startAsync();

624

service.awaitRunning(1, TimeUnit.SECONDS);

625

assertEquals(Service.State.RUNNING, service.state());

626

627

// Test successful stop

628

service.stopAsync();

629

service.awaitTerminated(1, TimeUnit.SECONDS);

630

assertEquals(Service.State.TERMINATED, service.state());

631

}

632

}

633

```

634

635

Guava's concurrency utilities provide powerful abstractions for asynchronous programming, resource management, and thread coordination that go well beyond Java's standard concurrency libraries while maintaining compatibility and ease of use.