The python API for Eclipse zenoh - a high-performance networking library providing pub/sub, store/query and compute framework for zero-overhead communication
—
The Zenoh extensions module (zenoh.ext) provides additional functionality including type-safe serialization utilities, custom numeric types, and advanced publisher/subscriber features with enhanced reliability and caching capabilities. These extensions enable more sophisticated data handling and communication patterns while maintaining compatibility with the core Zenoh API.
Type-safe serialization and deserialization of Python objects to/from ZBytes.
from zenoh.ext import z_serialize, z_deserialize, ZDeserializeError
def z_serialize(obj) -> ZBytes:
"""
Serialize Python objects to ZBytes with automatic type detection.
Parameters:
- obj: Python object to serialize (supports basic types, collections, custom objects)
Returns:
ZBytes containing serialized data
Raises:
ZError: If serialization fails
"""
def z_deserialize(target_type, data: ZBytes):
"""
Deserialize ZBytes to Python objects with type validation.
Parameters:
- target_type: Expected type for deserialization validation
- data: ZBytes containing serialized data
Returns:
Deserialized Python object of specified type
Raises:
ZDeserializeError: If deserialization fails or type doesn't match
"""
class ZDeserializeError(Exception):
"""Exception raised when deserialization fails"""
passPrecise numeric types for cross-platform data exchange and scientific computing.
# Signed Integer Types
class Int8:
"""8-bit signed integer (-128 to 127)"""
def __init__(self, value: int): ...
class Int16:
"""16-bit signed integer (-32,768 to 32,767)"""
def __init__(self, value: int): ...
class Int32:
"""32-bit signed integer (-2,147,483,648 to 2,147,483,647)"""
def __init__(self, value: int): ...
class Int64:
"""64-bit signed integer"""
def __init__(self, value: int): ...
class Int128:
"""128-bit signed integer for very large numbers"""
def __init__(self, value: int): ...
# Unsigned Integer Types
class UInt8:
"""8-bit unsigned integer (0 to 255)"""
def __init__(self, value: int): ...
class UInt16:
"""16-bit unsigned integer (0 to 65,535)"""
def __init__(self, value: int): ...
class UInt32:
"""32-bit unsigned integer (0 to 4,294,967,295)"""
def __init__(self, value: int): ...
class UInt64:
"""64-bit unsigned integer"""
def __init__(self, value: int): ...
class UInt128:
"""128-bit unsigned integer for very large numbers"""
def __init__(self, value: int): ...
# Floating Point Types
class Float32:
"""32-bit IEEE 754 floating point number"""
def __init__(self, value: float): ...
class Float64:
"""64-bit IEEE 754 floating point number"""
def __init__(self, value: float): ...Enhanced publisher with additional features for reliable communication.
def declare_advanced_publisher(
session,
key_expr,
encoding: Encoding = None,
congestion_control: CongestionControl = None,
priority: Priority = None,
cache: CacheConfig = None,
subscriber_detection: bool = None
) -> AdvancedPublisher:
"""
Declare an advanced publisher with enhanced features (unstable).
Parameters:
- session: Zenoh session
- key_expr: Key expression to publish on
- encoding: Data encoding specification
- congestion_control: Congestion control mode
- priority: Message priority
- cache: Cache configuration for late-joining subscribers
- subscriber_detection: Enable subscriber detection
Returns:
AdvancedPublisher with enhanced capabilities
"""
class AdvancedPublisher:
"""Advanced publisher with additional features (unstable)"""
@property
def key_expr(self) -> KeyExpr:
"""Get the publisher's key expression"""
@property
def encoding(self) -> Encoding:
"""Get the publisher's encoding"""
@property
def congestion_control(self) -> CongestionControl:
"""Get congestion control setting"""
@property
def priority(self) -> Priority:
"""Get priority setting"""
def put(
self,
payload,
encoding: Encoding = None,
timestamp: Timestamp = None,
attachment = None
) -> None:
"""
Send data through the advanced publisher.
Parameters:
- payload: Data to send
- encoding: Override default encoding
- timestamp: Custom timestamp
- attachment: Additional metadata
"""
def delete(
self,
timestamp: Timestamp = None,
attachment = None
) -> None:
"""Send a delete operation"""
def undeclare(self) -> None:
"""Undeclare the advanced publisher"""Enhanced subscriber with miss detection, recovery, and publisher detection.
def declare_advanced_subscriber(
session,
key_expr,
handler = None,
reliability: Reliability = None,
recovery: RecoveryConfig = None,
history: HistoryConfig = None,
miss_detection: MissDetectionConfig = None
) -> AdvancedSubscriber:
"""
Declare an advanced subscriber with enhanced features (unstable).
Parameters:
- session: Zenoh session
- key_expr: Key expression pattern to subscribe to
- handler: Handler for received samples
- reliability: Reliability mode
- recovery: Recovery configuration for missed samples
- history: History configuration for late-joining
- miss_detection: Configuration for detecting missed samples
Returns:
AdvancedSubscriber with enhanced capabilities
"""
class AdvancedSubscriber:
"""Advanced subscriber with additional features (unstable)"""
@property
def key_expr(self) -> KeyExpr:
"""Get the subscriber's key expression"""
@property
def handler(self):
"""Get the subscriber's handler"""
def sample_miss_listener(self, handler) -> SampleMissListener:
"""
Declare a listener for sample miss detection.
Parameters:
- handler: Handler for miss notifications
Returns:
SampleMissListener for monitoring missed samples
"""
def detect_publishers(self, handler):
"""
Enable publisher detection with callback.
Parameters:
- handler: Handler for publisher detection events
"""
def undeclare(self) -> None:
"""Undeclare the advanced subscriber"""
def try_recv(self):
"""Try to receive a sample without blocking"""
def recv(self):
"""Receive a sample (blocking)"""
def __iter__(self):
"""Iterate over received samples"""Configuration objects for advanced publisher/subscriber features.
class CacheConfig:
"""Cache configuration for late-joining subscribers (unstable)"""
# Configuration details depend on implementation
class HistoryConfig:
"""History configuration for subscriber catch-up (unstable)"""
# Configuration details depend on implementation
class MissDetectionConfig:
"""Configuration for detecting missed samples (unstable)"""
# Configuration details depend on implementation
class RecoveryConfig:
"""Recovery configuration for missed sample recovery (unstable)"""
# Configuration details depend on implementation
class RepliesConfig:
"""Configuration for query replies handling (unstable)"""
# Configuration details depend on implementationMonitor and handle missed samples in data streams.
class Miss:
"""Information about missed samples (unstable)"""
@property
def source(self) -> ZenohId:
"""Source that missed samples are from"""
@property
def nb(self) -> int:
"""Number of missed samples"""
class SampleMissListener:
"""Listener for sample miss detection (unstable)"""
def undeclare(self) -> None:
"""Undeclare the miss listener"""
def try_recv(self):
"""Try to receive a miss notification without blocking"""
def recv(self):
"""Receive a miss notification (blocking)"""
def __iter__(self):
"""Iterate over miss notifications"""import zenoh.ext as zext
import zenoh
# Serialize various data types
data_list = [1, 2, 3, "hello", {"key": "value"}]
serialized = zext.z_serialize(data_list)
# Deserialize with type checking
try:
deserialized = zext.z_deserialize(list, serialized)
print(f"Deserialized: {deserialized}")
except zext.ZDeserializeError as e:
print(f"Deserialization failed: {e}")
# Use in Zenoh communication
session = zenoh.open()
publisher = session.declare_publisher("data/serialized")
# Publish serialized data
publisher.put(serialized)
def handler(sample):
try:
data = zext.z_deserialize(list, sample.payload)
print(f"Received: {data}")
except zext.ZDeserializeError as e:
print(f"Failed to deserialize: {e}")
subscriber = session.declare_subscriber("data/serialized", handler)
# Cleanup
import time
time.sleep(1)
subscriber.undeclare()
publisher.undeclare()
session.close()import zenoh.ext as zext
import zenoh
session = zenoh.open()
# Create precise numeric values
temperature = zext.Float32(23.567) # 32-bit precision
sensor_id = zext.UInt16(1024) # 16-bit unsigned
timestamp = zext.Int64(1640995200) # 64-bit signed
# Serialize numeric data
sensor_data = {
"temperature": temperature,
"sensor_id": sensor_id,
"timestamp": timestamp
}
serialized_data = zext.z_serialize(sensor_data)
# Publish precise sensor data
publisher = session.declare_publisher("sensors/precise",
encoding=zenoh.Encoding.ZENOH_SERIALIZED)
publisher.put(serialized_data)
def precise_handler(sample):
try:
data = zext.z_deserialize(dict, sample.payload)
print(f"Precise sensor reading:")
print(f" Temperature: {data['temperature']} (32-bit float)")
print(f" Sensor ID: {data['sensor_id']} (16-bit uint)")
print(f" Timestamp: {data['timestamp']} (64-bit int)")
except zext.ZDeserializeError as e:
print(f"Failed to deserialize sensor data: {e}")
subscriber = session.declare_subscriber("sensors/precise", precise_handler)
time.sleep(1)
subscriber.undeclare()
publisher.undeclare()
session.close()import zenoh
import zenoh.ext as zext
import time
session = zenoh.open()
# Note: This is unstable API - actual configuration may differ
try:
# Declare advanced publisher with caching for late joiners
advanced_pub = zext.declare_advanced_publisher(
session,
"data/cached",
cache=zext.CacheConfig(), # Enable caching
subscriber_detection=True # Detect subscribers
)
print("Advanced publisher declared with caching")
# Publish data that will be cached
for i in range(5):
data = {"message": f"Cached message {i}", "timestamp": time.time()}
serialized = zext.z_serialize(data)
advanced_pub.put(serialized)
print(f"Published cached message {i}")
time.sleep(1)
# Late-joining subscriber should receive cached data
def late_subscriber_handler(sample):
try:
data = zext.z_deserialize(dict, sample.payload)
print(f"Late subscriber received: {data['message']}")
except zext.ZDeserializeError:
print("Failed to deserialize cached data")
print("Creating late-joining subscriber...")
late_subscriber = session.declare_subscriber("data/cached", late_subscriber_handler)
time.sleep(2)
# Cleanup
late_subscriber.undeclare()
advanced_pub.undeclare()
except Exception as e:
print(f"Advanced publisher features may not be available: {e}")
session.close()import zenoh
import zenoh.ext as zext
import time
import threading
session = zenoh.open()
# Note: This is unstable API - actual configuration may differ
try:
def sample_handler(sample):
try:
data = zext.z_deserialize(dict, sample.payload)
print(f"Received sample: {data['sequence']}")
except zext.ZDeserializeError:
print("Failed to deserialize sample")
def miss_handler(miss):
print(f"MISSED {miss.nb} samples from source {miss.source}")
# Declare advanced subscriber with miss detection
advanced_sub = zext.declare_advanced_subscriber(
session,
"data/reliable",
handler=sample_handler,
miss_detection=zext.MissDetectionConfig()
)
# Setup miss detection listener
miss_listener = advanced_sub.sample_miss_listener(miss_handler)
print("Advanced subscriber with miss detection ready")
# Simulate publisher sending data with intentional gaps
def publisher_thread():
pub = session.declare_publisher("data/reliable")
for i in range(10):
if i == 5: # Skip message 5 to simulate loss
continue
data = {"sequence": i, "payload": f"data_{i}"}
serialized = zext.z_serialize(data)
pub.put(serialized)
time.sleep(0.5)
pub.undeclare()
# Start publisher in separate thread
pub_thread = threading.Thread(target=publisher_thread)
pub_thread.start()
# Let it run
time.sleep(6)
# Cleanup
pub_thread.join()
miss_listener.undeclare()
advanced_sub.undeclare()
except Exception as e:
print(f"Advanced subscriber features may not be available: {e}")
session.close()import zenoh
import zenoh.ext as zext
import threading
import time
import random
class AdvancedDataProcessor:
"""Example using multiple extension features"""
def __init__(self, processor_id: int):
self.processor_id = processor_id
self.session = zenoh.open()
self.running = False
# Use precise numeric types
self.id = zext.UInt16(processor_id)
self.processed_count = zext.UInt64(0)
def start_processing(self):
"""Start the data processing service"""
self.running = True
try:
# Setup advanced publisher
self.publisher = zext.declare_advanced_publisher(
self.session,
f"results/processor_{self.processor_id}",
cache=zext.CacheConfig() # Cache results for late subscribers
)
# Setup advanced subscriber with miss detection
def input_handler(sample):
try:
data = zext.z_deserialize(dict, sample.payload)
self.process_data(data)
except zext.ZDeserializeError as e:
print(f"Processor {self.processor_id}: Deserialization error: {e}")
def miss_handler(miss):
print(f"Processor {self.processor_id}: MISSED {miss.nb} inputs!")
self.subscriber = zext.declare_advanced_subscriber(
self.session,
"input/data",
handler=input_handler,
miss_detection=zext.MissDetectionConfig()
)
self.miss_listener = self.subscriber.sample_miss_listener(miss_handler)
print(f"Advanced processor {self.processor_id} started")
except Exception as e:
print(f"Failed to start advanced features: {e}")
# Fallback to basic functionality
self.publisher = self.session.declare_publisher(f"results/processor_{self.processor_id}")
self.subscriber = self.session.declare_subscriber("input/data", self.basic_handler)
def basic_handler(self, sample):
"""Fallback handler for basic functionality"""
try:
data = zext.z_deserialize(dict, sample.payload)
self.process_data(data)
except zext.ZDeserializeError as e:
print(f"Processor {self.processor_id}: Basic deserialization error: {e}")
def process_data(self, input_data):
"""Process input data and publish results"""
# Simulate processing time
time.sleep(random.uniform(0.1, 0.3))
# Increment counter with precise type
self.processed_count = zext.UInt64(int(self.processed_count) + 1)
# Create result with precise numeric types
result = {
"processor_id": self.id,
"input_sequence": input_data.get("sequence", 0),
"processed_count": self.processed_count,
"processing_time": zext.Float32(random.uniform(0.1, 0.3)),
"result": f"PROCESSED_{input_data.get('value', 'unknown')}"
}
# Serialize and publish result
serialized_result = zext.z_serialize(result)
self.publisher.put(serialized_result)
print(f"Processor {self.processor_id}: Processed #{int(self.processed_count)}")
def stop(self):
"""Stop the processor and cleanup"""
self.running = False
if hasattr(self, 'miss_listener'):
self.miss_listener.undeclare()
if hasattr(self, 'subscriber'):
self.subscriber.undeclare()
if hasattr(self, 'publisher'):
self.publisher.undeclare()
self.session.close()
print(f"Processor {self.processor_id} stopped")
def data_generator():
"""Generate test data"""
session = zenoh.open()
publisher = session.declare_publisher("input/data")
for i in range(20):
data = {
"sequence": zext.UInt32(i),
"value": f"data_item_{i}",
"timestamp": zext.Float64(time.time())
}
serialized = zext.z_serialize(data)
publisher.put(serialized)
print(f"Generated data item {i}")
time.sleep(0.5)
publisher.undeclare()
session.close()
print("Data generator finished")
def result_monitor():
"""Monitor processing results"""
session = zenoh.open()
def result_handler(sample):
try:
result = zext.z_deserialize(dict, sample.payload)
print(f"MONITOR: Processor {int(result['processor_id'])} "
f"completed #{int(result['processed_count'])}: {result['result']}")
except zext.ZDeserializeError as e:
print(f"Monitor deserialization error: {e}")
subscriber = session.declare_subscriber("results/**", result_handler)
time.sleep(15) # Monitor for 15 seconds
subscriber.undeclare()
session.close()
print("Result monitor stopped")
def main():
"""Main example execution"""
print("Starting advanced Zenoh extension example...")
# Create processors
processor1 = AdvancedDataProcessor(1)
processor2 = AdvancedDataProcessor(2)
try:
# Start processors
processor1.start_processing()
processor2.start_processing()
# Start monitoring and data generation in separate threads
monitor_thread = threading.Thread(target=result_monitor)
generator_thread = threading.Thread(target=data_generator)
monitor_thread.start()
time.sleep(1) # Let monitor start first
generator_thread.start()
# Wait for completion
generator_thread.join()
monitor_thread.join(timeout=5)
except KeyboardInterrupt:
print("Shutting down...")
finally:
processor1.stop()
processor2.stop()
print("Advanced extension example completed")
if __name__ == "__main__":
main()Install with Tessl CLI
npx tessl i tessl/pypi-eclipse-zenoh