CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-txzmq

Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.

Overview
Eval results
Files

docs/pubsub.md

Publish-Subscribe Messaging

Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering. Publishers broadcast messages with optional topics, while subscribers receive messages matching their topic subscriptions. This pattern is ideal for event distribution, news feeds, and real-time updates.

Capabilities

Publisher Connection

Broadcasts messages to multiple subscribers with optional topic-based routing. Publishers don't know or track individual subscribers.

class ZmqPubConnection(ZmqConnection):
    """
    Publisher connection for broadcasting messages.

    Uses the ZeroMQ PUB socket type for one-to-many message distribution.
    Each published message goes to every connected subscriber whose
    subscription matches the message's tag.
    """

    # ZeroMQ socket type for this connection class (PUB).
    socketType = constants.PUB

    def publish(self, message: bytes, tag: bytes = b'') -> None:
        """
        Publish a message with an optional topic tag.

        Args:
            message (bytes): Message content to broadcast.
            tag (bytes): Topic tag used for message filtering (default: empty).
                A subscriber receives the message only if one of its
                subscriptions is a prefix of this tag.

        Note:
            Topic matching is prefix-based. Tag b'news' matches subscriptions
            to b'news', b'new', b'ne', b'n', and b'' (empty). By the same
            rule, a message published with the default empty tag matches
            only the empty subscription b''.
        """

Publisher Usage Example

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection

# Create publisher: bind a PUB connection on TCP port 5555
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
publisher = ZmqPubConnection(factory, endpoint)

# Publish messages with different topics (second argument is the topic tag)
# NOTE(review): these run before the reactor starts and before any subscriber
# can have connected; presumably they are dropped rather than queued - confirm.
publisher.publish(b"Breaking news: Market update", b"news")
publisher.publish(b"Weather: Sunny, 25C", b"weather")
publisher.publish(b"Sports: Team wins championship", b"sports")

# Publish an untagged message. Matching is prefix-based, so this reaches only
# subscribers that subscribed to b"" (everything), not topic-specific ones.
publisher.publish(b"System maintenance in 1 hour", b"")

# High-frequency publishing example
def publish_stock_prices():
    """Publish one quote per stock topic, then schedule the next round."""
    # (topic tag, JSON payload) pairs, sent in this order on every round.
    quotes = (
        (b"stocks.AAPL", b'{"symbol":"AAPL","price":150.25,"volume":1000}'),
        (b"stocks.GOOGL", b'{"symbol":"GOOGL","price":2800.50,"volume":500}'),
    )
    for topic, payload in quotes:
        publisher.publish(payload, topic)
    # Re-arm the timer so this function runs again in one second.
    reactor.callLater(1.0, publish_stock_prices)

# Run the first round now; every round schedules the following one itself.
publish_stock_prices()
# Hand control to the Twisted reactor so the scheduled calls actually fire.
reactor.run()

Subscriber Connection

Receives messages from publishers based on topic subscriptions. Subscribers can subscribe to multiple topics and receive messages matching any subscription.

class ZmqSubConnection(ZmqConnection):
    """
    Subscriber connection for receiving published messages.

    Uses the ZeroMQ SUB socket type. Nothing is received until at least one
    topic prefix has been subscribed to; which messages arrive is determined
    by the set of prefixes registered through subscribe().

    NOTE(review): whether filtering physically happens on the subscriber or
    the publisher side presumably depends on the libzmq version and transport
    - confirm before relying on it for bandwidth estimates.
    """

    # ZeroMQ socket type for this connection class (SUB).
    socketType = constants.SUB

    def subscribe(self, tag: bytes) -> None:
        """
        Subscribe to messages whose tag starts with the given prefix.

        Args:
            tag (bytes): Topic prefix to subscribe to.
                Empty bytes (b'') subscribes to all messages.
                Prefix matching: b'news' receives b'news.*' topics.

        Note:
            Can be called multiple times to subscribe to multiple topics.
            Subscriptions are cumulative - a message matching any one of
            them is received.
        """

    def unsubscribe(self, tag: bytes) -> None:
        """
        Remove a subscription previously added with subscribe().

        Args:
            tag (bytes): Topic prefix to unsubscribe from.
                Must match exactly the tag used in subscribe().
        """

    def gotMessage(self, message: bytes, tag: bytes) -> None:
        """
        Callback invoked when a message matching a subscription is received.

        Must be implemented by subclasses to handle incoming messages.

        Args:
            message (bytes): Message content from the publisher.
            tag (bytes): Topic tag the message was published with.
        """

Subscriber Usage Example

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqSubConnection

