The Python API for Eclipse Zenoh — a high-performance networking library providing a pub/sub, store/query, and compute framework for zero-overhead communication.
—
Zenoh provides a rich set of data types for efficient data handling, addressing, and metadata management. These types enable zero-copy operations, flexible encoding specifications, and precise temporal ordering while maintaining type safety across the distributed system.
Core data structures for efficient payload handling and zero-copy operations.
class ZBytes:
    """Serialized bytes container for zero-copy data handling"""

    def __init__(self, data):
        """
        Create ZBytes from various data types.

        Parameters:
        - data: str, bytes, bytearray, or other serializable data
        """

    def to_bytes(self) -> bytes:
        """Convert to Python bytes"""

    def to_string(self) -> str:
        """Convert to Python string (UTF-8 decoded)"""

    def __bool__(self) -> bool:
        """Check if ZBytes is non-empty"""

    def __len__(self) -> int:
        """Get length in bytes"""

    def __bytes__(self) -> bytes:
        """Convert to bytes via bytes() function"""

    def __str__(self) -> str:
        """String representation"""

    def __eq__(self, other) -> bool:
        """Equality comparison"""

    def __hash__(self) -> int:
        """Hash for use in sets and dicts"""


# Data encoding and schema information for type-safe communication.
class Encoding:
    """Data encoding specification"""

    def __init__(self, encoding_type: str):
        """Create encoding from string specification"""

    # The return annotation is quoted because the class name is not yet bound
    # while the class body is being executed.
    def with_schema(self, schema: str) -> "Encoding":
        """Add schema information to encoding"""

    def __eq__(self, other) -> bool:
        """Equality comparison"""

    def __hash__(self) -> int:
        """Hash for use in sets and dicts"""

    def __str__(self) -> str:
        """String representation"""

    # Standard encoding constants
    ZENOH_BYTES = ...  # Raw bytes
    ZENOH_STRING = ...  # UTF-8 string
    ZENOH_SERIALIZED = ...  # Zenoh serialized data
    APPLICATION_JSON = ...  # JSON format
    APPLICATION_CBOR = ...  # CBOR format
    APPLICATION_YAML = ...  # YAML format
    APPLICATION_PYTHON_SERIALIZED_OBJECT = ...  # Python pickle
    APPLICATION_PROTOBUF = ...  # Protocol Buffers
    APPLICATION_JAVA_SERIALIZED_OBJECT = ...  # Java serialization
    APPLICATION_OPENMETRICS_TEXT = ...  # OpenMetrics text format
    IMAGE_PNG = ...  # PNG image
    IMAGE_JPEG = ...  # JPEG image
    TEXT_PLAIN = ...  # Plain text
    TEXT_JSON = ...  # JSON as text
    TEXT_HTML = ...  # HTML
    TEXT_XML = ...  # XML
    TEXT_CSS = ...  # CSS
    TEXT_CSV = ...  # CSV
    TEXT_JAVASCRIPT = ...  # JavaScript


# Hierarchical addressing scheme for resource identification and pattern matching.
class KeyExpr:
    """Key expression for addressing resources"""

    # Annotations naming KeyExpr or SetIntersectionLevel are quoted: neither
    # name is bound yet while this class body is being executed.

    def __init__(self, key: str):
        """Create key expression from string"""

    @staticmethod
    def autocanonize(key: str) -> "KeyExpr":
        """Create and canonicalize key expression"""

    def intersects(self, other: "KeyExpr") -> bool:
        """Check if this key expression intersects with another"""

    def includes(self, other: "KeyExpr") -> bool:
        """Check if this key expression includes another"""

    def relation_to(self, other: "KeyExpr") -> "SetIntersectionLevel":
        """Get relationship to another key expression"""

    def join(self, other: "KeyExpr") -> "KeyExpr":
        """Join two key expressions"""

    def concat(self, suffix: str) -> "KeyExpr":
        """Concatenate string suffix to key expression"""

    def __str__(self) -> str:
        """String representation"""
class SetIntersectionLevel:
    """Key expression set relations (unstable)"""

    DISJOINT = ...  # No intersection
    INTERSECTS = ...  # Some intersection
    INCLUDES = ...  # This includes other
    EQUALS = ...  # Exactly equal


# Key-value parameters and selector combinations for queries and configuration.
class Parameters:
"""Key-value parameters"""
def __init__(self, params: str = None):
"""Create parameters from string (key1=value1;key2=value2)"""
def is_empty(self) -> bool:
"""Check if parameters are empty"""
def get(self, key: str):
"""Get parameter value by key"""
def values(self, key: str) -> list:
"""Get all values for a key (for multi-value parameters)"""
def insert(self, key: str, value: str) -> Parameters:
"""Insert key-value pair"""
def remove(self, key: str) -> Parameters:
"""Remove parameter by key"""
def extend(self, other: Parameters) -> Parameters:
"""Extend with another Parameters object"""
def is_ordered(self) -> bool:
"""Check if parameters maintain insertion order"""
# Operators for parameter manipulation
def __eq__(self, other) -> bool:
"""Equality comparison"""
def __str__(self) -> str:
"""String representation"""
class Selector:
    """Key expression + parameters combination"""

    # KeyExpr and Parameters annotations are quoted so this class does not
    # depend on definition order.

    def __init__(self, selector: str):
        """Create selector from string (keyexpr?param1=val1)"""

    @property
    def key_expr(self) -> "KeyExpr":
        """Get key expression part"""

    @key_expr.setter
    def key_expr(self, value: "KeyExpr"):
        """Set key expression part"""

    @property
    def parameters(self) -> "Parameters":
        """Get parameters part"""

    @parameters.setter
    def parameters(self, value: "Parameters"):
        """Set parameters part"""

    def __str__(self) -> str:
        """String representation"""


