# Advanced Queuing (AQ)

Oracle Advanced Queuing (AQ) provides message queuing functionality as an integral part of the Oracle Database. AQ enables applications to use the database as a message broker for reliable, persistent, and transactional message passing. The API supports both synchronous and asynchronous operations, with options for message delivery modes, priorities, and transformation functions.

## Capabilities

### Queue Access

AQ queues are accessed through a database connection. Both synchronous and asynchronous connections are supported.

```python { .api }
# Access queues through connection
def queue(self, name: str, payload_type: Optional[Union[DbObjectType, str]] = None) -> Queue:
    """
    Creates and returns a queue which is used to enqueue and dequeue
    messages in Advanced Queuing (AQ).

    Parameters:
    - name (str): Name of the queue to access
    - payload_type (DbObjectType or str): Object type for structured
      payloads, or the string "JSON" for JSON payloads; omit for RAW payloads

    Returns:
    Queue: Queue object for message operations
    """

# Async version (on AsyncConnection); the call itself is not awaited,
# only the operations on the returned AsyncQueue are
def queue(self, name: str, payload_type: Optional[Union[DbObjectType, str]] = None) -> AsyncQueue: ...
```

### Queue Class

Synchronous queue operations for enqueuing and dequeuing messages.

```python { .api }
class Queue:
    # Properties
    connection: Connection  # Read-only connection object
    deqoptions: DeqOptions  # Dequeue options for configuration
    enqoptions: EnqOptions  # Enqueue options for configuration
    name: str  # Read-only queue name
    payload_type: Union[DbObjectType, str, None]  # Read-only payload type

    def deqone(self) -> Union[MessageProperties, None]:
        """
        Dequeues at most one message from the queue and returns it. If no
        message is dequeued, None is returned.

        Returns:
        Union[MessageProperties, None]: Dequeued message or None
        """

    def deqmany(self, max_num_messages: int) -> list:
        """
        Dequeues up to the specified number of messages from the queue and
        returns a list of these messages.

        Parameters:
        - max_num_messages (int): Maximum number of messages to dequeue

        Returns:
        list: List of MessageProperties objects
        """

    def enqone(self, message: MessageProperties) -> None:
        """
        Enqueues a single message into the queue. The message must be a message
        property object which has had its payload attribute set to a value that
        the queue supports.

        Parameters:
        - message (MessageProperties): Message to enqueue
        """

    def enqmany(self, messages: list) -> None:
        """
        Enqueues multiple messages into the queue. The messages parameter must
        be a sequence containing message property objects which have all had
        their payload attribute set to a value that the queue supports.

        Parameters:
        - messages (list): List of MessageProperties objects to enqueue

        Warning: calling this function in parallel on different connections
        acquired from the same pool may fail due to Oracle bug 29928074.
        """
```
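
The `deqmany()` contract above lends itself to a batch-draining loop. The following is a minimal sketch, not part of the library: `drain_in_batches` is a hypothetical helper, and it assumes that `deqmany()` returns an empty list when no message is available (which in turn assumes `deqoptions.wait` has been set to `DEQ_NO_WAIT` or a timeout, since the default is to wait indefinitely).

```python
from typing import Any, Iterator, List


def drain_in_batches(queue: Any, batch_size: int = 100) -> Iterator[List[Any]]:
    """Yield batches of dequeued messages until a dequeue returns nothing.

    `queue` is assumed to behave like the Queue class above: deqmany(n)
    returns a list of at most n messages. This helper is hypothetical and
    only relies on that one method.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    while True:
        batch = queue.deqmany(batch_size)
        if not batch:
            # Assumption: an empty list means no message was available
            break
        yield batch
```

A caller would typically iterate with `for batch in drain_in_batches(queue, 50): ...` and commit on the connection after each batch has been processed, so that a failure does not lose more than one batch of work.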

### Async Queue Class

Asynchronous queue operations for enqueuing and dequeuing messages.

```python { .api }
class AsyncQueue:
    # Properties (same as Queue)
    connection: AsyncConnection
    deqoptions: DeqOptions
    enqoptions: EnqOptions
    name: str
    payload_type: Union[DbObjectType, str, None]

    async def deqone(self) -> Union[MessageProperties, None]:
        """
        Dequeues at most one message from the queue and returns it. If no
        message is dequeued, None is returned.

        Returns:
        Union[MessageProperties, None]: Dequeued message or None
        """

    async def deqmany(self, max_num_messages: int) -> list:
        """
        Dequeues up to the specified number of messages from the queue and
        returns a list of these messages.

        Parameters:
        - max_num_messages (int): Maximum number of messages to dequeue

        Returns:
        list: List of MessageProperties objects
        """

    async def enqone(self, message: MessageProperties) -> None:
        """
        Enqueues a single message into the queue. The message must be a message
        property object which has had its payload attribute set to a value that
        the queue supports.

        Parameters:
        - message (MessageProperties): Message to enqueue
        """

    async def enqmany(self, messages: list) -> None:
        """
        Enqueues multiple messages into the queue. The messages parameter must
        be a sequence containing message property objects which have all had
        their payload attribute set to a value that the queue supports.

        Parameters:
        - messages (list): List of MessageProperties objects to enqueue
        """
```
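
Because `deqone()` returns `None` when nothing is dequeued, an asynchronous consumer can be written as a simple loop. The sketch below is illustrative only: `consume_until_empty` and the `handler` callback are hypothetical names, and the loop assumes `deqoptions.wait` has been configured so that `deqone()` eventually returns `None` rather than waiting forever.

```python
from typing import Any, Awaitable, Callable


async def consume_until_empty(
    queue: Any, handler: Callable[[Any], Awaitable[None]]
) -> int:
    """Dequeue messages one at a time and pass each to `handler`.

    `queue` is assumed to behave like AsyncQueue above: awaiting deqone()
    yields a message or None. Returns the number of messages handled.
    """
    handled = 0
    while True:
        message = await queue.deqone()
        if message is None:
            # Nothing was dequeued, so stop consuming
            break
        await handler(message)
        handled += 1
    return handled
```

The helper deliberately leaves transaction control to the caller, who would normally `await connection.commit()` once the returned count has been acted on.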

### Dequeue Options

Configuration options for message dequeuing operations.

```python { .api }
class DeqOptions:
    condition: str  # Boolean expression for message filtering
    consumername: str  # Consumer name for multi-consumer queues
    correlation: str  # Correlation identifier with pattern matching
    deliverymode: int  # Message delivery mode (write-only)
    mode: int  # Locking behavior (DEQ_BROWSE, DEQ_LOCKED, DEQ_REMOVE, DEQ_REMOVE_NODATA)
    msgid: bytes  # Specific message identifier to dequeue
    navigation: int  # Message position (DEQ_FIRST_MSG, DEQ_NEXT_MSG, DEQ_NEXT_TRANSACTION)
    transformation: str  # Transformation function name
    visibility: int  # Transaction behavior (DEQ_ON_COMMIT, DEQ_IMMEDIATE)
    wait: int  # Wait time in seconds (DEQ_NO_WAIT, DEQ_WAIT_FOREVER, or timeout)
```
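
Dequeue options are plain attributes on `queue.deqoptions`, so several of them are often set together. As a sketch, a small hypothetical helper (`apply_deq_options` is not part of the library) can apply them in one call and reject any name that is not in the listing above, so that a misspelled option fails loudly.

```python
from typing import Any

# Attribute names copied from the DeqOptions listing above
DEQ_OPTION_NAMES = frozenset({
    "condition", "consumername", "correlation", "deliverymode", "mode",
    "msgid", "navigation", "transformation", "visibility", "wait",
})


def apply_deq_options(queue: Any, **options: Any) -> None:
    """Set several dequeue options at once, rejecting unknown names.

    `queue` only needs a `deqoptions` attribute; this helper is a
    hypothetical convenience, not a library function.
    """
    unknown = sorted(set(options) - DEQ_OPTION_NAMES)
    if unknown:
        raise ValueError(f"unknown dequeue option(s): {', '.join(unknown)}")
    for name, value in options.items():
        setattr(queue.deqoptions, name, value)
```

With a real queue this might be called as `apply_deq_options(queue, wait=oracledb.DEQ_NO_WAIT, navigation=oracledb.DEQ_FIRST_MSG)`.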

### Enqueue Options

Configuration options for message enqueuing operations.

```python { .api }
class EnqOptions:
    deliverymode: int  # Message delivery mode (MSG_PERSISTENT, MSG_BUFFERED) - write-only
    transformation: str  # Transformation function name
    visibility: int  # Transaction behavior (ENQ_ON_COMMIT, ENQ_IMMEDIATE)
```

### Message Properties

Metadata and payload of a queued message.

```python { .api }
class MessageProperties:
    # Read-only properties
    attempts: int  # Number of dequeue attempts made
    deliverymode: int  # Message delivery mode
    enqtime: datetime  # Time message was enqueued
    msgid: bytes  # Message identifier
    state: int  # Message state (MSG_WAITING, MSG_READY, MSG_PROCESSED, MSG_EXPIRED)

    # Read-write properties
    correlation: str  # Correlation identifier
    delay: int  # Delay in seconds before message becomes available
    exceptionq: str  # Exception queue name
    expiration: int  # Message expiration time in seconds
    payload: Union[bytes, str, dict, list, DbObject]  # Message payload
    priority: int  # Message priority (lower numbers = higher priority)
    recipients: list  # List of recipient names for targeted delivery
```
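
The read-write properties above are what a producer sets before enqueuing. The sketch below builds a batch of messages suitable for `enqmany()`. It assumes the connection exposes a `msgproperties()` factory that accepts a `payload` keyword, as python-oracledb connections do; `build_messages` and the correlation naming scheme are hypothetical.

```python
from typing import Any, Iterable, List, Optional


def build_messages(
    connection: Any,
    payloads: Iterable[Any],
    *,
    priority: int = 0,
    expiration: Optional[int] = None,
) -> List[Any]:
    """Create one message per payload with shared priority and expiration.

    Assumes connection.msgproperties(payload=...) returns an object with
    the read-write properties listed above.
    """
    messages = []
    for index, payload in enumerate(payloads, start=1):
        message = connection.msgproperties(payload=payload)
        message.priority = priority  # lower numbers = higher priority
        message.correlation = f"batch_msg_{index}"  # illustrative naming only
        if expiration is not None:
            message.expiration = expiration  # seconds
        messages.append(message)
    return messages
```

The returned list can be passed directly to `queue.enqmany(...)`.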

## Constants

```python { .api }
# Delivery Modes
MSG_PERSISTENT: int  # Persistent message storage
MSG_BUFFERED: int  # Buffered message storage
MSG_PERSISTENT_OR_BUFFERED: int  # Either persistent or buffered

# Dequeue Modes
DEQ_BROWSE: int  # Browse without locking
DEQ_LOCKED: int  # Lock for update
DEQ_REMOVE: int  # Remove from queue (default)
DEQ_REMOVE_NODATA: int  # Remove without returning data

# Dequeue Navigation
DEQ_FIRST_MSG: int  # First message
DEQ_NEXT_MSG: int  # Next message (default)
DEQ_NEXT_TRANSACTION: int  # Next transaction

# Dequeue Visibility
DEQ_IMMEDIATE: int  # Immediate visibility
DEQ_ON_COMMIT: int  # Visible on commit (default)

# Dequeue Wait Modes
DEQ_NO_WAIT: int  # Don't wait for messages
DEQ_WAIT_FOREVER: int  # Wait indefinitely (default)

# Enqueue Visibility
ENQ_IMMEDIATE: int  # Immediate visibility
ENQ_ON_COMMIT: int  # Visible on commit (default)

# Message States
MSG_EXPIRED: int  # Message has expired
MSG_PROCESSED: int  # Message has been processed
MSG_READY: int  # Message is ready for dequeue
MSG_WAITING: int  # Message is waiting

# Message Timing Constants
MSG_NO_DELAY: int  # No delay for message availability
MSG_NO_EXPIRATION: int  # Message never expires
```

## Usage Examples

```python
import asyncio

import oracledb

# Basic AQ usage (assumes a RAW queue named "my_queue" already exists)
with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
    # Access an existing queue; this call does not create it in the database
    queue = connection.queue("my_queue")
    # Return None instead of blocking when the queue is empty
    queue.deqoptions.wait = oracledb.DEQ_NO_WAIT

    # Create and enqueue a message
    message = connection.msgproperties(payload="Hello, World!", priority=1)
    queue.enqone(message)
    # With the default ENQ_ON_COMMIT visibility, commit to publish the message
    connection.commit()

    # Dequeue a message
    received_msg = queue.deqone()
    if received_msg:
        # RAW payloads are returned as bytes
        print(f"Received: {received_msg.payload.decode()}")
        print(f"Priority: {received_msg.priority}")
        print(f"Enqueue time: {received_msg.enqtime}")

    connection.commit()

# Advanced AQ with options
with oracledb.connect(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
    queue = connection.queue("priority_queue")

    # Configure dequeue options
    queue.deqoptions.wait = 10  # Wait up to 10 seconds for a message
    queue.deqoptions.mode = oracledb.DEQ_REMOVE
    queue.deqoptions.navigation = oracledb.DEQ_FIRST_MSG

    # Configure enqueue options: publish messages without waiting for commit
    queue.enqoptions.visibility = oracledb.ENQ_IMMEDIATE

    # Enqueue multiple messages with different priorities
    messages = []
    for i, priority in enumerate([3, 1, 2], start=1):
        msg = connection.msgproperties(
            payload=f"Message {i}",
            priority=priority,
            correlation=f"batch_1_msg_{i}",
        )
        messages.append(msg)

    queue.enqmany(messages)

    # Dequeue messages; they arrive in priority order only if the queue
    # table was created to sort by priority
    while True:
        msg = queue.deqone()
        if msg is None:
            break
        print(f"Dequeued: {msg.payload.decode()} (priority {msg.priority})")

    connection.commit()

# Async AQ operations
async def async_aq_example():
    async with oracledb.connect_async(user="user", password="pwd", dsn="localhost/orclpdb") as connection:
        # Dictionary payloads require a JSON queue; queue() itself is not awaited
        queue = connection.queue("async_queue", "JSON")
        queue.deqoptions.wait = oracledb.DEQ_NO_WAIT

        # Async enqueue
        message = connection.msgproperties(
            payload={"type": "notification", "data": "async message"}
        )
        await queue.enqone(message)
        # Commit so the message becomes visible to the dequeue below
        await connection.commit()

        # Async dequeue
        received = await queue.deqone()
        if received:
            print(f"Async received: {received.payload}")

        await connection.commit()

asyncio.run(async_aq_example())
```