CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fakeredis

A Python implementation of the Redis API that can be used for testing purposes

Pending
Overview
Eval results
Files

pubsub-operations.mddocs/

Pub/Sub Operations

Redis publish/subscribe messaging with support for channels, pattern subscriptions, and shard channels. Pub/Sub provides a powerful message broadcasting system for real-time applications, event notifications, and decoupled communication between application components.

Capabilities

Publishing Messages

Functions for publishing messages to channels and managing message distribution.

# Publish `message` on `channel`; returns the number of subscribers that received it.
def publish(self, channel: KeyT, message: EncodableT) -> int: ...

# Sharded variant of publish() for shard channels (Redis 7.0+); returns the receiver count.
def spublish(self, channel: KeyT, message: EncodableT) -> int: ...

# List the currently active channels whose names match `pattern`.
def pubsub_channels(self, pattern: str = "*") -> List[bytes]: ...

# For each named channel, return a (channel, subscriber-count) pair.
def pubsub_numsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...

# Number of active pattern subscriptions across all clients.
def pubsub_numpat(self) -> int: ...

# List active shard channels matching `pattern` (Redis 7.0+).
def pubsub_shardchannels(self, pattern: str = "*") -> List[bytes]: ...

# For each named shard channel, return a (channel, subscriber-count) pair.
def pubsub_shardnumsub(self, *args: KeyT) -> List[Tuple[bytes, int]]: ...

PubSub Client

Redis PubSub client for subscribing to channels and receiving messages.

class PubSub:
    # Subscribe to one or more named channels.
    def subscribe(self, *args: ChannelT) -> None: ...

    # Subscribe to one or more glob-style channel patterns (e.g. 'user:*').
    def psubscribe(self, *args: PatternT) -> None: ...

    # Subscribe to one or more shard channels (Redis 7.0+).
    def ssubscribe(self, *args: ChannelT) -> None: ...

    # Unsubscribe from the given channels.
    def unsubscribe(self, *args: ChannelT) -> None: ...

    # Unsubscribe from the given patterns.
    def punsubscribe(self, *args: PatternT) -> None: ...

    # Unsubscribe from the given shard channels.
    def sunsubscribe(self, *args: ChannelT) -> None: ...

    # Return the next queued message as a dict, or None if nothing arrives
    # within `timeout` seconds. Subscription confirmations are delivered
    # here too unless ignore_subscribe_messages is True.
    def get_message(
        self, 
        ignore_subscribe_messages: bool = False, 
        timeout: float = 0.0
    ) -> Optional[Dict[str, Any]]: ...

    # Iterate over incoming messages (blocking iterator).
    def listen(self) -> Iterator[Dict[str, Any]]: ...

    # Shard-channel counterpart of get_message() (Redis 7.0+).
    def get_sharded_message(
        self,
        ignore_subscribe_messages: bool = False,
        timeout: float = 0.0
    ) -> Optional[Dict[str, Any]]: ...

    # Close the pub/sub connection.
    def close(self) -> None: ...

    # Reset the connection state.
    def reset(self) -> None: ...

Message Handling

Core message processing functions for PubSub operations.

# Create a PubSub client bound to this Redis client.
def pubsub(self, **kwargs) -> PubSub: ...

# Message structure returned by get_message()
MessageType = Dict[str, Any]
# {
#     'type': str,           # 'message', 'pmessage', 'subscribe', etc.
#     'channel': bytes,      # Channel name
#     'pattern': bytes,      # Pattern (for pattern subscriptions)
#     'data': Union[bytes, int]  # Message data or subscription count
# }

Usage Examples

Basic Publishing and Subscribing

import fakeredis
import threading
import time

client = fakeredis.FakeRedis()

def run_publisher():
    """Emit five messages on the 'news' channel, one per second."""
    for i in range(5):
        time.sleep(1)
        subscribers = client.publish('news', f'Breaking news #{i}')
        print(f"Published message {i} to {subscribers} subscribers")

