Python client for Redis database and key-value store
—
Redis Pub/Sub provides publish/subscribe messaging with channels and pattern-based subscriptions. Publishers send messages to named channels, and subscribers receive messages from channels they've subscribed to, enabling real-time messaging and event-driven architectures.
Redis Pub/Sub client for subscribing to channels and receiving messages.
def pubsub(self, **kwargs) -> "PubSub": ...
class PubSub:
    """Redis Pub/Sub client for subscribing to channels and receiving messages.

    Signature summary only: every body below is ``...``.
    """

    def __init__(
        self,
        connection_pool: ConnectionPool,
        shard_hint: Optional[str] = None,
        ignore_subscribe_messages: bool = False,
        **kwargs
    ): ...

    # Positional args are channel names; keyword args map a channel name to a
    # handler callable (see the callback-handler example further down).
    def subscribe(self, *args, **kwargs) -> None: ...

    def unsubscribe(self, *args) -> None: ...

    # Same calling convention as subscribe(), but for channel patterns.
    def psubscribe(self, *args, **kwargs) -> None: ...

    def punsubscribe(self, *args) -> None: ...

    # Iterated with ``for message in pubsub.listen()`` in the examples.
    def listen(self) -> Iterator[Dict[str, Any]]: ...

    # The polling example treats a falsy return as "no message arrived".
    def get_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: float = 0.0
    ) -> Optional[Dict[str, Any]]: ...

    def ping(self, message: Optional[EncodableT] = None) -> None: ...

    def close(self) -> None: ...

    def reset(self) -> None: ...

    @property
    def subscribed(self) -> bool: ...

    @property
    def channels(self) -> Dict[bytes, Optional[Callable]]: ...

    @property
    def patterns(self) -> Dict[bytes, Optional[Callable]]: ...


# Redis publish operations for sending messages to channels.
def publish(self, channel: str, message: EncodableT) -> int: ...
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...
def pubsub_numsub(self, *args: str) -> List[Tuple[bytes, int]]: ...
def pubsub_numpat(self) -> int: ...

Different types of messages received through Pub/Sub subscriptions.
# Message dictionary structure returned by get_message() and listen()
MessageDict = Dict[str, Union[str, bytes, int, None]]
# Message types:
# - 'subscribe': Confirmation of channel subscription
# - 'unsubscribe': Confirmation of channel unsubscription
# - 'psubscribe': Confirmation of pattern subscription
# - 'punsubscribe': Confirmation of pattern unsubscription
# - 'message': Regular channel message
# - 'pmessage': Pattern-matched message

import redis
import threading
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def publisher():
    """Publish messages to a channel"""
    # Sends ten messages to 'notifications', one per second, and reports the
    # value returned by publish() for each of them.
    for i in range(10):
        message = f"Hello {i}"
        subscribers = r.publish('notifications', message)
        print(f"Published '{message}' to {subscribers} subscribers")
        time.sleep(1)
def subscriber():
    """Subscribe and listen for messages"""
    pubsub = r.pubsub()
    pubsub.subscribe('notifications')
    print("Subscriber listening for messages...")
    # listen() yields subscription confirmations as well as published
    # messages, so dispatch on the 'type' field.
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
        elif message['type'] == 'subscribe':
            print(f"Subscribed to: {message['channel']}")
# Run publisher and subscriber in separate threads.
# The subscriber loops on pubsub.listen() and never returns on its own, so it
# runs as a daemon thread; otherwise the script would hang after the
# publisher has finished.
subscriber_thread = threading.Thread(target=subscriber, daemon=True)
publisher_thread = threading.Thread(target=publisher)
subscriber_thread.start()
time.sleep(0.5)  # Let subscriber connect first
publisher_thread.start()
publisher_thread.join()

import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Subscribe to multiple channels
pubsub = r.pubsub()
pubsub.subscribe('news', 'sports', 'weather')
print("Listening to multiple channels...")

# One 'subscribe' confirmation is handled per channel; after that every
# 'message' carries the channel it was published on.
for message in pubsub.listen():
    msg_type = message['type']
    if msg_type == 'subscribe':
        print(f"Subscribed to channel: {message['channel']}")
    elif msg_type == 'message':
        channel = message['channel']
        data = message['data']
        print(f"[{channel}] {data}")
