CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

docs/pubsub-messaging.md

Pub/Sub Messaging

Redis Pub/Sub provides publish/subscribe messaging with channels and pattern-based subscriptions. Publishers send messages to named channels, and subscribers receive messages from channels they've subscribed to, enabling real-time messaging and event-driven architectures.

Capabilities

Pub/Sub Client

Redis Pub/Sub client for subscribing to channels and receiving messages.

# Client method (it takes ``self``) that returns a PubSub object.
# Interface stub only. Extra keyword arguments are accepted; presumably they
# are forwarded to the PubSub constructor -- TODO confirm against redis-py.
def pubsub(self, **kwargs) -> "PubSub": ...

# Redis Pub/Sub client for subscribing to channels and receiving messages.
# Everything below is an interface stub (each body is `...`); the notes on
# each member describe how the usage examples in this document call it.
class PubSub:
    # Takes the connection pool to use. `ignore_subscribe_messages` defaults
    # to False; presumably it hides the (p)subscribe/(p)unsubscribe
    # confirmation messages -- TODO confirm against the redis-py docs.
    def __init__(
        self,
        connection_pool: ConnectionPool,
        shard_hint: Optional[str] = None,
        ignore_subscribe_messages: bool = False,
        **kwargs
    ): ...
    
    # Subscribe to channels. The examples pass channel names positionally
    # (subscribe('news', 'sports')) or as channel=callback keyword pairs.
    def subscribe(self, *args, **kwargs) -> None: ...
    
    # Counterpart of subscribe(); takes channel names positionally.
    def unsubscribe(self, *args) -> None: ...
    
    # Subscribe to channel patterns such as 'user:*:notifications'. Like
    # subscribe(), the examples pass patterns positionally or as
    # pattern=callback keyword pairs.
    def psubscribe(self, *args, **kwargs) -> None: ...
    
    # Counterpart of psubscribe(); takes patterns positionally.
    def punsubscribe(self, *args) -> None: ...
    
    # Iterator of message dictionaries; the examples consume it with
    # `for message in pubsub.listen():` and branch on message['type'].
    def listen(self) -> Iterator[Dict[str, Any]]: ...
    
    # Returns one message dictionary or None. The examples call it with
    # timeout=1.0 and treat a falsy result as "no message received".
    # NOTE(review): timeout is presumably in seconds -- confirm.
    def get_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: float = 0.0
    ) -> Optional[Dict[str, Any]]: ...
    
    # Takes an optional message argument; returns None.
    def ping(self, message: Optional[EncodableT] = None) -> None: ...
    
    # Called by the examples once they are done with the subscription.
    def close(self) -> None: ...
    
    # NOTE(review): not exercised by the examples in this document;
    # relationship to close() should be confirmed against redis-py.
    def reset(self) -> None: ...
    
    # Boolean property; presumably True while at least one channel or
    # pattern subscription is active -- TODO confirm.
    @property
    def subscribed(self) -> bool: ...
    
    # Mapping of channel name (bytes) to its optional callback.
    @property
    def channels(self) -> Dict[bytes, Optional[Callable]]: ...
    
    # Mapping of pattern (bytes) to its optional callback.
    @property
    def patterns(self) -> Dict[bytes, Optional[Callable]]: ...

Publishing Operations

Redis publish operations for sending messages to channels.

# Send `message` to `channel`. The examples treat the returned int as the
# number of subscribers the message was delivered to.
def publish(self, channel: str, message: EncodableT) -> int: ...

# List channels; the monitoring example calls it without arguments to get
# the "active channels". `pattern` defaults to "*".
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...

# Takes channel names and returns (channel, subscriber_count) pairs, which
# is how the monitoring example unpacks the result.
def pubsub_numsub(self, *args: str) -> List[Tuple[bytes, int]]: ...

# Returns an int that the monitoring example reports as the number of
# pattern subscriptions.
def pubsub_numpat(self) -> int: ...

