or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.md authentication.md constants.md core-messaging.md devices.md error-handling.md index.md message-handling.md polling.md

async-support.mddocs/

0

# Async Support

1

2

Native async/await support with AsyncIO-compatible Context, Socket, and Poller classes that integrate seamlessly with Python's event loop for non-blocking messaging operations.

3

4

## Capabilities

5

6

### AsyncIO Context

7

8

Async-compatible context manager that integrates with Python's asyncio event loop for non-blocking socket operations.

9

10

```python { .api }

11

class Context:
    """asyncio-compatible ZMQ context; sockets it creates expose awaitable send/recv methods."""

    def __init__(self, io_threads: int | zmq.Context = 1, shadow: zmq.Context | int = 0) -> None:
        """
        Create a new async ZMQ context.

        Parameters:
        - io_threads: Number of I/O threads or existing Context to shadow (default: 1)
        - shadow: Context or address to shadow (default: 0)
        """

    def socket(self, socket_type: int, **kwargs) -> Socket:
        """
        Create an async socket of the specified type.

        Parameters:
        - socket_type: ZMQ socket type constant
        - kwargs: Extra keyword arguments passed through to the socket constructor

        Returns:
        - Socket: New async socket instance
        """

    def term(self) -> None:
        """
        Terminate the context.

        This does not close sockets: termination blocks until every socket
        created from this context has been closed. Use destroy() to close
        the sockets and terminate in one call.
        """

    def destroy(self, linger: int | None = None) -> None:
        """
        Close all sockets and terminate context.

        Parameters:
        - linger: Linger period in milliseconds (None leaves each socket's linger setting unchanged)
        """

    def __enter__(self) -> Context:
        """Context manager entry."""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - destroys context."""

    @property
    def closed(self) -> bool:
        """True if the context has been terminated."""

52

```

53

54

### AsyncIO Socket

55

56

Async socket class providing non-blocking send/receive operations that work with Python's async/await syntax. Inherits all methods from the base Socket class with async versions where applicable.

57

58

```python { .api }

59

class Socket:
    """asyncio-compatible ZMQ socket; send/recv methods are coroutines that must be awaited."""

    def bind(self, address: str) -> None:
        """Bind socket to an address."""

    def connect(self, address: str) -> None:
        """Connect socket to an address."""

    async def send(self, data: bytes | Frame, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
        """
        Send a message asynchronously.

        Parameters:
        - data: Message data
        - flags: Send flags (NOBLOCK, SNDMORE)
        - copy: Whether to copy data
        - track: Whether to return MessageTracker (ignored if copy=True)

        Returns:
        - MessageTracker: If track=True and copy=False
        - None: Otherwise
        """

    async def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> bytes | Frame:
        """
        Receive a message asynchronously.

        Parameters:
        - flags: Receive flags (NOBLOCK)
        - copy: Whether to copy data
        - track: Whether to track the received Frame (ignored if copy=True)

        Returns:
        - bytes: If copy=True
        - Frame: If copy=False
        """

    async def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:
        """
        Send a string message asynchronously.

        Parameters:
        - string: String to send
        - flags: Send flags
        - encoding: String encoding
        - copy: Whether to copy data
        - track: Whether to return MessageTracker (ignored if copy=True)

        Returns:
        - MessageTracker: If track=True and copy=False
        - None: Otherwise
        """

    async def recv_string(self, flags: int = 0, encoding: str = 'utf-8', copy: bool = True) -> str:
        """
        Receive a string message asynchronously.

        Parameters:
        - flags: Receive flags
        - encoding: String encoding
        - copy: Whether to copy data

        Returns:
        - str: Received string
        """

    async def send_json(self, obj: Any, flags: int = 0, **kwargs) -> None:
        """
        Send a JSON object asynchronously.

        Parameters:
        - obj: JSON-serializable object
        - flags: Send flags
        - kwargs: Additional json.dumps() arguments
        """

    async def recv_json(self, flags: int = 0, **kwargs) -> Any:
        """
        Receive a JSON object asynchronously.

        Parameters:
        - flags: Receive flags
        - kwargs: Additional json.loads() arguments

        Returns:
        - Any: Deserialized JSON object
        """

    async def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL) -> None:
        """
        Send a Python object asynchronously using pickle.

        Parameters:
        - obj: Python object to send
        - flags: Send flags
        - protocol: Pickle protocol version
        """

    async def recv_pyobj(self, flags: int = 0) -> Any:
        """
        Receive a Python object asynchronously using pickle.

        Security: unpickling can execute arbitrary code, so only use this
        with peers you trust.

        Parameters:
        - flags: Receive flags

        Returns:
        - Any: Unpickled Python object
        """

    async def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
        """
        Send a multipart message asynchronously.

        Parameters:
        - msg_parts: List of message parts
        - flags: Send flags
        - copy: Whether to copy data
        - track: Whether to return MessageTracker (ignored if copy=True)

        Returns:
        - MessageTracker: If track=True and copy=False
        - None: Otherwise
        """

    async def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:
        """
        Receive a multipart message asynchronously.

        Parameters:
        - flags: Receive flags
        - copy: Whether to copy data
        - track: Whether to track the received Frames (ignored if copy=True)

        Returns:
        - list: List of message parts (bytes if copy=True, Frame objects if copy=False)
        """

    def close(self, linger: int | None = None) -> None:
        """
        Close the socket.

        Parameters:
        - linger: Linger period in milliseconds (None keeps the socket's current setting)
        """

    @property
    def closed(self) -> bool:
        """True if socket is closed."""

191

```

