Valkey and Redis Java client providing complete Real-Time Data Platform with distributed objects and services
—
Redisson provides comprehensive support for non-blocking, reactive programming with both Reactive Streams and RxJava interfaces. All synchronous operations have corresponding asynchronous variants that return futures or reactive streams.
Access to reactive and RxJava client interfaces for non-blocking operations.
/**
* Get reactive streams client interface
* @return RedissonReactiveClient for reactive programming with Reactive Streams
*/
public RedissonReactiveClient reactive();
/**
* Get RxJava client interface
* @return RedissonRxClient for RxJava programming
*/
public RedissonRxClient rxJava();Reactive Client Interfaces:
// Reactive Streams client interface
public interface RedissonReactiveClient {
// Collections - return reactive variants
<K, V> RMapReactive<K, V> getMap(String name);
<V> RListReactive<V> getList(String name);
<V> RSetReactive<V> getSet(String name);
<V> RQueueReactive<V> getQueue(String name);
// Locks and synchronization - return reactive variants
RLockReactive getLock(String name);
RSemaphoreReactive getSemaphore(String name);
RCountDownLatchReactive getCountDownLatch(String name);
// Atomic operations - return reactive variants
RAtomicLongReactive getAtomicLong(String name);
RAtomicDoubleReactive getAtomicDouble(String name);
// Topics - return reactive variants
RTopicReactive getTopic(String name);
RPatternTopicReactive getPatternTopic(String pattern);
// Lifecycle methods
Mono<Void> shutdown();
Mono<Void> shutdown(long quietPeriod, long timeout, TimeUnit unit);
boolean isShutdown();
boolean isShuttingDown();
}
// RxJava client interface
public interface RedissonRxClient {
// Collections - return RxJava variants
<K, V> RMapRx<K, V> getMap(String name);
<V> RListRx<V> getList(String name);
<V> RSetRx<V> getSet(String name);
<V> RQueueRx<V> getQueue(String name);
// Locks and synchronization - return RxJava variants
RLockRx getLock(String name);
RSemaphoreRx getSemaphore(String name);
RCountDownLatchRx getCountDownLatch(String name);
// Atomic operations - return RxJava variants
RAtomicLongRx getAtomicLong(String name);
RAtomicDoubleRx getAtomicDouble(String name);
// Topics - return RxJava variants
RTopicRx getTopic(String name);
RPatternTopicRx getPatternTopic(String pattern);
// Lifecycle methods
Completable shutdown();
Completable shutdown(long quietPeriod, long timeout, TimeUnit unit);
boolean isShutdown();
boolean isShuttingDown();
}Usage Examples:
import org.redisson.api.*;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Completable;
// Get reactive clients
RedissonReactiveClient reactiveClient = redisson.reactive();
RedissonRxClient rxClient = redisson.rxJava();
// Reactive streams example
RMapReactive<String, String> reactiveMap = reactiveClient.getMap("users");
Mono<String> putResult = reactiveMap.put("user1", "Alice")
.then(reactiveMap.get("user1"))
.doOnNext(value -> System.out.println("Retrieved: " + value));
// RxJava example
RMapRx<String, String> rxMap = rxClient.getMap("products");
Single<String> rxResult = rxMap.put("product1", "Laptop")
.andThen(rxMap.get("product1"))
.doOnSuccess(value -> System.out.println("Retrieved: " + value));All synchronous interfaces extend async counterparts that return RFuture<T> for non-blocking operations.
// Base async interface
public interface RObjectAsync {
RFuture<Boolean> touchAsync();
RFuture<Boolean> unlinkAsync();
RFuture<Boolean> deleteAsync();
RFuture<Boolean> isExistsAsync();
RFuture<Void> renameAsync(String newName);
RFuture<Boolean> renameNXAsync(String newName);
RFuture<Boolean> copyAsync(String host, int port, int database, long timeout);
RFuture<Boolean> migrateAsync(String host, int port, int database, long timeout);
RFuture<Boolean> moveAsync(int database);
RFuture<Long> sizeInMemoryAsync();
RFuture<Void> restoreAsync(byte[] state);
RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
RFuture<Void> restoreAndReplaceAsync(byte[] state);
RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
RFuture<byte[]> dumpAsync();
}
// Async expirable interface
public interface RExpirableAsync extends RObjectAsync {
RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit);
RFuture<Boolean> expireAtAsync(Date timestamp);
RFuture<Boolean> expireAtAsync(long timestamp);
RFuture<Boolean> clearExpireAsync();
RFuture<Long> remainTimeToLiveAsync();
RFuture<Long> getExpireTimeAsync();
}RFuture Interface:
// Redisson's future interface extending Java's CompletableFuture
public interface RFuture<T> extends CompletableFuture<T> {
// Standard CompletableFuture methods available
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
T get() throws InterruptedException, ExecutionException;
T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
// Additional async composition methods
RFuture<T> sync();
RFuture<T> syncUninterruptibly();
T syncUninterruptibly();
RFuture<T> await();
RFuture<T> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit);
boolean await(long timeoutMillis);
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// Success/failure callbacks
RFuture<T> onComplete(BiConsumer<? super T, ? super Throwable> action);
// Conversion to reactive types
Mono<T> toMono();
Single<T> toSingle();
}Reactive map interface with non-blocking operations returning reactive streams.
public interface RMapReactive<K, V> extends RObjectReactive {
// Basic operations
Mono<V> put(K key, V value);
Mono<V> putIfAbsent(K key, V value);
Mono<V> get(K key);
Mono<V> remove(K key);
Mono<Boolean> containsKey(K key);
Mono<Boolean> containsValue(V value);
// Bulk operations
Mono<Integer> size();
Mono<Boolean> isEmpty();
Mono<Void> clear();
Flux<K> keyIterator();
Flux<V> valueIterator();
Flux<Entry<K, V>> entryIterator();
// Advanced operations
Mono<V> addAndGet(K key, Number delta);
Flux<K> readAllKeySet();
Flux<V> readAllValues();
Flux<Entry<K, V>> readAllEntrySet();
Mono<Map<K, V>> readAllMap();
// Fast operations
Mono<Long> fastPut(K key, V value);
Mono<Boolean> fastPutIfAbsent(K key, V value);
Mono<Long> fastRemove(K... keys);
// Batch operations
Mono<Void> putAll(Map<? extends K, ? extends V> map);
Mono<Map<K, V>> getAll(Set<K> keys);
Mono<Long> removeAll(Set<K> keys);
}Usage Examples:
// Reactive map operations
RMapReactive<String, User> userMap = reactiveClient.getMap("users");
// Chain reactive operations
Mono<String> pipeline = userMap.put("user1", new User("Alice", 25))
.then(userMap.get("user1"))
.map(User::getName)
.doOnNext(name -> System.out.println("User: " + name));
// Execute the pipeline
pipeline.subscribe();
// Multiple operations in sequence
Flux<String> userNames = userMap.putAll(Map.of(
"user1", new User("Alice", 25),
"user2", new User("Bob", 30),
"user3", new User("Charlie", 35)
))
.thenMany(userMap.readAllValues())
.map(User::getName)
.doOnNext(name -> System.out.println("Found user: " + name));
userNames.collectList().subscribe(names -> {
System.out.println("All users: " + names);
});Reactive lock interface for non-blocking distributed locking.
public interface RLockReactive extends RObjectReactive {
// Lock acquisition
Mono<Void> lock();
Mono<Void> lock(long leaseTime, TimeUnit unit);
// Try lock operations
Mono<Boolean> tryLock();
Mono<Boolean> tryLock(long waitTime, TimeUnit unit);
Mono<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
// Lock release
Mono<Void> unlock();
Mono<Void> forceUnlock();
// Lock status
Mono<Boolean> isLocked();
Mono<Boolean> isHeldByCurrentThread();
Mono<Integer> getHoldCount();
Mono<Long> remainTimeToLive();
}Usage Examples:
// Reactive lock operations
RLockReactive lock = reactiveClient.getLock("processLock");
// Try lock with timeout and automatic release
Mono<String> protectedOperation = lock.tryLock(5, 30, TimeUnit.SECONDS)
.flatMap(acquired -> {
if (acquired) {
return performCriticalOperation()
.doFinally(signalType -> lock.unlock().subscribe());
} else {
return Mono.error(new RuntimeException("Could not acquire lock"));
}
});
protectedOperation.subscribe(
result -> System.out.println("Operation result: " + result),
error -> System.err.println("Operation failed: " + error.getMessage())
);
// Lock with error handling
lock.lock()
.then(performProtectedWork())
.doOnError(error -> System.err.println("Error during work: " + error))
.doFinally(signalType -> {
// Always unlock, even on error
lock.unlock().subscribe();
})
.subscribe();RxJava interfaces for reactive programming with RxJava types.
// RxJava map interface
public interface RMapRx<K, V> extends RObjectRx {
// Basic operations returning RxJava types
Single<V> put(K key, V value);
Maybe<V> putIfAbsent(K key, V value);
Maybe<V> get(K key);
Maybe<V> remove(K key);
Single<Boolean> containsKey(K key);
Single<Boolean> containsValue(V value);
// Bulk operations
Single<Integer> size();
Single<Boolean> isEmpty();
Completable clear();
Observable<K> keyIterator();
Observable<V> valueIterator();
Observable<Entry<K, V>> entryIterator();
// Fast operations
Single<Long> fastPut(K key, V value);
Single<Boolean> fastPutIfAbsent(K key, V value);
Single<Long> fastRemove(K... keys);
}
// RxJava lock interface
public interface RLockRx extends RObjectRx {
Completable lock();
Completable lock(long leaseTime, TimeUnit unit);
Single<Boolean> tryLock();
Single<Boolean> tryLock(long waitTime, TimeUnit unit);
Single<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
Completable unlock();
Completable forceUnlock();
Single<Boolean> isLocked();
Single<Boolean> isHeldByCurrentThread();
Single<Integer> getHoldCount();
Single<Long> remainTimeToLive();
}RxJava Examples:
// RxJava map operations
RMapRx<String, String> rxMap = rxClient.getMap("cache");
// Chain RxJava operations
Single<String> result = rxMap.put("key1", "value1")
.flatMap(previous -> rxMap.get("key1"))
.doOnSuccess(value -> System.out.println("Retrieved: " + value));
result.subscribe(
value -> System.out.println("Final result: " + value),
error -> System.err.println("Error: " + error.getMessage())
);
// RxJava lock operations
RLockRx rxLock = rxClient.getLock("rxLock");
Completable lockOperation = rxLock.tryLock(5, TimeUnit.SECONDS)
.flatMapCompletable(acquired -> {
if (acquired) {
return performRxWork()
.doFinally(() -> rxLock.unlock().subscribe());
} else {
return Completable.error(new RuntimeException("Lock not acquired"));
}
});
lockOperation.subscribe(
() -> System.out.println("Operation completed successfully"),
error -> System.err.println("Operation failed: " + error.getMessage())
);Async interfaces for all collection types with future-based operations.
// Async map interface
public interface RMapAsync<K, V> extends RObjectAsync, RExpirableAsync {
RFuture<V> putAsync(K key, V value);
RFuture<V> putIfAbsentAsync(K key, V value);
RFuture<V> getAsync(K key);
RFuture<V> removeAsync(K key);
RFuture<Boolean> containsKeyAsync(K key);
RFuture<Boolean> containsValueAsync(V value);
RFuture<Integer> sizeAsync();
RFuture<Boolean> isEmptyAsync();
RFuture<Void> clearAsync();
RFuture<Set<K>> keySetAsync();
RFuture<Collection<V>> valuesAsync();
RFuture<Set<Entry<K, V>>> entrySetAsync();
RFuture<Map<K, V>> getAllAsync(Set<K> keys);
RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map);
}
// Async list interface
public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<V>> {
RFuture<V> getAsync(int index);
RFuture<V> setAsync(int index, V element);
RFuture<Void> addAsync(int index, V element);
RFuture<V> removeAsync(int index);
RFuture<Integer> indexOfAsync(Object o);
RFuture<Integer> lastIndexOfAsync(Object o);
RFuture<List<V>> rangeAsync(int fromIndex, int toIndex);
RFuture<Void> trimAsync(int fromIndex, int toIndex);
}
// Async lock interface
public interface RLockAsync extends RObjectAsync {
RFuture<Void> lockAsync();
RFuture<Void> lockAsync(long leaseTime, TimeUnit unit);
RFuture<Boolean> tryLockAsync();
RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit);
RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);
RFuture<Void> unlockAsync();
RFuture<Void> forceUnlockAsync();
RFuture<Boolean> isLockedAsync();
RFuture<Boolean> isHeldByCurrentThreadAsync();
RFuture<Integer> getHoldCountAsync();
RFuture<Long> remainTimeToLiveAsync();
}Async Usage Examples:
// Async map operations with CompletableFuture
RMapAsync<String, String> asyncMap = redisson.getMap("asyncData");
// Chain async operations
RFuture<String> futureResult = asyncMap.putAsync("key1", "value1")
.thenCompose(v -> asyncMap.getAsync("key1"))
.thenApply(value -> "Processed: " + value);
// Handle result
futureResult.whenComplete((result, throwable) -> {
if (throwable == null) {
System.out.println("Result: " + result);
} else {
System.err.println("Error: " + throwable.getMessage());
}
});
// Async lock operations
RLockAsync asyncLock = redisson.getLock("asyncLock");
RFuture<Boolean> lockFuture = asyncLock.tryLockAsync(5, 30, TimeUnit.SECONDS)
.thenCompose(acquired -> {
if (acquired) {
return performAsyncWork()
.whenComplete((result, error) -> {
// Always unlock
asyncLock.unlockAsync();
});
} else {
return CompletableFuture.failedFuture(
new RuntimeException("Could not acquire lock")
);
}
});
// Multiple async operations in parallel
RFuture<String> future1 = asyncMap.getAsync("key1");
RFuture<String> future2 = asyncMap.getAsync("key2");
RFuture<String> future3 = asyncMap.getAsync("key3");
RFuture.allOf(future1, future2, future3)
.thenApply(v -> Arrays.asList(
future1.getNow(null),
future2.getNow(null),
future3.getNow(null)
))
.whenComplete((results, error) -> {
if (error == null) {
System.out.println("All results: " + results);
} else {
System.err.println("Error getting results: " + error.getMessage());
}
});// Error handling patterns for reactive streams
public class ReactivePatterns {
// Retry with backoff
public static <T> Mono<T> retryWithBackoff(Mono<T> source, int maxRetries) {
return source.retryWhen(Retry.backoff(maxRetries, Duration.ofMillis(100)));
}
// Timeout handling
public static <T> Mono<T> withTimeout(Mono<T> source, Duration timeout) {
return source.timeout(timeout)
.onErrorResume(TimeoutException.class,
ex -> Mono.error(new RuntimeException("Operation timed out", ex)));
}
// Fallback handling
public static <T> Mono<T> withFallback(Mono<T> primary, Mono<T> fallback) {
return primary.onErrorResume(throwable -> {
System.err.println("Primary failed, using fallback: " + throwable.getMessage());
return fallback;
});
}
}
// Usage examples
RMapReactive<String, String> reactiveMap = reactiveClient.getMap("data");
// Apply patterns
Mono<String> robustOperation = ReactivePatterns.withTimeout(
ReactivePatterns.retryWithBackoff(
reactiveMap.get("important-key"), 3
),
Duration.ofSeconds(10)
);
Mono<String> withFallback = ReactivePatterns.withFallback(
robustOperation,
Mono.just("default-value")
);Install with Tessl CLI
npx tessl i tessl/maven-org-redisson--redisson