Advanced message management for moving messages between queues with progress tracking and cancellation support.
Initiates a task to move messages from one queue to another, typically used for redriving messages from dead letter queues.
/**
 * Command that initiates a task moving messages from one queue to another.
 * Constructed from a StartMessageMoveTaskCommandInput.
 */
class StartMessageMoveTaskCommand {
/** @param input - Source queue ARN plus the optional destination ARN and rate limit. */
constructor(input: StartMessageMoveTaskCommandInput);
}
/** Request parameters accepted by the StartMessageMoveTaskCommand constructor. */
interface StartMessageMoveTaskCommandInput {
/** ARN of the source queue (required) */
SourceArn: string;
/** ARN of the destination queue (optional, defaults to original source) */
DestinationArn?: string;
/** Maximum messages to move per second (1-500); optional */
MaxNumberOfMessagesPerSecond?: number;
}
/** Response shape for StartMessageMoveTaskCommand. */
interface StartMessageMoveTaskCommandOutput {
/** Unique task handle for tracking; the examples in this document pass it to CancelMessageMoveTaskCommand */
TaskHandle?: string;
}Usage Examples:
import { SQSClient, StartMessageMoveTaskCommand } from "@aws-sdk/client-sqs";
const client = new SQSClient({ region: "us-east-1" });

// Redrive from the DLQ to the main queue at up to 100 messages per second
const redriveToMainCommand = new StartMessageMoveTaskCommand({
  SourceArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ",
  DestinationArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-Main",
  MaxNumberOfMessagesPerSecond: 100
});
const moveTask = await client.send(redriveToMainCommand);
console.log("Move task started with handle:", moveTask.TaskHandle);

// DestinationArn left out: SQS determines the original source queue itself
const redriveToOriginCommand = new StartMessageMoveTaskCommand({
  SourceArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ"
});
const autoMoveTask = await client.send(redriveToOriginCommand);

// Large backlog: cap the rate at 10 messages per second for a slow, controlled move
const throttledRedriveCommand = new StartMessageMoveTaskCommand({
  SourceArn: "arn:aws:sqs:us-east-1:123456789012:LargeDLQ",
  DestinationArn: "arn:aws:sqs:us-east-1:123456789012:ProcessingQueue",
  MaxNumberOfMessagesPerSecond: 10
});
const throttledTask = await client.send(throttledRedriveCommand);

Stops an active message move task using its task handle.
/**
 * Command that stops an active message move task.
 * Constructed from a CancelMessageMoveTaskCommandInput.
 */
class CancelMessageMoveTaskCommand {
/** @param input - Carries the handle of the task to cancel. */
constructor(input: CancelMessageMoveTaskCommandInput);
}
/** Request parameters accepted by the CancelMessageMoveTaskCommand constructor. */
interface CancelMessageMoveTaskCommandInput {
/** Task handle from StartMessageMoveTask (required) */
TaskHandle: string;
}
/** Response shape for CancelMessageMoveTaskCommand. */
interface CancelMessageMoveTaskCommandOutput {
/** Approximate number of messages moved before cancellation */
ApproximateNumberOfMessagesMoved?: number;
}Usage Examples:
// Cancel a running move task
const cancelResult = await client.send(new CancelMessageMoveTaskCommand({
  TaskHandle: moveTask.TaskHandle!
}));
console.log("Task cancelled. Messages moved:", cancelResult.ApproximateNumberOfMessagesMoved);

// Cancel with error handling
try {
  await client.send(new CancelMessageMoveTaskCommand({
    TaskHandle: "invalid-task-handle"
  }));
} catch (error: unknown) {
  // Catch variables are `unknown` under strict mode, so narrow to Error
  // before reading `name`.
  if (error instanceof Error && error.name === 'ResourceNotFoundException') {
    console.log("Task not found or already completed");
  } else {
    // Anything else is unexpected here; surface it instead of swallowing it.
    throw error;
  }
}

Retrieves active and recent message move tasks for a source queue.
/**
 * Command that retrieves active and recent message move tasks for a source queue.
 * Constructed from a ListMessageMoveTasksCommandInput.
 */
