CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-eclipse-zenoh

The Python API for Eclipse Zenoh, a high-performance networking library that provides a pub/sub, store/query, and compute framework for zero-overhead communication.

Pending
Overview
Eval results
Files

docs/pubsub.md

Publisher/Subscriber Pattern

The Publisher/Subscriber pattern enables real-time data streaming in Zenoh applications. Publishers send data to specific key expressions, while Subscribers receive data matching their subscription patterns. This decoupled messaging pattern supports high-throughput, low-latency communication with flexible quality of service controls.

Capabilities

Publisher

Publishers send data to specific key expressions with configurable quality of service parameters.

def declare_publisher(
    self,
    key_expr,
    encoding: Encoding = None,
    congestion_control: CongestionControl = None,
    priority: Priority = None,
    reliability: Reliability = None,
) -> Publisher:
    """Create a publisher bound to a key expression.

    Every argument other than ``key_expr`` is optional and defaults to
    ``None``.

    Args:
        key_expr: The key expression the publisher will publish on.
        encoding: Encoding specification for the published data.
        congestion_control: Policy for handling network congestion.
        priority: Priority level given to the messages.
        reliability: Reliability mode used for message delivery.

    Returns:
        A Publisher object through which data can be sent.
    """

class Publisher:
    """Handle used to publish a stream of data on one key expression."""

    # --- Read-only settings of the publisher ---

    @property
    def key_expr(self) -> KeyExpr:
        """The key expression of this publisher."""

    @property
    def encoding(self) -> Encoding:
        """The encoding of this publisher."""

    @property
    def congestion_control(self) -> CongestionControl:
        """The congestion control setting of this publisher."""

    @property
    def priority(self) -> Priority:
        """The priority setting of this publisher."""

    @property
    def reliability(self) -> Reliability:
        """The reliability setting of this publisher."""

    @property
    def matching_status(self) -> MatchingStatus:
        """The matching status at the time of the call."""

    # --- Operations ---

    def put(
        self,
        payload,
        encoding: Encoding = None,
        timestamp: Timestamp = None,
        attachment=None,
    ) -> None:
        """Publish a piece of data.

        Args:
            payload: The data to send (str, bytes, or ZBytes).
            encoding: Encoding that overrides the default one.
            timestamp: Custom timestamp to attach to the data.
            attachment: Additional metadata sent with the data.
        """

    def delete(
        self,
        timestamp: Timestamp = None,
        attachment=None,
    ) -> None:
        """Publish a delete operation.

        Args:
            timestamp: Custom timestamp to attach to the delete.
            attachment: Additional metadata sent with the delete.
        """

    def undeclare(self) -> None:
        """Undeclare this publisher, releasing its resources."""

    def declare_matching_listener(self, handler) -> MatchingListener:
        """Declare a listener that observes matching status changes.

        Args:
            handler: Handler given to the listener.

        Returns:
            The declared MatchingListener.
        """

Subscriber

Subscribers receive data matching their subscription key expressions through configurable handlers.

def declare_subscriber(
    self,
    key_expr,
    handler=None,
    reliability: Reliability = None,
    locality: Locality = None,
) -> Subscriber:
    """Create a subscriber for a key expression.

    Every argument other than ``key_expr`` is optional and defaults to
    ``None``.

    Args:
        key_expr: The key expression pattern to subscribe to.
        handler: Handler for the received samples (callback, channel, etc.).
        reliability: Reliability mode used when receiving data.
        locality: Locality constraint applied to the data sources.

    Returns:
        A Subscriber object through which data is received.
    """

class Subscriber:
    """Subscription handle that works with a generic handler."""

    @property
    def key_expr(self) -> KeyExpr:
        """The key expression of this subscriber."""

    @property
    def handler(self):
        """The handler of this subscriber."""

    def undeclare(self) -> None:
        """Undeclare this subscriber, releasing its resources."""

    # --- Manual reception ---

    def try_recv(self):
        """Attempt to receive a sample without blocking.

        The usage examples treat a ``None`` result as "no sample received".
        """

    def recv(self):
        """Receive a sample, blocking the caller while waiting."""

    def __iter__(self):
        """Iterate over the samples as they are received."""

Sample Data

Data samples received by subscribers contain the payload and metadata.

