# Cluster Pub/Sub

Cluster-aware publish/subscribe support for Redis Cluster. The `ClusterPubSub` class extends the standard `PubSub` class so that pub/sub commands are routed correctly given the cluster topology.

## Capabilities

### ClusterPubSub Class

Cluster-aware pub/sub interface that handles channel subscriptions and message delivery across cluster nodes. Publishing itself goes through the main client rather than this class.

```python { .api }
class ClusterPubSub(PubSub):
    def __init__(self, connection_pool, shard_hint=None, ignore_subscribe_messages=False):
        """
        Initialize cluster pub/sub interface.

        Parameters:
        - connection_pool (ClusterConnectionPool): Connection pool instance
        - shard_hint (str, optional): Hint for connection routing
        - ignore_subscribe_messages (bool): Ignore subscription confirmation messages
        """

    def execute_command(self, *args, **kwargs):
        """
        Execute pub/sub command with cluster routing.

        Parameters:
        - *args: Command name and arguments
        - **kwargs: Additional command options

        Returns:
        Any: Command response
        """
```

### Subscription Management

Methods for subscribing to channels and patterns in a cluster environment.

```python { .api }
def subscribe(self, *args, **kwargs):
    """
    Subscribe to one or more channels.

    Parameters:
    - *args: Channel names to subscribe to
    - **kwargs: Additional subscription options

    Returns:
    None
    """

def psubscribe(self, *args, **kwargs):
    """
    Subscribe to channels matching patterns.

    Parameters:
    - *args: Channel patterns to subscribe to
    - **kwargs: Additional subscription options

    Returns:
    None
    """

def unsubscribe(self, *args):
    """
    Unsubscribe from channels.

    Parameters:
    - *args: Channel names to unsubscribe from (all if none specified)

    Returns:
    None
    """

def punsubscribe(self, *args):
    """
    Unsubscribe from channel patterns.

    Parameters:
    - *args: Channel patterns to unsubscribe from (all if none specified)

    Returns:
    None
    """
```

### Message Handling

Methods for receiving and processing messages from subscribed channels.

```python { .api }
def get_message(self, timeout=0, ignore_subscribe_messages=False):
    """
    Get next message from subscribed channels.

    Parameters:
    - timeout (float): Timeout in seconds (0 for non-blocking, None for blocking)
    - ignore_subscribe_messages (bool): Skip subscription confirmation messages

    Returns:
    dict|None: Message dictionary or None if no message available
    """

def listen(self):
    """
    Generator that yields messages from subscribed channels.

    Yields:
    dict: Message dictionaries as they arrive
    """
```
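
Message dictionaries carry the keys `type`, `pattern`, `channel`, and `data`. As a minimal sketch, assuming the client returns raw bytes (that is, it was not configured to decode responses), a small helper can normalize a message to strings before further processing. `decode_message` is a hypothetical name introduced for illustration, not part of the library, and the sample dictionary is hand-built rather than captured from a server.

```python
def decode_message(message, encoding="utf-8"):
    """Return a copy of a pub/sub message dict with bytes values decoded.

    Hypothetical helper for illustration; not part of the library.
    """
    if message is None:
        return None
    return {
        key: value.decode(encoding) if isinstance(value, bytes) else value
        for key, value in message.items()
    }


# Hand-built dict shaped like a pattern message.
raw = {
    "type": "pmessage",
    "pattern": b"news.*",
    "channel": b"news.sports",
    "data": b"final score 2-1",
}
decoded = decode_message(raw)
print(decoded["channel"], decoded["data"])
```

Non-bytes values, such as the integer subscription count in a confirmation message, pass through unchanged.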

### Connection Management

Methods for managing pub/sub connection lifecycle.

```python { .api }
def close(self):
    """
    Close pub/sub connection and clean up resources.
    """

def reset(self):
    """
    Reset pub/sub state and close existing connection.
    """
```

## Usage Examples

### Basic Pub/Sub Usage

```python
from rediscluster import RedisCluster

rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])

# Get pub/sub interface
pubsub = rc.pubsub()

# Subscribe to channels
pubsub.subscribe('news', 'updates', 'alerts')

# Listen for messages
for message in pubsub.listen():
    if message['type'] == 'message':
        channel = message['channel'].decode('utf-8')
        data = message['data'].decode('utf-8')
        print(f"Received on {channel}: {data}")

# Clean up
pubsub.close()
```

### Pattern Subscriptions

```python
# Subscribe to channel patterns
pubsub = rc.pubsub()
pubsub.psubscribe('news.*', 'events.*')

for message in pubsub.listen():
    if message['type'] == 'pmessage':
        pattern = message['pattern'].decode('utf-8')
        channel = message['channel'].decode('utf-8')
        data = message['data'].decode('utf-8')
        print(f"Pattern {pattern} matched {channel}: {data}")
```
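
To get a feel for which channels a pattern such as `news.*` would cover, the glob matching can be approximated locally. The sketch below uses Python's `fnmatch` as a stand-in; this is an assumption for illustration only, since Redis applies its own glob-style matcher on the server and the two differ in details such as escaping and character-class negation. The channel names and `matching_patterns` are hypothetical.

```python
from fnmatch import fnmatchcase

patterns = ["news.*", "events.*"]
channels = ["news.sports", "events.login", "newsletter", "alerts"]


def matching_patterns(channel, patterns):
    """Return the patterns that match a channel name (fnmatch approximation)."""
    return [p for p in patterns if fnmatchcase(channel, p)]


matches = {channel: matching_patterns(channel, patterns) for channel in channels}
for channel, matched in matches.items():
    print(channel, matched)
```

Note that `newsletter` is not covered by `news.*`, because the dot in the pattern is a literal character rather than a wildcard.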

### Non-blocking Message Retrieval

```python
import time

pubsub = rc.pubsub()
pubsub.subscribe('status')

while True:
    message = pubsub.get_message(timeout=1.0)
    if message:
        if message['type'] == 'message':
            data = message['data'].decode('utf-8')
            print(f"Status update: {data}")
    else:
        print("No message received, continuing...")
        time.sleep(0.1)
```
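
The loop above polls forever. A bounded variant can be sketched as a helper that polls `get_message()` until a deadline passes. Both `drain_messages` and `FakePubSub` are hypothetical names introduced here so the example runs without a cluster; the helper only assumes that the object passed in exposes `get_message(timeout=...)` as documented above.

```python
import time


def drain_messages(pubsub, duration, poll_timeout=0.1):
    """Collect 'message' payloads from `pubsub` for up to `duration` seconds.

    Hypothetical helper: `pubsub` only needs a get_message(timeout=...) method.
    """
    deadline = time.monotonic() + duration
    received = []
    while time.monotonic() < deadline:
        message = pubsub.get_message(timeout=poll_timeout)
        # Skip empty polls and subscription confirmations.
        if message is not None and message["type"] == "message":
            received.append(message["data"])
    return received


class FakePubSub:
    """Stand-in for a real pub/sub object so the sketch runs offline."""

    def __init__(self, messages):
        self._messages = list(messages)

    def get_message(self, timeout=0):
        if self._messages:
            return self._messages.pop(0)
        time.sleep(timeout)  # Mimic waiting for the timeout to elapse.
        return None


fake = FakePubSub([
    {"type": "subscribe", "channel": b"status", "data": 1},
    {"type": "message", "channel": b"status", "data": b"ok"},
])
result = drain_messages(fake, duration=0.3, poll_timeout=0.05)
print(result)
```

With a real client, the object returned by `rc.pubsub()` would take the place of `FakePubSub`.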

### Publishing Messages

```python
# Publishing is done through the main client, not the pub/sub interface
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])

# Publish to channels
rc.publish('news', 'Breaking: Redis Cluster is awesome!')
rc.publish('updates', 'System maintenance scheduled for tonight')
rc.publish('alerts', 'High memory usage detected')

# Publishing returns the number of subscribers that received the message
subscribers = rc.publish('notifications', 'Hello subscribers!')
print(f"Message delivered to {subscribers} subscribers")
```

### Context Manager Usage

```python
# Use context manager for automatic cleanup
with rc.pubsub() as pubsub:
    pubsub.subscribe('commands')

    for message in pubsub.listen():
        if message['type'] == 'message':
            command = message['data'].decode('utf-8')
            print(f"Received command: {command}")

            if command == 'quit':
                break
# Connection automatically closed when exiting context
```

### Cluster-Specific Considerations

```python
# Pub/sub in cluster environment considerations
pubsub = rc.pubsub()

# Channels are bound to specific nodes based on channel name hash
# All subscribers to a channel connect to the same node
pubsub.subscribe('global-channel')

# Pattern subscriptions work across the cluster
pubsub.psubscribe('node-*')

# Publishing reaches all subscribers regardless of which node you publish to
rc.publish('global-channel', 'Message from any node')

# Get pub/sub info for monitoring
channels = rc.pubsub_channels()
print(f"Active channels: {channels}")

num_subscribers = rc.pubsub_numsub('global-channel')
print(f"Subscribers to global-channel: {num_subscribers}")
```
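
The comment about channels being bound to nodes by name hash can be made concrete. Redis Cluster assigns keys to one of 16384 hash slots using the CRC16 (XMODEM) checksum of the key, hashing only the `{...}` hash tag when one is present. The sketch below assumes the client applies the same slot calculation to channel names when choosing a node, which the comments above imply but do not spell out. `channel_slot` is a hypothetical helper, not a library function.

```python
import binascii

HASH_SLOTS = 16384


def channel_slot(name):
    """Compute the Redis Cluster hash slot for a name (hypothetical helper)."""
    data = name.encode("utf-8")
    # Honor hash tags: hash only the text between the first '{' and the
    # next '}' when that text is non-empty.
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end != -1 and end != start + 1:
            data = data[start + 1:end]
    # binascii.crc_hqx with an initial value of 0 is CRC16/XMODEM.
    return binascii.crc_hqx(data, 0) % HASH_SLOTS


for name in ("global-channel", "news", "{news}.sports"):
    print(name, channel_slot(name))
```

Under that assumption, two channel names that share a hash tag, such as `news` and `{news}.sports`, land in the same slot and therefore on the same node.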