Message Types

Different types of messages received through Pub/Sub subscriptions.

# Message dictionary structure returned by get_message() and listen().
# The usage examples in this document read the keys 'type', 'channel' and
# 'data' from these dictionaries, plus 'pattern' in the pattern-subscription
# example.
MessageDict = Dict[str, Union[str, bytes, int, None]]

# Values of the 'type' key:
# - 'subscribe': Confirmation of channel subscription
# - 'unsubscribe': Confirmation of channel unsubscription
# - 'psubscribe': Confirmation of pattern subscription
# - 'punsubscribe': Confirmation of pattern unsubscription
# - 'message': Regular channel message
# - 'pmessage': Pattern-matched message

Usage Examples

Basic Publisher and Subscriber

import redis
import threading
import time

# Client shared by the publisher and subscriber functions below.
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    decode_responses=True,
)

def publisher():
    """Publish ten numbered greetings to ``notifications``, one per second.

    After each publish, prints the payload together with the subscriber
    count returned by ``r.publish``.
    """
    greetings = (f"Hello {i}" for i in range(10))
    for message in greetings:
        subscribers = r.publish('notifications', message)
        print(f"Published '{message}' to {subscribers} subscribers")
        time.sleep(1)

def subscriber():
    """Subscribe to ``notifications`` and print what arrives.

    Loops over ``listen()``; prints the payload of regular messages and the
    channel name of subscription confirmations. Other message types are
    ignored.
    """
    listener = r.pubsub()
    listener.subscribe('notifications')

    print("Subscriber listening for messages...")
    for message in listener.listen():
        kind = message['type']
        if kind == 'message':
            print(f"Received: {message['data']}")
        elif kind == 'subscribe':
            print(f"Subscribed to: {message['channel']}")

# Run publisher and subscriber in separate threads.
# subscriber() loops over pubsub.listen() and has no exit condition, so it
# is started as a daemon thread: once the publisher has finished and the
# main thread returns, the interpreter can exit instead of hanging on it.
subscriber_thread = threading.Thread(target=subscriber, daemon=True)
publisher_thread = threading.Thread(target=publisher)

subscriber_thread.start()
time.sleep(0.5)  # Let subscriber connect first
publisher_thread.start()

# Wait for the publisher only; the daemon subscriber ends with the process.
publisher_thread.join()

Multiple Channels Subscription

import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# One PubSub object subscribed to several channels in a single call
pubsub = r.pubsub()
pubsub.subscribe('news', 'sports', 'weather')

print("Listening to multiple channels...")
for message in pubsub.listen():
    msg_type = message['type']

    # Subscription confirmation: report the channel and move on
    if msg_type == 'subscribe':
        print(f"Subscribed to channel: {message['channel']}")
        continue

    # Anything that is not a regular message is ignored
    if msg_type != 'message':
        continue

    channel, data = message['channel'], message['data']
    print(f"[{channel}] {data}")

# In another process/thread, publish to different channels:
# r.publish('news', 'Breaking news update')
# r.publish('sports', 'Game results')  
# r.publish('weather', 'Sunny today')

Pattern-Based Subscriptions

import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# One PubSub object subscribed to two channel patterns
pubsub = r.pubsub()
pubsub.psubscribe('user:*:notifications', 'system:*')

print("Listening to channel patterns...")
for message in pubsub.listen():
    msg_type = message['type']

    # Pattern subscription confirmation: report the pattern and move on
    if msg_type == 'psubscribe':
        print(f"Subscribed to pattern: {message['pattern']}")
        continue

    # Only pattern-matched messages are printed; everything else is ignored
    if msg_type != 'pmessage':
        continue

    pattern, channel, data = (
        message['pattern'],
        message['channel'],
        message['data'],
    )
    print(f"Pattern [{pattern}] Channel [{channel}]: {data}")

