A Node.js module for Oracle Database access from JavaScript and TypeScript
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Oracle Advanced Queuing (AQ) support for reliable message-based communication between applications.
Get and configure Oracle Advanced Queuing queues for message processing.
/**
* Gets an Advanced Queuing queue by name. The returned queue exposes the
* enqueue and dequeue operations.
* @param name - Queue name
* @param options - Optional queue options (`payloadType`, `payloadTypeName`)
* @returns Promise resolving to AqQueue instance
*/
getQueue(name: string, options?: AqQueueOptions): Promise<AqQueue>;
/**
* Optional settings passed as the second argument of `getQueue()`.
*/
interface AqQueueOptions {
// NOTE(review): typed as string here, but AqQueue.payloadType is declared as number — confirm which values are accepted.
payloadType?: string;
// Presumably the name of the payload's database type — TODO confirm.
payloadTypeName?: string;
}
/**
 * Queue handle returned by `getQueue()`. Messages are enqueued to and
 * dequeued from the queue through this object.
 */
interface AqQueue {
  // Message operations

  /** Dequeues one message; resolves to null when no message is available. */
  deqOne(options?: AqDeqOptions): Promise<AqMessage | null>;
  /** Dequeues up to `maxMessages` messages. */
  deqMany(maxMessages: number, options?: AqDeqOptions): Promise<AqMessage[]>;
  /** Enqueues one message. */
  enqOne(message: AqMessage, options?: AqEnqOptions): Promise<void>;
  /** Enqueues every message in `messages`. */
  enqMany(messages: AqMessage[], options?: AqEnqOptions): Promise<void>;

  // Queue properties

  /** Queue name. */
  name: string;
  // NOTE(review): number here, but AqQueueOptions.payloadType is a string — confirm.
  payloadType: number;
  payloadTypeName?: string;
}
Usage Examples:
const oracledb = require('oracledb');
// Get a queue
const queue = await connection.getQueue('MY_QUEUE');
// Create and enqueue a message
const message = new oracledb.AqMessage();
message.payload = 'Hello, World!';
message.correlation = 'greeting';
await queue.enqOne(message);
// Dequeue a message
const dequeuedMessage = await queue.deqOne();
if (dequeuedMessage) {
console.log('Received:', dequeuedMessage.payload);
console.log('Correlation:', dequeuedMessage.correlation);
}Enqueue messages to Oracle Advanced Queuing queues.
/**
* Enqueues a single message
* @param message - Message to enqueue
* @param options - Optional enqueue options (`deliveryMode`, `transformation`, `visibility`)
* @returns Promise that resolves when message is enqueued
*/
enqOne(message: AqMessage, options?: AqEnqOptions): Promise<void>;
/**
* Enqueues multiple messages
* @param messages - Array of messages to enqueue
* @param options - Optional enqueue options (`deliveryMode`, `transformation`, `visibility`)
* @returns Promise that resolves when all messages are enqueued
*/
enqMany(messages: AqMessage[], options?: AqEnqOptions): Promise<void>;
/**
* Optional settings passed as the second argument of `enqOne()` / `enqMany()`.
*/
interface AqEnqOptions {
// One of the AQ_MSG_DELIV_MODE_* constants (e.g. AQ_MSG_DELIV_MODE_PERSISTENT).
deliveryMode?: number;
// Presumably the name of a transformation applied on enqueue — TODO confirm.
transformation?: string;
// One of the AQ_VISIBILITY_* constants (e.g. AQ_VISIBILITY_IMMEDIATE).
visibility?: number;
}
/**
 * Constructible enqueue options, created with `new oracledb.AqEnqOptions()`
 * and passed as the second argument of `enqOne()` / `enqMany()`.
 */
