CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-eclipse-zenoh

The python API for Eclipse zenoh - a high-performance networking library providing pub/sub, store/query and compute framework for zero-overhead communication

Pending
Overview
Eval results
Files

docs/extensions.md

Extensions

The Zenoh extensions module (zenoh.ext) provides additional functionality including type-safe serialization utilities, custom numeric types, and advanced publisher/subscriber features with enhanced reliability and caching capabilities. These extensions enable more sophisticated data handling and communication patterns while maintaining compatibility with the core Zenoh API.

Capabilities

Serialization System

Type-safe serialization and deserialization of Python objects to/from ZBytes.

from zenoh.ext import z_serialize, z_deserialize, ZDeserializeError

def z_serialize(obj) -> ZBytes:
    """
    Serialize a Python object into a ZBytes payload.

    No type argument is passed; the encoding is chosen from the object itself
    ("automatic type detection").

    Parameters:
    - obj: Python object to serialize. This page lists basic types,
      collections and custom objects as supported.
      NOTE(review): support for arbitrary custom objects is not demonstrated
      anywhere on this page - confirm against the zenoh.ext reference.

    Returns:
    ZBytes containing the serialized data

    Raises:
    ZError: If serialization fails
    """

def z_deserialize(target_type, data: ZBytes):
    """
    Deserialize a ZBytes payload into a Python object of the requested type.

    Parameters:
    - target_type: Type the payload is expected to decode to; the examples on
      this page pass ``list`` and ``dict``.
      NOTE(review): for containers the element types may need to be spelled
      out (e.g. a parametrized type) - confirm against the zenoh.ext reference.
    - data: ZBytes containing serialized data (for a received sample this is
      ``sample.payload``)

    Returns:
    Deserialized Python object of the specified type

    Raises:
    ZDeserializeError: If deserialization fails or the type doesn't match
    """

class ZDeserializeError(Exception):
    """Raised by ``z_deserialize`` when a payload cannot be decoded.

    This covers both decoding failures and a payload that does not match the
    requested target type.
    """

Custom Numeric Types

Precise numeric types for cross-platform data exchange and scientific computing.

# Signed Integer Types
class Int8:
    """8-bit signed integer (-128 to 127)"""
    # NOTE(review): how out-of-range values are handled is not shown here.
    def __init__(self, value: int) -> None: ...

class Int16:
    """16-bit signed integer (-32,768 to 32,767)"""
    def __init__(self, value: int) -> None: ...

class Int32:
    """32-bit signed integer (-2,147,483,648 to 2,147,483,647)"""
    def __init__(self, value: int) -> None: ...

class Int64:
    """64-bit signed integer (-9,223,372,036,854,775,808 to 9,223,372,036,854,775,807)"""
    def __init__(self, value: int) -> None: ...

class Int128:
    """128-bit signed integer (-2**127 to 2**127 - 1) for very large numbers"""
    def __init__(self, value: int) -> None: ...

# Unsigned Integer Types
class UInt8:
    """8-bit unsigned integer (0 to 255)"""
    # NOTE(review): how negative or out-of-range values are handled is not shown here.
    def __init__(self, value: int) -> None: ...

class UInt16:
    """16-bit unsigned integer (0 to 65,535)"""
    def __init__(self, value: int) -> None: ...

class UInt32:
    """32-bit unsigned integer (0 to 4,294,967,295)"""
    def __init__(self, value: int) -> None: ...

class UInt64:
    """64-bit unsigned integer (0 to 18,446,744,073,709,551,615)"""
    def __init__(self, value: int) -> None: ...

class UInt128:
    """128-bit unsigned integer (0 to 2**128 - 1) for very large numbers"""
    def __init__(self, value: int) -> None: ...

# Floating Point Types
class Float32:
    """32-bit IEEE 754 floating point number (single precision)"""
    # A Python float is wider than 32 bits, so a value wrapped here may lose
    # precision. NOTE(review): the exact rounding behaviour is not shown here.
    def __init__(self, value: float) -> None: ...

class Float64:
    """64-bit IEEE 754 floating point number (double precision)"""
    def __init__(self, value: float) -> None: ...

Advanced Publisher (Unstable)

Enhanced publisher with additional features for reliable communication.

