or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-queuing.mdconnection-pooling.mdconnectivity.mddata-types.mddatabase-objects.mdindex.mdlobs.mdpipeline.mdsoda.mdsql-execution.mdsubscriptions.md

advanced-queuing.mddocs/

0

# Advanced Queuing (AQ)

1

2

Oracle Advanced Queuing (AQ) provides message queuing functionality as an integral part of the Oracle Database. AQ enables applications to use the database as a message broker for reliable, persistent, and transactional message passing. The API supports both synchronous and asynchronous operations, with options for message delivery modes, priorities, and transformation functions.

3

4

## Capabilities

5

6

### Queue Access

7

8

Access to AQ queues through database connections with support for both synchronous and asynchronous operations.

9

10

```python { .api }

11

# Access queues through connection

12

def queue(self, name: str, payload_type: DbObjectType = None) -> Queue:

13

"""

14

Creates and returns a queue which is used to enqueue and dequeue

15

messages in Advanced Queueing (AQ).

16

17

Parameters:

18

- name (str): Name of the queue to access

19

- payload_type (DbObjectType): Object type for structured payloads

20

21

Returns:

22

Queue: Queue object for message operations

23

"""

24

25

# Async version

26

async def queue(self, name: str, payload_type: DbObjectType = None) -> AsyncQueue: ...

27

```

28

29

### Queue Class

30

31

Synchronous queue operations for enqueuing and dequeuing messages.

32

33

```python { .api }

34

class Queue:

35

# Properties

36

connection: Connection # Read-only connection object

37

deqoptions: DeqOptions # Dequeue options for configuration

38

enqoptions: EnqOptions # Enqueue options for configuration

39

name: str # Read-only queue name

40

payload_type: Union[DbObjectType, str, None] # Read-only payload type

41

42

def deqone(self) -> Union[MessageProperties, None]:

43

"""

44

Dequeues at most one message from the queue and returns it. If no

45

message is dequeued, None is returned.

46

47

Returns:

48

Union[MessageProperties, None]: Dequeued message or None

49

"""

50

51

def deqmany(self, max_num_messages: int) -> list:

52

"""

53

Dequeues up to the specified number of messages from the queue and

54

returns a list of these messages.

55

56

Parameters:

57

- max_num_messages (int): Maximum number of messages to dequeue

58

59

Returns:

60

list: List of MessageProperties objects

61

"""

62

63

def enqone(self, message: MessageProperties) -> None:

64

"""

65

Enqueues a single message into the queue. The message must be a message

66

property object which has had its payload attribute set to a value that

67

the queue supports.

68

69

Parameters:

70

- message (MessageProperties): Message to enqueue

71

"""

72

73

def enqmany(self, messages: list) -> None:

74

"""

75

Enqueues multiple messages into the queue. The messages parameter must

76

be a sequence containing message property objects which have all had

77

their payload attribute set to a value that the queue supports.

78

79

Parameters:

80

- messages (list): List of MessageProperties objects to enqueue

81

82

Warning: calling this function in parallel on different connections

83

acquired from the same pool may fail due to Oracle bug 29928074.

84

"""

85

```

86

87

### Async Queue Class

88

89

Asynchronous queue operations for enqueuing and dequeuing messages.

90

91

```python { .api }

92

class AsyncQueue:

93

# Properties (same as Queue)

94

connection: AsyncConnection

95

deqoptions: DeqOptions

96

enqoptions: EnqOptions

97

name: str

98

payload_type: Union[DbObjectType, str, None]

99

100

async def deqone(self) -> Union[MessageProperties, None]:

101

"""

102

Dequeues at most one message from the queue and returns it. If no

103

message is dequeued, None is returned.

104

105

Returns:

106

Union[MessageProperties, None]: Dequeued message or None

107

"""

108

109

async def deqmany(self, max_num_messages: int) -> list:

110

"""

111

Dequeues up to the specified number of messages from the queue and

112

returns a list of these messages.

113

114

Parameters:

115

- max_num_messages (int): Maximum number of messages to dequeue

116

117

Returns:

118

list: List of MessageProperties objects

119

"""

120

121

async def enqone(self, message: MessageProperties) -> None:

122

"""

123

Enqueues a single message into the queue. The message must be a message

124

property object which has had its payload attribute set to a value that

125

the queue supports.

126

127

Parameters:

128

- message (MessageProperties): Message to enqueue

129

"""

130

131

async def enqmany(self, messages: list) -> None:

132

"""

133

Enqueues multiple messages into the queue. The messages parameter must

134

be a sequence containing message property objects which have all had

135

their payload attribute set to a value that the queue supports.

136

137

Parameters:

138

- messages (list): List of MessageProperties objects to enqueue

139

"""

140

```

141

142

### Dequeue Options

143

144

Configuration options for message dequeuing operations.

145

146

```python { .api }

147

class DeqOptions:

148

condition: str # Boolean expression for message filtering

149

consumername: str # Consumer name for multi-consumer queues

150

correlation: str # Correlation identifier with pattern matching

151

deliverymode: int # Message delivery mode (write-only)

152

mode: int # Locking behavior (DEQ_BROWSE, DEQ_LOCKED, DEQ_REMOVE, DEQ_REMOVE_NODATA)

153

msgid: bytes # Specific message identifier to dequeue

154

navigation: int # Message position (DEQ_FIRST_MSG, DEQ_NEXT_MSG, DEQ_NEXT_TRANSACTION)

155

transformation: str # Transformation function name

156

visibility: int # Transaction behavior (DEQ_ON_COMMIT, DEQ_IMMEDIATE)

157

wait: int # Wait time in seconds (DEQ_NO_WAIT, DEQ_WAIT_FOREVER, or timeout)

158

```

