0
# Publish-Subscribe Messaging
1
2
Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering. Publishers broadcast messages with optional topics, while subscribers receive messages matching their topic subscriptions. This pattern is ideal for event distribution, news feeds, and real-time updates.
3
4
## Capabilities
5
6
### Publisher Connection
7
8
Broadcasts messages to multiple subscribers with optional topic-based routing. Publishers don't know or track individual subscribers.
9
10
```python { .api }
11
class ZmqPubConnection(ZmqConnection):
12
"""
13
Publisher connection for broadcasting messages.
14
15
Uses ZeroMQ PUB socket type for one-to-many message distribution.
16
Messages are sent to all connected subscribers matching the topic filter.
17
"""
18
19
socketType = constants.PUB
20
21
def publish(self, message, tag=b''):
22
"""
23
Publish message with optional topic tag.
24
25
Args:
26
message (bytes): Message content to broadcast
27
tag (bytes): Topic tag for message filtering (default: empty)
28
Subscribers must subscribe to this tag to receive message
29
30
Note:
31
Topic matching is prefix-based. Tag b'news' matches subscriptions
32
to b'news', b'new', b'ne', b'n', and b'' (empty).
33
"""
34
```
35
36
#### Publisher Usage Example
37
38
```python
39
from twisted.internet import reactor
40
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection
41
42
# Create publisher
43
factory = ZmqFactory()
44
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
45
publisher = ZmqPubConnection(factory, endpoint)
46
47
# Publish messages with different topics
48
publisher.publish(b"Breaking news: Market update", b"news")
49
publisher.publish(b"Weather: Sunny, 25C", b"weather")
50
publisher.publish(b"Sports: Team wins championship", b"sports")
51
52
# Publish message to all subscribers (no topic filter)
53
publisher.publish(b"System maintenance in 1 hour", b"")
54
55
# High-frequency publishing example
56
def publish_stock_prices():
57
publisher.publish(b'{"symbol":"AAPL","price":150.25,"volume":1000}', b"stocks.AAPL")
58
publisher.publish(b'{"symbol":"GOOGL","price":2800.50,"volume":500}', b"stocks.GOOGL")
59
reactor.callLater(1.0, publish_stock_prices) # Publish every second
60
61
publish_stock_prices()
62
reactor.run()
63
```
64
65
### Subscriber Connection
66
67
Receives messages from publishers based on topic subscriptions. Subscribers can subscribe to multiple topics and receive messages matching any subscription.
68
69
```python { .api }
70
class ZmqSubConnection(ZmqConnection):
71
"""
72
Subscriber connection for receiving published messages.
73
74
Uses ZeroMQ SUB socket type. Must subscribe to topics to receive messages.
75
Implements topic-based filtering on the subscriber side.
76
"""
77
78
socketType = constants.SUB
79
80
def subscribe(self, tag):
81
"""
82
Subscribe to messages with specified topic prefix.
83
84
Args:
85
tag (bytes): Topic prefix to subscribe to
86
Empty bytes (b'') subscribes to all messages
87
Prefix matching: b'news' receives b'news.*' topics
88
89
Note:
90
Can be called multiple times to subscribe to multiple topics.
91
Subscriptions are cumulative - messages matching any subscription are received.
92
"""
93
94
def unsubscribe(self, tag):
95
"""
96
Unsubscribe from messages with specified topic prefix.
97
98
Args:
99
tag (bytes): Topic prefix to unsubscribe from
100
Must match exactly the tag used in subscribe()
101
"""
102
103
def gotMessage(self, message, tag):
104
"""
105
Abstract method called when subscribed message is received.
106
107
Must be implemented by subclasses to handle incoming messages.
108
109
Args:
110
message (bytes): Message content from publisher
111
tag (bytes): Topic tag that message was published with
112
"""
113
```
114
115
#### Subscriber Usage Example
116
117
```python
118
from twisted.internet import reactor
119
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqSubConnection
120
121
class NewsSubscriber(ZmqSubConnection):
122
def gotMessage(self, message, tag):
123
print(f"News [{tag.decode()}]: {message.decode()}")
124
125
class WeatherSubscriber(ZmqSubConnection):
126
def gotMessage(self, message, tag):
127
print(f"Weather Update: {message.decode()}")
128
129
class StockSubscriber(ZmqSubConnection):
130
def gotMessage(self, message, tag):
131
import json
132
data = json.loads(message.decode())
133
print(f"Stock {data['symbol']}: ${data['price']} (Volume: {data['volume']})")
134
135
# Create subscribers
136
factory = ZmqFactory()
137
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
138
139
# News subscriber - only news topics
140
news_sub = NewsSubscriber(factory, endpoint)
141
news_sub.subscribe(b"news")
142
143
# Weather subscriber - only weather topics
144
weather_sub = WeatherSubscriber(factory, endpoint)
145
weather_sub.subscribe(b"weather")
146
147
# Stock subscriber - all stock topics
148
stock_sub = StockSubscriber(factory, endpoint)
149
stock_sub.subscribe(b"stocks") # Receives stocks.AAPL, stocks.GOOGL, etc.
150
151
# Multi-topic subscriber
152
class MultiSubscriber(ZmqSubConnection):
153
def gotMessage(self, message, tag):
154
print(f"Multi [{tag.decode()}]: {message.decode()}")
155
156
multi_sub = MultiSubscriber(factory, endpoint)
157
multi_sub.subscribe(b"news")
158
multi_sub.subscribe(b"weather")
159
multi_sub.subscribe(b"") # Subscribe to all messages (including untagged)
160
161
reactor.run()
162
```
163
164
### Topic Filtering and Patterns
165
166
Topic-based message filtering using prefix matching for efficient message routing and selective message consumption.
167
168
#### Topic Matching Rules
169
170
```python
171
# Topic matching is prefix-based
172
publisher.publish(b"content", b"news.breaking")
173
publisher.publish(b"content", b"news.sports")
174
publisher.publish(b"content", b"weather.local")
175
publisher.publish(b"content", b"stocks.AAPL")
176
177
# Subscription patterns and what they match:
178
subscriber.subscribe(b"") # Matches ALL messages
179
subscriber.subscribe(b"news") # Matches: news.breaking, news.sports
180
subscriber.subscribe(b"news.sports") # Matches: news.sports only
181
subscriber.subscribe(b"weather") # Matches: weather.local
182
subscriber.subscribe(b"stocks") # Matches: stocks.AAPL
183
```
184
185
#### Advanced Filtering Example
186
187
```python
188
class SmartSubscriber(ZmqSubConnection):
189
def __init__(self, factory, endpoint):
190
super().__init__(factory, endpoint)
191
self.handlers = {}
192
193
def add_topic_handler(self, topic_prefix, handler_func):
194
"""Add handler for specific topic prefix."""
195
self.handlers[topic_prefix] = handler_func
196
self.subscribe(topic_prefix)
197
198
def gotMessage(self, message, tag):
199
"""Route messages to appropriate handlers based on topic."""
200
tag_str = tag.decode()
201
202
# Find most specific matching handler
203
best_match = b""
204
handler = None
205
206
for topic_prefix, topic_handler in self.handlers.items():
207
if tag_str.startswith(topic_prefix.decode()) and len(topic_prefix) > len(best_match):
208
best_match = topic_prefix
209
handler = topic_handler
210
211
if handler:
212
handler(message, tag)
213
else:
214
print(f"Unhandled message [{tag_str}]: {message.decode()}")
215
216
# Usage
217
def handle_breaking_news(message, tag):
218
print(f"🚨 BREAKING: {message.decode()}")
219
220
def handle_sports(message, tag):
221
print(f"âš½ Sports: {message.decode()}")
222
223
def handle_weather(message, tag):
224
print(f"🌤 Weather: {message.decode()}")
225
226
factory = ZmqFactory()
227
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
228
subscriber = SmartSubscriber(factory, endpoint)
229
230
# Register topic-specific handlers
231
subscriber.add_topic_handler(b"news.breaking", handle_breaking_news)
232
subscriber.add_topic_handler(b"news.sports", handle_sports)
233
subscriber.add_topic_handler(b"weather", handle_weather)
234
```
235
236
### Message Serialization and Formats
237
238
Common patterns for encoding structured data in pub/sub messages.
239
240
```python
241
import json
242
import pickle
243
from datetime import datetime
244
245
class DataPublisher(ZmqPubConnection):
246
def publish_json(self, data, topic):
247
"""Publish data as JSON."""
248
message = json.dumps(data).encode('utf-8')
249
self.publish(message, topic)
250
251
def publish_timestamped(self, data, topic):
252
"""Publish data with timestamp."""
253
timestamped = {
254
'timestamp': datetime.utcnow().isoformat(),
255
'data': data
256
}
257
self.publish_json(timestamped, topic)
258
259
class DataSubscriber(ZmqSubConnection):
260
def gotMessage(self, message, tag):
261
try:
262
# Try to parse as JSON first
263
data = json.loads(message.decode('utf-8'))
264
self.handle_json_message(data, tag)
265
except (json.JSONDecodeError, UnicodeDecodeError):
266
# Fall back to raw bytes
267
self.handle_raw_message(message, tag)
268
269
def handle_json_message(self, data, tag):
270
if 'timestamp' in data:
271
print(f"[{data['timestamp']}] {tag.decode()}: {data['data']}")
272
else:
273
print(f"{tag.decode()}: {data}")
274
275
def handle_raw_message(self, message, tag):
276
print(f"{tag.decode()}: {message}")
277
278
# Usage example
279
factory = ZmqFactory()
280
pub_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
281
publisher = DataPublisher(factory, pub_endpoint)
282
283
# Publish structured data
284
publisher.publish_timestamped(
285
{"temperature": 25.5, "humidity": 60, "location": "NYC"},
286
b"weather.NYC"
287
)
288
289
publisher.publish_json(
290
{"symbol": "AAPL", "price": 150.25, "volume": 1000},
291
b"stocks.AAPL"
292
)
293
```