CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-oracledb

A Node.js module for Oracle Database access from JavaScript and TypeScript

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/advanced-queuing.md

Advanced Queuing

Oracle Advanced Queuing (AQ) support for reliable message-based communication between applications.

Capabilities

Queue Management

Get and configure Oracle Advanced Queuing queues for message processing.

/**
 * Gets an Advanced Queuing queue. Invoked on a connection object, as in
 * `await connection.getQueue('MY_QUEUE')`.
 * @param name - Queue name
 * @param options - Optional queue options (see AqQueueOptions)
 * @returns Promise resolving to AqQueue instance
 */
getQueue(name: string, options?: AqQueueOptions): Promise<AqQueue>;

/**
 * Options accepted as the optional second argument of getQueue().
 * NOTE(review): payloadType is a string here but a number on AqQueue —
 * confirm which representation each side really uses.
 */
interface AqQueueOptions {
  payloadType?: string;
  payloadTypeName?: string;
}

/**
 * Queue handle that getQueue() resolves to. Messages are enqueued and
 * dequeued through its methods.
 */
interface AqQueue {
  // Message operations

  /**
   * Dequeues one message. Resolves to null when no message is available, so
   * check the result before reading its properties.
   */
  deqOne(options?: AqDeqOptions): Promise<AqMessage | null>;
  /** Dequeues up to `maxMessages` messages. */
  deqMany(maxMessages: number, options?: AqDeqOptions): Promise<AqMessage[]>;
  /** Enqueues a single message. */
  enqOne(message: AqMessage, options?: AqEnqOptions): Promise<void>;
  /** Enqueues every message in the array. */
  enqMany(messages: AqMessage[], options?: AqEnqOptions): Promise<void>;

  // Queue properties
  name: string;
  payloadType: number;
  payloadTypeName?: string;
}

Usage Examples:

const oracledb = require('oracledb');

// Obtain a handle to the queue by name
const queue = await connection.getQueue('MY_QUEUE');

// Build a message, then enqueue it
const greeting = new oracledb.AqMessage();
greeting.payload = 'Hello, World!';
greeting.correlation = 'greeting';

await queue.enqOne(greeting);

// Take one message off the queue; nothing is logged when none comes back
const received = await queue.deqOne();
if (received) {
  const { payload, correlation } = received;
  console.log('Received:', payload);
  console.log('Correlation:', correlation);
}

Message Enqueuing

Enqueue messages to Oracle Advanced Queuing queues.

/**
 * Enqueues a single message. Called on a queue obtained from getQueue(),
 * e.g. `await queue.enqOne(message)`.
 * @param message - Message to enqueue
 * @param options - Optional enqueue options (see AqEnqOptions)
 * @returns Promise that resolves when message is enqueued
 */
enqOne(message: AqMessage, options?: AqEnqOptions): Promise<void>;

/**
 * Enqueues multiple messages in one call. Called on a queue obtained from
 * getQueue(), e.g. `await queue.enqMany(messages)`.
 * @param messages - Array of messages to enqueue
 * @param options - Optional enqueue options (see AqEnqOptions)
 * @returns Promise that resolves when all messages are enqueued
 */
enqMany(messages: AqMessage[], options?: AqEnqOptions): Promise<void>;

/**
 * Shape of the optional settings accepted by enqOne() and enqMany().
 */
interface AqEnqOptions {
  // One of the AQ_MSG_DELIV_MODE_* constants.
  deliveryMode?: number;
  // NOTE(review): presumably the name of a transformation applied on enqueue — confirm.
  transformation?: string;
  // One of the AQ_VISIBILITY_* constants.
  visibility?: number;
}

/**
 * Constructible enqueue options. The examples create one with
 * `new oracledb.AqEnqOptions()`, set its fields, and pass it to enqOne().
 */
class AqEnqOptions {
  constructor();
  // One of the AQ_MSG_DELIV_MODE_* constants.
  deliveryMode: number;
  // NOTE(review): presumably the name of a transformation applied on enqueue — confirm.
  transformation: string;
  // One of the AQ_VISIBILITY_* constants.
  visibility: number;
}

Usage Examples:

// Enqueue a single message with default options
const orderMessage = new oracledb.AqMessage();
orderMessage.payload = { orderId: 12345, status: 'pending' };
orderMessage.priority = 1;
orderMessage.delay = 30; // Delay 30 seconds

await queue.enqOne(orderMessage);

