or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.mddata-types.mdextensions.mdhandlers.mdindex.mdpubsub.mdquery.mdsession-management.md

handlers.mddocs/

0

# Handler System

1

2

Zenoh's handler system provides flexible mechanisms for processing asynchronous data streams from subscribers, queryables, and other communication patterns. The handler system supports both Rust-style channels and Python-style callbacks, enabling efficient data processing patterns suitable for different application architectures.

3

4

## Capabilities

5

6

### Handler Interface

7

8

The base handler interface that all concrete handlers implement.

9

10

```python { .api }

11

from zenoh.handlers import Handler

12

13

class Handler:

14

"""Base handler interface for processing data streams"""

15

16

def try_recv(self):

17

"""

18

Try to receive data without blocking.

19

20

Returns:

21

Data item if available, or raises exception if no data

22

"""

23

24

def recv(self):

25

"""

26

Receive data (blocking operation).

27

28

Returns:

29

Next data item from the handler

30

"""

31

32

def __iter__(self):

33

"""

34

Iterate over received data items.

35

36

Returns:

37

Iterator that yields data items as they arrive

38

"""

39

40

def __next__(self):

41

"""

42

Get next data item for iterator protocol.

43

44

Returns:

45

Next data item or raises StopIteration

46

"""

47

```

48

49

### Default Handler

50

51

FIFO queue handler with automatic capacity management.

52

53

```python { .api }

54

from zenoh.handlers import DefaultHandler

55

56

class DefaultHandler:

57

"""Default FIFO handler with unlimited capacity"""

58

59

def try_recv(self):

60

"""Try to receive without blocking"""

61

62

def recv(self):

63

"""Receive data (blocking)"""

64

65

def __iter__(self):

66

"""Iterate over received data"""

67

68

def __next__(self):

69

"""Iterator protocol implementation"""

70

```

71

72

### FIFO Channel Handler

73

74

First-in-first-out channel with configurable capacity.

75

76

```python { .api }

77

from zenoh.handlers import FifoChannel

78

79

class FifoChannel:

80

"""FIFO channel handler with configurable capacity"""

81

82

def __init__(self, capacity: int):

83

"""

84

Create FIFO channel with specified capacity.

85

86

Parameters:

87

- capacity: Maximum number of items to buffer

88

"""

89

90

def try_recv(self):

91

"""Try to receive without blocking"""

92

93

def recv(self):

94

"""Receive data (blocking)"""

95

96

def __iter__(self):

97

"""Iterate over received data"""

98

99

def __next__(self):

100

"""Iterator protocol implementation"""

101

```

102

103

### Ring Channel Handler

104

105

Ring buffer handler that overwrites oldest data when capacity is exceeded.

106

107

```python { .api }

108

from zenoh.handlers import RingChannel

109

110

class RingChannel:

111

"""Ring buffer handler with configurable capacity"""

112

113

def __init__(self, capacity: int):

114

"""

115

Create ring buffer channel with specified capacity.

116

117

Parameters:

118

- capacity: Maximum number of items to buffer (older items overwritten)

119

"""

120

121

def try_recv(self):

122

"""Try to receive without blocking"""

123

124

def recv(self):

125

"""Receive data (blocking)"""

126

127

def __iter__(self):

128

"""Iterate over received data"""

129

130

def __next__(self):

131

"""Iterator protocol implementation"""

132

```

133

134

### Callback Handler

135

136

Python callback-based handler for immediate processing.

137

138

```python { .api }

139

from zenoh.handlers import Callback

140

141

class Callback:

142

"""Callback handler for immediate data processing"""

143

144

def __init__(

145

self,

146

callback: callable,

147

drop: callable = None,

148

indirect: bool = True

149

):

150

"""

151

Create callback handler.

152

153

Parameters:

154

- callback: Function to call for each data item

155

- drop: Optional cleanup function when handler is dropped

156

- indirect: Whether to use indirect callback invocation

157

"""

158

159

@property

160

def callback(self) -> callable:

161

"""Get the callback function"""

162

163

@property

164

def drop(self) -> callable:

165

"""Get the drop callback function"""

166

167

@property

168

def indirect(self) -> bool:

169

"""Get indirect flag"""

170

171

def try_recv(self):

172

"""Try to receive without blocking (not applicable for callbacks)"""

173

174

def recv(self):

175

"""Receive data (not applicable for callbacks)"""

176

177

def __iter__(self):

178

"""Iterate over received data (not applicable for callbacks)"""

179

180

def __next__(self):

181

"""Iterator protocol (not applicable for callbacks)"""

182

```

