Valkey and Redis Java client providing complete Real-Time Data Platform with distributed objects and services
—
Redisson provides a comprehensive publish/subscribe messaging system built on Redis pub/sub capabilities. It supports regular topics, pattern-based subscriptions, sharded topics for scalability, and reliable topics with guaranteed delivery.
Standard publish/subscribe topics for real-time messaging between distributed components.
/**
* Get a topic for publish/subscribe messaging
* @param name - unique name of the topic
* @return RTopic instance
*/
public RTopic getTopic(String name);
public RTopic getTopic(String name, Codec codec);
public RTopic getTopic(PlainOptions options);

Topic Interface:
/**
 * Publish/subscribe topic handle: publishes messages and manages the
 * listeners registered through this instance.
 */
public interface RTopic extends RTopicAsync {

    // Publishing messages

    /**
     * Publishes a message to this topic.
     *
     * @param message - message to publish
     * @return count that the usage examples report as the number of
     *         subscribers the message was delivered to
     */
    long publish(Object message);

    // Subscribing to messages

    /**
     * Registers a listener that receives every message as {@code Object}.
     *
     * @param listener - callback invoked with the channel and the message
     * @return listener id accepted by {@link #removeListener(int)}
     */
    int addListener(MessageListener<Object> listener);

    /**
     * Registers a listener for messages of the given type.
     * The type parameter links the class token to the listener's message
     * type, so a lambda listener sees the payload as {@code M} (for example
     * {@code String} or {@code User}) instead of {@code Object}.
     *
     * @param <M> message type
     * @param type - class of the expected messages
     * @param listener - callback invoked with the channel and the message
     * @return listener id accepted by {@link #removeListener(int)}
     */
    <M> int addListener(Class<M> type, MessageListener<? extends M> listener);

    // Listener management

    /**
     * Removes a previously registered listener.
     *
     * @param listenerId - id returned by one of the addListener methods
     */
    void removeListener(int listenerId);

    /** Removes all listeners from this topic. */
    void removeAllListeners();

    // Topic information

    /** @return names of the channels behind this topic */
    List<String> getChannelNames();

    /**
     * NOTE(review): upstream Redisson appears to declare this method with an
     * {@code int} return type - confirm against the targeted version.
     *
     * @return listener count
     */
    long countListeners();

    /** @return subscriber count */
    long countSubscribers();
}
// Message listener interface
@FunctionalInterface
public interface MessageListener<M> {
void onMessage(CharSequence channel, M msg);
}

Usage Examples:
import org.redisson.api.*;
// Get topic
RTopic topic = redisson.getTopic("notifications");
// Subscribe to messages
int listenerId = topic.addListener(String.class, (channel, message) -> {
System.out.println("Received on " + channel + ": " + message);
});
// Publish messages
long subscribersCount = topic.publish("Hello, subscribers!");
System.out.println("Message delivered to " + subscribersCount + " subscribers");
// Publish different message types.
// RTopic is declared without a type parameter, so the payload type is chosen
// by the Class token passed to addListener, not by the variable's type.
RTopic userTopic = redisson.getTopic("user-events");
userTopic.addListener(User.class, (channel, user) -> {
    System.out.println("User event: " + user.getName());
});
userTopic.publish(new User("Alice", 25));
// Remove listener when done
topic.removeListener(listenerId);
// Multiple listeners for same topic
RTopic eventTopic = redisson.getTopic("events");
int listener1 = eventTopic.addListener(String.class, (channel, msg) -> {
System.out.println("Listener 1: " + msg);
});
int listener2 = eventTopic.addListener(String.class, (channel, msg) -> {
System.out.println("Listener 2: " + msg);
});
eventTopic.publish("Event message"); // Both listeners receive this
// Remove specific listener
eventTopic.removeListener(listener1);
eventTopic.publish("Another event"); // Only listener2 receives this

Pattern-based subscriptions allow wildcard matching for dynamic topic subscription.
/**
* Get a pattern topic for wildcard subscriptions
* @param pattern - pattern with wildcards (* and ?)
* @return RPatternTopic instance
*/
public RPatternTopic getPatternTopic(String pattern);
public RPatternTopic getPatternTopic(String pattern, Codec codec);
public RPatternTopic getPatternTopic(PatternTopicOptions options);

Pattern Topic Interface:
/**
 * Pattern topic handle: manages listeners that are subscribed through a
 * wildcard pattern rather than a single channel name.
 */
