Python implementation of redis API, can be used for testing purposes
—
Redis publish/subscribe messaging with support for channels, pattern subscriptions, and shard channels. Pub/Sub provides a powerful message broadcasting system for real-time applications, event notifications, and decoupled communication between application components.
Functions for publishing messages to channels and managing message distribution.
def publish(self, channel: KeyT, message: EncodableT) -> int: ...
def spublish(self, channel: KeyT, message: EncodableT) -> int: ...
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...
def pubsub_numsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...
def pubsub_numpat(self) -> int: ...
def pubsub_shardchannels(self, pattern: str = "*") -> List[bytes]: ...
def pubsub_shardnumsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...
Redis PubSub client for subscribing to channels and receiving messages.
class PubSub:
def subscribe(self, *args: ChannelT) -> None: ...
def psubscribe(self, *args: PatternT) -> None: ...
def ssubscribe(self, *args: ChannelT) -> None: ...
def unsubscribe(self, *args: ChannelT) -> None: ...
def punsubscribe(self, *args: PatternT) -> None: ...
def sunsubscribe(self, *args: ChannelT) -> None: ...
def get_message(
self,
ignore_subscribe_messages: bool = False,
timeout: float = 0.0
) -> Optional[Dict[str, Any]]: ...
def listen(self) -> Iterator[Dict[str, Any]]: ...
def get_sharded_message(
self,
ignore_subscribe_messages: bool = False,
timeout: float = 0.0
) -> Optional[Dict[str, Any]]: ...
def close(self) -> None: ...
def reset(self) -> None: ...
Core message processing functions for PubSub operations.
def pubsub(self, **kwargs) -> PubSub: ...
# Message structure returned by get_message()
MessageType = Dict[str, Any]
# {
# 'type': str, # 'message', 'pmessage', 'subscribe', etc.
# 'channel': bytes, # Channel name
# 'pattern': bytes, # Pattern (for pattern subscriptions)
# 'data': Union[bytes, int] # Message data or subscription count
# }
import fakeredis
import threading
import time
client = fakeredis.FakeRedis()
def publisher():
    """Publish five messages to the 'news' channel, one per second.

    After each publish, prints the value returned by ``client.publish``
    (reported here as the number of subscribers).
    """
    for i in range(5):
        time.sleep(1)
        subscribers = client.publish('news', f'Breaking news #{i}')
        print(f"Published message {i} to {subscribers} subscribers")
def subscriber():
    """Subscribe to 'news' and print up to five published messages.

    Stops early when no message arrives within two seconds. The pub/sub
    object is closed even if reading or decoding raises.
    """
    pubsub = client.pubsub()
    try:
        pubsub.subscribe('news')

        # Skip subscription confirmation message
        confirmation = pubsub.get_message()
        print(f"Subscribed: {confirmation}")

        # Listen for actual messages
        received = 0
        while received < 5:
            message = pubsub.get_message(timeout=2.0)
            if not message:
                print("No message received")
                break
            if message['type'] != 'message':
                # Subscribe/unsubscribe notices carry an int in 'data', so
                # calling .decode() on them would raise; skip them.
                continue
            print(f"Received: {message['data'].decode()}")
            received += 1
    finally:
        pubsub.close()
# Start publisher and subscriber.
# The subscriber thread is started first so that its subscription exists
# before the publisher sends anything.
pub_thread = threading.Thread(target=publisher)
sub_thread = threading.Thread(target=subscriber)
sub_thread.start()
time.sleep(0.5) # Give subscriber time to connect
pub_thread.start()
# Wait for the publisher to send all of its messages.
pub_thread.join()
sub_thread.join()
import fakeredis
import threading
import time
client = fakeredis.FakeRedis()
def pattern_subscriber():
    """Print every message published on a channel matching 'user:*'.

    Runs until ``get_message`` returns nothing within one second, then
    closes the pub/sub object (also on error).
    """
    pubsub = client.pubsub()
    try:
        # Subscribe to all channels starting with 'user:'
        pubsub.psubscribe('user:*')

        # Skip subscription confirmation
        confirmation = pubsub.get_message()
        print(f"Pattern subscribed: {confirmation}")

        # Listen for pattern-matched messages
        while True:
            message = pubsub.get_message(timeout=1.0)
            if not message:
                break
            if message['type'] == 'pmessage':
                pattern = message['pattern'].decode()
                channel = message['channel'].decode()
                data = message['data'].decode()
                print(f"Pattern '{pattern}' matched channel '{channel}': {data}")
    finally:
        pubsub.close()
