0
# Cluster Pub/Sub
1
2
Cluster-aware publish/subscribe functionality that provides Redis pub/sub operations within a Redis Cluster environment. The ClusterPubSub class extends the standard Redis PubSub functionality to work correctly with cluster topology and node routing.
3
4
## Capabilities
5
6
### ClusterPubSub Class
7
8
Cluster-aware pub/sub interface that handles channel subscriptions and message publishing across cluster nodes.
9
10
```python { .api }
11
class ClusterPubSub(PubSub):
12
def __init__(self, connection_pool, shard_hint=None, ignore_subscribe_messages=False):
13
"""
14
Initialize cluster pub/sub interface.
15
16
Parameters:
17
- connection_pool (ClusterConnectionPool): Connection pool instance
18
- shard_hint (str, optional): Hint for connection routing
19
- ignore_subscribe_messages (bool): Ignore subscription confirmation messages
20
"""
21
22
def execute_command(self, *args, **kwargs):
23
"""
24
Execute pub/sub command with cluster routing.
25
26
Parameters:
27
- *args: Command name and arguments
28
- **kwargs: Additional command options
29
30
Returns:
31
Any: Command response
32
"""
33
```
34
35
### Subscription Management
36
37
Methods for subscribing to channels and patterns in cluster environment.
38
39
```python { .api }
40
def subscribe(self, *args, **kwargs):
41
"""
42
Subscribe to one or more channels.
43
44
Parameters:
45
- *args: Channel names to subscribe to
46
- **kwargs: Additional subscription options
47
48
Returns:
49
None
50
"""
51
52
def psubscribe(self, *args, **kwargs):
53
"""
54
Subscribe to channels matching patterns.
55
56
Parameters:
57
- *args: Channel patterns to subscribe to
58
- **kwargs: Additional subscription options
59
60
Returns:
61
None
62
"""
63
64
def unsubscribe(self, *args):
65
"""
66
Unsubscribe from channels.
67
68
Parameters:
69
- *args: Channel names to unsubscribe from (all if none specified)
70
71
Returns:
72
None
73
"""
74
75
def punsubscribe(self, *args):
76
"""
77
Unsubscribe from channel patterns.
78
79
Parameters:
80
- *args: Channel patterns to unsubscribe from (all if none specified)
81
82
Returns:
83
None
84
"""
85
```
86
87
### Message Handling
88
89
Methods for receiving and processing messages from subscribed channels.
90
91
```python { .api }
92
def get_message(self, timeout=0, ignore_subscribe_messages=False):
93
"""
94
Get next message from subscribed channels.
95
96
Parameters:
97
- timeout (float): Timeout in seconds (0 for non-blocking, None for blocking)
98
- ignore_subscribe_messages (bool): Skip subscription confirmation messages
99
100
Returns:
101
dict|None: Message dictionary or None if no message available
102
"""
103
104
def listen(self):
105
"""
106
Generator that yields messages from subscribed channels.
107
108
Yields:
109
dict: Message dictionaries as they arrive
110
"""
111
```
112
113
### Connection Management
114
115
Methods for managing pub/sub connection lifecycle.
116
117
```python { .api }
118
def close(self):
119
"""
120
Close pub/sub connection and clean up resources.
121
"""
122
123
def reset(self):
124
"""
125
Reset pub/sub state and close existing connection.
126
"""
127
```
128
129
## Usage Examples
130
131
### Basic Pub/Sub Usage
132
133
```python
134
from rediscluster import RedisCluster
135
136
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])
137
138
# Get pub/sub interface
139
pubsub = rc.pubsub()
140
141
# Subscribe to channels
142
pubsub.subscribe('news', 'updates', 'alerts')
143
144
# Listen for messages
145
for message in pubsub.listen():
146
if message['type'] == 'message':
147
channel = message['channel'].decode('utf-8')
148
data = message['data'].decode('utf-8')
149
print(f"Received on {channel}: {data}")
150
151
# Clean up
152
pubsub.close()
153
```
154
155
### Pattern Subscriptions
156
157
```python
158
# Subscribe to channel patterns
159
pubsub = rc.pubsub()
160
pubsub.psubscribe('news.*', 'events.*')
161
162
for message in pubsub.listen():
163
if message['type'] == 'pmessage':
164
pattern = message['pattern'].decode('utf-8')
165
channel = message['channel'].decode('utf-8')
166
data = message['data'].decode('utf-8')
167
print(f"Pattern {pattern} matched {channel}: {data}")
168
```
169
170
### Non-blocking Message Retrieval
171
172
```python
173
import time
174
175
pubsub = rc.pubsub()
176
pubsub.subscribe('status')
177
178
while True:
179
message = pubsub.get_message(timeout=1.0)
180
if message:
181
if message['type'] == 'message':
182
data = message['data'].decode('utf-8')
183
print(f"Status update: {data}")
184
else:
185
print("No message received, continuing...")
186
time.sleep(0.1)
187
```
188
189
### Publishing Messages
190
191
```python
192
# Publishing is done through the main client, not pub/sub interface
193
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])
194
195
# Publish to channels
196
rc.publish('news', 'Breaking: Redis Cluster is awesome!')
197
rc.publish('updates', 'System maintenance scheduled for tonight')
198
rc.publish('alerts', 'High memory usage detected')
199
200
# Publishing returns number of subscribers that received the message
201
subscribers = rc.publish('notifications', 'Hello subscribers!')
202
print(f"Message delivered to {subscribers} subscribers")
203
```
204
205
### Context Manager Usage
206
207
```python
208
# Use context manager for automatic cleanup
209
with rc.pubsub() as pubsub:
210
pubsub.subscribe('commands')
211
212
for message in pubsub.listen():
213
if message['type'] == 'message':
214
command = message['data'].decode('utf-8')
215
print(f"Received command: {command}")
216
217
if command == 'quit':
218
break
219
# Connection automatically closed when exiting context
220
```
221
222
### Cluster-Specific Considerations
223
224
```python
225
# Pub/sub in cluster environment considerations
226
pubsub = rc.pubsub()
227
228
# Channels are bound to specific nodes based on channel name hash
229
# All subscribers to a channel connect to the same node
230
pubsub.subscribe('global-channel')
231
232
# Pattern subscriptions work across the cluster
233
pubsub.psubscribe('node-*')
234
235
# Publishing reaches all subscribers regardless of which node you publish to
236
rc.publish('global-channel', 'Message from any node')
237
238
# Get pub/sub info for monitoring
239
channels = rc.pubsub_channels()
240
print(f"Active channels: {channels}")
241
242
num_subscribers = rc.pubsub_numsub('global-channel')
243
print(f"Subscribers to global-channel: {num_subscribers}")
244
```