The Python API for Eclipse zenoh — a high-performance networking library providing pub/sub, store/query, and compute primitives for zero-overhead communication.
Advanced features in Zenoh provide sophisticated networking capabilities including liveliness detection, matching status monitoring, custom handler systems, and network administration tools. These features enable building robust, self-monitoring distributed systems with fine-grained control over communication patterns.
Monitor and advertise the availability of distributed system components.
class Liveliness:
    """Liveliness management for distributed system components.

    Obtained from a session (see the session-level ``liveliness`` property);
    provides token declaration, one-shot queries, and subscriptions for
    liveliness state.
    """

    def declare_token(self, key_expr) -> "LivelinessToken":
        """
        Declare a liveliness token for a key expression.

        Parameters:
        - key_expr: Key expression to declare liveliness for

        Returns:
        LivelinessToken that maintains liveliness while active
        """

    def get(
        self,
        key_expr,
        handler=None,
        timeout: float | None = None
    ):
        """
        Query current liveliness status.

        Parameters:
        - key_expr: Key expression pattern to query
        - handler: Handler for liveliness replies
        - timeout: Query timeout in seconds (None uses the default timeout)

        Returns:
        Iterator over liveliness replies if no handler provided
        """

    def declare_subscriber(
        self,
        key_expr,
        handler,
        history: bool = False
    ) -> "Subscriber":
        """
        Subscribe to liveliness changes.

        Parameters:
        - key_expr: Key expression pattern to monitor
        - handler: Handler for liveliness change notifications
        - history: Whether to also receive the current (historical) liveliness state

        Returns:
        Subscriber for liveliness changes
        """
class LivelinessToken:
    """Liveliness token that maintains component availability.

    While the token is active, liveliness subscribers on its key expression
    see the component as alive; undeclaring it signals the component is gone.
    Usable as a context manager for automatic undeclaration.
    """

    def __enter__(self):
        """Context manager entry; returns the token itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - automatically undeclares token."""

    def undeclare(self) -> None:
        """Undeclare the liveliness token."""

Monitor whether publishers/subscribers and queriers/queryables have matching peers.
class MatchingStatus:
    """Entity matching status."""

    @property
    def matching(self) -> bool:
        """Whether there are matching entities on the network (e.g. subscribers for a publisher)."""
class MatchingListener:
    """Matching status listener for monitoring connection changes.

    Supports both callback-style delivery (via its handler) and channel-style
    consumption (``recv``/``try_recv``/iteration).
    """

    @property
    def handler(self):
        """Get the listener's handler."""

    def undeclare(self) -> None:
        """Undeclare the matching listener."""

    def try_recv(self):
        """Try to receive a matching status update without blocking."""

    def recv(self):
        """Receive a matching status update (blocking)."""

    def __iter__(self):
        """Iterate over matching status updates."""

Flexible handler system supporting various callback patterns and channel types.
from zenoh.handlers import Handler, DefaultHandler, FifoChannel, RingChannel, Callback
class Handler:
    """Generic handler interface for receiving data.

    Provides both non-blocking (``try_recv``) and blocking (``recv``)
    reception, plus iterator access over incoming items.
    """

    def try_recv(self):
        """Try to receive data without blocking."""

    def recv(self):
        """Receive data (blocking)."""

    def __iter__(self):
        """Iterate over received data."""

    def __next__(self):
        """Get next item in iteration."""
class DefaultHandler:
    """Default FIFO handler with unlimited capacity."""
    pass
class FifoChannel:
    """FIFO channel with configurable capacity."""

    def __init__(self, capacity: int):
        """
        Create FIFO channel with specified capacity.

        Parameters:
        - capacity: Maximum number of items to buffer
        """
class RingChannel:
    """Ring channel that overwrites oldest data when full."""

    def __init__(self, capacity: int):
        """
        Create ring channel with fixed capacity.

        Parameters:
        - capacity: Fixed size of the ring buffer; when full, the oldest
          item is overwritten by new arrivals
        """