# Temporal ordering and unique identification across the distributed system.
class Timestamp:
    """Timestamping for samples and operations"""

    # Timestamp and TimestampId annotations are quoted: neither name is bound
    # yet while this class body is being executed.

    def __init__(self, time: int, id: "TimestampId"):
        """Create timestamp with time value and ID"""

    def get_time(self) -> int:
        """Get time component as NTP64 timestamp"""

    def get_id(self) -> "TimestampId":
        """Get timestamp ID component"""

    def get_diff_duration(self, other: "Timestamp") -> float:
        """Get duration difference in seconds"""

    def to_string_rfc3339_lossy(self) -> str:
        """Convert to RFC3339 string (may lose precision)"""

    @staticmethod
    def parse_rfc3339(timestamp: str) -> "Timestamp":
        """Parse RFC3339 timestamp string"""

    # Comparison operators
    def __eq__(self, other) -> bool:
        """Equality comparison"""

    def __lt__(self, other) -> bool:
        """Less than comparison"""

    def __le__(self, other) -> bool:
        """Less than or equal comparison"""

    def __gt__(self, other) -> bool:
        """Greater than comparison"""

    def __ge__(self, other) -> bool:
        """Greater than or equal comparison"""
class TimestampId:
    """Timestamp identifier for uniqueness"""

    def __init__(self, id_bytes: bytes):
        """Create timestamp ID from bytes"""

    def __bytes__(self) -> bytes:
        """Convert to bytes"""

    # Comparison operators
    def __eq__(self, other) -> bool:
        """Equality comparison"""

    def __lt__(self, other) -> bool:
        """Less than comparison"""
class ZenohId:
    """Global unique peer identifier"""

    def __str__(self) -> str:
        """String representation of ZenohId"""


# Unique identifiers for entities within sessions and across the network.
class EntityGlobalId:
    """Entity global identifier (unstable)"""

    # The ZenohId annotation is quoted so this class does not depend on
    # definition order.
    @property
    def zid(self) -> "ZenohId":
        """Get ZenohId component"""

    @property
    def eid(self) -> int:
        """Get entity ID component"""
# Type aliases for input flexibility
EntityId = int  # Integer entity identifier

# Type aliases that define acceptable input types for various Zenoh operations.
# Input type aliases for API flexibility
_IntoEncoding = str | Encoding  # Accept string or Encoding
_IntoKeyExpr = str | KeyExpr  # Accept string or KeyExpr
_IntoParameters = str | Parameters  # Accept string or Parameters
_IntoSelector = str | Selector  # Accept string or Selector
_IntoTimestampId = bytes | TimestampId  # Accept bytes or TimestampId
_IntoWhatAmIMatcher = WhatAmI | WhatAmIMatcher  # Accept node type or matcher
_IntoZBytes = str | bytes | bytearray | ZBytes  # Accept various data types
_IntoQueryConsolidation = ConsolidationMode | QueryConsolidation  # Accept mode or config


# The import below was previously swallowed by the trailing comment above.
import zenoh
# Create ZBytes from different data types
text_data = zenoh.ZBytes("Hello, Zenoh!")
byte_data = zenoh.ZBytes(b"Binary data")
number_data = zenoh.ZBytes(str(42))

# Convert back to Python types
print(text_data.to_string())  # "Hello, Zenoh!"
print(byte_data.to_bytes())  # b"Binary data"
print(len(number_data))  # Length in bytes

# Use in boolean context
if text_data:
    print("Data is not empty")

# Use as dictionary key (hashable)
data_cache = {
    text_data: "cached_result_1",
    byte_data: "cached_result_2",
}


import zenoh
import json

# Create encodings. The Encoding.* constants are used directly rather than
# being passed back through the string constructor.
json_encoding = zenoh.Encoding.APPLICATION_JSON
text_encoding = zenoh.Encoding.TEXT_PLAIN

# Encoding with schema
protobuf_encoding = zenoh.Encoding.APPLICATION_PROTOBUF.with_schema("user.proto")

# Use in publisher (zenoh.open() needs a configuration object)
session = zenoh.open(zenoh.Config())
publisher = session.declare_publisher("data/json", encoding=json_encoding)

data = {"temperature": 23.5, "humidity": 65}
publisher.put(json.dumps(data))

publisher.undeclare()
session.close()


