CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-redis-clients--jedis

Jedis is a blazingly small and sane Redis java client.

Overview
Eval results
Files

pubsub.mddocs/

Pub/Sub Messaging

This document covers Redis publish/subscribe messaging functionality, including channel subscriptions, pattern matching, message handling, and both text and binary message support.

Core Pub/Sub Classes

JedisPubSub

Abstract class for handling Redis pub/sub messages with string-based channels and messages.

public abstract class JedisPubSub {
    /**
     * Called when a message is received on a subscribed channel
     * @param channel Channel name that received the message
     * @param message Message content
     */
    public abstract void onMessage(String channel, String message);
    
    /**
     * Called when a message is received matching a pattern subscription
     * @param pattern Pattern that matched
     * @param channel Actual channel name
     * @param message Message content
     */
    public abstract void onPMessage(String pattern, String channel, String message);
    
    /**
     * Called when successfully subscribed to a channel
     * @param channel Channel name
     * @param subscribedChannels Total number of subscribed channels
     */
    public abstract void onSubscribe(String channel, int subscribedChannels);
    
    /**
     * Called when successfully subscribed to a pattern
     * @param pattern Pattern subscribed to
     * @param subscribedChannels Total number of subscribed patterns
     */
    public abstract void onPSubscribe(String pattern, int subscribedChannels);
    
    /**
     * Called when unsubscribed from a channel
     * @param channel Channel name
     * @param subscribedChannels Remaining subscribed channels
     */
    public abstract void onUnsubscribe(String channel, int subscribedChannels);
    
    /**
     * Called when unsubscribed from a pattern
     * @param pattern Pattern unsubscribed from
     * @param subscribedChannels Remaining subscribed patterns
     */
    public abstract void onPUnsubscribe(String pattern, int subscribedChannels);
    
    /**
     * Subscribe to channels
     * @param channels Channel names to subscribe to
     */
    public void subscribe(String... channels);
    
    /**
     * Subscribe to patterns
     * @param patterns Pattern strings to subscribe to
     */
    public void psubscribe(String... patterns);
    
    /**
     * Unsubscribe from channels
     * @param channels Channel names to unsubscribe from (empty = all)
     */
    public void unsubscribe(String... channels);
    
    /**
     * Unsubscribe from patterns
     * @param patterns Pattern strings to unsubscribe from (empty = all)
     */
    public void punsubscribe(String... patterns);
    
    /**
     * Check if subscribed to any channels or patterns
     * @return true if subscribed
     */
    public boolean isSubscribed();
    
    /**
     * Get number of subscribed channels
     * @return Number of channel subscriptions
     */
    public int getSubscribedChannels();
    
    /**
     * Get number of subscribed patterns
     * @return Number of pattern subscriptions
     */
    public int getSubscribedPatterns();
}

Usage Example

class MessageHandler extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("Received on " + channel + ": " + message);
        
        // Handle different message types
        if (channel.equals("notifications")) {
            handleNotification(message);
        } else if (channel.equals("events")) {
            handleEvent(message);
        }
    }
    
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("Pattern " + pattern + " matched " + channel + ": " + message);
    }
    
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("Subscribed to " + channel + 
                          " (total subscriptions: " + subscribedChannels + ")");
    }
    
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        System.out.println("Subscribed to pattern " + pattern);
    }
    
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("Unsubscribed from " + channel);
    }
    
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("Unsubscribed from pattern " + pattern);
    }
    
    private void handleNotification(String message) {
        // Process notification
    }
    
    private void handleEvent(String message) {
        // Process event
    }
}

// Usage
Jedis subscriberJedis = new Jedis("localhost", 6379);
MessageHandler handler = new MessageHandler();

// Subscribe in separate thread (blocking operation)
new Thread(() -> {
    try {
        subscriberJedis.subscribe(handler, "notifications", "events", "alerts");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        subscriberJedis.close();
    }
}).start();