class AqEnqOptions {
  constructor();
  /** One of the AQ_MSG_DELIV_MODE_* constants. */
  deliveryMode: number;
  // Presumably the name of a transformation applied on enqueue — TODO confirm.
  transformation: string;
  /** One of the AQ_VISIBILITY_* constants. */
  visibility: number;
}
Usage Examples:
// Basic message enqueuing
const message = new oracledb.AqMessage();
message.payload = { orderId: 12345, status: 'pending' };
message.priority = 1;
message.delay = 30; // Delay 30 seconds
await queue.enqOne(message);
// Enqueue with options
const enqOptions = new oracledb.AqEnqOptions();
enqOptions.visibility = oracledb.AQ_VISIBILITY_IMMEDIATE;
enqOptions.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT;
await queue.enqOne(message, enqOptions);
// Bulk enqueuing
const messages = [
{ payload: 'Message 1', correlation: 'batch1' },
{ payload: 'Message 2', correlation: 'batch1' },
{ payload: 'Message 3', correlation: 'batch1' }
].map(data => {
const msg = new oracledb.AqMessage();
msg.payload = data.payload;
msg.correlation = data.correlation;
return msg;
});
await queue.enqMany(messages);Dequeue messages from Oracle Advanced Queuing queues.
/**
* Dequeues a single message
* @param options - Optional dequeue options (wait time, correlation filter, mode, ...)
* @returns Promise resolving to AqMessage, or null when no message is available
*/
deqOne(options?: AqDeqOptions): Promise<AqMessage | null>;
/**
* Dequeues multiple messages
* @param maxMessages - Maximum number of messages to dequeue
* @param options - Optional dequeue options (wait time, correlation filter, mode, ...)
* @returns Promise resolving to array of AqMessage
*/
deqMany(maxMessages: number, options?: AqDeqOptions): Promise<AqMessage[]>;
/**
* Optional settings passed to `deqOne()` / `deqMany()`.
*/
interface AqDeqOptions {
// Presumably a filter expression evaluated against queued messages — TODO confirm.
condition?: string;
// Presumably the consumer to dequeue as on a multi-consumer queue — TODO confirm.
consumerName?: string;
// Only messages with this correlation are dequeued.
correlation?: string;
// One of the AQ_DEQ_MODE_* constants.
mode?: number;
msgId?: Buffer;
// One of the AQ_DEQ_NAV_* constants.
navigation?: number;
// Presumably the name of a transformation applied on dequeue — TODO confirm.
transformation?: string;
// Presumably one of the AQ_VISIBILITY_* constants — TODO confirm.
visibility?: number;
// Seconds to wait for a message, or AQ_DEQ_NO_WAIT / AQ_DEQ_WAIT_FOREVER.
wait?: number;
}
/**
 * Constructible dequeue options, created with `new oracledb.AqDeqOptions()`
 * and passed to `deqOne()` / `deqMany()`.
 */
class AqDeqOptions {
  constructor();
  // Presumably a filter expression evaluated against queued messages — TODO confirm.
  condition: string;
  // Presumably the consumer to dequeue as on a multi-consumer queue — TODO confirm.
  consumerName: string;
  /** Only messages with this correlation are dequeued. */
  correlation: string;
  /** One of the AQ_DEQ_MODE_* constants. */
  mode: number;
  msgId: Buffer;
  /** One of the AQ_DEQ_NAV_* constants. */
  navigation: number;
  // Presumably the name of a transformation applied on dequeue — TODO confirm.
  transformation: string;
  // Presumably one of the AQ_VISIBILITY_* constants — TODO confirm.
  visibility: number;
  /** Seconds to wait for a message, or AQ_DEQ_NO_WAIT / AQ_DEQ_WAIT_FOREVER. */
  wait: number;
}
Usage Examples:
// Basic message dequeuing
const message = await queue.deqOne();
if (message) {
console.log('Payload:', message.payload);
console.log('Message ID:', message.msgId);
}
// Dequeue with options
const deqOptions = new oracledb.AqDeqOptions();
deqOptions.wait = 10; // Wait up to 10 seconds
deqOptions.correlation = 'batch1'; // Only messages with this correlation
deqOptions.mode = oracledb.AQ_DEQ_MODE_REMOVE;
const message = await queue.deqOne(deqOptions);
// Dequeue multiple messages
const messages = await queue.deqMany(5); // Get up to 5 messages
for (const msg of messages) {
console.log('Processing message:', msg.payload);
}
// Non-blocking dequeue
const deqOptions2 = new oracledb.AqDeqOptions();
deqOptions2.wait = oracledb.AQ_DEQ_NO_WAIT;
const message2 = await queue.deqOne(deqOptions2);
if (message2) {
console.log('Got immediate message:', message2.payload);
} else {
console.log('No messages available');
}Advanced Queuing message structure and properties.
/**
 * A message that is enqueued to, or was dequeued from, an Advanced Queuing
 * queue. Created with `new oracledb.AqMessage()`.
 */
class AqMessage {
  constructor();

  // Message content
  payload: any;

  // Message properties

  /** Application-chosen label; dequeue options can filter on it. */
  correlation: string;
  /** Seconds to delay the message after enqueuing. */
  delay: number;
  /** One of the AQ_MSG_DELIV_MODE_* constants. */
  deliveryMode: number;
  // Presumably the queue a message is moved to when it cannot be processed — TODO confirm.
  exceptionQueue: string;
  /** Seconds until the message expires (e.g. 3600 = one hour). */
  expiration: number;
  msgId: Buffer;
  // Presumably the number of dequeue attempts made so far — TODO confirm.
  numAttempts: number;
  originalMsgId: Buffer;
  priority: number;
  /** One of the AQ_MSG_STATE_* constants. */
  state: number;

