tessl/pypi-eclipse-zenoh

The Python API for Eclipse Zenoh, a high-performance networking library providing a pub/sub, store/query, and compute framework for zero-overhead communication


Handler System

Zenoh's handler system controls how asynchronous data from subscribers, queryables, and other communication patterns is delivered to application code. It supports both Rust-style channels, which buffer items until the application receives them, and Python-style callbacks, which are invoked for each item as it arrives.

Capabilities

Handler Interface

The base handler interface that all concrete handlers implement.

from zenoh.handlers import Handler

class Handler:
    """Base handler interface for processing data streams"""
    
    def try_recv(self):
        """
        Try to receive data without blocking.
        
        Returns:
        Data item if available, or raises exception if no data
        """
    
    def recv(self):
        """
        Receive data (blocking operation).
        
        Returns:
        Next data item from the handler
        """
    
    def __iter__(self):
        """
        Iterate over received data items.
        
        Returns:
        Iterator that yields data items as they arrive
        """
    
    def __next__(self):
        """
        Get next data item for iterator protocol.
        
        Returns:
        Next data item or raises StopIteration
        """

Default Handler

FIFO queue handler with automatic capacity management.

from zenoh.handlers import DefaultHandler

class DefaultHandler:
    """Default FIFO handler; its capacity is managed by Zenoh rather than set by the caller"""
    
    def try_recv(self):
        """Try to receive without blocking"""
    
    def recv(self):
        """Receive data (blocking)"""
    
    def __iter__(self):
        """Iterate over received data"""
    
    def __next__(self):
        """Iterator protocol implementation"""

FIFO Channel Handler

First-in-first-out channel with configurable capacity.

from zenoh.handlers import FifoChannel

class FifoChannel:
    """FIFO channel handler with configurable capacity"""
    
    def __init__(self, capacity: int):
        """
        Create FIFO channel with specified capacity.
        
        Parameters:
        - capacity: Maximum number of items to buffer
        """
    
    def try_recv(self):
        """Try to receive without blocking"""
    
    def recv(self):
        """Receive data (blocking)"""
    
    def __iter__(self):
        """Iterate over received data"""
    
    def __next__(self):
        """Iterator protocol implementation"""

Ring Channel Handler

Ring buffer handler that overwrites oldest data when capacity is exceeded.

from zenoh.handlers import RingChannel

class RingChannel:
    """Ring buffer handler with configurable capacity"""
    
    def __init__(self, capacity: int):
        """
        Create ring buffer channel with specified capacity.
        
        Parameters:
        - capacity: Maximum number of items to buffer (older items overwritten)
        """
    
    def try_recv(self):
        """Try to receive without blocking"""
    
    def recv(self):
        """Receive data (blocking)"""
    
    def __iter__(self):
        """Iterate over received data"""
    
    def __next__(self):
        """Iterator protocol implementation"""

Callback Handler

Python callback-based handler for immediate processing.

from typing import Any, Callable, Optional

from zenoh.handlers import Callback

class Callback:
    """Callback handler for immediate data processing"""
    
    def __init__(
        self,
        callback: Callable[[Any], Any],
        drop: Optional[Callable[[], Any]] = None,
        indirect: bool = True
    ):
        """
        Create callback handler.
        
        Parameters:
        - callback: Function to call for each data item
        - drop: Optional cleanup function when handler is dropped
        - indirect: Whether to use indirect callback invocation
        """
    
    @property
    def callback(self) -> Callable[[Any], Any]:
        """Get the callback function"""
    
    @property
    def drop(self) -> Optional[Callable[[], Any]]:
        """Get the drop callback function"""
    
    @property
    def indirect(self) -> bool:
        """Get indirect flag"""
    
    def try_recv(self):
        """Try to receive without blocking (not applicable for callbacks)"""
    
    def recv(self):
        """Receive data (not applicable for callbacks)"""
    
    def __iter__(self):
        """Iterate over received data (not applicable for callbacks)"""
    
    def __next__(self):
        """Iterator protocol (not applicable for callbacks)"""

Handler Usage in Zenoh Operations

Handlers are used throughout the Zenoh API for asynchronous data processing.

# Handler usage patterns in Zenoh operations

# Subscriber with different handler types
def declare_subscriber(self, key_expr, handler=None, **kwargs):
    """
    Declare subscriber with flexible handler options.
    
    Handler options:
    - None: Returns subscriber with DefaultHandler
    - callable: Python function - creates Callback handler
    - tuple(callable, handler): Python callback with custom handler
    - Handler instance: Uses provided handler directly
    """

# Queryable with handler
def declare_queryable(self, key_expr, handler, **kwargs):
    """
    Declare queryable with handler for processing queries.
    
    Handler receives Query objects for processing and replying.
    """

# Scout with handler
def scout(what=None, timeout=None, handler=None):
    """
    Scout with handler for discovery messages.
    
    Handler receives Hello messages from discovered nodes.
    """

Usage Examples

Default Handler Usage

import zenoh
from zenoh.handlers import DefaultHandler

session = zenoh.open(zenoh.Config())  # open a session with the default configuration

# Subscriber with default handler (implicit)
subscriber = session.declare_subscriber("sensors/temperature")

# Process data using iterator pattern
print("Listening for temperature data...")
for sample in subscriber:
    temp = float(sample.payload.to_string())
    print(f"Temperature: {temp}°C from {sample.key_expr}")
    
    if temp > 30:  # Stop listening if too hot
        break

subscriber.undeclare()
session.close()
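
Consumer code written against the iterator pattern above can be exercised without a session by using a stand-in object that follows the same method names as the Handler interface (`try_recv`, `recv`, iteration). The sketch below is a hypothetical `InMemoryHandler` built on `queue.Queue`. It is not Zenoh's implementation: the `push` and `close` methods, and the choice to return `None` from `try_recv` when nothing is buffered, are assumptions made for illustration.

```python
import queue


class InMemoryHandler:
    """Hypothetical stand-in mirroring the Handler method names (not Zenoh code)."""

    _CLOSED = object()  # sentinel marking the end of the stream

    def __init__(self):
        self._items = queue.Queue()

    def push(self, item):
        """Test helper: make an item available to the consumer."""
        self._items.put(item)

    def close(self):
        """Test helper: signal that no more items will arrive."""
        self._items.put(self._CLOSED)

    def try_recv(self):
        """Return the next item, or None if nothing is buffered (assumed convention)."""
        try:
            item = self._items.get_nowait()
        except queue.Empty:
            return None
        if item is self._CLOSED:
            self._items.put(item)  # keep the sentinel for later calls
            return None
        return item

    def recv(self):
        """Block until an item is available; raise once the stream is closed."""
        item = self._items.get()
        if item is self._CLOSED:
            self._items.put(item)
            raise RuntimeError("handler closed")
        return item

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.recv()
        except RuntimeError:
            raise StopIteration from None


handler = InMemoryHandler()
for reading in ("21.5", "24.0", "31.2"):
    handler.push(reading)
handler.close()

# Same shape as `for sample in subscriber:` in the example above
temps = [float(item) for item in handler]
print(temps)
```

The same consumer loop can then be pointed at a real subscriber, since it only relies on iteration.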

FIFO Channel Handler

import time

import zenoh
from zenoh.handlers import FifoChannel

session = zenoh.open(zenoh.Config())  # default configuration

# Create FIFO channel with limited capacity
fifo_handler = FifoChannel(capacity=10)

# Subscriber with FIFO handler
subscriber = session.declare_subscriber("data/stream", fifo_handler)

print("Processing with FIFO channel...")

# Non-blocking processing: drain whatever is already buffered
while True:
    try:
        sample = subscriber.try_recv()
    except Exception:
        sample = None
    # Treat both an exception and a None result as "no data available"
    if sample is None:
        print("No data available, doing other work...")
        time.sleep(0.1)
        break
    print(f"Processed: {sample.payload.to_string()}")

# Blocking processing
print("Switching to blocking mode...")
for i in range(5):
    sample = subscriber.recv()  # Blocks until data available
    print(f"Received: {sample.payload.to_string()}")

subscriber.undeclare()
session.close()
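
To see what a bounded FIFO buffer implies without running a session, the sketch below uses Python's `queue.Queue(maxsize=...)` as a stand-in. It makes no claim about what `FifoChannel` does when it is full; it only illustrates first-in-first-out ordering and the fact that a bounded buffer stops accepting items once its capacity is reached.

```python
import queue

# Stand-in for a capacity-3 FIFO buffer (not FifoChannel itself)
buffer = queue.Queue(maxsize=3)

rejected = []
for i in range(5):
    try:
        buffer.put_nowait(f"item_{i}")
    except queue.Full:
        # The stand-in refuses new items once it is full
        rejected.append(f"item_{i}")

# Drain the buffer: items come out in the order they went in
drained = []
while not buffer.empty():
    drained.append(buffer.get_nowait())

print(drained)
print(rejected)
```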

Ring Channel Handler

import threading
import time

import zenoh
from zenoh.handlers import RingChannel

session = zenoh.open(zenoh.Config())  # default configuration

# Create ring buffer: newest data overwrites oldest
ring_handler = RingChannel(capacity=5)

# Subscriber with ring buffer handler
subscriber = session.declare_subscriber("high_frequency/data", ring_handler)

# Publisher to generate high-frequency data
publisher = session.declare_publisher("high_frequency/data")

# Generate data faster than we can process
def generate_data():
    for i in range(20):
        publisher.put(f"data_point_{i}")
        time.sleep(0.01)  # Very fast publishing

generator = threading.Thread(target=generate_data)
generator.start()

# Slow processing - ring buffer will drop old data
time.sleep(0.5)  # Let data accumulate

print("Processing with ring buffer (may miss some data):")
while True:
    try:
        sample = subscriber.try_recv()
    except Exception:
        sample = None
    # Treat both an exception and a None result as "no data available"
    if sample is None:
        break
    print(f"Got: {sample.payload.to_string()}")
    time.sleep(0.1)  # Slow processing

generator.join()
publisher.undeclare()
subscriber.undeclare()
session.close()
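
The overwrite-oldest behavior described for the ring buffer can be illustrated without a session using `collections.deque` with `maxlen`. This is only a stand-in for the semantics, not how `RingChannel` is implemented; the capacity and item names are copied from the example above.

```python
from collections import deque

# Stand-in for a capacity-5 ring buffer: appending to a full deque
# silently discards the oldest element.
ring = deque(maxlen=5)

for i in range(20):
    ring.append(f"data_point_{i}")

latest = list(ring)
print(latest)  # only the five most recent items remain
```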

Callback Handler

import time

import zenoh
from zenoh.handlers import Callback

session = zenoh.open(zenoh.Config())  # default configuration

# Create callback for immediate processing
def data_callback(sample):
    data = sample.payload.to_string()
    timestamp = sample.timestamp
    print(f"Immediate processing: {data} at {timestamp}")
    
    # Process data immediately in callback context
    if "error" in data.lower():
        print("ERROR DETECTED - taking immediate action!")

def cleanup_callback():
    print("Callback handler cleanup")

# Create callback handler
callback_handler = Callback(
    callback=data_callback,
    drop=cleanup_callback,
    indirect=True
)

# Subscriber with callback handler
subscriber = session.declare_subscriber("alerts/system", callback_handler)

# Simulate some data
publisher = session.declare_publisher("alerts/system")

publisher.put("System status: OK")
time.sleep(0.1)
publisher.put("System status: ERROR detected")
time.sleep(0.1)
publisher.put("System status: Recovered")
time.sleep(0.5)  # give the callback time to run before cleanup

publisher.undeclare()
subscriber.undeclare()  # Will trigger cleanup_callback
session.close()
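
The lifecycle in this example (one `callback` call per item, then a single `drop` call on cleanup) can be sketched with a small stand-in class. `CallbackStandIn`, `deliver`, and `close` are hypothetical names used only for illustration; they are not part of the Zenoh API, and the sketch ignores the `indirect` flag entirely.

```python
class CallbackStandIn:
    """Hypothetical stand-in showing the callback/drop lifecycle (not Zenoh code)."""

    def __init__(self, callback, drop=None):
        self.callback = callback
        self.drop = drop
        self._dropped = False

    def deliver(self, item):
        """Invoke the callback for one item."""
        if self._dropped:
            raise RuntimeError("handler already dropped")
        self.callback(item)

    def close(self):
        """Run the drop function once; later calls do nothing."""
        if not self._dropped:
            self._dropped = True
            if self.drop is not None:
                self.drop()


events = []
handler = CallbackStandIn(
    callback=lambda item: events.append(("data", item)),
    drop=lambda: events.append(("drop", None)),
)

handler.deliver("System status: OK")
handler.deliver("System status: ERROR detected")
handler.close()
handler.close()  # second close is a no-op

print(events)
```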

Python Function as Handler

import time

import zenoh

session = zenoh.open(zenoh.Config())  # default configuration

# Simple Python function as handler
def temperature_handler(sample):
    temp = float(sample.payload.to_string())
    location = str(sample.key_expr).split('/')[-1]
    
    print(f"Temperature in {location}: {temp}°C")
    
    if temp > 25:
        print(f"  -> {location} is warm!")
    elif temp < 15:
        print(f"  -> {location} is cold!")

# Zenoh automatically creates Callback handler
subscriber = session.declare_subscriber(
    "sensors/temperature/*", 
    temperature_handler
)

# Test data
publisher = session.declare_publisher("sensors/temperature/room1")
publisher.put("23.5")

publisher2 = session.declare_publisher("sensors/temperature/outside")
publisher2.put("12.3")

publisher3 = session.declare_publisher("sensors/temperature/office")
publisher3.put("26.8")

time.sleep(1)  # give the callbacks time to run

publisher.undeclare()
publisher2.undeclare()
publisher3.undeclare()
subscriber.undeclare()
session.close()
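
The handler options listed for `declare_subscriber` (no handler, a plain function, a `(callable, handler)` tuple, or a handler instance) amount to a dispatch on the argument's type. The sketch below shows one way such a dispatch could look. `describe_handler` is a hypothetical helper written for this illustration; it is not how Zenoh resolves handlers internally.

```python
def describe_handler(handler):
    """Classify a handler argument into one of the four documented options."""
    if handler is None:
        return "default handler"
    # Check the tuple form before the plain-callable form
    if isinstance(handler, tuple) and len(handler) == 2 and callable(handler[0]):
        return "callback with custom handler"
    if callable(handler):
        return "callback"
    return "handler instance"


def temperature_handler(sample):
    print(sample)


print(describe_handler(None))
print(describe_handler(temperature_handler))
print(describe_handler((temperature_handler, object())))
print(describe_handler(object()))
```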

Handler with Custom Processing Logic

import json
import threading
import time

import zenoh
from zenoh.handlers import FifoChannel

class ProcessingHandler:
    """Custom handler with background batch processing"""
    
    def __init__(self, max_batch_size=5):
        self.fifo = FifoChannel(capacity=100)
        self.max_batch_size = max_batch_size
        self.source = None  # set in start_processing()
        self.processing_thread = None
        self.running = False
    
    def start_processing(self, source):
        """
        Start background processing thread.
        
        source: object to receive samples from. The subscriber is used here,
        as in the other examples that call subscriber.try_recv().
        """
        self.source = source
        self.running = True
        self.processing_thread = threading.Thread(target=self._process_batches)
        self.processing_thread.start()
    
    def stop_processing(self):
        """Stop background processing"""
        self.running = False
        if self.processing_thread:
            self.processing_thread.join()
    
    def _process_batches(self):
        """Background thread for batch processing"""
        batch = []
        
        while self.running:
            try:
                sample = self.source.try_recv()
            except Exception:
                sample = None
            
            # Treat both an exception and a None result as "no data available"
            if sample is None:
                # Flush the partial batch, then wait for more data
                if batch:
                    self._process_batch(batch)
                    batch = []
                time.sleep(0.1)
                continue
            
            # Collect samples into batches
            batch.append(sample)
            if len(batch) >= self.max_batch_size:
                self._process_batch(batch)
                batch = []
        
        # Flush whatever is left when stopping
        if batch:
            self._process_batch(batch)
    
    def _process_batch(self, batch):
        """Process a batch of samples"""
        values = []
        for sample in batch:
            try:
                data = json.loads(sample.payload.to_string())
                values.append(data['value'])
            except (ValueError, KeyError, TypeError):
                continue  # skip malformed samples
        
        if values:
            avg = sum(values) / len(values)
            print(f"Batch processed: {len(values)} samples, average = {avg:.2f}")

# Usage
session = zenoh.open(zenoh.Config())  # default configuration

# Create custom processing handler
processor = ProcessingHandler(max_batch_size=3)

# Use the FIFO part of our custom handler
subscriber = session.declare_subscriber("data/batch", processor.fifo)

# Start background processing, reading samples through the subscriber
processor.start_processing(subscriber)

# Generate test data
publisher = session.declare_publisher("data/batch")

for i in range(10):
    data = {"sequence": i, "value": i * 2.5 + 10}
    publisher.put(json.dumps(data))
    time.sleep(0.2)

# Let processing complete
time.sleep(2)

# Cleanup
processor.stop_processing()
publisher.undeclare()
subscriber.undeclare()
session.close()
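
The batch-averaging step can be checked in isolation, without a session, by working on the JSON payload strings directly. The sketch below is a hypothetical helper, `average_values`, that mirrors the logic of `_process_batch` above; the payload shape (`sequence` and `value` fields) is taken from the test data in the example.

```python
import json


def average_values(payloads):
    """Return (count, average) over the 'value' field of JSON payload strings."""
    values = []
    for text in payloads:
        try:
            values.append(json.loads(text)["value"])
        except (ValueError, KeyError, TypeError):
            continue  # skip malformed payloads, as the example does
    if not values:
        return 0, None
    return len(values), sum(values) / len(values)


# Same payload shape as the test data in the example, plus one malformed entry
payloads = [json.dumps({"sequence": i, "value": i * 2.5 + 10}) for i in range(3)]
payloads.append("not json")

count, avg = average_values(payloads)
print(f"Batch processed: {count} samples, average = {avg:.2f}")
```

Keeping the arithmetic in a plain function like this makes it straightforward to unit-test separately from the receiving loop.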

Advanced Handler Composition

import zenoh
from zenoh.handlers import FifoChannel, Callback
import threading
import time

class CompositeHandler:
    """Handler that combines multiple processing strategies"""
    
    def __init__(self):
        # Primary handler for normal processing
        self.primary = FifoChannel(capacity=50)
        
        # Secondary handler for immediate alerts
        self.alert_callback = Callback(
            callback=self._handle_alert,
            indirect=True
        )
        
        self.alert_keywords = ["error", "critical", "failure"]
    
    def _handle_alert(self, sample):
        """Immediate alert processing"""
        data = sample.payload.to_string().lower()
        
        for keyword in self.alert_keywords:
            if keyword in data:
                print(f"🚨 ALERT: {sample.payload.to_string()}")
                # Could send notifications, emails, etc.
                break
    
    def get_primary_handler(self):
        """Get handler for normal data processing"""
        return self.primary
    
    def get_alert_handler(self):
        """Get handler for alert processing"""
        return self.alert_callback

# Usage
session = zenoh.open(zenoh.Config())  # default configuration

# Create composite handler
composite = CompositeHandler()

# Subscribe to normal data stream
data_subscriber = session.declare_subscriber(
    "system/logs", 
    composite.get_primary_handler()
)

# Subscribe to same stream for alerts (separate subscription)
alert_subscriber = session.declare_subscriber(
    "system/logs", 
    composite.get_alert_handler()
)

# Background thread for normal processing
def normal_processing():
    print("Normal processing started...")
    
    while True:
        try:
            sample = data_subscriber.try_recv()
        except Exception:
            sample = None
        
        # Treat both an exception and a None result as "no data available"
        if sample is None:
            time.sleep(0.1)
            continue
        
        data = sample.payload.to_string()
        
        # Normal processing (logging, analysis, etc.)
        if "error" not in data.lower():
            print(f"Processing: {data}")

# Start normal processing thread
processing_thread = threading.Thread(target=normal_processing)
processing_thread.daemon = True
processing_thread.start()

# Generate test data
publisher = session.declare_publisher("system/logs")

test_messages = [
    "System startup completed",
    "User login: alice",
    "ERROR: Database connection failed",
    "Processing batch job #1234",
    "CRITICAL: Disk space low",
    "User logout: alice",
    "System maintenance scheduled"
]

for msg in test_messages:
    publisher.put(msg)
    time.sleep(0.5)

# Let processing continue
time.sleep(3)

# Cleanup
publisher.undeclare()
data_subscriber.undeclare()
alert_subscriber.undeclare()
session.close()
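
The keyword matching inside `_handle_alert` can be pulled out into a plain function and checked against the test messages without a session. `is_alert` is a hypothetical helper written for this illustration; the keywords and messages are copied from the example above.

```python
ALERT_KEYWORDS = ("error", "critical", "failure")


def is_alert(text, keywords=ALERT_KEYWORDS):
    """Return True if any keyword appears in the text, ignoring case."""
    lowered = text.lower()
    return any(keyword in lowered for keyword in keywords)


test_messages = [
    "System startup completed",
    "User login: alice",
    "ERROR: Database connection failed",
    "Processing batch job #1234",
    "CRITICAL: Disk space low",
    "User logout: alice",
    "System maintenance scheduled",
]

alerts = [msg for msg in test_messages if is_alert(msg)]
print(alerts)
```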

Handler Selection Guidelines

When to Use Each Handler Type

DefaultHandler:

  • General-purpose data processing
  • Simple applications with moderate data rates
  • When you don't need specific capacity control

FifoChannel:

  • When you need bounded memory usage
  • Applications that can tolerate backpressure or data loss under high load
  • Batch processing scenarios

RingChannel:

  • High-frequency data where only recent values matter
  • Memory-constrained environments
  • Real-time systems prioritizing latest data

Callback:

  • Immediate processing requirements
  • Event-driven architectures
  • Alert and notification systems
  • Low-latency response needs

Python Functions:

  • Simple processing logic
  • Rapid prototyping
  • Educational examples
  • When callback functionality is sufficient

Performance Considerations

  1. Callback handlers have the lowest latency but block the receiving thread
  2. Channel handlers provide buffering but add some overhead
  3. Ring channels are most memory-efficient for high-frequency data
  4. FIFO channels provide guaranteed ordering but may consume more memory
  5. Custom handlers allow optimization for specific use cases but require more implementation effort
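
Because a callback runs on the receiving thread (point 1), a common way to keep it fast is to have it only enqueue the item and let a separate worker do the slow part. The sketch below shows that hand-off with the standard library only. The names `fast_callback` and `worker` are hypothetical, the messages are passed in by hand rather than by Zenoh, and `item.upper()` is a placeholder for real processing.

```python
import queue
import threading

work = queue.Queue()
results = []


def fast_callback(item):
    """Would run on the receiving thread: only enqueue, no slow work here."""
    work.put(item)


def worker():
    """Runs on its own thread and does the slow processing."""
    while True:
        item = work.get()
        if item is None:  # sentinel: stop the worker
            break
        results.append(item.upper())  # placeholder for slow processing


thread = threading.Thread(target=worker)
thread.start()

# In a real program the callback would be passed to declare_subscriber
for message in ("ok", "error detected", "recovered"):
    fast_callback(message)

work.put(None)  # ask the worker to stop once the queue is drained
thread.join()

print(results)
```

With a single worker thread the items are processed in the order they were enqueued.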

Install with Tessl CLI

npx tessl i tessl/pypi-eclipse-zenoh
