0
# Handler System
1
2
Zenoh's handler system provides flexible mechanisms for processing asynchronous data streams from subscribers, queryables, and other communication patterns. The handler system supports both Rust-style channels and Python-style callbacks, enabling efficient data processing patterns suitable for different application architectures.
3
4
## Capabilities
5
6
### Handler Interface
7
8
The base handler interface that all concrete handlers implement.
9
10
```python { .api }
11
from zenoh.handlers import Handler
12
13
class Handler:
14
"""Base handler interface for processing data streams"""
15
16
def try_recv(self):
17
"""
18
Try to receive data without blocking.
19
20
Returns:
21
Data item if available, or raises exception if no data
22
"""
23
24
def recv(self):
25
"""
26
Receive data (blocking operation).
27
28
Returns:
29
Next data item from the handler
30
"""
31
32
def __iter__(self):
33
"""
34
Iterate over received data items.
35
36
Returns:
37
Iterator that yields data items as they arrive
38
"""
39
40
def __next__(self):
41
"""
42
Get next data item for iterator protocol.
43
44
Returns:
45
Next data item or raises StopIteration
46
"""
47
```
48
49
### Default Handler
50
51
FIFO queue handler with automatic capacity management.
52
53
```python { .api }
54
from zenoh.handlers import DefaultHandler
55
56
class DefaultHandler:
57
"""Default FIFO handler with unlimited capacity"""
58
59
def try_recv(self):
60
"""Try to receive without blocking"""
61
62
def recv(self):
63
"""Receive data (blocking)"""
64
65
def __iter__(self):
66
"""Iterate over received data"""
67
68
def __next__(self):
69
"""Iterator protocol implementation"""
70
```
71
72
### FIFO Channel Handler
73
74
First-in-first-out channel with configurable capacity.
75
76
```python { .api }
77
from zenoh.handlers import FifoChannel
78
79
class FifoChannel:
80
"""FIFO channel handler with configurable capacity"""
81
82
def __init__(self, capacity: int):
83
"""
84
Create FIFO channel with specified capacity.
85
86
Parameters:
87
- capacity: Maximum number of items to buffer
88
"""
89
90
def try_recv(self):
91
"""Try to receive without blocking"""
92
93
def recv(self):
94
"""Receive data (blocking)"""
95
96
def __iter__(self):
97
"""Iterate over received data"""
98
99
def __next__(self):
100
"""Iterator protocol implementation"""
101
```
102
103
### Ring Channel Handler
104
105
Ring buffer handler that overwrites oldest data when capacity is exceeded.
106
107
```python { .api }
108
from zenoh.handlers import RingChannel
109
110
class RingChannel:
111
"""Ring buffer handler with configurable capacity"""
112
113
def __init__(self, capacity: int):
114
"""
115
Create ring buffer channel with specified capacity.
116
117
Parameters:
118
- capacity: Maximum number of items to buffer (older items overwritten)
119
"""
120
121
def try_recv(self):
122
"""Try to receive without blocking"""
123
124
def recv(self):
125
"""Receive data (blocking)"""
126
127
def __iter__(self):
128
"""Iterate over received data"""
129
130
def __next__(self):
131
"""Iterator protocol implementation"""
132
```
133
134
### Callback Handler
135
136
Python callback-based handler for immediate processing.
137
138
```python { .api }
139
from zenoh.handlers import Callback
140
141
class Callback:
142
"""Callback handler for immediate data processing"""
143
144
def __init__(
145
self,
146
callback: callable,
147
drop: callable = None,
148
indirect: bool = True
149
):
150
"""
151
Create callback handler.
152
153
Parameters:
154
- callback: Function to call for each data item
155
- drop: Optional cleanup function when handler is dropped
156
- indirect: Whether to use indirect callback invocation
157
"""
158
159
@property
160
def callback(self) -> callable:
161
"""Get the callback function"""
162
163
@property
164
def drop(self) -> callable:
165
"""Get the drop callback function"""
166
167
@property
168
def indirect(self) -> bool:
169
"""Get indirect flag"""
170
171
def try_recv(self):
172
"""Try to receive without blocking (not applicable for callbacks)"""
173
174
def recv(self):
175
"""Receive data (not applicable for callbacks)"""
176
177
def __iter__(self):
178
"""Iterate over received data (not applicable for callbacks)"""
179
180
def __next__(self):
181
"""Iterator protocol (not applicable for callbacks)"""
182
```
183
184
### Handler Usage in Zenoh Operations
185
186
Handlers are used throughout the Zenoh API for asynchronous data processing.
187
188
```python { .api }
189
# Handler usage patterns in Zenoh operations
190
191
# Subscriber with different handler types
192
def declare_subscriber(self, key_expr, handler=None, **kwargs):
193
"""
194
Declare subscriber with flexible handler options.
195
196
Handler options:
197
- None: Returns subscriber with DefaultHandler
198
- callable: Python function - creates Callback handler
199
- tuple(callable, handler): Python callback with custom handler
200
- Handler instance: Uses provided handler directly
201
"""
202
203
# Queryable with handler
204
def declare_queryable(self, key_expr, handler, **kwargs):
205
"""
206
Declare queryable with handler for processing queries.
207
208
Handler receives Query objects for processing and replying.
209
"""
210
211
# Scout with handler
212
def scout(what=None, timeout=None, handler=None):
213
"""
214
Scout with handler for discovery messages.
215
216
Handler receives Hello messages from discovered nodes.
217
"""
218
```
219
220
## Usage Examples
221
222
### Default Handler Usage
223
224
```python
225
import zenoh
226
from zenoh.handlers import DefaultHandler
227
228
session = zenoh.open()
229
230
# Subscriber with default handler (implicit)
231
subscriber = session.declare_subscriber("sensors/temperature")
232
233
# Process data using iterator pattern
234
print("Listening for temperature data...")
235
for sample in subscriber:
236
temp = float(sample.payload.to_string())
237
print(f"Temperature: {temp}°C from {sample.key_expr}")
238
239
if temp > 30: # Stop listening if too hot
240
break
241
242
subscriber.undeclare()
243
session.close()
244
```
245
246
### FIFO Channel Handler
247
248
```python
249
import zenoh
250
from zenoh.handlers import FifoChannel
251
252
session = zenoh.open()
253
254
# Create FIFO channel with limited capacity
255
fifo_handler = FifoChannel(capacity=10)
256
257
# Subscriber with FIFO handler
258
subscriber = session.declare_subscriber("data/stream", fifo_handler)
259
260
print("Processing with FIFO channel...")
261
262
# Non-blocking processing
263
while True:
264
try:
265
sample = subscriber.try_recv()
266
data = sample.payload.to_string()
267
print(f"Processed: {data}")
268
except:
269
print("No data available, doing other work...")
270
import time
271
time.sleep(0.1)
272
break
273
274
# Blocking processing
275
print("Switching to blocking mode...")
276
for i in range(5):
277
sample = subscriber.recv() # Blocks until data available
278
print(f"Received: {sample.payload.to_string()}")
279
280
subscriber.undeclare()
281
session.close()
282
```
283
284
### Ring Channel Handler
285
286
```python
287
import zenoh
288
from zenoh.handlers import RingChannel
289
import time
290
291
session = zenoh.open()
292
293
# Create ring buffer - newest data overwrites oldest
294
ring_handler = RingChannel(capacity=5)
295
296
# Subscriber with ring buffer handler
297
subscriber = session.declare_subscriber("high_frequency/data", ring_handler)
298
299
# Publisher to generate high-frequency data
300
publisher = session.declare_publisher("high_frequency/data")
301
302
# Generate data faster than we can process
303
def generate_data():
304
for i in range(20):
305
publisher.put(f"data_point_{i}")
306
time.sleep(0.01) # Very fast publishing
307
308
import threading
309
generator = threading.Thread(target=generate_data)
310
generator.start()
311
312
# Slow processing - ring buffer will drop old data
313
time.sleep(0.5) # Let data accumulate
314
315
print("Processing with ring buffer (may miss some data):")
316
while True:
317
try:
318
sample = subscriber.try_recv()
319
data = sample.payload.to_string()
320
print(f"Got: {data}")
321
time.sleep(0.1) # Slow processing
322
except:
323
break
324
325
generator.join()
326
publisher.undeclare()
327
subscriber.undeclare()
328
session.close()
329
```
330
331
### Callback Handler
332
333
```python
334
import zenoh
335
from zenoh.handlers import Callback
336
337
session = zenoh.open()
338
339
# Create callback for immediate processing
340
def data_callback(sample):
341
data = sample.payload.to_string()
342
timestamp = sample.timestamp
343
print(f"Immediate processing: {data} at {timestamp}")
344
345
# Process data immediately in callback context
346
if "error" in data.lower():
347
print("ERROR DETECTED - taking immediate action!")
348
349
def cleanup_callback():
350
print("Callback handler cleanup")
351
352
# Create callback handler
353
callback_handler = Callback(
354
callback=data_callback,
355
drop=cleanup_callback,
356
indirect=True
357
)
358
359
# Subscriber with callback handler
360
subscriber = session.declare_subscriber("alerts/system", callback_handler)
361
362
# Simulate some data
363
publisher = session.declare_publisher("alerts/system")
364
365
import time
366
publisher.put("System status: OK")
367
time.sleep(0.1)
368
publisher.put("System status: ERROR detected")
369
time.sleep(0.1)
370
publisher.put("System status: Recovered")
371
time.sleep(0.5)
372
373
publisher.undeclare()
374
subscriber.undeclare() # Will trigger cleanup_callback
375
session.close()
376
```
377
378
### Python Function as Handler
379
380
```python
381
import zenoh
382
383
session = zenoh.open()
384
385
# Simple Python function as handler
386
def temperature_handler(sample):
387
temp = float(sample.payload.to_string())
388
location = str(sample.key_expr).split('/')[-1]
389
390
print(f"Temperature in {location}: {temp}°C")
391
392
if temp > 25:
393
print(f" -> {location} is warm!")
394
elif temp < 15:
395
print(f" -> {location} is cold!")
396
397
# Zenoh automatically creates Callback handler
398
subscriber = session.declare_subscriber(
399
"sensors/temperature/*",
400
temperature_handler
401
)
402
403
# Test data
404
publisher = session.declare_publisher("sensors/temperature/room1")
405
publisher.put("23.5")
406
407
publisher2 = session.declare_publisher("sensors/temperature/outside")
408
publisher2.put("12.3")
409
410
publisher3 = session.declare_publisher("sensors/temperature/office")
411
publisher3.put("26.8")
412
413
import time
414
time.sleep(1)
415
416
publisher.undeclare()
417
publisher2.undeclare()
418
publisher3.undeclare()
419
subscriber.undeclare()
420
session.close()
421
```
422
423
### Handler with Custom Processing Logic
424
425
```python
426
import zenoh
427
from zenoh.handlers import FifoChannel
428
import json
429
import threading
430
import queue
431
432
class ProcessingHandler:
433
"""Custom handler with background processing"""
434
435
def __init__(self, max_batch_size=5):
436
self.fifo = FifoChannel(capacity=100)
437
self.batch_queue = queue.Queue()
438
self.max_batch_size = max_batch_size
439
self.processing_thread = None
440
self.running = False
441
442
def start_processing(self):
443
"""Start background processing thread"""
444
self.running = True
445
self.processing_thread = threading.Thread(target=self._process_batches)
446
self.processing_thread.start()
447
448
def stop_processing(self):
449
"""Stop background processing"""
450
self.running = False
451
if self.processing_thread:
452
self.processing_thread.join()
453
454
def _process_batches(self):
455
"""Background thread for batch processing"""
456
batch = []
457
458
while self.running:
459
try:
460
# Collect samples into batches
461
sample = self.fifo.try_recv()
462
batch.append(sample)
463
464
if len(batch) >= self.max_batch_size:
465
self._process_batch(batch)
466
batch = []
467
468
except:
469
# No data available, process current batch if any
470
if batch:
471
self._process_batch(batch)
472
batch = []
473
import time
474
time.sleep(0.1)
475
476
def _process_batch(self, batch):
477
"""Process a batch of samples"""
478
values = []
479
for sample in batch:
480
try:
481
data = json.loads(sample.payload.to_string())
482
values.append(data['value'])
483
except:
484
continue
485
486
if values:
487
avg = sum(values) / len(values)
488
print(f"Batch processed: {len(values)} samples, average = {avg:.2f}")
489
490
# Usage
491
session = zenoh.open()
492
493
# Create custom processing handler
494
processor = ProcessingHandler(max_batch_size=3)
495
496
# Use the FIFO part of our custom handler
497
subscriber = session.declare_subscriber("data/batch", processor.fifo)
498
499
# Start background processing
500
processor.start_processing()
501
502
# Generate test data
503
publisher = session.declare_publisher("data/batch")
504
505
for i in range(10):
506
data = {"sequence": i, "value": i * 2.5 + 10}
507
publisher.put(json.dumps(data))
508
import time
509
time.sleep(0.2)
510
511
# Let processing complete
512
time.sleep(2)
513
514
# Cleanup
515
processor.stop_processing()
516
publisher.undeclare()
517
subscriber.undeclare()
518
session.close()
519
```
520
521
### Advanced Handler Composition
522
523
```python
524
import zenoh
525
from zenoh.handlers import FifoChannel, Callback
526
import threading
527
import time
528
529
class CompositeHandler:
530
"""Handler that combines multiple processing strategies"""
531
532
def __init__(self):
533
# Primary handler for normal processing
534
self.primary = FifoChannel(capacity=50)
535
536
# Secondary handler for immediate alerts
537
self.alert_callback = Callback(
538
callback=self._handle_alert,
539
indirect=True
540
)
541
542
self.alert_keywords = ["error", "critical", "failure"]
543
544
def _handle_alert(self, sample):
545
"""Immediate alert processing"""
546
data = sample.payload.to_string().lower()
547
548
for keyword in self.alert_keywords:
549
if keyword in data:
550
print(f"🚨 ALERT: {sample.payload.to_string()}")
551
# Could send notifications, emails, etc.
552
break
553
554
def get_primary_handler(self):
555
"""Get handler for normal data processing"""
556
return self.primary
557
558
def get_alert_handler(self):
559
"""Get handler for alert processing"""
560
return self.alert_callback
561
562
# Usage
563
session = zenoh.open()
564
565
# Create composite handler
566
composite = CompositeHandler()
567
568
# Subscribe to normal data stream
569
data_subscriber = session.declare_subscriber(
570
"system/logs",
571
composite.get_primary_handler()
572
)
573
574
# Subscribe to same stream for alerts (separate subscription)
575
alert_subscriber = session.declare_subscriber(
576
"system/logs",
577
composite.get_alert_handler()
578
)
579
580
# Background thread for normal processing
581
def normal_processing():
582
print("Normal processing started...")
583
584
while True:
585
try:
586
sample = data_subscriber.try_recv()
587
data = sample.payload.to_string()
588
589
# Normal processing (logging, analysis, etc.)
590
if "error" not in data.lower():
591
print(f"Processing: {data}")
592
593
except:
594
time.sleep(0.1)
595
continue
596
597
# Start normal processing thread
598
processing_thread = threading.Thread(target=normal_processing)
599
processing_thread.daemon = True
600
processing_thread.start()
601
602
# Generate test data
603
publisher = session.declare_publisher("system/logs")
604
605
test_messages = [
606
"System startup completed",
607
"User login: alice",
608
"ERROR: Database connection failed",
609
"Processing batch job #1234",
610
"CRITICAL: Disk space low",
611
"User logout: alice",
612
"System maintenance scheduled"
613
]
614
615
for msg in test_messages:
616
publisher.put(msg)
617
time.sleep(0.5)
618
619
# Let processing continue
620
time.sleep(3)
621
622
# Cleanup
623
publisher.undeclare()
624
data_subscriber.undeclare()
625
alert_subscriber.undeclare()
626
session.close()
627
```
628
629
## Handler Selection Guidelines
630
631
### When to Use Each Handler Type
632
633
**DefaultHandler:**
634
- General-purpose data processing
635
- Simple applications with moderate data rates
636
- When you don't need specific capacity control
637
638
**FifoChannel:**
639
- When you need bounded memory usage
640
- Applications that can tolerate data loss under high load
641
- Batch processing scenarios
642
643
**RingChannel:**
644
- High-frequency data where only recent values matter
645
- Memory-constrained environments
646
- Real-time systems prioritizing latest data
647
648
**Callback:**
649
- Immediate processing requirements
650
- Event-driven architectures
651
- Alert and notification systems
652
- Low-latency response needs
653
654
**Python Functions:**
655
- Simple processing logic
656
- Rapid prototyping
657
- Educational examples
658
- When callback functionality is sufficient
659
660
### Performance Considerations
661
662
1. **Callback handlers** have the lowest latency but block the receiving thread
663
2. **Channel handlers** provide buffering but add some overhead
664
3. **Ring channels** are most memory-efficient for high-frequency data
665
4. **FIFO channels** provide guaranteed ordering but may consume more memory
666
5. **Custom handlers** allow optimization for specific use cases but require more implementation effort