def run_subscriber():
    """Consume up to five messages from the 'news' channel, then disconnect."""
    pubsub = client.pubsub()
    pubsub.subscribe('news')

    # The first queued message is the subscription confirmation.
    confirmation = pubsub.get_message()
    print(f"Subscribed: {confirmation}")

    # Read real messages, giving each one up to two seconds to arrive.
    for _ in range(5):
        message = pubsub.get_message(timeout=2.0)
        if message is None:
            print("No message received")
            break
        print(f"Received: {message['data'].decode()}")

    pubsub.close()

# Run publisher and subscriber concurrently; the subscriber starts first
# so it is already listening when the first message goes out.
pub_thread = threading.Thread(target=run_publisher)
sub_thread = threading.Thread(target=run_subscriber)

sub_thread.start()
time.sleep(0.5)  # Give subscriber time to connect
pub_thread.start()

pub_thread.join()
sub_thread.join()

Pattern Subscriptions

import fakeredis
import threading
import time

client = fakeredis.FakeRedis()

def pattern_subscriber():
    """Listen for messages on every channel matching 'user:*'."""
    pubsub = client.pubsub()

    # One psubscribe covers all channels starting with 'user:'.
    pubsub.psubscribe('user:*')

    # Drain the subscription confirmation before real traffic arrives.
    confirmation = pubsub.get_message()
    print(f"Pattern subscribed: {confirmation}")

    # Keep reading until a full one-second window passes with no traffic.
    while True:
        message = pubsub.get_message(timeout=1.0)
        if message is None:
            break
        if message['type'] == 'pmessage':
            pattern = message['pattern'].decode()
            channel = message['channel'].decode()
            data = message['data'].decode()
            print(f"Pattern '{pattern}' matched channel '{channel}': {data}")

    pubsub.close()

# Run the pattern subscriber on a background thread.
listener = threading.Thread(target=pattern_subscriber)
listener.start()

time.sleep(0.5)  # Give subscriber time to connect

# Publish to a mix of matching and non-matching channels.
client.publish('user:123', 'User 123 logged in')
client.publish('user:456', 'User 456 updated profile')
client.publish('system:alert', 'System alert')  # Won't match pattern
client.publish('user:789', 'User 789 logged out')

time.sleep(2)  # Allow messages to be processed
listener.join()

Multiple Subscribers

import fakeredis
import threading
import time

client = fakeredis.FakeRedis()

def create_subscriber(name, channels):
    """Create a subscriber for specific channels"""
    # Returns a closure so each thread gets its own PubSub connection
    # while sharing the module-level fakeredis client.
    def subscriber():
        pubsub = client.pubsub()
        pubsub.subscribe(*channels)
        
        # Skip subscription confirmations
        # (one confirmation message is queued per subscribed channel)
        for _ in channels:
            pubsub.get_message()
        
        print(f"Subscriber {name} ready, listening to {channels}")
        
        # Listen for messages
        start_time = time.time()
        while time.time() - start_time < 5:  # Listen for 5 seconds
            message = pubsub.get_message(timeout=0.1)
            if message and message['type'] == 'message':
                channel = message['channel'].decode()
                data = message['data'].decode()
                print(f"[{name}] {channel}: {data}")
        
        pubsub.close()
        print(f"Subscriber {name} finished")
    
    return subscriber

# Create multiple subscribers.
# NOTE: create_subscriber(...) is *called* here, so each Thread target is
# the returned `subscriber` closure, not create_subscriber itself.
subscribers = [
    threading.Thread(target=create_subscriber('News Reader', ['news', 'alerts'])),
    threading.Thread(target=create_subscriber('Sports Fan', ['sports', 'alerts'])),
    threading.Thread(target=create_subscriber('Tech Enthusiast', ['tech', 'alerts']))
]

# Start all subscribers
for sub in subscribers:
    sub.start()

time.sleep(1)  # Let subscribers connect

# Publish messages to different channels
messages = [
    ('news', 'Election results announced'),
    ('sports', 'Championship game tonight'),
    ('tech', 'New AI breakthrough'),
    ('alerts', 'System maintenance in 1 hour'),  # All subscribers get this
    ('news', 'Weather update: sunny skies'),
    ('sports', 'Trade deadline approaching')
]