183

184

### Handler Usage in Zenoh Operations

185

186

Handlers are used throughout the Zenoh API for asynchronous data processing.

187

188

```python { .api }

189

# Handler usage patterns in Zenoh operations

190

191

# Subscriber with different handler types

192

def declare_subscriber(self, key_expr, handler=None, **kwargs):

193

"""

194

Declare subscriber with flexible handler options.

195

196

Handler options:

197

- None: Returns subscriber with DefaultHandler

198

- callable: Python function - creates Callback handler

199

- tuple(callable, handler): Python callback with custom handler

200

- Handler instance: Uses provided handler directly

201

"""

202

203

# Queryable with handler

204

def declare_queryable(self, key_expr, handler, **kwargs):

205

"""

206

Declare queryable with handler for processing queries.

207

208

Handler receives Query objects for processing and replying.

209

"""

210

211

# Scout with handler

212

def scout(what=None, timeout=None, handler=None):

213

"""

214

Scout with handler for discovery messages.

215

216

Handler receives Hello messages from discovered nodes.

217

"""

218

```

219

220

## Usage Examples

221

222

### Default Handler Usage

223

224

```python

225

import zenoh

226

from zenoh.handlers import DefaultHandler

227

228

session = zenoh.open()

229

230

# Subscriber with default handler (implicit)

231

subscriber = session.declare_subscriber("sensors/temperature")

232

233

# Process data using iterator pattern

234

print("Listening for temperature data...")

235

for sample in subscriber:

236

temp = float(sample.payload.to_string())

237

print(f"Temperature: {temp}°C from {sample.key_expr}")

238

239

if temp > 30: # Stop listening if too hot

240

break

241

242

subscriber.undeclare()

243

session.close()

244

```

245

246

### FIFO Channel Handler

247

248

```python

249

import zenoh

250

from zenoh.handlers import FifoChannel

251

252

session = zenoh.open()

253

254

# Create FIFO channel with limited capacity

255

fifo_handler = FifoChannel(capacity=10)

256

257

# Subscriber with FIFO handler

258

subscriber = session.declare_subscriber("data/stream", fifo_handler)

259

260

print("Processing with FIFO channel...")

261

262

# Non-blocking processing

263

while True:

264

try:

265

sample = subscriber.try_recv()

266

data = sample.payload.to_string()

267

print(f"Processed: {data}")

268

except:

269

print("No data available, doing other work...")

270

import time

271

time.sleep(0.1)

272

break

273

274

# Blocking processing

275

print("Switching to blocking mode...")

276

for i in range(5):

277

sample = subscriber.recv() # Blocks until data available

278

print(f"Received: {sample.payload.to_string()}")

279

280

subscriber.undeclare()

281

session.close()

282

```

283

284

### Ring Channel Handler

285

286

```python

287

import zenoh

288

from zenoh.handlers import RingChannel

289

import time

290

291

session = zenoh.open()

292

293

# Create ring buffer - newest data overwrites oldest

294

ring_handler = RingChannel(capacity=5)

295

296

# Subscriber with ring buffer handler

297

subscriber = session.declare_subscriber("high_frequency/data", ring_handler)

298

299

# Publisher to generate high-frequency data

300

publisher = session.declare_publisher("high_frequency/data")

301

302

# Generate data faster than we can process

303

def generate_data():

304

for i in range(20):

305

publisher.put(f"data_point_{i}")

306

time.sleep(0.01) # Very fast publishing

307

308

import threading

309

generator = threading.Thread(target=generate_data)

310

generator.start()

311

312

# Slow processing - ring buffer will drop old data

313

time.sleep(0.5) # Let data accumulate

314

315

print("Processing with ring buffer (may miss some data):")

316

while True:

317

try:

318

sample = subscriber.try_recv()

319

data = sample.payload.to_string()

320

print(f"Got: {data}")

321

time.sleep(0.1) # Slow processing

322

except:

323

break

324

325

generator.join()

326

publisher.undeclare()

327

subscriber.undeclare()

328

session.close()

329

```

330

331

### Callback Handler

332

333