class Callback:
    """Callback handler wrapper for Python functions."""

    @property
    def callback(self):
        """Get the callback function."""

    @property
    def drop(self):
        """Get the drop callback (called when handler is dropped)."""

    @property
    def indirect(self) -> bool:
        """Whether callback is called indirectly."""

    def __init__(
        self,
        callback,
        drop=None,
        indirect: bool = False
    ):
        """
        Create callback handler.

        Parameters:
        - callback: Function to call with received data
        - drop: Optional cleanup function, invoked when the handler is dropped
        - indirect: Whether to use indirect calling
        """

    def __call__(self, *args, **kwargs):
        """Call the wrapped callback function."""

Tools for network configuration and monitoring.
# Access session's liveliness interface
@property
def liveliness(self) -> Liveliness:
    """Get session's liveliness interface (a Liveliness instance)."""
# Session operations for entity management
def undeclare(self, entity) -> None:
    """
    Undeclare any Zenoh entity (publisher, subscriber, etc.)

    Parameters:
    - entity: The entity to undeclare
    """
def declare_keyexpr(self, key_expr: str) -> "KeyExpr":
    """
    Declare a key expression for optimized repeated usage.

    Parameters:
    - key_expr: Key expression string to optimize

    Returns:
    Optimized KeyExpr object
    """

Comprehensive error handling for robust distributed applications.
class ZError(Exception):
    """Base exception for all Zenoh errors."""
    pass

import zenoh
import time

session = zenoh.open()

# Declare liveliness token; peers see "services/temperature_monitor" as alive
token = session.liveliness.declare_token("services/temperature_monitor")
print("Service is alive...")

# Service runs for some time
time.sleep(10)

# Cleanup - this signals that service is no longer alive
token.undeclare()
session.close()

import zenoh
import time

session = zenoh.open()

# Use context manager for automatic cleanup
with session.liveliness.declare_token("services/data_processor") as token:
    print("Data processor service is running...")
    # Simulate service work
    for i in range(5):
        print(f"Processing batch {i+1}...")
        time.sleep(2)
    print("Service work complete")
# Token is automatically undeclared when exiting the context

session.close()

import zenoh
import time

def liveliness_handler(sample):
    """Log services going online (PUT) or offline (DELETE)."""
    if sample.kind == zenoh.SampleKind.DELETE:
        print(f"Service OFFLINE: {sample.key_expr}")
    elif sample.kind == zenoh.SampleKind.PUT:
        print(f"Service ONLINE: {sample.key_expr}")

session = zenoh.open()

# Subscribe to liveliness changes for all services;
# history=True delivers the current liveliness state up front.
subscriber = session.liveliness.declare_subscriber(
    "services/**",
    liveliness_handler,
    history=True
)

print("Monitoring service liveliness...")
time.sleep(30)  # observe events for a while

subscriber.undeclare()
session.close()

import zenoh
session = zenoh.open()

# Query current liveliness status across all services
print("Querying current service status...")
replies = session.liveliness.get("services/**", timeout=5.0)

active_services = []
for reply in replies:
    # Skip error replies; only successful ones carry a sample
    if not reply.ok:
        continue
    service_key = str(reply.ok.key_expr)
    active_services.append(service_key)
    print(f"ACTIVE: {service_key}")

print(f"\nTotal active services: {len(active_services)}")
session.close()

import zenoh
import time

def matching_handler(status):
    """Print whether this publisher currently has matching subscribers."""
    message = "Publisher has subscribers!" if status.matching else "No subscribers found"
    print(message)

session = zenoh.open()

# Create publisher and attach a matching-status listener
publisher = session.declare_publisher("sensor/data")
listener = publisher.declare_matching_listener(matching_handler)

# Check initial status
print(f"Initial matching status: {publisher.matching_status.matching}")

# Publish a few samples, one per second
for point in range(5):
    publisher.put(f"Data point {point}")
    time.sleep(1)

# Cleanup
listener.undeclare()
publisher.undeclare()
session.close()

import zenoh
from zenoh.handlers import FifoChannel, RingChannel, Callback
import threading
import time

session = zenoh.open()

# Example 1: FIFO Channel Handler
print("Example 1: FIFO Channel")
fifo_handler = FifoChannel(capacity=10)
subscriber1 = session.declare_subscriber("data/fifo", fifo_handler)

# Simulate some data
fifo_publisher = session.declare_publisher("data/fifo")
for i in range(5):
    fifo_publisher.put(f"FIFO message {i}")