for channel, message in messages:
    subscribers_count = client.publish(channel, message)
    print(f"Published to {channel}: '{message}' ({subscribers_count} subscribers)")
    time.sleep(0.5)

# Wait for all subscribers to finish
for sub in subscribers:
    sub.join()

Channel Information and Monitoring

import fakeredis
import time

client = fakeredis.FakeRedis()

# Three independent pub/sub connections to populate the channel registry.
pubsub1 = client.pubsub()
pubsub2 = client.pubsub()
pubsub3 = client.pubsub()

# Two plain channel subscribers plus one pattern subscriber.
pubsub1.subscribe('news', 'sports')
pubsub2.subscribe('news', 'tech')
pubsub3.psubscribe('user:*', 'system:*')

# Drain the queued subscription confirmations from every connection.
for pubsub in (pubsub1, pubsub2, pubsub3):
    while pubsub.get_message(timeout=0.1):
        pass

# Introspect the active channel set.
channels = client.pubsub_channels()
print(f"Active channels: {[ch.decode() for ch in channels]}")

# The same query narrowed with a glob pattern.
news_channels = client.pubsub_channels('news*')
print(f"News channels: {[ch.decode() for ch in news_channels]}")

# Per-channel subscriber counts.
numsub = client.pubsub_numsub('news', 'sports', 'tech')
for channel, count in numsub:
    print(f"Channel {channel.decode()}: {count} subscribers")

# Count of pattern subscriptions across all clients.
pattern_count = client.pubsub_numpat()
print(f"Active pattern subscriptions: {pattern_count}")

# Publish once per channel and report how many subscribers each reached.
print("\nPublishing test messages:")
for channel in ['news', 'sports', 'tech']:
    count = client.publish(channel, f'Test message for {channel}')
    print(f"  {channel}: reached {count} subscribers")

# Cleanup
pubsub1.close()
pubsub2.close()
pubsub3.close()

Sharded Pub/Sub (Redis 7.0+)

import fakeredis

# Create client with Redis 7.0+ for sharded pub/sub support
client = fakeredis.FakeRedis(version=(7, 0))

# Sharded pub/sub provides better performance for high-throughput scenarios
# by distributing channels across Redis cluster shards

def test_sharded_pubsub():
    """Exercise ssubscribe/spublish plus shard-channel introspection."""
    # Two independent connections with one overlapping shard channel.
    pubsub1 = client.pubsub()
    pubsub2 = client.pubsub()

    pubsub1.ssubscribe('shard:1', 'shard:2')
    pubsub2.ssubscribe('shard:2', 'shard:3')

    # Drain the queued subscription confirmations from both connections.
    for pubsub in (pubsub1, pubsub2):
        while pubsub.get_sharded_message(timeout=0.1):
            pass

    print("Sharded subscribers ready")

    # Publish one message per shard channel.
    for shard_id in (1, 2, 3):
        channel = f'shard:{shard_id}'
        count = client.spublish(channel, f'Sharded message {shard_id}')
        print(f"Published to {channel}: {count} subscribers")

    # Read the sharded messages from each subscriber in turn.
    print("\nReceived messages:")
    for name, pubsub in (('Sub1', pubsub1), ('Sub2', pubsub2)):
        for _ in range(3):  # Each subscriber holds at most two messages.
            msg = pubsub.get_sharded_message(timeout=0.1)
            if msg and msg['type'] == 'smessage':
                print(f"  [{name}] {msg['channel'].decode()}: {msg['data'].decode()}")

    # Check sharded channel info
    shard_channels = client.pubsub_shardchannels()
    print(f"\nActive sharded channels: {[ch.decode() for ch in shard_channels]}")

    shard_numsub = client.pubsub_shardnumsub('shard:1', 'shard:2', 'shard:3')
    for channel, count in shard_numsub:
        print(f"Shard {channel.decode()}: {count} subscribers")

    # Cleanup
    pubsub1.close()
    pubsub2.close()

test_sharded_pubsub()