159

160

### Enqueue Options

161

162

Configuration options for message enqueuing operations.

163

164

```python { .api }

165

class EnqOptions:

166

deliverymode: int # Message delivery mode (MSG_PERSISTENT, MSG_BUFFERED) - write-only

167

transformation: str # Transformation function name

168

visibility: int # Transaction behavior (ENQ_ON_COMMIT, ENQ_IMMEDIATE)

169

```

170

171

### Message Properties

172

173

Properties and content of queued messages with metadata and payload management.

174

175

```python { .api }

176

class MessageProperties:

177

# Read-only properties

178

attempts: int # Number of dequeue attempts made

179

deliverymode: int # Message delivery mode

180

enqtime: datetime # Time message was enqueued

181

msgid: bytes # Message identifier

182

state: int # Message state (MSG_WAITING, MSG_READY, MSG_PROCESSED, MSG_EXPIRED)

183

184

# Read-write properties

185

correlation: str # Correlation identifier

186

delay: int # Delay before message becomes available

187

exceptionq: str # Exception queue name

188

expiration: int # Message expiration time in seconds

189

payload: Union[bytes, str, dict, list, DbObject] # Message payload

190

priority: int # Message priority (lower numbers = higher priority)

191

recipients: list # List of recipient names for targeted delivery

192

```

193

194

## Constants

195

196

```python { .api }

197

# Delivery Modes

198

MSG_PERSISTENT: int # Persistent message storage

199

MSG_BUFFERED: int # Buffered message storage

200

MSG_PERSISTENT_OR_BUFFERED: int # Either persistent or buffered

201

202

# Dequeue Modes

203

DEQ_BROWSE: int # Browse without locking

204

DEQ_LOCKED: int # Lock for update

205

DEQ_REMOVE: int # Remove from queue (default)

206

DEQ_REMOVE_NODATA: int # Remove without returning data

207

208

# Dequeue Navigation

209

DEQ_FIRST_MSG: int # First message

210

DEQ_NEXT_MSG: int # Next message (default)

211

DEQ_NEXT_TRANSACTION: int # Next transaction

212

213

# Dequeue Visibility

214

DEQ_IMMEDIATE: int # Immediate visibility

215

DEQ_ON_COMMIT: int # Visible on commit (default)

216

217

# Dequeue Wait Modes

218

DEQ_NO_WAIT: int # Don't wait for messages

219

DEQ_WAIT_FOREVER: int # Wait indefinitely (default)

220

221

# Enqueue Visibility

222

ENQ_IMMEDIATE: int # Immediate visibility

223

ENQ_ON_COMMIT: int # Visible on commit (default)

224

225

# Message States

226

MSG_EXPIRED: int # Message has expired

227

MSG_PROCESSED: int # Message has been processed

228

MSG_READY: int # Message is ready for dequeue

229

MSG_WAITING: int # Message is waiting

230

231

# Message Timing Constants

232

MSG_NO_DELAY: int # No delay for message availability

233

MSG_NO_EXPIRATION: int # Message never expires

234

```

235

236

## Usage Examples

237

238

```python

239

import oracledb

240

241

# Basic AQ usage

242

with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:

243

# Create/access a queue

244

queue = connection.queue("my_queue")

245

246

# Create and enqueue a message

247

message = MessageProperties()

248

message.payload = "Hello, World!"

249

message.priority = 1

250

queue.enqone(message)

251

252

# Dequeue a message

253

received_msg = queue.deqone()

254

if received_msg:

255

print(f"Received: {received_msg.payload}")

256

print(f"Priority: {received_msg.priority}")

257

print(f"Enqueue time: {received_msg.enqtime}")

258

259

connection.commit()

260

261

# Advanced AQ with options

262

with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:

263

queue = connection.queue("priority_queue")

264

265

# Configure dequeue options

266

queue.deqoptions.wait = 10 # Wait 10 seconds

267

queue.deqoptions.mode = oracledb.DEQ_REMOVE

268

queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG

269

270

# Configure enqueue options

271

queue.enqoptions.visibility = oracledb.ENQ_IMMEDIATE

272

273

# Enqueue multiple messages with different priorities

274

messages = []

275

for i, priority in enumerate([3, 1, 2]):

276

msg = MessageProperties()

277

msg.payload = f"Message {i+1}"

278

msg.priority = priority

279

msg.correlation = f"batch_1_msg_{i+1}"

280

messages.append(msg)

281

282

queue.enqmany(messages)

283

284

# Dequeue messages (will come out in priority order)

285

while True:

286

msg = queue.deqone()

287

if msg is None:

288

break

289

print(f"Dequeued: {msg.payload} (priority {msg.priority})")

290

291

connection.commit()

292

293

# Async AQ operations

294

import asyncio

295

296

async def async_aq_example():

297

async with oracledb.connect_async(user="user", password="pwd", dsn="localhost/orclpdb") as connection:

298

queue = await connection.queue("async_queue")

299

300

# Async enqueue

301

message = MessageProperties()

302

message.payload = {"type": "notification", "data": "async message"}

303

await queue.enqone(message)

304

305

# Async dequeue

306

received = await queue.deqone()

307

if received:

308

print(f"Async received: {received.payload}")

309

310

await connection.commit()

311

312

asyncio.run(async_aq_example())

313

```