# Start pattern subscriber
sub_thread = threading.Thread(target=pattern_subscriber)
sub_thread.start()
time.sleep(0.5) # Give subscriber time to connect
# Publish to various channels; only the 'user:' ones match the subscriber's
# 'user:*' pattern.
client.publish('user:123', 'User 123 logged in')
client.publish('user:456', 'User 456 updated profile')
client.publish('system:alert', 'System alert') # Won't match pattern
client.publish('user:789', 'User 789 logged out')
# The subscriber exits once a 1-second get_message() call returns nothing,
# so this pause is long enough for it to drain the messages and stop.
time.sleep(2) # Allow messages to be processed
sub_thread.join()
import fakeredis
import threading
import time
client = fakeredis.FakeRedis()
def create_subscriber(name, channels, duration=5):
    """Build a thread target that listens on *channels* for a fixed time.

    Args:
        name: Label used in the printed output.
        channels: Channel names to subscribe to.
        duration: How long to listen, in seconds (default 5).

    Returns:
        A zero-argument callable that subscribes, prints every 'message'
        it receives until *duration* has elapsed, then closes the pub/sub
        object.
    """
    def subscriber():
        pubsub = client.pubsub()
        try:
            pubsub.subscribe(*channels)

            # Skip subscription confirmations (one per channel)
            for _ in channels:
                pubsub.get_message()

            print(f"Subscriber {name} ready, listening to {channels}")

            # A monotonic clock keeps the listening window correct even if
            # the wall clock is adjusted while the loop runs.
            deadline = time.monotonic() + duration
            while time.monotonic() < deadline:
                message = pubsub.get_message(timeout=0.1)
                if message and message['type'] == 'message':
                    channel = message['channel'].decode()
                    data = message['data'].decode()
                    print(f"[{name}] {channel}: {data}")
        finally:
            pubsub.close()
        print(f"Subscriber {name} finished")

    return subscriber
# Create multiple subscribers
subscribers = [
    threading.Thread(target=create_subscriber('News Reader', ['news', 'alerts'])),
    threading.Thread(target=create_subscriber('Sports Fan', ['sports', 'alerts'])),
    threading.Thread(target=create_subscriber('Tech Enthusiast', ['tech', 'alerts'])),
]

# Start all subscribers
for sub in subscribers:
    sub.start()

time.sleep(1)  # Let subscribers connect

# Publish messages to different channels
messages = [
    ('news', 'Election results announced'),
    ('sports', 'Championship game tonight'),
    ('tech', 'New AI breakthrough'),
    ('alerts', 'System maintenance in 1 hour'),  # All subscribers get this
    ('news', 'Weather update: sunny skies'),
    ('sports', 'Trade deadline approaching'),
]

for channel, message in messages:
    subscribers_count = client.publish(channel, message)
    print(f"Published to {channel}: '{message}' ({subscribers_count} subscribers)")
    time.sleep(0.5)
# Wait for all subscribers to finish
for sub in subscribers:
    sub.join()
import fakeredis
import time
client = fakeredis.FakeRedis()

# Create some subscribers
pubsub1 = client.pubsub()
pubsub2 = client.pubsub()
pubsub3 = client.pubsub()

# Subscribe to various channels
pubsub1.subscribe('news', 'sports')
pubsub2.subscribe('news', 'tech')
pubsub3.psubscribe('user:*', 'system:*')