Message Listener with Automatic Reconnection

import fakeredis
import time
import threading
import logging

# Module-level logger used by ResilientSubscriber below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ResilientSubscriber:
    """Pub/sub listener that rebuilds its connection after receive errors.

    start() blocks, so it is intended to run on a worker thread; stop()
    may be called from another thread to end the loop.
    """

    def __init__(self, client, channels=None, patterns=None):
        # channels/patterns default to fresh empty lists (avoids mutable
        # default arguments).
        self.client = client
        self.channels = channels or []
        self.patterns = patterns or []
        self.pubsub = None       # Created lazily by connect().
        self.running = False     # Loop flag toggled by start()/stop().
        
    def connect(self):
        """Establish pub/sub connection and subscriptions"""
        self.pubsub = self.client.pubsub()
        
        if self.channels:
            self.pubsub.subscribe(*self.channels)
            logger.info(f"Subscribed to channels: {self.channels}")
            
        if self.patterns:
            self.pubsub.psubscribe(*self.patterns)
            logger.info(f"Subscribed to patterns: {self.patterns}")
        
        # Skip subscription confirmation messages
        # (one confirmation per channel and per pattern).
        expected_confirmations = len(self.channels) + len(self.patterns)
        for _ in range(expected_confirmations):
            self.pubsub.get_message(timeout=1.0)
    
    def start(self):
        """Start listening for messages"""
        self.running = True
        self.connect()
        
        logger.info("Starting message listener...")
        
        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
                if message:
                    self.handle_message(message)
                    
            except Exception as e:
                logger.error(f"Error receiving message: {e}")
                # Attempt to reconnect: drop the broken connection and
                # re-subscribe from scratch; back off 5s if that also fails.
                try:
                    self.pubsub.close()
                    time.sleep(1)
                    self.connect()
                    logger.info("Reconnected successfully")
                except Exception as reconnect_error:
                    logger.error(f"Reconnection failed: {reconnect_error}")
                    time.sleep(5)
    
    def handle_message(self, message):
        """Process received message"""
        msg_type = message['type']
        
        # Log channel vs. pattern-matched messages; subscription
        # confirmations and other types are intentionally ignored.
        if msg_type == 'message':
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info(f"Channel message - {channel}: {data}")
            
        elif msg_type == 'pmessage':
            pattern = message['pattern'].decode()
            channel = message['channel'].decode()
            data = message['data'].decode()
            logger.info(f"Pattern message - {pattern} -> {channel}: {data}")
    
    def stop(self):
        """Stop the message listener"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()
        logger.info("Message listener stopped")

# Usage example
client = fakeredis.FakeRedis()

# A subscriber covering two explicit channels and two patterns.
subscriber = ResilientSubscriber(
    client,
    channels=['notifications', 'alerts'],
    patterns=['user:*', 'system:*']
)

# Run the blocking listener loop on a background thread.
listener_thread = threading.Thread(target=subscriber.start)
listener_thread.start()

time.sleep(1)  # Let subscriber initialize

# Simulate message publishing
test_messages = [
    ('notifications', 'New notification available'),
    ('user:123', 'User 123 logged in'),
    ('alerts', 'Critical system alert'),
    ('system:backup', 'Backup completed successfully'),
    ('notifications', 'Another notification'),
    ('user:456', 'User 456 updated settings')
]

for channel, payload in test_messages:
    client.publish(channel, payload)
    time.sleep(0.5)

time.sleep(2)  # Let messages be processed

# Shut the listener down and wait for its thread to exit.
subscriber.stop()
listener_thread.join()

Pattern: Event Bus

import fakeredis
import json
import time
import threading
from typing import Callable, Dict, Any
from dataclasses import dataclass

@dataclass
class Event:
    """A typed payload routed through the EventBus via Redis pub/sub."""
    type: str                # Event type; selects the 'events:<type>' channel.
    data: Dict[str, Any]     # JSON-serializable event payload.
    timestamp: float         # Creation time (seconds since the epoch).
    source: str = 'unknown'  # Name of the service that emitted the event.

class EventBus:
    """In-process event bus backed by Redis pub/sub.

    Each event type maps to the channel f"events:{type}". Handlers
    registered via register_handler() are invoked for matching events
    while start_listening() runs (typically on a background thread).
    """

    # Channel namespace shared by publish_event() and start_listening().
    CHANNEL_PREFIX = 'events:'

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client
        # Maps event type -> list of handler callables.
        self.handlers: Dict[str, list] = {}
        self.running = False
        self.pubsub = None

    @classmethod
    def _event_type_from_channel(cls, channel: str) -> str:
        """Return the event type encoded in a channel name.

        Only the *leading* 'events:' prefix is removed. The previous
        channel.replace('events:', '') stripped every occurrence and
        corrupted event types that themselves contain 'events:'.
        """
        if channel.startswith(cls.CHANNEL_PREFIX):
            return channel[len(cls.CHANNEL_PREFIX):]
        return channel

    def register_handler(self, event_type: str, handler: Callable[[Event], None]):
        """Register an event handler for specific event type"""
        # setdefault replaces the explicit "not in" membership check.
        self.handlers.setdefault(event_type, []).append(handler)

    def publish_event(self, event: Event) -> int:
        """Publish an event to the event bus; returns the subscriber count."""
        channel = f"{self.CHANNEL_PREFIX}{event.type}"
        event_data = {
            'type': event.type,
            'data': event.data,
            'timestamp': event.timestamp,
            'source': event.source
        }
        return self.client.publish(channel, json.dumps(event_data))

    def start_listening(self):
        """Listen for events until stop_listening() is called.

        Blocks the calling thread; run it on a worker thread.
        """
        if not self.handlers:
            return  # Nothing to dispatch to.

        self.running = True
        self.pubsub = self.client.pubsub()

        # Subscribe to all event types we have handlers for.
        channels = [f"{self.CHANNEL_PREFIX}{event_type}" for event_type in self.handlers.keys()]
        self.pubsub.subscribe(*channels)

        # Skip subscription confirmations (one per channel).
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)

        print(f"Event bus listening for: {list(self.handlers.keys())}")

        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
            except Exception:
                # stop_listening() closes the connection from another thread;
                # a failed read after shutdown is a normal exit, not an error.
                if not self.running:
                    break
                raise
            if message and message['type'] == 'message':
                try:
                    channel = message['channel'].decode()
                    event_type = self._event_type_from_channel(channel)
                    event_data = json.loads(message['data'].decode())

                    event = Event(
                        type=event_data['type'],
                        data=event_data['data'],
                        timestamp=event_data['timestamp'],
                        source=event_data['source']
                    )

                    # Call all handlers for this event type; one failing
                    # handler must not prevent the others from running.
                    for handler in self.handlers.get(event_type, []):
                        try:
                            handler(event)
                        except Exception as e:
                            print(f"Error in event handler: {e}")

                except Exception as e:
                    print(f"Error processing event: {e}")

    def stop_listening(self):
        """Stop listening for events"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()

