or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

factory-connection.md, index.md, pubsub.md, pushpull.md, reqrep.md, router-dealer.md

docs/pubsub.md

0

# Publish-Subscribe Messaging

1

2

Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering. Publishers broadcast messages with optional topics, while subscribers receive messages matching their topic subscriptions. This pattern is ideal for event distribution, news feeds, and real-time updates.

3

4

## Capabilities

5

6

### Publisher Connection

7

8

Broadcasts messages to multiple subscribers with optional topic-based routing. Publishers don't know or track individual subscribers.

9

10

```python { .api }

11

class ZmqPubConnection(ZmqConnection):

12

"""

13

Publisher connection for broadcasting messages.

14

15

Uses ZeroMQ PUB socket type for one-to-many message distribution.

16

Messages are sent to all connected subscribers matching the topic filter.

17

"""

18

19

socketType = constants.PUB

20

21

def publish(self, message, tag=b''):

22

"""

23

Publish message with optional topic tag.

24

25

Args:

26

message (bytes): Message content to broadcast

27

tag (bytes): Topic tag for message filtering (default: empty)

28

Subscribers must subscribe to this tag (or to a prefix of it) to receive the message

29

30

Note:

31

Topic matching is prefix-based. Tag b'news' matches subscriptions

32

to b'news', b'new', b'ne', b'n', and b'' (empty).

33

"""

34

```

35

36

#### Publisher Usage Example

37

38

```python

39

from twisted.internet import reactor

40

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection

41

42

# Create publisher

43

factory = ZmqFactory()

44

endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")

45

publisher = ZmqPubConnection(factory, endpoint)

46

47

# Publish messages with different topics

48

publisher.publish(b"Breaking news: Market update", b"news")

49

publisher.publish(b"Weather: Sunny, 25C", b"weather")

50

publisher.publish(b"Sports: Team wins championship", b"sports")

51

52

# Publish an untagged message (empty tag). Filtering is prefix-based, so this is
# delivered only to subscribers that subscribed to b"" (all messages).

53

publisher.publish(b"System maintenance in 1 hour", b"")

54

55

# High-frequency publishing example

56

def publish_stock_prices():

57

publisher.publish(b'{"symbol":"AAPL","price":150.25,"volume":1000}', b"stocks.AAPL")

58

publisher.publish(b'{"symbol":"GOOGL","price":2800.50,"volume":500}', b"stocks.GOOGL")

59

reactor.callLater(1.0, publish_stock_prices) # Publish every second

60

61

publish_stock_prices()

62

reactor.run()

63

```

64

65

### Subscriber Connection

66

67

Receives messages from publishers based on topic subscriptions. Subscribers can subscribe to multiple topics and receive messages matching any subscription.

68

69

```python { .api }

70

class ZmqSubConnection(ZmqConnection):

71

"""

72

Subscriber connection for receiving published messages.

73

74

Uses ZeroMQ SUB socket type. Must subscribe to topics to receive messages.

75

Implements topic-based filtering on the subscriber side.

76

"""

77

78

socketType = constants.SUB

79

80

def subscribe(self, tag):

81

"""

82

Subscribe to messages with specified topic prefix.

83

84

Args:

85

tag (bytes): Topic prefix to subscribe to

86

Empty bytes (b'') subscribes to all messages

87

Prefix matching: b'news' receives every tag that starts with b'news' (e.g. b'news.sports')

88

89

Note:

90

Can be called multiple times to subscribe to multiple topics.

91

Subscriptions are cumulative - messages matching any subscription are received.

92

"""

93

94

def unsubscribe(self, tag):

95

"""

96

Unsubscribe from messages with specified topic prefix.

97

98

Args:

99

tag (bytes): Topic prefix to unsubscribe from

100

Must match exactly the tag used in subscribe()

101

"""

102

103

def gotMessage(self, message, tag):

104

"""

105

Abstract method called when a subscribed message is received.

106

107

Must be implemented by subclasses to handle incoming messages.

108

109

Args:

110

message (bytes): Message content from publisher

111

tag (bytes): Topic tag that message was published with

112

"""

113

```

114

115

#### Subscriber Usage Example

116

117

```python

118

from twisted.internet import reactor

119

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqSubConnection

120

121

class NewsSubscriber(ZmqSubConnection):

122

def gotMessage(self, message, tag):

123

print(f"News [{tag.decode()}]: {message.decode()}")

124

125

class WeatherSubscriber(ZmqSubConnection):

126

def gotMessage(self, message, tag):

127

print(f"Weather Update: {message.decode()}")

128

129

class StockSubscriber(ZmqSubConnection):

130

def gotMessage(self, message, tag):

131

import json

132

data = json.loads(message.decode())

133

print(f"Stock {data['symbol']}: ${data['price']} (Volume: {data['volume']})")

134

135

# Create subscribers

136

factory = ZmqFactory()

137

endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

138

139

# News subscriber - only news topics

140

news_sub = NewsSubscriber(factory, endpoint)

141

news_sub.subscribe(b"news")

142

143

# Weather subscriber - only weather topics

144

weather_sub = WeatherSubscriber(factory, endpoint)

145

weather_sub.subscribe(b"weather")

146

147

# Stock subscriber - all stock topics

148

stock_sub = StockSubscriber(factory, endpoint)

149

stock_sub.subscribe(b"stocks") # Receives stocks.AAPL, stocks.GOOGL, etc.

150

151

# Multi-topic subscriber

152

class MultiSubscriber(ZmqSubConnection):

153

def gotMessage(self, message, tag):

154

print(f"Multi [{tag.decode()}]: {message.decode()}")

155

156

multi_sub = MultiSubscriber(factory, endpoint)

157

multi_sub.subscribe(b"news")

158

multi_sub.subscribe(b"weather")

159

multi_sub.subscribe(b"") # Subscribe to all messages (including untagged)

160

161

reactor.run()

162

```

163

164

### Topic Filtering and Patterns

165

166

Topic-based message filtering using prefix matching for efficient message routing and selective message consumption.

167

168

#### Topic Matching Rules

169

170

```python

171

# Topic matching is prefix-based

172

publisher.publish(b"content", b"news.breaking")

173

publisher.publish(b"content", b"news.sports")

174

publisher.publish(b"content", b"weather.local")

175

publisher.publish(b"content", b"stocks.AAPL")

176

177

# Subscription patterns and what they match:

178

subscriber.subscribe(b"") # Matches ALL messages

179

subscriber.subscribe(b"news") # Matches: news.breaking, news.sports

180

subscriber.subscribe(b"news.sports") # Matches: news.sports only

181

subscriber.subscribe(b"weather") # Matches: weather.local

182

subscriber.subscribe(b"stocks") # Matches: stocks.AAPL

183

```

184

185

#### Advanced Filtering Example

186

187

```python

188

class SmartSubscriber(ZmqSubConnection):

189

def __init__(self, factory, endpoint):

190

super().__init__(factory, endpoint)

191

self.handlers = {}

192

193

def add_topic_handler(self, topic_prefix, handler_func):

194

"""Add handler for specific topic prefix."""

195

self.handlers[topic_prefix] = handler_func

196

self.subscribe(topic_prefix)

197

198

def gotMessage(self, message, tag):

199

"""Route messages to appropriate handlers based on topic."""

200

tag_str = tag.decode()

201

202

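# Find the most specific (longest) matching handler.
# Note: a handler registered for b"" is never selected here, because best_match
# starts as b"" and the length comparison below is strict.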

203

best_match = b""

204

handler = None

205

206

for topic_prefix, topic_handler in self.handlers.items():

207

if tag_str.startswith(topic_prefix.decode()) and len(topic_prefix) > len(best_match):

208

best_match = topic_prefix

209

handler = topic_handler

210

211

if handler:

212

handler(message, tag)

213

else:

214

print(f"Unhandled message [{tag_str}]: {message.decode()}")

215

216

# Usage

217

def handle_breaking_news(message, tag):

218

print(f"🚨 BREAKING: {message.decode()}")

219

220

def handle_sports(message, tag):

221

print(f"⚽ Sports: {message.decode()}")

222

223

def handle_weather(message, tag):

224

print(f"🌤 Weather: {message.decode()}")

225

226

factory = ZmqFactory()

227

endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

228

subscriber = SmartSubscriber(factory, endpoint)

229

230

# Register topic-specific handlers

231

subscriber.add_topic_handler(b"news.breaking", handle_breaking_news)

232

subscriber.add_topic_handler(b"news.sports", handle_sports)

233

subscriber.add_topic_handler(b"weather", handle_weather)

234

```

235

236

### Message Serialization and Formats

237

238

Common patterns for encoding structured data in pub/sub messages.

239

240

```python

241

import json

242

import pickle

243

from datetime import datetime

244

245

class DataPublisher(ZmqPubConnection):

246

def publish_json(self, data, topic):

247

"""Publish data as JSON."""

248

message = json.dumps(data).encode('utf-8')

249

self.publish(message, topic)

250

251

def publish_timestamped(self, data, topic):

252

"""Publish data with timestamp."""

253

timestamped = {

254

'timestamp': datetime.utcnow().isoformat(),

255

'data': data

256

}

257

self.publish_json(timestamped, topic)

258

259

class DataSubscriber(ZmqSubConnection):

260

def gotMessage(self, message, tag):

261

try:

262

# Try to parse as JSON first

263

data = json.loads(message.decode('utf-8'))

264

self.handle_json_message(data, tag)

265

except (json.JSONDecodeError, UnicodeDecodeError):

266

# Fall back to raw bytes

267

self.handle_raw_message(message, tag)

268

269

def handle_json_message(self, data, tag):

270

if 'timestamp' in data:

271

print(f"[{data['timestamp']}] {tag.decode()}: {data['data']}")

272

else:

273

print(f"{tag.decode()}: {data}")

274

275

def handle_raw_message(self, message, tag):

276

print(f"{tag.decode()}: {message}")

277

278

# Usage example

279

factory = ZmqFactory()

280

pub_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")

281

publisher = DataPublisher(factory, pub_endpoint)

282

283

# Publish structured data

284

publisher.publish_timestamped(

285

{"temperature": 25.5, "humidity": 60, "location": "NYC"},

286

b"weather.NYC"

287

)

288

289

publisher.publish_json(

290

{"symbol": "AAPL", "price": 150.25, "volume": 1000},

291

b"stocks.AAPL"

292

)

293

```