0
# Pub/Sub Messaging
1
2
This document covers Redis publish/subscribe messaging functionality, including channel subscriptions, pattern matching, message handling, and both text and binary message support.
3
4
## Core Pub/Sub Classes
5
6
### JedisPubSub
7
8
Abstract class for handling Redis pub/sub messages with string-based channels and messages.
9
10
```java { .api }
11
public abstract class JedisPubSub {
12
/**
13
* Called when a message is received on a subscribed channel
14
* @param channel Channel name that received the message
15
* @param message Message content
16
*/
17
public abstract void onMessage(String channel, String message);
18
19
/**
20
* Called when a message is received matching a pattern subscription
21
* @param pattern Pattern that matched
22
* @param channel Actual channel name
23
* @param message Message content
24
*/
25
public abstract void onPMessage(String pattern, String channel, String message);
26
27
/**
28
* Called when successfully subscribed to a channel
29
* @param channel Channel name
30
* @param subscribedChannels Total number of subscribed channels
31
*/
32
public abstract void onSubscribe(String channel, int subscribedChannels);
33
34
/**
35
* Called when successfully subscribed to a pattern
36
* @param pattern Pattern subscribed to
37
* @param subscribedChannels Total number of subscribed patterns
38
*/
39
public abstract void onPSubscribe(String pattern, int subscribedChannels);
40
41
/**
42
* Called when unsubscribed from a channel
43
* @param channel Channel name
44
* @param subscribedChannels Remaining subscribed channels
45
*/
46
public abstract void onUnsubscribe(String channel, int subscribedChannels);
47
48
/**
49
* Called when unsubscribed from a pattern
50
* @param pattern Pattern unsubscribed from
51
* @param subscribedChannels Remaining subscribed patterns
52
*/
53
public abstract void onPUnsubscribe(String pattern, int subscribedChannels);
54
55
/**
56
* Subscribe to channels
57
* @param channels Channel names to subscribe to
58
*/
59
public void subscribe(String... channels);
60
61
/**
62
* Subscribe to patterns
63
* @param patterns Pattern strings to subscribe to
64
*/
65
public void psubscribe(String... patterns);
66
67
/**
68
* Unsubscribe from channels
69
* @param channels Channel names to unsubscribe from (empty = all)
70
*/
71
public void unsubscribe(String... channels);
72
73
/**
74
* Unsubscribe from patterns
75
* @param patterns Pattern strings to unsubscribe from (empty = all)
76
*/
77
public void punsubscribe(String... patterns);
78
79
/**
80
* Check if subscribed to any channels or patterns
81
* @return true if subscribed
82
*/
83
public boolean isSubscribed();
84
85
/**
86
* Get number of subscribed channels
87
* @return Number of channel subscriptions
88
*/
89
public int getSubscribedChannels();
90
91
/**
92
* Get number of subscribed patterns
93
* @return Number of pattern subscriptions
94
*/
95
public int getSubscribedPatterns();
96
}
97
```
98
99
#### Usage Example
100
101
```java
102
class MessageHandler extends JedisPubSub {
103
@Override
104
public void onMessage(String channel, String message) {
105
System.out.println("Received on " + channel + ": " + message);
106
107
// Handle different message types
108
if (channel.equals("notifications")) {
109
handleNotification(message);
110
} else if (channel.equals("events")) {
111
handleEvent(message);
112
}
113
}
114
115
@Override
116
public void onPMessage(String pattern, String channel, String message) {
117
System.out.println("Pattern " + pattern + " matched " + channel + ": " + message);
118
}
119
120
@Override
121
public void onSubscribe(String channel, int subscribedChannels) {
122
System.out.println("Subscribed to " + channel +
123
" (total subscriptions: " + subscribedChannels + ")");
124
}
125
126
@Override
127
public void onPSubscribe(String pattern, int subscribedChannels) {
128
System.out.println("Subscribed to pattern " + pattern);
129
}
130
131
@Override
132
public void onUnsubscribe(String channel, int subscribedChannels) {
133
System.out.println("Unsubscribed from " + channel);
134
}
135
136
@Override
137
public void onPUnsubscribe(String pattern, int subscribedChannels) {
138
System.out.println("Unsubscribed from pattern " + pattern);
139
}
140
141
private void handleNotification(String message) {
142
// Process notification
143
}
144
145
private void handleEvent(String message) {
146
// Process event
147
}
148
}
149
150
// Usage
151
Jedis subscriberJedis = new Jedis("localhost", 6379);
MessageHandler handler = new MessageHandler();

// Subscribe in separate thread (blocking operation): subscribe() does not return
// until the handler has been unsubscribed, so it needs its own thread and connection.
new Thread(() -> {
    try {
        subscriberJedis.subscribe(handler, "notifications", "events", "alerts");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        subscriberJedis.close();
    }
}).start();

// Publish messages from another connection. Pub/sub delivery is fire-and-forget:
// a message published before the subscriber thread has finished subscribing is
// not delivered to it. try-with-resources closes the publisher even if publish() throws.
try (Jedis publisherJedis = new Jedis("localhost", 6379)) {
    publisherJedis.publish("notifications", "System maintenance scheduled");
    publisherJedis.publish("events", "User login: user123");
}

// Unsubscribe when done. The handler can only send unsubscribe commands after the
// subscriber thread has attached it to a connection, so check before calling.
if (handler.isSubscribed()) {
    handler.unsubscribe("alerts");  // Unsubscribe from specific channel
    // handler.unsubscribe();       // Unsubscribe from all channels
}
174
```
175
176
### BinaryJedisPubSub
177
178
Abstract class for handling pub/sub messages with binary data support.
179
180
```java { .api }
181
public abstract class BinaryJedisPubSub {
182
/**
183
* Called when a binary message is received on a subscribed channel
184
* @param channel Channel name as bytes
185
* @param message Binary message content
186
*/
187
public abstract void onMessage(byte[] channel, byte[] message);
188
189
/**
190
* Called when a binary message matches a pattern subscription
191
* @param pattern Pattern as bytes
192
* @param channel Channel name as bytes
193
* @param message Binary message content
194
*/
195
public abstract void onPMessage(byte[] pattern, byte[] channel, byte[] message);
196
197
/**
198
* Called when successfully subscribed to a binary channel
199
* @param channel Channel name as bytes
200
* @param subscribedChannels Total subscribed channels
201
*/
202
public abstract void onSubscribe(byte[] channel, int subscribedChannels);
203
204
/**
205
* Called when successfully subscribed to a binary pattern
206
* @param pattern Pattern as bytes
207
* @param subscribedChannels Total subscribed patterns
208
*/
209
public abstract void onPSubscribe(byte[] pattern, int subscribedChannels);
210
211
/**
212
* Called when unsubscribed from a binary channel
213
* @param channel Channel name as bytes
214
* @param subscribedChannels Remaining subscribed channels
215
*/
216
public abstract void onUnsubscribe(byte[] channel, int subscribedChannels);
217
218
/**
219
* Called when unsubscribed from a binary pattern
220
* @param pattern Pattern as bytes
221
* @param subscribedChannels Remaining subscribed patterns
222
*/
223
public abstract void onPUnsubscribe(byte[] pattern, int subscribedChannels);
224
225
/**
226
* Subscribe to binary channels
227
* @param channels Channel names as byte arrays
228
*/
229
public void subscribe(byte[]... channels);
230
231
/**
232
* Subscribe to binary patterns
233
* @param patterns Pattern byte arrays
234
*/
235
public void psubscribe(byte[]... patterns);
236
237
/**
238
* Unsubscribe from binary channels
239
* @param channels Channel names as byte arrays
240
*/
241
public void unsubscribe(byte[]... channels);
242
243
/**
244
* Unsubscribe from binary patterns
245
* @param patterns Pattern byte arrays
246
*/
247
public void punsubscribe(byte[]... patterns);
248
249
/**
250
* Check if subscribed to any channels or patterns
251
* @return true if subscribed
252
*/
253
public boolean isSubscribed();
254
}
255
```
256
257
#### Usage Example
258
259
```java
260
class BinaryMessageHandler extends BinaryJedisPubSub {
261
@Override
262
public void onMessage(byte[] channel, byte[] message) {
263
String channelStr = new String(channel, StandardCharsets.UTF_8);
264
265
if (channelStr.equals("image_data")) {
266
handleImageData(message);
267
} else if (channelStr.equals("file_uploads")) {
268
handleFileUpload(message);
269
}
270
}
271
272
@Override
273
public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
274
// Handle pattern-matched binary messages
275
String patternStr = new String(pattern, StandardCharsets.UTF_8);
276
String channelStr = new String(channel, StandardCharsets.UTF_8);
277
System.out.println("Binary pattern " + patternStr + " matched " + channelStr);
278
}
279
280
@Override
281
public void onSubscribe(byte[] channel, int subscribedChannels) {
282
String channelStr = new String(channel, StandardCharsets.UTF_8);
283
System.out.println("Subscribed to binary channel: " + channelStr);
284
}
285
286
// ... other callback implementations
287
288
private void handleImageData(byte[] imageBytes) {
289
// Process binary image data
290
System.out.println("Received image data: " + imageBytes.length + " bytes");
291
}
292
293
private void handleFileUpload(byte[] fileData) {
294
// Process binary file upload
295
System.out.println("Received file upload: " + fileData.length + " bytes");
296
}
297
}
298
299
// Usage
300
Jedis subscriberJedis = new Jedis("localhost", 6379);
BinaryMessageHandler binaryHandler = new BinaryMessageHandler();

new Thread(() -> {
    try {
        // Encode channel names explicitly as UTF-8: the handler decodes them as UTF-8,
        // and a bare getBytes() would use the platform default charset instead.
        subscriberJedis.subscribe(binaryHandler,
            "image_data".getBytes(StandardCharsets.UTF_8),
            "file_uploads".getBytes(StandardCharsets.UTF_8));
    } finally {
        subscriberJedis.close();
    }
}).start();

// Publish binary data; try-with-resources closes the publisher even if
// loading the file or publishing fails.
try (Jedis publisherJedis = new Jedis("localhost", 6379)) {
    byte[] imageData = loadImageFile();
    publisherJedis.publish("image_data".getBytes(StandardCharsets.UTF_8), imageData);
}
318
```
319
320
## Pub/Sub Commands
321
322
### Publishing Messages
323
324
Core publishing commands available on any Jedis client.
325
326
```java { .api }
327
public interface JedisCommands {
328
/**
329
* Publish message to channel
330
* @param channel Channel name
331
* @param message Message content
332
* @return Number of clients that received the message
333
*/
334
Long publish(String channel, String message);
335
336
/**
337
* Publish binary message to channel
338
* @param channel Channel name as bytes
339
* @param message Binary message content
340
* @return Number of clients that received the message
341
*/
342
Long publish(byte[] channel, byte[] message);
343
344
/**
345
* Get number of subscribers for channels
346
* @param channels Channel names
347
* @return List of subscriber counts for each channel
348
*/
349
List<Long> pubsubNumSub(String... channels);
350
351
/**
352
* Get number of subscriptions to patterns
353
* @return Number of pattern subscriptions
354
*/
355
Long pubsubNumPat();
356
357
/**
358
* Get number of subscribers for binary channels
359
* @param channels Channel names as bytes
360
* @return List of subscriber counts
361
*/
362
List<Long> pubsubNumSub(byte[]... channels);
363
364
/**
365
* List active channels
366
* @return List of channels with at least one subscriber
367
*/
368
List<String> pubsubChannels();
369
370
/**
371
* List active channels matching pattern
372
* @param pattern Channel pattern
373
* @return List of matching channels with subscribers
374
*/
375
List<String> pubsubChannels(String pattern);
376
377
/**
378
* Get detailed pubsub information for sharded channels
379
* @param channels Shard channel names
380
* @return List of shard channel subscriber counts
381
*/
382
List<Long> pubsubShardNumSub(String... channels);
383
384
/**
385
* List active shard channels
386
* @return List of shard channels with subscribers
387
*/
388
List<String> pubsubShardChannels();
389
390
/**
391
* List active shard channels matching pattern
392
* @param pattern Shard channel pattern
393
* @return List of matching shard channels
394
*/
395
List<String> pubsubShardChannels(String pattern);
396
}
397
```
398
399
### Subscription Management
400
401
Methods available during pub/sub subscriptions.
402
403
```java { .api }
404
// Available in JedisPubSub; BinaryJedisPubSub declares the same four methods with byte[]... parameters
405
public void subscribe(String... channels); // Subscribe to channels
406
public void psubscribe(String... patterns); // Subscribe to patterns
407
public void unsubscribe(String... channels); // Unsubscribe from channels
408
public void punsubscribe(String... patterns); // Unsubscribe from patterns
409
```
410
411
#### Usage Example
412
413
```java
414
// Publisher
415
// try-with-resources closes the connection even if one of the calls below throws
try (Jedis publisher = new Jedis("localhost", 6379)) {

    // Publish to different channels; each call returns the number of clients
    // that received the message
    Long subscribers1 = publisher.publish("news", "Breaking news update");
    Long subscribers2 = publisher.publish("chat:room1", "Hello everyone!");
    Long subscribers3 = publisher.publish("alerts", "System maintenance in 1 hour");

    System.out.println("News channel has " + subscribers1 + " subscribers");

    // Check channel activity
    List<String> activeChannels = publisher.pubsubChannels();
    System.out.println("Active channels: " + activeChannels);

    // Get subscriber counts
    List<Long> counts = publisher.pubsubNumSub("news", "chat:room1", "alerts");
    System.out.println("Subscriber counts: " + counts);

    // Get pattern subscription count
    Long patternSubs = publisher.pubsubNumPat();
    System.out.println("Pattern subscriptions: " + patternSubs);
}
437
```
438
439
## Advanced Pub/Sub Features
440
441
### Sharded Pub/Sub
442
443
Redis 7.0+ feature for sharded pub/sub that scales across cluster nodes.
444
445
```java { .api }
446
public interface JedisCommands {
447
/**
448
* Publish message to sharded channel
449
* @param shardChannel Shard channel name
450
* @param message Message content
451
* @return Number of clients that received the message
452
*/
453
Long spublish(String shardChannel, String message);
454
455
/**
456
* Subscribe to sharded channels
457
* @param jedisPubSub Message handler
458
* @param shardChannels Shard channel names
459
*/
460
void ssubscribe(JedisPubSub jedisPubSub, String... shardChannels);
461
462
/**
463
* Subscribe to binary sharded channels
464
* @param jedisPubSub Binary message handler
465
* @param shardChannels Shard channel names as bytes
466
*/
467
void ssubscribe(BinaryJedisPubSub jedisPubSub, byte[]... shardChannels);
468
}
469
```
470
471
### Pattern Subscriptions
472
473
Subscribe to channels using glob-style patterns.
474
475
```java { .api }
476
// Pattern examples:
477
// "news.*" - matches "news.sports", "news.politics", etc.
478
// "chat:*" - matches "chat:room1", "chat:room2", etc.
479
// "log:?:*" - matches "log:1:debug", "log:2:error", etc.
480
// "*" - matches all channels
481
482
class PatternSubscriber extends JedisPubSub {
483
@Override
484
public void onPMessage(String pattern, String channel, String message) {
485
System.out.println("Pattern: " + pattern);
486
System.out.println("Channel: " + channel);
487
System.out.println("Message: " + message);
488
489
// Route based on pattern
490
switch (pattern) {
491
case "news.*":
492
handleNewsMessage(channel, message);
493
break;
494
case "chat:*":
495
handleChatMessage(channel, message);
496
break;
497
case "error:*":
498
handleErrorMessage(channel, message);
499
break;
500
}
501
}
502
503
@Override
504
public void onMessage(String channel, String message) {
505
// Handle direct channel subscriptions
506
}
507
508
// ... other required methods
509
}
510
511
// Usage
512
PatternSubscriber subscriber = new PatternSubscriber();

// Subscribe to patterns (blocking): psubscribe() returns only after the subscriber
// has been unsubscribed. try-with-resources then releases the connection, including
// when the call fails.
try (Jedis jedis = new Jedis("localhost", 6379)) {
    jedis.psubscribe(subscriber, "news.*", "chat:*", "error:*");
}
517
```
518
519
### Multi-Channel Publishing
520
521
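Publishing one event to several related channels. Each `publish` call targets exactly one channel name; patterns apply only on the subscribing side, so a message cannot be published "to a pattern".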
522
523
```java { .api }
524
// Publish to multiple related channels
525
Jedis publisher = new Jedis("localhost", 6379);
526
527
String eventData = "User logged in: user123";
528
529
// Publish to multiple channels for different consumers
530
publisher.publish("events", eventData); // General events
531
publisher.publish("user:events", eventData); // User-specific events
532
publisher.publish("audit:login", eventData); // Audit trail
533
publisher.publish("analytics:user", eventData); // Analytics
534
535
// Publish different messages to related channels
536
String baseChannel = "game:room1:";
537
publisher.publish(baseChannel + "chat", "Player joined the game");
538
publisher.publish(baseChannel + "state", "Game started");
539
publisher.publish(baseChannel + "score", "Score updated");
540
```
541
542
### Connection Management for Pub/Sub
543
544
Proper connection handling for pub/sub operations.
545
546
```java { .api }
547
public class PubSubManager {
548
private final Jedis subscriberJedis;
549
private final Jedis publisherJedis;
550
private final ExecutorService executor;
551
private volatile boolean running = true;
552
553
public PubSubManager() {
554
this.subscriberJedis = new Jedis("localhost", 6379);
555
this.publisherJedis = new Jedis("localhost", 6379);
556
this.executor = Executors.newCachedThreadPool();
557
}
558
559
public void startSubscribing(JedisPubSub pubSub, String... channels) {
560
executor.submit(() -> {
561
try {
562
subscriberJedis.subscribe(pubSub, channels);
563
} catch (Exception e) {
564
System.err.println("Subscription error: " + e.getMessage());
565
}
566
});
567
}
568
569
public void startPatternSubscribing(JedisPubSub pubSub, String... patterns) {
570
executor.submit(() -> {
571
try {
572
subscriberJedis.psubscribe(pubSub, patterns);
573
} catch (Exception e) {
574
System.err.println("Pattern subscription error: " + e.getMessage());
575
}
576
});
577
}
578
579
public Long publish(String channel, String message) {
580
return publisherJedis.publish(channel, message);
581
}
582
583
public void shutdown() {
584
running = false;
585
executor.shutdown();
586
587
try {
588
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
589
executor.shutdownNow();
590
}
591
} catch (InterruptedException e) {
592
executor.shutdownNow();
593
Thread.currentThread().interrupt();
594
}
595
596
subscriberJedis.close();
597
publisherJedis.close();
598
}
599
}
600
```
601
602
### Message Processing Patterns
603
604
Common patterns for processing pub/sub messages.
605
606
```java { .api }
607
// 1. Message routing based on channel
608
class ChannelRouter extends JedisPubSub {
609
private final Map<String, MessageProcessor> processors = new HashMap<>();
610
611
public void registerProcessor(String channel, MessageProcessor processor) {
612
processors.put(channel, processor);
613
}
614
615
@Override
616
public void onMessage(String channel, String message) {
617
MessageProcessor processor = processors.get(channel);
618
if (processor != null) {
619
processor.process(channel, message);
620
} else {
621
System.out.println("No processor for channel: " + channel);
622
}
623
}
624
625
// ... other required methods
626
}
627
628
// 2. JSON message deserialization
629
class JsonMessageHandler extends JedisPubSub {
630
private final ObjectMapper objectMapper = new ObjectMapper();
631
632
@Override
633
public void onMessage(String channel, String message) {
634
try {
635
switch (channel) {
636
case "user_events":
637
UserEvent event = objectMapper.readValue(message, UserEvent.class);
638
handleUserEvent(event);
639
break;
640
case "system_alerts":
641
SystemAlert alert = objectMapper.readValue(message, SystemAlert.class);
642
handleSystemAlert(alert);
643
break;
644
}
645
} catch (Exception e) {
646
System.err.println("Failed to parse message: " + e.getMessage());
647
}
648
}
649
650
// ... handler methods
651
}
652
653
// 3. Batch message processing
654
class BatchMessageProcessor extends JedisPubSub {
655
private final List<String> messageBuffer = new ArrayList<>();
656
private final int batchSize = 100;
657
private final ScheduledExecutorService scheduler =
658
Executors.newSingleThreadScheduledExecutor();
659
660
public BatchMessageProcessor() {
661
// Process batches every 5 seconds
662
scheduler.scheduleAtFixedRate(this::processBatch, 5, 5, TimeUnit.SECONDS);
663
}
664
665
@Override
666
public synchronized void onMessage(String channel, String message) {
667
messageBuffer.add(channel + ":" + message);
668
669
if (messageBuffer.size() >= batchSize) {
670
processBatch();
671
}
672
}
673
674
private synchronized void processBatch() {
675
if (!messageBuffer.isEmpty()) {
676
List<String> batch = new ArrayList<>(messageBuffer);
677
messageBuffer.clear();
678
679
// Process batch in background
680
CompletableFuture.runAsync(() -> {
681
processBatchAsync(batch);
682
});
683
}
684
}
685
686
private void processBatchAsync(List<String> messages) {
687
// Batch processing logic
688
System.out.println("Processing batch of " + messages.size() + " messages");
689
}
690
}
691
```
692
693
### Error Handling and Resilience
694
695
Best practices for robust pub/sub implementations.
696
697
```java { .api }
698
class ResilientSubscriber extends JedisPubSub {
699
private final AtomicBoolean connected = new AtomicBoolean(false);
700
private final int maxRetries = 3;
701
private volatile boolean shouldReconnect = true;
702
703
@Override
704
public void onSubscribe(String channel, int subscribedChannels) {
705
connected.set(true);
706
System.out.println("Successfully subscribed to " + channel);
707
}
708
709
@Override
710
public void onMessage(String channel, String message) {
711
try {
712
processMessage(channel, message);
713
} catch (Exception e) {
714
System.err.println("Error processing message from " + channel + ": " + e.getMessage());
715
// Could implement dead letter queue or retry logic here
716
}
717
}
718
719
public void handleConnectionLoss() {
720
connected.set(false);
721
722
if (shouldReconnect) {
723
reconnectWithBackoff();
724
}
725
}
726
727
private void reconnectWithBackoff() {
728
int attempt = 0;
729
while (attempt < maxRetries && shouldReconnect) {
730
try {
731
Thread.sleep(Math.min(1000 * (1 << attempt), 30000)); // Exponential backoff
732
733
Jedis jedis = new Jedis("localhost", 6379);
734
jedis.subscribe(this, "channel1", "channel2");
735
736
break; // Success
737
} catch (Exception e) {
738
attempt++;
739
System.err.println("Reconnection attempt " + attempt + " failed: " + e.getMessage());
740
}
741
}
742
}
743
744
public void shutdown() {
745
shouldReconnect = false;
746
unsubscribe(); // Unsubscribe from all channels
747
}
748
749
private void processMessage(String channel, String message) {
750
// Message processing logic with error handling
751
}
752
}
753
```
754
755
Redis pub/sub provides a powerful messaging system for real-time communication. Jedis offers comprehensive support for both simple and advanced pub/sub scenarios with proper error handling and connection management.