# Publish to channels matching patterns:
# r.publish('user:1001:notifications', 'New message')
# r.publish('user:1002:notifications', 'Friend request')  
# r.publish('system:alerts', 'System maintenance')

Callback-Based Message Handling

import redis
import threading
import time  # needed by the time.sleep() calls at the end of this example

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Message handler functions
def news_handler(message):
    """Print the ``data`` field of *message* with a NEWS prefix."""
    body = message['data']
    print(f"📰 NEWS: {body}")

def alert_handler(message):
    """Print the ``data`` field of *message* with an ALERT prefix."""
    body = message['data']
    print(f"🚨 ALERT: {body}")

def user_notification_handler(message):
    """Print a per-user notification.

    The user ID is the second ``:``-separated segment of the channel name,
    e.g. ``1001`` in ``user:1001:notifications``.
    """
    segments = message['channel'].split(':')
    user_id = segments[1]  # Extract user ID from channel
    print(f"👤 User {user_id}: {message['data']}")

# Attach a callback to each subscription: keyword name = channel, value = handler
pubsub = r.pubsub()
pubsub.subscribe(news=news_handler, alerts=alert_handler)
# The pattern is not a valid keyword identifier, so it goes through a dict
pubsub.psubscribe(**{'user:*:notifications': user_notification_handler})

# Message processing loop
def message_processor():
    """Drain ``pubsub.listen()`` and discard whatever it yields."""
    for _ in pubsub.listen():
        # Handlers are called automatically for subscribed channels
        continue

# Run the processor in a background daemon thread
processor_thread = threading.Thread(target=message_processor, daemon=True)
processor_thread.start()

# Give the subscriptions a moment, then publish one message per handler
time.sleep(0.5)
r.publish('news', 'Market update')
r.publish('alerts', 'Server overload warning')
r.publish('user:1001:notifications', 'New friend request')

# Leave time for the callbacks to run before closing the subscription
time.sleep(2)
pubsub.close()

Non-Blocking Message Retrieval

import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

pubsub = r.pubsub()
pubsub.subscribe('events')

print("Non-blocking message retrieval...")
# Measure elapsed time with the monotonic clock: unlike time.time(), it
# cannot jump when the system clock is adjusted.
start_time = time.monotonic()

try:
    while time.monotonic() - start_time < 10:  # Run for 10 seconds
        # Poll with a 1.0 timeout instead of blocking indefinitely in
        # listen(); a falsy result means no message was available.
        message = pubsub.get_message(timeout=1.0)

        if message:
            if message['type'] == 'message':
                print(f"Received: {message['data']}")
            elif message['type'] == 'subscribe':
                print(f"Subscribed to: {message['channel']}")
        else:
            print("No message received, doing other work...")
            # Perform other tasks while waiting for messages
            time.sleep(0.1)
finally:
    # Close the subscription even if the loop is interrupted or raises
    pubsub.close()

Pub/Sub with Authentication and Error Handling

import redis
from redis.exceptions import ConnectionError, ResponseError

def create_authenticated_pubsub():
    """Create a password-authenticated client and a PubSub object for it.

    The connection is verified with ``ping()`` before the PubSub object is
    created.

    Returns:
        A ``(client, pubsub)`` tuple.

    Raises:
        ConnectionError: the server could not be reached, or authentication
            failed (``AuthenticationError`` is a ``ConnectionError``
            subclass, so existing ``except ConnectionError`` callers still
            catch it).
        ResponseError: the server answered with an error reply.
    """
    try:
        r = redis.Redis(
            host='localhost',
            port=6379,
            password='your_password',
            decode_responses=True,
            socket_timeout=5,
            socket_connect_timeout=5,
            retry_on_timeout=True
        )

        # Test connection
        r.ping()

        pubsub = r.pubsub()
        return r, pubsub

    except redis.exceptions.AuthenticationError as e:
        # Must precede the ConnectionError clause: AuthenticationError is a
        # subclass, so it would otherwise be reported as "Connection failed".
        print(f"Authentication failed: {e}")
        raise
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        raise
    except ResponseError as e:
        # Kept for backward compatibility with the original example.
        print(f"Authentication failed: {e}")
        raise