// Enqueue the same message again, this time with explicit options
const immediatePersistent = new oracledb.AqEnqOptions();
immediatePersistent.visibility = oracledb.AQ_VISIBILITY_IMMEDIATE;
immediatePersistent.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT;

await queue.enqOne(orderMessage, immediatePersistent);

// Bulk enqueuing: one AqMessage per payload, all sharing one correlation
const batch = [];
for (const payload of ['Message 1', 'Message 2', 'Message 3']) {
  const entry = new oracledb.AqMessage();
  entry.payload = payload;
  entry.correlation = 'batch1';
  batch.push(entry);
}

await queue.enqMany(batch);

Message Dequeuing

Dequeue messages from Oracle Advanced Queuing queues.

/**
 * Dequeues a single message. No message may be available, so test the
 * result before using it, as the examples do with `if (message)`.
 * @param options - Optional dequeue options (see AqDeqOptions)
 * @returns Promise resolving to AqMessage or null
 */
deqOne(options?: AqDeqOptions): Promise<AqMessage | null>;

/**
 * Dequeues multiple messages in one call, e.g. `await queue.deqMany(5)` to
 * get up to 5 messages.
 * @param maxMessages - Maximum number of messages to dequeue
 * @param options - Optional dequeue options (see AqDeqOptions)
 * @returns Promise resolving to array of AqMessage
 */
deqMany(maxMessages: number, options?: AqDeqOptions): Promise<AqMessage[]>;

/**
 * Shape of the optional settings accepted by deqOne() and deqMany().
 */
interface AqDeqOptions {
  // NOTE(review): presumably a filter expression a message must satisfy — confirm.
  condition?: string;
  // NOTE(review): presumably the consumer to dequeue as on a multi-consumer queue — confirm.
  consumerName?: string;
  // Restricts the dequeue to messages carrying this correlation value.
  correlation?: string;
  // One of the AQ_DEQ_MODE_* constants.
  mode?: number;
  // NOTE(review): presumably selects one specific message by its ID — confirm.
  msgId?: Buffer;
  // One of the AQ_DEQ_NAV_* constants.
  navigation?: number;
  // NOTE(review): presumably the name of a transformation applied on dequeue — confirm.
  transformation?: string;
  // NOTE(review): presumably one of the AQ_VISIBILITY_* constants — confirm.
  visibility?: number;
  // How long to wait for a message, in seconds; AQ_DEQ_NO_WAIT gives a non-blocking dequeue.
  wait?: number;
}

/**
 * Constructible dequeue options. The examples create one with
 * `new oracledb.AqDeqOptions()`, set its fields, and pass it to deqOne().
 */
class AqDeqOptions {
  constructor();
  // NOTE(review): presumably a filter expression a message must satisfy — confirm.
  condition: string;
  // NOTE(review): presumably the consumer to dequeue as on a multi-consumer queue — confirm.
  consumerName: string;
  // Restricts the dequeue to messages carrying this correlation value.
  correlation: string;
  // One of the AQ_DEQ_MODE_* constants.
  mode: number;
  // NOTE(review): presumably selects one specific message by its ID — confirm.
  msgId: Buffer;
  // One of the AQ_DEQ_NAV_* constants.
  navigation: number;
  // NOTE(review): presumably the name of a transformation applied on dequeue — confirm.
  transformation: string;
  // NOTE(review): presumably one of the AQ_VISIBILITY_* constants — confirm.
  visibility: number;
  // How long to wait for a message, in seconds; AQ_DEQ_NO_WAIT gives a non-blocking dequeue.
  wait: number;
}

Usage Examples:

// Basic message dequeuing
const message = await queue.deqOne();
if (message) {
  console.log('Payload:', message.payload);
  console.log('Message ID:', message.msgId);
}

// Dequeue with options
const deqOptions = new oracledb.AqDeqOptions();
deqOptions.wait = 10; // Wait up to 10 seconds
deqOptions.correlation = 'batch1'; // Only messages with this correlation
deqOptions.mode = oracledb.AQ_DEQ_MODE_REMOVE;

// Named differently from `message` above: declaring the same `const` twice
// in one scope is a SyntaxError.
const filteredMessage = await queue.deqOne(deqOptions);

// Dequeue multiple messages
const messages = await queue.deqMany(5); // Get up to 5 messages
for (const msg of messages) {
  console.log('Processing message:', msg.payload);
}

// Non-blocking dequeue
const deqOptions2 = new oracledb.AqDeqOptions();
deqOptions2.wait = oracledb.AQ_DEQ_NO_WAIT;

