0
# Advanced Queuing
1
2
Oracle Advanced Queuing (AQ) support for reliable message-based communication between applications.
3
4
## Capabilities
5
6
### Queue Management
7
8
Get and configure Oracle Advanced Queuing queues for message processing.
9
10
```javascript { .api }
11
/**
12
* Gets an Advanced Queuing queue
13
* @param name - Queue name
14
* @param options - Optional queue options
15
* @returns Promise resolving to AqQueue instance
16
*/
17
getQueue(name: string, options?: AqQueueOptions): Promise<AqQueue>;
18
19
interface AqQueueOptions {
20
payloadType?: string;
21
payloadTypeName?: string;
22
}
23
24
interface AqQueue {
25
// Message operations
26
deqOne(options?: AqDeqOptions): Promise<AqMessage>;
27
deqMany(maxMessages: number, options?: AqDeqOptions): Promise<AqMessage[]>;
28
enqOne(message: AqMessage, options?: AqEnqOptions): Promise<void>;
29
enqMany(messages: AqMessage[], options?: AqEnqOptions): Promise<void>;
30
31
// Queue properties
32
name: string;
33
payloadType: number;
34
payloadTypeName?: string;
35
}
36
```
37
38
**Usage Examples:**
39
40
```javascript
41
const oracledb = require('oracledb');
42
43
// Get a queue
44
const queue = await connection.getQueue('MY_QUEUE');
45
46
// Create and enqueue a message
47
const message = new oracledb.AqMessage();
48
message.payload = 'Hello, World!';
49
message.correlation = 'greeting';
50
51
await queue.enqOne(message);
52
53
// Dequeue a message
54
const dequeuedMessage = await queue.deqOne();
55
if (dequeuedMessage) {
56
console.log('Received:', dequeuedMessage.payload);
57
console.log('Correlation:', dequeuedMessage.correlation);
58
}
59
```
60
61
### Message Enqueuing
62
63
Enqueue messages to Oracle Advanced Queuing queues.
64
65
```javascript { .api }
66
/**
67
* Enqueues a single message
68
* @param message - Message to enqueue
69
* @param options - Optional enqueue options
70
* @returns Promise that resolves when message is enqueued
71
*/
72
enqOne(message: AqMessage, options?: AqEnqOptions): Promise<void>;
73
74
/**
75
* Enqueues multiple messages
76
* @param messages - Array of messages to enqueue
77
* @param options - Optional enqueue options
78
* @returns Promise that resolves when all messages are enqueued
79
*/
80
enqMany(messages: AqMessage[], options?: AqEnqOptions): Promise<void>;
81
82
interface AqEnqOptions {
83
deliveryMode?: number;
84
transformation?: string;
85
visibility?: number;
86
}
87
88
class AqEnqOptions {
89
constructor();
90
deliveryMode: number;
91
transformation: string;
92
visibility: number;
93
}
94
```
95
96
**Usage Examples:**
97
98
```javascript
99
// Basic message enqueuing
100
const message = new oracledb.AqMessage();
101
message.payload = { orderId: 12345, status: 'pending' };
102
message.priority = 1;
103
message.delay = 30; // Delay 30 seconds
104
105
await queue.enqOne(message);
106
107
// Enqueue with options
108
const enqOptions = new oracledb.AqEnqOptions();
109
enqOptions.visibility = oracledb.AQ_VISIBILITY_IMMEDIATE;
110
enqOptions.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT;
111
112
await queue.enqOne(message, enqOptions);
113
114
// Bulk enqueuing
115
const messages = [
116
{ payload: 'Message 1', correlation: 'batch1' },
117
{ payload: 'Message 2', correlation: 'batch1' },
118
{ payload: 'Message 3', correlation: 'batch1' }
119
].map(data => {
120
const msg = new oracledb.AqMessage();
121
msg.payload = data.payload;
122
msg.correlation = data.correlation;
123
return msg;
124
});
125
126
await queue.enqMany(messages);
127
```
128
129
### Message Dequeuing
130
131
Dequeue messages from Oracle Advanced Queuing queues.
132
133
```javascript { .api }
134
/**
135
* Dequeues a single message
136
* @param options - Optional dequeue options
137
* @returns Promise resolving to AqMessage or null
138
*/
139
deqOne(options?: AqDeqOptions): Promise<AqMessage | null>;
140
141
/**
142
* Dequeues multiple messages
143
* @param maxMessages - Maximum number of messages to dequeue
144
* @param options - Optional dequeue options
145
* @returns Promise resolving to array of AqMessage
146
*/
147
deqMany(maxMessages: number, options?: AqDeqOptions): Promise<AqMessage[]>;
148
149
interface AqDeqOptions {
150
condition?: string;
151
consumerName?: string;
152
correlation?: string;
153
mode?: number;
154
msgId?: Buffer;
155
navigation?: number;
156
transformation?: string;
157
visibility?: number;
158
wait?: number;
159
}
160
161
class AqDeqOptions {
162
constructor();
163
condition: string;
164
consumerName: string;
165
correlation: string;
166
mode: number;
167
msgId: Buffer;
168
navigation: number;
169
transformation: string;
170
visibility: number;
171
wait: number;
172
}
173
```
174
175
**Usage Examples:**
176
177
```javascript
178
// Basic message dequeuing
179
const message = await queue.deqOne();
180
if (message) {
181
console.log('Payload:', message.payload);
182
console.log('Message ID:', message.msgId);
183
}
184
185
// Dequeue with options
186
const deqOptions = new oracledb.AqDeqOptions();
187
deqOptions.wait = 10; // Wait up to 10 seconds
188
deqOptions.correlation = 'batch1'; // Only messages with this correlation
189
deqOptions.mode = oracledb.AQ_DEQ_MODE_REMOVE;
190
191
const message = await queue.deqOne(deqOptions);
192
193
// Dequeue multiple messages
194
const messages = await queue.deqMany(5); // Get up to 5 messages
195
for (const msg of messages) {
196
console.log('Processing message:', msg.payload);
197
}
198
199
// Non-blocking dequeue
200
const deqOptions2 = new oracledb.AqDeqOptions();
201
deqOptions2.wait = oracledb.AQ_DEQ_NO_WAIT;
202
203
const message2 = await queue.deqOne(deqOptions2);
204
if (message2) {
205
console.log('Got immediate message:', message2.payload);
206
} else {
207
console.log('No messages available');
208
}
209
```
210
211
### Message Structure
212
213
Advanced Queuing message structure and properties.
214
215
```javascript { .api }
216
class AqMessage {
217
constructor();
218
219
// Message content
220
payload: any;
221
222
// Message properties
223
correlation: string;
224
delay: number;
225
deliveryMode: number;
226
exceptionQueue: string;
227
expiration: number;
228
msgId: Buffer;
229
numAttempts: number;
230
originalMsgId: Buffer;
231
priority: number;
232
state: number;
233
234
// Recipients (for multi-consumer queues)
235
recipients: string[];
236
}
237
```
238
239
**Usage Examples:**
240
241
```javascript
242
// Create a comprehensive message
243
const message = new oracledb.AqMessage();
244
245
// Set payload (any JSON-serializable data)
246
message.payload = {
247
orderNumber: 'ORD-12345',
248
customerId: 67890,
249
items: [
250
{ sku: 'ITEM1', quantity: 2 },
251
{ sku: 'ITEM2', quantity: 1 }
252
],
253
total: 149.99
254
};
255
256
// Set message properties
257
message.correlation = 'order-processing';
258
message.priority = 1; // High priority
259
message.delay = 0; // No delay
260
message.expiration = 3600; // Expire after 1 hour
261
message.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT;
262
263
// For multi-consumer queues
264
message.recipients = ['ORDER_PROCESSOR', 'INVENTORY_SYSTEM'];
265
266
await queue.enqOne(message);
267
268
// Process dequeued message
269
const receivedMessage = await queue.deqOne();
270
if (receivedMessage) {
271
console.log('Message ID:', receivedMessage.msgId);
272
console.log('Attempts:', receivedMessage.numAttempts);
273
console.log('State:', receivedMessage.state);
274
console.log('Order data:', receivedMessage.payload);
275
}
276
```
277
278
### Queue Constants
279
280
```javascript { .api }
281
// Dequeue wait options
282
const AQ_DEQ_NO_WAIT = 0;
283
const AQ_DEQ_WAIT_FOREVER = 4294967295;
284
285
// Dequeue modes
286
const AQ_DEQ_MODE_BROWSE = 1;
287
const AQ_DEQ_MODE_LOCKED = 2;
288
const AQ_DEQ_MODE_REMOVE = 3;
289
const AQ_DEQ_MODE_REMOVE_NO_DATA = 4;
290
291
// Dequeue navigation
292
const AQ_DEQ_NAV_FIRST_MSG = 1;
293
const AQ_DEQ_NAV_NEXT_TRANSACTION = 2;
294
const AQ_DEQ_NAV_NEXT_MSG = 3;
295
296
// Message delivery modes
297
const AQ_MSG_DELIV_MODE_PERSISTENT = 1;
298
const AQ_MSG_DELIV_MODE_BUFFERED = 2;
299
const AQ_MSG_DELIV_MODE_PERSISTENT_OR_BUFFERED = 3;
300
301
// Message states
302
const AQ_MSG_STATE_READY = 0;
303
const AQ_MSG_STATE_WAITING = 1;
304
const AQ_MSG_STATE_PROCESSED = 2;
305
const AQ_MSG_STATE_EXPIRED = 3;
306
307
// Visibility modes
308
const AQ_VISIBILITY_IMMEDIATE = 1;
309
const AQ_VISIBILITY_ON_COMMIT = 2;
310
```
311
312
**Usage Examples:**
313
314
```javascript
315
// Configure dequeue behavior
316
const deqOptions = new oracledb.AqDeqOptions();
317
deqOptions.mode = oracledb.AQ_DEQ_MODE_BROWSE; // Don't remove from queue
318
deqOptions.navigation = oracledb.AQ_DEQ_NAV_FIRST_MSG; // Start from first
319
deqOptions.wait = 30; // Wait up to 30 seconds
320
321
// Browse messages without removing them
322
const message = await queue.deqOne(deqOptions);
323
324
// Configure enqueue behavior
325
const enqOptions = new oracledb.AqEnqOptions();
326
enqOptions.visibility = oracledb.AQ_VISIBILITY_ON_COMMIT; // Visible after commit
327
enqOptions.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT; // Persistent storage
328
329
// Check message state
330
const message2 = await queue.deqOne();
331
if (message2) {
332
switch (message2.state) {
333
case oracledb.AQ_MSG_STATE_READY:
334
console.log('Message is ready for processing');
335
break;
336
case oracledb.AQ_MSG_STATE_WAITING:
337
console.log('Message is waiting (delayed)');
338
break;
339
case oracledb.AQ_MSG_STATE_PROCESSED:
340
console.log('Message has been processed');
341
break;
342
case oracledb.AQ_MSG_STATE_EXPIRED:
343
console.log('Message has expired');
344
break;
345
}
346
}
347
```
348
349
### Error Handling
350
351
Common error handling patterns for Advanced Queuing operations.
352
353
**Usage Examples:**
354
355
```javascript
356
// Handle empty queue
357
try {
358
const deqOptions = new oracledb.AqDeqOptions();
359
deqOptions.wait = oracledb.AQ_DEQ_NO_WAIT;
360
361
const message = await queue.deqOne(deqOptions);
362
if (!message) {
363
console.log('Queue is empty');
364
}
365
} catch (error) {
366
if (error.message.includes('ORA-25228')) {
367
console.log('No messages available');
368
} else {
369
throw error;
370
}
371
}
372
373
// Handle message expiration
374
try {
375
await queue.enqOne(message);
376
} catch (error) {
377
if (error.message.includes('ORA-25226')) {
378
console.log('Message expired before enqueuing');
379
} else {
380
throw error;
381
}
382
}
383
384
// Retry logic for dequeuing
385
async function dequeueWithRetry(queue, maxRetries = 3) {
386
for (let i = 0; i < maxRetries; i++) {
387
try {
388
const message = await queue.deqOne();
389
if (message) return message;
390
391
// Wait before retry
392
await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
393
} catch (error) {
394
console.log(`Dequeue attempt ${i + 1} failed:`, error.message);
395
if (i === maxRetries - 1) throw error;
396
}
397
}
398
return null;
399
}
400
```