def declare_advanced_publisher(
    session,
    key_expr,
    encoding: Encoding = None,
    congestion_control: CongestionControl = None,
    priority: Priority = None,
    cache: CacheConfig = None,
    subscriber_detection: bool = None
) -> AdvancedPublisher:
    """
    Declare an advanced publisher on a session (unstable API).

    Every option defaults to None.
    NOTE(review): None presumably selects the library default for that
    option - confirm against the zenoh.ext reference.

    Parameters:
    - session: Zenoh session the publisher is declared on
    - key_expr: Key expression to publish on
    - encoding: Data encoding specification
    - congestion_control: Congestion control mode
    - priority: Message priority
    - cache: Cache configuration that lets late-joining subscribers obtain
      previously published samples
    - subscriber_detection: Enable subscriber detection

    Returns:
    AdvancedPublisher with enhanced capabilities
    """

class AdvancedPublisher:
    """
    Publisher returned by ``declare_advanced_publisher`` (unstable API).

    Exposes the settings it was declared with as read-only properties and
    offers ``put``/``delete`` for sending and ``undeclare`` for teardown.
    """
    
    @property
    def key_expr(self) -> KeyExpr:
        """Key expression this publisher publishes on"""
    
    @property
    def encoding(self) -> Encoding:
        """Encoding configured for this publisher"""
    
    @property
    def congestion_control(self) -> CongestionControl:
        """Congestion control setting of this publisher"""
    
    @property
    def priority(self) -> Priority:
        """Priority setting of this publisher"""
    
    def put(
        self,
        payload,
        encoding: Encoding = None,
        timestamp: Timestamp = None,
        attachment = None
    ) -> None:
        """
        Send data through the advanced publisher.
        
        Parameters:
        - payload: Data to send (the examples pass the ZBytes returned by
          ``z_serialize``)
        - encoding: Override the publisher's default encoding for this sample
        - timestamp: Custom timestamp
        - attachment: Additional metadata
        """
    
    def delete(
        self,
        timestamp: Timestamp = None,
        attachment = None
    ) -> None:
        """
        Send a delete operation on the publisher's key expression.
        
        Parameters:
        - timestamp: Custom timestamp
        - attachment: Additional metadata
        """
    
    def undeclare(self) -> None:
        """Undeclare the advanced publisher"""

Advanced Subscriber (Unstable)

Enhanced subscriber with miss detection, recovery, and publisher detection.

def declare_advanced_subscriber(
    session,
    key_expr,
    handler = None,
    reliability: Reliability = None,
    recovery: RecoveryConfig = None,
    history: HistoryConfig = None,
    miss_detection: MissDetectionConfig = None
) -> AdvancedSubscriber:
    """
    Declare an advanced subscriber on a session (unstable API).

    Every option defaults to None.
    NOTE(review): None presumably selects the library default for that
    option - confirm against the zenoh.ext reference.

    Parameters:
    - session: Zenoh session the subscriber is declared on
    - key_expr: Key expression pattern to subscribe to
    - handler: Handler for received samples (the examples pass a callable
      taking one sample)
    - reliability: Reliability mode
    - recovery: Recovery configuration for missed samples
    - history: History configuration used when joining late
    - miss_detection: Configuration for detecting missed samples

    Returns:
    AdvancedSubscriber with enhanced capabilities
    """

class AdvancedSubscriber:
    """
    Subscriber returned by ``declare_advanced_subscriber`` (unstable API).

    Adds miss-detection and publisher-detection hooks to the usual
    receive/undeclare operations.
    """
    
    @property
    def key_expr(self) -> KeyExpr:
        """Key expression this subscriber is subscribed to"""
    
    @property
    def handler(self):
        """Handler this subscriber was declared with"""
    
    def sample_miss_listener(self, handler) -> SampleMissListener:
        """
        Declare a listener for sample miss detection.
        
        Parameters:
        - handler: Handler for miss notifications (the examples pass a
          callable that reads ``miss.nb`` and ``miss.source``)
        
        Returns:
        SampleMissListener for monitoring missed samples
        """
    
    def detect_publishers(self, handler):
        """
        Enable publisher detection with callback.
        
        Parameters:
        - handler: Handler for publisher detection events
        """
        # NOTE(review): no return annotation is given, so the result is not documented here.
    
    def undeclare(self) -> None:
        """Undeclare the advanced subscriber"""
    
    def try_recv(self):
        """Try to receive a sample without blocking"""
        # NOTE(review): the result when no sample is available is not documented here.
    
    def recv(self):
        """Receive a sample (blocking)"""
    
    def __iter__(self):
        """Iterate over received samples"""

