CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-redis-py-cluster

Library for communicating with Redis Clusters, built on top of the redis-py library.

Pending
Overview
Eval results
Files

pubsub.mddocs/

Cluster Pub/Sub

Cluster-aware publish/subscribe functionality that provides Redis pub/sub operations within a Redis Cluster environment. The ClusterPubSub class extends the standard Redis PubSub functionality to work correctly with cluster topology and node routing.

Capabilities

ClusterPubSub Class

Cluster-aware pub/sub interface that handles channel and pattern subscriptions and message delivery across cluster nodes. Publishing itself is done through the main cluster client, not through this class.

class ClusterPubSub(PubSub):
    """
    Cluster-aware publish/subscribe interface.

    Extends the standard redis-py ``PubSub`` class so that subscriptions work
    with cluster topology and node routing. Instances are normally obtained
    via ``RedisCluster.pubsub()`` (see the usage examples) rather than being
    constructed directly.
    """

    def __init__(self, connection_pool, shard_hint=None, ignore_subscribe_messages=False):
        """
        Initialize cluster pub/sub interface.

        Parameters:
        - connection_pool (ClusterConnectionPool): Connection pool instance
        - shard_hint (str, optional): Hint for connection routing
        - ignore_subscribe_messages (bool): Ignore subscription confirmation
          messages (default False, i.e. confirmations are delivered)
        """

    def execute_command(self, *args, **kwargs):
        """
        Execute pub/sub command with cluster routing.

        Parameters:
        - *args: Command name and arguments
        - **kwargs: Additional command options

        Returns:
        Any: Command response
            NOTE(review): upstream redis-py's PubSub.execute_command only
            sends the command and returns None; replies are read later via
            get_message()/listen(). Confirm whether this override really
            returns a response before relying on the return value.
        """

Subscription Management

Methods for subscribing to channels and patterns in cluster environment.

def subscribe(self, *args, **kwargs):
    """
    Subscribe to one or more channels.

    Parameters:
    - *args: Channel names to subscribe to
    - **kwargs: Channel-name/handler pairs (``channel=callable``). As in
      redis-py's PubSub, the callable is invoked with each message received
      on that channel instead of the message being returned by
      get_message()/listen().

    Returns:
    None
    """

def psubscribe(self, *args, **kwargs):
    """
    Subscribe to channels matching patterns.

    Parameters:
    - *args: Channel patterns to subscribe to
    - **kwargs: Pattern/handler pairs (``pattern=callable``). As in
      redis-py's PubSub, the callable is invoked with each message whose
      channel matches the pattern instead of the message being returned by
      get_message()/listen().

    Returns:
    None
    """

def unsubscribe(self, *args):
    """
    Unsubscribe from channels.

    Parameters:
    - *args: Channel names to unsubscribe from. When no names are given,
      every currently subscribed channel is unsubscribed.

    Returns:
    None
    """

def punsubscribe(self, *args):
    """
    Unsubscribe from channel patterns.

    Parameters:
    - *args: Channel patterns to unsubscribe from. When no patterns are
      given, every currently subscribed pattern is unsubscribed.

    Returns:
    None
    """

Message Handling

Methods for receiving and processing messages from subscribed channels.

def get_message(self, ignore_subscribe_messages=False, timeout=0):
    """
    Get next message from subscribed channels.

    The parameter order follows redis-py's ``PubSub.get_message``:
    ``ignore_subscribe_messages`` comes first and ``timeout`` second. Passing
    ``timeout`` by keyword, as the usage examples do, avoids any ambiguity.

    Parameters:
    - ignore_subscribe_messages (bool): Skip subscription confirmation messages
    - timeout (float): Timeout in seconds (0 for non-blocking, None for blocking)

    Returns:
    dict|None: Message dictionary or None if no message available
    """

def listen(self):
    """
    Generator that yields messages from subscribed channels.

    Unlike get_message(), no timeout can be supplied; the usage examples
    iterate over this generator and leave the loop with ``break``.
    NOTE(review): in redis-py the generator blocks while waiting for the next
    message and ends once no subscriptions remain -- confirm for the cluster
    subclass.

    Yields:
    dict: Message dictionaries as they arrive
    """