class ListMessageMoveTasksCommand {
/** @param input - Source queue ARN and an optional result limit. */
constructor(input: ListMessageMoveTasksCommandInput);
}
/** Request parameters accepted by the ListMessageMoveTasksCommand constructor. */
interface ListMessageMoveTasksCommandInput {
/** ARN of the source queue (required) */
SourceArn: string;
/** Maximum results to return (optional) */
MaxResults?: number;
}
/** Response shape for ListMessageMoveTasksCommand. */
interface ListMessageMoveTasksCommandOutput {
/** Array of move task entries; optional, so callers should handle it being absent */
Results?: ListMessageMoveTasksResultEntry[];
}
/** One move task as reported in ListMessageMoveTasksCommandOutput.Results. Every field is optional. */
interface ListMessageMoveTasksResultEntry {
/** Task handle */
TaskHandle?: string;
/** Current task status (the examples in this document compare it against 'COMPLETED', 'CANCELLED' and 'FAILED') */
Status?: string;
/** Source queue ARN */
SourceArn?: string;
/** Destination queue ARN */
DestinationArn?: string;
/** Maximum messages per second */
MaxNumberOfMessagesPerSecond?: number;
/** Approximate messages moved */
ApproximateNumberOfMessagesMoved?: number;
/** Approximate messages to move */
ApproximateNumberOfMessagesToMove?: number;
/** Failure reason if task failed */
FailureReason?: string;
/** Task start timestamp (numeric; unit not stated here — TODO confirm against the service reference) */
StartedTimestamp?: number;
}Usage Examples:
// Fetch the move tasks recorded for the DLQ and print a summary of each
const tasks = await client.send(
  new ListMessageMoveTasksCommand({
    SourceArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ"
  })
);
const listedEntries = tasks.Results || [];
console.log(`Found ${listedEntries.length} move tasks`);
listedEntries.forEach((entry) => {
  const summary = {
    handle: entry.TaskHandle,
    status: entry.Status,
    moved: entry.ApproximateNumberOfMessagesMoved,
    total: entry.ApproximateNumberOfMessagesToMove,
    rate: entry.MaxNumberOfMessagesPerSecond
  };
  console.log("Task:", summary);
});
// Monitor task progress
/**
 * Polls ListMessageMoveTasks for `sourceArn` until the task identified by
 * `taskHandle` is no longer listed or reports COMPLETED, CANCELLED or FAILED.
 * Progress is logged on every poll, with a 5 second pause between polls.
 *
 * @param sourceArn - ARN of the queue the move task reads from.
 * @param taskHandle - Handle of the task to follow.
 */
