The Python API for Eclipse zenoh — a high-performance networking library providing pub/sub, store/query and compute primitives for zero-overhead communication.

Zenoh's handler system provides flexible mechanisms for processing asynchronous data streams from subscribers, queryables, and other communication patterns. It supports both Rust-style channels and Python-style callbacks, enabling efficient data-processing patterns suited to different application architectures.

The base handler interface that all concrete handlers implement:
from zenoh.handlers import Handler
class Handler:
    """Base handler interface for processing data streams."""

    def try_recv(self):
        """Try to receive data without blocking.

        Returns:
            A data item if one is available; per this API description an
            exception is raised when no data is pending.
        """

    def recv(self):
        """Receive data, blocking until an item is available.

        Returns:
            The next data item from the handler.
        """

    def __iter__(self):
        """Return an iterator over received data items.

        Returns:
            An iterator that yields data items as they arrive.
        """

    def __next__(self):
        """Return the next data item (iterator protocol).

        Returns:
            The next data item, or raises StopIteration when exhausted.
        """

# FIFO queue handler with automatic capacity management.
from zenoh.handlers import DefaultHandler
class DefaultHandler:
    """Default FIFO handler with unlimited capacity."""

    def try_recv(self):
        """Try to receive a data item without blocking."""

    def recv(self):
        """Receive a data item, blocking until one is available."""

    def __iter__(self):
        """Return an iterator over received data items."""

    def __next__(self):
        """Return the next data item (iterator protocol)."""

# First-in-first-out channel with configurable capacity.
from zenoh.handlers import FifoChannel
class FifoChannel:
    """FIFO channel handler with configurable capacity."""

    def __init__(self, capacity: int):
        """Create a FIFO channel.

        Parameters:
            capacity: Maximum number of items to buffer.
        """

    def try_recv(self):
        """Try to receive a data item without blocking."""

    def recv(self):
        """Receive a data item, blocking until one is available."""

    def __iter__(self):
        """Return an iterator over received data items."""

    def __next__(self):
        """Return the next data item (iterator protocol)."""

# Ring buffer handler that overwrites oldest data when capacity is exceeded.
from zenoh.handlers import RingChannel
class RingChannel:
    """Ring buffer handler with configurable capacity."""

    def __init__(self, capacity: int):
        """Create a ring buffer channel.

        Parameters:
            capacity: Maximum number of items to buffer; when full, the
                oldest items are overwritten by newer ones.
        """

    def try_recv(self):
        """Try to receive a data item without blocking."""

    def recv(self):
        """Receive a data item, blocking until one is available."""

    def __iter__(self):
        """Return an iterator over received data items."""

    def __next__(self):
        """Return the next data item (iterator protocol)."""

# Python callback-based handler for immediate processing.
from zenoh.handlers import Callback
class Callback:
    """Callback handler for immediate data processing."""

    def __init__(
        self,
        callback: callable,
        drop: callable = None,
        indirect: bool = True,
    ):
        """Create a callback handler.

        Parameters:
            callback: Function invoked for each data item.
            drop: Optional cleanup function called when the handler is dropped.
            indirect: Whether to use indirect callback invocation.
        """
        # Store the configuration so the properties below actually expose it.
        self._callback = callback
        self._drop = drop
        self._indirect = indirect

    @property
    def callback(self) -> callable:
        """The callback function."""
        return self._callback

    @property
    def drop(self) -> callable:
        """The drop (cleanup) callback, or None."""
        return self._drop

    @property
    def indirect(self) -> bool:
        """Whether indirect callback invocation is used."""
        return self._indirect

    def try_recv(self):
        """Not applicable for callback handlers."""

    def recv(self):
        """Not applicable for callback handlers."""

    def __iter__(self):
        """Not applicable for callback handlers."""

    def __next__(self):
        """Not applicable for callback handlers."""

# Handlers are used throughout the Zenoh API for asynchronous data processing.
# Handler usage patterns in Zenoh operations
# Subscriber with different handler types
def declare_subscriber(self, key_expr, handler=None, **kwargs):
    """Declare a subscriber with flexible handler options.

    Handler options:
        - None: returns a subscriber backed by a DefaultHandler
        - callable: a Python function, wrapped in a Callback handler
        - tuple(callable, handler): a Python callback with a custom handler
        - Handler instance: used directly
    """
# Queryable with handler
def declare_queryable(self, key_expr, handler, **kwargs):
    """Declare a queryable with a handler for processing queries.

    The handler receives Query objects for processing and replying.
    """
# Scout with handler
def scout(what=None, timeout=None, handler=None):
    """Scout for Zenoh nodes, delivering discovery messages to *handler*.

    The handler receives Hello messages from discovered nodes.
    """

import zenoh
from zenoh.handlers import DefaultHandler

session = zenoh.open()

# Subscriber with the default handler (implicit DefaultHandler).
subscriber = session.declare_subscriber("sensors/temperature")