# Event handlers
def user_login_handler(event: Event):
    """Report a user login."""
    user = event.data['user_id']
    print(f"User {user} logged in at {event.timestamp}")

def user_logout_handler(event: Event):
    """Report a user logout."""
    user = event.data['user_id']
    print(f"User {user} logged out")

def order_created_handler(event: Event):
    """Report a newly created order and its amount."""
    order_id = event.data['order_id']
    amount = event.data['amount']
    print(f"Order {order_id} created for ${amount}")

def audit_handler(event: Event):
    """Generic audit handler that logs all events"""
    print(f"AUDIT: {event.type} from {event.source} at {event.timestamp}")

# Usage example
client = fakeredis.FakeRedis()
event_bus = EventBus(client)

# Register event handlers: each type gets its specific handler plus the
# shared audit handler (multiple handlers per event type are supported).
for event_type, handler in [
    ('user_login', user_login_handler),
    ('user_logout', user_logout_handler),
    ('order_created', order_created_handler),
]:
    event_bus.register_handler(event_type, handler)
    event_bus.register_handler(event_type, audit_handler)

# Run the bus on a background thread since start_listening() blocks.
event_thread = threading.Thread(target=event_bus.start_listening)
event_thread.start()

time.sleep(1)  # Let event bus initialize

# Publish events
events = [
    Event('user_login', {'user_id': '123', 'ip': '192.168.1.1'}, time.time(), 'auth_service'),
    Event('order_created', {'order_id': 'ORD001', 'amount': 99.99, 'user_id': '123'}, time.time(), 'order_service'),
    Event('user_logout', {'user_id': '123'}, time.time(), 'auth_service'),
]