def robust_subscriber(channels):
    """Subscribe to *channels* and print messages, reconnecting on failure.

    Each attempt creates a fresh client and PubSub object through
    ``create_authenticated_pubsub()``. On ``ConnectionError`` it waits with
    exponential backoff (capped at 30 seconds) and retries, up to five
    consecutive failures. The failure counter is reset after every
    successful subscribe.

    Args:
        channels: iterable of channel names to subscribe to.

    Raises:
        ConnectionError: re-raised once the retry limit is reached.
    """
    # Local import: this example uses time.sleep() but its import block
    # does not import ``time``.
    import time

    max_retries = 5
    retry_count = 0

    while retry_count < max_retries:
        pubsub = None
        try:
            r, pubsub = create_authenticated_pubsub()
            pubsub.subscribe(*channels)

            print(f"Connected and subscribed to: {channels}")
            retry_count = 0  # Reset retry count on success

            for message in pubsub.listen():
                if message['type'] == 'message':
                    print(f"[{message['channel']}] {message['data']}")

        except ConnectionError as e:
            retry_count += 1
            print(f"Connection lost (attempt {retry_count}): {e}")

            if retry_count < max_retries:
                wait_time = min(2 ** retry_count, 30)  # Exponential backoff
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries exceeded")
                raise
        except KeyboardInterrupt:
            print("Shutting down subscriber...")
            break
        finally:
            # Close this attempt's PubSub on every exit path (retry, final
            # failure, Ctrl-C) so its connection is not left behind when
            # the next attempt creates a new one.
            if pubsub is not None:
                pubsub.close()

# Use robust subscriber (only when run as a script, not when imported)
if __name__ == "__main__":
    robust_subscriber(['important', 'alerts'])

Chat Room Implementation

import redis
import threading
import time
from datetime import datetime

class ChatRoom:
    """Interactive chat room on top of one Redis Pub/Sub channel.

    Messages for room ``<name>`` travel over the channel ``chat:<name>``.
    ``join()`` subscribes, starts a background listener thread and reads
    lines from stdin until the user types ``quit``; ``leave()`` stops the
    listener and closes the subscription.
    """

    # Timeout passed to get_message() by the listener thread; this bounds
    # how long the listener takes to notice that ``running`` became False.
    _POLL_INTERVAL = 0.5

    def __init__(self, room_name, username):
        self.room_name = room_name
        self.username = username
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.r.pubsub()
        self.running = False
        self._listener_thread = None

    @property
    def channel(self):
        """Name of the Pub/Sub channel that carries this room's messages."""
        return f"chat:{self.room_name}"

    def join(self):
        """Join the chat room and run the input loop until the user quits."""
        channel = self.channel
        self.pubsub.subscribe(channel)
        self.running = True

        # Announce joining
        self.r.publish(channel, f">>> {self.username} joined the room")

        # Start message listener
        self._listener_thread = threading.Thread(
            target=self._listen_messages, daemon=True
        )
        self._listener_thread.start()

        print(f"Joined chat room: {self.room_name}")
        print("Type messages (or 'quit' to leave):")

        # Message input loop
        try:
            while self.running:
                message = input()
                if message.lower() == 'quit':
                    break
                self.send_message(message)
        except (KeyboardInterrupt, EOFError):
            # Ctrl-C or end of stdin (Ctrl-D / closed pipe): leave quietly
            pass
        finally:
            self.leave()

    def send_message(self, message):
        """Publish *message* to the room, prefixed with time and username.

        Blank or whitespace-only messages are not sent.
        """
        if message.strip():
            timestamp = datetime.now().strftime('%H:%M:%S')
            formatted_message = f"[{timestamp}] {self.username}: {message}"
            self.r.publish(self.channel, formatted_message)

    def _listen_messages(self):
        """Print incoming chat messages until ``running`` becomes False.

        Polls with a timeout rather than blocking in ``listen()`` so the
        flag is re-checked even when no message arrives.
        """
        while self.running:
            message = self.pubsub.get_message(timeout=self._POLL_INTERVAL)
            if message and message['type'] == 'message':
                print(message['data'])

    def leave(self):
        """Leave the chat room: stop the listener, announce, and clean up."""
        self.running = False

        # Let the listener finish its current poll before the subscription
        # is closed, so the PubSub object is not closed while another
        # thread is still reading from it.
        listener = getattr(self, '_listener_thread', None)
        if listener is not None and listener is not threading.current_thread():
            listener.join(timeout=2 * self._POLL_INTERVAL)

        self.r.publish(self.channel, f"<<< {self.username} left the room")
        self.pubsub.close()
        print(f"Left chat room: {self.room_name}")

