or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced.mddata-types.mdextensions.mdhandlers.mdindex.mdpubsub.mdquery.mdsession-management.md

pubsub.mddocs/

0

# Publisher/Subscriber Pattern

1

2

The Publisher/Subscriber pattern enables real-time data streaming in Zenoh applications. Publishers send data to specific key expressions, while Subscribers receive data matching their subscription patterns. This decoupled messaging pattern supports high-throughput, low-latency communication with flexible quality of service controls.

3

4

## Capabilities

5

6

### Publisher

7

8

Publishers send data to specific key expressions with configurable quality of service parameters.

9

10

```python { .api }

11

def declare_publisher(

12

self,

13

key_expr,

14

encoding: Encoding = None,

15

congestion_control: CongestionControl = None,

16

priority: Priority = None,

17

reliability: Reliability = None

18

) -> Publisher:

19

"""

20

Declare a publisher for a key expression.

21

22

Parameters:

23

- key_expr: Key expression to publish on

24

- encoding: Data encoding specification

25

- congestion_control: How to handle network congestion

26

- priority: Message priority level

27

- reliability: Reliability mode for message delivery

28

29

Returns:

30

Publisher object for sending data

31

"""

32

33

class Publisher:

34

"""Publisher for data streams"""

35

36

@property

37

def key_expr(self) -> KeyExpr:

38

"""Get the publisher's key expression"""

39

40

@property

41

def encoding(self) -> Encoding:

42

"""Get the publisher's encoding"""

43

44

@property

45

def congestion_control(self) -> CongestionControl:

46

"""Get congestion control setting"""

47

48

@property

49

def priority(self) -> Priority:

50

"""Get priority setting"""

51

52

@property

53

def reliability(self) -> Reliability:

54

"""Get reliability setting"""

55

56

@property

57

def matching_status(self) -> MatchingStatus:

58

"""Get current matching status"""

59

60

def put(

61

self,

62

payload,

63

encoding: Encoding = None,

64

timestamp: Timestamp = None,

65

attachment = None

66

) -> None:

67

"""

68

Send data through the publisher.

69

70

Parameters:

71

- payload: Data to send (str, bytes, or ZBytes)

72

- encoding: Override default encoding

73

- timestamp: Custom timestamp for the data

74

- attachment: Additional metadata

75

"""

76

77

def delete(

78

self,

79

timestamp: Timestamp = None,

80

attachment = None

81

) -> None:

82

"""

83

Send a delete operation.

84

85

Parameters:

86

- timestamp: Custom timestamp for the delete

87

- attachment: Additional metadata

88

"""

89

90

def undeclare(self) -> None:

91

"""Undeclare the publisher and release resources"""

92

93

def declare_matching_listener(self, handler) -> MatchingListener:

94

"""Declare a listener for matching status changes"""

95

```

96

97

### Subscriber

98

99

Subscribers receive data matching their subscription key expressions through configurable handlers.

100

101

```python { .api }

102

def declare_subscriber(

103

self,

104

key_expr,

105

handler = None,

106

reliability: Reliability = None,

107

locality: Locality = None

108

) -> Subscriber:

109

"""

110

Declare a subscriber for a key expression.

111

112

Parameters:

113

- key_expr: Key expression pattern to subscribe to

114

- handler: Handler for received samples (callback, channel, etc.)

115

- reliability: Reliability mode for receiving data

116

- locality: Locality constraint for data sources

117

118

Returns:

119

Subscriber object for receiving data

120

"""

121

122

class Subscriber:

123

"""Subscriber with generic handler"""

124

125

@property

126

def key_expr(self) -> KeyExpr:

127

"""Get the subscriber's key expression"""

128

129

@property

130

def handler(self):

131

"""Get the subscriber's handler"""

132

133

def undeclare(self) -> None:

134

"""Undeclare the subscriber and release resources"""

135

136

def try_recv(self):

137

"""Try to receive a sample without blocking"""

138

139

def recv(self):

140

"""Receive a sample (blocking)"""

141

142

def __iter__(self):

143

"""Iterate over received samples"""

144

```

145

146

### Sample Data

147

148

Data samples received by subscribers contain the payload and metadata.

149

150

```python { .api }

151

class Sample:

152

"""Data sample"""

153

154

@property

155

def key_expr(self) -> KeyExpr:

156

"""Key expression where data was published"""

157

158

@property

159

def payload(self) -> ZBytes:

160

"""Sample payload data"""

161

162

@property

163

def kind(self) -> SampleKind:

164

"""Sample kind (PUT or DELETE)"""

165

166

@property

167

def encoding(self) -> Encoding:

168

"""Data encoding"""

169

170

@property

171

def timestamp(self) -> Timestamp:

172

"""Sample timestamp"""

173

174

@property

175

def congestion_control(self) -> CongestionControl:

176

"""Congestion control setting"""

177

178

@property

179

def priority(self) -> Priority:

180

"""Message priority"""

181

182

@property

183

def express(self) -> bool:

184

"""Express delivery flag"""

185

186

@property

187

def attachment(self):

188

"""Additional metadata attachment"""

189

190

class SampleKind:

191

"""Sample operation type"""

192

PUT = ...

193

DELETE = ...

194

```

195

196

### Quality of Service Controls

197

198

Configure message delivery characteristics and network behavior.

199

200

