0
# Advanced Features
1
2
Advanced features in Zenoh provide sophisticated networking capabilities including liveliness detection, matching status monitoring, custom handler systems, and network administration tools. These features enable building robust, self-monitoring distributed systems with fine-grained control over communication patterns.
3
4
## Capabilities
5
6
### Liveliness Management
7
8
Monitor and advertise the availability of distributed system components.
9
10
```python { .api }
11
class Liveliness:
    """Liveliness management for distributed system components."""

    # Return types are quoted because LivelinessToken and Subscriber are not
    # defined at this point; unquoted they would raise NameError on import.
    def declare_token(self, key_expr) -> "LivelinessToken":
        """
        Declare a liveliness token for a key expression.

        Parameters:
        - key_expr: Key expression to declare liveliness for

        Returns:
        LivelinessToken that maintains liveliness while active
        """

    def get(
        self,
        key_expr,
        handler=None,
        timeout: "float | None" = None,
    ):
        """
        Query current liveliness status.

        Parameters:
        - key_expr: Key expression pattern to query
        - handler: Handler for liveliness replies
        - timeout: Query timeout in seconds (None when not specified)

        Returns:
        Iterator over liveliness replies if no handler provided
        """

    def declare_subscriber(
        self,
        key_expr,
        handler,
        history: bool = False,
    ) -> "Subscriber":
        """
        Subscribe to liveliness changes.

        Parameters:
        - key_expr: Key expression pattern to monitor
        - handler: Handler for liveliness change notifications
        - history: Whether to receive historical liveliness data

        Returns:
        Subscriber for liveliness changes
        """
60
61
class LivelinessToken:
    """Liveliness token that maintains component availability."""

    def __enter__(self) -> "LivelinessToken":
        """Context manager entry; returns the token itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit - automatically undeclares token."""
        # Honour the documented contract; returning None lets any exception
        # raised inside the ``with`` body propagate.
        self.undeclare()

    def undeclare(self) -> None:
        """Undeclare the liveliness token."""
73
```
74
75
### Matching Status Monitoring
76
77
Monitor whether publishers/subscribers and queriers/queryables have matching peers.
78
79
```python { .api }
80
class MatchingStatus:
    """Entity matching status."""

    @property
    def matching(self) -> bool:
        """Whether there are matching entities on the network."""
86
87
class MatchingListener:
    """Matching status listener for monitoring connection changes."""

    @property
    def handler(self):
        """Get the listener's handler."""

    def undeclare(self) -> None:
        """Undeclare the matching listener."""

    def try_recv(self):
        """Try to receive matching status update without blocking."""

    def recv(self):
        """Receive matching status update (blocking)."""

    def __iter__(self):
        """Iterate over matching status updates."""
105
```
106
107
### Handler System
108
109
Flexible handler system supporting various callback patterns and channel types.
110
111
```python { .api }
112
from zenoh.handlers import Handler, DefaultHandler, FifoChannel, RingChannel, Callback
113
114
class Handler:
    """Generic handler interface for receiving data."""

    def try_recv(self):
        """Try to receive data without blocking."""

    def recv(self):
        """Receive data (blocking)."""

    def __iter__(self):
        """Iterate over received data."""

    def __next__(self):
        """Get next item in iteration."""
128
129
class DefaultHandler:
    """Default FIFO handler with unlimited capacity."""
132
133
class FifoChannel:
    """FIFO channel with configurable capacity."""

    def __init__(self, capacity: int):
        """
        Create FIFO channel with specified capacity.

        Parameters:
        - capacity: Maximum number of items to buffer
        """
143
144
class RingChannel:
    """Ring channel that overwrites oldest data when full."""

    def __init__(self, capacity: int):
        """
        Create ring channel with fixed capacity.

        Parameters:
        - capacity: Fixed size of the ring buffer
        """
154
155
class Callback:
    """Callback handler wrapper for Python functions."""

    @property
    def callback(self):
        """Get the callback function."""

    @property
    def drop(self):
        """Get the drop callback (called when handler is dropped)."""

    @property
    def indirect(self) -> bool:
        """Whether callback is called indirectly."""

    def __init__(
        self,
        callback,
        drop=None,
        indirect: bool = False,
    ):
        """
        Create callback handler.

        Parameters:
        - callback: Function to call with received data
        - drop: Optional cleanup function
        - indirect: Whether to use indirect calling
        """

    def __call__(self, *args, **kwargs):
        """Call the wrapped callback function."""
187
```
188
189
### Network Administration
190
191
Session-level operations for accessing the liveliness interface, undeclaring entities, and declaring optimized key expressions.
192
193
```python { .api }
194
# Access session's liveliness interface
@property
def liveliness(self) -> "Liveliness":
    """Get session's liveliness interface."""
198
199
# Session operations for entity management
def undeclare(self, entity) -> None:
    """
    Undeclare any Zenoh entity (publisher, subscriber, etc.)

    Parameters:
    - entity: The entity to undeclare
    """
207
208
def declare_keyexpr(self, key_expr: str) -> "KeyExpr":
    """
    Declare a key expression for optimized repeated usage.

    Parameters:
    - key_expr: Key expression string to optimize

    Returns:
    Optimized KeyExpr object
    """
218
```
219
220
### Error Handling
221
222
All Zenoh failures are reported through a single exception type, `ZError`, which applications can catch to build robust distributed applications.
223
224
```python { .api }
225
class ZError(Exception):
    """Base exception for all Zenoh errors."""
228
```
229
230
## Usage Examples
231
232
### Basic Liveliness
233
234
```python
235
import time

import zenoh

session = zenoh.open()
try:
    # Declare liveliness token
    token = session.liveliness.declare_token("services/temperature_monitor")
    try:
        print("Service is alive...")

        # Service runs for some time
        time.sleep(10)
    finally:
        # Cleanup - this signals that service is no longer alive.
        # Done in ``finally`` so the token is withdrawn even if the
        # service body raises.
        token.undeclare()
finally:
    # Close the session on every exit path, including errors.
    session.close()
252
```
253
254
### Liveliness with Context Manager
255
256
```python
257
import time

import zenoh

session = zenoh.open()
try:
    # Use context manager for automatic cleanup
    with session.liveliness.declare_token("services/data_processor") as token:
        print("Data processor service is running...")

        # Simulate service work
        for i in range(5):
            print(f"Processing batch {i+1}...")
            time.sleep(2)

        print("Service work complete")
    # Token is automatically undeclared when exiting the context
finally:
    # Close the session on every exit path, including errors.
    session.close()
275
```
276
277
### Monitoring Liveliness
278
279
```python
280
import time

import zenoh


def liveliness_handler(sample):
    """Print whether the service behind a liveliness sample is online or offline."""
    if sample.kind == zenoh.SampleKind.PUT:
        print(f"Service ONLINE: {sample.key_expr}")
    elif sample.kind == zenoh.SampleKind.DELETE:
        print(f"Service OFFLINE: {sample.key_expr}")


session = zenoh.open()
try:
    # Subscribe to liveliness changes for all services
    subscriber = session.liveliness.declare_subscriber(
        "services/**",
        liveliness_handler,
        history=True,  # Get current liveliness state
    )
    try:
        print("Monitoring service liveliness...")

        # Let it monitor for a while
        time.sleep(30)
    finally:
        # Undeclare even if monitoring is interrupted.
        subscriber.undeclare()
finally:
    session.close()
305
```
306
307
### Querying Liveliness Status
308
309
```python
310
import zenoh

session = zenoh.open()
try:
    # Query current liveliness status
    print("Querying current service status...")
    replies = session.liveliness.get("services/**", timeout=5.0)

    active_services = []
    for reply in replies:
        # Read the reply's sample once; replies without one are skipped.
        sample = reply.ok
        if not sample:
            continue
        service_key = str(sample.key_expr)
        active_services.append(service_key)
        print(f"ACTIVE: {service_key}")

    print(f"\nTotal active services: {len(active_services)}")
finally:
    # Close the session on every exit path, including errors.
    session.close()
328
```
329
330
### Matching Status Monitoring
331
332
```python
333
import time

import zenoh


def matching_handler(status):
    """Print whether the publisher currently has matching subscribers."""
    if status.matching:
        print("Publisher has subscribers!")
    else:
        print("No subscribers found")


session = zenoh.open()
try:
    # Create publisher with matching monitoring
    publisher = session.declare_publisher("sensor/data")

    # Monitor matching status
    listener = publisher.declare_matching_listener(matching_handler)

    # Check initial status
    print(f"Initial matching status: {publisher.matching_status.matching}")

    # Publish some data
    for i in range(5):
        publisher.put(f"Data point {i}")
        time.sleep(1)

    # Cleanup
    listener.undeclare()
    publisher.undeclare()
finally:
    # Close the session on every exit path, including errors.
    session.close()
362
```
363
364
### Custom Handler Examples
365
366
```python
367
import threading
import time

import zenoh
from zenoh.handlers import Callback, FifoChannel, RingChannel

session = zenoh.open()
try:
    # Example 1: FIFO Channel Handler
    print("Example 1: FIFO Channel")
    fifo_handler = FifoChannel(capacity=10)
    subscriber1 = session.declare_subscriber("data/fifo", fifo_handler)

    # Simulate some data
    publisher = session.declare_publisher("data/fifo")
    for i in range(5):
        publisher.put(f"FIFO message {i}")

    # Receive from FIFO
    try:
        sample = subscriber1.recv()  # Blocks until data available
        print(f"FIFO received: {sample.payload.to_string()}")
    except zenoh.ZError as e:
        # Report the failure instead of silently swallowing every exception.
        print(f"FIFO receive failed: {e}")

    subscriber1.undeclare()
    publisher.undeclare()

    # Example 2: Ring Channel Handler
    print("\nExample 2: Ring Channel")
    ring_handler = RingChannel(capacity=3)
    subscriber2 = session.declare_subscriber("data/ring", ring_handler)

    # Publish on the key the ring subscriber listens to; a publisher bound
    # to "data/fifo" would never reach it and the loop below would block.
    ring_publisher = session.declare_publisher("data/ring")

    # Send more data than ring capacity
    for i in range(5):
        ring_publisher.put(f"Ring message {i}")

    # Ring channel only keeps latest 3 messages
    for sample in subscriber2:
        print(f"Ring received: {sample.payload.to_string()}")
        break  # Just get first one for demo

    subscriber2.undeclare()
    ring_publisher.undeclare()

    # Example 3: Callback Handler
    print("\nExample 3: Callback Handler")

    def my_callback(sample):
        print(f"Callback received: {sample.payload.to_string()}")

    def cleanup_callback():
        print("Callback handler cleanup")

    callback_handler = Callback(
        callback=my_callback,
        drop=cleanup_callback,
    )

    subscriber3 = session.declare_subscriber("data/callback", callback_handler)

    # Send test data on the key the callback subscriber listens to
    callback_publisher = session.declare_publisher("data/callback")
    callback_publisher.put("Callback test message")
    time.sleep(0.1)  # Let callback process

    subscriber3.undeclare()  # This will trigger cleanup_callback
    callback_publisher.undeclare()
finally:
    # Close the session on every exit path, including errors.
    session.close()
432
```
433
434
### Advanced Error Handling
435
436
```python
437
import zenoh
438
import logging
439
440
logging.basicConfig(level=logging.INFO)
441
logger = logging.getLogger(__name__)
442
443
def robust_session_example():
    """
    Open a session, publish ten messages, and always release resources.

    Returns:
    True when the session and publisher were set up and the publish loop
    ran to completion (individual failed puts are logged and skipped);
    False when a ZError or any other exception aborted the run.
    """
    session = None
    publisher = None

    try:
        # Open session with error handling
        config = zenoh.Config()
        session = zenoh.open(config)
        logger.info("Session opened successfully")

        # Declare publisher
        publisher = session.declare_publisher("robust/example")
        logger.info("Publisher declared")

        # Publish with error handling
        for i in range(10):
            try:
                publisher.put(f"Message {i}")
                logger.debug(f"Published message {i}")
            except zenoh.ZError as e:
                # One failed put must not stop the remaining messages.
                logger.error(f"Failed to publish message {i}: {e}")

    except zenoh.ZError as e:
        logger.error(f"Zenoh error: {e}")
        return False

    except Exception as e:
        # logger.exception keeps the traceback for errors nobody anticipated.
        logger.exception(f"Unexpected error: {e}")
        return False

    finally:
        # Cleanup with error handling. Compare against None explicitly so
        # the check does not depend on the objects' truthiness.
        if publisher is not None:
            try:
                publisher.undeclare()
                logger.info("Publisher undeclared")
            except zenoh.ZError as e:
                logger.error(f"Error undeclaring publisher: {e}")

        if session is not None:
            try:
                session.close()
                logger.info("Session closed")
            except zenoh.ZError as e:
                logger.error(f"Error closing session: {e}")

    return True


# Run the robust example
success = robust_session_example()
print(f"Example completed successfully: {success}")
495
```
496
497
### Complete Advanced Features Example
498
499
```python
500
import zenoh
501
from zenoh.handlers import FifoChannel, Callback
502
import threading
503
import time
504
import logging
505
506
logging.basicConfig(level=logging.INFO)
507
logger = logging.getLogger(__name__)
508
509
class DistributedService:
    """
    Example service combining liveliness, matching monitoring and a FIFO subscriber.

    The service reads from ``input/<service_name>``, publishes results to
    ``output/<service_name>`` and advertises itself under
    ``services/<service_name>``.
    """

    def __init__(self, service_name: str):
        """
        Create an idle service; nothing is declared until start() is called.

        Parameters:
        - service_name: Name used to build the service's key expressions
        """
        self.service_name = service_name
        self.session = None
        self.liveliness_token = None
        self.publisher = None
        self.subscriber = None
        self.matching_listener = None
        self.running = False

    def start(self):
        """
        Start the distributed service with full monitoring.

        On any failure the partially created resources are released through
        stop() and the original exception is re-raised.
        """
        try:
            # Open session
            self.session = zenoh.open()
            logger.info(f"Service {self.service_name} session started")

            # Declare liveliness
            self.liveliness_token = self.session.liveliness.declare_token(
                f"services/{self.service_name}"
            )
            logger.info(f"Liveliness declared for {self.service_name}")

            # Setup publisher with matching monitoring
            self.publisher = self.session.declare_publisher(
                f"output/{self.service_name}"
            )

            def matching_handler(status):
                if status.matching:
                    logger.info(f"{self.service_name}: Consumers connected")
                else:
                    logger.warning(f"{self.service_name}: No consumers")

            self.matching_listener = self.publisher.declare_matching_listener(
                matching_handler
            )

            # Setup subscriber with custom handler
            handler = FifoChannel(capacity=100)
            self.subscriber = self.session.declare_subscriber(
                f"input/{self.service_name}",
                handler,
            )

            self.running = True
            logger.info(f"Service {self.service_name} fully started")

        except Exception as e:
            # Catch broadly (and re-raise) so a non-Zenoh failure half-way
            # through start-up does not leak what was already declared.
            logger.error(f"Failed to start service {self.service_name}: {e}")
            self.stop()
            raise

    def process_data(self):
        """Main processing loop; runs until ``running`` is cleared."""
        # Keep local references: stop() clears the attributes, and the loop
        # must not dereference None if stop() runs concurrently.
        subscriber = self.subscriber
        publisher = self.publisher

        while self.running:
            try:
                # Try to receive input data
                sample = subscriber.try_recv()
                if not sample:
                    # No data available, short sleep
                    time.sleep(0.1)
                    continue

                input_data = sample.payload.to_string()
                logger.debug(f"{self.service_name} processing: {input_data}")

                # Process the data (simulate some work)
                processed = f"PROCESSED[{self.service_name}]: {input_data.upper()}"

                # Publish result
                publisher.put(processed)
                logger.debug(f"{self.service_name} output: {processed}")

            except zenoh.ZError as e:
                logger.error(f"Processing error in {self.service_name}: {e}")
                time.sleep(1)  # Brief pause before retry

            except KeyboardInterrupt:
                logger.info(f"Shutdown requested for {self.service_name}")
                break

    def stop(self):
        """
        Stop the service and cleanup resources.

        Safe to call more than once (for example from start()'s failure
        path and again from the caller's ``finally``): each resource is
        released at most once.
        """
        self.running = False

        # Entities are released before the session that owns them.
        self._release("matching_listener", "undeclare", "Error undeclaring matching listener")
        self._release("subscriber", "undeclare", "Error undeclaring subscriber")
        self._release("publisher", "undeclare", "Error undeclaring publisher")
        self._release("liveliness_token", "undeclare", "Error undeclaring liveliness")
        self._release("session", "close", "Error closing session")

        logger.info(f"Service {self.service_name} stopped")

    def _release(self, attr, method, error_message):
        """Call ``method`` on the resource stored in ``attr`` once, logging any ZError."""
        resource = getattr(self, attr)
        if resource is None:
            return
        # Clear the reference first so a repeated stop() skips this resource.
        setattr(self, attr, None)
        try:
            getattr(resource, method)()
        except zenoh.ZError as e:
            logger.error(f"{error_message}: {e}")
627
628
def monitor_services(duration: float = 20):
    """
    Monitor all service liveliness.

    Parameters:
    - duration: How long to keep monitoring, in seconds
    """
    session = zenoh.open()
    try:

        def liveliness_handler(sample):
            # The service name is the last chunk of the key expression.
            service_name = str(sample.key_expr).rsplit("/", 1)[-1]
            if sample.kind == zenoh.SampleKind.PUT:
                logger.info(f"MONITOR: Service {service_name} is ONLINE")
            else:
                logger.warning(f"MONITOR: Service {service_name} went OFFLINE")

        # Subscribe to all service liveliness with history
        monitor = session.liveliness.declare_subscriber(
            "services/**",
            liveliness_handler,
            history=True,
        )
        try:
            # Let it monitor
            time.sleep(duration)
        finally:
            # Undeclare even if the wait is interrupted.
            monitor.undeclare()
    finally:
        session.close()

    logger.info("Service monitor stopped")
652
653
def main():
    """Run the monitor and two services, feed test data, then shut everything down."""
    # Start service monitor in background
    monitor_thread = threading.Thread(target=monitor_services)
    monitor_thread.start()

    # Create and start services
    services = [
        DistributedService("processor_a"),
        DistributedService("processor_b"),
    ]
    workers = []

    try:
        for service in services:
            service.start()

        # Start processing in separate threads
        for service in services:
            worker = threading.Thread(target=service.process_data)
            worker.start()
            workers.append(worker)

        # Send some test data
        test_session = zenoh.open()
        try:
            test_pub = test_session.declare_publisher("input/processor_a")

            for i in range(5):
                test_pub.put(f"test_data_{i}")
                time.sleep(2)

            test_pub.undeclare()
        finally:
            # Close the test session even if publishing fails.
            test_session.close()

        # Let services run for a bit
        time.sleep(5)

    except KeyboardInterrupt:
        logger.info("Shutdown requested")

    finally:
        # Ask the processing loops to exit and wait for them before
        # undeclaring the entities they are still using.
        for service in services:
            service.running = False
        for worker in workers:
            worker.join(timeout=5)

        # Stop services
        for service in services:
            service.stop()

        # Wait for threads to finish
        monitor_thread.join(timeout=5)

        logger.info("All services stopped")


if __name__ == "__main__":
    main()
702
```