public interface RPatternTopic extends RPatternTopicAsync {

    // Subscribe with pattern matching

    /**
     * Registers a listener that receives every message as {@code Object}.
     *
     * @param listener - callback invoked with the pattern, the channel and the message
     * @return listener id accepted by {@link #removeListener(int)}
     */
    int addListener(PatternMessageListener<Object> listener);

    /**
     * Registers a listener for messages of the given type.
     * The type parameter links the class token to the listener's message
     * type, so a lambda listener sees the payload as {@code T} instead of
     * {@code Object}.
     *
     * @param <T> message type
     * @param type - class of the expected messages
     * @param listener - callback invoked with the pattern, the channel and the message
     * @return listener id accepted by {@link #removeListener(int)}
     */
    <T> int addListener(Class<T> type, PatternMessageListener<T> listener);

    // Listener management

    /**
     * Removes a previously registered listener.
     *
     * @param listenerId - id returned by one of the addListener methods
     */
    void removeListener(int listenerId);

    /** Removes all listeners from this pattern topic. */
    void removeAllListeners();

    // Pattern information

    /** @return pattern names behind this topic */
    List<String> getPatternNames();

    /** @return listener count */
    long countListeners();
}
// Pattern message listener interface
@FunctionalInterface
public interface PatternMessageListener<M> {
void onMessage(CharSequence pattern, CharSequence channel, M msg);
}

Usage Examples:
// Pattern subscription - listen to multiple related topics
RPatternTopic patternTopic = redisson.getPatternTopic("user.*");
int patternListener = patternTopic.addListener(String.class, (pattern, channel, message) -> {
System.out.println("Pattern: " + pattern + ", Channel: " + channel + ", Message: " + message);
});
// These will all match the pattern "user.*"
RTopic userLoginTopic = redisson.getTopic("user.login");
RTopic userLogoutTopic = redisson.getTopic("user.logout");
RTopic userUpdateTopic = redisson.getTopic("user.update");
userLoginTopic.publish("User John logged in"); // Matches pattern
userLogoutTopic.publish("User John logged out"); // Matches pattern
userUpdateTopic.publish("User John updated profile"); // Matches pattern
// More specific patterns
RPatternTopic orderPattern = redisson.getPatternTopic("order.*.created");
orderPattern.addListener(String.class, (pattern, channel, message) -> {
System.out.println("New order created: " + message);
});
// Matches: order.electronics.created, order.books.created, etc.
redisson.getTopic("order.electronics.created").publish("Order #123 created");
redisson.getTopic("order.books.created").publish("Order #456 created");
// Multiple patterns
RPatternTopic alertPattern = redisson.getPatternTopic("alert.*");
RPatternTopic errorPattern = redisson.getPatternTopic("error.*");
alertPattern.addListener(String.class, (pattern, channel, msg) -> {
System.out.println("ALERT - " + channel + ": " + msg);
});
errorPattern.addListener(String.class, (pattern, channel, msg) -> {
System.err.println("ERROR - " + channel + ": " + msg);
});

Sharded topics distribute messages across multiple Redis nodes for improved scalability and performance.
/**
* Get a sharded topic for scalable messaging
* @param name - unique name of the sharded topic
* @return RShardedTopic instance
*/
public RShardedTopic getShardedTopic(String name);
public RShardedTopic getShardedTopic(String name, Codec codec);
public RShardedTopic getShardedTopic(PlainOptions options);

Sharded Topic Interface:
public interface RShardedTopic extends RShardedTopicAsync {
// Publishing with sharding
long publish(Object message);
// Subscribing with automatic shard distribution
int addListener(MessageListener<Object> listener);
int addListener(Class<?> type, MessageListener<?> listener);
// Listener management
void removeListener(int listenerId);
void removeAllListeners();
// Sharding information
List<String> getChannelNames();
long countListeners();
long countSubscribers();
}

