CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-redisson--redisson

Valkey and Redis Java client providing complete Real-Time Data Platform with distributed objects and services

Pending
Overview
Eval results
Files

messaging.mddocs/

Pub/Sub Messaging

Redisson provides a comprehensive publish/subscribe messaging system built on Redis pub/sub capabilities. It supports regular topics, pattern-based subscriptions, sharded topics for scalability, and reliable topics with guaranteed delivery.

Capabilities

Basic Topics

Standard publish/subscribe topics for real-time messaging between distributed components.

/**
 * Get a topic for publish/subscribe messaging
 * @param name - unique name of the topic
 * @return RTopic instance
 */
public RTopic getTopic(String name);
public RTopic getTopic(String name, Codec codec);
public RTopic getTopic(PlainOptions options);

Topic Interface:

public interface RTopic extends RTopicAsync {
    // Publishing messages
    long publish(Object message);
    
    // Subscribing to messages
    int addListener(MessageListener<Object> listener);
    int addListener(Class<?> type, MessageListener<?> listener);
    
    // Listener management
    void removeListener(int listenerId);
    void removeAllListeners();
    
    // Topic information
    List<String> getChannelNames();
    long countListeners();
    long countSubscribers();
}

// Message listener interface
@FunctionalInterface
public interface MessageListener<M> {
    void onMessage(CharSequence channel, M msg);
}

Usage Examples:

import org.redisson.api.*;

// Get topic
RTopic topic = redisson.getTopic("notifications");

// Subscribe to messages
int listenerId = topic.addListener(String.class, (channel, message) -> {
    System.out.println("Received on " + channel + ": " + message);
});

// Publish messages
long subscribersCount = topic.publish("Hello, subscribers!");
System.out.println("Message delivered to " + subscribersCount + " subscribers");

// Publish different message types
RTopic<User> userTopic = redisson.getTopic("user-events");
userTopic.addListener(User.class, (channel, user) -> {
    System.out.println("User event: " + user.getName());
});
userTopic.publish(new User("Alice", 25));

// Remove listener when done
topic.removeListener(listenerId);

// Multiple listeners for same topic
RTopic eventTopic = redisson.getTopic("events");

int listener1 = eventTopic.addListener(String.class, (channel, msg) -> {
    System.out.println("Listener 1: " + msg);
});

int listener2 = eventTopic.addListener(String.class, (channel, msg) -> {
    System.out.println("Listener 2: " + msg);
});

eventTopic.publish("Event message"); // Both listeners receive this

// Remove specific listener
eventTopic.removeListener(listener1);
eventTopic.publish("Another event"); // Only listener2 receives this

Pattern Topics

Pattern-based subscriptions allowing wildcard matching for dynamic topic subscription.

/**
 * Get a pattern topic for wildcard subscriptions
 * @param pattern - pattern with wildcards (* and ?)
 * @return RPatternTopic instance
 */
public RPatternTopic getPatternTopic(String pattern);
public RPatternTopic getPatternTopic(String pattern, Codec codec);
public RPatternTopic getPatternTopic(PatternTopicOptions options);

Pattern Topic Interface:

public interface RPatternTopic extends RPatternTopicAsync {
    // Subscribe with pattern matching
    int addListener(PatternMessageListener<Object> listener);
    int addListener(Class<?> type, PatternMessageListener<?> listener);
    
    // Listener management
    void removeListener(int listenerId);
    void removeAllListeners();
    
    // Pattern information
    List<String> getPatternNames();
    long countListeners();
}

// Pattern message listener interface
@FunctionalInterface
public interface PatternMessageListener<M> {
    void onMessage(CharSequence pattern, CharSequence channel, M msg);
}

Usage Examples:

// Pattern subscription - listen to multiple related topics
RPatternTopic patternTopic = redisson.getPatternTopic("user.*");

int patternListener = patternTopic.addListener(String.class, (pattern, channel, message) -> {
    System.out.println("Pattern: " + pattern + ", Channel: " + channel + ", Message: " + message);
});