// Publish messages from another connection
Jedis publisherJedis = new Jedis("localhost", 6379);
publisherJedis.publish("notifications", "System maintenance scheduled");
publisherJedis.publish("events", "User login: user123");
publisherJedis.close();

// Unsubscribe when done
handler.unsubscribe("alerts");  // Unsubscribe from specific channel
// handler.unsubscribe();       // Unsubscribe from all channels

BinaryJedisPubSub

Abstract class for handling pub/sub messages with binary data support.

public abstract class BinaryJedisPubSub {
    /**
     * Called when a binary message is received on a subscribed channel
     * @param channel Channel name as bytes
     * @param message Binary message content
     */
    public abstract void onMessage(byte[] channel, byte[] message);
    
    /**
     * Called when a binary message matches a pattern subscription
     * @param pattern Pattern as bytes
     * @param channel Channel name as bytes
     * @param message Binary message content
     */
    public abstract void onPMessage(byte[] pattern, byte[] channel, byte[] message);
    
    /**
     * Called when successfully subscribed to a binary channel
     * @param channel Channel name as bytes
     * @param subscribedChannels Total subscribed channels
     */
    public abstract void onSubscribe(byte[] channel, int subscribedChannels);
    
    /**
     * Called when successfully subscribed to a binary pattern
     * @param pattern Pattern as bytes
     * @param subscribedChannels Total subscribed patterns
     */
    public abstract void onPSubscribe(byte[] pattern, int subscribedChannels);
    
    /**
     * Called when unsubscribed from a binary channel
     * @param channel Channel name as bytes
     * @param subscribedChannels Remaining subscribed channels
     */
    public abstract void onUnsubscribe(byte[] channel, int subscribedChannels);
    
    /**
     * Called when unsubscribed from a binary pattern
     * @param pattern Pattern as bytes
     * @param subscribedChannels Remaining subscribed patterns
     */
    public abstract void onPUnsubscribe(byte[] pattern, int subscribedChannels);
    
    /**
     * Subscribe to binary channels
     * @param channels Channel names as byte arrays
     */
    public void subscribe(byte[]... channels);
    
    /**
     * Subscribe to binary patterns
     * @param patterns Pattern byte arrays
     */
    public void psubscribe(byte[]... patterns);
    
    /**
     * Unsubscribe from binary channels
     * @param channels Channel names as byte arrays
     */
    public void unsubscribe(byte[]... channels);
    
    /**
     * Unsubscribe from binary patterns
     * @param patterns Pattern byte arrays
     */
    public void punsubscribe(byte[]... patterns);
    
    /**
     * Check if subscribed to any channels or patterns
     * @return true if subscribed
     */
    public boolean isSubscribed();
}

Usage Example

class BinaryMessageHandler extends BinaryJedisPubSub {
    @Override
    public void onMessage(byte[] channel, byte[] message) {
        String channelStr = new String(channel, StandardCharsets.UTF_8);
        
        if (channelStr.equals("image_data")) {
            handleImageData(message);
        } else if (channelStr.equals("file_uploads")) {
            handleFileUpload(message);
        }
    }
    
    @Override
    public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
        // Handle pattern-matched binary messages
        String patternStr = new String(pattern, StandardCharsets.UTF_8);
        String channelStr = new String(channel, StandardCharsets.UTF_8);
        System.out.println("Binary pattern " + patternStr + " matched " + channelStr);
    }
    
    @Override
    public void onSubscribe(byte[] channel, int subscribedChannels) {
        String channelStr = new String(channel, StandardCharsets.UTF_8);
        System.out.println("Subscribed to binary channel: " + channelStr);
    }
    
    // ... other callback implementations
    
    private void handleImageData(byte[] imageBytes) {
        // Process binary image data
        System.out.println("Received image data: " + imageBytes.length + " bytes");
    }
    
    private void handleFileUpload(byte[] fileData) {
        // Process binary file upload
        System.out.println("Received file upload: " + fileData.length + " bytes");
    }
}

