or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

collections.mdconfiguration.mddata-structures.mdindex.mdmessaging.mdreactive-async.mdsynchronization.md

reactive-async.mddocs/

0

# Reactive and Async APIs

1

2

Redisson provides comprehensive support for non-blocking, reactive programming with both Reactive Streams and RxJava interfaces. All synchronous operations have corresponding asynchronous variants that return futures or reactive streams.

3

4

## Capabilities

5

6

### Client Interfaces

7

8

Access to reactive and RxJava client interfaces for non-blocking operations.

9

10

```java { .api }

11

/**

12

* Get reactive streams client interface

13

* @return RedissonReactiveClient for reactive programming with Reactive Streams

14

*/

15

public RedissonReactiveClient reactive();

16

17

/**

18

* Get RxJava client interface

19

* @return RedissonRxClient for RxJava programming

20

*/

21

public RedissonRxClient rxJava();

22

```

23

24

**Reactive Client Interfaces:**

25

26

```java { .api }

27

// Reactive Streams client interface

28

public interface RedissonReactiveClient {

29

// Collections - return reactive variants

30

<K, V> RMapReactive<K, V> getMap(String name);

31

<V> RListReactive<V> getList(String name);

32

<V> RSetReactive<V> getSet(String name);

33

<V> RQueueReactive<V> getQueue(String name);

34

35

// Locks and synchronization - return reactive variants

36

RLockReactive getLock(String name);

37

RSemaphoreReactive getSemaphore(String name);

38

RCountDownLatchReactive getCountDownLatch(String name);

39

40

// Atomic operations - return reactive variants

41

RAtomicLongReactive getAtomicLong(String name);

42

RAtomicDoubleReactive getAtomicDouble(String name);

43

44

// Topics - return reactive variants

45

RTopicReactive getTopic(String name);

46

RPatternTopicReactive getPatternTopic(String pattern);

47

48

// Lifecycle methods

49

Mono<Void> shutdown();

50

Mono<Void> shutdown(long quietPeriod, long timeout, TimeUnit unit);

51

boolean isShutdown();

52

boolean isShuttingDown();

53

}

54

55

// RxJava client interface

56

public interface RedissonRxClient {

57

// Collections - return RxJava variants

58

<K, V> RMapRx<K, V> getMap(String name);

59

<V> RListRx<V> getList(String name);

60

<V> RSetRx<V> getSet(String name);

61

<V> RQueueRx<V> getQueue(String name);

62

63

// Locks and synchronization - return RxJava variants

64

RLockRx getLock(String name);

65

RSemaphoreRx getSemaphore(String name);

66

RCountDownLatchRx getCountDownLatch(String name);

67

68

// Atomic operations - return RxJava variants

69

RAtomicLongRx getAtomicLong(String name);

70

RAtomicDoubleRx getAtomicDouble(String name);

71

72

// Topics - return RxJava variants

73

RTopicRx getTopic(String name);

74

RPatternTopicRx getPatternTopic(String pattern);

75

76

// Lifecycle methods

77

Completable shutdown();

78

Completable shutdown(long quietPeriod, long timeout, TimeUnit unit);

79

boolean isShutdown();

80

boolean isShuttingDown();

81

}

82

```

83

84

**Usage Examples:**

85

86

```java

87

import org.redisson.api.*;

88

import reactor.core.publisher.Mono;

89

import reactor.core.publisher.Flux;

90

import io.reactivex.rxjava3.core.Single;

91

import io.reactivex.rxjava3.core.Completable;

92

93

// Get reactive clients

94

RedissonReactiveClient reactiveClient = redisson.reactive();

95

RedissonRxClient rxClient = redisson.rxJava();

96

97

// Reactive streams example

98

RMapReactive<String, String> reactiveMap = reactiveClient.getMap("users");

99

Mono<String> putResult = reactiveMap.put("user1", "Alice")

100

.then(reactiveMap.get("user1"))

101

.doOnNext(value -> System.out.println("Retrieved: " + value));

102

103

// RxJava example

104

RMapRx<String, String> rxMap = rxClient.getMap("products");

105

Single<String> rxResult = rxMap.put("product1", "Laptop")

106

.andThen(rxMap.get("product1"))

107

.doOnSuccess(value -> System.out.println("Retrieved: " + value));

108

```

