CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-redisson--redisson

Valkey and Redis Java client providing complete Real-Time Data Platform with distributed objects and services

Pending
Overview
Eval results
Files

reactive-async.mddocs/

Reactive and Async APIs

Redisson provides comprehensive support for non-blocking, reactive programming with both Reactive Streams and RxJava interfaces. All synchronous operations have corresponding asynchronous variants that return futures or reactive streams.

Capabilities

Client Interfaces

Access to reactive and RxJava client interfaces for non-blocking operations.

/**
 * Get reactive streams client interface
 * @return RedissonReactiveClient for reactive programming with Reactive Streams
 */
public RedissonReactiveClient reactive();

/**
 * Get RxJava client interface  
 * @return RedissonRxClient for RxJava programming
 */
public RedissonRxClient rxJava();

Reactive Client Interfaces:

// Reactive Streams client interface
public interface RedissonReactiveClient {
    // Collections - return reactive variants
    <K, V> RMapReactive<K, V> getMap(String name);
    <V> RListReactive<V> getList(String name);
    <V> RSetReactive<V> getSet(String name);
    <V> RQueueReactive<V> getQueue(String name);
    
    // Locks and synchronization - return reactive variants
    RLockReactive getLock(String name);
    RSemaphoreReactive getSemaphore(String name);
    RCountDownLatchReactive getCountDownLatch(String name);
    
    // Atomic operations - return reactive variants
    RAtomicLongReactive getAtomicLong(String name);
    RAtomicDoubleReactive getAtomicDouble(String name);
    
    // Topics - return reactive variants
    RTopicReactive getTopic(String name);
    RPatternTopicReactive getPatternTopic(String pattern);
    
    // Lifecycle methods
    Mono<Void> shutdown();
    Mono<Void> shutdown(long quietPeriod, long timeout, TimeUnit unit);
    boolean isShutdown();
    boolean isShuttingDown();
}

// RxJava client interface
public interface RedissonRxClient {
    // Collections - return RxJava variants
    <K, V> RMapRx<K, V> getMap(String name);
    <V> RListRx<V> getList(String name); 
    <V> RSetRx<V> getSet(String name);
    <V> RQueueRx<V> getQueue(String name);
    
    // Locks and synchronization - return RxJava variants
    RLockRx getLock(String name);
    RSemaphoreRx getSemaphore(String name);
    RCountDownLatchRx getCountDownLatch(String name);
    
    // Atomic operations - return RxJava variants
    RAtomicLongRx getAtomicLong(String name);
    RAtomicDoubleRx getAtomicDouble(String name);
    
    // Topics - return RxJava variants
    RTopicRx getTopic(String name);
    RPatternTopicRx getPatternTopic(String pattern);
    
    // Lifecycle methods
    Completable shutdown();
    Completable shutdown(long quietPeriod, long timeout, TimeUnit unit);
    boolean isShutdown();
    boolean isShuttingDown();
}

Usage Examples:

import org.redisson.api.*;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.Completable;

// Get reactive clients
RedissonReactiveClient reactiveClient = redisson.reactive();
RedissonRxClient rxClient = redisson.rxJava();

// Reactive streams example
RMapReactive<String, String> reactiveMap = reactiveClient.getMap("users");
Mono<String> putResult = reactiveMap.put("user1", "Alice")
    .then(reactiveMap.get("user1"))
    .doOnNext(value -> System.out.println("Retrieved: " + value));

// RxJava example
RMapRx<String, String> rxMap = rxClient.getMap("products");
Single<String> rxResult = rxMap.put("product1", "Laptop")
    .andThen(rxMap.get("product1"))
    .doOnSuccess(value -> System.out.println("Retrieved: " + value));

Async Interfaces

All synchronous interfaces extend async counterparts that return RFuture<T> for non-blocking operations.

// Base async interface
public interface RObjectAsync {
    RFuture<Boolean> touchAsync();
    RFuture<Boolean> unlinkAsync();
    RFuture<Boolean> deleteAsync();
    RFuture<Boolean> isExistsAsync();
    RFuture<Void> renameAsync(String newName);
    RFuture<Boolean> renameNXAsync(String newName);
    RFuture<Boolean> copyAsync(String host, int port, int database, long timeout);
    RFuture<Boolean> migrateAsync(String host, int port, int database, long timeout);
    RFuture<Boolean> moveAsync(int database);
    RFuture<Long> sizeInMemoryAsync();
    RFuture<Void> restoreAsync(byte[] state);
    RFuture<Void> restoreAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
    RFuture<Void> restoreAndReplaceAsync(byte[] state);
    RFuture<Void> restoreAndReplaceAsync(byte[] state, long timeToLive, TimeUnit timeUnit);
    RFuture<byte[]> dumpAsync();
}