for event in events:
    reached = event_bus.publish_event(event)
    print(f"Published {event.type} to {reached} subscribers")
    time.sleep(1)

time.sleep(2)  # Let events be processed

# Shut down the bus and wait for its thread to exit.
event_bus.stop_listening()
event_thread.join()

Pattern: Real-time Notifications

import fakeredis
import json
import time
import threading
from enum import Enum
from dataclasses import dataclass, asdict
from typing import List, Optional

class NotificationPriority(Enum):
    """Severity levels; NotificationClient maps each to a display emoji."""
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"

@dataclass
class Notification:
    """A single user-facing notification delivered over pub/sub."""
    id: str                         # Unique notification identifier.
    user_id: str                    # Recipient user id.
    title: str
    message: str
    priority: NotificationPriority  # Serialized via .value for JSON transport.
    timestamp: float                # Creation time (seconds since the epoch).
    read: bool = False              # Flipped by NotificationClient.mark_all_read().
    category: str = "general"

class NotificationService:
    """Publishes per-user, broadcast, and admin notifications over pub/sub."""

    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client

    def send_notification(self, notification: Notification):
        """Send notification to specific user"""
        channel = f"notifications:user:{notification.user_id}"
        payload = asdict(notification)
        # Enum members are not JSON-serializable; ship the plain string value.
        payload['priority'] = notification.priority.value
        return self.client.publish(channel, json.dumps(payload))

    def send_broadcast(self, title: str, message: str, priority: NotificationPriority = NotificationPriority.NORMAL):
        """Send broadcast notification to all users"""
        payload = {
            'id': f"broadcast_{int(time.time() * 1000)}",
            'title': title,
            'message': message,
            'priority': priority.value,
            'timestamp': time.time(),
            'category': 'broadcast',
            'read': False
        }
        return self.client.publish("notifications:broadcast", json.dumps(payload))

    def send_admin_alert(self, message: str):
        """Send urgent alert to admin channels"""
        payload = {
            'id': f"alert_{int(time.time() * 1000)}",
            'message': message,
            'timestamp': time.time(),
            'severity': 'urgent'
        }
        return self.client.publish("notifications:admin:alerts", json.dumps(payload))