Advanced Configuration

Configuration objects for advanced publisher/subscriber features.

class CacheConfig:
    """Cache configuration for late-joining subscribers (unstable)"""
    # Passed as ``cache=`` to declare_advanced_publisher.
    # Constructor options are not documented on this page.

class HistoryConfig:
    """History configuration for subscriber catch-up (unstable)"""
    # Passed as ``history=`` to declare_advanced_subscriber.
    # Constructor options are not documented on this page.

class MissDetectionConfig:
    """Configuration for detecting missed samples (unstable)"""
    # Passed as ``miss_detection=`` to declare_advanced_subscriber.
    # Constructor options are not documented on this page.

class RecoveryConfig:
    """Recovery configuration for missed sample recovery (unstable)"""
    # Passed as ``recovery=`` to declare_advanced_subscriber.
    # Constructor options are not documented on this page.

class RepliesConfig:
    """Configuration for query replies handling (unstable)"""
    # NOTE(review): no function on this page accepts this type - confirm where it is used.
    # Constructor options are not documented on this page.

Miss Detection

Monitor and handle missed samples in data streams.

class Miss:
    """
    Information about missed samples (unstable).

    Instances are what a miss handler registered through
    ``AdvancedSubscriber.sample_miss_listener`` receives in the examples.
    """
    
    @property
    def source(self) -> ZenohId:
        """Identifier of the source the missed samples came from"""
    
    @property
    def nb(self) -> int:
        """Number of missed samples"""

class SampleMissListener:
    """
    Listener for sample miss detection (unstable).

    Returned by ``AdvancedSubscriber.sample_miss_listener``.
    """
    
    def undeclare(self) -> None:
        """Undeclare the miss listener"""
    
    def try_recv(self):
        """Try to receive a miss notification without blocking"""
        # NOTE(review): the result when no notification is pending is not documented here.
    
    def recv(self):
        """Receive a miss notification (blocking)"""
    
    def __iter__(self):
        """Iterate over miss notifications"""

Usage Examples

Basic Serialization

import time

import zenoh
import zenoh.ext as zext

# Serialize various data types
# NOTE(review): this list mixes element types; confirm that z_deserialize can
# round-trip it with a bare `list` target, or use a homogeneous list instead.
data_list = [1, 2, 3, "hello", {"key": "value"}]
serialized = zext.z_serialize(data_list)

# Deserialize with type checking
try:
    deserialized = zext.z_deserialize(list, serialized)
    print(f"Deserialized: {deserialized}")
except zext.ZDeserializeError as e:
    print(f"Deserialization failed: {e}")


def handler(sample):
    """Decode a received sample back into a list and print it."""
    try:
        data = zext.z_deserialize(list, sample.payload)
        print(f"Received: {data}")
    except zext.ZDeserializeError as e:
        print(f"Failed to deserialize: {e}")


# Use in Zenoh communication
# NOTE(review): some zenoh-python releases require an explicit configuration,
# i.e. zenoh.open(zenoh.Config()) - confirm for the targeted version.
session = zenoh.open()
try:
    # Declare the subscriber BEFORE publishing: a plain subscriber only sees
    # samples published while it exists, so the original order (put first,
    # subscribe afterwards) meant the handler was never invoked.
    subscriber = session.declare_subscriber("data/serialized", handler)
    publisher = session.declare_publisher("data/serialized")

    # Publish serialized data
    publisher.put(serialized)

    # Give the callback a moment to run before tearing everything down
    time.sleep(1)

    # Cleanup
    subscriber.undeclare()
    publisher.undeclare()
finally:
    # Release the session even if declaring or publishing failed
    session.close()

Custom Numeric Types

import time  # needed for time.sleep below; the original snippet omitted it (NameError)

import zenoh
import zenoh.ext as zext

# NOTE(review): some zenoh-python releases require an explicit configuration,
# i.e. zenoh.open(zenoh.Config()) - confirm for the targeted version.
session = zenoh.open()