// These will all match the pattern "user.*"
RTopic userLoginTopic = redisson.getTopic("user.login");
RTopic userLogoutTopic = redisson.getTopic("user.logout");  
RTopic userUpdateTopic = redisson.getTopic("user.update");

userLoginTopic.publish("User John logged in");    // Matches pattern
userLogoutTopic.publish("User John logged out");  // Matches pattern
userUpdateTopic.publish("User John updated profile"); // Matches pattern

// More specific patterns
RPatternTopic orderPattern = redisson.getPatternTopic("order.*.created");
orderPattern.addListener(String.class, (pattern, channel, message) -> {
    System.out.println("New order created: " + message);
});

// Matches: order.electronics.created, order.books.created, etc.
redisson.getTopic("order.electronics.created").publish("Order #123 created");
redisson.getTopic("order.books.created").publish("Order #456 created");

// Multiple patterns
RPatternTopic alertPattern = redisson.getPatternTopic("alert.*");
RPatternTopic errorPattern = redisson.getPatternTopic("error.*");

alertPattern.addListener(String.class, (pattern, channel, msg) -> {
    System.out.println("ALERT - " + channel + ": " + msg);
});

errorPattern.addListener(String.class, (pattern, channel, msg) -> {
    System.err.println("ERROR - " + channel + ": " + msg);
});

Sharded Topics

Sharded topics distribute messages across multiple Redis nodes for improved scalability and performance.

/**
 * Get a sharded topic for scalable messaging
 * @param name - unique name of the sharded topic
 * @return RShardedTopic instance
 */
public RShardedTopic getShardedTopic(String name);
public RShardedTopic getShardedTopic(String name, Codec codec);
public RShardedTopic getShardedTopic(PlainOptions options);

Sharded Topic Interface:

public interface RShardedTopic extends RShardedTopicAsync {
    // Publishing with sharding
    long publish(Object message);
    
    // Subscribing with automatic shard distribution
    int addListener(MessageListener<Object> listener);
    int addListener(Class<?> type, MessageListener<?> listener);
    
    // Listener management
    void removeListener(int listenerId);
    void removeAllListeners();
    
    // Sharding information
    List<String> getChannelNames();
    long countListeners();
    long countSubscribers();
}

Usage Examples:

// Sharded topic for high-throughput messaging
RShardedTopic shardedTopic = redisson.getShardedTopic("high-volume-events");

// Subscribe - automatically distributed across shards
int shardedListener = shardedTopic.addListener(String.class, (channel, message) -> {
    System.out.println("Sharded message: " + message + " on " + channel);
});

// Publish - automatically distributed across available shards
for (int i = 0; i < 1000; i++) {
    shardedTopic.publish("Message " + i);
}

// Multiple listeners automatically distributed
RShardedTopic eventStream = redisson.getShardedTopic("event-stream");

// Add multiple listeners - they'll be distributed across shards
for (int i = 0; i < 5; i++) {
    final int listenerId = i;
    eventStream.addListener(String.class, (channel, message) -> {
        System.out.println("Listener " + listenerId + " received: " + message);
    });
}

// Messages are distributed across all listeners/shards
for (int i = 0; i < 100; i++) {
    eventStream.publish("Event " + i);
}

Reliable Topics

Reliable topics provide guaranteed message delivery with acknowledgment and replay capabilities.

/**
 * Get a reliable topic with guaranteed delivery
 * @param name - unique name of the reliable topic
 * @return RReliableTopic instance
 */
public RReliableTopic getReliableTopic(String name);
public RReliableTopic getReliableTopic(String name, Codec codec);
public RReliableTopic getReliableTopic(PlainOptions options);

Reliable Topic Interface:

public interface RReliableTopic extends RReliableTopicAsync {
    // Publishing with confirmation
    long publish(Object message);
    
    // Subscribing with acknowledgment
    String addListener(MessageListener<Object> listener);
    String addListener(Class<?> type, MessageListener<?> listener);
    
    // Listener management with IDs
    void removeListener(String listenerId);
    void removeAllListeners();
    
    // Message acknowledgment and replay
    long size();
    List<Object> readAll();
    void expire(long timeToLive, TimeUnit timeUnit);
    void expireAt(long timestamp);
    long remainTimeToLive();
}

