Jedis is a blazingly small and sane Redis Java client.
This document covers Redis publish/subscribe messaging functionality, including channel subscriptions, pattern matching, message handling, and both text and binary message support.
Abstract class for handling Redis pub/sub messages with string-based channels and messages.
public abstract class JedisPubSub {
/**
* Called when a message is received on a subscribed channel
* @param channel Channel name that received the message
* @param message Message content
*/
public abstract void onMessage(String channel, String message);
/**
* Called when a message is received matching a pattern subscription
* @param pattern Pattern that matched
* @param channel Actual channel name
* @param message Message content
*/
public abstract void onPMessage(String pattern, String channel, String message);
/**
* Called when successfully subscribed to a channel
* @param channel Channel name
* @param subscribedChannels Total number of subscribed channels
*/
public abstract void onSubscribe(String channel, int subscribedChannels);
/**
* Called when successfully subscribed to a pattern
* @param pattern Pattern subscribed to
* @param subscribedChannels Total number of subscribed patterns
*/
public abstract void onPSubscribe(String pattern, int subscribedChannels);
/**
* Called when unsubscribed from a channel
* @param channel Channel name
* @param subscribedChannels Remaining subscribed channels
*/
public abstract void onUnsubscribe(String channel, int subscribedChannels);
/**
* Called when unsubscribed from a pattern
* @param pattern Pattern unsubscribed from
* @param subscribedChannels Remaining subscribed patterns
*/
public abstract void onPUnsubscribe(String pattern, int subscribedChannels);
/**
* Subscribe to channels
* @param channels Channel names to subscribe to
*/
public void subscribe(String... channels);
/**
* Subscribe to patterns
* @param patterns Pattern strings to subscribe to
*/
public void psubscribe(String... patterns);
/**
* Unsubscribe from channels
* @param channels Channel names to unsubscribe from (empty = all)
*/
public void unsubscribe(String... channels);
/**
* Unsubscribe from patterns
* @param patterns Pattern strings to unsubscribe from (empty = all)
*/
public void punsubscribe(String... patterns);
/**
* Check if subscribed to any channels or patterns
* @return true if subscribed
*/
public boolean isSubscribed();
/**
* Get number of subscribed channels
* @return Number of channel subscriptions
*/
public int getSubscribedChannels();
/**
* Get number of subscribed patterns
* @return Number of pattern subscriptions
*/
public int getSubscribedPatterns();
}

/**
 * Example subscriber that logs every pub/sub callback and forwards messages
 * from the "notifications" and "events" channels to dedicated handlers.
 */
class MessageHandler extends JedisPubSub {

    /** Logs the message, then routes it according to the channel it arrived on. */
    @Override
    public void onMessage(String channel, String message) {
        String logLine = "Received on " + channel + ": " + message;
        System.out.println(logLine);

        // Handle different message types
        if (channel.equals("notifications")) {
            handleNotification(message);
            return;
        }
        if (channel.equals("events")) {
            handleEvent(message);
        }
    }