# In another process/thread, publish to different channels:
# r.publish('news', 'Breaking news update')
# r.publish('sports', 'Game results')
# r.publish('weather', 'Sunny today')

import redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Subscribe to channel patterns
pubsub = r.pubsub()
pubsub.psubscribe('user:*:notifications', 'system:*')
print("Listening to channel patterns...")

# Pattern subscriptions use the 'psubscribe' / 'pmessage' message types; a
# 'pmessage' carries both the matching pattern and the concrete channel.
for message in pubsub.listen():
    msg_type = message['type']
    if msg_type == 'psubscribe':
        print(f"Subscribed to pattern: {message['pattern']}")
    elif msg_type == 'pmessage':
        pattern = message['pattern']
        channel = message['channel']
        data = message['data']
        print(f"Pattern [{pattern}] Channel [{channel}]: {data}")
# Publish to channels matching patterns:
# r.publish('user:1001:notifications', 'New message')
# r.publish('user:1002:notifications', 'Friend request')
# r.publish('system:alerts', 'System maintenance')

import redis
import threading
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Message handler functions
def news_handler(message):
    """Print the payload of a message received on the news channel."""
    print(f"📰 NEWS: {message['data']}")
def alert_handler(message):
    """Print the payload of a message received on the alerts channel."""
    print(f"🚨 ALERT: {message['data']}")
def user_notification_handler(message):
    """Print a per-user notification, taking the user ID from the channel name.

    The channel is expected to look like ``user:<id>:notifications``; the
    second colon-separated field is used as the user ID.
    """
    channel = message['channel']
    user_id = channel.split(':')[1]  # Extract user ID from channel
    print(f"👤 User {user_id}: {message['data']}")
# Register a callback per channel / pattern instead of handling the
# messages by hand in the listen loop.
pubsub = r.pubsub()
pubsub.subscribe(news=news_handler, alerts=alert_handler)
# The pattern is not a valid keyword name, so it is passed via a dict.
pubsub.psubscribe(**{'user:*:notifications': user_notification_handler})
# Message processing loop
def message_processor():
    """Iterate over pubsub.listen() so the registered callbacks get invoked."""
    for message in pubsub.listen():
        # Handlers are called automatically for subscribed channels
        pass
# Start message processor
import time  # used by the sleeps below; this example's import block omits it

# Daemon thread: message_processor() blocks in listen() indefinitely.
processor_thread = threading.Thread(target=message_processor, daemon=True)
processor_thread.start()

# Simulate publishing
time.sleep(0.5)  # give the processor thread time to subscribe
r.publish('news', 'Market update')
r.publish('alerts', 'Server overload warning')
r.publish('user:1001:notifications', 'New friend request')
time.sleep(2)  # leave time for the callbacks to run before closing
pubsub.close()

import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe('events')
print("Non-blocking message retrieval...")

start_time = time.time()
while time.time() - start_time < 10:  # Run for 10 seconds
    # Poll for one message, passing a 1-second timeout; a falsy result is
    # treated as "nothing arrived".
    message = pubsub.get_message(timeout=1.0)
    if message:
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
        elif message['type'] == 'subscribe':
            print(f"Subscribed to: {message['channel']}")
    else:
        print("No message received, doing other work...")
    # Perform other tasks while waiting for messages
    time.sleep(0.1)
pubsub.close()

import redis
from redis.exceptions import ConnectionError, ResponseError
def create_authenticated_pubsub():
    """Create Pub/Sub client with authentication

    Returns:
        A ``(client, pubsub)`` tuple.

    Raises:
        ConnectionError: the connection test failed (this also covers
            authentication errors, which subclass it in redis-py).
        ResponseError: the server rejected a command.
    """
    # Imported locally so this function does not rely on the module's
    # ``from redis.exceptions import ...`` line being extended.
    from redis.exceptions import AuthenticationError

    try:
        r = redis.Redis(
            host='localhost',
            port=6379,
            password='your_password',
            decode_responses=True,
            socket_timeout=5,
            socket_connect_timeout=5,
            retry_on_timeout=True
        )
        # Test connection
        r.ping()
        pubsub = r.pubsub()
        return r, pubsub
    except AuthenticationError as e:
        # Must precede ConnectionError: AuthenticationError is a subclass of
        # it, so it would otherwise be reported as a connection failure.
        print(f"Authentication failed: {e}")
        raise
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        raise
    except ResponseError as e:
        print(f"Authentication failed: {e}")
        raise