async function monitorMoveTask(sourceArn: string, taskHandle: string) {
  for (;;) {
    const listing = await client.send(
      new ListMessageMoveTasksCommand({ SourceArn: sourceArn })
    );
    const entry = listing.Results?.find((candidate) => candidate.TaskHandle === taskHandle);

    if (!entry) {
      console.log("Task not found or completed");
      return;
    }

    const movedSoFar = entry.ApproximateNumberOfMessagesMoved;
    const expectedTotal = entry.ApproximateNumberOfMessagesToMove;
    console.log(`Task ${entry.Status}: ${movedSoFar}/${expectedTotal} messages`);

    switch (entry.Status) {
      case 'COMPLETED':
      case 'CANCELLED':
      case 'FAILED':
        // Terminal state: report the failure reason (if any) and stop polling.
        if (entry.FailureReason) {
          console.error("Task failed:", entry.FailureReason);
        }
        return;
      default:
        // Still in progress: pause before the next poll.
        await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}
// Usage
await monitorMoveTask("arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ", moveTask.TaskHandle!);

/**
 * Redrives everything in a dead letter queue to `targetArn` and follows the
 * task until it finishes.
 *
 * Reads the DLQ's ApproximateNumberOfMessages first and returns early when it
 * is zero. Otherwise starts a move task whose rate is `batchSize / 10`
 * (rounded down) clamped to the range 10-500, then delegates to monitorMoveTask.
 *
 * @param dlqArn - ARN of the dead letter queue to drain.
 * @param targetArn - ARN of the queue that receives the messages.
 * @param batchSize - Sizing hint used only to derive the move rate (default 1000).
 */
async function recoverFromDLQ(dlqArn: string, targetArn: string, batchSize: number = 1000) {
  // How many messages are waiting in the DLQ?
  const countResponse = await client.send(new GetQueueAttributesCommand({
    QueueUrl: arnToUrl(dlqArn),
    AttributeNames: ["ApproximateNumberOfMessages"]
  }));
  const rawCount = countResponse.Attributes?.ApproximateNumberOfMessages || '0';
  const totalMessages = parseInt(rawCount);
  console.log(`Recovering ${totalMessages} messages from DLQ`);

  if (totalMessages === 0) {
    console.log("No messages to recover");
    return;
  }

  // Rate is a tenth of the batch size, kept within 10..500 messages per second
  const tenthOfBatch = Math.floor(batchSize / 10);
  const optimalRate = Math.min(500, Math.max(10, tenthOfBatch));

  const startCommand = new StartMessageMoveTaskCommand({
    SourceArn: dlqArn,
    DestinationArn: targetArn,
    MaxNumberOfMessagesPerSecond: optimalRate
  });
  const started = await client.send(startCommand);
  console.log(`Started recovery task: ${started.TaskHandle}`);

  // Hand over to the poller and return its promise
  return monitorMoveTask(dlqArn, started.TaskHandle!);
}
/**
 * Converts an SQS queue ARN of the form
 * `arn:<partition>:sqs:<region>:<account-id>:<queue-name>` into the matching
 * queue URL.
 *
 * @param arn - SQS queue ARN.
 * @returns The queue URL, `https://sqs.<region>.<domain>/<account-id>/<queue-name>`.
 * @throws Error when `arn` is not a well-formed SQS queue ARN. Without this
 *   check a malformed value would silently produce a URL containing "undefined".
 */
function arnToUrl(arn: string): string {
  const parts = arn.split(':');
  const [prefix, partition, service, region, accountId, queueName] = parts;

  if (
    parts.length !== 6 ||
    prefix !== 'arn' ||
    service !== 'sqs' ||
    !partition ||
    !region ||
    !accountId ||
    !queueName
  ) {
    throw new Error(`Invalid SQS queue ARN: ${arn}`);
  }

  // The China partition is served from a different domain suffix.
  const domain = partition === 'aws-cn' ? 'amazonaws.com.cn' : 'amazonaws.com';
  return `https://sqs.${region}.${domain}/${accountId}/${queueName}`;
}
// Usage
await recoverFromDLQ(
  "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ",
  "arn:aws:sqs:us-east-1:123456789012:MyApp-Main",
  500
);

/**
 * Moves only the messages whose JSON-parsed body satisfies `filter` from the
 * source queue to the target queue. Move tasks move ALL messages, so selective
 * moves are done by hand with receive / send / delete.
 *
 * A message is deleted from the source only after it has been sent to the
 * target. Messages that do not match the filter, or whose processing throws,
 * are left in the source queue and become visible again once their visibility
 * timeout expires.
 *
 * The loop ends when a receive returns nothing, or when a receive returns only
 * messages that were already examined and left in place (redeliveries), which
 * prevents cycling over the retained messages forever.
 *
 * @param sourceArn - ARN of the queue to read from.
 * @param targetArn - ARN of the queue that receives matching messages.
 * @param filter - Predicate applied to the JSON-parsed message body.
 * @returns Counts of distinct messages examined (`processed`) and moved (`moved`).
 */
async function moveFilteredMessages(
  sourceArn: string,
  targetArn: string,
  filter: (message: any) => boolean
) {
  const sourceUrl = arnToUrl(sourceArn);
  const targetUrl = arnToUrl(targetArn);
  let processed = 0;
  let moved = 0;
  // IDs of messages that were examined and deliberately left in the source.
  const retainedIds = new Set<string>();

  while (true) {
    const messages = await client.send(new ReceiveMessageCommand({
      QueueUrl: sourceUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 1, // Brief long poll
      MessageAttributeNames: ["All"]
    }));
    if (!messages.Messages || messages.Messages.length === 0) {
      break; // No more messages
    }

    const messagesToDelete: DeleteMessageBatchRequestEntry[] = [];
    let sawNewMessage = false;

    for (const [i, message] of messages.Messages.entries()) {
      if (message.MessageId !== undefined && retainedIds.has(message.MessageId)) {
        continue; // Redelivery of a message we already decided to keep
      }
      sawNewMessage = true;
      processed++;

      let wasMoved = false;
      try {
        const messageData = JSON.parse(message.Body || '{}');
        if (filter(messageData)) {
          // Send to target queue
          await client.send(new SendMessageCommand({
            QueueUrl: targetUrl,
            MessageBody: message.Body!,
            MessageAttributes: message.MessageAttributes
          }));
          moved++;
          wasMoved = true;
        }
      } catch (error) {
        console.error(`Error processing message ${message.MessageId}:`, error);
      }

      if (wasMoved) {
        // Only messages that reached the target are removed from the source.
        messagesToDelete.push({
          Id: `del_${i}`,
          ReceiptHandle: message.ReceiptHandle!
        });
      } else if (message.MessageId !== undefined) {
        retainedIds.add(message.MessageId);
      }
    }

    if (!sawNewMessage) {
      break; // Only redeliveries came back: everything reachable was examined
    }

    // Delete moved messages from source
    if (messagesToDelete.length > 0) {
      await client.send(new DeleteMessageBatchCommand({
        QueueUrl: sourceUrl,
        Entries: messagesToDelete
      }));
    }
    console.log(`Processed: ${processed}, Moved: ${moved}`);
  }
  return { processed, moved };
}
// Example: Move only high-priority messages
const result = await moveFilteredMessages(
  "arn:aws:sqs:us-east-1:123456789012:Mixed-DLQ",
  "arn:aws:sqs:us-east-1:123456789012:Priority-Queue",
  (message) => message.priority === 'high'
);
console.log(`Selective move completed: ${result.moved}/${result.processed} messages moved`);

/**
 * Copies messages from a queue in one region to a queue in another region.
 * Message move tasks don't support cross-region moves, so messages are
 * received, re-sent in batches and then deleted from the source.
 *
 * A message is deleted from the source only when SendMessageBatch reports it
 * as successfully sent. If any entry of a batch fails, the successfully sent
 * messages are deleted and an Error is thrown; the failed messages remain in
 * the source queue.
 *
 * Message attributes are requested on receive so they are carried over.
 *
 * @param sourceArn - ARN of the queue to read from; its region is taken from the ARN.
 * @param targetRegion - Region of the destination queue.
 * @param targetQueueName - Name of the destination queue, resolved with GetQueueUrl.
 * @returns Number of messages sent to the target and deleted from the source.
 * @throws Error when SendMessageBatch reports failed entries.
 */
async function moveMessagesCrossRegion(
  sourceArn: string,
  targetRegion: string,
  targetQueueName: string
) {
  const sourceUrl = arnToUrl(sourceArn);
  const sourceRegion = sourceArn.split(':')[3];
  // Create clients for each region
  const sourceClient = new SQSClient({ region: sourceRegion });
  const targetClient = new SQSClient({ region: targetRegion });
  // Get target queue URL
  const targetUrlResult = await targetClient.send(new GetQueueUrlCommand({
    QueueName: targetQueueName
  }));
  const targetUrl = targetUrlResult.QueueUrl!;
  let totalMoved = 0;

  while (true) {
    // Receive from source region, including attributes so they can be forwarded
    const messages = await sourceClient.send(new ReceiveMessageCommand({
      QueueUrl: sourceUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 1,
      MessageAttributeNames: ["All"]
    }));
    if (!messages.Messages || messages.Messages.length === 0) {
      break;
    }

    const sendEntries: SendMessageBatchRequestEntry[] = [];
    // Send-entry Id -> delete entry for the same message
    const deleteEntryBySendId = new Map<string, DeleteMessageBatchRequestEntry>();
    messages.Messages.forEach((message, i) => {
      const sendId = `send_${i}`;
      sendEntries.push({
        Id: sendId,
        MessageBody: message.Body!,
        MessageAttributes: message.MessageAttributes
      });
      deleteEntryBySendId.set(sendId, {
        Id: `del_${i}`,
        ReceiptHandle: message.ReceiptHandle!
      });
    });

    // Send batch to target; each entry succeeds or fails individually
    const sendResult = await targetClient.send(new SendMessageBatchCommand({
      QueueUrl: targetUrl,
      Entries: sendEntries
    }));

    // Delete from source only what the target actually accepted
    const deleteEntries: DeleteMessageBatchRequestEntry[] = [];
    for (const sent of sendResult.Successful ?? []) {
      const sendId = sent.Id;
      if (sendId === undefined) {
        continue;
      }
      const deleteEntry = deleteEntryBySendId.get(sendId);
      if (deleteEntry !== undefined) {
        deleteEntries.push(deleteEntry);
      }
    }
    if (deleteEntries.length > 0) {
      await sourceClient.send(new DeleteMessageBatchCommand({
        QueueUrl: sourceUrl,
        Entries: deleteEntries
      }));
    }
    totalMoved += deleteEntries.length;
    console.log(`Moved ${totalMoved} messages to ${targetRegion}`);

    const failed = sendResult.Failed ?? [];
    if (failed.length > 0) {
      const details = failed.map((entry) => `${entry.Id}: ${entry.Code}`).join(', ');
      throw new Error(
        `Failed to send ${failed.length} message(s) to ${targetRegion}; they remain in the source queue (${details})`
      );
    }
  }
  return totalMoved;
}

Common errors for message move operations:
try {
  await client.send(new StartMessageMoveTaskCommand({
    SourceArn: "arn:aws:sqs:us-east-1:123456789012:NonExistentQueue"
  }));
} catch (error: unknown) {
  // Catch variables are `unknown` under strict mode, so narrow to Error
  // before reading `name`.
  const errorName = error instanceof Error ? error.name : undefined;
  if (errorName === 'ResourceNotFoundException') {
    console.error("Source queue not found");
  } else if (errorName === 'UnsupportedOperation') {
    console.error("Move operation not supported for this queue type");
  } else if (errorName === 'RequestThrottled') {
    console.error("Too many concurrent move tasks, try again later");
  } else {
    // Not one of the documented cases: surface it instead of swallowing it.
    throw error;
  }
}
// Track and manage multiple move tasks
/**
 * Keeps track of the move tasks started through this instance so they can be
 * cancelled together or queried for status.
 */
class MoveTaskManager {
  /** Tasks started via startMove, keyed by task handle. */
  private activeTasks = new Map<string, {
    handle: string;
    sourceArn: string;
    startTime: Date;
  }>();

  /**
   * Starts a move task and records it as active.
   *
   * @param sourceArn - ARN of the queue to move messages from.
   * @param destinationArn - Optional ARN of the destination queue.
   * @param rate - Optional maximum number of messages per second.
   * @returns The handle of the started task.
   * @throws Error when the response carries no TaskHandle, so `undefined` is
   *   never registered as a key.
   */
  async startMove(sourceArn: string, destinationArn?: string, rate?: number) {
    const result = await client.send(new StartMessageMoveTaskCommand({
      SourceArn: sourceArn,
      DestinationArn: destinationArn,
      MaxNumberOfMessagesPerSecond: rate
    }));
    const handle = result.TaskHandle;
    if (!handle) {
      throw new Error("StartMessageMoveTask response did not include a TaskHandle");
    }
    this.activeTasks.set(handle, {
      handle,
      sourceArn,
      startTime: new Date()
    });
    return handle;
  }

  /**
   * Requests cancellation of every tracked task and then forgets all of them.
   * Best effort: a failed cancellation is logged and does not stop the others.
   */
  async cancelAll() {
    const tracked = Array.from(this.activeTasks.values());
    const outcomes = await Promise.allSettled(
      tracked.map(task =>
        client.send(new CancelMessageMoveTaskCommand({
          TaskHandle: task.handle
        }))
      )
    );
    outcomes.forEach((outcome, index) => {
      if (outcome.status === 'rejected') {
        console.error(`Failed to cancel move task ${tracked[index]?.handle}:`, outcome.reason);
      }
    });
    this.activeTasks.clear();
  }

  /**
   * Looks up each tracked task via ListMessageMoveTasks on its source queue.
   * Tasks no longer listed are treated as completed and dropped from tracking.
   *
   * @returns Map from task handle to the listed entry for tasks still reported.
   */
  async getStatus() {
    const status = new Map();
    // Iterate over a snapshot because entries may be removed along the way.
    for (const [handle, task] of Array.from(this.activeTasks)) {
      const tasks = await client.send(new ListMessageMoveTasksCommand({
        SourceArn: task.sourceArn
      }));
      const currentTask = tasks.Results?.find(t => t.TaskHandle === handle);
      if (currentTask) {
        status.set(handle, currentTask);
      } else {
        this.activeTasks.delete(handle); // Task completed
      }
    }
    return status;
  }
}