or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

constants.mdindex.mdmessage-queues.mdsemaphores.mdshared-memory.md

message-queues.mddocs/

0

# Message Queues

1

2

POSIX named message queues provide reliable, priority-based message passing between processes. Messages are ordered by priority (highest first) and FIFO within the same priority level. Message queues support both blocking and non-blocking operations, timeouts, and asynchronous notifications.

3

4

Note: Message queue support varies by platform. Check the `MESSAGE_QUEUES_SUPPORTED` constant before using message queue functionality. macOS does not support POSIX message queues.

5

6

## Capabilities

7

8

### Message Queue Creation and Management

9

10

Create and manage named POSIX message queues with configurable capacity, message size limits, and access permissions.

11

12

```python { .api }

13

class MessageQueue:

14

def __init__(self, name, flags=0, mode=0o600, max_messages=QUEUE_MESSAGES_MAX_DEFAULT,

15

max_message_size=QUEUE_MESSAGE_SIZE_MAX_DEFAULT, read=True, write=True):

16

"""

17

Create or open a named message queue.

18

19

Parameters:

20

- name: str or None. If None, a random name is chosen. If str, should start with '/' (e.g., '/my_queue')

21

- flags: int, creation flags (O_CREAT, O_EXCL, O_CREX)

22

- mode: int, permissions (octal, default 0o600)

23

- max_messages: int, maximum messages in queue (default: QUEUE_MESSAGES_MAX_DEFAULT)

24

- max_message_size: int, maximum message size in bytes (default: QUEUE_MESSAGE_SIZE_MAX_DEFAULT)

25

- read: bool, whether this handle can receive messages

26

- write: bool, whether this handle can send messages

27

28

Notes:

29

- max_messages and max_message_size are ignored when opening existing queue

30

- Default values may be quite small (e.g., 10 messages) on some systems

31

- Consider providing explicit values for production use

32

- read/write permissions only affect this handle, not other handles to same queue

33

"""

34

```

35

36

### Message Operations

37

38

Send and receive messages with priority support and timeout handling.

39

40

```python { .api }

41

def send(self, message, timeout=None, priority=0):

42

"""

43

Send a message to the queue.

44

45

Parameters:

46

- message: bytes or str, message content (can contain embedded NULLs)

47

- timeout: None, 0, or positive float

48

- None: Block indefinitely until message sent

49

- 0: Non-blocking, raises BusyError if queue full

50

- > 0: Wait up to timeout seconds, raises BusyError if timeout expires

51

- priority: int, message priority (0 = lowest, higher values = higher priority, max = QUEUE_PRIORITY_MAX)

52

53

Raises:

54

- BusyError: When timeout expires or non-blocking call cannot proceed

55

- ValueError: When message exceeds max_message_size

56

"""

57

58

def receive(self, timeout=None):

59

"""

60

Receive a message from the queue.

61

62

Parameters:

63

- timeout: None, 0, or positive float (same semantics as send())

64

65

Returns:

66

tuple: (message, priority) where message is bytes and priority is int

67

68

Messages are received in priority order (highest priority first),

69

and FIFO order within the same priority level.

70

71

Raises:

72

- BusyError: When timeout expires or non-blocking call finds empty queue

73

"""

74

```

75

76

### Asynchronous Notifications

77

78

Request notifications when the queue transitions from empty to non-empty.

79

80

```python { .api }

81

def request_notification(self, notification=None):

82

"""

83

Request or cancel notification when queue becomes non-empty.

84

85

Parameters:

86

- notification: None, int, or tuple

87

- None: Cancel any existing notification request

88

- int: Signal number to send when queue becomes non-empty

89

- tuple: (function, param) to call function(param) in new thread when queue becomes non-empty

90

91

Notes:

92

- Only one notification request per queue system-wide

93

- OS delivers at most one notification per request

94

- Must call again for subsequent notifications

95

- Fails with BusyError if another process has pending notification request

96

97

Raises:

98

- BusyError: When another process already has notification request pending

99

"""

100

```

101

102

### Resource Management

103

104

Close queue handles and clean up queue resources.

105

106

```python { .api }

107

def fileno(self):

108

"""

109

Returns the message queue descriptor.

110

111

Returns:

112

int: Message queue descriptor (same as the mqd property)

113

114

This method allows MessageQueue objects to be used with functions

115

that expect file-like objects with a fileno() method.

116

"""

117

118

def close(self):

119

"""

120

Close this handle to the message queue.

121

122

Must be called explicitly - not automatically called on garbage collection.

123

Other handles to the same queue remain valid.

124

"""

125

126

def unlink(self):

127

"""

128

Request destruction of the message queue.

129

130

Actual destruction is postponed until all handles are closed.

131

After unlinking, new open() calls with the same name create a new queue.

132

"""

133

134

def __str__(self):

135

"""

136

String representation of the message queue.

137

138

Returns:

139

str: Human-readable representation including name and current message count

140

"""

141

142

def __repr__(self):

143

"""

144

Detailed string representation for debugging.

145

146

Returns:

147

str: Technical representation with class name and key attributes

148

"""

149

```

