CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-eclipse-zenoh

The python API for Eclipse zenoh - a high-performance networking library providing pub/sub, store/query and compute framework for zero-overhead communication

Pending
Overview
Eval results
Files

advanced.mddocs/

Advanced Features

Advanced features in Zenoh provide sophisticated networking capabilities including liveliness detection, matching status monitoring, custom handler systems, and network administration tools. These features enable building robust, self-monitoring distributed systems with fine-grained control over communication patterns.

Capabilities

Liveliness Management

Monitor and advertise the availability of distributed system components.

class Liveliness:
    """Liveliness management for distributed system components"""
    
    def declare_token(self, key_expr) -> LivelinessToken:
        """
        Declare a liveliness token for a key expression.
        
        Parameters:
        - key_expr: Key expression to declare liveliness for
        
        Returns:
        LivelinessToken that maintains liveliness while active
        """
    
    def get(
        self,
        key_expr,
        handler = None,
        timeout: float = None
    ):
        """
        Query current liveliness status.
        
        Parameters:
        - key_expr: Key expression pattern to query
        - handler: Handler for liveliness replies
        - timeout: Query timeout in seconds
        
        Returns:
        Iterator over liveliness replies if no handler provided
        """
    
    def declare_subscriber(
        self,
        key_expr,
        handler,
        history: bool = False
    ) -> Subscriber:
        """
        Subscribe to liveliness changes.
        
        Parameters:
        - key_expr: Key expression pattern to monitor
        - handler: Handler for liveliness change notifications
        - history: Whether to receive historical liveliness data
        
        Returns:
        Subscriber for liveliness changes
        """

class LivelinessToken:
    """Liveliness token that maintains component availability"""
    
    def __enter__(self):
        """Context manager entry"""
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - automatically undeclares token"""
    
    def undeclare(self) -> None:
        """Undeclare the liveliness token"""

Matching Status Monitoring

Monitor whether publishers/subscribers and queriers/queryables have matching peers.

class MatchingStatus:
    """Entity matching status"""
    
    @property
    def matching(self) -> bool:
        """Whether there are matching entities on the network"""

class MatchingListener:
    """Matching status listener for monitoring connection changes"""
    
    @property
    def handler(self):
        """Get the listener's handler"""
    
    def undeclare(self) -> None:
        """Undeclare the matching listener"""
    
    def try_recv(self):
        """Try to receive matching status update without blocking"""
    
    def recv(self):
        """Receive matching status update (blocking)"""
    
    def __iter__(self):
        """Iterate over matching status updates"""

Handler System

Flexible handler system supporting various callback patterns and channel types.

from zenoh.handlers import Handler, DefaultHandler, FifoChannel, RingChannel, Callback

class Handler:
    """Generic handler interface for receiving data"""
    
    def try_recv(self):
        """Try to receive data without blocking"""
    
    def recv(self):
        """Receive data (blocking)"""
    
    def __iter__(self):
        """Iterate over received data"""
    
    def __next__(self):
        """Get next item in iteration"""

class DefaultHandler:
    """Default FIFO handler with unlimited capacity"""
    pass

class FifoChannel:
    """FIFO channel with configurable capacity"""
    
    def __init__(self, capacity: int):
        """
        Create FIFO channel with specified capacity.
        
        Parameters:
        - capacity: Maximum number of items to buffer
        """

class RingChannel:
    """Ring channel that overwrites oldest data when full"""
    
    def __init__(self, capacity: int):
        """
        Create ring channel with fixed capacity.
        
        Parameters:
        - capacity: Fixed size of the ring buffer
        """