class NewsSubscriber(ZmqSubConnection):
    """Subscriber that prints each message together with its topic tag."""

    def gotMessage(self, message, tag):
        """Decode the tag and the payload and print them on one line."""
        topic = tag.decode()
        body = message.decode()
        print(f"News [{topic}]: {body}")

class WeatherSubscriber(ZmqSubConnection):
    """Subscriber that prints the payload of every message it receives."""

    def gotMessage(self, message, tag):
        """Print the decoded payload; the tag is not used."""
        report = message.decode()
        print(f"Weather Update: {report}")

class StockSubscriber(ZmqSubConnection):
    """Subscriber that parses JSON stock quotes and prints a summary line."""

    def gotMessage(self, message, tag):
        """Parse the payload as JSON and print symbol, price and volume."""
        import json
        quote = json.loads(message.decode())
        symbol, price, volume = quote['symbol'], quote['price'], quote['volume']
        print(f"Stock {symbol}: ${price} (Volume: {volume})")

# Create subscribers: one factory and one endpoint shared by all of them
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

# News subscriber - only tags starting with b"news"
news_sub = NewsSubscriber(factory, endpoint)
news_sub.subscribe(b"news")

# Weather subscriber - only tags starting with b"weather"
weather_sub = WeatherSubscriber(factory, endpoint)
weather_sub.subscribe(b"weather")

# Stock subscriber - all stock topics (prefix match on b"stocks")
stock_sub = StockSubscriber(factory, endpoint)
stock_sub.subscribe(b"stocks")  # Receives stocks.AAPL, stocks.GOOGL, etc.

# Multi-topic subscriber
class MultiSubscriber(ZmqSubConnection):
    """Subscriber used with several subscriptions; prints tag and payload."""

    def gotMessage(self, message, tag):
        """Print the decoded tag and payload of the received message."""
        topic = tag.decode()
        body = message.decode()
        print(f"Multi [{topic}]: {body}")

multi_sub = MultiSubscriber(factory, endpoint)
multi_sub.subscribe(b"news")
multi_sub.subscribe(b"weather")
# The empty prefix matches every tag, so it also covers the two narrower
# subscriptions above.
multi_sub.subscribe(b"")  # Subscribe to all messages (including untagged)

# Hand control to the Twisted reactor
reactor.run()

Topic Filtering and Patterns

Topic-based message filtering using prefix matching for efficient message routing and selective message consumption.

Topic Matching Rules

# Topic matching is prefix-based: a subscription matches every tag that
# starts with the subscribed bytes.
publisher.publish(b"content", b"news.breaking")
publisher.publish(b"content", b"news.sports")
publisher.publish(b"content", b"weather.local")
publisher.publish(b"content", b"stocks.AAPL")

# Subscription patterns and what they match among the four messages above:
subscriber.subscribe(b"")           # Matches ALL messages
subscriber.subscribe(b"news")       # Matches: news.breaking, news.sports
subscriber.subscribe(b"news.sports") # Matches: news.sports (not news.breaking)
subscriber.subscribe(b"weather")    # Matches: weather.local
subscriber.subscribe(b"stocks")     # Matches: stocks.AAPL

Advanced Filtering Example

class SmartSubscriber(ZmqSubConnection):
    """Subscriber that dispatches each message to a per-topic handler.

    Handlers are registered per topic prefix. An incoming message goes to the
    handler whose prefix is the longest one matching the message's tag, so a
    specific handler (b"news.sports") wins over a general one (b"news"), and
    a handler registered for b"" acts as a catch-all.
    """

    def __init__(self, factory, endpoint):
        super().__init__(factory, endpoint)
        # Maps topic prefix (bytes) -> callable(message, tag).
        self.handlers = {}

    def add_topic_handler(self, topic_prefix, handler_func):
        """Add handler for specific topic prefix.

        Args:
            topic_prefix (bytes): Prefix the message tag must start with.
            handler_func: Callable invoked as ``handler_func(message, tag)``.

        Registering the same prefix again replaces its handler.
        """
        already_subscribed = topic_prefix in self.handlers
        self.handlers[topic_prefix] = handler_func
        # Subscribe only once per prefix so replacing a handler does not
        # stack a duplicate subscription on the socket.
        if not already_subscribed:
            self.subscribe(topic_prefix)

    def gotMessage(self, message, tag):
        """Route messages to appropriate handlers based on topic.

        Args:
            message (bytes): Message content.
            tag (bytes): Topic tag the message was published with.
        """
        # Compare raw bytes: no decoding is needed to test a prefix, and it
        # keeps non-UTF-8 tags from raising before dispatch.
        matching = [prefix for prefix in self.handlers if tag.startswith(prefix)]

        if matching:
            # Longest prefix is the most specific match. Taking the maximum
            # over all matches lets the empty prefix b"" win when it is the
            # only match, so a catch-all handler is actually called.
            self.handlers[max(matching, key=len)](message, tag)
        else:
            # errors="replace" keeps this diagnostic from raising on
            # undecodable bytes.
            tag_str = tag.decode(errors="replace")
            print(f"Unhandled message [{tag_str}]: {message.decode(errors='replace')}")