# Skip subscription confirmations: read each subscriber until nothing is left
for pubsub in [pubsub1, pubsub2, pubsub3]:
    while True:
        msg = pubsub.get_message(timeout=0.1)
        if not msg:
            break

# Check active channels
channels = client.pubsub_channels()
print(f"Active channels: {[ch.decode() for ch in channels]}")

# Check specific channel patterns
news_channels = client.pubsub_channels('news*')
print(f"News channels: {[ch.decode() for ch in news_channels]}")

# Check subscriber counts for specific channels
numsub = client.pubsub_numsub('news', 'sports', 'tech')
for channel, count in numsub:
    print(f"Channel {channel.decode()}: {count} subscribers")

# Check pattern subscription count
pattern_count = client.pubsub_numpat()
print(f"Active pattern subscriptions: {pattern_count}")

# Test publishing and monitoring
print("\nPublishing test messages:")
for channel in ['news', 'sports', 'tech']:
    count = client.publish(channel, f'Test message for {channel}')
    print(f" {channel}: reached {count} subscribers")

# Cleanup
pubsub1.close()
pubsub2.close()
pubsub3.close()
import fakeredis
# Create client with Redis 7.0+ for sharded pub/sub support
client = fakeredis.FakeRedis(version=(7, 0))
# Sharded pub/sub provides better performance for high-throughput scenarios
# by distributing channels across Redis cluster shards
def test_sharded_pubsub():
    """Exercise sharded publish/subscribe with two subscribers.

    Subscribes two pub/sub objects to overlapping shard channels, publishes
    one message to each of 'shard:1'..'shard:3', prints what each subscriber
    received, then prints the shard-channel introspection results. Both
    pub/sub objects are closed even if a step raises.
    """
    # Create sharded pub/sub clients
    pubsub1 = client.pubsub()
    pubsub2 = client.pubsub()
    try:
        # Subscribe to sharded channels
        pubsub1.ssubscribe('shard:1', 'shard:2')
        pubsub2.ssubscribe('shard:2', 'shard:3')

        # Skip subscription confirmations
        for pubsub in [pubsub1, pubsub2]:
            while True:
                msg = pubsub.get_sharded_message(timeout=0.1)
                if not msg:
                    break

        print("Sharded subscribers ready")

        # Publish to sharded channels
        for i in range(3):
            channel = f'shard:{i+1}'
            count = client.spublish(channel, f'Sharded message {i+1}')
            print(f"Published to {channel}: {count} subscribers")

        # Read sharded messages
        print("\nReceived messages:")
        for name, pubsub in [('Sub1', pubsub1), ('Sub2', pubsub2)]:
            for _ in range(3):  # Try to read multiple messages
                msg = pubsub.get_sharded_message(timeout=0.1)
                if msg and msg['type'] == 'smessage':
                    channel = msg['channel'].decode()
                    data = msg['data'].decode()
                    print(f" [{name}] {channel}: {data}")

        # Check sharded channel info
        shard_channels = client.pubsub_shardchannels()
        print(f"\nActive sharded channels: {[ch.decode() for ch in shard_channels]}")

        shard_numsub = client.pubsub_shardnumsub('shard:1', 'shard:2', 'shard:3')
        for channel, count in shard_numsub:
            print(f"Shard {channel.decode()}: {count} subscribers")
    finally:
        # Cleanup
        pubsub1.close()
        pubsub2.close()