// Usage
Jedis subscriberJedis = new Jedis("localhost", 6379);
BinaryMessageHandler binaryHandler = new BinaryMessageHandler();

new Thread(() -> {
    try {
        subscriberJedis.subscribe(binaryHandler, 
                               "image_data".getBytes(),
                               "file_uploads".getBytes());
    } finally {
        subscriberJedis.close();
    }
}).start();

// Publish binary data
Jedis publisherJedis = new Jedis("localhost", 6379);
byte[] imageData = loadImageFile();
publisherJedis.publish("image_data".getBytes(), imageData);
publisherJedis.close();

Pub/Sub Commands

Publishing Messages

Core publishing commands available on any Jedis client.

public interface JedisCommands {
    /**
     * Publish message to channel
     * @param channel Channel name
     * @param message Message content
     * @return Number of clients that received the message
     */
    Long publish(String channel, String message);
    
    /**
     * Publish binary message to channel
     * @param channel Channel name as bytes
     * @param message Binary message content
     * @return Number of clients that received the message
     */
    Long publish(byte[] channel, byte[] message);
    
    /**
     * Get number of subscribers for channels
     * @param channels Channel names
     * @return List of subscriber counts for each channel
     */
    List<Long> pubsubNumSub(String... channels);
    
    /**
     * Get number of subscriptions to patterns
     * @return Number of pattern subscriptions
     */
    Long pubsubNumPat();
    
    /**
     * Get number of subscribers for binary channels
     * @param channels Channel names as bytes
     * @return List of subscriber counts
     */
    List<Long> pubsubNumSub(byte[]... channels);
    
    /**
     * List active channels
     * @return List of channels with at least one subscriber
     */
    List<String> pubsubChannels();
    
    /**
     * List active channels matching pattern
     * @param pattern Channel pattern
     * @return List of matching channels with subscribers
     */
    List<String> pubsubChannels(String pattern);
    
    /**
     * Get detailed pubsub information for sharded channels
     * @param channels Shard channel names
     * @return List of shard channel subscriber counts
     */
    List<Long> pubsubShardNumSub(String... channels);
    
    /**
     * List active shard channels
     * @return List of shard channels with subscribers
     */
    List<String> pubsubShardChannels();
    
    /**
     * List active shard channels matching pattern
     * @param pattern Shard channel pattern
     * @return List of matching shard channels
     */
    List<String> pubsubShardChannels(String pattern);
}

Subscription Management

Methods available during pub/sub subscriptions.

// Available in JedisPubSub and BinaryJedisPubSub
public void subscribe(String... channels);     // Subscribe to channels
public void psubscribe(String... patterns);   // Subscribe to patterns
public void unsubscribe(String... channels);  // Unsubscribe from channels
public void punsubscribe(String... patterns); // Unsubscribe from patterns

Usage Example

// Publisher
Jedis publisher = new Jedis("localhost", 6379);

// Publish to different channels
Long subscribers1 = publisher.publish("news", "Breaking news update");
Long subscribers2 = publisher.publish("chat:room1", "Hello everyone!");
Long subscribers3 = publisher.publish("alerts", "System maintenance in 1 hour");

System.out.println("News channel has " + subscribers1 + " subscribers");

// Check channel activity
List<String> activeChannels = publisher.pubsubChannels();
System.out.println("Active channels: " + activeChannels);

// Get subscriber counts
List<Long> counts = publisher.pubsubNumSub("news", "chat:room1", "alerts");
System.out.println("Subscriber counts: " + counts);

// Get pattern subscription count
Long patternSubs = publisher.pubsubNumPat();
System.out.println("Pattern subscriptions: " + patternSubs);

publisher.close();

Advanced Pub/Sub Features

Sharded Pub/Sub

Redis 7.0+ feature for sharded pub/sub that scales across cluster nodes.

public interface JedisCommands {
    /**
     * Publish message to sharded channel
     * @param shardChannel Shard channel name
     * @param message Message content
     * @return Number of clients that received the message
     */
    Long spublish(String shardChannel, String message);
    
