0
# Pub/Sub Messaging
1
2
Redisson provides a comprehensive publish/subscribe messaging system built on Redis pub/sub capabilities. It supports regular topics, pattern-based subscriptions, sharded topics for scalability, and reliable topics with guaranteed delivery.
3
4
## Capabilities
5
6
### Basic Topics
7
8
Standard publish/subscribe topics for real-time messaging between distributed components.
9
10
```java { .api }
11
/**
12
* Get a topic for publish/subscribe messaging
13
* @param name - unique name of the topic
14
* @return RTopic instance
15
*/
16
public RTopic getTopic(String name);
17
public RTopic getTopic(String name, Codec codec);
18
public RTopic getTopic(PlainOptions options);
19
```
20
21
**Topic Interface:**
22
23
```java { .api }
24
public interface RTopic extends RTopicAsync {
25
// Publishing messages
26
long publish(Object message);
27
28
// Subscribing to messages
29
int addListener(MessageListener<Object> listener);
30
<M> int addListener(Class<M> type, MessageListener<? extends M> listener);
31
32
// Listener management
33
void removeListener(int listenerId);
34
void removeAllListeners();
35
36
// Topic information
37
List<String> getChannelNames();
38
long countListeners();
39
long countSubscribers();
40
}
41
42
// Message listener interface
43
@FunctionalInterface
44
public interface MessageListener<M> {
45
void onMessage(CharSequence channel, M msg);
46
}
47
```
48
49
**Usage Examples:**
50
51
```java
52
import org.redisson.api.*;
53
54
// Get topic
55
RTopic topic = redisson.getTopic("notifications");
56
57
// Subscribe to messages
58
int listenerId = topic.addListener(String.class, (channel, message) -> {
59
System.out.println("Received on " + channel + ": " + message);
60
});
61
62
// Publish messages
63
long subscribersCount = topic.publish("Hello, subscribers!");
64
System.out.println("Message delivered to " + subscribersCount + " subscribers");
65
66
// Publish different message types
67
RTopic userTopic = redisson.getTopic("user-events");
68
userTopic.addListener(User.class, (channel, user) -> {
69
System.out.println("User event: " + user.getName());
70
});
71
userTopic.publish(new User("Alice", 25));
72
73
// Remove listener when done
74
topic.removeListener(listenerId);
75
76
// Multiple listeners for same topic
77
RTopic eventTopic = redisson.getTopic("events");
78
79
int listener1 = eventTopic.addListener(String.class, (channel, msg) -> {
80
System.out.println("Listener 1: " + msg);
81
});
82
83
int listener2 = eventTopic.addListener(String.class, (channel, msg) -> {
84
System.out.println("Listener 2: " + msg);
85
});
86
87
eventTopic.publish("Event message"); // Both listeners receive this
88
89
// Remove specific listener
90
eventTopic.removeListener(listener1);
91
eventTopic.publish("Another event"); // Only listener2 receives this
92
```
93
94
### Pattern Topics
95
96
Pattern topics subscribe to every channel whose name matches a glob-style pattern, so a single listener can receive messages from many related topics.
97
98
```java { .api }
99
/**
100
* Get a pattern topic for wildcard subscriptions
101
* @param pattern - glob-style pattern; supports the *, ? and [...] wildcards
102
* @return RPatternTopic instance
103
*/
104
public RPatternTopic getPatternTopic(String pattern);
105
public RPatternTopic getPatternTopic(String pattern, Codec codec);
106
public RPatternTopic getPatternTopic(PatternTopicOptions options);
107
```
108
109
**Pattern Topic Interface:**
110
111
```java { .api }
112
public interface RPatternTopic extends RPatternTopicAsync {
113
// Subscribe with pattern matching
114
int addListener(PatternMessageListener<Object> listener);
115
<T> int addListener(Class<T> type, PatternMessageListener<T> listener);
116
117
// Listener management
118
void removeListener(int listenerId);
119
void removeAllListeners();
120
121
// Pattern information
122
List<String> getPatternNames();
123
long countListeners();
124
}
125
126
// Pattern message listener interface
127
@FunctionalInterface
128
public interface PatternMessageListener<M> {
129
void onMessage(CharSequence pattern, CharSequence channel, M msg);
130
}
131
```
132
133
**Usage Examples:**
134
135
```java
136
// Pattern subscription - listen to multiple related topics
137
RPatternTopic patternTopic = redisson.getPatternTopic("user.*");
138
139
int patternListener = patternTopic.addListener(String.class, (pattern, channel, message) -> {
140
System.out.println("Pattern: " + pattern + ", Channel: " + channel + ", Message: " + message);
141
});
142
143
// These will all match the pattern "user.*"
144
RTopic userLoginTopic = redisson.getTopic("user.login");
145
RTopic userLogoutTopic = redisson.getTopic("user.logout");
146
RTopic userUpdateTopic = redisson.getTopic("user.update");
147
148
userLoginTopic.publish("User John logged in"); // Matches pattern
149
userLogoutTopic.publish("User John logged out"); // Matches pattern
150
userUpdateTopic.publish("User John updated profile"); // Matches pattern
151
152
// More specific patterns
153
RPatternTopic orderPattern = redisson.getPatternTopic("order.*.created");
154
orderPattern.addListener(String.class, (pattern, channel, message) -> {
155
System.out.println("New order created: " + message);
156
});
157
158
// Matches: order.electronics.created, order.books.created, etc.
159
redisson.getTopic("order.electronics.created").publish("Order #123 created");
160
redisson.getTopic("order.books.created").publish("Order #456 created");
161
162
// Multiple patterns
163
RPatternTopic alertPattern = redisson.getPatternTopic("alert.*");
164
RPatternTopic errorPattern = redisson.getPatternTopic("error.*");
165
166
alertPattern.addListener(String.class, (pattern, channel, msg) -> {
167
System.out.println("ALERT - " + channel + ": " + msg);
168
});
169
170
errorPattern.addListener(String.class, (pattern, channel, msg) -> {
171
System.err.println("ERROR - " + channel + ": " + msg);
172
});
173
```
174
175
### Sharded Topics
176
177
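Sharded topics use Redis 7.0+ sharded pub/sub: each channel is bound to a hash slot, so its messages are propagated only within the shard that owns that slot instead of being broadcast to every node of the cluster, which improves scalability and performance.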
178
179
```java { .api }
180
/**
181
* Get a sharded topic for scalable messaging
182
* @param name - unique name of the sharded topic
183
* @return RShardedTopic instance
184
*/
185
public RShardedTopic getShardedTopic(String name);
186
public RShardedTopic getShardedTopic(String name, Codec codec);
187
public RShardedTopic getShardedTopic(PlainOptions options);
188
```
189
190
**Sharded Topic Interface:**
191
192
```java { .api }
193
public interface RShardedTopic extends RShardedTopicAsync {
194
// Publishing with sharding
195
long publish(Object message);
196
197
// Subscribing with automatic shard distribution
198
int addListener(MessageListener<Object> listener);
199
<M> int addListener(Class<M> type, MessageListener<? extends M> listener);
200
201
// Listener management
202
void removeListener(int listenerId);
203
void removeAllListeners();
204
205
// Sharding information
206
List<String> getChannelNames();
207
long countListeners();
208
long countSubscribers();
209
}
210
```
211
212
**Usage Examples:**
213
214
```java
215
// Sharded topic for high-throughput messaging
216
RShardedTopic shardedTopic = redisson.getShardedTopic("high-volume-events");
217
218
// Subscribe - the subscription is served by the shard that owns this channel's hash slot
219
int shardedListener = shardedTopic.addListener(String.class, (channel, message) -> {
220
System.out.println("Sharded message: " + message + " on " + channel);
221
});
222
223
// Publish - every message on this channel is routed to the shard that owns its hash slot
224
for (int i = 0; i < 1000; i++) {
225
shardedTopic.publish("Message " + i);
226
}
227
228
// Multiple listeners on the same sharded topic
229
RShardedTopic eventStream = redisson.getShardedTopic("event-stream");
230
231
// Add multiple listeners - all of them subscribe to the same channel
232
for (int i = 0; i < 5; i++) {
233
final int listenerId = i;
234
eventStream.addListener(String.class, (channel, message) -> {
235
System.out.println("Listener " + listenerId + " received: " + message);
236
});
237
}
238
239
// Each message is delivered to every listener (pub/sub broadcasts; it does not load-balance)
240
for (int i = 0; i < 100; i++) {
241
eventStream.publish("Event " + i);
242
}
243
```
244
245
### Reliable Topics
246
247
Reliable topics provide guaranteed message delivery with acknowledgment and replay capabilities.
248
249
```java { .api }
250
/**
251
* Get a reliable topic with guaranteed delivery
252
* @param name - unique name of the reliable topic
253
* @return RReliableTopic instance
254
*/
255
public RReliableTopic getReliableTopic(String name);
256
public RReliableTopic getReliableTopic(String name, Codec codec);
257
public RReliableTopic getReliableTopic(PlainOptions options);
258
```
259
260
**Reliable Topic Interface:**
261
262
```java { .api }
263
public interface RReliableTopic extends RReliableTopicAsync {
264
// Publishing with confirmation
265
long publish(Object message);
266
267
// Subscribing with acknowledgment
268
String addListener(MessageListener<Object> listener);
269
<M> String addListener(Class<M> type, MessageListener<M> listener);
270
271
// Listener management with IDs
272
void removeListener(String listenerId);
273
void removeAllListeners();
274
275
// Message acknowledgment and replay
276
long size();
277
List<Object> readAll();
278
void expire(long timeToLive, TimeUnit timeUnit);
279
void expireAt(long timestamp);
280
long remainTimeToLive();
281
}
282
```
283
284
**Usage Examples:**
285
286
```java
287
// Reliable topic ensures message delivery
288
RReliableTopic reliableTopic = redisson.getReliableTopic("critical-events");
289
290
// Subscribe with guaranteed delivery
291
String listenerId = reliableTopic.addListener(String.class, (channel, message) -> {
292
System.out.println("Processing critical event: " + message);
293
// Redisson treats the message as delivered once it has been handed to the listeners for processing
294
});
295
296
// Publish critical messages
297
reliableTopic.publish("System alert: High CPU usage");
298
reliableTopic.publish("System alert: Low disk space");
299
300
// Check message queue size
301
long pendingMessages = reliableTopic.size();
302
System.out.println("Pending messages: " + pendingMessages);
303
304
// Read all unprocessed messages (useful for debugging)
305
List<Object> allMessages = reliableTopic.readAll();
306
System.out.println("All messages in queue: " + allMessages);
307
308
// Set a time-to-live on the topic itself (it applies to the whole object, not to individual messages)
309
reliableTopic.expire(1, TimeUnit.HOURS); // The whole topic, including its stored messages, expires after 1 hour
310
311
// Multiple reliable listeners
312
RReliableTopic orderTopic = redisson.getReliableTopic("order-processing");
313
314
String processor1 = orderTopic.addListener(Order.class, (channel, order) -> {
315
System.out.println("Processor 1 handling order: " + order.getId());
316
processOrder(order);
317
});
318
319
String processor2 = orderTopic.addListener(Order.class, (channel, order) -> {
320
System.out.println("Processor 2 handling order: " + order.getId());
321
processOrder(order);
322
});
323
324
// Orders are reliably delivered to all processors
325
orderTopic.publish(new Order("123", "Product A"));
326
orderTopic.publish(new Order("124", "Product B"));
327
```
328
329
### Async Topic Operations
330
331
All topic operations have async variants returning `RFuture<T>`.
332
333
```java { .api }
334
// Async topic interface
335
public interface RTopicAsync extends RObjectAsync {
336
RFuture<Long> publishAsync(Object message);
337
RFuture<Integer> addListenerAsync(MessageListener<Object> listener);
338
<M> RFuture<Integer> addListenerAsync(Class<M> type, MessageListener<M> listener);
339
RFuture<Void> removeListenerAsync(int listenerId);
340
RFuture<Void> removeAllListenersAsync();
341
RFuture<Long> countListenersAsync();
342
RFuture<Long> countSubscribersAsync();
343
}
344
345
// Async pattern topic interface
346
public interface RPatternTopicAsync extends RObjectAsync {
347
RFuture<Integer> addListenerAsync(PatternMessageListener<Object> listener);
348
<T> RFuture<Integer> addListenerAsync(Class<T> type, PatternMessageListener<T> listener);
349
RFuture<Void> removeListenerAsync(int listenerId);
350
RFuture<Void> removeAllListenersAsync();
351
RFuture<Long> countListenersAsync();
352
}
353
354
// Async reliable topic interface
355
public interface RReliableTopicAsync extends RObjectAsync, RExpirableAsync {
356
RFuture<Long> publishAsync(Object message);
357
RFuture<String> addListenerAsync(MessageListener<Object> listener);
358
<M> RFuture<String> addListenerAsync(Class<M> type, MessageListener<M> listener);
359
RFuture<Void> removeListenerAsync(String listenerId);
360
RFuture<Void> removeAllListenersAsync();
361
RFuture<Long> sizeAsync();
362
RFuture<List<Object>> readAllAsync();
363
}
364
```
365
366
**Async Usage Examples:**
367
368
```java
369
// Async topic operations
370
RTopicAsync asyncTopic = redisson.getTopic("async-events");
371
372
// Async subscribe
373
RFuture<Integer> listenerFuture = asyncTopic.addListenerAsync(String.class, (channel, message) -> {
374
System.out.println("Async message: " + message);
375
});
376
377
listenerFuture.whenComplete((listenerId, error) -> {
378
if (error == null) {
379
System.out.println("Listener added with ID: " + listenerId);
380
381
// Async publish after successful subscription
382
asyncTopic.publishAsync("Hello async world!")
383
.whenComplete((subscribers, publishError) -> {
384
if (publishError == null) {
385
System.out.println("Message sent to " + subscribers + " subscribers");
386
} else {
387
System.err.println("Publish failed: " + publishError.getMessage());
388
}
389
});
390
} else {
391
System.err.println("Failed to add listener: " + error.getMessage());
392
}
393
});
394
395
// Chain async operations
396
CompletionStage<Long> chainedOperation = asyncTopic.countSubscribersAsync()
397
.thenCompose(count -> {
398
System.out.println("Current subscribers: " + count);
399
return asyncTopic.publishAsync("Subscriber count: " + count);
400
})
401
.thenCompose(delivered -> {
402
System.out.println("Message delivered to: " + delivered);
403
return asyncTopic.countListenersAsync();
404
});
405
406
chainedOperation.whenComplete((listeners, error) -> {
407
if (error == null) {
408
System.out.println("Total listeners: " + listeners);
409
} else {
410
System.err.println("Operation chain failed: " + error.getMessage());
411
}
412
});
413
```
414
415
### Message Filtering and Transformation
416
417
Advanced messaging patterns with filtering and transformation capabilities.
418
419
```java { .api }
420
// Message filtering example
421
public class MessageFiltering {
422
423
public static void setupFilteredTopic(RedissonClient redisson) {
424
RTopic topic = redisson.getTopic("filtered-events");
425
426
// Filter messages by content prefix (each listener receives every String message and ignores the rest)
427
topic.addListener(String.class, (channel, message) -> {
428
if (message.startsWith("URGENT:")) {
429
handleUrgentMessage(message);
430
}
431
});
432
433
topic.addListener(String.class, (channel, message) -> {
434
if (message.startsWith("INFO:")) {
435
handleInfoMessage(message);
436
}
437
});
438
}
439
440
// Message transformation example
441
public static void setupTransformingTopic(RedissonClient redisson) {
442
RTopic eventTopic = redisson.getTopic("raw-events");
443
RTopic processedTopic = redisson.getTopic("processed-events");
444
445
// Transform and republish messages
446
eventTopic.addListener(Map.class, (channel, rawEvent) -> {
447
ProcessedEvent processed = transformEvent(rawEvent);
448
processedTopic.publish(processed);
449
});
450
}
451
}
452
453
// Custom message types
454
public class ProcessedEvent {
455
private String id;
456
private String type;
457
private long timestamp;
458
private Map<String, Object> data;
459
460
// constructors, getters, setters...
461
}
462
```
463
464
## Topic Configuration Options
465
466
```java { .api }
467
// Pattern topic options
468
public class PatternTopicOptions extends PlainOptions {
469
private String pattern;
470
471
public PatternTopicOptions pattern(String pattern);
472
public String getPattern();
473
}
474
475
// Topic listener configuration
476
public class TopicListener<M> {
477
private final Class<M> messageClass;
478
private final MessageListener<M> listener;
479
private final boolean removeOnError;
480
481
public TopicListener(Class<M> messageClass, MessageListener<M> listener);
482
public TopicListener(Class<M> messageClass, MessageListener<M> listener, boolean removeOnError);
483
484
public Class<M> getMessageClass();
485
public MessageListener<M> getListener();
486
public boolean isRemoveOnError();
487
}
488
489
// Reliable topic configuration
490
public class ReliableTopicOptions extends PlainOptions {
491
private long watchdogTimeout = 10 * 60000; // 10 minutes
492
private int batchSize = 100;
493
494
public ReliableTopicOptions watchdogTimeout(long watchdogTimeout, TimeUnit timeUnit);
495
public ReliableTopicOptions batchSize(int batchSize);
496
497
public long getWatchdogTimeout();
498
public int getBatchSize();
499
}
500
```
501
502
**Configuration Examples:**
503
504
```java
505
// Configure pattern topic
506
PatternTopicOptions patternOptions = new PatternTopicOptions()
507
.pattern("system.*.alerts")
508
.codec(new JsonJacksonCodec());
509
510
RPatternTopic patternTopic = redisson.getPatternTopic(patternOptions);
511
512
// Configure reliable topic with custom settings
513
ReliableTopicOptions reliableOptions = new ReliableTopicOptions()
514
.watchdogTimeout(30, TimeUnit.MINUTES)
515
.batchSize(50)
516
.codec(new KryoCodec());
517
518
RReliableTopic reliableTopic = redisson.getReliableTopic(reliableOptions);
519
520
// Error handling for listeners
521
RTopic errorHandlingTopic = redisson.getTopic("error-prone-events");
522
523
errorHandlingTopic.addListener(String.class, (channel, message) -> {
524
try {
525
processMessage(message);
526
} catch (Exception e) {
527
System.err.println("Error processing message: " + e.getMessage());
528
// Message processing failed but listener remains active
529
}
530
});
531
```