or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

batch-operations.md
client-configuration.md
dead-letter-queues.md
index.md
message-move-tasks.md
message-operations.md
queue-attributes-tags.md
queue-management.md
queue-permissions.md
tile.json

docs/message-move-tasks.md

Message Move Tasks

Advanced message management for moving messages between queues with progress tracking and cancellation support.

Capabilities

Start Message Move Task

Initiates a task to move messages from one queue to another, typically used for redriving messages from dead letter queues.

/**
 * Command that initiates a task to move messages from one queue to another.
 * Build it from a StartMessageMoveTaskCommandInput and pass the instance to
 * `client.send(...)`.
 */
class StartMessageMoveTaskCommand {
  /** @param input - Source queue ARN plus the optional destination ARN and rate limit. */
  constructor(input: StartMessageMoveTaskCommandInput);
}

/** Input accepted by StartMessageMoveTaskCommand. */
interface StartMessageMoveTaskCommandInput {
  /** ARN of the source queue (required); the examples in this document use a dead letter queue. */
  SourceArn: string;
  /** ARN of the destination queue. Optional; when omitted the messages go back to their original source queue. */
  DestinationArn?: string;
  /** Maximum messages to move per second (1-500). Optional. */
  MaxNumberOfMessagesPerSecond?: number;
}

/** Result returned when a StartMessageMoveTaskCommand is sent. */
interface StartMessageMoveTaskCommandOutput {
  /** Unique handle identifying the started task; used to cancel it or to find it when listing tasks. */
  TaskHandle?: string;
}

Usage Examples:

import { SQSClient, StartMessageMoveTaskCommand } from "@aws-sdk/client-sqs";

// Client shared by the examples in this document.
const client = new SQSClient({ region: "us-east-1" });

// Redrive the DLQ into an explicitly named main queue at 100 messages/second.
const redriveToMainQueue = new StartMessageMoveTaskCommand({
  SourceArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ",
  DestinationArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-Main",
  MaxNumberOfMessagesPerSecond: 100
});
const moveTask = await client.send(redriveToMainQueue);

console.log("Move task started with handle:", moveTask.TaskHandle);

// No DestinationArn: SQS works out the original source queue by itself.
const redriveToOrigin = new StartMessageMoveTaskCommand({
  SourceArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ"
});
const autoMoveTask = await client.send(redriveToOrigin);

// Large backlog: cap the rate at 10 messages/second for a slow, controlled move.
const slowRedrive = new StartMessageMoveTaskCommand({
  SourceArn: "arn:aws:sqs:us-east-1:123456789012:LargeDLQ",
  DestinationArn: "arn:aws:sqs:us-east-1:123456789012:ProcessingQueue",
  MaxNumberOfMessagesPerSecond: 10
});
const throttledTask = await client.send(slowRedrive);

Cancel Message Move Task

Stops an active message move task using its task handle.

/**
 * Command that stops an active message move task, identified by its task handle.
 * Pass the instance to `client.send(...)`.
 */
class CancelMessageMoveTaskCommand {
  /** @param input - Carries the handle of the task to cancel. */
  constructor(input: CancelMessageMoveTaskCommandInput);
}

/** Input accepted by CancelMessageMoveTaskCommand. */
interface CancelMessageMoveTaskCommandInput {
  /** Task handle returned by StartMessageMoveTask (required). */
  TaskHandle: string;
}

/** Result returned when a CancelMessageMoveTaskCommand is sent. */
interface CancelMessageMoveTaskCommandOutput {
  /** Approximate number of messages that had already been moved when the task was cancelled. */
  ApproximateNumberOfMessagesMoved?: number;
}

Usage Examples:

// Cancel a running move task
const cancelResult = await client.send(new CancelMessageMoveTaskCommand({
  TaskHandle: moveTask.TaskHandle!
}));

console.log("Task cancelled. Messages moved:", cancelResult.ApproximateNumberOfMessagesMoved);

// Cancel with error handling
try {
  await client.send(new CancelMessageMoveTaskCommand({
    TaskHandle: "invalid-task-handle"
  }));
} catch (error: unknown) {
  // Catch variables are `unknown` under `strict`, so narrow before reading `.name`.
  if (error instanceof Error && error.name === 'ResourceNotFoundException') {
    console.log("Task not found or already completed");
  } else {
    // Anything else is unexpected for this call; surface it instead of swallowing it.
    throw error;
  }
}

List Message Move Tasks

Retrieves active and recent message move tasks for a source queue.

/**
 * Command that retrieves active and recent message move tasks for a source queue.
 * Pass the instance to `client.send(...)`.
 */