109

110

### Async Interfaces

111

112

All synchronous interfaces extend async counterparts that return `RFuture<T>` for non-blocking operations.

113

114

```java { .api }

115

// Base async interface

116

public interface RObjectAsync {

117

RFuture<Boolean> touchAsync();

118

RFuture<Boolean> unlinkAsync();

119

RFuture<Boolean> deleteAsync();

120

RFuture<Boolean> isExistsAsync();

121

RFuture<Void> renameAsync(String newName);

122

RFuture<Boolean> renameNXAsync(String newName);

123

RFuture<Boolean> copyAsync(String host, int port, int database, long timeout);

124

RFuture<Boolean> migrateAsync(String host, int port, int database, long timeout);

125

RFuture<Boolean> moveAsync(int database);

126

RFuture<Long> sizeInMemoryAsync();

127

RFuture<Void> restoreAsync(byte[] state);

128

RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit);

129

RFuture<Void> restoreAndReplaceAsync(byte[] state);

130

RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit);

131

RFuture<byte[]> dumpAsync();

132

}

133

134

// Async expirable interface

135

public interface RExpirableAsync extends RObjectAsync {

136

RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit);

137

RFuture<Boolean> expireAtAsync(Date timestamp);

138

RFuture<Boolean> expireAtAsync(long timestamp);

139

RFuture<Boolean> clearExpireAsync();

140

RFuture<Long> remainTimeToLiveAsync();

141

RFuture<Long> getExpireTimeAsync();

142

}

143

```

144

145

**RFuture Interface:**

146

147

```java { .api }

148

// Redisson's future interface extending Java's CompletableFuture

149

public interface RFuture<T> extends CompletableFuture<T> {

150

// Standard CompletableFuture methods available

151

boolean cancel(boolean mayInterruptIfRunning);

152

boolean isCancelled();

153

boolean isDone();

154

T get() throws InterruptedException, ExecutionException;

155

T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

156

157

// Additional async composition methods

158

RFuture<T> sync();

159

RFuture<T> syncUninterruptibly();

160

T syncUninterruptibly();

161

RFuture<T> await();

162

RFuture<T> awaitUninterruptibly();

163

boolean await(long timeout, TimeUnit unit);

164

boolean await(long timeoutMillis);

165

boolean awaitUninterruptibly(long timeout, TimeUnit unit);

166

boolean awaitUninterruptibly(long timeoutMillis);

167

168

// Success/failure callbacks

169

RFuture<T> onComplete(BiConsumer<? super T, ? super Throwable> action);

170

171

// Conversion to reactive types

172

Mono<T> toMono();

173

Single<T> toSingle();

174

}

175

```

176

177

### Reactive Map Operations

178

179

Reactive map interface with non-blocking operations returning reactive streams.

180

181

```java { .api }

182

public interface RMapReactive<K, V> extends RObjectReactive {

183

// Basic operations

184

Mono<V> put(K key, V value);

185

Mono<V> putIfAbsent(K key, V value);

186

Mono<V> get(K key);

187

Mono<V> remove(K key);

188

Mono<Boolean> containsKey(K key);

189

Mono<Boolean> containsValue(V value);

190

191

// Bulk operations

192

Mono<Integer> size();

193

Mono<Boolean> isEmpty();

194

Mono<Void> clear();

195

Flux<K> keyIterator();

196

Flux<V> valueIterator();

197

Flux<Entry<K, V>> entryIterator();

198

199

// Advanced operations

200

Mono<V> addAndGet(K key, Number delta);

201

Flux<K> readAllKeySet();

202

Flux<V> readAllValues();

203

Flux<Entry<K, V>> readAllEntrySet();

204

Mono<Map<K, V>> readAllMap();

205

206

// Fast operations

207

Mono<Long> fastPut(K key, V value);

208

Mono<Boolean> fastPutIfAbsent(K key, V value);

209

Mono<Long> fastRemove(K... keys);

210

211

// Batch operations

212

Mono<Void> putAll(Map<? extends K, ? extends V> map);

213

Mono<Map<K, V>> getAll(Set<K> keys);

214

Mono<Long> removeAll(Set<K> keys);

215

}

216

```