150

151

### Message Queue Properties

152

153

Access queue metadata, capacity limits, and current state.

154

155

```python { .api }

156

@property

157

def name(self):

158

"""

159

The name provided in the constructor.

160

161

Returns:

162

str: Message queue name

163

"""

164

165

@property

166

def mode(self):

167

"""

168

The mode (permissions) provided in the constructor.

169

170

Returns:

171

int: File mode/permissions (e.g., 0o600)

172

"""

173

174

@property

175

def mqd(self):

176

"""

177

Message queue descriptor representing the queue.

178

179

Returns:

180

int: Message queue descriptor (platform-specific handle)

181

"""

182

183

@property

184

def block(self):

185

"""

186

Whether send() and receive() operations may block.

187

188

Returns:

189

bool: True if operations may block, False if they raise BusyError instead

190

"""

191

192

@block.setter

193

def block(self, value):

194

"""

195

Set blocking behavior for send() and receive() operations.

196

197

Parameters:

198

- value: bool, True to allow blocking, False to raise BusyError instead

199

"""

200

201

@property

202

def max_messages(self):

203

"""

204

Maximum number of messages the queue can hold.

205

206

Returns:

207

int: Maximum message count

208

"""

209

210

@property

211

def max_message_size(self):

212

"""

213

Maximum size of individual messages in bytes.

214

215

Returns:

216

int: Maximum message size

217

"""

218

219

@property

220

def current_messages(self):

221

"""

222

Current number of messages in the queue.

223

224

Returns:

225

int: Current message count

226

"""

227

```

228

229

### Module Function

230

231

Convenience function for unlinking message queues by name.

232

233

```python { .api }

234

def unlink_message_queue(name):

235

"""

236

Convenience function to unlink a message queue by name.

237

238

Parameters:

239

- name: str, message queue name (e.g., '/my_queue')

240

241

Equivalent to opening the queue and calling unlink(), but more convenient

242

when you only need to remove an existing queue.

243

"""

244

```

245

246

## Usage Examples

247

248

### Basic Message Queue Usage

249

250

```python

251

import posix_ipc

252

253

# Check if message queues are supported

254

if not posix_ipc.MESSAGE_QUEUES_SUPPORTED:

255

print("Message queues not supported on this platform")

256

exit(1)

257

258

# Create a message queue

259

mq = posix_ipc.MessageQueue('/my_queue', posix_ipc.O_CREAT)

260

261

# Send a message

262

message = b'Hello, message queue!'

263

mq.send(message)

264

265

# Receive the message

266

received_message, priority = mq.receive()

267

print(f"Received: {received_message.decode()}, Priority: {priority}")

268

269

# Clean up

270

mq.close()

271

mq.unlink()

272

```

273

274

### Priority-Based Messaging

275

276

```python

277

import posix_ipc

278

279

mq = posix_ipc.MessageQueue('/priority_queue', posix_ipc.O_CREAT)

280

281

# Send messages with different priorities

282

mq.send(b'Low priority message', priority=1)

283

mq.send(b'High priority message', priority=10)

284

mq.send(b'Medium priority message', priority=5)

285

286

# Receive messages (highest priority first)

287

for i in range(3):

288

message, priority = mq.receive()

289

print(f"Received (priority {priority}): {message.decode()}")

290

291

mq.close()

292

mq.unlink()

293

```

294

295

### Non-Blocking and Timeout Operations

296

297

```python

298

import posix_ipc

299

300

mq = posix_ipc.MessageQueue('/timeout_queue', posix_ipc.O_CREAT)

301

302

# Non-blocking send (queue empty, should succeed)

303

try:

304

mq.send(b'Non-blocking message', timeout=0)

305

print("Message sent immediately")

306

except posix_ipc.BusyError:

307

print("Queue full, message not sent")

308

309

# Non-blocking receive

310

try:

311

message, priority = mq.receive(timeout=0)

312

print(f"Message received immediately: {message.decode()}")

313

except posix_ipc.BusyError:

314

print("Queue empty, no message received")

315

316

# Timeout receive

317

try:

318

message, priority = mq.receive(timeout=2.0)

319

print(f"Message received within timeout: {message.decode()}")

320

except posix_ipc.BusyError:

321

print("Timeout expired, no message received")

322

323

mq.close()

324

mq.unlink()

325

```

326

327

### Using Block Property

328

329