class Sample:
    """A data sample: a payload together with its metadata."""

    @property
    def key_expr(self) -> KeyExpr:
        """The key expression the data was published on."""

    @property
    def payload(self) -> ZBytes:
        """The payload data carried by the sample."""

    @property
    def kind(self) -> SampleKind:
        """The kind of the sample (PUT or DELETE)."""

    @property
    def encoding(self) -> Encoding:
        """The encoding of the data."""

    @property
    def timestamp(self) -> Timestamp:
        """The timestamp of the sample."""

    @property
    def congestion_control(self) -> CongestionControl:
        """The congestion control setting."""

    @property
    def priority(self) -> Priority:
        """The priority of the message."""

    @property
    def express(self) -> bool:
        """The express delivery flag."""

    @property
    def attachment(self):
        """The additional metadata attached to the sample."""

class SampleKind:
    """Type of operation a sample represents."""

    # Member values are elided in this API stub.
    PUT = ...
    DELETE = ...

Quality of Service Controls

Configure message delivery characteristics and network behavior.

class Priority:
    """Priority levels available for messages."""

    # Individual levels (member values are elided in this API stub).
    REAL_TIME = ...
    INTERACTIVE_HIGH = ...
    INTERACTIVE_LOW = ...
    DATA_HIGH = ...
    DATA = ...
    DATA_LOW = ...
    BACKGROUND = ...

    # Named reference points.
    DEFAULT = ...
    MIN = ...
    MAX = ...

class CongestionControl:
    """Modes that control behaviour under congestion."""

    # Messages are dropped when congested.
    DROP = ...
    # The sender is blocked when congested.
    BLOCK = ...
    # The first message is blocked when congested (unstable).
    BLOCK_FIRST = ...

    DEFAULT = ...

class Reliability:
    """Delivery reliability modes (unstable)."""

    # Best effort delivery.
    BEST_EFFORT = ...
    # Reliable delivery.
    RELIABLE = ...
    
class Locality:
    """Locality of the origin or destination."""

    # Only the local session.
    SESSION_LOCAL = ...
    # Only remote sources.
    REMOTE = ...
    # Any source.
    ANY = ...

    DEFAULT = ...

Matching Status

Monitor whether publishers and subscribers are matched with peers.

class MatchingStatus:
    """Matching status of an entity."""

    @property
    def matching(self) -> bool:
        """True when matching entities exist."""

class MatchingListener:
    """Listener that observes matching status updates."""

    @property
    def handler(self):
        """The handler of this listener."""

    def undeclare(self) -> None:
        """Undeclare this matching listener."""

    # --- Manual reception ---

    def try_recv(self):
        """Attempt to receive a matching status update."""

    def recv(self):
        """Receive a matching status update, blocking while waiting."""

    def __iter__(self):
        """Iterate over the matching status updates."""

Usage Examples

Basic Publisher

import zenoh

# Open the session with an explicit default configuration. The `with`
# block closes the session even if one of the calls below raises.
with zenoh.open(zenoh.Config()) as session:
    # Declare publisher
    publisher = session.declare_publisher("sensors/temperature")
    try:
        # Send data
        publisher.put("25.3")
        publisher.put(b"binary_data")

        # Send with metadata. The attachment is given as a string, one of
        # the value kinds documented for payloads (str, bytes, or ZBytes).
        # NOTE(review): confirm whether structured attachments such as
        # dicts must be serialized explicitly before being passed here.
        publisher.put(
            "26.1",
            timestamp=session.new_timestamp(),
            attachment="sensor_id=temp_01",
        )
    finally:
        # Clean up the publisher; the session is closed by the `with` block.
        publisher.undeclare()

Publisher with Quality of Service

import zenoh

# Open the session with an explicit default configuration. The `with`
# block closes the session even if publishing raises.
with zenoh.open(zenoh.Config()) as session:
    # High-priority publisher with reliable delivery
    publisher = session.declare_publisher(
        "critical/alerts",
        priority=zenoh.Priority.REAL_TIME,
        congestion_control=zenoh.CongestionControl.BLOCK,
        reliability=zenoh.Reliability.RELIABLE,
    )
    try:
        publisher.put("System critical alert!")
    finally:
        # Clean up the publisher; the session is closed by the `with` block.
        publisher.undeclare()

Basic Subscriber with Callback

import time

import zenoh