    /**
     * Subscribe to sharded channels
     * @param jedisPubSub Message handler
     * @param shardChannels Shard channel names
     */
    void ssubscribe(JedisPubSub jedisPubSub, String... shardChannels);
    
    /**
     * Subscribe to binary sharded channels
     * @param jedisPubSub Binary message handler
     * @param shardChannels Shard channel names as bytes
     */
    void ssubscribe(BinaryJedisPubSub jedisPubSub, byte[]... shardChannels);
}

Pattern Subscriptions

Subscribe to channels using glob-style patterns.

// Pattern examples:
// "news.*"     - matches "news.sports", "news.politics", etc.
// "chat:*"     - matches "chat:room1", "chat:room2", etc.
// "log:?:*"    - matches "log:1:debug", "log:2:error", etc.
// "*"          - matches all channels

class PatternSubscriber extends JedisPubSub {
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("Pattern: " + pattern);
        System.out.println("Channel: " + channel);
        System.out.println("Message: " + message);
        
        // Route based on pattern
        switch (pattern) {
            case "news.*":
                handleNewsMessage(channel, message);
                break;
            case "chat:*":
                handleChatMessage(channel, message);
                break;
            case "error:*":
                handleErrorMessage(channel, message);
                break;
        }
    }
    
    @Override
    public void onMessage(String channel, String message) {
        // Handle direct channel subscriptions
    }
    
    // ... other required methods
}

// Usage
PatternSubscriber subscriber = new PatternSubscriber();
Jedis jedis = new Jedis("localhost", 6379);

// Subscribe to patterns (blocking)
jedis.psubscribe(subscriber, "news.*", "chat:*", "error:*");

Multi-Channel Publishing

Efficient publishing to multiple channels or patterns.

// Publish to multiple related channels
Jedis publisher = new Jedis("localhost", 6379);

String eventData = "User logged in: user123";

// Publish to multiple channels for different consumers
publisher.publish("events", eventData);          // General events
publisher.publish("user:events", eventData);     // User-specific events  
publisher.publish("audit:login", eventData);     // Audit trail
publisher.publish("analytics:user", eventData);  // Analytics

// Publish different messages to related channels
String baseChannel = "game:room1:";
publisher.publish(baseChannel + "chat", "Player joined the game");
publisher.publish(baseChannel + "state", "Game started");
publisher.publish(baseChannel + "score", "Score updated");

Connection Management for Pub/Sub

Proper connection handling for pub/sub operations.

public class PubSubManager {
    private final Jedis subscriberJedis;
    private final Jedis publisherJedis;
    private final ExecutorService executor;
    private volatile boolean running = true;
    
    public PubSubManager() {
        this.subscriberJedis = new Jedis("localhost", 6379);
        this.publisherJedis = new Jedis("localhost", 6379);
        this.executor = Executors.newCachedThreadPool();
    }
    
    public void startSubscribing(JedisPubSub pubSub, String... channels) {
        executor.submit(() -> {
            try {
                subscriberJedis.subscribe(pubSub, channels);
            } catch (Exception e) {
                System.err.println("Subscription error: " + e.getMessage());
            }
        });
    }
    
    public void startPatternSubscribing(JedisPubSub pubSub, String... patterns) {
        executor.submit(() -> {
            try {
                subscriberJedis.psubscribe(pubSub, patterns);
            } catch (Exception e) {
                System.err.println("Pattern subscription error: " + e.getMessage());
            }
        });
    }
    
    public Long publish(String channel, String message) {
        return publisherJedis.publish(channel, message);
    }
    
    public void shutdown() {
        running = false;
        executor.shutdown();
        
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        subscriberJedis.close();
        publisherJedis.close();
    }
}

Message Processing Patterns

Common patterns for processing pub/sub messages.

// 1. Message routing based on channel
class ChannelRouter extends JedisPubSub {
    private final Map<String, MessageProcessor> processors = new HashMap<>();
    
    public void registerProcessor(String channel, MessageProcessor processor) {
        processors.put(channel, processor);
    }
    