192

193

### AsyncIO Polling

194

195

Async poller for monitoring multiple sockets with non-blocking event detection.

196

197

```python { .api }

198

class Poller:
    """Poller whose poll() is awaited instead of blocking the calling thread."""

    def register(self, socket: Socket | zmq.Socket, flags: int = POLLIN | POLLOUT) -> None:
        """
        Start watching a socket for the given events.

        Parameters:
        - socket: Socket to watch
        - flags: Bitmask of events of interest (POLLIN, POLLOUT, POLLERR);
          defaults to POLLIN | POLLOUT
        """

    def unregister(self, socket: Socket | zmq.Socket) -> None:
        """
        Stop watching a socket.

        Parameters:
        - socket: Socket to remove from the poller
        """

    async def poll(self, timeout: int = -1) -> list[tuple[Socket, int]]:
        """
        Wait asynchronously for events on the registered sockets.

        Parameters:
        - timeout: Maximum wait in milliseconds (-1 waits indefinitely)

        Returns:
        - list: (socket, events) tuples, one per socket with pending events
        """

226

```

227

228

## Usage Examples

229

230

### Async Request-Reply Server

231

232

```python

233

import asyncio

234

import zmq.asyncio

235

236

async def server():
    """Run a REP echo server on tcp://*:5555 until cancelled or an error occurs."""
    ctx = zmq.asyncio.Context()
    rep_socket = ctx.socket(zmq.REP)
    rep_socket.bind("tcp://*:5555")

    try:
        while True:
            # Awaiting suspends this coroutine only; the event loop keeps running
            message = await rep_socket.recv_string()
            print(f"Received: {message}")

            # Simulate async processing
            await asyncio.sleep(0.1)

            # REP must answer each request before it can receive the next one
            reply = f"Echo: {message}"
            await rep_socket.send_string(reply)
    finally:
        # Close the socket first so terminating the context does not wait on it
        rep_socket.close()
        ctx.term()

255

256

# Run the server

257

asyncio.run(server())

258

```

259

260

### Async Request-Reply Client

261

262

```python

263

import asyncio

264

import zmq.asyncio

265

266

async def client():
    """Send ten requests to the echo server and print every reply.

    All requests share one REQ socket. A REQ socket enforces a strict
    send -> recv -> send -> recv cycle, so the requests are awaited one at a
    time: starting a second send before the first reply has been received
    fails with a "current state" (EFSM) error. For truly concurrent
    requests, use one REQ socket per in-flight request or a DEALER socket.
    """
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")

    try:
        # Issue the requests sequentially; each one completes its full
        # send/recv round trip before the next send is attempted.
        responses = []
        for i in range(10):
            responses.append(await send_request(socket, f"Request {i}"))

        for response in responses:
            print(f"Response: {response}")
    finally:
        # Close the socket first so terminating the context does not wait on it
        socket.close()
        context.term()

284

285

async def send_request(socket, message):
    """Send `message` as a string on `socket`, then await and return the string reply."""
    await socket.send_string(message)
    reply = await socket.recv_string()
    return reply

288

289

asyncio.run(client())

290

```

291

292

### Async Publisher

293

294

```python

295

import asyncio

296

import zmq.asyncio

297

298

async def publisher():
    """Publish one update per second on tcp://*:5556, alternating "news" and "weather" topics."""
    ctx = zmq.asyncio.Context()
    pub_socket = ctx.socket(zmq.PUB)
    pub_socket.bind("tcp://*:5556")

    try:
        i = 0
        while True:
            # Odd counters go to "weather", even ones to "news"
            if i % 2:
                topic = "weather"
            else:
                topic = "news"
            message = f"Update {i}"

            # The topic is the leading word of the payload
            await pub_socket.send_string(f"{topic} {message}")
            print(f"Published: {topic} {message}")

            # Async sleep allows other coroutines to run
            await asyncio.sleep(1)
            i += 1
    finally:
        pub_socket.close()
        ctx.term()

319

320

asyncio.run(publisher())

321

```

322

323

### Async Subscriber

324

325

```python

326

import asyncio

327

import zmq.asyncio

328

329

async def subscriber():
    """Receive "weather" messages from tcp://localhost:5556 and process each one."""
    ctx = zmq.asyncio.Context()
    sub_socket = ctx.socket(zmq.SUB)
    sub_socket.connect("tcp://localhost:5556")
    # Subscribe to the "weather" topic
    sub_socket.setsockopt_string(zmq.SUBSCRIBE, "weather")

    try:
        while True:
            # Awaiting suspends this coroutine only; the event loop keeps running
            message = await sub_socket.recv_string()
            print(f"Received: {message}")

            # Processing is awaited, so the next receive starts only after it finishes
            await process_weather_data(message)
    finally:
        sub_socket.close()
        ctx.term()

346

347

async def process_weather_data(message):
    """Stand-in for real processing: wait 0.1 s, then report `message` as processed."""
    # Simulated work; the sleep yields control to the event loop
    await asyncio.sleep(0.1)
    print(f"Processed: {message}")

351

352

asyncio.run(subscriber())

353

```