# Process incoming samples via the iterator protocol.
print("Listening for temperature data...")
for sample in subscriber:
    temp = float(sample.payload.to_string())
    print(f"Temperature: {temp}°C from {sample.key_expr}")
    if temp > 30:  # Stop listening if too hot
        break

subscriber.undeclare()
session.close()

import zenoh
from zenoh.handlers import FifoChannel

session = zenoh.open()

# Create a FIFO channel with limited capacity.
fifo_handler = FifoChannel(capacity=10)

# Subscriber with the FIFO handler.
subscriber = session.declare_subscriber("data/stream", fifo_handler)

print("Processing with FIFO channel...")

# Non-blocking processing.
while True:
    try:
        sample = subscriber.try_recv()
        data = sample.payload.to_string()
        print(f"Processed: {data}")
    except Exception:  # narrowed from bare `except:` so SystemExit/KeyboardInterrupt propagate
        print("No data available, doing other work...")
        import time
        time.sleep(0.1)
        break

# Blocking processing.
print("Switching to blocking mode...")
for i in range(5):
    sample = subscriber.recv()  # Blocks until data is available
    print(f"Received: {sample.payload.to_string()}")

subscriber.undeclare()
session.close()

import zenoh
from zenoh.handlers import RingChannel
import time

session = zenoh.open()

# Ring buffer: newest data overwrites the oldest when capacity is exceeded.
ring_handler = RingChannel(capacity=5)

# Subscriber with the ring buffer handler.
subscriber = session.declare_subscriber("high_frequency/data", ring_handler)

# Publisher to generate high-frequency data.
publisher = session.declare_publisher("high_frequency/data")

def generate_data():
    """Publish 20 samples faster than the consumer can drain them."""
    for i in range(20):
        publisher.put(f"data_point_{i}")
        time.sleep(0.01)  # Very fast publishing

import threading
generator = threading.Thread(target=generate_data)
generator.start()

# Slow processing — the ring buffer will drop the oldest data.
time.sleep(0.5)  # Let data accumulate
print("Processing with ring buffer (may miss some data):")
while True:
    try:
        sample = subscriber.try_recv()
        data = sample.payload.to_string()
        print(f"Got: {data}")
        time.sleep(0.1)  # Slow processing
    except Exception:  # narrowed from bare `except:` — exit when the channel is drained
        break

generator.join()
publisher.undeclare()
subscriber.undeclare()
session.close()

import zenoh
from zenoh.handlers import Callback

session = zenoh.open()

def data_callback(sample):
    """Process each sample immediately, in the callback context."""
    data = sample.payload.to_string()
    timestamp = sample.timestamp
    print(f"Immediate processing: {data} at {timestamp}")
    if "error" in data.lower():
        print("ERROR DETECTED - taking immediate action!")

def cleanup_callback():
    """Invoked when the handler is dropped."""
    print("Callback handler cleanup")

# Create the callback handler.
callback_handler = Callback(
    callback=data_callback,
    drop=cleanup_callback,
    indirect=True,
)

# Subscriber with the callback handler.
subscriber = session.declare_subscriber("alerts/system", callback_handler)

# Simulate some data.
publisher = session.declare_publisher("alerts/system")
import time
for message in ("System status: OK", "System status: ERROR detected"):
    publisher.put(message)
    time.sleep(0.1)
publisher.put("System status: Recovered")
time.sleep(0.5)

publisher.undeclare()
subscriber.undeclare()  # Will trigger cleanup_callback
session.close()

import zenoh
session = zenoh.open()

def temperature_handler(sample):
    """Plain Python function handler — Zenoh wraps it in a Callback automatically."""
    temp = float(sample.payload.to_string())
    location = str(sample.key_expr).split('/')[-1]
    print(f"Temperature in {location}: {temp}°C")
    if temp > 25:
        print(f" -> {location} is warm!")
    elif temp < 15:
        print(f" -> {location} is cold!")

subscriber = session.declare_subscriber(
    "sensors/temperature/*",
    temperature_handler
)

# Test data from three publishers.
publisher = session.declare_publisher("sensors/temperature/room1")
publisher.put("23.5")
publisher2 = session.declare_publisher("sensors/temperature/outside")
publisher2.put("12.3")
publisher3 = session.declare_publisher("sensors/temperature/office")
publisher3.put("26.8")

import time
time.sleep(1)

publisher.undeclare()
publisher2.undeclare()
publisher3.undeclare()
subscriber.undeclare()
session.close()