def robust_subscriber(channels):
    """Robust subscriber with error handling and reconnection

    Args:
        channels: iterable of channel names to subscribe to.

    Raises:
        ConnectionError: after ``max_retries`` consecutive failed attempts.
    """
    # Local import: the surrounding example never imports ``time`` although
    # the backoff below needs it.
    import time

    max_retries = 5
    retry_count = 0
    while retry_count < max_retries:
        pubsub = None
        try:
            try:
                _client, pubsub = create_authenticated_pubsub()
                pubsub.subscribe(*channels)
                print(f"Connected and subscribed to: {channels}")
                retry_count = 0  # Reset retry count on success
                for message in pubsub.listen():
                    if message['type'] == 'message':
                        print(f"[{message['channel']}] {message['data']}")
            finally:
                # Release the PubSub connection on every exit path, so a
                # retry does not leave the previous one open.
                if pubsub is not None:
                    pubsub.close()
        except ConnectionError as e:
            retry_count += 1
            print(f"Connection lost (attempt {retry_count}): {e}")
            if retry_count < max_retries:
                wait_time = min(2 ** retry_count, 30)  # Exponential backoff
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries exceeded")
                raise
        except KeyboardInterrupt:
            print("Shutting down subscriber...")
            break
# Use robust subscriber
robust_subscriber(['important', 'alerts'])

import redis
import threading
import time
from datetime import datetime
class ChatRoom:
    """Console chat client for one room, backed by a Redis Pub/Sub channel.

    All traffic for a room is published to the channel ``chat:<room_name>``.
    """

    def __init__(self, room_name, username):
        """Store the room and user names and create the client and PubSub."""
        self.room_name = room_name
        self.username = username
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.r.pubsub()
        self.running = False  # set by join(), cleared by leave()

    def join(self):
        """Join the chat room"""
        channel = f"chat:{self.room_name}"
        self.pubsub.subscribe(channel)
        self.running = True

        # Announce joining
        self.r.publish(channel, f">>> {self.username} joined the room")

        # Start message listener
        listener_thread = threading.Thread(target=self._listen_messages)
        listener_thread.daemon = True
        listener_thread.start()

        print(f"Joined chat room: {self.room_name}")
        print("Type messages (or 'quit' to leave):")

        # Message input loop; leave() runs however the loop ends.
        try:
            while self.running:
                message = input()
                if message.lower() == 'quit':
                    break
                self.send_message(message)
        except KeyboardInterrupt:
            pass
        finally:
            self.leave()

    def send_message(self, message):
        """Send message to chat room"""
        # Blank / whitespace-only input is not published.
        if message.strip():
            timestamp = datetime.now().strftime('%H:%M:%S')
            formatted_message = f"[{timestamp}] {self.username}: {message}"
            channel = f"chat:{self.room_name}"
            self.r.publish(channel, formatted_message)

    def _listen_messages(self):
        """Listen for incoming messages"""
        for message in self.pubsub.listen():
            if not self.running:
                break
            if message['type'] == 'message':
                print(message['data'])

    def leave(self):
        """Leave the chat room"""
        self.running = False
        channel = f"chat:{self.room_name}"
        self.r.publish(channel, f"<<< {self.username} left the room")
        self.pubsub.close()
        print(f"Left chat room: {self.room_name}")
# Usage example
if __name__ == "__main__":
    # join() blocks on console input until the user quits.
    room = ChatRoom("general", "Alice")
    room.join()

import redis
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def monitor_pubsub():
    """Monitor Pub/Sub channels and subscriptions

    Prints the active channels, their subscriber counts and the number of
    pattern subscriptions every 5 seconds until interrupted with Ctrl-C.
    """
    print("Pub/Sub Monitoring Dashboard")
    print("-" * 40)
    while True:
        try:
            # Get active channels
            channels = r.pubsub_channels()
            print(f"Active channels: {len(channels)}")
            if channels:
                # Get subscriber counts for each channel
                channel_stats = r.pubsub_numsub(*channels)
                for channel, subscriber_count in channel_stats:
                    print(f" {channel}: {subscriber_count} subscribers")
            # Get pattern subscription count
            pattern_count = r.pubsub_numpat()
            print(f"Pattern subscriptions: {pattern_count}")
            print("-" * 40)
            time.sleep(5)
        except KeyboardInterrupt:
            print("Monitoring stopped")
            break
        except Exception as e:
            # Top-level loop boundary: report the error and keep monitoring.
            print(f"Monitoring error: {e}")
            time.sleep(5)