class ListMessageMoveTasksCommand {
  /** @param input - Source queue ARN and an optional cap on the number of results. */
  constructor(input: ListMessageMoveTasksCommandInput);
}

/** Input accepted by ListMessageMoveTasksCommand. */
interface ListMessageMoveTasksCommandInput {
  /** ARN of the source queue whose move tasks are listed (required). */
  SourceArn: string;
  /**
   * Maximum results to return. Optional.
   * NOTE(review): the AWS API reference appears to document a default of 1
   * (most recent task only) and an upper limit of 10 — confirm before relying on it.
   */
  MaxResults?: number;
}

/** Result returned when a ListMessageMoveTasksCommand is sent. */
interface ListMessageMoveTasksCommandOutput {
  /** Array of move task entries; may be absent, so callers should default to an empty list. */
  Results?: ListMessageMoveTasksResultEntry[];
}

/** One message move task as reported by ListMessageMoveTasksCommand. Every field is optional. */
interface ListMessageMoveTasksResultEntry {
  /**
   * Task handle, used to match an entry against a task that was started earlier.
   * NOTE(review): the AWS API reference appears to say this is only populated
   * while the task is running — confirm.
   */
  TaskHandle?: string;
  /** Current task status; the monitoring example in this document treats 'COMPLETED', 'CANCELLED' and 'FAILED' as terminal. */
  Status?: string;
  /** Source queue ARN */
  SourceArn?: string;
  /** Destination queue ARN */
  DestinationArn?: string;
  /** Maximum messages per second configured for the task */
  MaxNumberOfMessagesPerSecond?: number;
  /** Approximate number of messages moved so far */
  ApproximateNumberOfMessagesMoved?: number;
  /** Approximate number of messages the task has to move */
  ApproximateNumberOfMessagesToMove?: number;
  /** Failure reason if task failed */
  FailureReason?: string;
  /** Task start timestamp (numeric; unit not stated here — TODO confirm) */
  StartedTimestamp?: number;
}

Usage Examples:

// List all move tasks for a queue.
// Without MaxResults the service returns only the single most recent task,
// so ask explicitly for the documented maximum of 10.
const tasks = await client.send(new ListMessageMoveTasksCommand({
  SourceArn: "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ",
  MaxResults: 10
}));

// Results is optional in the response; normalise once and reuse.
const listedTasks = tasks.Results ?? [];

console.log(`Found ${listedTasks.length} move tasks`);

for (const task of listedTasks) {
  console.log("Task:", {
    handle: task.TaskHandle,
    status: task.Status,
    moved: task.ApproximateNumberOfMessagesMoved,
    total: task.ApproximateNumberOfMessagesToMove,
    rate: task.MaxNumberOfMessagesPerSecond
  });
}

/**
 * Polls the move tasks of `sourceArn` until the task identified by
 * `taskHandle` reaches a terminal status or is no longer listed.
 *
 * Progress is logged on every poll; polls are spaced 5 seconds apart.
 *
 * @param sourceArn - ARN of the queue the task is moving messages out of.
 * @param taskHandle - Handle returned when the task was started.
 */
async function monitorMoveTask(sourceArn: string, taskHandle: string) {
  const pause = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

  for (;;) {
    const listing = await client.send(new ListMessageMoveTasksCommand({
      SourceArn: sourceArn
    }));
    const entry = listing.Results?.find(candidate => candidate.TaskHandle === taskHandle);

    // The task is no longer reported under this handle: nothing left to watch.
    if (!entry) {
      console.log("Task not found or completed");
      return;
    }

    console.log(`Task ${entry.Status}: ${entry.ApproximateNumberOfMessagesMoved}/${entry.ApproximateNumberOfMessagesToMove} messages`);

    switch (entry.Status) {
      case 'COMPLETED':
      case 'CANCELLED':
      case 'FAILED':
        // Terminal status: report the failure reason, if any, and stop polling.
        if (entry.FailureReason) {
          console.error("Task failed:", entry.FailureReason);
        }
        return;
      default:
        // Still in progress: wait before checking again.
        await pause(5000);
    }
  }
}

// Usage: watch the task started earlier on the DLQ.
const monitoredQueueArn = "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ";
await monitorMoveTask(monitoredQueueArn, moveTask.TaskHandle!);

Advanced Message Move Scenarios

Batch DLQ Recovery