    /** Logs a message delivered through a pattern subscription. */
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        String logLine = "Pattern " + pattern + " matched " + channel + ": " + message;
        System.out.println(logLine);
    }

    /** Logs a confirmed channel subscription together with the reported total. */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        String total = " (total subscriptions: " + subscribedChannels + ")";
        System.out.println("Subscribed to " + channel + total);
    }

    /** Logs a confirmed pattern subscription. */
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("Subscribed to pattern " + pattern);
    }

    /** Logs that a channel subscription was removed. */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("Unsubscribed from " + channel);
    }

    /** Logs that a pattern subscription was removed. */
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("Unsubscribed from pattern " + pattern);
    }

    private void handleNotification(String message) {
        // Process notification
    }

    private void handleEvent(String message) {
        // Process event
    }
}
// Usage
Jedis subscriberJedis = new Jedis("localhost", 6379);
MessageHandler handler = new MessageHandler();
// Subscribe in separate thread (blocking operation)
new Thread(() -> {
try {
subscriberJedis.subscribe(handler, "notifications", "events", "alerts");
} catch (Exception e) {
e.printStackTrace();
} finally {
subscriberJedis.close();
}
}).start();
// Publish messages from another connection
Jedis publisherJedis = new Jedis("localhost", 6379);
publisherJedis.publish("notifications", "System maintenance scheduled");
publisherJedis.publish("events", "User login: user123");
publisherJedis.close();
// Unsubscribe when done
handler.unsubscribe("alerts"); // Unsubscribe from specific channel
// handler.unsubscribe(); // Unsubscribe from all channelsAbstract class for handling pub/sub messages with binary data support.
public abstract class BinaryJedisPubSub {
/**
* Called when a binary message is received on a subscribed channel
* @param channel Channel name as bytes
* @param message Binary message content
*/
public abstract void onMessage(byte[] channel, byte[] message);
/**
* Called when a binary message matches a pattern subscription
* @param pattern Pattern as bytes
* @param channel Channel name as bytes
* @param message Binary message content
*/
public abstract void onPMessage(byte[] pattern, byte[] channel, byte[] message);
/**
* Called when successfully subscribed to a binary channel
* @param channel Channel name as bytes
* @param subscribedChannels Total subscribed channels
*/
public abstract void onSubscribe(byte[] channel, int subscribedChannels);
/**
* Called when successfully subscribed to a binary pattern
* @param pattern Pattern as bytes
* @param subscribedChannels Total subscribed patterns
*/
public abstract void onPSubscribe(byte[] pattern, int subscribedChannels);
/**
* Called when unsubscribed from a binary channel
* @param channel Channel name as bytes
* @param subscribedChannels Remaining subscribed channels
*/
public abstract void onUnsubscribe(byte[] channel, int subscribedChannels);
/**
* Called when unsubscribed from a binary pattern
* @param pattern Pattern as bytes
* @param subscribedChannels Remaining subscribed patterns
*/
public abstract void onPUnsubscribe(byte[] pattern, int subscribedChannels);
/**
* Subscribe to binary channels
* @param channels Channel names as byte arrays
*/
public void subscribe(byte[]... channels);
/**
* Subscribe to binary patterns
* @param patterns Pattern byte arrays
*/
public void psubscribe(byte[]... patterns);
/**
* Unsubscribe from binary channels
* @param channels Channel names as byte arrays
*/
public void unsubscribe(byte[]... channels);
/**
* Unsubscribe from binary patterns
* @param patterns Pattern byte arrays
*/
public void punsubscribe(byte[]... patterns);
/**
* Check if subscribed to any channels or patterns
* @return true if subscribed
*/
public boolean isSubscribed();
}

/**
 * Example binary subscriber: decodes channel and pattern names as UTF-8 and
 * passes the raw payload of known channels to dedicated handlers.
 */
class BinaryMessageHandler extends BinaryJedisPubSub {

    /** Routes the payload by the decoded channel name. */
    @Override
    public void onMessage(byte[] channel, byte[] message) {
        String channelName = utf8(channel);
        if ("image_data".equals(channelName)) {
            handleImageData(message);
            return;
        }
        if ("file_uploads".equals(channelName)) {
            handleFileUpload(message);
        }
    }

