0
# Publisher/Subscriber Pattern
1
2
The Publisher/Subscriber pattern enables real-time data streaming in Zenoh applications. Publishers send data to specific key expressions, while Subscribers receive data matching their subscription patterns. This decoupled messaging pattern supports high-throughput, low-latency communication with flexible quality of service controls.
3
4
## Capabilities
5
6
### Publisher
7
8
Publishers send data to specific key expressions with configurable quality of service parameters.
9
10
```python { .api }
11
def declare_publisher(
12
self,
13
key_expr,
14
encoding: Encoding = None,
15
congestion_control: CongestionControl = None,
16
priority: Priority = None,
17
reliability: Reliability = None
18
) -> Publisher:
19
"""
20
Declare a publisher for a key expression.
21
22
Parameters:
23
- key_expr: Key expression to publish on
24
- encoding: Data encoding specification
25
- congestion_control: How to handle network congestion
26
- priority: Message priority level
27
- reliability: Reliability mode for message delivery
28
29
Returns:
30
Publisher object for sending data
31
"""
32
33
class Publisher:
34
"""Publisher for data streams"""
35
36
@property
37
def key_expr(self) -> KeyExpr:
38
"""Get the publisher's key expression"""
39
40
@property
41
def encoding(self) -> Encoding:
42
"""Get the publisher's encoding"""
43
44
@property
45
def congestion_control(self) -> CongestionControl:
46
"""Get congestion control setting"""
47
48
@property
49
def priority(self) -> Priority:
50
"""Get priority setting"""
51
52
@property
53
def reliability(self) -> Reliability:
54
"""Get reliability setting"""
55
56
@property
57
def matching_status(self) -> MatchingStatus:
58
"""Get current matching status"""
59
60
def put(
61
self,
62
payload,
63
encoding: Encoding = None,
64
timestamp: Timestamp = None,
65
attachment = None
66
) -> None:
67
"""
68
Send data through the publisher.
69
70
Parameters:
71
- payload: Data to send (str, bytes, or ZBytes)
72
- encoding: Override default encoding
73
- timestamp: Custom timestamp for the data
74
- attachment: Additional metadata
75
"""
76
77
def delete(
78
self,
79
timestamp: Timestamp = None,
80
attachment = None
81
) -> None:
82
"""
83
Send a delete operation.
84
85
Parameters:
86
- timestamp: Custom timestamp for the delete
87
- attachment: Additional metadata
88
"""
89
90
def undeclare(self) -> None:
91
"""Undeclare the publisher and release resources"""
92
93
def declare_matching_listener(self, handler) -> MatchingListener:
94
"""Declare a listener for matching status changes"""
95
```
96
97
### Subscriber
98
99
Subscribers receive data matching their subscription key expressions through configurable handlers.
100
101
```python { .api }
102
def declare_subscriber(
103
self,
104
key_expr,
105
handler = None,
106
reliability: Reliability = None,
107
locality: Locality = None
108
) -> Subscriber:
109
"""
110
Declare a subscriber for a key expression.
111
112
Parameters:
113
- key_expr: Key expression pattern to subscribe to
114
- handler: Handler for received samples (callback, channel, etc.)
115
- reliability: Reliability mode for receiving data
116
- locality: Locality constraint for data sources
117
118
Returns:
119
Subscriber object for receiving data
120
"""
121
122
class Subscriber:
123
"""Subscriber with generic handler"""
124
125
@property
126
def key_expr(self) -> KeyExpr:
127
"""Get the subscriber's key expression"""
128
129
@property
130
def handler(self):
131
"""Get the subscriber's handler"""
132
133
def undeclare(self) -> None:
134
"""Undeclare the subscriber and release resources"""
135
136
def try_recv(self):
137
"""Try to receive a sample without blocking"""
138
139
def recv(self):
140
"""Receive a sample (blocking)"""
141
142
def __iter__(self):
143
"""Iterate over received samples"""
144
```
145
146
### Sample Data
147
148
Data samples received by subscribers contain the payload and metadata.
149
150
```python { .api }
151
class Sample:
152
"""Data sample"""
153
154
@property
155
def key_expr(self) -> KeyExpr:
156
"""Key expression where data was published"""
157
158
@property
159
def payload(self) -> ZBytes:
160
"""Sample payload data"""
161
162
@property
163
def kind(self) -> SampleKind:
164
"""Sample kind (PUT or DELETE)"""
165
166
@property
167
def encoding(self) -> Encoding:
168
"""Data encoding"""
169
170
@property
171
def timestamp(self) -> Timestamp:
172
"""Sample timestamp"""
173
174
@property
175
def congestion_control(self) -> CongestionControl:
176
"""Congestion control setting"""
177
178
@property
179
def priority(self) -> Priority:
180
"""Message priority"""
181
182
@property
183
def express(self) -> bool:
184
"""Express delivery flag"""
185
186
@property
187
def attachment(self):
188
"""Additional metadata attachment"""
189
190
class SampleKind:
191
"""Sample operation type"""
192
PUT = ...
193
DELETE = ...
194
```
195
196
### Quality of Service Controls
197
198
Configure message delivery characteristics and network behavior.
199
200
```python { .api }
201
class Priority:
202
"""Message priority levels"""
203
REAL_TIME = ...
204
INTERACTIVE_HIGH = ...
205
INTERACTIVE_LOW = ...
206
DATA_HIGH = ...
207
DATA = ...
208
DATA_LOW = ...
209
BACKGROUND = ...
210
211
DEFAULT = ...
212
MIN = ...
213
MAX = ...
214
215
class CongestionControl:
216
"""Congestion control modes"""
217
DROP = ... # Drop messages when congested
218
BLOCK = ... # Block sender when congested
219
BLOCK_FIRST = ... # Block only for the first message when congested; subsequent messages are dropped (unstable)
220
221
DEFAULT = ...
222
223
class Reliability:
224
"""Reliability modes (unstable)"""
225
BEST_EFFORT = ... # Best effort delivery
226
RELIABLE = ... # Reliable delivery
227
228
class Locality:
229
"""Origin/destination locality"""
230
SESSION_LOCAL = ... # Only local session
231
REMOTE = ... # Only remote sources
232
ANY = ... # Any source
233
234
DEFAULT = ...
235
```
236
237
### Matching Status
238
239
Monitor whether a publisher currently has matching subscribers, either by polling its matching status or by listening for status changes.
240
241
```python { .api }
242
class MatchingStatus:
243
"""Entity matching status"""
244
245
@property
246
def matching(self) -> bool:
247
"""Whether there are matching entities"""
248
249
class MatchingListener:
250
"""Matching status listener"""
251
252
@property
253
def handler(self):
254
"""Get the listener's handler"""
255
256
def undeclare(self) -> None:
257
"""Undeclare the matching listener"""
258
259
def try_recv(self):
260
"""Try to receive a matching status update"""
261
262
def recv(self):
263
"""Receive a matching status update (blocking)"""
264
265
def __iter__(self):
266
"""Iterate over matching status updates"""
267
```
268
269
## Usage Examples
270
271
### Basic Publisher
272
273
```python
274
import zenoh
275
276
session = zenoh.open()
277
278
# Declare publisher
279
publisher = session.declare_publisher("sensors/temperature")
280
281
# Send data
282
publisher.put("25.3")
283
publisher.put(b"binary_data")
284
285
# Send with metadata
286
publisher.put(
287
"26.1",
288
timestamp=session.new_timestamp(),
289
attachment={"sensor_id": "temp_01"}
290
)
291
292
# Clean up
293
publisher.undeclare()
294
session.close()
295
```
296
297
### Publisher with Quality of Service
298
299
```python
300
import zenoh
301
302
session = zenoh.open()
303
304
# High-priority publisher with reliable delivery
305
publisher = session.declare_publisher(
306
"critical/alerts",
307
priority=zenoh.Priority.REAL_TIME,
308
congestion_control=zenoh.CongestionControl.BLOCK,
309
reliability=zenoh.Reliability.RELIABLE
310
)
311
312
publisher.put("System critical alert!")
313
publisher.undeclare()
314
session.close()
315
```
316
317
### Basic Subscriber with Callback
318
319
```python
320
import zenoh
321
322
def data_handler(sample):
323
print(f"Received on {sample.key_expr}: {sample.payload.to_string()}")
324
if sample.kind == zenoh.SampleKind.DELETE:
325
print(" -> DELETE operation")
326
327
session = zenoh.open()
328
329
# Subscribe with callback handler
330
subscriber = session.declare_subscriber("sensors/**", data_handler)
331
332
# Let it run
333
import time
334
time.sleep(10)
335
336
subscriber.undeclare()
337
session.close()
338
```
339
340
### Subscriber with Manual Reception
341
342
```python
343
import zenoh
344
345
session = zenoh.open()
346
347
# Subscribe without callback
348
subscriber = session.declare_subscriber("data/stream")
349
350
# Manual reception
351
try:
352
sample = subscriber.recv() # Blocking receive
353
print(f"Got: {sample.payload.to_string()}")
354
except KeyboardInterrupt:
355
pass
356
357
# Non-blocking reception
358
sample = subscriber.try_recv()
359
if sample is not None:
360
print(f"Got: {sample.payload.to_string()}")
361
362
# Iterator style
363
for sample in subscriber:
364
print(f"Sample: {sample.payload.to_string()}")
365
if some_condition:  # placeholder: replace with your own stop condition
366
break
367
368
subscriber.undeclare()
369
session.close()
370
```
371
372
### Matching Status Monitoring
373
374
```python
375
import zenoh
376
377
session = zenoh.open()
378
379
publisher = session.declare_publisher("demo/pub")
380
381
def matching_handler(status):
382
if status.matching:
383
print("Publisher has matching subscribers!")
384
else:
385
print("No matching subscribers")
386
387
# Monitor matching status
388
listener = publisher.declare_matching_listener(matching_handler)
389
390
# Check current status
391
print(f"Currently matching: {publisher.matching_status.matching}")
392
393
# Clean up
394
listener.undeclare()
395
publisher.undeclare()
396
session.close()
397
```
398
399
### Complete Pub/Sub Example
400
401
```python
402
import zenoh
403
import threading
404
import time
405
406
def publisher_thread():
407
session = zenoh.open()
408
publisher = session.declare_publisher("demo/example")
409
410
for i in range(10):
411
publisher.put(f"Message {i}")
412
time.sleep(1)
413
414
publisher.undeclare()
415
session.close()
416
417
def subscriber_thread():
418
def handler(sample):
419
print(f"Subscriber received: {sample.payload.to_string()}")
420
421
session = zenoh.open()
422
subscriber = session.declare_subscriber("demo/example", handler)
423
424
time.sleep(12) # Let it run
425
426
subscriber.undeclare()
427
session.close()
428
429
# Run both threads
430
pub_thread = threading.Thread(target=publisher_thread)
431
sub_thread = threading.Thread(target=subscriber_thread)
432
433
pub_thread.start()
434
sub_thread.start()
435
436
pub_thread.join()
437
sub_thread.join()
438
```