Usage Examples:
// Sharded topic for high-throughput messaging
RShardedTopic shardedTopic = redisson.getShardedTopic("high-volume-events");
// Subscribe a String listener; the returned id identifies this listener
int shardedListener = shardedTopic.addListener(String.class, (channel, message) -> {
System.out.println("Sharded message: " + message + " on " + channel);
});
// Publish 1000 messages to the same sharded topic
// NOTE(review): all of these go to one topic name; presumably a single channel maps to a single shard and the spreading happens across different topic names - confirm
for (int i = 0; i < 1000; i++) {
shardedTopic.publish("Message " + i);
}
// Multiple listeners on one sharded topic
RShardedTopic eventStream = redisson.getShardedTopic("event-stream");
// Register five listeners on the same topic, each tagged with its loop index
for (int i = 0; i < 5; i++) {
// Copy the loop counter into a final local so the lambda can capture it
final int listenerId = i;
eventStream.addListener(String.class, (channel, message) -> {
System.out.println("Listener " + listenerId + " received: " + message);
});
}
// Publish 100 events to the topic the listeners above are registered on
// NOTE(review): the regular-topic example states that every listener receives each publish; presumably the same holds here rather than messages being split among listeners - confirm
for (int i = 0; i < 100; i++) {
eventStream.publish("Event " + i);
}

Reliable topics provide guaranteed message delivery with acknowledgment and replay capabilities.
/**
* Get a reliable topic with guaranteed delivery
* @param name - unique name of the reliable topic
* @return RReliableTopic instance
*/
public RReliableTopic getReliableTopic(String name);
public RReliableTopic getReliableTopic(String name, Codec codec);
public RReliableTopic getReliableTopic(PlainOptions options);

Reliable Topic Interface:
/**
 * Reliable topic handle: publishes messages, manages listeners identified by
 * String ids, and exposes the stored messages and expiration controls.
 */
public interface RReliableTopic extends RReliableTopicAsync {

    // Publishing

    /**
     * Publishes a message to this topic.
     *
     * @param message - message to publish
     * @return count reported by the publish operation
     */
    long publish(Object message);

    // Subscribing

    /**
     * Registers a listener that receives every message as {@code Object}.
     *
     * @param listener - callback invoked with the channel and the message
     * @return listener id accepted by {@link #removeListener(String)}
     */
    String addListener(MessageListener<Object> listener);

    /**
     * Registers a listener for messages of the given type.
     * The type parameter links the class token to the listener's message
     * type, so a lambda listener sees the payload as {@code M} (for example
     * {@code Order}) instead of {@code Object}.
     *
     * @param <M> message type
     * @param type - class of the expected messages
     * @param listener - callback invoked with the channel and the message
     * @return listener id accepted by {@link #removeListener(String)}
     */
    <M> String addListener(Class<M> type, MessageListener<M> listener);

    // Listener management with IDs

    /**
     * Removes a previously registered listener.
     *
     * @param listenerId - id returned by one of the addListener methods
     */
    void removeListener(String listenerId);

    /** Removes all listeners from this topic. */
    void removeAllListeners();

    // Stored messages and expiration

    /** @return number of messages currently held by the topic */
    long size();

    /** @return the messages currently held by the topic */
    List<Object> readAll();

    /**
     * Sets a time-to-live.
     *
     * @param timeToLive - duration value
     * @param timeUnit - unit of the duration
     */
    void expire(long timeToLive, TimeUnit timeUnit);

    /**
     * Sets an absolute expiration time.
     *
     * @param timestamp - expiration timestamp; presumably epoch milliseconds - TODO confirm
     */
    void expireAt(long timestamp);

    /** @return remaining time-to-live; unit presumably milliseconds - TODO confirm */
    long remainTimeToLive();
}

Usage Examples:
// Reliable topic ensures message delivery
RReliableTopic reliableTopic = redisson.getReliableTopic("critical-events");
// Subscribe with guaranteed delivery
String listenerId = reliableTopic.addListener(String.class, (channel, message) -> {
System.out.println("Processing critical event: " + message);
// Message is automatically acknowledged after successful processing
});
// Publish critical messages
reliableTopic.publish("System alert: High CPU usage");
reliableTopic.publish("System alert: Low disk space");
// Check message queue size
long pendingMessages = reliableTopic.size();
System.out.println("Pending messages: " + pendingMessages);
// Read all unprocessed messages (useful for debugging)
List<Object> allMessages = reliableTopic.readAll();
System.out.println("All messages in queue: " + allMessages);
// Set a time-to-live of one hour
// NOTE(review): expire(...) belongs to the expirable-object API (the async interface extends RExpirableAsync), so it presumably applies to the topic as a whole rather than to individual messages - confirm
reliableTopic.expire(1, TimeUnit.HOURS); // time-to-live of 1 hour
// Multiple reliable listeners
RReliableTopic orderTopic = redisson.getReliableTopic("order-processing");
String processor1 = orderTopic.addListener(Order.class, (channel, order) -> {
System.out.println("Processor 1 handling order: " + order.getId());
processOrder(order);
});
String processor2 = orderTopic.addListener(Order.class, (channel, order) -> {
System.out.println("Processor 2 handling order: " + order.getId());
processOrder(order);
});
// Orders are reliably delivered to all processors
orderTopic.publish(new Order("123", "Product A"));
orderTopic.publish(new Order("124", "Product B"));

