Advanced queue management for handling failed messages with dead letter queue configuration and source queue discovery.
Retrieves queues that use the specified queue as their dead letter queue.
class ListDeadLetterSourceQueuesCommand {
constructor(input: ListDeadLetterSourceQueuesCommandInput);
}
interface ListDeadLetterSourceQueuesCommandInput {
/** URL of the dead letter queue */
QueueUrl: string;
/** Token for pagination */
NextToken?: string;
/** Maximum results per page (1-1000) */
MaxResults?: number;
}
interface ListDeadLetterSourceQueuesCommandOutput {
/** URLs of source queues using this DLQ */
queueUrls: string[];
/** Token for next page of results */
NextToken?: string;
}Usage Examples:
import { SQSClient, ListDeadLetterSourceQueuesCommand } from "@aws-sdk/client-sqs";
const client = new SQSClient({ region: "us-east-1" });
// List all source queues for a DLQ
const sources = await client.send(new ListDeadLetterSourceQueuesCommand({
QueueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/MyApp-DLQ"
}));
console.log("Source queues using this DLQ:", sources.queueUrls);
// Paginated listing for large numbers of source queues
let nextToken: string | undefined;
const allSourceQueues: string[] = [];
do {
const result = await client.send(new ListDeadLetterSourceQueuesCommand({
QueueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/MyApp-DLQ",
MaxResults: 10,
NextToken: nextToken
}));
allSourceQueues.push(...result.queueUrls);
nextToken = result.NextToken;
} while (nextToken);
console.log(`Total source queues: ${allSourceQueues.length}`);// Step 1: Create the dead letter queue
const dlqResult = await client.send(new CreateQueueCommand({
QueueName: "MyApp-DLQ",
Attributes: {
MessageRetentionPeriod: "1209600" // 14 days maximum retention
}
}));
const dlqUrl = dlqResult.QueueUrl!;
// Step 2: Configure main queue to use DLQ
const redrivePolicy = {
deadLetterTargetArn: `arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ`,
maxReceiveCount: 3 // Move to DLQ after 3 failed attempts
};
await client.send(new SetQueueAttributesCommand({
QueueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/MyApp-Main",
Attributes: {
RedrivePolicy: JSON.stringify(redrivePolicy)
}
}));
// Step 3: Configure DLQ redrive permissions (optional)
const redriveAllowPolicy = {
redrivePermission: "byQueue",
sourceQueueArns: ["arn:aws:sqs:us-east-1:123456789012:MyApp-Main"]
};
await client.send(new SetQueueAttributesCommand({
QueueUrl: dlqUrl,
Attributes: {
RedriveAllowPolicy: JSON.stringify(redriveAllowPolicy)
}
}));// Create FIFO DLQ
const fifoDlqResult = await client.send(new CreateQueueCommand({
QueueName: "MyFifoApp-DLQ.fifo",
Attributes: {
FifoQueue: "true",
ContentBasedDeduplication: "true",
MessageRetentionPeriod: "1209600"
}
}));
// Configure FIFO main queue with DLQ
await client.send(new SetQueueAttributesCommand({
QueueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/MyFifoApp.fifo",
Attributes: {
RedrivePolicy: JSON.stringify({
deadLetterTargetArn: "arn:aws:sqs:us-east-1:123456789012:MyFifoApp-DLQ.fifo",
maxReceiveCount: 5
})
}
}));// Check for messages in DLQ
const dlqMessages = await client.send(new ReceiveMessageCommand({
QueueUrl: dlqUrl,
MaxNumberOfMessages: 10,
AttributeNames: ["All"],
MessageAttributeNames: ["All"]
}));
if (dlqMessages.Messages && dlqMessages.Messages.length > 0) {
console.log(`Found ${dlqMessages.Messages.length} failed messages in DLQ`);
for (const message of dlqMessages.Messages) {
console.log("Failed message:", {
messageId: message.MessageId,
body: message.Body,
receiveCount: message.Attributes?.ApproximateReceiveCount,
firstReceiveTime: message.Attributes?.ApproximateFirstReceiveTimestamp,
sentTime: message.Attributes?.SentTimestamp
});
}
}async function analyzeDLQMessages(dlqUrl: string) {
const messages = await client.send(new ReceiveMessageCommand({
QueueUrl: dlqUrl,
MaxNumberOfMessages: 10,
AttributeNames: ["All"],
MessageAttributeNames: ["All"]
}));
const analysis = {
totalMessages: messages.Messages?.length || 0,
messageTypes: new Map<string, number>(),
oldestMessage: null as Date | null,
averageReceiveCount: 0
};
if (messages.Messages) {
let totalReceiveCount = 0;
for (const message of messages.Messages) {
// Analyze message type from attributes
const messageType = message.MessageAttributes?.MessageType?.StringValue || 'unknown';
analysis.messageTypes.set(messageType, (analysis.messageTypes.get(messageType) || 0) + 1);
// Track oldest message
const sentTime = parseInt(message.Attributes?.SentTimestamp || '0');
const messageDate = new Date(sentTime);
if (!analysis.oldestMessage || messageDate < analysis.oldestMessage) {
analysis.oldestMessage = messageDate;
}
// Calculate average receive count
const receiveCount = parseInt(message.Attributes?.ApproximateReceiveCount || '0');
totalReceiveCount += receiveCount;
}
analysis.averageReceiveCount = totalReceiveCount / messages.Messages.length;
}
return analysis;
}
// Usage
const dlqAnalysis = await analyzeDLQMessages(dlqUrl);
console.log("DLQ Analysis:", dlqAnalysis);async function reprocessDLQMessages(dlqUrl: string, targetQueueUrl: string) {
const messages = await client.send(new ReceiveMessageCommand({
QueueUrl: dlqUrl,
MaxNumberOfMessages: 10,
MessageAttributeNames: ["All"]
}));
const reprocessed: string[] = [];
const failed: string[] = [];
for (const message of messages.Messages || []) {
try {
// Attempt to fix the message or apply business logic
const processedBody = await preprocessMessage(message.Body);
// Send to target queue
await client.send(new SendMessageCommand({
QueueUrl: targetQueueUrl,
MessageBody: processedBody,
MessageAttributes: message.MessageAttributes
}));
// Delete from DLQ if successfully reprocessed
await client.send(new DeleteMessageCommand({
QueueUrl: dlqUrl,
ReceiptHandle: message.ReceiptHandle!
}));
reprocessed.push(message.MessageId!);
} catch (error) {
console.error(`Failed to reprocess message ${message.MessageId}:`, error);
failed.push(message.MessageId!);
}
}
return { reprocessed, failed };
}
async function preprocessMessage(body: string | undefined): Promise<string> {
if (!body) return '';
try {
const data = JSON.parse(body);
// Apply fixes, validation, or transformation
return JSON.stringify(data);
} catch {
return body; // Return as-is if not JSON
}
}// Create a hierarchy: Main -> DLQ1 -> DLQ2 -> Final DLQ
const dlq2Result = await client.send(new CreateQueueCommand({
QueueName: "MyApp-DLQ-Level2"
}));
const finalDlqResult = await client.send(new CreateQueueCommand({
QueueName: "MyApp-DLQ-Final"
}));
// Configure Level 1 DLQ to use Level 2 DLQ
await client.send(new SetQueueAttributesCommand({
QueueUrl: dlqUrl,
Attributes: {
RedrivePolicy: JSON.stringify({
deadLetterTargetArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ-Level2",
maxReceiveCount: 2
})
}
}));
// Configure Level 2 DLQ to use Final DLQ
await client.send(new SetQueueAttributesCommand({
QueueUrl: dlq2Result.QueueUrl!,
Attributes: {
RedrivePolicy: JSON.stringify({
deadLetterTargetArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ-Final",
maxReceiveCount: 1
})
}
}));// Monitor DLQ depth and alert if messages accumulate
async function monitorDLQ(dlqUrl: string, alertThreshold: number = 10) {
const attributes = await client.send(new GetQueueAttributesCommand({
QueueUrl: dlqUrl,
AttributeNames: ["ApproximateNumberOfMessages", "QueueArn"]
}));
const messageCount = parseInt(attributes.Attributes?.ApproximateNumberOfMessages || '0');
if (messageCount >= alertThreshold) {
console.warn(`DLQ Alert: ${messageCount} messages in ${dlqUrl}`);
// Send alert (integrate with SNS, CloudWatch, etc.)
await sendDLQAlert({
queueUrl: dlqUrl,
messageCount,
queueArn: attributes.Attributes?.QueueArn
});
}
return messageCount;
}
async function sendDLQAlert(alert: {queueUrl: string, messageCount: number, queueArn?: string}) {
// Implementation would integrate with your alerting system
console.log("DLQ Alert:", alert);
}
// Set up periodic monitoring
setInterval(async () => {
await monitorDLQ(dlqUrl, 5);
}, 60000); // Check every minute// Production-ready DLQ setup
async function setupProductionDLQ(mainQueueName: string) {
const dlqName = `${mainQueueName}-DLQ`;
// Create DLQ with appropriate retention
const dlqResult = await client.send(new CreateQueueCommand({
QueueName: dlqName,
Attributes: {
MessageRetentionPeriod: "1209600", // 14 days max
VisibilityTimeoutSeconds: "300" // 5 minutes for DLQ processing
},
tags: {
Purpose: "DeadLetterQueue",
SourceQueue: mainQueueName,
Environment: "production"
}
}));
// Configure main queue with conservative retry count
await client.send(new SetQueueAttributesCommand({
QueueUrl: `https://sqs.us-east-1.amazonaws.com/123456789012/${mainQueueName}`,
Attributes: {
RedrivePolicy: JSON.stringify({
deadLetterTargetArn: `arn:aws:sqs:us-east-1:123456789012:${dlqName}`,
maxReceiveCount: 3 // Conservative retry count
})
}
}));
return dlqResult.QueueUrl!;
}