0
# Extensions
1
2
The Zenoh extensions module (`zenoh.ext`) provides additional functionality including type-safe serialization utilities, custom numeric types, and advanced publisher/subscriber features with enhanced reliability and caching capabilities. These extensions enable more sophisticated data handling and communication patterns while maintaining compatibility with the core Zenoh API.
3
4
## Capabilities
5
6
### Serialization System
7
8
Type-safe serialization and deserialization of Python objects to/from ZBytes.
9
10
```python { .api }
11
from zenoh.ext import z_serialize, z_deserialize, ZDeserializeError


def z_serialize(obj) -> ZBytes:
    """
    Serialize Python objects to ZBytes with automatic type detection.

    Parameters:
    - obj: Python object to serialize (basic types and collections of them)

    Returns:
    ZBytes containing serialized data

    Raises:
    ZError: If serialization fails

    NOTE(review): earlier revisions of this page also listed "custom objects"
    as supported input; confirm against the upstream zenoh.ext reference
    before relying on that.
    """


def z_deserialize(target_type, data: ZBytes):
    """
    Deserialize ZBytes to Python objects with type validation.

    Parameters:
    - target_type: Expected type for deserialization validation
    - data: ZBytes containing serialized data

    Returns:
    Deserialized Python object of specified type

    Raises:
    ZDeserializeError: If deserialization fails or type doesn't match
    """


class ZDeserializeError(Exception):
    """Exception raised when deserialization fails"""
    pass
45
```
46
47
### Custom Numeric Types
48
49
Fixed-width integer and floating-point types that make the size and range of a serialized number explicit, for cross-platform data exchange and scientific computing.
50
51
```python { .api }
52
# Signed Integer Types
class Int8:
    """8-bit signed integer (-128 to 127)"""
    def __init__(self, value: int): ...


class Int16:
    """16-bit signed integer (-32,768 to 32,767)"""
    def __init__(self, value: int): ...


class Int32:
    """32-bit signed integer (-2,147,483,648 to 2,147,483,647)"""
    def __init__(self, value: int): ...


class Int64:
    """64-bit signed integer (-9,223,372,036,854,775,808 to 9,223,372,036,854,775,807)"""
    def __init__(self, value: int): ...


class Int128:
    """128-bit signed integer for very large numbers (-2**127 to 2**127 - 1)"""
    def __init__(self, value: int): ...


# Unsigned Integer Types
class UInt8:
    """8-bit unsigned integer (0 to 255)"""
    def __init__(self, value: int): ...


class UInt16:
    """16-bit unsigned integer (0 to 65,535)"""
    def __init__(self, value: int): ...


class UInt32:
    """32-bit unsigned integer (0 to 4,294,967,295)"""
    def __init__(self, value: int): ...


class UInt64:
    """64-bit unsigned integer (0 to 18,446,744,073,709,551,615)"""
    def __init__(self, value: int): ...


class UInt128:
    """128-bit unsigned integer for very large numbers (0 to 2**128 - 1)"""
    def __init__(self, value: int): ...


# Floating Point Types
class Float32:
    """32-bit IEEE 754 floating point number"""
    def __init__(self, value: float): ...


class Float64:
    """64-bit IEEE 754 floating point number"""
    def __init__(self, value: float): ...
102
```
103
104
### Advanced Publisher (Unstable)
105
106
Enhanced publisher that can cache published samples for late-joining subscribers and enable subscriber detection.
107
108
```python { .api }
109
def declare_advanced_publisher(
    session,
    key_expr,
    encoding: Encoding = None,
    congestion_control: CongestionControl = None,
    priority: Priority = None,
    cache: CacheConfig = None,
    subscriber_detection: bool = None
) -> AdvancedPublisher:
    """
    Declare an advanced publisher with enhanced features (unstable).

    Parameters:
    - session: Zenoh session
    - key_expr: Key expression to publish on
    - encoding: Data encoding specification
    - congestion_control: Congestion control mode
    - priority: Message priority
    - cache: Cache configuration for late-joining subscribers
    - subscriber_detection: Enable subscriber detection

    Returns:
    AdvancedPublisher with enhanced capabilities
    """


class AdvancedPublisher:
    """Advanced publisher with additional features (unstable)"""

    @property
    def key_expr(self) -> KeyExpr:
        """Get the publisher's key expression"""

    @property
    def encoding(self) -> Encoding:
        """Get the publisher's encoding"""

    @property
    def congestion_control(self) -> CongestionControl:
        """Get congestion control setting"""

    @property
    def priority(self) -> Priority:
        """Get priority setting"""

    def put(
        self,
        payload,
        encoding: Encoding = None,
        timestamp: Timestamp = None,
        attachment = None
    ) -> None:
        """
        Send data through the advanced publisher.

        Parameters:
        - payload: Data to send
        - encoding: Override default encoding
        - timestamp: Custom timestamp
        - attachment: Additional metadata
        """

    def delete(
        self,
        timestamp: Timestamp = None,
        attachment = None
    ) -> None:
        """
        Send a delete operation.

        Parameters:
        - timestamp: Custom timestamp
        - attachment: Additional metadata
        """

    def undeclare(self) -> None:
        """Undeclare the advanced publisher"""
179
```
180
181
### Advanced Subscriber (Unstable)
182
183
Enhanced subscriber with miss detection, recovery, and publisher detection.
184
185
```python { .api }
186
def declare_advanced_subscriber(
    session,
    key_expr,
    handler = None,
    reliability: Reliability = None,
    recovery: RecoveryConfig = None,
    history: HistoryConfig = None,
    miss_detection: MissDetectionConfig = None
) -> AdvancedSubscriber:
    """
    Declare an advanced subscriber with enhanced features (unstable).

    Parameters:
    - session: Zenoh session
    - key_expr: Key expression pattern to subscribe to
    - handler: Handler for received samples
    - reliability: Reliability mode
    - recovery: Recovery configuration for missed samples
    - history: History configuration for late-joining subscribers
    - miss_detection: Configuration for detecting missed samples

    Returns:
    AdvancedSubscriber with enhanced capabilities
    """


class AdvancedSubscriber:
    """Advanced subscriber with additional features (unstable)"""

    @property
    def key_expr(self) -> KeyExpr:
        """Get the subscriber's key expression"""

    @property
    def handler(self):
        """Get the subscriber's handler"""

    def sample_miss_listener(self, handler) -> SampleMissListener:
        """
        Declare a listener for sample miss detection.

        Parameters:
        - handler: Handler for miss notifications

        Returns:
        SampleMissListener for monitoring missed samples
        """

    def detect_publishers(self, handler):
        """
        Enable publisher detection with callback.

        Parameters:
        - handler: Handler for publisher detection events
        """

    def undeclare(self) -> None:
        """Undeclare the advanced subscriber"""

    def try_recv(self):
        """Try to receive a sample without blocking"""

    def recv(self):
        """Receive a sample (blocking)"""

    def __iter__(self):
        """Iterate over received samples"""
252
```
253
254
### Advanced Configuration
255
256
Configuration objects for advanced publisher/subscriber features.
257
258
```python { .api }
259
class CacheConfig:
    """Cache configuration for late-joining subscribers (unstable)"""
    # Configuration details depend on implementation


class HistoryConfig:
    """History configuration for subscriber catch-up (unstable)"""
    # Configuration details depend on implementation


class MissDetectionConfig:
    """Configuration for detecting missed samples (unstable)"""
    # Configuration details depend on implementation


class RecoveryConfig:
    """Recovery configuration for missed sample recovery (unstable)"""
    # Configuration details depend on implementation


class RepliesConfig:
    """Configuration for query replies handling (unstable)"""
    # Configuration details depend on implementation
278
```
279
280
### Miss Detection
281
282
Monitor and handle missed samples in data streams.
283
284
```python { .api }
285
class Miss:
    """Information about missed samples (unstable)"""

    @property
    def source(self) -> ZenohId:
        """Source that missed samples are from"""

    @property
    def nb(self) -> int:
        """Number of missed samples"""


class SampleMissListener:
    """Listener for sample miss detection (unstable)"""

    def undeclare(self) -> None:
        """Undeclare the miss listener"""

    def try_recv(self):
        """Try to receive a miss notification without blocking"""

    def recv(self):
        """Receive a miss notification (blocking)"""

    def __iter__(self):
        """Iterate over miss notifications"""
310
```
311
312
## Usage Examples
313
314
### Basic Serialization
315
316
```python
317
import time

import zenoh
import zenoh.ext as zext

# Serialize a collection. The payload does not carry type information, so
# keep the element type uniform and name it again when deserializing.
data_list = [1, 2, 3]
serialized = zext.z_serialize(data_list)

# Deserialize with type checking
try:
    deserialized = zext.z_deserialize(list[int], serialized)
    print(f"Deserialized: {deserialized}")
except zext.ZDeserializeError as e:
    print(f"Deserialization failed: {e}")


def handler(sample):
    """Decode each received payload back into a list of integers."""
    try:
        data = zext.z_deserialize(list[int], sample.payload)
        print(f"Received: {data}")
    except zext.ZDeserializeError as e:
        print(f"Failed to deserialize: {e}")


# Use in Zenoh communication
session = zenoh.open()
try:
    # Declare the subscriber before publishing: a subscriber only sees
    # samples published after it has been declared.
    subscriber = session.declare_subscriber("data/serialized", handler)
    publisher = session.declare_publisher("data/serialized")

    # Publish serialized data
    publisher.put(serialized)

    # Give the callback time to run before tearing everything down
    time.sleep(1)

    # Cleanup
    subscriber.undeclare()
    publisher.undeclare()
finally:
    session.close()
353
```
354
355
### Custom Numeric Types
356
357
```python
358
import time  # required by time.sleep() below; the import was missing

import zenoh
import zenoh.ext as zext

# Create precise numeric values
temperature = zext.Float32(23.567)   # 32-bit precision
sensor_id = zext.UInt16(1024)        # 16-bit unsigned
timestamp = zext.Int64(1640995200)   # 64-bit signed

# Serialize numeric data
# NOTE(review): this dict mixes value types and is deserialized below with a
# bare `dict` target; confirm against the zenoh.ext reference whether a fully
# parameterized target type is required instead.
sensor_data = {
    "temperature": temperature,
    "sensor_id": sensor_id,
    "timestamp": timestamp,
}

serialized_data = zext.z_serialize(sensor_data)


def precise_handler(sample):
    """Decode a sensor reading and print each field."""
    try:
        data = zext.z_deserialize(dict, sample.payload)
        print("Precise sensor reading:")
        print(f" Temperature: {data['temperature']} (32-bit float)")
        print(f" Sensor ID: {data['sensor_id']} (16-bit uint)")
        print(f" Timestamp: {data['timestamp']} (64-bit int)")
    except zext.ZDeserializeError as e:
        print(f"Failed to deserialize sensor data: {e}")


session = zenoh.open()
try:
    # Subscribe first so the reading published below is actually delivered
    subscriber = session.declare_subscriber("sensors/precise", precise_handler)

    # Publish precise sensor data
    publisher = session.declare_publisher(
        "sensors/precise",
        encoding=zenoh.Encoding.ZENOH_SERIALIZED,
    )
    publisher.put(serialized_data)

    # Give the callback time to run before tearing everything down
    time.sleep(1)

    subscriber.undeclare()
    publisher.undeclare()
finally:
    session.close()
398
```
399
400
### Advanced Publisher with Cache
401
402
```python
403
import time

import zenoh
import zenoh.ext as zext


def late_subscriber_handler(sample):
    """Print the message carried by a sample replayed from the cache."""
    try:
        data = zext.z_deserialize(dict, sample.payload)
        print(f"Late subscriber received: {data['message']}")
    except zext.ZDeserializeError:
        print("Failed to deserialize cached data")


session = zenoh.open()

# Note: This is unstable API - actual configuration may differ
try:
    # Declare advanced publisher with caching for late joiners
    advanced_pub = zext.declare_advanced_publisher(
        session,
        "data/cached",
        cache=zext.CacheConfig(),    # Enable caching
        subscriber_detection=True,   # Detect subscribers
    )

    print("Advanced publisher declared with caching")

    # Publish data that will be cached
    for i in range(5):
        data = {"message": f"Cached message {i}", "timestamp": time.time()}
        serialized = zext.z_serialize(data)
        advanced_pub.put(serialized)
        print(f"Published cached message {i}")
        time.sleep(1)

    # Late-joining subscriber should receive cached data.
    # A plain session.declare_subscriber() only sees samples published after
    # it is declared, so the late joiner must be an advanced subscriber that
    # requests history.
    # NOTE(review): confirm which HistoryConfig options the installed zenoh
    # version needs in order to query the publisher cache.
    print("Creating late-joining subscriber...")
    late_subscriber = zext.declare_advanced_subscriber(
        session,
        "data/cached",
        handler=late_subscriber_handler,
        history=zext.HistoryConfig(),
    )

    time.sleep(2)

    # Cleanup
    late_subscriber.undeclare()
    advanced_pub.undeclare()

except Exception as e:
    # Deliberately broad: the advanced API is unstable and may be missing
    print(f"Advanced publisher features may not be available: {e}")

finally:
    # Closing the session also releases anything still declared on it
    session.close()
450
```
451
452
### Advanced Subscriber with Miss Detection
453
454
```python
455
import threading
import time

import zenoh
import zenoh.ext as zext


def sample_handler(sample):
    """Print the application-level sequence number of each received sample."""
    try:
        data = zext.z_deserialize(dict, sample.payload)
        print(f"Received sample: {data['sequence']}")
    except zext.ZDeserializeError:
        print("Failed to deserialize sample")


def miss_handler(miss):
    """Report how many samples were missed and from which source."""
    print(f"MISSED {miss.nb} samples from source {miss.source}")


session = zenoh.open()


def publisher_thread():
    """Publish sequence numbers 0-9, leaving out number 5."""
    pub = session.declare_publisher("data/reliable")
    try:
        for i in range(10):
            # Leave a gap in the application-level "sequence" field.
            # NOTE(review): a put() that is never issued is not a sample lost
            # in transit, so miss_handler is not expected to fire for this
            # gap; confirm how the installed zenoh version detects misses
            # (it may require the publisher side to enable it as well).
            if i == 5:
                continue

            data = {"sequence": i, "payload": f"data_{i}"}
            serialized = zext.z_serialize(data)
            pub.put(serialized)
            time.sleep(0.5)
    finally:
        pub.undeclare()


# Note: This is unstable API - actual configuration may differ
try:
    # Declare advanced subscriber with miss detection
    advanced_sub = zext.declare_advanced_subscriber(
        session,
        "data/reliable",
        handler=sample_handler,
        miss_detection=zext.MissDetectionConfig(),
    )

    # Setup miss detection listener
    miss_listener = advanced_sub.sample_miss_listener(miss_handler)

    print("Advanced subscriber with miss detection ready")

    # Start publisher in separate thread
    pub_thread = threading.Thread(target=publisher_thread)
    pub_thread.start()

    # Wait for the publisher to finish, then leave a short grace period so
    # the last samples can still be delivered to the handlers
    pub_thread.join()
    time.sleep(1)

    # Cleanup
    miss_listener.undeclare()
    advanced_sub.undeclare()

except Exception as e:
    # Deliberately broad: the advanced API is unstable and may be missing
    print(f"Advanced subscriber features may not be available: {e}")

finally:
    # Closing the session also releases anything still declared on it
    session.close()
518
```
519
520
### Complete Extension Example
521
522
```python
523
import zenoh
524
import zenoh.ext as zext
525
import threading
526
import time
527
import random
528
529
class AdvancedDataProcessor:
    """Example using multiple extension features.

    Subscribes to "input/data", processes every sample and publishes a result
    on "results/processor_<id>". Advanced publisher/subscriber features are
    used when available, with a fallback to the basic API.
    """

    def __init__(self, processor_id: int):
        """Open a session and initialise the counters for this processor."""
        self.processor_id = processor_id
        self.session = zenoh.open()
        self.running = False

        # Declared in start_processing(); stop() skips whatever is still None
        self.publisher = None
        self.subscriber = None
        self.miss_listener = None

        # Use precise numeric types
        self.id = zext.UInt16(processor_id)
        self.processed_count = zext.UInt64(0)

    def start_processing(self):
        """Start the data processing service"""
        self.running = True

        try:
            self._declare_advanced()
            print(f"Advanced processor {self.processor_id} started")

        except Exception as e:
            # Deliberately broad: the advanced API is unstable and may be missing
            print(f"Failed to start advanced features: {e}")

            # Release anything the failed attempt managed to declare so the
            # fallback entities below do not silently replace (and leak) it
            self._undeclare_all()

            # Fallback to basic functionality
            self.publisher = self.session.declare_publisher(f"results/processor_{self.processor_id}")
            self.subscriber = self.session.declare_subscriber("input/data", self.basic_handler)

    def _declare_advanced(self):
        """Declare the advanced publisher, subscriber and miss listener."""
        # Setup advanced publisher
        self.publisher = zext.declare_advanced_publisher(
            self.session,
            f"results/processor_{self.processor_id}",
            cache=zext.CacheConfig(),  # Cache results for late subscribers
        )

        # Setup advanced subscriber with miss detection
        def input_handler(sample):
            try:
                data = zext.z_deserialize(dict, sample.payload)
                self.process_data(data)
            except zext.ZDeserializeError as e:
                print(f"Processor {self.processor_id}: Deserialization error: {e}")

        def miss_handler(miss):
            print(f"Processor {self.processor_id}: MISSED {miss.nb} inputs!")

        self.subscriber = zext.declare_advanced_subscriber(
            self.session,
            "input/data",
            handler=input_handler,
            miss_detection=zext.MissDetectionConfig(),
        )

        self.miss_listener = self.subscriber.sample_miss_listener(miss_handler)

    def _undeclare_all(self):
        """Undeclare every entity that is currently declared, listener first."""
        for attr in ("miss_listener", "subscriber", "publisher"):
            # getattr default keeps this safe on instances built without __init__
            entity = getattr(self, attr, None)
            if entity is not None:
                entity.undeclare()
                setattr(self, attr, None)

    def basic_handler(self, sample):
        """Fallback handler for basic functionality"""
        try:
            data = zext.z_deserialize(dict, sample.payload)
            self.process_data(data)
        except zext.ZDeserializeError as e:
            print(f"Processor {self.processor_id}: Basic deserialization error: {e}")

    def process_data(self, input_data):
        """Process input data and publish results.

        Parameters:
        - input_data: Deserialized input dict; "sequence" and "value" are
          read with defaults when absent
        """
        # Simulate processing time; keep the value so the published
        # "processing_time" matches what was actually spent
        processing_time = random.uniform(0.1, 0.3)
        time.sleep(processing_time)

        # Increment counter with precise type
        self.processed_count = zext.UInt64(int(self.processed_count) + 1)

        # Create result with precise numeric types
        result = {
            "processor_id": self.id,
            "input_sequence": input_data.get("sequence", 0),
            "processed_count": self.processed_count,
            "processing_time": zext.Float32(processing_time),
            "result": f"PROCESSED_{input_data.get('value', 'unknown')}",
        }

        # Serialize and publish result
        serialized_result = zext.z_serialize(result)
        self.publisher.put(serialized_result)

        print(f"Processor {self.processor_id}: Processed #{int(self.processed_count)}")

    def stop(self):
        """Stop the processor and cleanup"""
        self.running = False

        self._undeclare_all()

        self.session.close()
        print(f"Processor {self.processor_id} stopped")
625
626
def data_generator():
    """Generate test data.

    Publishes 20 items on "input/data", one every half second, then releases
    the publisher and the session even if publishing fails part-way.
    """
    session = zenoh.open()
    try:
        publisher = session.declare_publisher("input/data")
        try:
            for i in range(20):
                data = {
                    "sequence": zext.UInt32(i),
                    "value": f"data_item_{i}",
                    "timestamp": zext.Float64(time.time()),
                }

                serialized = zext.z_serialize(data)
                publisher.put(serialized)

                print(f"Generated data item {i}")
                time.sleep(0.5)
        finally:
            publisher.undeclare()
    finally:
        session.close()

    print("Data generator finished")
647
648
def result_monitor():
    """Monitor processing results.

    Subscribes to "results/**" for 15 seconds and prints every result, then
    releases the subscriber and the session.
    """
    session = zenoh.open()

    def result_handler(sample):
        try:
            result = zext.z_deserialize(dict, sample.payload)
            print(f"MONITOR: Processor {int(result['processor_id'])} "
                  f"completed #{int(result['processed_count'])}: {result['result']}")
        except zext.ZDeserializeError as e:
            print(f"Monitor deserialization error: {e}")

    try:
        subscriber = session.declare_subscriber("results/**", result_handler)
        try:
            time.sleep(15)  # Monitor for 15 seconds
        finally:
            subscriber.undeclare()
    finally:
        session.close()

    print("Result monitor stopped")
667
668
def main():
    """Main example execution.

    Starts two processors, a result monitor and a data generator, waits for
    the generator to finish and then stops every processor that was created.
    """
    print("Starting advanced Zenoh extension example...")

    # Collected as they are created so the finally block can stop exactly
    # those that exist, even if a later constructor raises
    processors = []

    try:
        # Create processors
        for processor_id in (1, 2):
            processors.append(AdvancedDataProcessor(processor_id))

        # Start processors
        for processor in processors:
            processor.start_processing()

        # Start monitoring and data generation in separate threads
        monitor_thread = threading.Thread(target=result_monitor)
        generator_thread = threading.Thread(target=data_generator)

        monitor_thread.start()
        time.sleep(1)  # Let monitor start first
        generator_thread.start()

        # Wait for completion
        generator_thread.join()
        monitor_thread.join(timeout=5)

    except KeyboardInterrupt:
        print("Shutting down...")

    finally:
        for processor in processors:
            processor.stop()

    print("Advanced extension example completed")


if __name__ == "__main__":
    main()
704
```