All topic operations have async variants returning RFuture<T>.
// Async topic interface
/**
 * Asynchronous counterpart of the topic operations: every method returns an
 * RFuture that carries the result of the corresponding operation.
 */
public interface RTopicAsync extends RObjectAsync {
/** Publishes a message; the future carries the count returned by the publish. */
RFuture<Long> publishAsync(Object message);
/** Registers an Object-typed listener; the future carries the listener id. */
RFuture<Integer> addListenerAsync(MessageListener<Object> listener);
/** Registers a listener for the given message class; the future carries the listener id. */
RFuture<Integer> addListenerAsync(Class<?> type, MessageListener<?> listener);
/** Removes the listener with the given id; the future completes with no value. */
RFuture<Void> removeListenerAsync(int listenerId);
/** Removes all listeners; the future completes with no value. */
RFuture<Void> removeAllListenersAsync();
/** The future carries the listener count. */
RFuture<Long> countListenersAsync();
/** The future carries the subscriber count. */
RFuture<Long> countSubscribersAsync();
}
// Async pattern topic interface
/**
 * Asynchronous counterpart of the pattern topic operations: every method
 * returns an RFuture that carries the result of the corresponding operation.
 */
public interface RPatternTopicAsync extends RObjectAsync {
/** Registers an Object-typed pattern listener; the future carries the listener id. */
RFuture<Integer> addListenerAsync(PatternMessageListener<Object> listener);
/** Registers a pattern listener for the given message class; the future carries the listener id. */
RFuture<Integer> addListenerAsync(Class<?> type, PatternMessageListener<?> listener);
/** Removes the listener with the given id; the future completes with no value. */
RFuture<Void> removeListenerAsync(int listenerId);
/** Removes all listeners; the future completes with no value. */
RFuture<Void> removeAllListenersAsync();
/** The future carries the listener count. */
RFuture<Long> countListenersAsync();
}
// Async reliable topic interface
public interface RReliableTopicAsync extends RObjectAsync, RExpirableAsync {
RFuture<Long> publishAsync(Object message);
RFuture<String> addListenerAsync(MessageListener<Object> listener);
RFuture<String> addListenerAsync(Class<?> type, MessageListener<?> listener);
RFuture<Void> removeListenerAsync(String listenerId);
RFuture<Void> removeAllListenersAsync();
RFuture<Long> sizeAsync();
RFuture<List<Object>> readAllAsync();
}

Async Usage Examples:
// Async topic operations
RTopicAsync asyncTopic = redisson.getTopic("async-events");
// Async subscribe
RFuture<Integer> listenerFuture = asyncTopic.addListenerAsync(String.class, (channel, message) -> {
System.out.println("Async message: " + message);
});
listenerFuture.whenComplete((listenerId, error) -> {
if (error == null) {
System.out.println("Listener added with ID: " + listenerId);
// Async publish after successful subscription
asyncTopic.publishAsync("Hello async world!")
.whenComplete((subscribers, publishError) -> {
if (publishError == null) {
System.out.println("Message sent to " + subscribers + " subscribers");
} else {
System.err.println("Publish failed: " + publishError.getMessage());
}
});
} else {
System.err.println("Failed to add listener: " + error.getMessage());
}
});
// Chain async operations.
// thenCompose is a CompletionStage method and returns a CompletionStage, so
// the composed pipeline is typed as CompletionStage<Long>; assigning it to
// RFuture<Long> would not compile.
java.util.concurrent.CompletionStage<Long> chainedOperation = asyncTopic.countSubscribersAsync()
    .thenCompose(count -> {
        System.out.println("Current subscribers: " + count);
        return asyncTopic.publishAsync("Subscriber count: " + count);
    })
    .thenCompose(delivered -> {
        System.out.println("Message delivered to: " + delivered);
        return asyncTopic.countListenersAsync();
    });
chainedOperation.whenComplete((listeners, error) -> {
if (error == null) {
System.out.println("Total listeners: " + listeners);
} else {
System.err.println("Operation chain failed: " + error.getMessage());
}
});