import zenoh
# Create key expressions
sensor_key = zenoh.KeyExpr("sensors/temperature/room1")
pattern_key = zenoh.KeyExpr("sensors/**")
config_key = zenoh.KeyExpr("config/network")

# Test relationships
print(pattern_key.includes(sensor_key))  # True - pattern includes specific key
print(sensor_key.intersects(config_key))  # False - different hierarchies

# Concatenate a string suffix onto a key expression
base_key = zenoh.KeyExpr("sensors")
specific_key = base_key.concat("/temperature/room2")
print(specific_key)  # "sensors/temperature/room2"

# Join two key expressions
room_key = zenoh.KeyExpr("room1")
full_key = sensor_key.join(room_key)


import zenoh
# Create parameters
params = zenoh.Parameters("region=north;limit=10;format=json")
print(params.get("region"))  # "north"
print(params.get("limit"))  # "10"

# Add parameters
new_params = params.insert("timeout", "30")
extended_params = params.extend(zenoh.Parameters("debug=true"))

# Create selector. Parameters are separated by ';' (key1=value1;key2=value2),
# not '&'.
selector = zenoh.Selector("sensors/temperature?region=north;limit=5")
print(selector.key_expr)  # KeyExpr("sensors/temperature")
print(selector.parameters.get("region"))  # "north"

# Use in queries (zenoh.open() needs a configuration object)
session = zenoh.open(zenoh.Config())
replies = session.get(selector)
for reply in replies:
    if reply.ok:
        print(reply.ok.payload.to_string())
session.close()


import zenoh
# zenoh.open() needs a configuration object
session = zenoh.open(zenoh.Config())

# Create timestamp
timestamp = session.new_timestamp()
print(f"Time: {timestamp.get_time()}")
print(f"ID: {timestamp.get_id()}")

# RFC3339 format
rfc3339_str = timestamp.to_string_rfc3339_lossy()
print(f"RFC3339: {rfc3339_str}")

# Parse timestamp
parsed = zenoh.Timestamp.parse_rfc3339("2023-01-01T12:00:00Z")

# Compare timestamps
if timestamp > parsed:
    print("Current timestamp is later")

# Duration between timestamps
duration = timestamp.get_diff_duration(parsed)
print(f"Difference: {duration} seconds")

session.close()


import zenoh
import json
import time
class SensorData:
def __init__(self, sensor_id: str, value: float, unit: str):
self.sensor_id = sensor_id
self.value = value
self.unit = unit
self.timestamp = time.time()
def to_json(self) -> str:
return json.dumps({
"sensor_id": self.sensor_id,
"value": self.value,
"unit": self.unit,
"timestamp": self.timestamp
})
@classmethod
def from_json(cls, json_str: str):
data = json.loads(json_str)
return cls(data["sensor_id"], data["value"], data["unit"])
def main():
    """Publish one JSON-encoded sensor reading, then query it back.

    Opens a Zenoh session, publishes a ``SensorData`` reading with a timestamp
    and an attachment, prints what was sent, and issues a query over
    ``sensors/temperature/**``. The publisher and the session are released
    even if a step fails.
    """
    # zenoh.open() needs a configuration object.
    session = zenoh.open(zenoh.Config())
    try:
        # Create sensor data
        sensor = SensorData("temp_01", 23.5, "°C")

        # Create key expression for this sensor
        key_expr = zenoh.KeyExpr(f"sensors/temperature/{sensor.sensor_id}")

        # Use the JSON encoding constant directly rather than passing it back
        # through the string constructor.
        encoding = zenoh.Encoding.APPLICATION_JSON

        # Declare publisher with encoding
        publisher = session.declare_publisher(key_expr, encoding=encoding)
        try:
            # Create ZBytes payload
            payload = zenoh.ZBytes(sensor.to_json())

            # Create timestamp
            timestamp = session.new_timestamp()

            # Publish with metadata. The attachment must be str/bytes-like,
            # so the metadata dict is serialized to JSON first.
            publisher.put(
                payload,
                timestamp=timestamp,
                attachment=json.dumps({"source": "building_a", "floor": "2"}),
            )

            print("Published sensor data:")
            print(f" Key: {key_expr}")
            print(f" Payload: {payload.to_string()}")
            print(f" Encoding: {encoding}")
            print(f" Timestamp: {timestamp.to_string_rfc3339_lossy()}")

            # Query with parameters
            query_params = zenoh.Parameters("unit=°C;recent=true")
            selector = zenoh.Selector("sensors/temperature/**")
            selector.parameters = query_params
            print(f"\nQuerying with selector: {selector}")

            # Simulate some delay
            time.sleep(0.1)

            # Query the data back
            replies = session.get(selector)
            for reply in replies:
                if reply.ok:
                    sample = reply.ok
                    received_data = SensorData.from_json(sample.payload.to_string())
                    print(f"Received: {received_data.sensor_id} = {received_data.value} {received_data.unit}")
        finally:
            # Cleanup
            publisher.undeclare()
    finally:
        session.close()
if __name__ == "__main__":
    main()


# Install with Tessl CLI
# npx tessl i tessl/pypi-eclipse-zenoh