/**
 * Redrives every message from a dead letter queue into `targetArn` and
 * watches the resulting move task until it ends.
 *
 * @param dlqArn - ARN of the dead letter queue to drain.
 * @param targetArn - ARN of the queue that receives the messages.
 * @param batchSize - Drives the move rate: one tenth of it, clamped to 10..500 messages/second.
 * @returns Resolves once monitoring ends, or immediately when the DLQ reports no messages.
 */
async function recoverFromDLQ(dlqArn: string, targetArn: string, batchSize: number = 1000) {
  // Ask the DLQ how many messages it currently holds.
  const { Attributes: queueAttributes } = await client.send(new GetQueueAttributesCommand({
    QueueUrl: arnToUrl(dlqArn),
    AttributeNames: ["ApproximateNumberOfMessages"]
  }));

  const backlog = parseInt(queueAttributes?.ApproximateNumberOfMessages || '0');
  console.log(`Recovering ${backlog} messages from DLQ`);

  if (backlog === 0) {
    console.log("No messages to recover");
    return;
  }

  // A tenth of the batch size, kept inside the 10..500 range.
  const tenthOfBatch = Math.floor(batchSize / 10);
  const ratePerSecond = Math.min(500, Math.max(10, tenthOfBatch));

  const startCommand = new StartMessageMoveTaskCommand({
    SourceArn: dlqArn,
    DestinationArn: targetArn,
    MaxNumberOfMessagesPerSecond: ratePerSecond
  });
  const started = await client.send(startCommand);

  console.log(`Started recovery task: ${started.TaskHandle}`);

  // Hand over to the poller and return its promise.
  return monitorMoveTask(dlqArn, started.TaskHandle!);
}

/**
 * Converts an SQS queue ARN of the form
 * `arn:<partition>:sqs:<region>:<account-id>:<queue-name>` into the queue URL
 * `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>`.
 *
 * @param arn - Queue ARN to convert.
 * @returns The queue URL built from the ARN's region, account id and queue name.
 * @throws Error when `arn` is not a six-part SQS ARN with a non-empty region,
 *   account id and queue name (previously this produced a URL containing "undefined").
 */
function arnToUrl(arn: string): string {
  const parts = arn.split(':');
  const [prefix, , service, region, accountId, queueName] = parts;

  if (
    parts.length !== 6 ||
    prefix !== 'arn' ||
    service !== 'sqs' ||
    !region ||
    !accountId ||
    !queueName
  ) {
    throw new Error(`Invalid SQS queue ARN: ${arn}`);
  }

  // NOTE(review): the `amazonaws.com` suffix is kept as-is; other AWS partitions
  // may use a different domain suffix — confirm before using outside `aws`.
  return `https://sqs.${region}.amazonaws.com/${accountId}/${queueName}`;
}

// Usage: drain MyApp-DLQ back into MyApp-Main with a batch size of 500.
const recoverySourceArn = "arn:aws:sqs:us-east-1:123456789012:MyApp-DLQ";
const recoveryTargetArn = "arn:aws:sqs:us-east-1:123456789012:MyApp-Main";
await recoverFromDLQ(recoverySourceArn, recoveryTargetArn, 500);

Selective Message Moving

/**
 * Moves only the messages accepted by `filter` from one queue to another.
 *
 * Move tasks always move ALL messages, so selective moving is done by hand:
 * receive a batch, forward the matching messages, then delete from the source
 * only the messages that were actually forwarded. Messages that do not match,
 * cannot be parsed, or fail to send are left in the source queue and become
 * visible again after its visibility timeout.
 *
 * @param sourceArn - ARN of the queue to read from.
 * @param targetArn - ARN of the queue that receives matching messages.
 * @param filter - Predicate applied to the JSON-parsed message body.
 * @returns Counts of distinct messages examined (`processed`) and forwarded (`moved`).
 */