# Create precise numeric values
temperature = zext.Float32(23.567)  # 32-bit precision
sensor_id = zext.UInt16(1024)       # 16-bit unsigned
timestamp = zext.Int64(1640995200)  # 64-bit signed

# Serialize numeric data
sensor_data = {
    "temperature": temperature,
    "sensor_id": sensor_id,
    "timestamp": timestamp,
}

serialized_data = zext.z_serialize(sensor_data)


def precise_handler(sample):
    """Decode one sensor sample and print each field."""
    try:
        data = zext.z_deserialize(dict, sample.payload)
        print("Precise sensor reading:")
        print(f"  Temperature: {data['temperature']} (32-bit float)")
        print(f"  Sensor ID: {data['sensor_id']} (16-bit uint)")
        print(f"  Timestamp: {data['timestamp']} (64-bit int)")
    except zext.ZDeserializeError as e:
        print(f"Failed to deserialize sensor data: {e}")


try:
    # Subscribe first so the sample published below is actually delivered;
    # the original declared the subscriber only after the put().
    subscriber = session.declare_subscriber("sensors/precise", precise_handler)

    # Publish precise sensor data
    publisher = session.declare_publisher(
        "sensors/precise",
        encoding=zenoh.Encoding.ZENOH_SERIALIZED,
    )
    publisher.put(serialized_data)

    # Give the callback a moment to run before tearing everything down
    time.sleep(1)
    subscriber.undeclare()
    publisher.undeclare()
finally:
    # Release the session even if declaring or publishing failed
    session.close()

Advanced Publisher with Cache

import time

import zenoh
import zenoh.ext as zext

session = zenoh.open()

# Note: This is unstable API - actual configuration may differ
try:
    # Declare advanced publisher with caching for late joiners
    advanced_pub = zext.declare_advanced_publisher(
        session,
        "data/cached",
        cache=zext.CacheConfig(),  # Enable caching
        subscriber_detection=True,  # Detect subscribers
    )

    print("Advanced publisher declared with caching")

    # Publish data that will be cached
    # NOTE(review): how many samples are retained depends on CacheConfig - confirm.
    for i in range(5):
        data = {"message": f"Cached message {i}", "timestamp": time.time()}
        serialized = zext.z_serialize(data)
        advanced_pub.put(serialized)
        print(f"Published cached message {i}")
        time.sleep(1)

    def late_subscriber_handler(sample):
        """Print the message carried by a sample replayed from the cache."""
        try:
            data = zext.z_deserialize(dict, sample.payload)
            print(f"Late subscriber received: {data['message']}")
        except zext.ZDeserializeError:
            print("Failed to deserialize cached data")

    print("Creating late-joining subscriber...")
    # A plain session.declare_subscriber() does not request history, so it
    # would only see samples published after this point (there are none).
    # Use an advanced subscriber with a history configuration so it can catch
    # up on what the publisher cached.
    late_subscriber = zext.declare_advanced_subscriber(
        session,
        "data/cached",
        handler=late_subscriber_handler,
        history=zext.HistoryConfig(),
    )

    # Leave time for the cached samples to be delivered
    time.sleep(2)

    # Cleanup
    late_subscriber.undeclare()
    advanced_pub.undeclare()

except Exception as e:
    print(f"Advanced publisher features may not be available: {e}")

finally:
    # Always release the session, including on KeyboardInterrupt
    session.close()

Advanced Subscriber with Miss Detection

import threading
import time

import zenoh
import zenoh.ext as zext

session = zenoh.open()