217

218

**Usage Examples:**

219

220

```java

221

// Reactive map operations

222

RMapReactive<String, User> userMap = reactiveClient.getMap("users");

223

224

// Chain reactive operations

225

Mono<String> pipeline = userMap.put("user1", new User("Alice", 25))

226

.then(userMap.get("user1"))

227

.map(User::getName)

228

.doOnNext(name -> System.out.println("User: " + name));

229

230

// Execute the pipeline

231

pipeline.subscribe();

232

233

// Multiple operations in sequence

234

Flux<String> userNames = userMap.putAll(Map.of(

235

"user1", new User("Alice", 25),

236

"user2", new User("Bob", 30),

237

"user3", new User("Charlie", 35)

238

))

239

.thenMany(userMap.readAllValues())

240

.map(User::getName)

241

.doOnNext(name -> System.out.println("Found user: " + name));

242

243

userNames.collectList().subscribe(names -> {

244

System.out.println("All users: " + names);

245

});

246

```

247

248

### Reactive Lock Operations

249

250

Reactive lock interface for non-blocking distributed locking.

251

252

```java { .api }

253

public interface RLockReactive extends RObjectReactive {

254

// Lock acquisition

255

Mono<Void> lock();

256

Mono<Void> lock(long leaseTime, TimeUnit unit);

257

258

// Try lock operations

259

Mono<Boolean> tryLock();

260

Mono<Boolean> tryLock(long waitTime, TimeUnit unit);

261

Mono<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);

262

263

// Lock release

264

Mono<Void> unlock();

265

Mono<Void> forceUnlock();

266

267

// Lock status

268

Mono<Boolean> isLocked();

269

Mono<Boolean> isHeldByCurrentThread();

270

Mono<Integer> getHoldCount();

271

Mono<Long> remainTimeToLive();

272

}

273

```

274

275

**Usage Examples:**

276

277

```java

278

// Reactive lock operations

279

RLockReactive lock = reactiveClient.getLock("processLock");

280

281

// Try lock with timeout and automatic release

282

Mono<String> protectedOperation = lock.tryLock(5, 30, TimeUnit.SECONDS)

283

.flatMap(acquired -> {

284

if (acquired) {

285

return performCriticalOperation()

286

.doFinally(signalType -> lock.unlock().subscribe());

287

} else {

288

return Mono.error(new RuntimeException("Could not acquire lock"));

289

}

290

});

291

292

protectedOperation.subscribe(

293

result -> System.out.println("Operation result: " + result),

294

error -> System.err.println("Operation failed: " + error.getMessage())

295

);

296

297

// Lock with error handling

298

lock.lock()

299

.then(performProtectedWork())

300

.doOnError(error -> System.err.println("Error during work: " + error))

301

.doFinally(signalType -> {

302

// Always unlock, even on error

303

lock.unlock().subscribe();

304

})

305

.subscribe();

306

```

307

308

### RxJava Integration

309

310

RxJava interfaces for reactive programming with RxJava types.

311

312