// Async expirable interface
public interface RExpirableAsync extends RObjectAsync {
    RFuture<Boolean> expireAsync(long timeToLive, TimeUnit timeUnit);
    RFuture<Boolean> expireAtAsync(Date timestamp);
    RFuture<Boolean> expireAtAsync(long timestamp);
    RFuture<Boolean> clearExpireAsync();
    RFuture<Long> remainTimeToLiveAsync();
    RFuture<Long> getExpireTimeAsync();
}

RFuture Interface:

// Redisson's future interface extending Java's CompletableFuture
public interface RFuture<T> extends CompletableFuture<T> {
    // Standard CompletableFuture methods available
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    T get() throws InterruptedException, ExecutionException;
    T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    
    // Additional async composition methods
    RFuture<T> sync();
    RFuture<T> syncUninterruptibly();
    T syncUninterruptibly();
    RFuture<T> await();
    RFuture<T> awaitUninterruptibly();
    boolean await(long timeout, TimeUnit unit);
    boolean await(long timeoutMillis);
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    
    // Success/failure callbacks
    RFuture<T> onComplete(BiConsumer<? super T, ? super Throwable> action);
    
    // Conversion to reactive types
    Mono<T> toMono();
    Single<T> toSingle();
}

Reactive Map Operations

Reactive map interface with non-blocking operations returning reactive streams.

public interface RMapReactive<K, V> extends RObjectReactive {
    // Basic operations
    Mono<V> put(K key, V value);
    Mono<V> putIfAbsent(K key, V value);
    Mono<V> get(K key);
    Mono<V> remove(K key);
    Mono<Boolean> containsKey(K key);
    Mono<Boolean> containsValue(V value);
    
    // Bulk operations
    Mono<Integer> size();
    Mono<Boolean> isEmpty();
    Mono<Void> clear();
    Flux<K> keyIterator();
    Flux<V> valueIterator();
    Flux<Entry<K, V>> entryIterator();
    
    // Advanced operations
    Mono<V> addAndGet(K key, Number delta);
    Flux<K> readAllKeySet();
    Flux<V> readAllValues();
    Flux<Entry<K, V>> readAllEntrySet();
    Mono<Map<K, V>> readAllMap();
    
    // Fast operations
    Mono<Long> fastPut(K key, V value);
    Mono<Boolean> fastPutIfAbsent(K key, V value);
    Mono<Long> fastRemove(K... keys);
    
    // Batch operations
    Mono<Void> putAll(Map<? extends K, ? extends V> map);
    Mono<Map<K, V>> getAll(Set<K> keys);
    Mono<Long> removeAll(Set<K> keys);
}

Usage Examples:

// Reactive map operations
RMapReactive<String, User> userMap = reactiveClient.getMap("users");

// Chain reactive operations
Mono<String> pipeline = userMap.put("user1", new User("Alice", 25))
    .then(userMap.get("user1"))
    .map(User::getName)
    .doOnNext(name -> System.out.println("User: " + name));

// Execute the pipeline
pipeline.subscribe();

// Multiple operations in sequence
Flux<String> userNames = userMap.putAll(Map.of(
        "user1", new User("Alice", 25),
        "user2", new User("Bob", 30),
        "user3", new User("Charlie", 35)
    ))
    .thenMany(userMap.readAllValues())
    .map(User::getName)
    .doOnNext(name -> System.out.println("Found user: " + name));

userNames.collectList().subscribe(names -> {
    System.out.println("All users: " + names);
});

Reactive Lock Operations

Reactive lock interface for non-blocking distributed locking.

public interface RLockReactive extends RObjectReactive {
    // Lock acquisition
    Mono<Void> lock();
    Mono<Void> lock(long leaseTime, TimeUnit unit);
    
    // Try lock operations
    Mono<Boolean> tryLock();
    Mono<Boolean> tryLock(long waitTime, TimeUnit unit);
    Mono<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
    
    // Lock release
    Mono<Void> unlock();
    Mono<Void> forceUnlock();
    
    // Lock status
    Mono<Boolean> isLocked();
    Mono<Boolean> isHeldByCurrentThread();
    Mono<Integer> getHoldCount();
    Mono<Long> remainTimeToLive();
}