# Monitor Pub/Sub activity
monitor_pubsub()

import redis
import json
import threading
import time
from datetime import datetime
class EventBus:
    """Minimal event bus on top of Redis Pub/Sub.

    Events are JSON documents published to the channel ``events:<type>``
    and dispatched to the handlers registered for that event type.
    """

    def __init__(self):
        """Create the Redis client, its PubSub object and the handler table."""
        self.r = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.pubsub = self.r.pubsub()
        self.handlers = {}  # event type -> list of handler callables
        self.running = False

    def subscribe_to_events(self, event_types):
        """Subscribe to specific event types"""
        channels = [f"events:{event_type}" for event_type in event_types]
        self.pubsub.subscribe(*channels)
        self.running = True
        # Daemon thread: the listener blocks in listen().
        listener_thread = threading.Thread(target=self._event_listener)
        listener_thread.daemon = True
        listener_thread.start()

    def register_handler(self, event_type, handler):
        """Register event handler function"""
        self.handlers.setdefault(event_type, []).append(handler)

    def publish_event(self, event_type, data):
        """Publish event to the bus"""
        event = {
            'type': event_type,
            'data': data,
            'timestamp': datetime.now().isoformat(),
            'id': int(time.time() * 1000000)  # Microsecond timestamp as ID
        }
        channel = f"events:{event_type}"
        self.r.publish(channel, json.dumps(event))

    def _event_listener(self):
        """Listen for events and dispatch to handlers"""
        for message in self.pubsub.listen():
            if not self.running:
                break
            if message['type'] != 'message':
                continue
            try:
                event = json.loads(message['data'])
                event_type = event['type']
            except json.JSONDecodeError as e:
                print(f"Invalid event format: {e}")
                continue
            except (KeyError, TypeError) as e:
                # Valid JSON that is not an event object (no 'type' key or
                # not a mapping) must not terminate the listener thread.
                print(f"Invalid event format: {e!r}")
                continue
            # Dispatch to registered handlers; one failing handler does not
            # prevent the remaining ones from running.
            for handler in self.handlers.get(event_type, ()):
                try:
                    handler(event)
                except Exception as e:
                    print(f"Handler error for {event_type}: {e}")

    def stop(self):
        """Stop the event bus"""
        self.running = False
        self.pubsub.close()
# Event handlers
def user_created_handler(event):
    """Print the name and email carried by a user_created event."""
    user_data = event['data']
    print(f"🆕 User created: {user_data['name']} ({user_data['email']})")
def order_placed_handler(event):
    """Print the order ID and user ID carried by an order_placed event."""
    order_data = event['data']
    print(f"🛒 Order placed: #{order_data['order_id']} by user {order_data['user_id']}")
def system_alert_handler(event):
    """Print the message and level carried by a system_alert event."""
    alert_data = event['data']
    print(f"🚨 ALERT: {alert_data['message']} (Level: {alert_data['level']})")
# Usage example
if __name__ == "__main__":
    # Create event bus
    event_bus = EventBus()

    # Register handlers
    event_bus.register_handler('user_created', user_created_handler)
    event_bus.register_handler('order_placed', order_placed_handler)
    event_bus.register_handler('system_alert', system_alert_handler)

    # Subscribe to events
    event_bus.subscribe_to_events(['user_created', 'order_placed', 'system_alert'])

    # Simulate publishing events
    time.sleep(0.5)  # Let subscriber connect
    event_bus.publish_event('user_created', {
        'user_id': 1001,
        'name': 'John Doe',
        'email': 'john@example.com'
    })
    event_bus.publish_event('order_placed', {
        'order_id': 'ORD-12345',
        'user_id': 1001,
        'total': 99.99
    })
    event_bus.publish_event('system_alert', {
        'message': 'High memory usage detected',
        'level': 'WARNING'
    })

    # Keep running to receive events; stop() runs even on Ctrl-C.
    try:
        time.sleep(10)
    except KeyboardInterrupt:
        pass
    finally:
        event_bus.stop()

# Install with Tessl CLI
npx tessl i tessl/pypi-redis