tessl/npm-aws-lambda-powertools--batch

The batch processing package for the Powertools for AWS Lambda (TypeScript) library.

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/@aws-lambda-powertools/batch@2.29.x

To install, run

npx @tessl/cli install tessl/npm-aws-lambda-powertools--batch@2.29.0

AWS Lambda Powertools Batch Processing

Handles partial failures when processing batches from SQS, Kinesis Data Streams, and DynamoDB Streams. Only the records that failed are reported back, so only those records return to the queue or stream for retry.

Package: @aws-lambda-powertools/batch@2.29.0 | License: MIT-0

Installation & Imports

# Install (shell)
npm install @aws-lambda-powertools/batch

// Core imports
import {
  BatchProcessor,
  BatchProcessorSync,
  EventType,
  processPartialResponse,
  processPartialResponseSync,
} from '@aws-lambda-powertools/batch';

// Parser (optional)
import { parser } from '@aws-lambda-powertools/batch/parser';

// Types
import type { ParsedRecord } from '@aws-lambda-powertools/batch';

Quick Start

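import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
// Event and handler types come from the @types/aws-lambda package
import type { SQSHandler, SQSRecord } from 'aws-lambda';

// Created outside the handler so the instance is reused across invocations
const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const payload = JSON.parse(record.body);
  // Process record - throw on error to mark for retry
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });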
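
To make the "throw to mark for retry" contract concrete, here is a minimal, self-contained sketch of the idea. It is a simplified model, not the library's implementation: `QueueRecord`, `collectFailures`, `parseBody`, and `sampleRecords` are hypothetical names introduced only for this illustration.

```typescript
// Minimal stand-in for an SQS record; only the fields used here.
type QueueRecord = { messageId: string; body: string };
type PartialResponse = { batchItemFailures: Array<{ itemIdentifier: string }> };

// Hypothetical helper: run the handler on every record and report only the failures.
async function collectFailures(
  records: QueueRecord[],
  recordHandler: (record: QueueRecord) => Promise<void>
): Promise<PartialResponse> {
  const batchItemFailures: Array<{ itemIdentifier: string }> = [];
  for (const record of records) {
    try {
      await recordHandler(record);
    } catch {
      // A thrown error marks this record (and only this record) for retry.
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
}

// Example handler: invalid JSON makes JSON.parse throw, which fails that record.
const parseBody = async (record: QueueRecord): Promise<void> => {
  JSON.parse(record.body);
};

const sampleRecords: QueueRecord[] = [
  { messageId: 'm-1', body: '{"ok":true}' },
  { messageId: 'm-2', body: 'not json' },
];
```

Calling `collectFailures(sampleRecords, parseBody)` resolves to a response whose `batchItemFailures` contains only `m-2`; the successfully processed `m-1` is not reported and so is not retried.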

Core APIs

BatchProcessor

Async processor for SQS, Kinesis, and DynamoDB batches with parallel/sequential execution.

class BatchProcessor extends BasePartialBatchProcessor {
  constructor(
    eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
    parserConfig?: BatchProcessorConfig
  );
}

Processing options (passed to processPartialResponse, not to the constructor): processInParallel (default: true), throwOnFullBatchFailure (default: true)

BatchProcessorSync

Synchronous processor that handles records sequentially. Deprecated: use BatchProcessor with processInParallel: false instead.

class BatchProcessorSync extends BasePartialBatchProcessor {
  constructor(
    eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
    parserConfig?: BatchProcessorConfig
  );
}

processPartialResponse

Main processing function - handles registration, execution, and response formatting.

function processPartialResponse<T extends BasePartialBatchProcessor>(
  event: { Records: BaseRecord[] },
  recordHandler: CallableFunction,
  processor: T,
  options?: BatchProcessingOptions<T>
): Promise<PartialItemFailureResponse>;

Options:

  • context?: Context - Lambda context object
  • processInParallel?: boolean - Process records in parallel (default: true; not available for SQS FIFO)
  • skipGroupOnError?: boolean - SQS FIFO only: after a failure, skip the rest of that message group but keep processing other groups
  • throwOnFullBatchFailure?: boolean - Throw when every record in the batch fails (default: true)
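
The effect of throwOnFullBatchFailure can be sketched as a final decision step after all records have been handled. This is a simplified model under the assumption that "full batch failure" means every record failed; `AllRecordsFailedError` and `finalizeBatch` are hypothetical stand-ins, not names from the library.

```typescript
// Hypothetical stand-in for the library's full-batch failure error.
class AllRecordsFailedError extends Error {
  constructor(failedCount: number) {
    super(`All ${failedCount} records failed`);
    this.name = 'AllRecordsFailedError';
  }
}

type FailureResponse = { batchItemFailures: Array<{ itemIdentifier: string }> };

// Model of the final step: throw only when every record failed and the flag is on.
function finalizeBatch(
  totalRecords: number,
  failedIds: string[],
  throwOnFullBatchFailure = true
): FailureResponse {
  const everyRecordFailed = totalRecords > 0 && failedIds.length === totalRecords;
  if (everyRecordFailed && throwOnFullBatchFailure) {
    throw new AllRecordsFailedError(failedIds.length);
  }
  // Otherwise report the failures so only those records are retried.
  return { batchItemFailures: failedIds.map((id) => ({ itemIdentifier: id })) };
}
```

In this model, `finalizeBatch(2, ['a', 'b'])` throws, while `finalizeBatch(2, ['a', 'b'], false)` returns both identifiers in `batchItemFailures` and lets the invocation complete normally.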

Event Types

const EventType = {
  SQS: 'SQS',
  KinesisDataStreams: 'KinesisDataStreams',
  DynamoDBStreams: 'DynamoDBStreams',
} as const;
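
Because the constant is declared with `as const`, its values form the same string union that the processor constructors accept. The sketch below redeclares the object locally (as `EventTypeSketch`) so that it runs on its own; `EventTypeName` and `isEventTypeName` are hypothetical helpers for illustration, not exports of the package.

```typescript
// Local copy of the constant shown above, so the sketch is self-contained.
const EventTypeSketch = {
  SQS: 'SQS',
  KinesisDataStreams: 'KinesisDataStreams',
  DynamoDBStreams: 'DynamoDBStreams',
} as const;

// Union of the allowed values: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams'
type EventTypeName = (typeof EventTypeSketch)[keyof typeof EventTypeSketch];

// Hypothetical guard for values that arrive as plain strings (e.g. from configuration).
function isEventTypeName(value: string): value is EventTypeName {
  return (Object.values(EventTypeSketch) as string[]).includes(value);
}
```

A guard like this is one way to validate an event type read from an environment variable before passing it to a processor constructor.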

Usage Patterns

Kinesis Data Streams

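import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);

const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
  // Kinesis delivers the payload base64-encoded
  const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'));
  // Process data
};

export const handler: KinesisStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });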

DynamoDB Streams

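import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import type { DynamoDBRecord, DynamoDBStreamHandler } from 'aws-lambda';

const processor = new BatchProcessor(EventType.DynamoDBStreams);

const recordHandler = async (record: DynamoDBRecord): Promise<void> => {
  // NewImage is only present when the stream view type includes new images
  if (record.dynamodb?.NewImage) {
    // Process new image
  }
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });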

Sequential Processing

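// Uses the processor and recordHandler defined in Quick Start
export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    processInParallel: false, // handle one record at a time, in batch order
  });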
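
The difference between the two modes is easiest to see in the order in which handlers start and finish. The following sketch is a simplified model of parallel versus sequential execution, not the library's code; `runInParallel`, `runSequentially`, `makeLoggingHandler`, and `compareOrderings` are hypothetical names.

```typescript
type Handler = (id: string) => Promise<void>;

// Model of parallel execution: every handler starts before any result is awaited.
async function runInParallel(ids: string[], handler: Handler): Promise<void> {
  await Promise.all(ids.map((id) => handler(id).catch(() => undefined)));
}

// Model of sequential execution: each handler finishes before the next one starts.
async function runSequentially(ids: string[], handler: Handler): Promise<void> {
  for (const id of ids) {
    await handler(id).catch(() => undefined);
  }
}

// Records the start and end of each call so the two orderings can be compared.
function makeLoggingHandler(log: string[]): Handler {
  return async (id) => {
    log.push(`start:${id}`);
    await Promise.resolve(); // stands in for real asynchronous work
    log.push(`end:${id}`);
  };
}

async function compareOrderings(): Promise<{ parallel: string[]; sequential: string[] }> {
  const parallel: string[] = [];
  const sequential: string[] = [];
  await runInParallel(['a', 'b'], makeLoggingHandler(parallel));
  await runSequentially(['a', 'b'], makeLoggingHandler(sequential));
  return { parallel, sequential };
}
```

In this model the parallel log reads `start:a, start:b, end:a, end:b`, while the sequential log reads `start:a, end:a, start:b, end:b`. Sequential mode is the one to choose when records must be handled strictly in batch order.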

Advanced Features

SQS FIFO Processing

FIFO-specific processors with message group tracking and short-circuit behavior.
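
As a rough illustration of short-circuiting and message group tracking, the sketch below models the behavior as I understand it: by default, everything after the first failure is reported as failed without being processed; with skipGroupOnError, only later records from a failed message group are skipped. `FifoRecord`, `processFifoBatch`, `fifoBatch`, and `failOnBad` are hypothetical names, and the real FIFO processor may differ in detail.

```typescript
type FifoRecord = { messageId: string; groupId: string; body: string };

// Model only: returns the ids reported as failed for a FIFO batch.
function processFifoBatch(
  records: FifoRecord[],
  handleRecord: (record: FifoRecord) => void,
  skipGroupOnError = false
): string[] {
  const failedIds: string[] = [];
  const failedGroups = new Set<string>();
  let shortCircuited = false;

  for (const record of records) {
    // Skipped records are reported as failed without running the handler.
    const skip = skipGroupOnError ? failedGroups.has(record.groupId) : shortCircuited;
    if (skip) {
      failedIds.push(record.messageId);
      continue;
    }
    try {
      handleRecord(record);
    } catch {
      failedIds.push(record.messageId);
      failedGroups.add(record.groupId);
      shortCircuited = true;
    }
  }
  return failedIds;
}

const fifoBatch: FifoRecord[] = [
  { messageId: '1', groupId: 'A', body: 'ok' },
  { messageId: '2', groupId: 'A', body: 'bad' },
  { messageId: '3', groupId: 'B', body: 'ok' },
  { messageId: '4', groupId: 'A', body: 'ok' },
];

const failOnBad = (record: FifoRecord): void => {
  if (record.body === 'bad') throw new Error(`Cannot process ${record.messageId}`);
};
```

With the default setting, `processFifoBatch(fifoBatch, failOnBad)` reports messages 2, 3, and 4 as failed, which preserves ordering across the whole batch. With `skipGroupOnError` set to `true`, message 3 from group B is still processed and only messages 2 and 4 from group A are reported.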

Parser Integration

Schema validation with Zod and Standard Schema-compatible libraries.

Error Handling

Dedicated error classes, including FullBatchFailureError, ParsingError, and SqsFifoShortCircuitError.

Type Definitions

TypeScript types for configuration and responses.

Common Patterns

With Custom Error Handling

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const payload = JSON.parse(record.body);
  // Throwing marks only this record as failed
  if (!payload.id) throw new Error('Missing id');
  // Process
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    // Return the failures in the response instead of throwing when every record fails
    throwOnFullBatchFailure: false,
  });
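
The validation inside the record handler can be pulled into a small helper that also guards against malformed JSON. This is one possible shape, assuming the payload only needs a string `id`; `OrderPayload` and `parseOrderPayload` are hypothetical names used for illustration.

```typescript
// Hypothetical payload shape, used only for this illustration.
type OrderPayload = { id: string };

// Parses a record body and narrows it to OrderPayload, throwing a descriptive error otherwise.
function parseOrderPayload(body: string): OrderPayload {
  let parsed: unknown;
  try {
    parsed = JSON.parse(body);
  } catch {
    throw new Error('Body is not valid JSON');
  }
  if (typeof parsed !== 'object' || parsed === null) {
    throw new Error('Body is not a JSON object');
  }
  const id = (parsed as { id?: unknown }).id;
  if (typeof id !== 'string' || id.length === 0) {
    throw new Error('Missing id');
  }
  return { id };
}
```

A record handler could call `parseOrderPayload(record.body)` as its first step; any error it throws marks just that record as failed, and the message explains why.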

Accessing Processor State

export const handler: SQSHandler = async (event, context) => {
  const response = await processPartialResponse(event, recordHandler, processor, { context });

  console.log(
    `Processed: ${processor.records.length} total, ` +
      `${processor.successMessages.length} succeeded, ` +
      `${processor.failureMessages.length} failed`
  );

  return response;
};

Response Format

type PartialItemFailureResponse = {
  batchItemFailures: Array<{ itemIdentifier: string }>;
};

Item Identifiers:

  • SQS: messageId
  • Kinesis: kinesis.sequenceNumber
  • DynamoDB: dynamodb.SequenceNumber
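
The mapping from event source to identifier can be written as a small lookup. The sketch below uses minimal record shapes and a tagged wrapper (`SourceRecord`) that are assumptions made for this illustration; `itemIdentifierFor` and `toFailureResponse` are hypothetical helpers, not part of the package.

```typescript
// Minimal record shapes with only the identifier fields.
type SqsLike = { messageId: string };
type KinesisLike = { kinesis: { sequenceNumber: string } };
type DynamoLike = { dynamodb?: { SequenceNumber?: string } };

// Hypothetical tagged wrapper so the source of each record is explicit.
type SourceRecord =
  | { source: 'SQS'; record: SqsLike }
  | { source: 'KinesisDataStreams'; record: KinesisLike }
  | { source: 'DynamoDBStreams'; record: DynamoLike };

// Pick the field that identifies a failed record for its event source.
function itemIdentifierFor(entry: SourceRecord): string {
  switch (entry.source) {
    case 'SQS':
      return entry.record.messageId;
    case 'KinesisDataStreams':
      return entry.record.kinesis.sequenceNumber;
    case 'DynamoDBStreams':
      return entry.record.dynamodb?.SequenceNumber ?? '';
  }
}

// Build the response shape from a list of failed records.
function toFailureResponse(
  failed: SourceRecord[]
): { batchItemFailures: Array<{ itemIdentifier: string }> } {
  return {
    batchItemFailures: failed.map((entry) => ({ itemIdentifier: itemIdentifierFor(entry) })),
  };
}
```

Note the casing difference: the Kinesis field is `sequenceNumber`, while the DynamoDB field is `SequenceNumber`.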