# Script entry point: join the "general" room as Alice
if __name__ == "__main__":
    room = ChatRoom(room_name="general", username="Alice")
    room.join()

Pub/Sub Statistics and Monitoring

import redis
import time

# Client used by monitor_pubsub() below.
r = redis.Redis(
    host='localhost',
    port=6379,
    decode_responses=True,
)

def _report_pubsub_stats(client):
    """Print one snapshot of channel, subscriber and pattern counts."""
    # Get active channels
    channels = client.pubsub_channels()
    print(f"Active channels: {len(channels)}")

    if channels:
        # Get subscriber counts for each channel
        for channel, subscriber_count in client.pubsub_numsub(*channels):
            print(f"  {channel}: {subscriber_count} subscribers")

    # Get pattern subscription count
    pattern_count = client.pubsub_numpat()
    print(f"Pattern subscriptions: {pattern_count}")

    print("-" * 40)


def monitor_pubsub(client=None, interval=5, max_iterations=None):
    """Monitor Pub/Sub channels and subscriptions.

    Prints a statistics snapshot every *interval* seconds until interrupted
    with Ctrl-C or until *max_iterations* snapshots have been attempted.

    Args:
        client: object providing ``pubsub_channels()``, ``pubsub_numsub()``
            and ``pubsub_numpat()``. Defaults to the module-level ``r``.
        interval: seconds to sleep between snapshots (default 5).
        max_iterations: stop after this many snapshots; ``None`` (default)
            means run until interrupted.
    """
    if client is None:
        client = r

    print("Pub/Sub Monitoring Dashboard")
    print("-" * 40)

    completed = 0
    while max_iterations is None or completed < max_iterations:
        try:
            try:
                _report_pubsub_stats(client)
            except Exception as e:
                # Best effort: report the failure and try again next round.
                # KeyboardInterrupt is not an Exception subclass, so it
                # falls through to the outer handler.
                print(f"Monitoring error: {e}")

            completed += 1
            if max_iterations is not None and completed >= max_iterations:
                break  # no pointless sleep after the final snapshot
            time.sleep(interval)

        except KeyboardInterrupt:
            # Covers Ctrl-C both while polling and while sleeping
            print("Monitoring stopped")
            break

# Monitor Pub/Sub activity (only when run as a script, not when imported)
if __name__ == "__main__":
    monitor_pubsub()

Event-Driven Architecture Example

import redis
import json
import threading
from datetime import datetime