```python

334

import zenoh

335

from zenoh.handlers import Callback

336

337

session = zenoh.open()

338

339

# Create callback for immediate processing

340

def data_callback(sample):

341

data = sample.payload.to_string()

342

timestamp = sample.timestamp

343

print(f"Immediate processing: {data} at {timestamp}")

344

345

# Process data immediately in callback context

346

if "error" in data.lower():

347

print("ERROR DETECTED - taking immediate action!")

348

349

def cleanup_callback():

350

print("Callback handler cleanup")

351

352

# Create callback handler

353

callback_handler = Callback(

354

callback=data_callback,

355

drop=cleanup_callback,

356

indirect=True

357

)

358

359

# Subscriber with callback handler

360

subscriber = session.declare_subscriber("alerts/system", callback_handler)

361

362

# Simulate some data

363

publisher = session.declare_publisher("alerts/system")

364

365

import time

366

publisher.put("System status: OK")

367

time.sleep(0.1)

368

publisher.put("System status: ERROR detected")

369

time.sleep(0.1)

370

publisher.put("System status: Recovered")

371

time.sleep(0.5)

372

373

publisher.undeclare()

374

subscriber.undeclare() # Will trigger cleanup_callback

375

session.close()

376

```

377

378

### Python Function as Handler

379

380

```python

381

import zenoh

382

383

session = zenoh.open()

384

385

# Simple Python function as handler

386

def temperature_handler(sample):

387

temp = float(sample.payload.to_string())

388

location = str(sample.key_expr).split('/')[-1]

389

390

print(f"Temperature in {location}: {temp}°C")

391

392

if temp > 25:

393

print(f" -> {location} is warm!")

394

elif temp < 15:

395

print(f" -> {location} is cold!")

396

397

# Zenoh automatically creates Callback handler

398

subscriber = session.declare_subscriber(

399

"sensors/temperature/*",

400

temperature_handler

401

)

402

403

# Test data

404

publisher = session.declare_publisher("sensors/temperature/room1")

405

publisher.put("23.5")

406

407

publisher2 = session.declare_publisher("sensors/temperature/outside")

408

publisher2.put("12.3")

409

410

publisher3 = session.declare_publisher("sensors/temperature/office")

411

publisher3.put("26.8")

412

413

import time

414

time.sleep(1)

415

416

publisher.undeclare()

417

publisher2.undeclare()

418

publisher3.undeclare()

419

subscriber.undeclare()

420

session.close()

421

```

422

423

### Handler with Custom Processing Logic

424

425

```python

426

import zenoh

427

from zenoh.handlers import FifoChannel

428

import json

429

import threading

430

import queue

431

432

class ProcessingHandler:

433

"""Custom handler with background processing"""

434

435

def __init__(self, max_batch_size=5):

436

self.fifo = FifoChannel(capacity=100)

437

self.batch_queue = queue.Queue()

438

self.max_batch_size = max_batch_size

439

self.processing_thread = None

440

self.running = False

441

442

def start_processing(self):

443

"""Start background processing thread"""

444

self.running = True

445

self.processing_thread = threading.Thread(target=self._process_batches)

446

self.processing_thread.start()

447

448

def stop_processing(self):

449

"""Stop background processing"""

450

self.running = False

451

if self.processing_thread:

452

self.processing_thread.join()

453

454

def _process_batches(self):

455

"""Background thread for batch processing"""

456

batch = []

457

458

while self.running:

459

try:

460

# Collect samples into batches

461

sample = self.fifo.try_recv()

462

batch.append(sample)

463

464

if len(batch) >= self.max_batch_size:

465

self._process_batch(batch)

466

batch = []

467

468

except:

469

# No data available, process current batch if any

470

if batch:

471

self._process_batch(batch)

472

batch = []

473

import time

474

time.sleep(0.1)

475

476

def _process_batch(self, batch):

477

"""Process a batch of samples"""

478

values = []

479

for sample in batch:

480

try:

481

data = json.loads(sample.payload.to_string())

482

values.append(data['value'])

483

except:

484

continue

485

486

if values:

487

avg = sum(values) / len(values)

488

print(f"Batch processed: {len(values)} samples, average = {avg:.2f}")

489

490

# Usage

491

session = zenoh.open()

492

493

# Create custom processing handler

494

processor = ProcessingHandler(max_batch_size=3)

495

496

# Use the FIFO part of our custom handler

497

subscriber = session.declare_subscriber("data/batch", processor.fifo)

498

499

# Start background processing

500

processor.start_processing()

501

502

# Generate test data

503

publisher = session.declare_publisher("data/batch")

504

505

for i in range(10):

506

data = {"sequence": i, "value": i * 2.5 + 10}

507

publisher.put(json.dumps(data))

508

import time

509

time.sleep(0.2)

510

511

# Let processing complete

512

time.sleep(2)

513

514

# Cleanup

515

processor.stop_processing()

516

publisher.undeclare()

517

subscriber.undeclare()

518

session.close()

519

```