    /** Logs which pattern matched which channel. */
    @Override
    public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
        // Handle pattern-matched binary messages
        String patternName = utf8(pattern);
        String channelName = utf8(channel);
        System.out.println("Binary pattern " + patternName + " matched " + channelName);
    }

    /** Logs a confirmed subscription using the decoded channel name. */
    @Override
    public void onSubscribe(byte[] channel, int subscribedChannels) {
        System.out.println("Subscribed to binary channel: " + utf8(channel));
    }

    // ... other callback implementations

    /** Decodes raw bytes as a UTF-8 string. */
    private static String utf8(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    private void handleImageData(byte[] imageBytes) {
        // Process binary image data
        int size = imageBytes.length;
        System.out.println("Received image data: " + size + " bytes");
    }

    private void handleFileUpload(byte[] fileData) {
        // Process binary file upload
        int size = fileData.length;
        System.out.println("Received file upload: " + size + " bytes");
    }
}
// Usage
Jedis subscriberJedis = new Jedis("localhost", 6379);
BinaryMessageHandler binaryHandler = new BinaryMessageHandler();
new Thread(() -> {
    try {
        // Encode channel names explicitly as UTF-8 so they match the UTF-8
        // decoding done in the handler, regardless of the platform charset.
        subscriberJedis.subscribe(binaryHandler,
            "image_data".getBytes(StandardCharsets.UTF_8),
            "file_uploads".getBytes(StandardCharsets.UTF_8));
    } finally {
        subscriberJedis.close();
    }
}).start();
// Publish binary data (the connection is closed even if loading or publishing fails)
try (Jedis publisherJedis = new Jedis("localhost", 6379)) {
    byte[] imageData = loadImageFile();
    publisherJedis.publish("image_data".getBytes(StandardCharsets.UTF_8), imageData);
}

Core publishing commands available on any Jedis client.
public interface JedisCommands {
/**
* Publish message to channel
* @param channel Channel name
* @param message Message content
* @return Number of clients that received the message
*/
Long publish(String channel, String message);
/**
* Publish binary message to channel
* @param channel Channel name as bytes
* @param message Binary message content
* @return Number of clients that received the message
*/
Long publish(byte[] channel, byte[] message);
/**
* Get number of subscribers for channels
* @param channels Channel names
* @return List of subscriber counts for each channel
*/
List<Long> pubsubNumSub(String... channels);
/**
* Get number of subscriptions to patterns
* @return Number of pattern subscriptions
*/
Long pubsubNumPat();
/**
* Get number of subscribers for binary channels
* @param channels Channel names as bytes
* @return List of subscriber counts
*/
List<Long> pubsubNumSub(byte[]... channels);
/**
* List active channels
* @return List of channels with at least one subscriber
*/
List<String> pubsubChannels();
/**
* List active channels matching pattern
* @param pattern Channel pattern
* @return List of matching channels with subscribers
*/
List<String> pubsubChannels(String pattern);
/**
* Get number of subscribers for sharded channels
* @param channels Shard channel names
* @return List of shard channel subscriber counts
*/
List<Long> pubsubShardNumSub(String... channels);
/**
* List active shard channels
* @return List of shard channels with subscribers
*/
List<String> pubsubShardChannels();
/**
* List active shard channels matching pattern
* @param pattern Shard channel pattern
* @return List of matching shard channels
*/
List<String> pubsubShardChannels(String pattern);
}

Methods available during pub/sub subscriptions.
// Available in JedisPubSub; BinaryJedisPubSub provides the same methods taking byte[]... arguments
public void subscribe(String... channels); // Subscribe to channels
public void psubscribe(String... patterns); // Subscribe to patterns
public void unsubscribe(String... channels); // Unsubscribe from channels
public void punsubscribe(String... patterns); // Unsubscribe from patterns
// Publisher
// try-with-resources closes the connection even if one of the commands throws
try (Jedis publisher = new Jedis("localhost", 6379)) {
    // Publish to different channels; each call returns the number of receiving clients
    Long subscribers1 = publisher.publish("news", "Breaking news update");
    Long subscribers2 = publisher.publish("chat:room1", "Hello everyone!");
    Long subscribers3 = publisher.publish("alerts", "System maintenance in 1 hour");
    System.out.println("News channel has " + subscribers1 + " subscribers");
    // Check channel activity
    List<String> activeChannels = publisher.pubsubChannels();
    System.out.println("Active channels: " + activeChannels);
    // Get subscriber counts
    List<Long> counts = publisher.pubsubNumSub("news", "chat:room1", "alerts");
    System.out.println("Subscriber counts: " + counts);
    // Get pattern subscription count
    Long patternSubs = publisher.pubsubNumPat();
    System.out.println("Pattern subscriptions: " + patternSubs);
}