Connection Management

Methods for managing pub/sub connection lifecycle.

def close(self):
    """
    Close pub/sub connection and clean up resources.

    Call this when the pub/sub object is no longer needed (the usage
    examples do so after their listen loop).
    NOTE(review): in redis-py 3.x close() simply delegates to reset() --
    confirm for the installed version.
    """

def reset(self):
    """
    Reset pub/sub state and close existing connection.

    NOTE(review): "state" presumably covers the tracked channel and pattern
    subscriptions, as in redis-py's PubSub.reset() -- confirm.
    """

Usage Examples

Basic Pub/Sub Usage

from rediscluster import RedisCluster

rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])

# Obtain a cluster-aware pub/sub handle from the client
pubsub = rc.pubsub()

# Register interest in several channels with a single call
pubsub.subscribe('news', 'updates', 'alerts')

# Consume messages as they arrive; anything that is not a published
# message (e.g. a subscription confirmation) is skipped
for msg in pubsub.listen():
    if msg['type'] != 'message':
        continue
    channel = msg['channel'].decode('utf-8')
    data = msg['data'].decode('utf-8')
    print(f"Received on {channel}: {data}")

# Release the pub/sub connection once listening has finished
pubsub.close()

Pattern Subscriptions

# Subscribe by pattern instead of by exact channel name
pubsub = rc.pubsub()
pubsub.psubscribe('news.*', 'events.*')

for msg in pubsub.listen():
    # Pattern deliveries carry the type 'pmessage'; ignore everything else
    if msg['type'] != 'pmessage':
        continue
    pattern = msg['pattern'].decode('utf-8')
    channel = msg['channel'].decode('utf-8')
    data = msg['data'].decode('utf-8')
    print(f"Pattern {pattern} matched {channel}: {data}")

Non-blocking Message Retrieval

import time

pubsub = rc.pubsub()
pubsub.subscribe('status')

while True:
    # Waits up to one second for a message; None means nothing arrived
    message = pubsub.get_message(timeout=1.0)
    if not message:
        print("No message received, continuing...")
        time.sleep(0.1)
        continue
    # Only published messages are reported; confirmations are ignored
    if message['type'] == 'message':
        data = message['data'].decode('utf-8')
        print(f"Status update: {data}")

Publishing Messages

# Messages are published via the cluster client itself, not the pub/sub object
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])

# Publish one message to each channel, in order
for channel, text in (
    ('news', 'Breaking: Redis Cluster is awesome!'),
    ('updates', 'System maintenance scheduled for tonight'),
    ('alerts', 'High memory usage detected'),
):
    rc.publish(channel, text)

# publish() returns the number of subscribers that received the message
subscribers = rc.publish('notifications', 'Hello subscribers!')
print(f"Message delivered to {subscribers} subscribers")

Context Manager Usage

# The with-statement takes care of cleanup when the block is left
with rc.pubsub() as pubsub:
    pubsub.subscribe('commands')

    for msg in pubsub.listen():
        if msg['type'] != 'message':
            continue
        command = msg['data'].decode('utf-8')
        print(f"Received command: {command}")

        # A 'quit' command ends the loop and therefore the with-block
        if command == 'quit':
            break
# Connection automatically closed when exiting context

Cluster-Specific Considerations

# Things to keep in mind when using pub/sub against a cluster
pubsub = rc.pubsub()

# A channel is bound to one node, chosen from the hash of its name,
# so every subscriber of that channel connects to the same node
pubsub.subscribe('global-channel')

# Pattern subscriptions work across the cluster
pubsub.psubscribe('node-*')

# A published message reaches all subscribers regardless of the node
# it was published to
rc.publish('global-channel', 'Message from any node')

# Introspection helpers for monitoring
active_channels = rc.pubsub_channels()
print(f"Active channels: {active_channels}")

subscriber_counts = rc.pubsub_numsub('global-channel')
print(f"Subscribers to global-channel: {subscriber_counts}")

Install with Tessl CLI

npx tessl i tessl/pypi-redis-py-cluster

docs

client.md

connections.md

exceptions.md

index.md

pipeline.md

pubsub.md

tile.json