# Usage
def handle_breaking_news(message, tag):
    """Print a breaking-news alert line; the tag is accepted but not used."""
    headline = message.decode()
    print(f"🚨 BREAKING: {headline}")

def handle_sports(message, tag):
    """Print a sports update line; the tag is accepted but not used."""
    update = message.decode()
    print(f"⚽ Sports: {update}")

def handle_weather(message, tag):
    """Print a weather update line; the tag is accepted but not used."""
    report = message.decode()
    print(f"🌤 Weather: {report}")

# Connect a SmartSubscriber to the publisher endpoint
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
subscriber = SmartSubscriber(factory, endpoint)

# Register topic-specific handlers; each call also subscribes to that prefix
subscriber.add_topic_handler(b"news.breaking", handle_breaking_news)
subscriber.add_topic_handler(b"news.sports", handle_sports)
subscriber.add_topic_handler(b"weather", handle_weather)

Message Serialization and Formats

Common patterns for encoding structured data in pub/sub messages.

import json
import pickle
from datetime import datetime

class DataPublisher(ZmqPubConnection):
    """Publisher with helpers for sending JSON-encoded payloads."""

    def publish_json(self, data, topic):
        """Publish data as JSON.

        Args:
            data: Any value ``json.dumps`` can serialize.
            topic (bytes): Topic tag passed through to ``publish()``.

        Raises:
            TypeError: If ``data`` is not JSON-serializable.
        """
        message = json.dumps(data).encode('utf-8')
        self.publish(message, topic)

    def publish_timestamped(self, data, topic):
        """Publish data with timestamp.

        The payload is sent as ``{"timestamp": <ISO 8601 UTC>, "data": data}``.
        The timestamp is timezone-aware, so it carries an explicit ``+00:00``
        offset and consumers need not guess the zone.

        Args:
            data: Any value ``json.dumps`` can serialize.
            topic (bytes): Topic tag passed through to ``publish()``.
        """
        # Local import: this file only imports the ``datetime`` class itself.
        from datetime import timezone

        timestamped = {
            # datetime.utcnow() is deprecated and returns a naive value;
            # now(timezone.utc) is the timezone-aware replacement.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'data': data
        }
        self.publish_json(timestamped, topic)

class DataSubscriber(ZmqSubConnection):
    """Subscriber that decodes JSON payloads and falls back to raw bytes."""

    def gotMessage(self, message, tag):
        """Dispatch a received message to the JSON or raw handler.

        Args:
            message (bytes): Message content from the publisher.
            tag (bytes): Topic tag the message was published with.
        """
        # Only the parsing is guarded. An exception raised inside a handler
        # must propagate rather than reroute the message to the raw path.
        try:
            data = json.loads(message.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Not UTF-8 JSON: fall back to raw bytes
            self.handle_raw_message(message, tag)
        else:
            self.handle_json_message(data, tag)

    def handle_json_message(self, data, tag):
        """Print a decoded JSON message.

        A payload shaped like ``{"timestamp": ..., "data": ...}`` is printed
        with its timestamp. Anything else, including JSON lists, strings and
        numbers, is printed as-is.
        """
        # Valid JSON is not necessarily an object, so check the type and both
        # keys before indexing.
        is_envelope = (
            isinstance(data, dict) and 'timestamp' in data and 'data' in data
        )
        if is_envelope:
            print(f"[{data['timestamp']}] {tag.decode()}: {data['data']}")
        else:
            print(f"{tag.decode()}: {data}")

    def handle_raw_message(self, message, tag):
        """Print a message that could not be parsed as JSON, as raw bytes."""
        print(f"{tag.decode()}: {message}")

# Usage example: bind a DataPublisher on TCP port 5555
factory = ZmqFactory()
pub_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
publisher = DataPublisher(factory, pub_endpoint)

# Publish structured data
# Sent wrapped in a {"timestamp": ..., "data": ...} envelope
publisher.publish_timestamped(
    {"temperature": 25.5, "humidity": 60, "location": "NYC"}, 
    b"weather.NYC"
)

# Sent as plain JSON, without the timestamp envelope
publisher.publish_json(
    {"symbol": "AAPL", "price": 150.25, "volume": 1000},
    b"stocks.AAPL"
)

Install with Tessl CLI

npx tessl i tessl/pypi-txzmq

docs

factory-connection.md

index.md

pubsub.md

pushpull.md

reqrep.md

router-dealer.md

tile.json