The python API for Eclipse zenoh - a high-performance networking library providing pub/sub, store/query and compute framework for zero-overhead communication
—
The Publisher/Subscriber pattern enables real-time data streaming in Zenoh applications. Publishers send data to specific key expressions, while Subscribers receive data matching their subscription patterns. This decoupled messaging pattern supports high-throughput, low-latency communication with flexible quality of service controls.
Publishers send data to specific key expressions with configurable quality of service parameters.
def declare_publisher(
self,
key_expr,
encoding: Encoding = None,
congestion_control: CongestionControl = None,
priority: Priority = None,
reliability: Reliability = None
) -> Publisher:
"""
Declare a publisher for a key expression.
Parameters:
- key_expr: Key expression to publish on
- encoding: Data encoding specification
- congestion_control: How to handle network congestion
- priority: Message priority level
- reliability: Reliability mode for message delivery
Returns:
Publisher object for sending data
"""
class Publisher:
"""Publisher for data streams"""
@property
def key_expr(self) -> KeyExpr:
"""Get the publisher's key expression"""
@property
def encoding(self) -> Encoding:
"""Get the publisher's encoding"""
@property
def congestion_control(self) -> CongestionControl:
"""Get congestion control setting"""
@property
def priority(self) -> Priority:
"""Get priority setting"""
@property
def reliability(self) -> Reliability:
"""Get reliability setting"""
@property
def matching_status(self) -> MatchingStatus:
"""Get current matching status"""
def put(
self,
payload,
encoding: Encoding = None,
timestamp: Timestamp = None,
attachment = None
) -> None:
"""
Send data through the publisher.
Parameters:
- payload: Data to send (str, bytes, or ZBytes)
- encoding: Override default encoding
- timestamp: Custom timestamp for the data
- attachment: Additional metadata
"""
def delete(
self,
timestamp: Timestamp = None,
attachment = None
) -> None:
"""
Send a delete operation.
Parameters:
- timestamp: Custom timestamp for the delete
- attachment: Additional metadata
"""
def undeclare(self) -> None:
"""Undeclare the publisher and release resources"""
def declare_matching_listener(self, handler) -> MatchingListener:
"""Declare a listener for matching status changes"""Subscribers receive data matching their subscription key expressions through configurable handlers.
def declare_subscriber(
self,
key_expr,
handler = None,
reliability: Reliability = None,
locality: Locality = None
) -> Subscriber:
"""
Declare a subscriber for a key expression.
Parameters:
- key_expr: Key expression pattern to subscribe to
- handler: Handler for received samples (callback, channel, etc.)
- reliability: Reliability mode for receiving data
- locality: Locality constraint for data sources
Returns:
Subscriber object for receiving data
"""
class Subscriber:
"""Subscriber with generic handler"""
@property
def key_expr(self) -> KeyExpr:
"""Get the subscriber's key expression"""
@property
def handler(self):
"""Get the subscriber's handler"""
def undeclare(self) -> None:
"""Undeclare the subscriber and release resources"""
def try_recv(self):
"""Try to receive a sample without blocking"""
def recv(self):
"""Receive a sample (blocking)"""
def __iter__(self):
"""Iterate over received samples"""Data samples received by subscribers contain the payload and metadata.
class Sample:
"""Data sample"""
@property
def key_expr(self) -> KeyExpr:
"""Key expression where data was published"""
@property
def payload(self) -> ZBytes:
"""Sample payload data"""
@property
def kind(self) -> SampleKind:
"""Sample kind (PUT or DELETE)"""
@property
def encoding(self) -> Encoding:
"""Data encoding"""
@property
def timestamp(self) -> Timestamp:
"""Sample timestamp"""
@property
def congestion_control(self) -> CongestionControl:
"""Congestion control setting"""
@property
def priority(self) -> Priority:
"""Message priority"""
@property
def express(self) -> bool:
"""Express delivery flag"""
@property
def attachment(self):
"""Additional metadata attachment"""
class SampleKind:
"""Sample operation type"""
PUT = ...
DELETE = ...Configure message delivery characteristics and network behavior.
class Priority:
"""Message priority levels"""
REAL_TIME = ...
INTERACTIVE_HIGH = ...
INTERACTIVE_LOW = ...
DATA_HIGH = ...
DATA = ...
DATA_LOW = ...
BACKGROUND = ...
DEFAULT = ...
MIN = ...
MAX = ...
class CongestionControl:
"""Congestion control modes"""
DROP = ... # Drop messages when congested
BLOCK = ... # Block sender when congested
BLOCK_FIRST = ... # Block first message when congested (unstable)
DEFAULT = ...
class Reliability:
"""Reliability modes (unstable)"""
BEST_EFFORT = ... # Best effort delivery
RELIABLE = ... # Reliable delivery
class Locality:
"""Origin/destination locality"""
SESSION_LOCAL = ... # Only local session
REMOTE = ... # Only remote sources
ANY = ... # Any source
DEFAULT = ...Monitor whether publishers and subscribers are matched with peers.
class MatchingStatus:
"""Entity matching status"""
@property
def matching(self) -> bool:
"""Whether there are matching entities"""
class MatchingListener:
"""Matching status listener"""
@property
def handler(self):
"""Get the listener's handler"""
def undeclare(self) -> None:
"""Undeclare the matching listener"""
def try_recv(self):
"""Try to receive a matching status update"""
def recv(self):
"""Receive a matching status update (blocking)"""
def __iter__(self):
"""Iterate over matching status updates"""import zenoh
session = zenoh.open()
# Declare publisher
publisher = session.declare_publisher("sensors/temperature")
# Send data
publisher.put("25.3")
publisher.put(b"binary_data")
# Send with metadata
publisher.put(
"26.1",
timestamp=session.new_timestamp(),
attachment={"sensor_id": "temp_01"}
)
# Clean up
publisher.undeclare()
session.close()import zenoh
session = zenoh.open()
# High-priority publisher with reliable delivery
publisher = session.declare_publisher(
"critical/alerts",
priority=zenoh.Priority.REAL_TIME,
congestion_control=zenoh.CongestionControl.BLOCK,
reliability=zenoh.Reliability.RELIABLE
)
publisher.put("System critical alert!")
publisher.undeclare()
session.close()import zenoh
def data_handler(sample):
print(f"Received on {sample.key_expr}: {sample.payload.to_string()}")
if sample.kind == zenoh.SampleKind.DELETE:
print(" -> DELETE operation")
session = zenoh.open()
# Subscribe with callback handler
subscriber = session.declare_subscriber("sensors/**", data_handler)
# Let it run
import time
time.sleep(10)
subscriber.undeclare()
session.close()import zenoh
session = zenoh.open()
# Subscribe without callback
subscriber = session.declare_subscriber("data/stream")
# Manual reception
try:
sample = subscriber.recv() # Blocking receive
print(f"Got: {sample.payload.to_string()}")
except KeyboardInterrupt:
pass
# Non-blocking reception
sample = subscriber.try_recv()
if sample is not None:
print(f"Got: {sample.payload.to_string()}")
# Iterator style
for sample in subscriber:
print(f"Sample: {sample.payload.to_string()}")
if some_condition:
break
subscriber.undeclare()
session.close()import zenoh
session = zenoh.open()
publisher = session.declare_publisher("demo/pub")
def matching_handler(status):
if status.matching:
print("Publisher has matching subscribers!")
else:
print("No matching subscribers")
# Monitor matching status
listener = publisher.declare_matching_listener(matching_handler)
# Check current status
print(f"Currently matching: {publisher.matching_status.matching}")
# Clean up
listener.undeclare()
publisher.undeclare()
session.close()import zenoh
import threading
import time
def publisher_thread():
session = zenoh.open()
publisher = session.declare_publisher("demo/example")
for i in range(10):
publisher.put(f"Message {i}")
time.sleep(1)
publisher.undeclare()
session.close()
def subscriber_thread():
def handler(sample):
print(f"Subscriber received: {sample.payload.to_string()}")
session = zenoh.open()
subscriber = session.declare_subscriber("demo/example", handler)
time.sleep(12) # Let it run
subscriber.undeclare()
session.close()
# Run both threads
pub_thread = threading.Thread(target=publisher_thread)
sub_thread = threading.Thread(target=subscriber_thread)
pub_thread.start()
sub_thread.start()
pub_thread.join()
sub_thread.join()Install with Tessl CLI
npx tessl i tessl/pypi-eclipse-zenoh