```python

330

import posix_ipc

331

332

mq = posix_ipc.MessageQueue('/block_queue', posix_ipc.O_CREAT)

333

334

# Enable non-blocking mode

335

mq.block = False

336

337

try:

338

# This will raise BusyError immediately if queue is empty

339

message, priority = mq.receive()

340

except posix_ipc.BusyError:

341

print("Queue empty (non-blocking mode)")

342

343

# Re-enable blocking mode

344

mq.block = True

345

346

mq.close()

347

mq.unlink()

348

```

349

350

### Asynchronous Notifications with Signals

351

352

```python

353

import posix_ipc

354

import signal

355

import os

356

import time

357

358

# Signal handler

359

def message_arrived(signum, frame):

360

print(f"Signal {signum} received - message available!")

361

362

# Set up signal handler

363

signal.signal(signal.SIGUSR1, message_arrived)

364

365

mq = posix_ipc.MessageQueue('/notify_queue', posix_ipc.O_CREAT)

366

367

# Request notification via signal

368

mq.request_notification(signal.SIGUSR1)

369

370

print("Waiting for message notification...")

371

372

# In another process, send a message to trigger notification

373

if os.fork() == 0: # Child process

374

time.sleep(2) # Wait a bit

375

child_mq = posix_ipc.MessageQueue('/notify_queue')

376

child_mq.send(b'Notification test message')

377

child_mq.close()

378

exit(0)

379

380

# Parent process waits for signal

381

time.sleep(5)

382

383

# Receive the message

384

message, priority = mq.receive()

385

print(f"Received: {message.decode()}")

386

387

mq.close()

388

mq.unlink()

389

```

390

391

### Asynchronous Notifications with Function Callback

392

393

```python

394

import posix_ipc

395

import threading

396

import time

397

398

def notification_callback(queue_name):

399

print(f"Callback called for queue: {queue_name}")

400

# Could process message here or signal main thread

401

402

mq = posix_ipc.MessageQueue('/callback_queue', posix_ipc.O_CREAT)

403

404

# Request notification via callback

405

mq.request_notification((notification_callback, '/callback_queue'))

406

407

print("Waiting for message notification...")

408

409

# Send message from another thread

410

def send_message():

411

time.sleep(2)

412

sender_mq = posix_ipc.MessageQueue('/callback_queue')

413

sender_mq.send(b'Callback test message')

414

sender_mq.close()

415

416

threading.Thread(target=send_message).start()

417

418

# Wait for notification and process message

419

time.sleep(5)

420

message, priority = mq.receive()

421

print(f"Received: {message.decode()}")

422

423

mq.close()

424

mq.unlink()

425

```

426

427

### Queue Capacity and Limits

428

429

```python

430

import posix_ipc

431

432

# Create queue with specific limits

433

mq = posix_ipc.MessageQueue('/capacity_queue', posix_ipc.O_CREAT,

434

max_messages=5, max_message_size=256)

435

436

print(f"Max messages: {mq.max_messages}")

437

print(f"Max message size: {mq.max_message_size}")

438

print(f"Current messages: {mq.current_messages}")

439

440

# Fill the queue to capacity

441

for i in range(mq.max_messages):

442

mq.send(f'Message {i}'.encode())

443

print(f"Sent message {i}, queue has {mq.current_messages} messages")

444

445

# Try to send one more (should block or fail depending on timeout)

446

try:

447

mq.send(b'Overflow message', timeout=0)

448

except posix_ipc.BusyError:

449

print("Queue full - cannot send more messages")

450

451

mq.close()

452

mq.unlink()

453

```

454

455

### Producer-Consumer Pattern

456

457

```python

458

import posix_ipc

459

import os

460

import time

461

import json

462

463

def producer():

464

mq = posix_ipc.MessageQueue('/producer_consumer', posix_ipc.O_CREAT)

465

466

for i in range(10):

467

data = {'id': i, 'timestamp': time.time(), 'value': i * 10}

468

message = json.dumps(data).encode()

469

mq.send(message, priority=i % 3) # Vary priority

470

print(f"Produced: {data}")

471

time.sleep(0.5)

472

473

# Send termination message

474

mq.send(b'STOP', priority=99)

475

mq.close()

476

477

def consumer():

478

mq = posix_ipc.MessageQueue('/producer_consumer')

479

480

while True:

481

message, priority = mq.receive()

482

483

if message == b'STOP':

484

print("Consumer stopping")

485

break

486

487

data = json.loads(message.decode())

488

print(f"Consumed (priority {priority}): {data}")

489

time.sleep(0.2)

490

491

mq.close()

492

mq.unlink()

493

494

# Run producer and consumer in separate processes

495

if os.fork() == 0: # Child - consumer

496

consumer()

497

else: # Parent - producer

498

producer()

499

os.wait() # Wait for child to finish

500

```