class NotificationClient:
    """Per-user pub/sub listener that collects incoming notifications.

    start_listening() blocks, so run it on a worker thread;
    stop_listening() may be called from another thread to end the loop.
    """

    def __init__(self, client: fakeredis.FakeRedis, user_id: str):
        self.client = client
        self.user_id = user_id
        self.pubsub = None    # Created by start_listening().
        self.running = False
        # Personal notifications received so far (broadcasts and admin
        # alerts are printed but not stored).
        self.notifications: List[Notification] = []
        
    def start_listening(self, include_broadcasts: bool = True, is_admin: bool = False):
        """Start listening for notifications"""
        self.running = True
        self.pubsub = self.client.pubsub()
        
        # Subscribe to user-specific notifications
        channels = [f"notifications:user:{self.user_id}"]
        
        # Subscribe to broadcasts if requested
        if include_broadcasts:
            channels.append("notifications:broadcast")
        
        # Subscribe to admin alerts if user is admin
        if is_admin:
            channels.append("notifications:admin:alerts")
        
        self.pubsub.subscribe(*channels)
        
        # Skip subscription confirmations (one per subscribed channel)
        for _ in channels:
            self.pubsub.get_message(timeout=1.0)
        
        print(f"NotificationClient for user {self.user_id} listening on: {channels}")
        
        while self.running:
            message = self.pubsub.get_message(timeout=1.0)
            if message and message['type'] == 'message':
                self.handle_notification(message)
    
    def handle_notification(self, message):
        """Process incoming notification"""
        channel = message['channel'].decode()
        data = json.loads(message['data'].decode())
        
        # Route on the channel name: admin alerts and broadcasts are only
        # printed; personal notifications are also stored on the client.
        if 'admin:alerts' in channel:
            # Handle admin alert
            print(f"🚨 ADMIN ALERT: {data['message']}")
            
        elif 'broadcast' in channel:
            # Handle broadcast notification
            priority_emoji = self.get_priority_emoji(data['priority'])
            print(f"{priority_emoji} BROADCAST: {data['title']} - {data['message']}")
            
        else:
            # Handle personal notification: rebuild the dataclass from the
            # JSON payload (priority string -> enum member).
            notification = Notification(
                id=data['id'],
                user_id=data['user_id'],
                title=data['title'],
                message=data['message'],
                priority=NotificationPriority(data['priority']),
                timestamp=data['timestamp'],
                read=data['read'],
                category=data['category']
            )
            
            self.notifications.append(notification)
            priority_emoji = self.get_priority_emoji(notification.priority.value)
            print(f"{priority_emoji} {notification.title}: {notification.message}")
    
    def get_priority_emoji(self, priority: str) -> str:
        """Get emoji for notification priority"""
        # Unknown priorities fall back to the neutral white circle.
        emojis = {
            'low': '🔵',
            'normal': '🟢', 
            'high': '🟡',
            'urgent': '🔴'
        }
        return emojis.get(priority, '⚪')
    
    def get_unread_count(self) -> int:
        """Get count of unread notifications"""
        return len([n for n in self.notifications if not n.read])
    
    def mark_all_read(self):
        """Mark all notifications as read"""
        for notification in self.notifications:
            notification.read = True
    
    def stop_listening(self):
        """Stop listening for notifications"""
        self.running = False
        if self.pubsub:
            self.pubsub.close()

# Usage example
client = fakeredis.FakeRedis()
notification_service = NotificationService(client)

# Create notification clients for different users
users = ['user123', 'user456', 'admin789']
clients = {}
listener_threads = []  # Keep thread handles so they can be joined on shutdown.

for user_id in users:
    client_obj = NotificationClient(client, user_id)
    clients[user_id] = client_obj

    # Start listening (admin gets admin alerts)
    is_admin = user_id.startswith('admin')
    thread = threading.Thread(
        target=client_obj.start_listening,
        args=(True, is_admin)
    )
    listener_threads.append(thread)
    thread.start()

time.sleep(1)  # Let clients initialize

# Send various notifications
print("Sending notifications...")

# Personal notifications
notification_service.send_notification(Notification(
    id="notif_001",
    user_id="user123",
    title="Welcome!",
    message="Welcome to our platform",
    priority=NotificationPriority.NORMAL,
    timestamp=time.time()
))

notification_service.send_notification(Notification(
    id="notif_002",
    user_id="user456",
    title="Payment Due",
    message="Your payment is due in 3 days",
    priority=NotificationPriority.HIGH,
    timestamp=time.time()
))

time.sleep(1)

# Broadcast notifications
notification_service.send_broadcast(
    "System Maintenance",
    "Scheduled maintenance tonight from 2-4 AM",
    NotificationPriority.HIGH
)

time.sleep(1)

# Admin alert
notification_service.send_admin_alert("High CPU usage detected on server cluster")

# Let notifications be processed
time.sleep(2)

# Show unread counts
for user_id, client_obj in clients.items():
    unread = client_obj.get_unread_count()
    print(f"User {user_id} has {unread} unread notifications")

# Stop all clients, then join the listener threads so shutdown is
# deterministic (previously the threads were started but never joined and
# the script relied on a final sleep / interpreter exit to reap them).
for client_obj in clients.values():
    client_obj.stop_listening()

for thread in listener_threads:
    thread.join()

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json