class EventBus:
    """Minimal event bus built on Redis Pub/Sub.

    Events of type ``<t>`` are published as JSON on the channel
    ``events:<t>``. Handlers registered for a type are called from a
    background listener thread with the decoded event dictionary.
    """

    def __init__(self):
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.r.pubsub()
        self.handlers = {}  # event type -> list of handler callables
        self.running = False

    def subscribe_to_events(self, event_types):
        """Subscribe to specific event types and start the listener thread."""
        channels = [f"events:{event_type}" for event_type in event_types]
        self.pubsub.subscribe(*channels)

        self.running = True
        listener_thread = threading.Thread(
            target=self._event_listener, daemon=True
        )
        listener_thread.start()

    def register_handler(self, event_type, handler):
        """Register *handler* to be called with each event of *event_type*."""
        self.handlers.setdefault(event_type, []).append(handler)

    def publish_event(self, event_type, data):
        """Publish an event to the bus.

        The event is a JSON object with ``type``, ``data``, an ISO-8601
        ``timestamp`` and an integer ``id`` (microseconds since the epoch).
        Both time fields come from one ``datetime.now()`` reading, which
        keeps them consistent and avoids the ``time`` module that this
        example's import block does not import.
        """
        now = datetime.now()
        event = {
            'type': event_type,
            'data': data,
            'timestamp': now.isoformat(),
            'id': int(now.timestamp() * 1_000_000),  # Microsecond timestamp as ID
        }

        channel = f"events:{event_type}"
        self.r.publish(channel, json.dumps(event))

    def _event_listener(self):
        """Listen for events and dispatch them to the registered handlers.

        Malformed events (invalid JSON, not a JSON object, or no usable
        ``type``) are reported and skipped so that one bad message cannot
        terminate the listener thread. Handler exceptions are reported and
        do not affect the remaining handlers.
        """
        for message in self.pubsub.listen():
            if not self.running:
                break

            if message['type'] != 'message':
                continue

            try:
                event = json.loads(message['data'])
                event_type = event['type']
                handlers = self.handlers.get(event_type, ())
            except json.JSONDecodeError as e:
                print(f"Invalid event format: {e}")
                continue
            except (KeyError, TypeError) as e:
                # Valid JSON but not an object with a hashable 'type' field
                print(f"Invalid event format: {e!r}")
                continue

            # Dispatch to registered handlers
            for handler in handlers:
                try:
                    handler(event)
                except Exception as e:
                    print(f"Handler error for {event_type}: {e}")

    def stop(self):
        """Stop the event bus and close the subscription."""
        self.running = False
        self.pubsub.close()

# Event handlers
def user_created_handler(event):
    """Print the name and email found in a ``user_created`` event's data."""
    payload = event['data']
    name = payload['name']
    email = payload['email']
    print(f"🆕 User created: {name} ({email})")

def order_placed_handler(event):
    """Print the order ID and user ID found in an ``order_placed`` event."""
    payload = event['data']
    order_id = payload['order_id']
    user_id = payload['user_id']
    print(f"🛒 Order placed: #{order_id} by user {user_id}")

def system_alert_handler(event):
    """Print the message and level found in a ``system_alert`` event."""
    payload = event['data']
    text = payload['message']
    level = payload['level']
    print(f"🚨 ALERT: {text} (Level: {level})")

# Usage example
if __name__ == "__main__":
    # This example calls time.sleep() but its import block does not import
    # ``time``; import it here so the script runs.
    import time

    # Create event bus
    event_bus = EventBus()

    # Register handlers
    event_bus.register_handler('user_created', user_created_handler)
    event_bus.register_handler('order_placed', order_placed_handler)
    event_bus.register_handler('system_alert', system_alert_handler)

    # Subscribe to events
    event_bus.subscribe_to_events(['user_created', 'order_placed', 'system_alert'])

    # Simulate publishing events
    time.sleep(0.5)  # Let subscriber connect

    event_bus.publish_event('user_created', {
        'user_id': 1001,
        'name': 'John Doe',
        'email': 'john@example.com'
    })

    event_bus.publish_event('order_placed', {
        'order_id': 'ORD-12345',
        'user_id': 1001,
        'total': 99.99
    })

    event_bus.publish_event('system_alert', {
        'message': 'High memory usage detected',
        'level': 'WARNING'
    })

    # Keep running to receive events
    try:
        time.sleep(10)
    except KeyboardInterrupt:
        pass
    finally:
        event_bus.stop()

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json