# Receive from FIFO
try:
    sample = subscriber1.recv()  # Blocks until data available
    print(f"FIFO received: {sample.payload.to_string()}")
except Exception:
    # FIX: was a bare `except:`, which also swallows KeyboardInterrupt/SystemExit
    pass
subscriber1.undeclare()

# Example 2: Ring Channel Handler
print("\nExample 2: Ring Channel")
ring_handler = RingChannel(capacity=3)
subscriber2 = session.declare_subscriber("data/ring", ring_handler)

# FIX: publish on "data/ring" — the original reused the "data/fifo"
# publisher here, so the ring subscriber never received anything.
ring_publisher = session.declare_publisher("data/ring")
for i in range(5):
    ring_publisher.put(f"Ring message {i}")

# Ring channel only keeps latest 3 messages
for sample in subscriber2:
    print(f"Ring received: {sample.payload.to_string()}")
    break  # Just get first one for demo
subscriber2.undeclare()

# Example 3: Callback Handler
print("\nExample 3: Callback Handler")

def my_callback(sample):
    print(f"Callback received: {sample.payload.to_string()}")

def cleanup_callback():
    print("Callback handler cleanup")

callback_handler = Callback(
    callback=my_callback,
    drop=cleanup_callback
)
subscriber3 = session.declare_subscriber("data/callback", callback_handler)

# FIX: publish on "data/callback" so the callback subscriber actually fires
# (original published the test message on "data/fifo")
callback_publisher = session.declare_publisher("data/callback")
callback_publisher.put("Callback test message")
time.sleep(0.1)  # Let callback process

subscriber3.undeclare()  # This will trigger cleanup_callback
fifo_publisher.undeclare()
ring_publisher.undeclare()
callback_publisher.undeclare()
session.close()