354

355

### Async Polling Multiple Sockets

356

357

```python

358

import asyncio

359

import zmq.asyncio

360

361

async def multi_socket_handler():
    """Relay multipart messages between a ROUTER frontend (:5555) and a DEALER backend (:5556)."""
    ctx = zmq.asyncio.Context()

    # Create multiple sockets
    frontend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")

    backend = ctx.socket(zmq.DEALER)
    backend.bind("tcp://*:5556")

    # One async poller watches both sockets for readability
    poller = zmq.asyncio.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)

    try:
        while True:
            # Suspends until at least one registered socket has an event
            events = await poller.poll()

            for ready_socket, event in events:
                # Only readable sockets are of interest here
                if not event & zmq.POLLIN:
                    continue

                if ready_socket is frontend:
                    # Frontend message: forward it to the backend
                    message = await frontend.recv_multipart()
                    print(f"Frontend: {message}")
                    await backend.send_multipart(message)
                elif ready_socket is backend:
                    # Backend message: forward it to the frontend
                    message = await backend.recv_multipart()
                    print(f"Backend: {message}")
                    await frontend.send_multipart(message)
    finally:
        frontend.close()
        backend.close()
        ctx.term()

397

398

asyncio.run(multi_socket_handler())

399

```

400

401

### Integration with Other Async Libraries

402

403

```python

404

import asyncio

405

import aiohttp

406

import zmq.asyncio

407

408

async def web_service_integration():
    """Combine a ZMQ request/reply exchange with an aiohttp HTTP call.

    Requests data over ZMQ, uses the reply's "id" to fetch JSON over HTTP,
    sends that JSON back over ZMQ, and returns the final ZMQ reply.
    """
    ctx = zmq.asyncio.Context()
    req_socket = ctx.socket(zmq.REQ)
    req_socket.connect("tcp://localhost:5555")

    async with aiohttp.ClientSession() as session:
        try:
            # First ZMQ round trip: ask the service for data
            request = {"action": "get_data", "id": 123}
            await req_socket.send_json(request)
            response = await req_socket.recv_json()

            # Use the ZMQ reply to build the HTTP request
            url = f"https://api.example.com/data/{response['id']}"
            async with session.get(url) as resp:
                web_data = await resp.json()

            # Second ZMQ round trip: hand the web data back and return the reply
            await req_socket.send_json({"web_data": web_data})
            return await req_socket.recv_json()
        finally:
            req_socket.close()
            ctx.term()

432

433

# Run with asyncio

434

result = asyncio.run(web_service_integration())

435

```

436

437

## Event Loop Integration

438

439

PyZMQ's async support automatically integrates with the current asyncio event loop:

440

441

```python

442

import asyncio

443

import zmq.asyncio

444

445

async def main():
    """Send one request on a REQ socket and await the reply on the running event loop."""
    # Context automatically uses current event loop
    context = zmq.asyncio.Context()
    socket = context.socket(zmq.REQ)
    # A REQ socket must be connected (or bound) before sending; without a
    # peer endpoint the awaited send would never complete.
    socket.connect("tcp://localhost:5555")

    try:
        # All operations are non-blocking and event-loop aware
        await socket.send_string("Hello")
        response = await socket.recv_string()
    finally:
        # Always release the socket and context, even if send/recv raises
        socket.close()
        context.term()

456

457

# Works with any asyncio event loop

458

if __name__ == "__main__":

459

asyncio.run(main())

460

```

461

462

## Imports

463

464

```python

465

import zmq

466

import zmq.asyncio

467

from zmq import Frame, MessageTracker, POLLIN, POLLOUT, DEFAULT_PROTOCOL

468

```

469

470

## Types

471

472

```python { .api }

473

from typing import Union, Optional, Any, List, Tuple, Awaitable

474

475

# Message payload types: a single frame, and a multipart message made of frames
AsyncMessageData = Union[bytes, str, memoryview, Frame]
AsyncMultipartMessage = List[AsyncMessageData]

# Result of polling: (socket, event mask) pairs
PollResult = List[Tuple[Socket, int]]

# Awaitable result types, one per family of async methods
SendCoroutine = Awaitable[Optional[MessageTracker]]
RecvCoroutine = Awaitable[Union[bytes, Frame]]
StringCoroutine = Awaitable[str]
JsonCoroutine = Awaitable[Any]
# Same type as Awaitable[List[AsyncMessageData]], spelled via the alias above
MultipartCoroutine = Awaitable[AsyncMultipartMessage]
PollCoroutine = Awaitable[PollResult]

489

```