0
# Reactive and Async APIs
1
2
Redisson supports non-blocking programming through three API styles: a Reactive Streams client built on Project Reactor (`Mono`/`Flux`), an RxJava 3 client (`Single`/`Maybe`/`Completable`), and `*Async` methods on the regular objects. Every synchronous operation has an asynchronous variant that returns either an `RFuture` or a reactive type.
3
4
## Capabilities
5
6
### Client Interfaces
7
8
Access to reactive and RxJava client interfaces for non-blocking operations.
9
10
```java { .api }
11
/**
12
* Get reactive streams client interface
13
* @return RedissonReactiveClient for reactive programming with Reactive Streams
14
*/
15
public RedissonReactiveClient reactive();
16
17
/**
18
* Get RxJava client interface
19
* @return RedissonRxClient for RxJava programming
20
*/
21
public RedissonRxClient rxJava();
22
```
23
24
**Reactive Client Interfaces:**
25
26
```java { .api }
27
// Reactive Streams client interface
28
public interface RedissonReactiveClient {
29
// Collections - return reactive variants
30
<K, V> RMapReactive<K, V> getMap(String name);
31
<V> RListReactive<V> getList(String name);
32
<V> RSetReactive<V> getSet(String name);
33
<V> RQueueReactive<V> getQueue(String name);
34
35
// Locks and synchronization - return reactive variants
36
RLockReactive getLock(String name);
37
RSemaphoreReactive getSemaphore(String name);
38
RCountDownLatchReactive getCountDownLatch(String name);
39
40
// Atomic operations - return reactive variants
41
RAtomicLongReactive getAtomicLong(String name);
42
RAtomicDoubleReactive getAtomicDouble(String name);
43
44
// Topics - return reactive variants
45
RTopicReactive getTopic(String name);
46
RPatternTopicReactive getPatternTopic(String pattern);
47
48
// Lifecycle methods
49
Mono<Void> shutdown();
50
Mono<Void> shutdown(long quietPeriod, long timeout, TimeUnit unit);
51
boolean isShutdown();
52
boolean isShuttingDown();
53
}
54
55
// RxJava client interface
56
public interface RedissonRxClient {
57
// Collections - return RxJava variants
58
<K, V> RMapRx<K, V> getMap(String name);
59
<V> RListRx<V> getList(String name);
60
<V> RSetRx<V> getSet(String name);
61
<V> RQueueRx<V> getQueue(String name);
62
63
// Locks and synchronization - return RxJava variants
64
RLockRx getLock(String name);
65
RSemaphoreRx getSemaphore(String name);
66
RCountDownLatchRx getCountDownLatch(String name);
67
68
// Atomic operations - return RxJava variants
69
RAtomicLongRx getAtomicLong(String name);
70
RAtomicDoubleRx getAtomicDouble(String name);
71
72
// Topics - return RxJava variants
73
RTopicRx getTopic(String name);
74
RPatternTopicRx getPatternTopic(String pattern);
75
76
// Lifecycle methods
77
Completable shutdown();
78
Completable shutdown(long quietPeriod, long timeout, TimeUnit unit);
79
boolean isShutdown();
80
boolean isShuttingDown();
81
}
82
```
83
84
**Usage Examples:**
85
86
```java
87
import org.redisson.api.*;
88
import reactor.core.publisher.Mono;
89
import reactor.core.publisher.Flux;
90
import io.reactivex.rxjava3.core.Single;
91
import io.reactivex.rxjava3.core.Completable;
92
93
// Get reactive clients
94
RedissonReactiveClient reactiveClient = redisson.reactive();
95
RedissonRxClient rxClient = redisson.rxJava();
96
97
// Reactive streams example
98
RMapReactive<String, String> reactiveMap = reactiveClient.getMap("users");
99
Mono<String> putResult = reactiveMap.put("user1", "Alice")
100
.then(reactiveMap.get("user1"))
101
.doOnNext(value -> System.out.println("Retrieved: " + value));
102
103
// RxJava example
104
RMapRx<String, String> rxMap = rxClient.getMap("products");
105
Single<String> rxResult = rxMap.put("product1", "Laptop")
106
.andThen(rxMap.get("product1"))
107
.doOnSuccess(value -> System.out.println("Retrieved: " + value));
108
```
109
110
### Async Interfaces
111
112
All synchronous interfaces extend async counterparts that return `RFuture<T>` for non-blocking operations.
113
114
```java { .api }
115
// Base async interface
116
public interface RObjectAsync {
117
RFuture<Boolean> touchAsync();
118
RFuture<Boolean> unlinkAsync();
119
RFuture<Boolean> deleteAsync();
120
RFuture<Boolean> isExistsAsync();
121
RFuture<Void> renameAsync(String newName);
122
RFuture<Boolean> renameNXAsync(String newName);
123
RFuture<Boolean> copyAsync(String host, int port, int database, long timeout);
124
RFuture<Boolean> migrateAsync(String host, int port, int database, long timeout);
125
RFuture<Boolean> moveAsync(int database);
126
RFuture<Long> sizeInMemoryAsync();
127
RFuture<Void> restoreAsync(byte[] state);
128
RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
129
RFuture<Void> restoreAndReplaceAsync(byte[] state);
130
RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
131
RFuture<byte[]> dumpAsync();
132
}
133
134
// Async expirable interface
135
public interface RExpirableAsync extends RObjectAsync {
136
RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit);
137
RFuture<Boolean> expireAtAsync(Date timestamp);
138
RFuture<Boolean> expireAtAsync(long timestamp);
139
RFuture<Boolean> clearExpireAsync();
140
RFuture<Long> remainTimeToLiveAsync();
141
RFuture<Long> getExpireTimeAsync();
142
}
143
```
144
145
**RFuture Interface:**
146
147
```java { .api }
148
// Redisson's future interface extending Java's CompletableFuture
149
public interface RFuture<T> extends CompletableFuture<T> {
150
// Standard CompletableFuture methods available
151
boolean cancel(boolean mayInterruptIfRunning);
152
boolean isCancelled();
153
boolean isDone();
154
T get() throws InterruptedException, ExecutionException;
155
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
156
157
// Additional async composition methods
158
RFuture<T> sync();
159
RFuture<T> syncUninterruptibly();
160
T syncUninterruptibly();
161
RFuture<T> await();
162
RFuture<T> awaitUninterruptibly();
163
boolean await(long timeout, TimeUnit unit);
164
boolean await(long timeoutMillis);
165
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
166
boolean awaitUninterruptibly(long timeoutMillis);
167
168
// Success/failure callbacks
169
RFuture<T> onComplete(BiConsumer<? super T, ? super Throwable> action);
170
171
// Conversion to reactive types
172
Mono<T> toMono();
173
Single<T> toSingle();
174
}
175
```
176
177
### Reactive Map Operations
178
179
Reactive map interface with non-blocking operations returning reactive streams.
180
181
```java { .api }
182
public interface RMapReactive<K, V> extends RObjectReactive {
183
// Basic operations
184
Mono<V> put(K key, V value);
185
Mono<V> putIfAbsent(K key, V value);
186
Mono<V> get(K key);
187
Mono<V> remove(K key);
188
Mono<Boolean> containsKey(K key);
189
Mono<Boolean> containsValue(V value);
190
191
// Bulk operations
192
Mono<Integer> size();
193
Mono<Boolean> isEmpty();
194
Mono<Void> clear();
195
Flux<K> keyIterator();
196
Flux<V> valueIterator();
197
Flux<Entry<K, V>> entryIterator();
198
199
// Advanced operations
200
Mono<V> addAndGet(K key, Number delta);
201
Flux<K> readAllKeySet();
202
Flux<V> readAllValues();
203
Flux<Entry<K, V>> readAllEntrySet();
204
Mono<Map<K, V>> readAllMap();
205
206
// Fast operations
207
Mono<Long> fastPut(K key, V value);
208
Mono<Boolean> fastPutIfAbsent(K key, V value);
209
Mono<Long> fastRemove(K... keys);
210
211
// Batch operations
212
Mono<Void> putAll(Map<? extends K, ? extends V> map);
213
Mono<Map<K, V>> getAll(Set<K> keys);
214
Mono<Long> removeAll(Set<K> keys);
215
}
216
```
217
218
**Usage Examples:**
219
220
```java
221
// Reactive map operations
222
RMapReactive<String, User> userMap = reactiveClient.getMap("users");
223
224
// Chain reactive operations
225
Mono<String> pipeline = userMap.put("user1", new User("Alice", 25))
226
.then(userMap.get("user1"))
227
.map(User::getName)
228
.doOnNext(name -> System.out.println("User: " + name));
229
230
// Execute the pipeline
231
pipeline.subscribe();
232
233
// Multiple operations in sequence
234
Flux<String> userNames = userMap.putAll(Map.of(
235
"user1", new User("Alice", 25),
236
"user2", new User("Bob", 30),
237
"user3", new User("Charlie", 35)
238
))
239
.thenMany(userMap.readAllValues())
240
.map(User::getName)
241
.doOnNext(name -> System.out.println("Found user: " + name));
242
243
userNames.collectList().subscribe(names -> {
244
System.out.println("All users: " + names);
245
});
246
```
247
248
### Reactive Lock Operations
249
250
Reactive lock interface for non-blocking distributed locking.
251
252
```java { .api }
253
public interface RLockReactive extends RObjectReactive {
254
// Lock acquisition
255
Mono<Void> lock();
256
Mono<Void> lock(long leaseTime, TimeUnit unit);
257
258
// Try lock operations
259
Mono<Boolean> tryLock();
260
Mono<Boolean> tryLock(long waitTime, TimeUnit unit);
261
Mono<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
262
263
// Lock release
264
Mono<Void> unlock();
265
Mono<Void> forceUnlock();
266
267
// Lock status
268
Mono<Boolean> isLocked();
269
Mono<Boolean> isHeldByCurrentThread();
270
Mono<Integer> getHoldCount();
271
Mono<Long> remainTimeToLive();
272
}
273
```
274
275
**Usage Examples:**
276
277
```java
278
// Reactive lock operations
279
RLockReactive lock = reactiveClient.getLock("processLock");
280
281
// Try lock with timeout and automatic release
282
Mono<String> protectedOperation = lock.tryLock(5, 30, TimeUnit.SECONDS)
283
.flatMap(acquired -> {
284
if (acquired) {
285
return performCriticalOperation()
286
.doFinally(signalType -> lock.unlock().subscribe());
287
} else {
288
return Mono.error(new RuntimeException("Could not acquire lock"));
289
}
290
});
291
292
protectedOperation.subscribe(
293
result -> System.out.println("Operation result: " + result),
294
error -> System.err.println("Operation failed: " + error.getMessage())
295
);
296
297
// Lock with error handling
298
lock.lock()
299
.then(performProtectedWork())
300
.doOnError(error -> System.err.println("Error during work: " + error))
301
.doFinally(signalType -> {
302
// Always unlock, even on error
303
lock.unlock().subscribe();
304
})
305
.subscribe();
306
```
307
308
### RxJava Integration
309
310
RxJava interfaces for reactive programming with RxJava types.
311
312
```java { .api }
313
// RxJava map interface
314
public interface RMapRx<K, V> extends RObjectRx {
315
// Basic operations returning RxJava types
316
Single<V> put(K key, V value);
317
Maybe<V> putIfAbsent(K key, V value);
318
Maybe<V> get(K key);
319
Maybe<V> remove(K key);
320
Single<Boolean> containsKey(K key);
321
Single<Boolean> containsValue(V value);
322
323
// Bulk operations
324
Single<Integer> size();
325
Single<Boolean> isEmpty();
326
Completable clear();
327
Observable<K> keyIterator();
328
Observable<V> valueIterator();
329
Observable<Entry<K, V>> entryIterator();
330
331
// Fast operations
332
Single<Long> fastPut(K key, V value);
333
Single<Boolean> fastPutIfAbsent(K key, V value);
334
Single<Long> fastRemove(K... keys);
335
}
336
337
// RxJava lock interface
338
public interface RLockRx extends RObjectRx {
339
Completable lock();
340
Completable lock(long leaseTime, TimeUnit unit);
341
Single<Boolean> tryLock();
342
Single<Boolean> tryLock(long waitTime, TimeUnit unit);
343
Single<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
344
Completable unlock();
345
Completable forceUnlock();
346
Single<Boolean> isLocked();
347
Single<Boolean> isHeldByCurrentThread();
348
Single<Integer> getHoldCount();
349
Single<Long> remainTimeToLive();
350
}
351
```
352
353
**RxJava Examples:**
354
355
```java
356
// RxJava map operations
357
RMapRx<String, String> rxMap = rxClient.getMap("cache");
358
359
// Chain RxJava operations
360
Single<String> result = rxMap.put("key1", "value1")
361
.flatMap(previous -> rxMap.get("key1"))
362
.doOnSuccess(value -> System.out.println("Retrieved: " + value));
363
364
result.subscribe(
365
value -> System.out.println("Final result: " + value),
366
error -> System.err.println("Error: " + error.getMessage())
367
);
368
369
// RxJava lock operations
370
RLockRx rxLock = rxClient.getLock("rxLock");
371
372
Completable lockOperation = rxLock.tryLock(5, TimeUnit.SECONDS)
373
.flatMapCompletable(acquired -> {
374
if (acquired) {
375
return performRxWork()
376
.doFinally(() -> rxLock.unlock().subscribe());
377
} else {
378
return Completable.error(new RuntimeException("Lock not acquired"));
379
}
380
});
381
382
lockOperation.subscribe(
383
() -> System.out.println("Operation completed successfully"),
384
error -> System.err.println("Operation failed: " + error.getMessage())
385
);
386
```
387
388
### Async Collections
389
390
Async interfaces for collections and locks. Each method mirrors its synchronous counterpart, carries an `Async` suffix, and returns an `RFuture`.
391
392
```java { .api }
393
// Async map interface
394
public interface RMapAsync<K, V> extends RObjectAsync, RExpirableAsync {
395
RFuture<V> putAsync(K key, V value);
396
RFuture<V> putIfAbsentAsync(K key, V value);
397
RFuture<V> getAsync(K key);
398
RFuture<V> removeAsync(K key);
399
RFuture<Boolean> containsKeyAsync(K key);
400
RFuture<Boolean> containsValueAsync(V value);
401
402
RFuture<Integer> sizeAsync();
403
RFuture<Boolean> isEmptyAsync();
404
RFuture<Void> clearAsync();
405
406
RFuture<Set<K>> keySetAsync();
407
RFuture<Collection<V>> valuesAsync();
408
RFuture<Set<Entry<K, V>>> entrySetAsync();
409
410
RFuture<Map<K, V>> getAllAsync(Set<K> keys);
411
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map);
412
}
413
414
// Async list interface
415
public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<V>> {
416
RFuture<V> getAsync(int index);
417
RFuture<V> setAsync(int index, V element);
418
RFuture<Void> addAsync(int index, V element);
419
RFuture<V> removeAsync(int index);
420
421
RFuture<Integer> indexOfAsync(Object o);
422
RFuture<Integer> lastIndexOfAsync(Object o);
423
424
RFuture<List<V>> rangeAsync(int fromIndex, int toIndex);
425
RFuture<Void> trimAsync(int fromIndex, int toIndex);
426
}
427
428
// Async lock interface
429
public interface RLockAsync extends RObjectAsync {
430
RFuture<Void> lockAsync();
431
RFuture<Void> lockAsync(long leaseTime, TimeUnit unit);
432
RFuture<Boolean> tryLockAsync();
433
RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit);
434
RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);
435
RFuture<Void> unlockAsync();
436
RFuture<Void> forceUnlockAsync();
437
RFuture<Boolean> isLockedAsync();
438
RFuture<Boolean> isHeldByCurrentThreadAsync();
439
RFuture<Integer> getHoldCountAsync();
440
RFuture<Long> remainTimeToLiveAsync();
441
}
442
```
443
444
**Async Usage Examples:**
445
446
```java
447
// Async map operations with CompletableFuture
448
RMapAsync<String, String> asyncMap = redisson.getMap("asyncData");
449
450
// Chain async operations
451
RFuture<String> futureResult = asyncMap.putAsync("key1", "value1")
452
.thenCompose(v -> asyncMap.getAsync("key1"))
453
.thenApply(value -> "Processed: " + value);
454
455
// Handle result
456
futureResult.whenComplete((result, throwable) -> {
457
if (throwable == null) {
458
System.out.println("Result: " + result);
459
} else {
460
System.err.println("Error: " + throwable.getMessage());
461
}
462
});
463
464
// Async lock operations
465
RLockAsync asyncLock = redisson.getLock("asyncLock");
466
467
RFuture<Boolean> lockFuture = asyncLock.tryLockAsync(5, 30, TimeUnit.SECONDS)
468
.thenCompose(acquired -> {
469
if (acquired) {
470
return performAsyncWork()
471
.whenComplete((result, error) -> {
472
// Always unlock
473
asyncLock.unlockAsync();
474
});
475
} else {
476
return CompletableFuture.failedFuture(
477
new RuntimeException("Could not acquire lock")
478
);
479
}
480
});
481
482
// Multiple async operations in parallel
483
RFuture<String> future1 = asyncMap.getAsync("key1");
484
RFuture<String> future2 = asyncMap.getAsync("key2");
485
RFuture<String> future3 = asyncMap.getAsync("key3");
486
487
RFuture.allOf(future1, future2, future3)
488
.thenApply(v -> Arrays.asList(
489
future1.getNow(null),
490
future2.getNow(null),
491
future3.getNow(null)
492
))
493
.whenComplete((results, error) -> {
494
if (error == null) {
495
System.out.println("All results: " + results);
496
} else {
497
System.err.println("Error getting results: " + error.getMessage());
498
}
499
});
500
```
501
502
## Reactive Error Handling and Patterns
503
504
```java { .api }
505
// Error handling patterns for reactive streams
506
public class ReactivePatterns {
507
508
// Retry with backoff
509
public static <T> Mono<T> retryWithBackoff(Mono<T> source, int maxRetries) {
510
return source.retryWhen(Retry.backoff(maxRetries, Duration.ofMillis(100)));
511
}
512
513
// Timeout handling
514
public static <T> Mono<T> withTimeout(Mono<T> source, Duration timeout) {
515
return source.timeout(timeout)
516
.onErrorResume(TimeoutException.class,
517
ex -> Mono.error(new RuntimeException("Operation timed out", ex)));
518
}
519
520
// Fallback handling
521
public static <T> Mono<T> withFallback(Mono<T> primary, Mono<T> fallback) {
522
return primary.onErrorResume(throwable -> {
523
System.err.println("Primary failed, using fallback: " + throwable.getMessage());
524
return fallback;
525
});
526
}
527
}
528
529
// Usage examples
530
RMapReactive<String, String> reactiveMap = reactiveClient.getMap("data");
531
532
// Apply patterns
533
Mono<String> robustOperation = ReactivePatterns.withTimeout(
534
ReactivePatterns.retryWithBackoff(
535
reactiveMap.get("important-key"), 3
536
),
537
Duration.ofSeconds(10)
538
);
539
540
Mono<String> withFallback = ReactivePatterns.withFallback(
541
robustOperation,
542
Mono.just("default-value")
543
);
544
```