  // Recipients (for multi-consumer queues)
  recipients: string[];
}
Usage Examples:
// Create a comprehensive message
const message = new oracledb.AqMessage();
// Set payload (any JSON-serializable data)
message.payload = {
  orderNumber: 'ORD-12345',
  customerId: 67890,
  items: [
    { sku: 'ITEM1', quantity: 2 },
    { sku: 'ITEM2', quantity: 1 }
  ],
  total: 149.99
};
// Set message properties
message.correlation = 'order-processing';
message.priority = 1; // High priority
message.delay = 0; // No delay
message.expiration = 3600; // Expire after 1 hour
message.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT;
// For multi-consumer queues
message.recipients = ['ORDER_PROCESSOR', 'INVENTORY_SYSTEM'];
await queue.enqOne(message);
// Process dequeued message
const receivedMessage = await queue.deqOne();
if (receivedMessage) {
  console.log('Message ID:', receivedMessage.msgId);
  console.log('Attempts:', receivedMessage.numAttempts);
  console.log('State:', receivedMessage.state);
  console.log('Order data:', receivedMessage.payload);
}
// Dequeue wait options
// Values for AqDeqOptions.wait, in addition to a plain number of seconds.
const AQ_DEQ_NO_WAIT = 0;
const AQ_DEQ_WAIT_FOREVER = 4294967295; // 2^32 - 1
// Dequeue modes (AqDeqOptions.mode)
const AQ_DEQ_MODE_BROWSE = 1; // read without removing from the queue
const AQ_DEQ_MODE_LOCKED = 2;
const AQ_DEQ_MODE_REMOVE = 3;
const AQ_DEQ_MODE_REMOVE_NO_DATA = 4;
// Dequeue navigation (AqDeqOptions.navigation)
const AQ_DEQ_NAV_FIRST_MSG = 1;
const AQ_DEQ_NAV_NEXT_TRANSACTION = 2;
const AQ_DEQ_NAV_NEXT_MSG = 3;
// Message delivery modes (deliveryMode on messages and enqueue options)
const AQ_MSG_DELIV_MODE_PERSISTENT = 1;
const AQ_MSG_DELIV_MODE_BUFFERED = 2;
const AQ_MSG_DELIV_MODE_PERSISTENT_OR_BUFFERED = 3;
// Message states (AqMessage.state)
const AQ_MSG_STATE_READY = 0;
const AQ_MSG_STATE_WAITING = 1;
const AQ_MSG_STATE_PROCESSED = 2;
const AQ_MSG_STATE_EXPIRED = 3;
// Visibility modes (visibility on enqueue options)
const AQ_VISIBILITY_IMMEDIATE = 1;
const AQ_VISIBILITY_ON_COMMIT = 2;
// Usage Examples:
// Configure dequeue behavior
const deqOptions = new oracledb.AqDeqOptions();
deqOptions.mode = oracledb.AQ_DEQ_MODE_BROWSE; // Don't remove from queue
deqOptions.navigation = oracledb.AQ_DEQ_NAV_FIRST_MSG; // Start from first
deqOptions.wait = 30; // Wait up to 30 seconds
// Browse messages without removing them
const message = await queue.deqOne(deqOptions);
// Configure enqueue behavior
const enqOptions = new oracledb.AqEnqOptions();
enqOptions.visibility = oracledb.AQ_VISIBILITY_ON_COMMIT; // Visible after commit
enqOptions.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT; // Persistent storage
// Check message state
const message2 = await queue.deqOne();
if (message2) {
switch (message2.state) {
case oracledb.AQ_MSG_STATE_READY:
console.log('Message is ready for processing');
break;
case oracledb.AQ_MSG_STATE_WAITING:
console.log('Message is waiting (delayed)');
break;
case oracledb.AQ_MSG_STATE_PROCESSED:
console.log('Message has been processed');
break;
case oracledb.AQ_MSG_STATE_EXPIRED:
console.log('Message has expired');
break;
}
}Common error handling patterns for Advanced Queuing operations.
Usage Examples:
// Handle empty queue: poll once without waiting and treat "nothing there" as a normal outcome
try {
  const noWaitOptions = new oracledb.AqDeqOptions();
  noWaitOptions.wait = oracledb.AQ_DEQ_NO_WAIT;
  const polledMessage = await queue.deqOne(noWaitOptions);
  if (!polledMessage) {
    console.log('Queue is empty');
  }
} catch (error) {
  // Anything other than the dequeue-timeout error is unexpected: propagate it
  if (!error.message.includes('ORA-25228')) {
    throw error;
  }
  console.log('No messages available');
}
// Handle a queue that is disabled for enqueuing
try {
  await queue.enqOne(message);
} catch (error) {
  // ORA-25207 is the enqueue-side "queue is disabled from enqueueing" error.
  // ORA-25226 is its dequeue-side counterpart and does not describe an
  // enqOne() failure or message expiration.
  if (error.message.includes('ORA-25207')) {
    console.log('Queue is not enabled for enqueuing');
  } else {
    throw error;
  }
}
// Retry logic for dequeuing
async function dequeueWithRetry(queue, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
const message = await queue.deqOne();
if (message) return message;
// Wait before retry
await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
} catch (error) {
console.log(`Dequeue attempt ${i + 1} failed:`, error.message);
if (i === maxRetries - 1) throw error;
}
}
return null;
}Install with Tessl CLI
npx tessl i tessl/npm-oracledb