class Callback:
    """Callback handler wrapper for Python functions"""
    
    @property
    def callback(self):
        """Get the callback function"""
    
    @property
    def drop(self):
        """Get the drop callback (called when handler is dropped)"""
    
    @property
    def indirect(self) -> bool:
        """Whether callback is called indirectly"""
    
    def __init__(
        self,
        callback,
        drop = None,
        indirect: bool = False
    ):
        """
        Create callback handler.
        
        Parameters:
        - callback: Function to call with received data
        - drop: Optional cleanup function
        - indirect: Whether to use indirect calling
        """
    
    def __call__(self, *args, **kwargs):
        """Call the wrapped callback function"""

Network Administration

Tools for network configuration and monitoring.

# Access session's liveliness interface
@property
def liveliness(self) -> Liveliness:
    """Get session's liveliness interface"""

# Session operations for entity management
def undeclare(self, entity) -> None:
    """
    Undeclare any Zenoh entity (publisher, subscriber, etc.)
    
    Parameters:
    - entity: The entity to undeclare
    """

def declare_keyexpr(self, key_expr: str) -> KeyExpr:
    """
    Declare a key expression for optimized repeated usage.
    
    Parameters:
    - key_expr: Key expression string to optimize
    
    Returns:
    Optimized KeyExpr object
    """

Error Handling

Comprehensive error handling for robust distributed applications.

class ZError(Exception):
    """Base exception for all Zenoh errors"""
    pass

Usage Examples

Basic Liveliness

import zenoh
import time

session = zenoh.open()

# Declare liveliness token
token = session.liveliness.declare_token("services/temperature_monitor")

print("Service is alive...")

# Service runs for some time
time.sleep(10)

# Cleanup - this signals that service is no longer alive
token.undeclare()

session.close()

Liveliness with Context Manager

import zenoh
import time

session = zenoh.open()

# Use context manager for automatic cleanup
with session.liveliness.declare_token("services/data_processor") as token:
    print("Data processor service is running...")
    
    # Simulate service work
    for i in range(5):
        print(f"Processing batch {i+1}...")
        time.sleep(2)
    
    print("Service work complete")
# Token is automatically undeclared when exiting the context

session.close()

Monitoring Liveliness

import zenoh
import time

def liveliness_handler(sample):
    if sample.kind == zenoh.SampleKind.PUT:
        print(f"Service ONLINE: {sample.key_expr}")
    elif sample.kind == zenoh.SampleKind.DELETE:
        print(f"Service OFFLINE: {sample.key_expr}")

session = zenoh.open()

# Subscribe to liveliness changes for all services
subscriber = session.liveliness.declare_subscriber(
    "services/**", 
    liveliness_handler,
    history=True  # Get current liveliness state
)

print("Monitoring service liveliness...")

# Let it monitor for a while
time.sleep(30)

subscriber.undeclare()
session.close()

Querying Liveliness Status

import zenoh

session = zenoh.open()

# Query current liveliness status
print("Querying current service status...")
replies = session.liveliness.get("services/**", timeout=5.0)

active_services = []
for reply in replies:
    if reply.ok:
        service_key = str(reply.ok.key_expr)
        active_services.append(service_key)
        print(f"ACTIVE: {service_key}")

print(f"\nTotal active services: {len(active_services)}")

session.close()

Matching Status Monitoring

import zenoh
import time

def matching_handler(status):
    if status.matching:
        print("Publisher has subscribers!")
    else:
        print("No subscribers found")

session = zenoh.open()

# Create publisher with matching monitoring
publisher = session.declare_publisher("sensor/data")

# Monitor matching status
listener = publisher.declare_matching_listener(matching_handler)

# Check initial status
print(f"Initial matching status: {publisher.matching_status.matching}")

# Publish some data
for i in range(5):
    publisher.put(f"Data point {i}")
    time.sleep(1)

# Cleanup
listener.undeclare()
publisher.undeclare()
session.close()

Custom Handler Examples

import zenoh
from zenoh.handlers import FifoChannel, RingChannel, Callback
import threading
import time

session = zenoh.open()

# Example 1: FIFO Channel Handler
print("Example 1: FIFO Channel")
fifo_handler = FifoChannel(capacity=10)
subscriber1 = session.declare_subscriber("data/fifo", fifo_handler)