Usage Examples:

// Reliable topic ensures message delivery
RReliableTopic reliableTopic = redisson.getReliableTopic("critical-events");

// Subscribe with guaranteed delivery
String listenerId = reliableTopic.addListener(String.class, (channel, message) -> {
    System.out.println("Processing critical event: " + message);
    // Message is automatically acknowledged after successful processing
});

// Publish critical messages
reliableTopic.publish("System alert: High CPU usage");
reliableTopic.publish("System alert: Low disk space");

// Check message queue size
long pendingMessages = reliableTopic.size();
System.out.println("Pending messages: " + pendingMessages);

// Read all unprocessed messages (useful for debugging)
List<Object> allMessages = reliableTopic.readAll();
System.out.println("All messages in queue: " + allMessages);

// Set expiration for old messages
reliableTopic.expire(1, TimeUnit.HOURS); // Messages expire after 1 hour

// Multiple reliable listeners
RReliableTopic orderTopic = redisson.getReliableTopic("order-processing");

String processor1 = orderTopic.addListener(Order.class, (channel, order) -> {
    System.out.println("Processor 1 handling order: " + order.getId());
    processOrder(order);
});

String processor2 = orderTopic.addListener(Order.class, (channel, order) -> {
    System.out.println("Processor 2 handling order: " + order.getId());
    processOrder(order);
});

// Orders are reliably delivered to all processors
orderTopic.publish(new Order("123", "Product A"));
orderTopic.publish(new Order("124", "Product B"));

Async Topic Operations

All topic operations have async variants returning RFuture<T>.

// Async topic interface
public interface RTopicAsync extends RObjectAsync {
    RFuture<Long> publishAsync(Object message);
    RFuture<Integer> addListenerAsync(MessageListener<Object> listener);
    RFuture<Integer> addListenerAsync(Class<?> type, MessageListener<?> listener);
    RFuture<Void> removeListenerAsync(int listenerId);
    RFuture<Void> removeAllListenersAsync();
    RFuture<Long> countListenersAsync();
    RFuture<Long> countSubscribersAsync();
}

// Async pattern topic interface  
public interface RPatternTopicAsync extends RObjectAsync {
    RFuture<Integer> addListenerAsync(PatternMessageListener<Object> listener);
    RFuture<Integer> addListenerAsync(Class<?> type, PatternMessageListener<?> listener);
    RFuture<Void> removeListenerAsync(int listenerId);
    RFuture<Void> removeAllListenersAsync();
    RFuture<Long> countListenersAsync();
}

// Async reliable topic interface
public interface RReliableTopicAsync extends RObjectAsync, RExpirableAsync {
    RFuture<Long> publishAsync(Object message);
    RFuture<String> addListenerAsync(MessageListener<Object> listener);
    RFuture<String> addListenerAsync(Class<?> type, MessageListener<?> listener);
    RFuture<Void> removeListenerAsync(String listenerId);
    RFuture<Void> removeAllListenersAsync();
    RFuture<Long> sizeAsync();
    RFuture<List<Object>> readAllAsync();
}

Async Usage Examples:

// Async topic operations
RTopicAsync asyncTopic = redisson.getTopic("async-events");

// Async subscribe
RFuture<Integer> listenerFuture = asyncTopic.addListenerAsync(String.class, (channel, message) -> {
    System.out.println("Async message: " + message);
});

listenerFuture.whenComplete((listenerId, error) -> {
    if (error == null) {
        System.out.println("Listener added with ID: " + listenerId);
        
        // Async publish after successful subscription
        asyncTopic.publishAsync("Hello async world!")
            .whenComplete((subscribers, publishError) -> {
                if (publishError == null) {
                    System.out.println("Message sent to " + subscribers + " subscribers");
                } else {
                    System.err.println("Publish failed: " + publishError.getMessage());
                }
            });
    } else {
        System.err.println("Failed to add listener: " + error.getMessage());
    }
});