test_sharded_pubsub()
import fakeredis
import time
import threading
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ResilientSubscriber:
    """Pub/sub listener that re-subscribes after a receive error.

    ``start()`` blocks in a receive loop until ``stop()`` is called (usually
    from another thread). Messages of type 'message' and 'pmessage' are
    logged; every other message type is ignored.
    """

    def __init__(self, client, channels=None, patterns=None):
        """Store the client and the channels/patterns to subscribe to.

        Args:
            client: Object providing ``pubsub()``.
            channels: Channel names; ``None`` means no channels.
            patterns: Channel patterns; ``None`` means no patterns.
        """
        self.client = client
        self.channels = channels or []
        self.patterns = patterns or []
        self.pubsub = None
        self.running = False

    def connect(self):
        """Establish pub/sub connection and subscriptions"""
        self.pubsub = self.client.pubsub()

        if self.channels:
            self.pubsub.subscribe(*self.channels)
            logger.info("Subscribed to channels: %s", self.channels)

        if self.patterns:
            self.pubsub.psubscribe(*self.patterns)
            logger.info("Subscribed to patterns: %s", self.patterns)

        # Skip subscription confirmation messages (one per channel/pattern)
        expected_confirmations = len(self.channels) + len(self.patterns)
        for _ in range(expected_confirmations):
            self.pubsub.get_message(timeout=1.0)

    def start(self):
        """Start listening for messages; returns after ``stop()``."""
        self.running = True
        self.connect()

        logger.info("Starting message listener...")

        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
                if message:
                    self.handle_message(message)
            except Exception as e:
                if not self.running:
                    # stop() closed the pub/sub object while we were
                    # reading; that is a shutdown, not a failure, so do
                    # not re-subscribe.
                    break
                logger.error("Error receiving message: %s", e)
                # Attempt to reconnect
                try:
                    self.pubsub.close()
                    time.sleep(1)
                    if not self.running:
                        # Stopped during the back-off; a new subscription
                        # here would never be closed.
                        break
                    self.connect()
                    logger.info("Reconnected successfully")
                except Exception as reconnect_error:
                    logger.error("Reconnection failed: %s", reconnect_error)
                    time.sleep(5)

    def handle_message(self, message):
        """Process received message"""
        msg_type = message['type']

        if msg_type == 'message':
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info("Channel message - %s: %s", channel, data)
        elif msg_type == 'pmessage':
            pattern = message['pattern'].decode()
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info("Pattern message - %s -> %s: %s", pattern, channel, data)

    def stop(self):
        """Stop the message listener"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
        logger.info("Message listener stopped")
# Usage example
client = fakeredis.FakeRedis()

# Create resilient subscriber
subscriber = ResilientSubscriber(
    client,
    channels=['notifications', 'alerts'],
    patterns=['user:*', 'system:*'],
)

# Start subscriber in background thread
sub_thread = threading.Thread(target=subscriber.start)
sub_thread.start()

time.sleep(1)  # Let subscriber initialize

# Simulate message publishing
test_messages = [
    ('notifications', 'New notification available'),
    ('user:123', 'User 123 logged in'),
    ('alerts', 'Critical system alert'),
    ('system:backup', 'Backup completed successfully'),
    ('notifications', 'Another notification'),
    ('user:456', 'User 456 updated settings'),
]

for channel, message in test_messages:
    client.publish(channel, message)
    time.sleep(0.5)

# Let messages be processed
time.sleep(2)

# Stop subscriber
subscriber.stop()
sub_thread.join()
import fakeredis
import json
import time
import threading
from typing import Callable, Dict, Any
from dataclasses import dataclass
@dataclass
class Event:
    """A single event carried over the event bus."""

    # Event type name; also used to build the 'events:<type>' channel.
    type: str
    # Event payload; must be JSON-serializable to be published.
    data: Dict[str, Any]
    # Time value supplied by the creator (the examples pass time.time()).
    timestamp: float
    # Label of the component that produced the event.
    source: str = 'unknown'
class EventBus:
    """Dispatch JSON-encoded events from 'events:<type>' channels to handlers."""

    # Prefix shared by every channel the bus publishes or subscribes to.
    _CHANNEL_PREFIX = "events:"

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client
        self.handlers: Dict[str, list] = {}
        self.running = False
        self.pubsub = None

    @staticmethod
    def _event_type_from_channel(channel: str) -> str:
        """Return the event type encoded in an 'events:<type>' channel name.

        Only the leading prefix is removed, so an event type that itself
        contains 'events:' is preserved intact.
        """
        prefix = EventBus._CHANNEL_PREFIX
        if channel.startswith(prefix):
            return channel[len(prefix):]
        return channel

    def register_handler(self, event_type: str, handler: Callable[[Event], None]):
        """Register an event handler for specific event type"""
        self.handlers.setdefault(event_type, []).append(handler)

    def publish_event(self, event: Event) -> int:
        """Publish an event to the event bus.

        Returns whatever ``client.publish`` returns for the event's channel.
        """
        channel = f"{self._CHANNEL_PREFIX}{event.type}"
        event_data = {
            'type': event.type,
            'data': event.data,
            'timestamp': event.timestamp,
            'source': event.source,
        }
        return self.client.publish(channel, json.dumps(event_data))

    def start_listening(self):
        """Start listening for events; blocks until ``stop_listening()``.

        Returns immediately, without subscribing, when no handler is
        registered.
        """
        if not self.handlers:
            return

        self.running = True
        self.pubsub = self.client.pubsub()

        # Subscribe to all event types we have handlers for
        channels = [f"{self._CHANNEL_PREFIX}{event_type}" for event_type in self.handlers]
        self.pubsub.subscribe(*channels)

        # Skip subscription confirmations
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)

        print(f"Event bus listening for: {list(self.handlers.keys())}")

        while self.running:
            message = self.pubsub.get_message(timeout=1.0)
            if message and message['type'] == 'message':
                try:
                    channel = message['channel'].decode()
                    event_type = self._event_type_from_channel(channel)
                    event_data = json.loads(message['data'].decode())

                    event = Event(
                        type=event_data['type'],
                        data=event_data['data'],
                        timestamp=event_data['timestamp'],
                        source=event_data['source'],
                    )

                    # Call all handlers for this event type; one failing
                    # handler must not prevent the others from running.
                    for handler in self.handlers.get(event_type, []):
                        try:
                            handler(event)
                        except Exception as e:
                            print(f"Error in event handler: {e}")
                except Exception as e:
                    print(f"Error processing event: {e}")

    def stop_listening(self):
        """Stop listening for events"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