def data_handler(sample):
    """Print each received sample and flag delete operations."""
    print(f"Received on {sample.key_expr}: {sample.payload.to_string()}")
    if sample.kind == zenoh.SampleKind.DELETE:
        print("  -> DELETE operation")


# Open the session with an explicit default configuration. The `with`
# block closes the session even if the wait below is interrupted.
with zenoh.open(zenoh.Config()) as session:
    # Subscribe with callback handler
    subscriber = session.declare_subscriber("sensors/**", data_handler)
    try:
        # Let it run
        time.sleep(10)
    finally:
        # Clean up the subscriber; the session is closed by the `with` block.
        subscriber.undeclare()

Subscriber with Manual Reception

import zenoh

# Upper bound for the iterator-style loop below, so the example terminates
# on its own instead of relying on an undefined stop condition.
MAX_SAMPLES = 5

# Open the session with an explicit default configuration. The `with`
# block closes the session even if reception raises.
with zenoh.open(zenoh.Config()) as session:
    # Subscribe without callback
    subscriber = session.declare_subscriber("data/stream")
    try:
        # Manual reception
        try:
            sample = subscriber.recv()  # Blocking receive
        except KeyboardInterrupt:
            # The user interrupted the blocking wait; carry on with the example.
            pass
        else:
            print(f"Got: {sample.payload.to_string()}")

        # Non-blocking reception
        sample = subscriber.try_recv()
        if sample is not None:
            print(f"Got: {sample.payload.to_string()}")

        # Iterator style: stop after MAX_SAMPLES samples
        for count, sample in enumerate(subscriber, start=1):
            print(f"Sample: {sample.payload.to_string()}")
            if count >= MAX_SAMPLES:
                break
    finally:
        # Clean up the subscriber; the session is closed by the `with` block.
        subscriber.undeclare()

Matching Status Monitoring

import zenoh


def matching_handler(status):
    """Report whether the publisher currently has matching subscribers."""
    if status.matching:
        print("Publisher has matching subscribers!")
    else:
        print("No matching subscribers")


# Open the session with an explicit default configuration. The `with`
# block closes the session even if one of the calls below raises.
with zenoh.open(zenoh.Config()) as session:
    publisher = session.declare_publisher("demo/pub")
    try:
        # Monitor matching status
        listener = publisher.declare_matching_listener(matching_handler)
        try:
            # Check current status
            print(f"Currently matching: {publisher.matching_status.matching}")
        finally:
            listener.undeclare()
    finally:
        # Clean up the publisher; the session is closed by the `with` block.
        publisher.undeclare()

Complete Pub/Sub Example

import threading
import time

import zenoh

# Set by the subscriber thread once its subscriber has been declared, so
# the publisher does not start sending before anyone is listening.
subscriber_ready = threading.Event()


def publisher_thread():
    """Publish ten messages, one per second, on "demo/example"."""
    # Wait for the subscriber, but not forever: if it never comes up the
    # publisher still runs after the timeout.
    subscriber_ready.wait(timeout=5)

    # The `with` block closes the session even if publishing raises.
    with zenoh.open(zenoh.Config()) as session:
        publisher = session.declare_publisher("demo/example")
        try:
            for i in range(10):
                publisher.put(f"Message {i}")
                time.sleep(1)
        finally:
            publisher.undeclare()


def subscriber_thread():
    """Print every sample received on "demo/example" for twelve seconds."""

    def handler(sample):
        print(f"Subscriber received: {sample.payload.to_string()}")

    # The `with` block closes the session even if the wait is interrupted.
    with zenoh.open(zenoh.Config()) as session:
        subscriber = session.declare_subscriber("demo/example", handler)
        subscriber_ready.set()
        try:
            time.sleep(12)  # Let it run
        finally:
            subscriber.undeclare()


# Run both threads; the subscriber is started first so that it can be
# declared before the publisher begins sending.
pub_thread = threading.Thread(target=publisher_thread)
sub_thread = threading.Thread(target=subscriber_thread)

sub_thread.start()
pub_thread.start()

pub_thread.join()
sub_thread.join()

Install with Tessl CLI

npx tessl i tessl/pypi-eclipse-zenoh

docs

advanced.md

data-types.md

extensions.md

handlers.md

index.md

pubsub.md

query.md

session-management.md

tile.json