import zenoh
from zenoh.handlers import FifoChannel
import json
import threading
import queue
class ProcessingHandler:
    """Custom handler that drains a FIFO channel into fixed-size batches
    and processes them on a background thread."""

    def __init__(self, max_batch_size=5):
        self.fifo = FifoChannel(capacity=100)  # buffers incoming samples
        self.batch_queue = queue.Queue()
        self.max_batch_size = max_batch_size
        self.processing_thread = None
        self.running = False

    def start_processing(self):
        """Start the background batch-processing thread."""
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_batches)
        self.processing_thread.start()

    def stop_processing(self):
        """Stop the background thread and wait for it to finish."""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join()

    def _process_batches(self):
        """Background loop: collect samples into batches and process them."""
        import time  # hoisted out of the loop (was re-imported every iteration)
        batch = []
        while self.running:
            try:
                sample = self.fifo.try_recv()
                batch.append(sample)
                if len(batch) >= self.max_batch_size:
                    self._process_batch(batch)
                    batch = []
            except Exception:  # narrowed from bare `except:` — channel empty
                # No data available: flush any partial batch, then idle briefly.
                if batch:
                    self._process_batch(batch)
                    batch = []
                time.sleep(0.1)

    def _process_batch(self, batch):
        """Average the 'value' field of each JSON payload in *batch*."""
        values = []
        for sample in batch:
            try:
                data = json.loads(sample.payload.to_string())
                values.append(data['value'])
            except (ValueError, KeyError, TypeError):  # skip malformed payloads
                continue
        if values:
            avg = sum(values) / len(values)
            print(f"Batch processed: {len(values)} samples, average = {avg:.2f}")
# Usage
session = zenoh.open()

# Create the custom processing handler and subscribe with its FIFO side.
processor = ProcessingHandler(max_batch_size=3)
subscriber = session.declare_subscriber("data/batch", processor.fifo)

# Start background processing.
processor.start_processing()

# Generate test data.
publisher = session.declare_publisher("data/batch")
import time
for i in range(10):
    data = {"sequence": i, "value": i * 2.5 + 10}
    publisher.put(json.dumps(data))
    time.sleep(0.2)

time.sleep(2)  # Let processing complete

# Cleanup.
processor.stop_processing()
publisher.undeclare()
subscriber.undeclare()
session.close()

import zenoh
from zenoh.handlers import FifoChannel, Callback
import threading
import time
class CompositeHandler:
    """Handler combining two processing strategies: a FIFO channel for
    bulk processing and a callback that reacts immediately to alerts."""

    def __init__(self):
        # Primary handler for normal processing.
        self.primary = FifoChannel(capacity=50)
        # Secondary handler for immediate alerts.
        self.alert_callback = Callback(
            callback=self._handle_alert,
            indirect=True,
        )
        self.alert_keywords = ["error", "critical", "failure"]

    def _handle_alert(self, sample):
        """Print an alert as soon as a sample contains an alert keyword."""
        text = sample.payload.to_string()
        if any(keyword in text.lower() for keyword in self.alert_keywords):
            print(f"🚨 ALERT: {text}")
            # Could send notifications, emails, etc.

    def get_primary_handler(self):
        """Return the FIFO handler for normal data processing."""
        return self.primary

    def get_alert_handler(self):
        """Return the callback handler for alert processing."""
        return self.alert_callback
# Usage
session = zenoh.open()

# Create the composite handler.
composite = CompositeHandler()

# Subscribe to the same stream twice: once for normal processing,
# once (separate subscription) for immediate alerts.
data_subscriber = session.declare_subscriber(
    "system/logs",
    composite.get_primary_handler()
)
alert_subscriber = session.declare_subscriber(
    "system/logs",
    composite.get_alert_handler()
)

def normal_processing():
    """Background loop draining the FIFO for non-alert messages."""
    print("Normal processing started...")
    while True:
        try:
            sample = data_subscriber.try_recv()
            data = sample.payload.to_string()
            # Normal processing (logging, analysis, etc.)
            if "error" not in data.lower():
                print(f"Processing: {data}")
        except Exception:  # narrowed from bare `except:` — FIFO empty, idle briefly
            time.sleep(0.1)
            continue

# Start the normal-processing thread.
processing_thread = threading.Thread(target=normal_processing)
processing_thread.daemon = True
processing_thread.start()

# Generate test data.
publisher = session.declare_publisher("system/logs")
test_messages = [
    "System startup completed",
    "User login: alice",
    "ERROR: Database connection failed",
    "Processing batch job #1234",
    "CRITICAL: Disk space low",
    "User logout: alice",
    "System maintenance scheduled",
]
for msg in test_messages:
    publisher.put(msg)
    time.sleep(0.5)

time.sleep(3)  # Let processing continue

# Cleanup.
publisher.undeclare()
data_subscriber.undeclare()
alert_subscriber.undeclare()
session.close()

# DefaultHandler:
FifoChannel: first-in-first-out channel with a configurable capacity.
RingChannel: ring buffer that overwrites the oldest items when capacity is exceeded.
Callback: immediate, callback-based processing of each data item.
Python Functions: plain callables, automatically wrapped in a Callback handler.
Install with Tessl CLI
npx tessl i tessl/pypi-eclipse-zenoh