Redis 7.0+ feature for sharded pub/sub that scales across cluster nodes.
public interface JedisCommands {
/**
* Publish message to sharded channel
* @param shardChannel Shard channel name
* @param message Message content
* @return Number of clients that received the message
*/
Long spublish(String shardChannel, String message);
/**
* Subscribe to sharded channels
* @param jedisPubSub Message handler
* @param shardChannels Shard channel names
*/
void ssubscribe(JedisPubSub jedisPubSub, String... shardChannels);
/**
* Subscribe to binary sharded channels
* @param jedisPubSub Binary message handler
* @param shardChannels Shard channel names as bytes
*/
void ssubscribe(BinaryJedisPubSub jedisPubSub, byte[]... shardChannels);
}

Subscribe to channels using glob-style patterns.
// Pattern examples:
// "news.*" - matches "news.sports", "news.politics", etc.
// "chat:*" - matches "chat:room1", "chat:room2", etc.
// "log:?:*" - matches "log:1:debug", "log:2:error", etc.
// "*" - matches all channels
/**
 * Example subscriber for pattern subscriptions: logs each pattern match and
 * routes it to a handler chosen by the pattern that matched.
 */
class PatternSubscriber extends JedisPubSub {

    /** Logs the pattern, channel and message, then routes by pattern. */
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("Pattern: " + pattern);
        System.out.println("Channel: " + channel);
        System.out.println("Message: " + message);
        routeByPattern(pattern, channel, message);
    }

    /** Route based on pattern; patterns without a dedicated handler are ignored. */
    private void routeByPattern(String pattern, String channel, String message) {
        switch (pattern) {
            case "news.*": {
                handleNewsMessage(channel, message);
                break;
            }
            case "chat:*": {
                handleChatMessage(channel, message);
                break;
            }
            case "error:*": {
                handleErrorMessage(channel, message);
                break;
            }
            default:
                // No handler registered for this pattern
                break;
        }
    }

    @Override
    public void onMessage(String channel, String message) {
        // Handle direct channel subscriptions
    }

    // ... other required methods
}
// Usage
PatternSubscriber subscriber = new PatternSubscriber();
// Subscribe to patterns (blocking); the connection is closed as soon as
// psubscribe returns or throws, so it is never leaked.
try (Jedis jedis = new Jedis("localhost", 6379)) {
    jedis.psubscribe(subscriber, "news.*", "chat:*", "error:*");
}

Efficient publishing to multiple channels or patterns.
// Publish to multiple related channels
// try-with-resources closes the connection even if a publish call throws
try (Jedis publisher = new Jedis("localhost", 6379)) {
    String eventData = "User logged in: user123";
    // Publish to multiple channels for different consumers
    publisher.publish("events", eventData); // General events
    publisher.publish("user:events", eventData); // User-specific events
    publisher.publish("audit:login", eventData); // Audit trail
    publisher.publish("analytics:user", eventData); // Analytics
    // Publish different messages to related channels
    String baseChannel = "game:room1:";
    publisher.publish(baseChannel + "chat", "Player joined the game");
    publisher.publish(baseChannel + "state", "Game started");
    publisher.publish(baseChannel + "score", "Score updated");
}

Proper connection handling for pub/sub operations.
/**
 * Manages pub/sub connections: one connection for publishing plus one
 * dedicated connection per subscription task.
 *
 * <p>Each subscription gets its own connection because a subscribe call blocks
 * on it for as long as the subscription lasts, so it cannot be reused by a
 * second blocking subscribe running on another thread.
 *
 * <p>NOTE(review): {@link #publish} uses a single shared connection and is not
 * synchronized; callers publishing from several threads should confirm the
 * thread-safety of the Jedis version in use.
 */
public class PubSubManager {
    private static final String HOST = "localhost";
    private static final int PORT = 6379;