const message2 = await queue.deqOne(deqOptions2);
if (message2) {
  console.log('Got immediate message:', message2.payload);
} else {
  console.log('No messages available');
}

Message Structure

Advanced Queuing message structure and properties.

/**
 * An Advanced Queuing message: a payload plus the properties that travel
 * with it. The examples create one with `new oracledb.AqMessage()`, set its
 * fields, and pass it to enqOne(); deqOne() resolves to one.
 */
class AqMessage {
  constructor();
  
  // Message content
  // The examples assign either a string or a plain object.
  payload: any;
  
  // Message properties
  // Free-form tag; a dequeue can be restricted to one correlation value.
  correlation: string;
  // Delay in seconds (the example sets 30 for a 30-second delay).
  delay: number;
  // One of the AQ_MSG_DELIV_MODE_* constants.
  deliveryMode: number;
  // NOTE(review): presumably the queue that receives messages that cannot be processed — confirm.
  exceptionQueue: string;
  // Lifetime in seconds (the example sets 3600 to expire after 1 hour).
  expiration: number;
  // Message identifier; the examples only read it from dequeued messages.
  msgId: Buffer;
  // NOTE(review): presumably the number of dequeue attempts so far — confirm.
  numAttempts: number;
  // NOTE(review): presumably the ID of the message this one originated from — confirm.
  originalMsgId: Buffer;
  // The example labels a priority of 1 as high priority.
  priority: number;
  // One of the AQ_MSG_STATE_* constants.
  state: number;
  
  // Recipients (for multi-consumer queues)
  recipients: string[];
}

Usage Examples:

// Start from an empty message
const orderMessage = new oracledb.AqMessage();

// Payload (any JSON-serializable data)
const lineItems = [
  { sku: 'ITEM1', quantity: 2 },
  { sku: 'ITEM2', quantity: 1 }
];
orderMessage.payload = {
  orderNumber: 'ORD-12345',
  customerId: 67890,
  items: lineItems,
  total: 149.99
};

// Message properties, plus the recipients used by multi-consumer queues
Object.assign(orderMessage, {
  correlation: 'order-processing',
  priority: 1, // High priority
  delay: 0, // No delay
  expiration: 3600, // Expire after 1 hour
  deliveryMode: oracledb.AQ_MSG_DELIV_MODE_PERSISTENT,
  recipients: ['ORDER_PROCESSOR', 'INVENTORY_SYSTEM']
});

await queue.enqOne(orderMessage);

// Inspect a dequeued message
const received = await queue.deqOne();
if (received) {
  const { msgId, numAttempts, state, payload } = received;
  console.log('Message ID:', msgId);
  console.log('Attempts:', numAttempts);
  console.log('State:', state);
  console.log('Order data:', payload);
}

Queue Constants

// Values of the Advanced Queuing constants. The examples read them as
// properties of the oracledb module, e.g. oracledb.AQ_DEQ_MODE_BROWSE.

// Dequeue wait options
// (assign to AqDeqOptions.wait; a plain number of seconds is also accepted there)
const AQ_DEQ_NO_WAIT = 0; // Non-blocking dequeue
const AQ_DEQ_WAIT_FOREVER = 4294967295;

// Dequeue modes
// (assign to AqDeqOptions.mode)
const AQ_DEQ_MODE_BROWSE = 1; // Read the message without removing it from the queue
const AQ_DEQ_MODE_LOCKED = 2;
const AQ_DEQ_MODE_REMOVE = 3;
const AQ_DEQ_MODE_REMOVE_NO_DATA = 4;

// Dequeue navigation
// (assign to AqDeqOptions.navigation)
const AQ_DEQ_NAV_FIRST_MSG = 1; // Start from the first message
const AQ_DEQ_NAV_NEXT_TRANSACTION = 2;
const AQ_DEQ_NAV_NEXT_MSG = 3;

// Message delivery modes
// (assign to AqEnqOptions.deliveryMode or AqMessage.deliveryMode)
const AQ_MSG_DELIV_MODE_PERSISTENT = 1; // Persistent storage
const AQ_MSG_DELIV_MODE_BUFFERED = 2;
const AQ_MSG_DELIV_MODE_PERSISTENT_OR_BUFFERED = 3;