520

521

### Advanced Handler Composition

522

523

```python

524

import zenoh

525

from zenoh.handlers import FifoChannel, Callback

526

import threading

527

import time

528

529

class CompositeHandler:

530

"""Handler that combines multiple processing strategies"""

531

532

def __init__(self):

533

# Primary handler for normal processing

534

self.primary = FifoChannel(capacity=50)

535

536

# Secondary handler for immediate alerts

537

self.alert_callback = Callback(

538

callback=self._handle_alert,

539

indirect=True

540

)

541

542

self.alert_keywords = ["error", "critical", "failure"]

543

544

def _handle_alert(self, sample):

545

"""Immediate alert processing"""

546

data = sample.payload.to_string().lower()

547

548

for keyword in self.alert_keywords:

549

if keyword in data:

550

print(f"🚨 ALERT: {sample.payload.to_string()}")

551

# Could send notifications, emails, etc.

552

break

553

554

def get_primary_handler(self):

555

"""Get handler for normal data processing"""

556

return self.primary

557

558

def get_alert_handler(self):

559

"""Get handler for alert processing"""

560

return self.alert_callback

561

562

# Usage

563

session = zenoh.open()

564

565

# Create composite handler

566

composite = CompositeHandler()

567

568

# Subscribe to normal data stream

569

data_subscriber = session.declare_subscriber(

570

"system/logs",

571

composite.get_primary_handler()

572

)

573

574

# Subscribe to same stream for alerts (separate subscription)

575

alert_subscriber = session.declare_subscriber(

576

"system/logs",

577

composite.get_alert_handler()

578

)

579

580

# Background thread for normal processing

581

def normal_processing():

582

print("Normal processing started...")

583

584

while True:

585

try:

586

sample = data_subscriber.try_recv()

587

data = sample.payload.to_string()

588

589

# Normal processing (logging, analysis, etc.)

590

if "error" not in data.lower():

591

print(f"Processing: {data}")

592

593

except:

594

time.sleep(0.1)

595

continue

596

597

# Start normal processing thread

598

processing_thread = threading.Thread(target=normal_processing)

599

processing_thread.daemon = True

600

processing_thread.start()

601

602

# Generate test data

603

publisher = session.declare_publisher("system/logs")

604

605

test_messages = [

606

"System startup completed",

607

"User login: alice",

608

"ERROR: Database connection failed",

609

"Processing batch job #1234",

610

"CRITICAL: Disk space low",

611

"User logout: alice",

612

"System maintenance scheduled"

613

]

614

615

for msg in test_messages:

616

publisher.put(msg)

617

time.sleep(0.5)

618

619

# Let processing continue

620

time.sleep(3)

621

622

# Cleanup

623

publisher.undeclare()

624

data_subscriber.undeclare()

625

alert_subscriber.undeclare()

626

session.close()

627

```

628

629

## Handler Selection Guidelines

630

631

### When to Use Each Handler Type

632

633

**DefaultHandler:**

634

- General-purpose data processing

635

- Simple applications with moderate data rates

636

- When you don't need specific capacity control

637

638

**FifoChannel:**

639

- When you need bounded memory usage

640

- Applications that can tolerate data loss under high load

641

- Batch processing scenarios

642

643

**RingChannel:**

644

- High-frequency data where only recent values matter

645

- Memory-constrained environments

646

- Real-time systems prioritizing latest data

647

648

**Callback:**

649

- Immediate processing requirements

650

- Event-driven architectures

651

- Alert and notification systems

652

- Low-latency response needs

653

654

**Python Functions:**

655

- Simple processing logic

656

- Rapid prototyping

657

- Educational examples

658

- When callback functionality is sufficient

659

660

### Performance Considerations

661

662

1. **Callback handlers** have the lowest latency but block the receiving thread

663

2. **Channel handlers** provide buffering but add some overhead

664

3. **Ring channels** are most memory-efficient for high-frequency data

665

4. **FIFO channels** provide guaranteed ordering but may consume more memory

666

5. **Custom handlers** allow optimization for specific use cases but require more implementation effort