# Note: This is unstable API - actual configuration may differ
try:
    def sample_handler(sample):
        """Print the sequence number carried by a received sample."""
        try:
            decoded = zext.z_deserialize(dict, sample.payload)
        except zext.ZDeserializeError:
            print("Failed to deserialize sample")
        else:
            print(f"Received sample: {decoded['sequence']}")

    def miss_handler(miss):
        """Report how many samples were missed and from which source."""
        print(f"MISSED {miss.nb} samples from source {miss.source}")

    # Declare advanced subscriber with miss detection
    advanced_sub = zext.declare_advanced_subscriber(
        session,
        "data/reliable",
        handler=sample_handler,
        miss_detection=zext.MissDetectionConfig(),
    )

    # Setup miss detection listener
    miss_listener = advanced_sub.sample_miss_listener(miss_handler)

    print("Advanced subscriber with miss detection ready")

    def publisher_thread():
        """Publish sequence numbers 0-9 except 5, one every 0.5 s."""
        # NOTE(review): number 5 is simply never put(); whether the subscriber
        # reports that as a "miss" depends on how misses are detected - confirm
        # that an application-level gap from a plain publisher is enough.
        gap_pub = session.declare_publisher("data/reliable")

        for seq in range(10):
            if seq != 5:  # Skip message 5 to simulate loss
                message = {"sequence": seq, "payload": f"data_{seq}"}
                gap_pub.put(zext.z_serialize(message))
                time.sleep(0.5)

        gap_pub.undeclare()

    # Start publisher in separate thread
    pub_thread = threading.Thread(target=publisher_thread)
    pub_thread.start()

    # Let it run
    time.sleep(6)

    # Cleanup
    pub_thread.join()
    miss_listener.undeclare()
    advanced_sub.undeclare()

except Exception as e:
    print(f"Advanced subscriber features may not be available: {e}")

session.close()

Complete Extension Example

import zenoh
import zenoh.ext as zext
import threading
import time
import random

class AdvancedDataProcessor:
    """Example worker that combines several ``zenoh.ext`` features.

    Each processor opens its own session, subscribes to ``input/data``,
    processes every decoded item and publishes a result on
    ``results/processor_<id>``. It tries the advanced publisher/subscriber
    first and falls back to the plain ones if that fails.
    """

    # Attributes holding declared Zenoh entities, in the order they are
    # undeclared (listener first, then subscriber, then publisher).
    _HANDLE_NAMES = ("miss_listener", "subscriber", "publisher")

    def __init__(self, processor_id: int):
        """Open a session and initialise counters.

        Parameters:
        - processor_id: Number used in key expressions, log lines and results
        """
        self.processor_id = processor_id
        self.session = zenoh.open()
        self.running = False

        # Declared by start_processing(); None means "not declared".
        self.publisher = None
        self.subscriber = None
        self.miss_listener = None

        # Use precise numeric types
        self.id = zext.UInt16(processor_id)
        self.processed_count = zext.UInt64(0)

    def start_processing(self):
        """Start the data processing service.

        Declares the advanced publisher, advanced subscriber and miss
        listener. If any of these raises, whatever was already declared is
        undeclared and plain publisher/subscriber are declared instead.
        """
        self.running = True

        def input_handler(sample):
            try:
                data = zext.z_deserialize(dict, sample.payload)
                self.process_data(data)
            except zext.ZDeserializeError as e:
                print(f"Processor {self.processor_id}: Deserialization error: {e}")

        def miss_handler(miss):
            print(f"Processor {self.processor_id}: MISSED {miss.nb} inputs!")

        try:
            # Setup advanced publisher
            self.publisher = zext.declare_advanced_publisher(
                self.session,
                f"results/processor_{self.processor_id}",
                cache=zext.CacheConfig(),  # Cache results for late subscribers
            )

            # Setup advanced subscriber with miss detection
            self.subscriber = zext.declare_advanced_subscriber(
                self.session,
                "input/data",
                handler=input_handler,
                miss_detection=zext.MissDetectionConfig(),
            )

            self.miss_listener = self.subscriber.sample_miss_listener(miss_handler)

            print(f"Advanced processor {self.processor_id} started")

        except Exception as e:
            # Deliberately broad: the advanced API is unstable, and any failure
            # should degrade to the basic API rather than abort the example.
            print(f"Failed to start advanced features: {e}")
            # The advanced path may have failed partway. Undeclare what it
            # created so the fallback does not leave a second subscriber
            # feeding process_data() or an orphaned publisher.
            self._undeclare_handles()
            # Fallback to basic functionality
            self.publisher = self.session.declare_publisher(f"results/processor_{self.processor_id}")
            self.subscriber = self.session.declare_subscriber("input/data", self.basic_handler)

    def basic_handler(self, sample):
        """Fallback handler: decode a sample and hand it to ``process_data``."""
        try:
            data = zext.z_deserialize(dict, sample.payload)
            self.process_data(data)
        except zext.ZDeserializeError as e:
            print(f"Processor {self.processor_id}: Basic deserialization error: {e}")

    def process_data(self, input_data):
        """Process one decoded input item and publish the result.

        Parameters:
        - input_data: Mapping read with ``.get("sequence", 0)`` and
          ``.get("value", "unknown")``
        """
        # Simulate processing time
        time.sleep(random.uniform(0.1, 0.3))

        # Increment counter with precise type
        self.processed_count = zext.UInt64(int(self.processed_count) + 1)

        # Create result with precise numeric types
        result = {
            "processor_id": self.id,
            "input_sequence": input_data.get("sequence", 0),
            "processed_count": self.processed_count,
            "processing_time": zext.Float32(random.uniform(0.1, 0.3)),
            "result": f"PROCESSED_{input_data.get('value', 'unknown')}",
        }

        # Serialize and publish result
        serialized_result = zext.z_serialize(result)
        self.publisher.put(serialized_result)

        print(f"Processor {self.processor_id}: Processed #{int(self.processed_count)}")

    def stop(self):
        """Stop the processor and release its Zenoh resources.

        The session is closed even if undeclaring an entity raises; in that
        case the exception propagates after the session has been closed.
        """
        self.running = False

        try:
            self._undeclare_handles()
        finally:
            # Always release the session, even if an undeclare() failed
            self.session.close()
        print(f"Processor {self.processor_id} stopped")

    def _undeclare_handles(self):
        """Undeclare every entity that is currently declared and forget it."""
        for name in self._HANDLE_NAMES:
            # getattr default keeps this safe on instances whose __init__
            # did not run or did not finish
            handle = getattr(self, name, None)
            if handle is not None:
                # Clear first so a failing undeclare() is not retried later
                setattr(self, name, None)
                handle.undeclare()