    private final Jedis publisherJedis;
    private final ExecutorService executor;
    // Active handlers mapped to the action that cancels their subscriptions, so
    // shutdown() can end the blocking subscribe loops.
    private final java.util.Map<JedisPubSub, Runnable> cancellers =
        new java.util.concurrent.ConcurrentHashMap<>();
    private volatile boolean running = true;

    public PubSubManager() {
        this.publisherJedis = new Jedis(HOST, PORT);
        this.executor = Executors.newCachedThreadPool();
    }

    /**
     * Starts a channel subscription on a background thread.
     *
     * @param pubSub handler that receives the callbacks
     * @param channels channel names to subscribe to
     */
    public void startSubscribing(JedisPubSub pubSub, String... channels) {
        launch(pubSub, () -> pubSub.unsubscribe(), "Subscription error: ",
            jedis -> jedis.subscribe(pubSub, channels));
    }

    /**
     * Starts a pattern subscription on a background thread.
     *
     * @param pubSub handler that receives the callbacks
     * @param patterns patterns to subscribe to
     */
    public void startPatternSubscribing(JedisPubSub pubSub, String... patterns) {
        launch(pubSub, () -> pubSub.punsubscribe(), "Pattern subscription error: ",
            jedis -> jedis.psubscribe(pubSub, patterns));
    }

    /**
     * Publishes a message on the publisher connection.
     *
     * @param channel channel name
     * @param message message content
     * @return the value returned by {@code Jedis.publish}
     */
    public Long publish(String channel, String message) {
        return publisherJedis.publish(channel, message);
    }

    /**
     * Cancels all active subscriptions, stops the worker threads and closes
     * the publisher connection.
     */
    public void shutdown() {
        running = false;
        // Interrupting a worker does not end a blocking subscribe call, so ask
        // every active handler to unsubscribe; that lets its loop return.
        for (java.util.Map.Entry<JedisPubSub, Runnable> entry : cancellers.entrySet()) {
            try {
                if (entry.getKey().isSubscribed()) {
                    entry.getValue().run();
                }
            } catch (Exception e) {
                System.err.println("Unsubscribe error: " + e.getMessage());
            }
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        publisherJedis.close();
    }

    /**
     * Runs one blocking subscribe call on its own connection in the executor.
     * The connection is closed and the handler deregistered when the call ends.
     */
    private void launch(JedisPubSub pubSub, Runnable canceller, String errorPrefix,
            java.util.function.Consumer<Jedis> blockingCall) {
        executor.submit(() -> {
            if (!running) {
                return; // shutdown() already ran; do not open a new subscription
            }
            try (Jedis subscriberJedis = new Jedis(HOST, PORT)) {
                cancellers.put(pubSub, canceller);
                blockingCall.accept(subscriberJedis);
            } catch (Exception e) {
                System.err.println(errorPrefix + e.getMessage());
            } finally {
                if (pubSub != null) {
                    cancellers.remove(pubSub);
                }
            }
        });
    }
}

Common patterns for processing pub/sub messages.
// 1. Message routing based on channel
/**
 * Dispatches each incoming message to the processor registered for its
 * channel, and logs channels that have no processor.
 */
class ChannelRouter extends JedisPubSub {
    private final Map<String, MessageProcessor> processors = new HashMap<>();

    /** Registers (or replaces) the processor responsible for a channel. */
    public void registerProcessor(String channel, MessageProcessor processor) {
        processors.put(channel, processor);
    }

    @Override
    public void onMessage(String channel, String message) {
        MessageProcessor target = processors.get(channel);
        if (target == null) {
            System.out.println("No processor for channel: " + channel);
            return;
        }
        target.process(channel, message);
    }

    // ... other required methods
}
// 2. JSON message deserialization
/**
 * Deserializes JSON payloads into typed objects chosen by channel name and
 * passes them to the matching handler; failures are logged, not rethrown.
 */
class JsonMessageHandler extends JedisPubSub {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void onMessage(String channel, String message) {
        try {
            dispatch(channel, message);
        } catch (Exception e) {
            System.err.println("Failed to parse message: " + e.getMessage());
        }
    }