# Event handlers
def user_login_handler(event: Event):
    """Print the user id and timestamp of a login event."""
    print(f"User {event.data['user_id']} logged in at {event.timestamp}")
def user_logout_handler(event: Event):
    """Print the user id of a logout event."""
    print(f"User {event.data['user_id']} logged out")
def order_created_handler(event: Event):
    """Print the order id and amount of an order-created event."""
    print(f"Order {event.data['order_id']} created for ${event.data['amount']}")
def audit_handler(event: Event):
    """Generic audit handler that logs all events"""
    print(f"AUDIT: {event.type} from {event.source} at {event.timestamp}")
# Usage example
client = fakeredis.FakeRedis()
event_bus = EventBus(client)

# Register event handlers
event_bus.register_handler('user_login', user_login_handler)
event_bus.register_handler('user_login', audit_handler)  # Multiple handlers for same event
event_bus.register_handler('user_logout', user_logout_handler)
event_bus.register_handler('user_logout', audit_handler)
event_bus.register_handler('order_created', order_created_handler)
event_bus.register_handler('order_created', audit_handler)

# Start event bus in background
event_thread = threading.Thread(target=event_bus.start_listening)
event_thread.start()

time.sleep(1)  # Let event bus initialize

# Publish events
events = [
    Event('user_login', {'user_id': '123', 'ip': '192.168.1.1'}, time.time(), 'auth_service'),
    Event('order_created', {'order_id': 'ORD001', 'amount': 99.99, 'user_id': '123'}, time.time(), 'order_service'),
    Event('user_logout', {'user_id': '123'}, time.time(), 'auth_service'),
]