// Chain async operations
RFuture<Long> chainedOperation = asyncTopic.countSubscribersAsync()
    .thenCompose(count -> {
        System.out.println("Current subscribers: " + count);
        return asyncTopic.publishAsync("Subscriber count: " + count);
    })
    .thenCompose(delivered -> {
        System.out.println("Message delivered to: " + delivered);
        return asyncTopic.countListenersAsync();
    });

chainedOperation.whenComplete((listeners, error) -> {
    if (error == null) {
        System.out.println("Total listeners: " + listeners);
    } else {
        System.err.println("Operation chain failed: " + error.getMessage());
    }
});

Message Filtering and Transformation

Advanced messaging patterns with filtering and transformation capabilities.

// Message filtering example
public class MessageFiltering {
    
    public static void setupFilteredTopic(RedissonClient redisson) {
        RTopic topic = redisson.getTopic("filtered-events");
        
        // Filter messages by type
        topic.addListener(String.class, (channel, message) -> {
            if (message.startsWith("URGENT:")) {
                handleUrgentMessage(message);
            }
        });
        
        topic.addListener(String.class, (channel, message) -> {
            if (message.startsWith("INFO:")) {
                handleInfoMessage(message);
            }
        });
    }
    
    // Message transformation example
    public static void setupTransformingTopic(RedissonClient redisson) {
        RTopic<Map<String, Object>> eventTopic = redisson.getTopic("raw-events");
        RTopic<ProcessedEvent> processedTopic = redisson.getTopic("processed-events");
        
        // Transform and republish messages
        eventTopic.addListener(Map.class, (channel, rawEvent) -> {
            ProcessedEvent processed = transformEvent(rawEvent);
            processedTopic.publish(processed);
        });
    }
}

// Custom message types
public class ProcessedEvent {
    private String id;
    private String type;
    private long timestamp;
    private Map<String, Object> data;
    
    // constructors, getters, setters...
}

Topic Configuration Options

// Pattern topic options
public class PatternTopicOptions extends PlainOptions {
    private String pattern;
    
    public PatternTopicOptions pattern(String pattern);
    public String getPattern();
}

// Topic listener configuration
public class TopicListener<M> {
    private final Class<M> messageClass;
    private final MessageListener<M> listener;
    private final boolean removeOnError;
    
    public TopicListener(Class<M> messageClass, MessageListener<M> listener);
    public TopicListener(Class<M> messageClass, MessageListener<M> listener, boolean removeOnError);
    
    public Class<M> getMessageClass();
    public MessageListener<M> getListener();
    public boolean isRemoveOnError();
}

// Reliable topic configuration
public class ReliableTopicOptions extends PlainOptions {
    private long watchdogTimeout = 10 * 60000; // 10 minutes
    private int batchSize = 100;
    
    public ReliableTopicOptions watchdogTimeout(long watchdogTimeout, TimeUnit timeUnit);
    public ReliableTopicOptions batchSize(int batchSize);
    
    public long getWatchdogTimeout();
    public int getBatchSize();
}

Configuration Examples:

// Configure pattern topic
PatternTopicOptions patternOptions = new PatternTopicOptions()
    .pattern("system.*.alerts")
    .codec(new JsonJacksonCodec());
    
RPatternTopic patternTopic = redisson.getPatternTopic(patternOptions);

// Configure reliable topic with custom settings
ReliableTopicOptions reliableOptions = new ReliableTopicOptions()
    .watchdogTimeout(30, TimeUnit.MINUTES)
    .batchSize(50)
    .codec(new KryoCodec());
    
RReliableTopic reliableTopic = redisson.getReliableTopic(reliableOptions);

// Error handling for listeners
RTopic errorHandlingTopic = redisson.getTopic("error-prone-events");

errorHandlingTopic.addListener(String.class, (channel, message) -> {
    try {
        processMessage(message);
    } catch (Exception e) {
        System.err.println("Error processing message: " + e.getMessage());
        // Message processing failed but listener remains active
    }
});

Install with Tessl CLI

npx tessl i tessl/maven-org-redisson--redisson

docs

collections.md

configuration.md

data-structures.md

index.md

messaging.md

reactive-async.md

synchronization.md

tile.json