Usage Examples:

// Reactive lock operations
RLockReactive lock = reactiveClient.getLock("processLock");

// Try lock with timeout and automatic release
Mono<String> protectedOperation = lock.tryLock(5, 30, TimeUnit.SECONDS)
    .flatMap(acquired -> {
        if (acquired) {
            return performCriticalOperation()
                .doFinally(signalType -> lock.unlock().subscribe());
        } else {
            return Mono.error(new RuntimeException("Could not acquire lock"));
        }
    });

protectedOperation.subscribe(
    result -> System.out.println("Operation result: " + result),
    error -> System.err.println("Operation failed: " + error.getMessage())
);

// Lock with error handling
lock.lock()
    .then(performProtectedWork())
    .doOnError(error -> System.err.println("Error during work: " + error))
    .doFinally(signalType -> {
        // Always unlock, even on error
        lock.unlock().subscribe();
    })
    .subscribe();

RxJava Integration

RxJava interfaces for reactive programming with RxJava types.

// RxJava map interface
public interface RMapRx<K, V> extends RObjectRx {
    // Basic operations returning RxJava types
    Single<V> put(K key, V value);
    Maybe<V> putIfAbsent(K key, V value);
    Maybe<V> get(K key);
    Maybe<V> remove(K key);
    Single<Boolean> containsKey(K key);
    Single<Boolean> containsValue(V value);
    
    // Bulk operations
    Single<Integer> size();
    Single<Boolean> isEmpty();
    Completable clear();
    Observable<K> keyIterator();
    Observable<V> valueIterator();
    Observable<Entry<K, V>> entryIterator();
    
    // Fast operations
    Single<Long> fastPut(K key, V value);
    Single<Boolean> fastPutIfAbsent(K key, V value);
    Single<Long> fastRemove(K... keys);
}

// RxJava lock interface
public interface RLockRx extends RObjectRx {
    Completable lock();
    Completable lock(long leaseTime, TimeUnit unit);
    Single<Boolean> tryLock();
    Single<Boolean> tryLock(long waitTime, TimeUnit unit);
    Single<Boolean> tryLock(long waitTime, long leaseTime, TimeUnit unit);
    Completable unlock();
    Completable forceUnlock();
    Single<Boolean> isLocked();
    Single<Boolean> isHeldByCurrentThread();
    Single<Integer> getHoldCount();
    Single<Long> remainTimeToLive();
}

RxJava Examples:

// RxJava map operations
RMapRx<String, String> rxMap = rxClient.getMap("cache");

// Chain RxJava operations
Single<String> result = rxMap.put("key1", "value1")
    .flatMap(previous -> rxMap.get("key1"))
    .doOnSuccess(value -> System.out.println("Retrieved: " + value));

result.subscribe(
    value -> System.out.println("Final result: " + value),
    error -> System.err.println("Error: " + error.getMessage())
);

// RxJava lock operations
RLockRx rxLock = rxClient.getLock("rxLock");

Completable lockOperation = rxLock.tryLock(5, TimeUnit.SECONDS)
    .flatMapCompletable(acquired -> {
        if (acquired) {
            return performRxWork()
                .doFinally(() -> rxLock.unlock().subscribe());
        } else {
            return Completable.error(new RuntimeException("Lock not acquired"));
        }
    });

lockOperation.subscribe(
    () -> System.out.println("Operation completed successfully"),
    error -> System.err.println("Operation failed: " + error.getMessage())
);

Async Collections

Async interfaces for all collection types with future-based operations.

// Async map interface  
public interface RMapAsync<K, V> extends RObjectAsync, RExpirableAsync {
    RFuture<V> putAsync(K key, V value);
    RFuture<V> putIfAbsentAsync(K key, V value);
    RFuture<V> getAsync(K key);
    RFuture<V> removeAsync(K key);
    RFuture<Boolean> containsKeyAsync(K key);
    RFuture<Boolean> containsValueAsync(V value);
    
    RFuture<Integer> sizeAsync();
    RFuture<Boolean> isEmptyAsync();
    RFuture<Void> clearAsync();
    
    RFuture<Set<K>> keySetAsync();
    RFuture<Collection<V>> valuesAsync();
    RFuture<Set<Entry<K, V>>> entrySetAsync();
    
    RFuture<Map<K, V>> getAllAsync(Set<K> keys);
    RFuture<Void> putAllAsync(Map<? extends K, ? extends V> map);
}