async function moveFilteredMessages(
  sourceArn: string, 
  targetArn: string,
  filter: (message: any) => boolean
) {
  const sourceUrl = arnToUrl(sourceArn);
  const targetUrl = arnToUrl(targetArn);
  
  let processed = 0;
  let moved = 0;
  
  // Messages left in the source can be redelivered once their visibility
  // timeout expires; remember what was already examined so they are neither
  // counted twice nor able to keep the loop alive forever.
  // (Holds one id per examined message for the duration of the call.)
  const seenMessageIds = new Set<string>();
  
  while (true) {
    const messages = await client.send(new ReceiveMessageCommand({
      QueueUrl: sourceUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 1, // Short wait; an empty response ends the loop
      MessageAttributeNames: ["All"]
    }));
    
    if (!messages.Messages || messages.Messages.length === 0) {
      break; // No more messages
    }
    
    const messagesToDelete: DeleteMessageBatchRequestEntry[] = [];
    let sawNewMessage = false;
    
    for (const [i, message] of messages.Messages.entries()) {
      if (message.MessageId !== undefined) {
        if (seenMessageIds.has(message.MessageId)) {
          continue; // Redelivery of a message that was deliberately left behind
        }
        seenMessageIds.add(message.MessageId);
      }
      
      sawNewMessage = true;
      processed++;
      
      try {
        const messageData = JSON.parse(message.Body || '{}');
        
        if (filter(messageData)) {
          // Send to target queue
          await client.send(new SendMessageCommand({
            QueueUrl: targetUrl,
            MessageBody: message.Body!,
            MessageAttributes: message.MessageAttributes
          }));
          
          moved++;
          
          // Only a message that reached the target may be removed from the
          // source; deleting non-matching messages would silently drop them.
          messagesToDelete.push({
            Id: `del_${i}`,
            ReceiptHandle: message.ReceiptHandle!
          });
        }
      } catch (error) {
        // Parse or send failure: keep the message in the source queue.
        console.error(`Error processing message ${message.MessageId}:`, error);
      }
    }
    
    // Delete moved messages from source
    if (messagesToDelete.length > 0) {
      await client.send(new DeleteMessageBatchCommand({
        QueueUrl: sourceUrl,
        Entries: messagesToDelete
      }));
    }
    
    console.log(`Processed: ${processed}, Moved: ${moved}`);
    
    if (!sawNewMessage) {
      break; // Only redeliveries came back: everything has been examined
    }
  }
  
  return { processed, moved };
}

// Example: Move only high-priority messages
const isHighPriority = (message: any) => message.priority === 'high';

const result = await moveFilteredMessages(
  "arn:aws:sqs:us-east-1:123456789012:Mixed-DLQ",
  "arn:aws:sqs:us-east-1:123456789012:Priority-Queue",
  isHighPriority
);

const { moved: movedCount, processed: processedCount } = result;
console.log(`Selective move completed: ${movedCount}/${processedCount} messages moved`);

Cross-Region Message Moving

/**
 * Copies every message from a queue in one region to a queue in another
 * region, deleting each message from the source once it has been accepted by
 * the target.
 *
 * Message move tasks don't support cross-region moves, so the copy is done by
 * hand in batches of up to 10 messages.
 *
 * @param sourceArn - ARN of the queue to drain; its region is read from the ARN.
 * @param targetRegion - Region of the destination queue.
 * @param targetQueueName - Name of the destination queue in `targetRegion`.
 * @returns Number of messages sent to the target.
 * @throws Error when an entire batch is rejected by the target queue, so the
 *   loop cannot spin on messages that can never be delivered.
 */
async function moveMessagesCrossRegion(
  sourceArn: string,
  targetRegion: string,
  targetQueueName: string
) {
  const sourceUrl = arnToUrl(sourceArn);
  const sourceRegion = sourceArn.split(':')[3];
  
  // Create clients for each region
  const sourceClient = new SQSClient({ region: sourceRegion });
  const targetClient = new SQSClient({ region: targetRegion });
  
  // Get target queue URL
  const targetUrlResult = await targetClient.send(new GetQueueUrlCommand({
    QueueName: targetQueueName
  }));
  const targetUrl = targetUrlResult.QueueUrl!;
  
  let totalMoved = 0;
  
  while (true) {
    // Receive from source region. Message attributes are only returned when
    // requested, so ask for all of them or the copies would lose them.
    const messages = await sourceClient.send(new ReceiveMessageCommand({
      QueueUrl: sourceUrl,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 1,
      MessageAttributeNames: ["All"]
    }));
    
    if (!messages.Messages || messages.Messages.length === 0) {
      break;
    }
    
    // Build the send batch and remember which receipt handle belongs to
    // which batch entry id.
    const sendEntries: SendMessageBatchRequestEntry[] = [];
    const receiptHandleBySendId = new Map<string, string>();
    
    for (const [i, message] of messages.Messages.entries()) {
      const sendId = `send_${i}`;
      
      sendEntries.push({
        Id: sendId,
        MessageBody: message.Body!,
        MessageAttributes: message.MessageAttributes
      });
      
      receiptHandleBySendId.set(sendId, message.ReceiptHandle!);
    }
    
    // Send batch to target. A batch call can succeed as a whole while
    // individual entries fail, so inspect the per-entry results.
    const sendResult = await targetClient.send(new SendMessageBatchCommand({
      QueueUrl: targetUrl,
      Entries: sendEntries
    }));
    
    for (const failure of sendResult.Failed ?? []) {
      console.error(`Failed to send ${failure.Id}: ${failure.Code} ${failure.Message}`);
    }
    
    // Delete from source only what the target accepted; failed entries stay
    // in the source queue and are retried once they become visible again.
    const deleteEntries: DeleteMessageBatchRequestEntry[] = [];
    
    for (const [i, success] of (sendResult.Successful ?? []).entries()) {
      const receiptHandle = success.Id !== undefined
        ? receiptHandleBySendId.get(success.Id)
        : undefined;
      
      if (receiptHandle !== undefined) {
        deleteEntries.push({
          Id: `del_${i}`,
          ReceiptHandle: receiptHandle
        });
      }
    }
    
    if (deleteEntries.length === 0) {
      throw new Error(`All ${sendEntries.length} messages in the batch failed to send to ${targetRegion}`);
    }
    
    await sourceClient.send(new DeleteMessageBatchCommand({
      QueueUrl: sourceUrl,
      Entries: deleteEntries
    }));
    
    totalMoved += deleteEntries.length;
    console.log(`Moved ${totalMoved} messages to ${targetRegion}`);
  }
  
  return totalMoved;
}