```java { .api }

313

// RxJava map interface

314

public interface RMapRx<K, V> extends RObjectRx {

315

// Basic operations returning RxJava types

316

Single<V> put(K key, V value);

317

Maybe<V> putIfAbsent(K key, V value);

318

Maybe<V> get(K key);

319

Maybe<V> remove(K key);

320

Single<Boolean> containsKey(K key);

321

Single<Boolean> containsValue(V value);

322

323

// Bulk operations

324

Single<Integer> size();

325

Single<Boolean> isEmpty();

326

Completable clear();

327

Observable<K> keyIterator();

328

Observable<V> valueIterator();

329

Observable<Entry<K, V>> entryIterator();

330

331

// Fast operations

332

Single<Long> fastPut(K key, V value);

333

Single<Boolean> fastPutIfAbsent(K key, V value);

334

Single<Long> fastRemove(K... keys);

335

}

336

337

// RxJava lock interface

338

public interface RLockRx extends RObjectRx {

339

Completable lock();

340

Completable lock(long leaseTime, TimeUnit unit);

341

Single<Boolean> tryLock();

342

Single<Boolean> tryLock(long waitTime, TimeUnit unit);

343

Single<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);

344

Completable unlock();

345

Completable forceUnlock();

346

Single<Boolean> isLocked();

347

Single<Boolean> isHeldByCurrentThread();

348

Single<Integer> getHoldCount();

349

Single<Long> remainTimeToLive();

350

}

351

```

352

353

**RxJava Examples:**

354

355

```java

356

// RxJava map operations

357

RMapRx<String, String> rxMap = rxClient.getMap("cache");

358

359

// Chain RxJava operations

360

Single<String> result = rxMap.put("key1", "value1")

361

.flatMap(previous -> rxMap.get("key1"))

362

.doOnSuccess(value -> System.out.println("Retrieved: " + value));

363

364

result.subscribe(

365

value -> System.out.println("Final result: " + value),

366

error -> System.err.println("Error: " + error.getMessage())

367

);

368

369

// RxJava lock operations

370

RLockRx rxLock = rxClient.getLock("rxLock");

371

372

Completable lockOperation = rxLock.tryLock(5, TimeUnit.SECONDS)

373

.flatMapCompletable(acquired -> {

374

if (acquired) {

375

return performRxWork()

376

.doFinally(() -> rxLock.unlock().subscribe());

377

} else {

378

return Completable.error(new RuntimeException("Lock not acquired"));

379

}

380

});

381

382

lockOperation.subscribe(

383

() -> System.out.println("Operation completed successfully"),

384

error -> System.err.println("Operation failed: " + error.getMessage())

385

);

386

```

387

388

### Async Collections

389

390

Async interfaces for all collection types with future-based operations.

391

392

```java { .api }

393

// Async map interface

394

public interface RMapAsync<K, V> extends RObjectAsync, RExpirableAsync {

395

RFuture<V> putAsync(K key, V value);

396

RFuture<V> putIfAbsentAsync(K key, V value);

397

RFuture<V> getAsync(K key);

398

RFuture<V> removeAsync(K key);

399

RFuture<Boolean> containsKeyAsync(K key);

400

RFuture<Boolean> containsValueAsync(V value);

401

402

RFuture<Integer> sizeAsync();

403

RFuture<Boolean> isEmptyAsync();

404

RFuture<Void> clearAsync();

405

406

RFuture<Set<K>> keySetAsync();

407

RFuture<Collection<V>> valuesAsync();

408

RFuture<Set<Entry<K, V>>> entrySetAsync();

409

410

RFuture<Map<K, V>> getAllAsync(Set<K> keys);

411

RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map);

412

}

413

414

// Async list interface

415

public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<V>> {

416

RFuture<V> getAsync(int index);

417

RFuture<V> setAsync(int index, V element);

418

RFuture<Void> addAsync(int index, V element);

419

RFuture<V> removeAsync(int index);

420

421

RFuture<Integer> indexOfAsync(Object o);

422

RFuture<Integer> lastIndexOfAsync(Object o);

423

424

RFuture<List<V>> rangeAsync(int fromIndex, int toIndex);

425

RFuture<Void> trimAsync(int fromIndex, int toIndex);

426

}

427

428

// Async lock interface

429

public interface RLockAsync extends RObjectAsync {

430

RFuture<Void> lockAsync();

431

RFuture<Void> lockAsync(long leaseTime, TimeUnit unit);

432

RFuture<Boolean> tryLockAsync();

433

RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit);

434

RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);

435

RFuture<Void> unlockAsync();

436

RFuture<Void> forceUnlockAsync();

437

RFuture<Boolean> isLockedAsync();

438

RFuture<Boolean> isHeldByCurrentThreadAsync();

439

RFuture<Integer> getHoldCountAsync();

440

RFuture<Long> remainTimeToLiveAsync();

441

}

442

```