```python { .api }

201

class Priority:

202

"""Message priority levels"""

203

REAL_TIME = ...

204

INTERACTIVE_HIGH = ...

205

INTERACTIVE_LOW = ...

206

DATA_HIGH = ...

207

DATA = ...

208

DATA_LOW = ...

209

BACKGROUND = ...

210

211

DEFAULT = ...

212

MIN = ...

213

MAX = ...

214

215

class CongestionControl:

216

"""Congestion control modes"""

217

DROP = ... # Drop messages when congested

218

BLOCK = ... # Block sender when congested

219

BLOCK_FIRST = ... # Block first message when congested (unstable)

220

221

DEFAULT = ...

222

223

class Reliability:

224

"""Reliability modes (unstable)"""

225

BEST_EFFORT = ... # Best effort delivery

226

RELIABLE = ... # Reliable delivery

227

228

class Locality:

229

"""Origin/destination locality"""

230

SESSION_LOCAL = ... # Only local session

231

REMOTE = ... # Only remote sources

232

ANY = ... # Any source

233

234

DEFAULT = ...

235

```

236

237

### Matching Status

238

239

Monitor whether publishers and subscribers are matched with peers.

240

241

```python { .api }

242

class MatchingStatus:

243

"""Entity matching status"""

244

245

@property

246

def matching(self) -> bool:

247

"""Whether there are matching entities"""

248

249

class MatchingListener:

250

"""Matching status listener"""

251

252

@property

253

def handler(self):

254

"""Get the listener's handler"""

255

256

def undeclare(self) -> None:

257

"""Undeclare the matching listener"""

258

259

def try_recv(self):

260

"""Try to receive a matching status update"""

261

262

def recv(self):

263

"""Receive a matching status update (blocking)"""

264

265

def __iter__(self):

266

"""Iterate over matching status updates"""

267

```

268

269

## Usage Examples

270

271

### Basic Publisher

272

273

```python

274

import zenoh

275

276

session = zenoh.open()

277

278

# Declare publisher

279

publisher = session.declare_publisher("sensors/temperature")

280

281

# Send data

282

publisher.put("25.3")

283

publisher.put(b"binary_data")

284

285

# Send with metadata

286

publisher.put(

287

"26.1",

288

timestamp=session.new_timestamp(),

289

attachment={"sensor_id": "temp_01"}

290

)

291

292

# Clean up

293

publisher.undeclare()

294

session.close()

295

```

296

297

### Publisher with Quality of Service

298

299

```python

300

import zenoh

301

302

session = zenoh.open()

303

304

# High-priority publisher with reliable delivery

305

publisher = session.declare_publisher(

306

"critical/alerts",

307

priority=zenoh.Priority.REAL_TIME,

308

congestion_control=zenoh.CongestionControl.BLOCK,

309

reliability=zenoh.Reliability.RELIABLE

310

)

311

312

publisher.put("System critical alert!")

313

publisher.undeclare()

314

session.close()

315

```

316

317

### Basic Subscriber with Callback

318

319

```python

320

import zenoh

321

322

def data_handler(sample):

323

print(f"Received on {sample.key_expr}: {sample.payload.to_string()}")

324

if sample.kind == zenoh.SampleKind.DELETE:

325

print(" -> DELETE operation")

326

327

session = zenoh.open()

328

329

# Subscribe with callback handler

330

subscriber = session.declare_subscriber("sensors/**", data_handler)

331

332

# Let it run

333

import time

334

time.sleep(10)

335

336

subscriber.undeclare()

337

session.close()

338

```

339

340

### Subscriber with Manual Reception

341

342

```python

343

import zenoh

344

345

session = zenoh.open()

346

347

# Subscribe without callback

348

subscriber = session.declare_subscriber("data/stream")

349

350

# Manual reception

351

try:

352

sample = subscriber.recv() # Blocking receive

353

print(f"Got: {sample.payload.to_string()}")

354

except KeyboardInterrupt:

355

pass

356

357

# Non-blocking reception

358

sample = subscriber.try_recv()

359

if sample is not None:

360

print(f"Got: {sample.payload.to_string()}")

361

362

# Iterator style

363

for sample in subscriber:

364

print(f"Sample: {sample.payload.to_string()}")

365

if some_condition:

366

break

367

368

subscriber.undeclare()

369

session.close()

370

```

371

372

### Matching Status Monitoring

373

374

```python

375

import zenoh

376

377

session = zenoh.open()

378

379

publisher = session.declare_publisher("demo/pub")

380

381

def matching_handler(status):

382

if status.matching:

383

print("Publisher has matching subscribers!")

384

else:

385

print("No matching subscribers")

386

387

# Monitor matching status

388

listener = publisher.declare_matching_listener(matching_handler)

389

390

# Check current status

391

print(f"Currently matching: {publisher.matching_status.matching}")

392

393

# Clean up

394

listener.undeclare()

395

publisher.undeclare()

396

session.close()

397

```

398

399

### Complete Pub/Sub Example

400

401

```python

402

import zenoh

403

import threading

404

import time

405

406

def publisher_thread():

407

session = zenoh.open()

408

publisher = session.declare_publisher("demo/example")

409

410

for i in range(10):

411

publisher.put(f"Message {i}")

412

time.sleep(1)

413

414

publisher.undeclare()

415

session.close()

416

417

def subscriber_thread():

418

def handler(sample):

419

print(f"Subscriber received: {sample.payload.to_string()}")

420

421

session = zenoh.open()

422

subscriber = session.declare_subscriber("demo/example", handler)

423

424

time.sleep(12) # Let it run

425

426

subscriber.undeclare()

427

session.close()

428

429

# Run both threads

430

pub_thread = threading.Thread(target=publisher_thread)

431

sub_thread = threading.Thread(target=subscriber_thread)

432

433

pub_thread.start()

434

sub_thread.start()

435

436

pub_thread.join()

437

sub_thread.join()

438

```