# Simulate some data
publisher = session.declare_publisher("data/fifo")
for i in range(5):
    publisher.put(f"FIFO message {i}")

# Receive from FIFO
try:
    sample = subscriber1.recv()  # Blocks until data available
    print(f"FIFO received: {sample.payload.to_string()}")
except:
    pass

subscriber1.undeclare()

# Example 2: Ring Channel Handler
print("\nExample 2: Ring Channel")
ring_handler = RingChannel(capacity=3)
subscriber2 = session.declare_subscriber("data/ring", ring_handler)

# Send more data than ring capacity
for i in range(5):
    publisher.put(f"Ring message {i}")

# Ring channel only keeps latest 3 messages
for sample in subscriber2:
    print(f"Ring received: {sample.payload.to_string()}")
    break  # Just get first one for demo

subscriber2.undeclare()

# Example 3: Callback Handler
print("\nExample 3: Callback Handler")
def my_callback(sample):
    print(f"Callback received: {sample.payload.to_string()}")

def cleanup_callback():
    print("Callback handler cleanup")

callback_handler = Callback(
    callback=my_callback,
    drop=cleanup_callback
)

subscriber3 = session.declare_subscriber("data/callback", callback_handler)

# Send test data
publisher.put("Callback test message")
time.sleep(0.1)  # Let callback process

subscriber3.undeclare()  # This will trigger cleanup_callback

publisher.undeclare()
session.close()

Advanced Error Handling

import zenoh
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def robust_session_example():
    session = None
    publisher = None
    
    try:
        # Open session with error handling
        config = zenoh.Config()
        session = zenoh.open(config)
        logger.info("Session opened successfully")
        
        # Declare publisher
        publisher = session.declare_publisher("robust/example")
        logger.info("Publisher declared")
        
        # Publish with error handling
        for i in range(10):
            try:
                publisher.put(f"Message {i}")
                logger.debug(f"Published message {i}")
            except zenoh.ZError as e:
                logger.error(f"Failed to publish message {i}: {e}")
                continue
    
    except zenoh.ZError as e:
        logger.error(f"Zenoh error: {e}")
        return False
    
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return False
    
    finally:
        # Cleanup with error handling
        if publisher:
            try:
                publisher.undeclare()
                logger.info("Publisher undeclared")
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring publisher: {e}")
        
        if session:
            try:
                session.close()
                logger.info("Session closed")
            except zenoh.ZError as e:
                logger.error(f"Error closing session: {e}")
    
    return True

# Run the robust example
success = robust_session_example()
print(f"Example completed successfully: {success}")

Complete Advanced Features Example