    @Override
    public void onMessage(String channel, String message) {
        MessageProcessor processor = processors.get(channel);
        if (processor != null) {
            processor.process(channel, message);
        } else {
            System.out.println("No processor for channel: " + channel);
        }
    }
    
    // ... other required methods
}

// 2. JSON message deserialization
class JsonMessageHandler extends JedisPubSub {
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public void onMessage(String channel, String message) {
        try {
            switch (channel) {
                case "user_events":
                    UserEvent event = objectMapper.readValue(message, UserEvent.class);
                    handleUserEvent(event);
                    break;
                case "system_alerts":
                    SystemAlert alert = objectMapper.readValue(message, SystemAlert.class);
                    handleSystemAlert(alert);
                    break;
            }
        } catch (Exception e) {
            System.err.println("Failed to parse message: " + e.getMessage());
        }
    }
    
    // ... handler methods
}

// 3. Batch message processing
class BatchMessageProcessor extends JedisPubSub {
    private final List<String> messageBuffer = new ArrayList<>();
    private final int batchSize = 100;
    private final ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor();
    
    public BatchMessageProcessor() {
        // Process batches every 5 seconds
        scheduler.scheduleAtFixedRate(this::processBatch, 5, 5, TimeUnit.SECONDS);
    }
    
    @Override
    public synchronized void onMessage(String channel, String message) {
        messageBuffer.add(channel + ":" + message);
        
        if (messageBuffer.size() >= batchSize) {
            processBatch();
        }
    }
    
    private synchronized void processBatch() {
        if (!messageBuffer.isEmpty()) {
            List<String> batch = new ArrayList<>(messageBuffer);
            messageBuffer.clear();
            
            // Process batch in background
            CompletableFuture.runAsync(() -> {
                processBatchAsync(batch);
            });
        }
    }
    
    private void processBatchAsync(List<String> messages) {
        // Batch processing logic
        System.out.println("Processing batch of " + messages.size() + " messages");
    }
}

Error Handling and Resilience

Best practices for robust pub/sub implementations.

class ResilientSubscriber extends JedisPubSub {
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final int maxRetries = 3;
    private volatile boolean shouldReconnect = true;
    
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        connected.set(true);
        System.out.println("Successfully subscribed to " + channel);
    }
    
    @Override
    public void onMessage(String channel, String message) {
        try {
            processMessage(channel, message);
        } catch (Exception e) {
            System.err.println("Error processing message from " + channel + ": " + e.getMessage());
            // Could implement dead letter queue or retry logic here
        }
    }
    
    public void handleConnectionLoss() {
        connected.set(false);
        
        if (shouldReconnect) {
            reconnectWithBackoff();
        }
    }
    
    private void reconnectWithBackoff() {
        int attempt = 0;
        while (attempt < maxRetries && shouldReconnect) {
            try {
                Thread.sleep(Math.min(1000 * (1 << attempt), 30000)); // Exponential backoff
                
                Jedis jedis = new Jedis("localhost", 6379);
                jedis.subscribe(this, "channel1", "channel2");
                
                break; // Success
            } catch (Exception e) {
                attempt++;
                System.err.println("Reconnection attempt " + attempt + " failed: " + e.getMessage());
            }
        }
    }
    
    public void shutdown() {
        shouldReconnect = false;
        unsubscribe(); // Unsubscribe from all channels
    }
    
    private void processMessage(String channel, String message) {
        // Message processing logic with error handling
    }
}

Redis pub/sub provides a powerful messaging system for real-time communication. Jedis offers comprehensive support for both simple and advanced pub/sub scenarios with proper error handling and connection management.

Install with Tessl CLI

npx tessl i tessl/maven-redis-clients--jedis

docs

authentication.md

client-side-caching.md

clustering.md

commands-operations.md

connection-management.md

core-clients.md

exceptions.md

index.md

modules.md

parameters.md

pubsub.md

transactions-pipelining.md

tile.json