def data_generator():
    """Publish twenty serialized test items on ``input/data``.

    Opens its own session, sends one item every 0.5 seconds, then undeclares
    the publisher and closes the session.
    """
    session = zenoh.open()
    publisher = session.declare_publisher("input/data")

    for seq in range(20):
        # Build and serialize the item in one step
        payload = zext.z_serialize({
            "sequence": zext.UInt32(seq),
            "value": f"data_item_{seq}",
            "timestamp": zext.Float64(time.time()),
        })
        publisher.put(payload)

        print(f"Generated data item {seq}")
        time.sleep(0.5)

    publisher.undeclare()
    session.close()
    print("Data generator finished")

def result_monitor():
    """Print every result published under ``results/**`` for 15 seconds.

    Opens its own session, then undeclares the subscriber and closes the
    session once the monitoring window is over.
    """
    session = zenoh.open()

    def result_handler(sample):
        try:
            result = zext.z_deserialize(dict, sample.payload)
            processor = int(result['processor_id'])
            count = int(result['processed_count'])
            outcome = result['result']
            print(f"MONITOR: Processor {processor} "
                  f"completed #{count}: {outcome}")
        except zext.ZDeserializeError as e:
            print(f"Monitor deserialization error: {e}")

    subscriber = session.declare_subscriber("results/**", result_handler)

    # Monitor for 15 seconds
    time.sleep(15)

    subscriber.undeclare()
    session.close()
    print("Result monitor stopped")

def main():
    """Run the complete example: two processors, a monitor and a generator.

    The monitor thread starts one second before the generator. The function
    waits for the generator to finish, gives the monitor up to five more
    seconds, and always stops both processors.
    """
    print("Starting advanced Zenoh extension example...")

    # Create processors
    processors = [AdvancedDataProcessor(pid) for pid in (1, 2)]

    try:
        # Start processors
        for processor in processors:
            processor.start_processing()

        # Monitoring and data generation each run in their own thread
        monitor = threading.Thread(target=result_monitor)
        generator = threading.Thread(target=data_generator)

        monitor.start()
        time.sleep(1)  # Let monitor start first
        generator.start()

        # Wait for completion
        generator.join()
        monitor.join(timeout=5)

    except KeyboardInterrupt:
        print("Shutting down...")

    finally:
        for processor in processors:
            processor.stop()

    print("Advanced extension example completed")

if __name__ == "__main__":
    main()

Install with Tessl CLI

npx tessl i tessl/pypi-eclipse-zenoh

docs

advanced.md

data-types.md

extensions.md

handlers.md

index.md

pubsub.md

query.md

session-management.md

tile.json