Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.
Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering. Publishers broadcast messages with optional topics, while subscribers receive messages matching their topic subscriptions. This pattern is ideal for event distribution, news feeds, and real-time updates.
Broadcasts messages to multiple subscribers with optional topic-based routing. Publishers don't know or track individual subscribers.
class ZmqPubConnection(ZmqConnection):
"""
Publisher connection for broadcasting messages.
Uses ZeroMQ PUB socket type for one-to-many message distribution.
Messages are sent to all connected subscribers matching the topic filter.
"""
socketType = constants.PUB
def publish(self, message, tag=b''):
"""
Publish message with optional topic tag.
Args:
message (bytes): Message content to broadcast
tag (bytes): Topic tag for message filtering (default: empty)
Subscribers must subscribe to this tag to receive message
Note:
Topic matching is prefix-based. Tag b'news' matches subscriptions
to b'news', b'new', b'ne', b'n', and b'' (empty).
"""from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection
# Create publisher
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
publisher = ZmqPubConnection(factory, endpoint)
# Publish messages with different topics
publisher.publish(b"Breaking news: Market update", b"news")
publisher.publish(b"Weather: Sunny, 25C", b"weather")
publisher.publish(b"Sports: Team wins championship", b"sports")
# Publish message to all subscribers (no topic filter)
publisher.publish(b"System maintenance in 1 hour", b"")
# High-frequency publishing example
def publish_stock_prices():
publisher.publish(b'{"symbol":"AAPL","price":150.25,"volume":1000}', b"stocks.AAPL")
publisher.publish(b'{"symbol":"GOOGL","price":2800.50,"volume":500}', b"stocks.GOOGL")
reactor.callLater(1.0, publish_stock_prices) # Publish every second
publish_stock_prices()
reactor.run()Receives messages from publishers based on topic subscriptions. Subscribers can subscribe to multiple topics and receive messages matching any subscription.
class ZmqSubConnection(ZmqConnection):
"""
Subscriber connection for receiving published messages.
Uses ZeroMQ SUB socket type. Must subscribe to topics to receive messages.
Implements topic-based filtering on the subscriber side.
"""
socketType = constants.SUB
def subscribe(self, tag):
"""
Subscribe to messages with specified topic prefix.
Args:
tag (bytes): Topic prefix to subscribe to
Empty bytes (b'') subscribes to all messages
Prefix matching: b'news' receives b'news.*' topics
Note:
Can be called multiple times to subscribe to multiple topics.
Subscriptions are cumulative - messages matching any subscription are received.
"""
def unsubscribe(self, tag):
"""
Unsubscribe from messages with specified topic prefix.
Args:
tag (bytes): Topic prefix to unsubscribe from
Must match exactly the tag used in subscribe()
"""
def gotMessage(self, message, tag):
"""
Abstract method called when subscribed message is received.
Must be implemented by subclasses to handle incoming messages.
Args:
message (bytes): Message content from publisher
tag (bytes): Topic tag that message was published with
"""from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqSubConnection
class NewsSubscriber(ZmqSubConnection):
def gotMessage(self, message, tag):
print(f"News [{tag.decode()}]: {message.decode()}")
class WeatherSubscriber(ZmqSubConnection):
def gotMessage(self, message, tag):
print(f"Weather Update: {message.decode()}")
class StockSubscriber(ZmqSubConnection):
def gotMessage(self, message, tag):
import json
data = json.loads(message.decode())
print(f"Stock {data['symbol']}: ${data['price']} (Volume: {data['volume']})")
# Create subscribers
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
# News subscriber - only news topics
news_sub = NewsSubscriber(factory, endpoint)
news_sub.subscribe(b"news")
# Weather subscriber - only weather topics
weather_sub = WeatherSubscriber(factory, endpoint)
weather_sub.subscribe(b"weather")
# Stock subscriber - all stock topics
stock_sub = StockSubscriber(factory, endpoint)
stock_sub.subscribe(b"stocks") # Receives stocks.AAPL, stocks.GOOGL, etc.
# Multi-topic subscriber
class MultiSubscriber(ZmqSubConnection):
def gotMessage(self, message, tag):
print(f"Multi [{tag.decode()}]: {message.decode()}")
multi_sub = MultiSubscriber(factory, endpoint)
multi_sub.subscribe(b"news")
multi_sub.subscribe(b"weather")
multi_sub.subscribe(b"") # Subscribe to all messages (including untagged)
reactor.run()Topic-based message filtering using prefix matching for efficient message routing and selective message consumption.
# Topic matching is prefix-based
publisher.publish(b"content", b"news.breaking")
publisher.publish(b"content", b"news.sports")
publisher.publish(b"content", b"weather.local")
publisher.publish(b"content", b"stocks.AAPL")
# Subscription patterns and what they match:
subscriber.subscribe(b"") # Matches ALL messages
subscriber.subscribe(b"news") # Matches: news.breaking, news.sports
subscriber.subscribe(b"news.sports") # Matches: news.sports only
subscriber.subscribe(b"weather") # Matches: weather.local
subscriber.subscribe(b"stocks") # Matches: stocks.AAPLclass SmartSubscriber(ZmqSubConnection):
def __init__(self, factory, endpoint):
super().__init__(factory, endpoint)
self.handlers = {}
def add_topic_handler(self, topic_prefix, handler_func):
"""Add handler for specific topic prefix."""
self.handlers[topic_prefix] = handler_func
self.subscribe(topic_prefix)
def gotMessage(self, message, tag):
"""Route messages to appropriate handlers based on topic."""
tag_str = tag.decode()
# Find most specific matching handler
best_match = b""
handler = None
for topic_prefix, topic_handler in self.handlers.items():
if tag_str.startswith(topic_prefix.decode()) and len(topic_prefix) > len(best_match):
best_match = topic_prefix
handler = topic_handler
if handler:
handler(message, tag)
else:
print(f"Unhandled message [{tag_str}]: {message.decode()}")
# Usage
def handle_breaking_news(message, tag):
print(f"🚨 BREAKING: {message.decode()}")
def handle_sports(message, tag):
print(f"⚽ Sports: {message.decode()}")
def handle_weather(message, tag):
print(f"🌤 Weather: {message.decode()}")
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
subscriber = SmartSubscriber(factory, endpoint)
# Register topic-specific handlers
subscriber.add_topic_handler(b"news.breaking", handle_breaking_news)
subscriber.add_topic_handler(b"news.sports", handle_sports)
subscriber.add_topic_handler(b"weather", handle_weather)Common patterns for encoding structured data in pub/sub messages.
import json
import pickle
from datetime import datetime
class DataPublisher(ZmqPubConnection):
def publish_json(self, data, topic):
"""Publish data as JSON."""
message = json.dumps(data).encode('utf-8')
self.publish(message, topic)
def publish_timestamped(self, data, topic):
"""Publish data with timestamp."""
timestamped = {
'timestamp': datetime.utcnow().isoformat(),
'data': data
}
self.publish_json(timestamped, topic)
class DataSubscriber(ZmqSubConnection):
def gotMessage(self, message, tag):
try:
# Try to parse as JSON first
data = json.loads(message.decode('utf-8'))
self.handle_json_message(data, tag)
except (json.JSONDecodeError, UnicodeDecodeError):
# Fall back to raw bytes
self.handle_raw_message(message, tag)
def handle_json_message(self, data, tag):
if 'timestamp' in data:
print(f"[{data['timestamp']}] {tag.decode()}: {data['data']}")
else:
print(f"{tag.decode()}: {data}")
def handle_raw_message(self, message, tag):
print(f"{tag.decode()}: {message}")
# Usage example
factory = ZmqFactory()
pub_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
publisher = DataPublisher(factory, pub_endpoint)
# Publish structured data
publisher.publish_timestamped(
{"temperature": 25.5, "humidity": 60, "location": "NYC"},
b"weather.NYC"
)
publisher.publish_json(
{"symbol": "AAPL", "price": 150.25, "volume": 1000},
b"stocks.AAPL"
)Install with Tessl CLI
npx tessl i tessl/pypi-txzmq