Advanced messaging patterns with filtering and transformation capabilities.
// Message filtering example
/**
 * Example listeners that filter and transform topic messages.
 * The handleUrgentMessage, handleInfoMessage and transformEvent helpers are
 * not defined in this snippet and have to be supplied by the application.
 */
public class MessageFiltering {

    /**
     * Registers two listeners on the "filtered-events" topic. Each listener
     * acts only on messages that start with its prefix and ignores the rest.
     *
     * @param redisson - client used to obtain the topic
     */
    public static void setupFilteredTopic(RedissonClient redisson) {
        RTopic topic = redisson.getTopic("filtered-events");

        // Filter messages by prefix
        topic.addListener(String.class, (channel, message) -> {
            if (message.startsWith("URGENT:")) {
                handleUrgentMessage(message);
            }
        });
        topic.addListener(String.class, (channel, message) -> {
            if (message.startsWith("INFO:")) {
                handleInfoMessage(message);
            }
        });
    }

    // Message transformation example
    /**
     * Listens on "raw-events", transforms each incoming map and republishes
     * the result on "processed-events".
     *
     * @param redisson - client used to obtain both topics
     */
    public static void setupTransformingTopic(RedissonClient redisson) {
        // RTopic is declared without a type parameter; the payload type is
        // selected by the Class token given to addListener.
        RTopic eventTopic = redisson.getTopic("raw-events");
        RTopic processedTopic = redisson.getTopic("processed-events");

        // Transform and republish messages
        eventTopic.addListener(Map.class, (channel, rawEvent) -> {
            ProcessedEvent processed = transformEvent(rawEvent);
            processedTopic.publish(processed);
        });
    }
}
// Custom message types
public class ProcessedEvent {
private String id;
private String type;
private long timestamp;
private Map<String, Object> data;
// constructors, getters, setters...
}

// Pattern topic options
/**
 * Options for a pattern topic: adds a subscription pattern on top of the
 * settings inherited from PlainOptions.
 */
public class PatternTopicOptions extends PlainOptions {
// Subscription pattern, e.g. "system.*.alerts" in the configuration example
private String pattern;
/** Sets the pattern; returns PatternTopicOptions so calls can be chained. */
public PatternTopicOptions pattern(String pattern);
/** @return the configured pattern */
public String getPattern();
}
// Topic listener configuration
/**
 * Bundles a message class, the listener for that class and a removeOnError
 * flag into one immutable holder (all fields are final).
 */
public class TopicListener<M> {
// Class of the messages the listener handles
private final Class<M> messageClass;
// Callback receiving messages of type M
private final MessageListener<M> listener;
// presumably: whether the listener is removed after it fails - TODO confirm
private final boolean removeOnError;
/** Creates a holder without an explicit removeOnError value; the default applied is not shown here. */
public TopicListener(Class<M> messageClass, MessageListener<M> listener);
/** Creates a holder with an explicit removeOnError value. */
public TopicListener(Class<M> messageClass, MessageListener<M> listener, boolean removeOnError);
/** @return the message class */
public Class<M> getMessageClass();
/** @return the listener */
public MessageListener<M> getListener();
/** @return the removeOnError flag */
public boolean isRemoveOnError();
}
// Reliable topic configuration
public class ReliableTopicOptions extends PlainOptions {
private long watchdogTimeout = 10 * 60000; // 10 minutes
private int batchSize = 100;
public ReliableTopicOptions watchdogTimeout(long watchdogTimeout, TimeUnit timeUnit);
public ReliableTopicOptions batchSize(int batchSize);
public long getWatchdogTimeout();
public int getBatchSize();
}

Configuration Examples:
// Configure pattern topic
PatternTopicOptions patternOptions = new PatternTopicOptions()
.pattern("system.*.alerts")
.codec(new JsonJacksonCodec());
RPatternTopic patternTopic = redisson.getPatternTopic(patternOptions);
// Configure reliable topic with custom settings
ReliableTopicOptions reliableOptions = new ReliableTopicOptions()
.watchdogTimeout(30, TimeUnit.MINUTES)
.batchSize(50)
.codec(new KryoCodec());
RReliableTopic reliableTopic = redisson.getReliableTopic(reliableOptions);
// Error handling for listeners
RTopic errorHandlingTopic = redisson.getTopic("error-prone-events");
errorHandlingTopic.addListener(String.class, (channel, message) -> {
try {
processMessage(message);
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
// Message processing failed but listener remains active
}
});

Install with Tessl CLI
npx tessl i tessl/maven-org-redisson--redisson