// Async list interface
public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<V>> {
    RFuture<V> getAsync(int index);
    RFuture<V> setAsync(int index, V element);
    RFuture<Void> addAsync(int index, V element);
    RFuture<V> removeAsync(int index);
    
    RFuture<Integer> indexOfAsync(Object o);
    RFuture<Integer> lastIndexOfAsync(Object o);
    
    RFuture<List<V>> rangeAsync(int fromIndex, int toIndex);
    RFuture<Void> trimAsync(int fromIndex, int toIndex);
}

// Async lock interface
public interface RLockAsync extends RObjectAsync {
    RFuture<Void> lockAsync();
    RFuture<Void> lockAsync(long leaseTime, TimeUnit unit);
    RFuture<Boolean> tryLockAsync();
    RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit);
    RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit);
    RFuture<Void> unlockAsync();
    RFuture<Void> forceUnlockAsync();
    RFuture<Boolean> isLockedAsync();
    RFuture<Boolean> isHeldByCurrentThreadAsync();
    RFuture<Integer> getHoldCountAsync();
    RFuture<Long> remainTimeToLiveAsync();
}

Async Usage Examples:

// Async map operations with CompletableFuture
RMapAsync<String, String> asyncMap = redisson.getMap("asyncData");

// Chain async operations
RFuture<String> futureResult = asyncMap.putAsync("key1", "value1")
    .thenCompose(v -> asyncMap.getAsync("key1"))
    .thenApply(value -> "Processed: " + value);

// Handle result
futureResult.whenComplete((result, throwable) -> {
    if (throwable == null) {
        System.out.println("Result: " + result);
    } else {
        System.err.println("Error: " + throwable.getMessage());
    }
});

// Async lock operations
RLockAsync asyncLock = redisson.getLock("asyncLock");

RFuture<Boolean> lockFuture = asyncLock.tryLockAsync(5, 30, TimeUnit.SECONDS)
    .thenCompose(acquired -> {
        if (acquired) {
            return performAsyncWork()
                .whenComplete((result, error) -> {
                    // Always unlock
                    asyncLock.unlockAsync();
                });
        } else {
            return CompletableFuture.failedFuture(
                new RuntimeException("Could not acquire lock")
            );
        }
    });

// Multiple async operations in parallel
RFuture<String> future1 = asyncMap.getAsync("key1");
RFuture<String> future2 = asyncMap.getAsync("key2");
RFuture<String> future3 = asyncMap.getAsync("key3");

RFuture.allOf(future1, future2, future3)
    .thenApply(v -> Arrays.asList(
        future1.getNow(null),
        future2.getNow(null), 
        future3.getNow(null)
    ))
    .whenComplete((results, error) -> {
        if (error == null) {
            System.out.println("All results: " + results);
        } else {
            System.err.println("Error getting results: " + error.getMessage());
        }
    });

Reactive Error Handling and Patterns

// Error handling patterns for reactive streams
public class ReactivePatterns {
    
    // Retry with backoff
    public static <T> Mono<T> retryWithBackoff(Mono<T> source, int maxRetries) {
        return source.retryWhen(Retry.backoff(maxRetries, Duration.ofMillis(100)));
    }
    
    // Timeout handling
    public static <T> Mono<T> withTimeout(Mono<T> source, Duration timeout) {
        return source.timeout(timeout)
            .onErrorResume(TimeoutException.class, 
                ex -> Mono.error(new RuntimeException("Operation timed out", ex)));
    }
    
    // Fallback handling
    public static <T> Mono<T> withFallback(Mono<T> primary, Mono<T> fallback) {
        return primary.onErrorResume(throwable -> {
            System.err.println("Primary failed, using fallback: " + throwable.getMessage());
            return fallback;
        });
    }
}

// Usage examples
RMapReactive<String, String> reactiveMap = reactiveClient.getMap("data");

// Apply patterns
Mono<String> robustOperation = ReactivePatterns.withTimeout(
    ReactivePatterns.retryWithBackoff(
        reactiveMap.get("important-key"), 3
    ), 
    Duration.ofSeconds(10)
);

Mono<String> withFallback = ReactivePatterns.withFallback(
    robustOperation,
    Mono.just("default-value")
);

Install with Tessl CLI

npx tessl i tessl/maven-org-redisson--redisson

docs

collections.md

configuration.md

data-structures.md

index.md

messaging.md

reactive-async.md

synchronization.md

tile.json