for event in events:
    subscribers = event_bus.publish_event(event)
    print(f"Published {event.type} to {subscribers} subscribers")
    time.sleep(1)

# Let events be processed
time.sleep(2)

# Stop event bus
event_bus.stop_listening()
event_thread.join()
import fakeredis
import json
import time
import threading
from enum import Enum
from dataclasses import dataclass, asdict
from typing import List, Optional
class NotificationPriority(Enum):
    """Priority levels for notifications; values are the serialized form."""

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"
@dataclass
class Notification:
    """A notification addressed to a single user."""

    id: str
    # Recipient; used to build the 'notifications:user:<user_id>' channel.
    user_id: str
    title: str
    message: str
    priority: NotificationPriority
    timestamp: float
    read: bool = False
    category: str = "general"
class NotificationService:
    """Publish JSON-encoded notifications on the 'notifications:*' channels.

    Every ``send_*`` method returns whatever ``client.publish`` returns.
    """

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client

    def send_notification(self, notification: Notification):
        """Send notification to specific user"""
        channel = f"notifications:user:{notification.user_id}"
        data = asdict(notification)
        data['priority'] = notification.priority.value  # Serialize enum
        return self.client.publish(channel, json.dumps(data))

    def send_broadcast(self, title: str, message: str,
                       priority: NotificationPriority = NotificationPriority.NORMAL):
        """Send broadcast notification to all users"""
        # Read the clock once so the id and the timestamp agree.
        now = time.time()
        notification_data = {
            'id': f"broadcast_{int(now * 1000)}",
            'title': title,
            'message': message,
            'priority': priority.value,
            'timestamp': now,
            'category': 'broadcast',
            'read': False,
        }
        return self.client.publish("notifications:broadcast", json.dumps(notification_data))

    def send_admin_alert(self, message: str):
        """Send urgent alert to admin channels"""
        # Read the clock once so the id and the timestamp agree.
        now = time.time()
        alert_data = {
            'id': f"alert_{int(now * 1000)}",
            'message': message,
            'timestamp': now,
            'severity': 'urgent',
        }
        return self.client.publish("notifications:admin:alerts", json.dumps(alert_data))