import zenoh
from zenoh.handlers import FifoChannel, Callback
import threading
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DistributedService:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.session = None
        self.liveliness_token = None
        self.publisher = None
        self.subscriber = None
        self.matching_listener = None
        self.running = False
    
    def start(self):
        """Start the distributed service with full monitoring"""
        try:
            # Open session
            self.session = zenoh.open()
            logger.info(f"Service {self.service_name} session started")
            
            # Declare liveliness
            self.liveliness_token = self.session.liveliness.declare_token(
                f"services/{self.service_name}"
            )
            logger.info(f"Liveliness declared for {self.service_name}")
            
            # Setup publisher with matching monitoring
            self.publisher = self.session.declare_publisher(
                f"output/{self.service_name}"
            )
            
            def matching_handler(status):
                if status.matching:
                    logger.info(f"{self.service_name}: Consumers connected")
                else:
                    logger.warning(f"{self.service_name}: No consumers")
            
            self.matching_listener = self.publisher.declare_matching_listener(
                matching_handler
            )
            
            # Setup subscriber with custom handler
            handler = FifoChannel(capacity=100)
            self.subscriber = self.session.declare_subscriber(
                f"input/{self.service_name}",
                handler
            )
            
            self.running = True
            logger.info(f"Service {self.service_name} fully started")
            
        except zenoh.ZError as e:
            logger.error(f"Failed to start service {self.service_name}: {e}")
            self.stop()
            raise
    
    def process_data(self):
        """Main processing loop"""
        while self.running:
            try:
                # Try to receive input data
                sample = self.subscriber.try_recv()
                if sample:
                    input_data = sample.payload.to_string()
                    logger.debug(f"{self.service_name} processing: {input_data}")
                    
                    # Process the data (simulate some work)
                    processed = f"PROCESSED[{self.service_name}]: {input_data.upper()}"
                    
                    # Publish result
                    self.publisher.put(processed)
                    logger.debug(f"{self.service_name} output: {processed}")
                
                else:
                    # No data available, short sleep
                    time.sleep(0.1)
                    
            except zenoh.ZError as e:
                logger.error(f"Processing error in {self.service_name}: {e}")
                time.sleep(1)  # Brief pause before retry
            
            except KeyboardInterrupt:
                logger.info(f"Shutdown requested for {self.service_name}")
                break
    
    def stop(self):
        """Stop the service and cleanup resources"""
        self.running = False
        
        # Cleanup in reverse order
        if self.matching_listener:
            try:
                self.matching_listener.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring matching listener: {e}")
        
        if self.subscriber:
            try:
                self.subscriber.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring subscriber: {e}")
        
        if self.publisher:
            try:
                self.publisher.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring publisher: {e}")
        
        if self.liveliness_token:
            try:
                self.liveliness_token.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring liveliness: {e}")
        
        if self.session:
            try:
                self.session.close()
            except zenoh.ZError as e:
                logger.error(f"Error closing session: {e}")
        
        logger.info(f"Service {self.service_name} stopped")

def monitor_services():
    """Monitor all service liveliness"""
    session = zenoh.open()
    
    def liveliness_handler(sample):
        service_name = str(sample.key_expr).split('/')[-1]
        if sample.kind == zenoh.SampleKind.PUT:
            logger.info(f"MONITOR: Service {service_name} is ONLINE")
        else:
            logger.warning(f"MONITOR: Service {service_name} went OFFLINE")
    
    # Subscribe to all service liveliness with history
    monitor = session.liveliness.declare_subscriber(
        "services/**",
        liveliness_handler,
        history=True
    )
    
    # Let it monitor
    time.sleep(20)
    
    monitor.undeclare()
    session.close()
    logger.info("Service monitor stopped")

def main():
    # Start service monitor in background
    monitor_thread = threading.Thread(target=monitor_services)
    monitor_thread.start()
    
    # Create and start services
    service1 = DistributedService("processor_a")
    service2 = DistributedService("processor_b")
    
    try:
        service1.start()
        service2.start()
        
        # Start processing in separate threads
        thread1 = threading.Thread(target=service1.process_data)
        thread2 = threading.Thread(target=service2.process_data)
        
        thread1.start()
        thread2.start()
        
        # Send some test data
        test_session = zenoh.open()
        test_pub = test_session.declare_publisher("input/processor_a")
        
        for i in range(5):
            test_pub.put(f"test_data_{i}")
            time.sleep(2)
        
        test_pub.undeclare()
        test_session.close()
        
        # Let services run for a bit
        time.sleep(5)
        
    except KeyboardInterrupt:
        logger.info("Shutdown requested")
    
    finally:
        # Stop services
        service1.stop()
        service2.stop()
        
        # Wait for threads to finish
        monitor_thread.join(timeout=5)
        
        logger.info("All services stopped")

if __name__ == "__main__":
    main()

Install with Tessl CLI

npx tessl i tessl/pypi-eclipse-zenoh

docs

advanced.md

data-types.md

extensions.md

handlers.md

index.md

pubsub.md

query.md

session-management.md

tile.json