// Message states
// (compare against AqMessage.state)
const AQ_MSG_STATE_READY = 0; // Ready for processing
const AQ_MSG_STATE_WAITING = 1; // Waiting (delayed)
const AQ_MSG_STATE_PROCESSED = 2; // Already processed
const AQ_MSG_STATE_EXPIRED = 3; // Expired

// Visibility modes
// (assign to AqEnqOptions.visibility)
const AQ_VISIBILITY_IMMEDIATE = 1;
const AQ_VISIBILITY_ON_COMMIT = 2; // Visible after commit

Usage Examples:

// Dequeue options for browsing
const browseOptions = new oracledb.AqDeqOptions();
browseOptions.mode = oracledb.AQ_DEQ_MODE_BROWSE; // Don't remove from queue
browseOptions.navigation = oracledb.AQ_DEQ_NAV_FIRST_MSG; // Start from first
browseOptions.wait = 30; // Wait up to 30 seconds

// Look at a message while leaving it on the queue
const browsed = await queue.deqOne(browseOptions);

// Enqueue options
const commitVisibleOptions = new oracledb.AqEnqOptions();
commitVisibleOptions.visibility = oracledb.AQ_VISIBILITY_ON_COMMIT; // Visible after commit
commitVisibleOptions.deliveryMode = oracledb.AQ_MSG_DELIV_MODE_PERSISTENT; // Persistent storage

// Report the state of a dequeued message; unrecognized states log nothing
const stateDescriptions = new Map([
  [oracledb.AQ_MSG_STATE_READY, 'Message is ready for processing'],
  [oracledb.AQ_MSG_STATE_WAITING, 'Message is waiting (delayed)'],
  [oracledb.AQ_MSG_STATE_PROCESSED, 'Message has been processed'],
  [oracledb.AQ_MSG_STATE_EXPIRED, 'Message has expired']
]);

const latest = await queue.deqOne();
if (latest) {
  const description = stateDescriptions.get(latest.state);
  if (description !== undefined) {
    console.log(description);
  }
}

Error Handling

Common error handling patterns for Advanced Queuing operations.

Usage Examples:

// Empty queue: covers both a falsy result and an ORA-25228 error
try {
  const noWaitOptions = new oracledb.AqDeqOptions();
  noWaitOptions.wait = oracledb.AQ_DEQ_NO_WAIT;

  const received = await queue.deqOne(noWaitOptions);
  if (!received) {
    console.log('Queue is empty');
  }
} catch (err) {
  // Anything other than ORA-25228 is unexpected and goes back to the caller
  if (!err.message.includes('ORA-25228')) {
    throw err;
  }
  console.log('No messages available');
}

// Handle a queue that is not enabled for enqueuing
try {
  await queue.enqOne(message);
} catch (error) {
  // ORA-25207 is raised when the queue is disabled for enqueuing.
  // (ORA-25226 is the dequeue-side counterpart, so enqOne() never raises it.)
  if (error.message.includes('ORA-25207')) {
    console.log('Queue is not enabled for enqueuing');
  } else {
    throw error;
  }
}

// Retry logic for dequeuing
/**
 * Dequeues one message, retrying when the queue is empty or the dequeue fails.
 *
 * Makes at most `maxRetries` attempts. After an attempt that produced no
 * message (empty result or error) it pauses for `baseDelayMs * attemptNumber`
 * milliseconds before trying again. There is no pause after the final attempt.
 *
 * @param queue - Queue object exposing an async `deqOne()` method
 * @param maxRetries - Maximum number of dequeue attempts (default 3)
 * @param baseDelayMs - Back-off unit in milliseconds (default 1000)
 * @returns The first message dequeued, or null if no attempt produced one
 * @throws The error raised by the final attempt, if that attempt fails
 */
async function dequeueWithRetry(queue, maxRetries = 3, baseDelayMs = 1000) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const message = await queue.deqOne();
      if (message) return message;
    } catch (error) {
      console.log(`Dequeue attempt ${attempt} failed:`, error.message);
      if (attempt === maxRetries) throw error;
    }

    // Back off only when another attempt follows; sleeping after the last
    // attempt would just delay the null result.
    if (attempt < maxRetries) {
      await new Promise(resolve => setTimeout(resolve, baseDelayMs * attempt));
    }
  }
  return null;
}

Install with Tessl CLI

npx tessl i tessl/npm-oracledb

docs

advanced-queuing.md

configuration-settings.md

connection-management.md

connection-pools.md

data-types-lobs.md

index.md

soda-operations.md

sql-execution.md

transaction-management.md

tile.json