Error Handling

Common errors for message move operations:

  • ResourceNotFoundException - Task handle not found or queue doesn't exist
  • UnsupportedOperation - Operation not supported for queue type
  • InvalidAddress - Invalid queue ARN format
  • RequestThrottled - Too many concurrent move tasks
try {
  const moveTask = await client.send(new StartMessageMoveTaskCommand({
    SourceArn: "arn:aws:sqs:us-east-1:123456789012:NonExistentQueue"
  }));
} catch (error: unknown) {
  // Catch variables are `unknown` under `strict`, so narrow before reading `.name`.
  const errorName = error instanceof Error ? error.name : undefined;

  if (errorName === 'ResourceNotFoundException') {
    console.error("Source queue not found");
  } else if (errorName === 'UnsupportedOperation') {
    console.error("Move operation not supported for this queue type");
  } else if (errorName === 'InvalidAddress') {
    console.error("Invalid queue ARN format");
  } else if (errorName === 'RequestThrottled') {
    console.error("Too many concurrent move tasks, try again later");
  } else {
    // Not one of the documented move-task errors: don't swallow it.
    throw error;
  }
}

Best Practices

Move Task Management

/**
 * Keeps track of the move tasks it starts so they can be inspected or
 * cancelled together. Tasks are indexed by their task handle.
 */
class MoveTaskManager {
  private activeTasks = new Map<string, {
    handle: string;
    sourceArn: string;
    startTime: Date;
  }>();

  /**
   * Starts a move task and records it as active.
   *
   * @param sourceArn - ARN of the queue to move messages out of.
   * @param destinationArn - Optional ARN of the destination queue.
   * @param rate - Optional cap on messages moved per second.
   * @returns The handle of the started task.
   */
  async startMove(sourceArn: string, destinationArn?: string, rate?: number) {
    const startCommand = new StartMessageMoveTaskCommand({
      SourceArn: sourceArn,
      DestinationArn: destinationArn,
      MaxNumberOfMessagesPerSecond: rate
    });
    const { TaskHandle } = await client.send(startCommand);
    const handle = TaskHandle!;

    this.activeTasks.set(handle, { handle, sourceArn, startTime: new Date() });

    return handle;
  }

  /**
   * Requests cancellation of every tracked task, waits for all requests to
   * settle (whether they succeed or fail), then forgets all tracked tasks.
   */
  async cancelAll() {
    const cancellations: Promise<unknown>[] = [];

    for (const { handle } of this.activeTasks.values()) {
      cancellations.push(client.send(new CancelMessageMoveTaskCommand({
        TaskHandle: handle
      })));
    }

    await Promise.allSettled(cancellations);
    this.activeTasks.clear();
  }

  /**
   * Looks up each tracked task, one source queue listing at a time.
   * Tasks that are no longer listed under their handle are dropped from tracking.
   *
   * @returns Map from task handle to the listed entry for tasks still reported.
   */
  async getStatus() {
    const status = new Map();

    for (const [handle, { sourceArn }] of this.activeTasks) {
      const { Results } = await client.send(new ListMessageMoveTasksCommand({
        SourceArn: sourceArn
      }));
      const listed = Results?.find(entry => entry.TaskHandle === handle);

      if (!listed) {
        this.activeTasks.delete(handle); // Task completed
        continue;
      }

      status.set(handle, listed);
    }

    return status;
  }
}