import zenoh
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def robust_session_example():
    """Open a session, publish ten messages, and clean up — logging every
    failure and returning True on success, False on a fatal error."""
    session = None
    publisher = None
    try:
        # Open session with error handling
        config = zenoh.Config()
        session = zenoh.open(config)
        logger.info("Session opened successfully")

        # Declare publisher
        publisher = session.declare_publisher("robust/example")
        logger.info("Publisher declared")

        # Publish with error handling; a failed put skips to the next message
        for i in range(10):
            try:
                publisher.put(f"Message {i}")
                logger.debug(f"Published message {i}")
            except zenoh.ZError as e:
                logger.error(f"Failed to publish message {i}: {e}")
                continue
    except zenoh.ZError as e:
        logger.error(f"Zenoh error: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return False
    finally:
        # Cleanup with error handling (runs on both success and failure paths)
        if publisher:
            try:
                publisher.undeclare()
                logger.info("Publisher undeclared")
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring publisher: {e}")
        if session:
            try:
                session.close()
                logger.info("Session closed")
            except zenoh.ZError as e:
                logger.error(f"Error closing session: {e}")
    # Reached only when no exception escaped the try block
    return True

# Run the robust example
success = robust_session_example()
print(f"Example completed successfully: {success}")

import zenoh
from zenoh.handlers import FifoChannel, Callback
import threading
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DistributedService:
    """A self-monitoring service: declares a liveliness token, a publisher
    with a matching-status listener, and a FIFO-buffered subscriber; stop()
    releases everything in reverse order."""

    def __init__(self, service_name: str):
        self.service_name = service_name
        # All Zenoh resources start unset; start() populates them.
        self.session = None
        self.liveliness_token = None
        self.publisher = None
        self.subscriber = None
        self.matching_listener = None
        self.running = False

    def start(self):
        """Start the distributed service with full monitoring"""
        try:
            # Open session
            self.session = zenoh.open()
            logger.info(f"Service {self.service_name} session started")

            # Declare liveliness so monitors see this service as online
            self.liveliness_token = self.session.liveliness.declare_token(
                f"services/{self.service_name}"
            )
            logger.info(f"Liveliness declared for {self.service_name}")

            # Setup publisher with matching monitoring
            self.publisher = self.session.declare_publisher(
                f"output/{self.service_name}"
            )

            def matching_handler(status):
                if status.matching:
                    logger.info(f"{self.service_name}: Consumers connected")
                else:
                    logger.warning(f"{self.service_name}: No consumers")

            self.matching_listener = self.publisher.declare_matching_listener(
                matching_handler
            )

            # Setup subscriber with custom handler (buffers up to 100 samples)
            handler = FifoChannel(capacity=100)
            self.subscriber = self.session.declare_subscriber(
                f"input/{self.service_name}",
                handler
            )

            self.running = True
            logger.info(f"Service {self.service_name} fully started")
        except zenoh.ZError as e:
            logger.error(f"Failed to start service {self.service_name}: {e}")
            # Roll back any partially-created resources before propagating
            self.stop()
            raise

    def process_data(self):
        """Main processing loop: poll input, transform, publish to output.
        Runs until self.running is cleared (by stop()) or interrupted."""
        while self.running:
            try:
                # Try to receive input data (non-blocking)
                sample = self.subscriber.try_recv()
                if sample:
                    input_data = sample.payload.to_string()
                    logger.debug(f"{self.service_name} processing: {input_data}")

                    # Process the data (simulate some work)
                    processed = f"PROCESSED[{self.service_name}]: {input_data.upper()}"

                    # Publish result
                    self.publisher.put(processed)
                    logger.debug(f"{self.service_name} output: {processed}")
                else:
                    # No data available, short sleep to avoid busy-waiting
                    time.sleep(0.1)
            except zenoh.ZError as e:
                logger.error(f"Processing error in {self.service_name}: {e}")
                time.sleep(1)  # Brief pause before retry
            except KeyboardInterrupt:
                logger.info(f"Shutdown requested for {self.service_name}")
                break

    def stop(self):
        """Stop the service and cleanup resources.

        Each undeclare/close is wrapped individually so one failure does not
        prevent the remaining resources from being released."""
        self.running = False

        # Cleanup in reverse order of creation
        if self.matching_listener:
            try:
                self.matching_listener.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring matching listener: {e}")
        if self.subscriber:
            try:
                self.subscriber.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring subscriber: {e}")
        if self.publisher:
            try:
                self.publisher.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring publisher: {e}")
        if self.liveliness_token:
            try:
                self.liveliness_token.undeclare()
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring liveliness: {e}")
        if self.session:
            try:
                self.session.close()
            except zenoh.ZError as e:
                logger.error(f"Error closing session: {e}")
        logger.info(f"Service {self.service_name} stopped")
def monitor_services():
    """Monitor all service liveliness for a fixed window, then shut down."""
    session = zenoh.open()

    def liveliness_handler(sample):
        service_name = str(sample.key_expr).split('/')[-1]
        if sample.kind != zenoh.SampleKind.PUT:
            logger.warning(f"MONITOR: Service {service_name} went OFFLINE")
        else:
            logger.info(f"MONITOR: Service {service_name} is ONLINE")

    # history=True also reports services that were already online
    monitor = session.liveliness.declare_subscriber(
        "services/**",
        liveliness_handler,
        history=True
    )

    # Observe liveliness events, then tear down
    time.sleep(20)
    monitor.undeclare()
    session.close()
    logger.info("Service monitor stopped")
def main():
# Start service monitor in background
monitor_thread = threading.Thread(target=monitor_services)
monitor_thread.start()
# Create and start services
service1 = DistributedService("processor_a")
service2 = DistributedService("processor_b")
try:
service1.start()
service2.start()
# Start processing in separate threads
thread1 = threading.Thread(target=service1.process_data)
thread2 = threading.Thread(target=service2.process_data)
thread1.start()
thread2.start()
# Send some test data
test_session = zenoh.open()
test_pub = test_session.declare_publisher("input/processor_a")
for i in range(5):
test_pub.put(f"test_data_{i}")
time.sleep(2)
test_pub.undeclare()
test_session.close()
# Let services run for a bit
time.sleep(5)
except KeyboardInterrupt:
logger.info("Shutdown requested")
finally:
# Stop services
service1.stop()
service2.stop()
# Wait for threads to finish
monitor_thread.join(timeout=5)
logger.info("All services stopped")
if __name__ == "__main__":
main()Install with Tessl CLI
npx tessl i tessl/pypi-eclipse-zenoh