Library for communicating with Redis Clusters. Built on top of redis-py lib
—
Cluster-aware publish/subscribe functionality that provides Redis pub/sub operations within a Redis Cluster environment. The ClusterPubSub class extends the standard Redis PubSub functionality to work correctly with cluster topology and node routing.
Cluster-aware pub/sub interface that handles channel subscriptions and message publishing across cluster nodes.
class ClusterPubSub(PubSub):
def __init__(self, connection_pool, shard_hint=None, ignore_subscribe_messages=False):
"""
Initialize cluster pub/sub interface.
Parameters:
- connection_pool (ClusterConnectionPool): Connection pool instance
- shard_hint (str, optional): Hint for connection routing
- ignore_subscribe_messages (bool): Ignore subscription confirmation messages
"""
def execute_command(self, *args, **kwargs):
"""
Execute pub/sub command with cluster routing.
Parameters:
- *args: Command name and arguments
- **kwargs: Additional command options
Returns:
Any: Command response
"""Methods for subscribing to channels and patterns in cluster environment.
def subscribe(self, *args, **kwargs):
"""
Subscribe to one or more channels.
Parameters:
- *args: Channel names to subscribe to
- **kwargs: Additional subscription options
Returns:
None
"""
def psubscribe(self, *args, **kwargs):
"""
Subscribe to channels matching patterns.
Parameters:
- *args: Channel patterns to subscribe to
- **kwargs: Additional subscription options
Returns:
None
"""
def unsubscribe(self, *args):
"""
Unsubscribe from channels.
Parameters:
- *args: Channel names to unsubscribe from (all if none specified)
Returns:
None
"""
def punsubscribe(self, *args):
"""
Unsubscribe from channel patterns.
Parameters:
- *args: Channel patterns to unsubscribe from (all if none specified)
Returns:
None
"""Methods for receiving and processing messages from subscribed channels.
def get_message(self, timeout=0, ignore_subscribe_messages=False):
"""
Get next message from subscribed channels.
Parameters:
- timeout (float): Timeout in seconds (0 for non-blocking, None for blocking)
- ignore_subscribe_messages (bool): Skip subscription confirmation messages
Returns:
dict|None: Message dictionary or None if no message available
"""
def listen(self):
"""
Generator that yields messages from subscribed channels.
Yields:
dict: Message dictionaries as they arrive
"""Methods for managing pub/sub connection lifecycle.
def close(self):
"""
Close pub/sub connection and clean up resources.
"""
def reset(self):
"""
Reset pub/sub state and close existing connection.
"""from rediscluster import RedisCluster
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])
# Get pub/sub interface
pubsub = rc.pubsub()
# Subscribe to channels
pubsub.subscribe('news', 'updates', 'alerts')
# Listen for messages
for message in pubsub.listen():
if message['type'] == 'message':
channel = message['channel'].decode('utf-8')
data = message['data'].decode('utf-8')
print(f"Received on {channel}: {data}")
# Clean up
pubsub.close()# Subscribe to channel patterns
pubsub = rc.pubsub()
pubsub.psubscribe('news.*', 'events.*')
for message in pubsub.listen():
if message['type'] == 'pmessage':
pattern = message['pattern'].decode('utf-8')
channel = message['channel'].decode('utf-8')
data = message['data'].decode('utf-8')
print(f"Pattern {pattern} matched {channel}: {data}")import time
pubsub = rc.pubsub()
pubsub.subscribe('status')
while True:
message = pubsub.get_message(timeout=1.0)
if message:
if message['type'] == 'message':
data = message['data'].decode('utf-8')
print(f"Status update: {data}")
else:
print("No message received, continuing...")
time.sleep(0.1)# Publishing is done through the main client, not pub/sub interface
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])
# Publish to channels
rc.publish('news', 'Breaking: Redis Cluster is awesome!')
rc.publish('updates', 'System maintenance scheduled for tonight')
rc.publish('alerts', 'High memory usage detected')
# Publishing returns number of subscribers that received the message
subscribers = rc.publish('notifications', 'Hello subscribers!')
print(f"Message delivered to {subscribers} subscribers")# Use context manager for automatic cleanup
with rc.pubsub() as pubsub:
pubsub.subscribe('commands')
for message in pubsub.listen():
if message['type'] == 'message':
command = message['data'].decode('utf-8')
print(f"Received command: {command}")
if command == 'quit':
break
# Connection automatically closed when exiting context# Pub/sub in cluster environment considerations
pubsub = rc.pubsub()
# Channels are bound to specific nodes based on channel name hash
# All subscribers to a channel connect to the same node
pubsub.subscribe('global-channel')
# Pattern subscriptions work across the cluster
pubsub.psubscribe('node-*')
# Publishing reaches all subscribers regardless of which node you publish to
rc.publish('global-channel', 'Message from any node')
# Get pub/sub info for monitoring
channels = rc.pubsub_channels()
print(f"Active channels: {channels}")
num_subscribers = rc.pubsub_numsub('global-channel')
print(f"Subscribers to global-channel: {num_subscribers}")Install with Tessl CLI
npx tessl i tessl/pypi-redis-py-cluster