class NotificationClient:
    """Receive notifications for one user and keep the personal ones.

    ``start_listening()`` blocks until ``stop_listening()`` is called
    (usually from another thread). Personal notifications are appended to
    ``self.notifications``; broadcasts and admin alerts are only printed.
    """

    BROADCAST_CHANNEL = "notifications:broadcast"
    ADMIN_ALERTS_CHANNEL = "notifications:admin:alerts"

    def __init__(self, client: fakeredis.FakeRedis, user_id: str):
        self.client = client
        self.user_id = user_id
        self.pubsub = None
        self.running = False
        self.notifications: List[Notification] = []

    @staticmethod
    def _channel_kind(channel: str) -> str:
        """Classify a channel name as 'admin_alert', 'broadcast' or 'personal'.

        Exact comparison is used so that a per-user channel whose user id
        happens to contain 'broadcast' or 'admin:alerts' is still treated
        as personal.
        """
        if channel == NotificationClient.ADMIN_ALERTS_CHANNEL:
            return 'admin_alert'
        if channel == NotificationClient.BROADCAST_CHANNEL:
            return 'broadcast'
        return 'personal'

    def start_listening(self, include_broadcasts: bool = True, is_admin: bool = False):
        """Start listening for notifications"""
        self.running = True
        self.pubsub = self.client.pubsub()

        # Subscribe to user-specific notifications
        channels = [f"notifications:user:{self.user_id}"]

        # Subscribe to broadcasts if requested
        if include_broadcasts:
            channels.append(self.BROADCAST_CHANNEL)

        # Subscribe to admin alerts if user is admin
        if is_admin:
            channels.append(self.ADMIN_ALERTS_CHANNEL)

        self.pubsub.subscribe(*channels)

        # Skip subscription confirmations
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)

        print(f"NotificationClient for user {self.user_id} listening on: {channels}")

        while self.running:
            message = self.pubsub.get_message(timeout=1.0)
            if message and message['type'] == 'message':
                self.handle_notification(message)

    def handle_notification(self, message):
        """Process incoming notification"""
        channel = message['channel'].decode()
        data = json.loads(message['data'].decode())
        kind = self._channel_kind(channel)

        if kind == 'admin_alert':
            # Handle admin alert
            print(f"🚨 ADMIN ALERT: {data['message']}")
        elif kind == 'broadcast':
            # Handle broadcast notification
            priority_emoji = self.get_priority_emoji(data['priority'])
            print(f"{priority_emoji} BROADCAST: {data['title']} - {data['message']}")
        else:
            # Handle personal notification
            notification = Notification(
                id=data['id'],
                user_id=data['user_id'],
                title=data['title'],
                message=data['message'],
                priority=NotificationPriority(data['priority']),
                timestamp=data['timestamp'],
                read=data['read'],
                category=data['category'],
            )
            self.notifications.append(notification)

            priority_emoji = self.get_priority_emoji(notification.priority.value)
            print(f"{priority_emoji} {notification.title}: {notification.message}")

    def get_priority_emoji(self, priority: str) -> str:
        """Get emoji for notification priority ('⚪' for unknown values)."""
        emojis = {
            'low': '🔵',
            'normal': '🟢',
            'high': '🟡',
            'urgent': '🔴',
        }
        return emojis.get(priority, '⚪')

    def get_unread_count(self) -> int:
        """Get count of unread notifications"""
        return sum(1 for n in self.notifications if not n.read)

    def mark_all_read(self):
        """Mark all notifications as read"""
        for notification in self.notifications:
            notification.read = True

    def stop_listening(self):
        """Stop listening for notifications"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
# Usage example
client = fakeredis.FakeRedis()
notification_service = NotificationService(client)

# Create notification clients for different users
users = ['user123', 'user456', 'admin789']
clients = {}
# Keep the listener threads so they can be joined after the clients stop.
threads = []

for user_id in users:
    client_obj = NotificationClient(client, user_id)
    clients[user_id] = client_obj

    # Start listening (admin gets admin alerts)
    is_admin = user_id.startswith('admin')
    thread = threading.Thread(
        target=client_obj.start_listening,
        args=(True, is_admin),
    )
    thread.start()
    threads.append(thread)

time.sleep(1)  # Let clients initialize

# Send various notifications
print("Sending notifications...")

# Personal notifications
notification_service.send_notification(Notification(
    id="notif_001",
    user_id="user123",
    title="Welcome!",
    message="Welcome to our platform",
    priority=NotificationPriority.NORMAL,
    timestamp=time.time(),
))

notification_service.send_notification(Notification(
    id="notif_002",
    user_id="user456",
    title="Payment Due",
    message="Your payment is due in 3 days",
    priority=NotificationPriority.HIGH,
    timestamp=time.time(),
))

time.sleep(1)

# Broadcast notifications
notification_service.send_broadcast(
    "System Maintenance",
    "Scheduled maintenance tonight from 2-4 AM",
    NotificationPriority.HIGH,
)

time.sleep(1)

# Admin alert
notification_service.send_admin_alert("High CPU usage detected on server cluster")

# Let notifications be processed
time.sleep(2)

# Show unread counts
for user_id, client_obj in clients.items():
    unread = client_obj.get_unread_count()
    print(f"User {user_id} has {unread} unread notifications")

# Stop all clients
for client_obj in clients.values():
    client_obj.stop_listening()

# Wait for the listener threads to exit; each loop re-checks its running
# flag at least once per 1-second get_message() timeout.
for thread in threads:
    thread.join(timeout=2)
time.sleep(1) # Allow threads to finish
Install with Tessl CLI
npx tessl i tessl/pypi-fakeredis
docs