    /** Parses the payload into the type expected for the channel and handles it. */
    private void dispatch(String channel, String message) throws Exception {
        switch (channel) {
            case "user_events": {
                UserEvent userEvent = objectMapper.readValue(message, UserEvent.class);
                handleUserEvent(userEvent);
                break;
            }
            case "system_alerts": {
                SystemAlert systemAlert = objectMapper.readValue(message, SystemAlert.class);
                handleSystemAlert(systemAlert);
                break;
            }
            default:
                // Messages from other channels are ignored
                break;
        }
    }

    // ... handler methods
}
// 3. Batch message processing
/**
 * Buffers incoming messages and processes them in batches: a batch is flushed
 * when the buffer reaches {@code batchSize} entries or when the periodic
 * timer fires, whichever comes first.
 *
 * <p>Call {@link #shutdown()} when the processor is no longer needed;
 * otherwise the scheduler thread keeps running and buffered messages are
 * never flushed.
 */
class BatchMessageProcessor extends JedisPubSub {
    private final List<String> messageBuffer = new ArrayList<>();
    private final int batchSize = 100;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public BatchMessageProcessor() {
        // Process batches every 5 seconds
        scheduler.scheduleAtFixedRate(this::processBatch, 5, 5, TimeUnit.SECONDS);
    }

    /** Buffers the message and flushes immediately once the batch is full. */
    @Override
    public synchronized void onMessage(String channel, String message) {
        messageBuffer.add(channel + ":" + message);
        if (messageBuffer.size() >= batchSize) {
            processBatch();
        }
    }

    /**
     * Stops the periodic flush and hands any messages still buffered to a
     * final batch so they are not lost.
     */
    public void shutdown() {
        scheduler.shutdown();
        processBatch();
    }

    /** Snapshots and clears the buffer under the lock, then processes the snapshot asynchronously. */
    private synchronized void processBatch() {
        if (messageBuffer.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(messageBuffer);
        messageBuffer.clear();
        // Process batch in background so the subscriber thread is not blocked
        CompletableFuture.runAsync(() -> processBatchAsync(batch));
    }

    private void processBatchAsync(List<String> messages) {
        // Batch processing logic
        System.out.println("Processing batch of " + messages.size() + " messages");
    }
}

Best practices for robust pub/sub implementations.
class ResilientSubscriber extends JedisPubSub {
private final AtomicBoolean connected = new AtomicBoolean(false);
private final int maxRetries = 3;
private volatile boolean shouldReconnect = true;
@Override
public void onSubscribe(String channel, int subscribedChannels) {
connected.set(true);
System.out.println("Successfully subscribed to " + channel);
}
@Override
public void onMessage(String channel, String message) {
try {
processMessage(channel, message);
} catch (Exception e) {
System.err.println("Error processing message from " + channel + ": " + e.getMessage());
// Could implement dead letter queue or retry logic here
}
}
public void handleConnectionLoss() {
connected.set(false);
if (shouldReconnect) {
reconnectWithBackoff();
}
}
private void reconnectWithBackoff() {
int attempt = 0;
while (attempt < maxRetries && shouldReconnect) {
try {
Thread.sleep(Math.min(1000 * (1 << attempt), 30000)); // Exponential backoff
Jedis jedis = new Jedis("localhost", 6379);
jedis.subscribe(this, "channel1", "channel2");
break; // Success
} catch (Exception e) {
attempt++;
System.err.println("Reconnection attempt " + attempt + " failed: " + e.getMessage());
}
}
}
public void shutdown() {
shouldReconnect = false;
unsubscribe(); // Unsubscribe from all channels
}
private void processMessage(String channel, String message) {
// Message processing logic with error handling
}
}Redis pub/sub provides a powerful messaging system for real-time communication. Jedis offers comprehensive support for both simple and advanced pub/sub scenarios with proper error handling and connection management.
Install with Tessl CLI
npx tessl i tessl/maven-redis-clients--jedis