443

444

**Async Usage Examples:**

445

446

```java

447

// Async map operations with CompletableFuture

448

RMapAsync<String, String> asyncMap = redisson.getMap("asyncData");

449

450

// Chain async operations

451

RFuture<String> futureResult = asyncMap.putAsync("key1", "value1")

452

.thenCompose(v -> asyncMap.getAsync("key1"))

453

.thenApply(value -> "Processed: " + value);

454

455

// Handle result

456

futureResult.whenComplete((result, throwable) -> {

457

if (throwable == null) {

458

System.out.println("Result: " + result);

459

} else {

460

System.err.println("Error: " + throwable.getMessage());

461

}

462

});

463

464

// Async lock operations

465

RLockAsync asyncLock = redisson.getLock("asyncLock");

466

467

RFuture<Boolean> lockFuture = asyncLock.tryLockAsync(5, 30, TimeUnit.SECONDS)

468

.thenCompose(acquired -> {

469

if (acquired) {

470

return performAsyncWork()

471

.whenComplete((result, error) -> {

472

// Always unlock

473

asyncLock.unlockAsync();

474

});

475

} else {

476

return CompletableFuture.failedFuture(

477

new RuntimeException("Could not acquire lock")

478

);

479

}

480

});

481

482

// Multiple async operations in parallel

483

RFuture<String> future1 = asyncMap.getAsync("key1");

484

RFuture<String> future2 = asyncMap.getAsync("key2");

485

RFuture<String> future3 = asyncMap.getAsync("key3");

486

487

RFuture.allOf(future1, future2, future3)

488

.thenApply(v -> Arrays.asList(

489

future1.getNow(null),

490

future2.getNow(null),

491

future3.getNow(null)

492

))

493

.whenComplete((results, error) -> {

494

if (error == null) {

495

System.out.println("All results: " + results);

496

} else {

497

System.err.println("Error getting results: " + error.getMessage());

498

}

499

});

500

```

501

502

## Reactive Error Handling and Patterns

503

504

```java { .api }

505

// Error handling patterns for reactive streams

506

public class ReactivePatterns {

507

508

// Retry with backoff

509

public static <T> Mono<T> retryWithBackoff(Mono<T> source, int maxRetries) {

510

return source.retryWhen(Retry.backoff(maxRetries, Duration.ofMillis(100)));

511

}

512

513

// Timeout handling

514

public static <T> Mono<T> withTimeout(Mono<T> source, Duration timeout) {

515

return source.timeout(timeout)

516

.onErrorResume(TimeoutException.class,

517

ex -> Mono.error(new RuntimeException("Operation timed out", ex)));

518

}

519

520

// Fallback handling

521

public static <T> Mono<T> withFallback(Mono<T> primary, Mono<T> fallback) {

522

return primary.onErrorResume(throwable -> {

523

System.err.println("Primary failed, using fallback: " + throwable.getMessage());

524

return fallback;

525

});

526

}

527

}

528

529

// Usage examples

530

RMapReactive<String, String> reactiveMap = reactiveClient.getMap("data");

531

532

// Apply patterns

533

Mono<String> robustOperation = ReactivePatterns.withTimeout(

534

ReactivePatterns.retryWithBackoff(

535

reactiveMap.get("important-key"), 3

536

),

537

Duration.ofSeconds(10)

538

);

539

540

Mono<String> withFallback = ReactivePatterns.withFallback(

541

robustOperation,

542

Mono.just("default-value")

543

);

544

```