The batch processing package for the Powertools for AWS Lambda (TypeScript) library.
Handles partial failures when processing batches from SQS, Kinesis Data Streams, and DynamoDB Streams. Only failed records return to the queue/stream for retry.
Package: @aws-lambda-powertools/batch@2.29.0 | License: MIT-0
// Install
npm install @aws-lambda-powertools/batch
// Core imports
import {
BatchProcessor,
BatchProcessorSync,
EventType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
// Parser (optional)
import { parser } from '@aws-lambda-powertools/batch/parser';
// Types
import type { ParsedRecord } from '@aws-lambda-powertools/batch';

const processor = new BatchProcessor(EventType.SQS);
const recordHandler = async (record: SQSRecord): Promise<void> => {
const payload = JSON.parse(record.body);
// Process record - throw on error to mark for retry
};
export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, { context });

Async processor for SQS, Kinesis, and DynamoDB batches with parallel/sequential execution.
class BatchProcessor extends BasePartialBatchProcessor {
constructor(
eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
parserConfig?: BatchProcessorConfig
);
}

Options: processInParallel (default: true), throwOnFullBatchFailure (default: true)
Sync processor for sequential processing. Deprecated - use BatchProcessor with processInParallel: false.
class BatchProcessorSync extends BasePartialBatchProcessor {
constructor(
eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
parserConfig?: BatchProcessorConfig
);
}

Main processing function - handles registration, execution, and response formatting.
function processPartialResponse<T extends BasePartialBatchProcessor>(
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: T,
options?: BatchProcessingOptions<T>
): Promise<PartialItemFailureResponse>;

Options:
- context?: Context - Lambda context
- processInParallel?: boolean - Parallel execution (default: true, not for FIFO)
- skipGroupOnError?: boolean - FIFO only: continue other message groups
- throwOnFullBatchFailure?: boolean - Throw if all fail (default: true)

const EventType = {
SQS: 'SQS',
KinesisDataStreams: 'KinesisDataStreams',
DynamoDBStreams: 'DynamoDBStreams',
} as const;

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const recordHandler = async (record: KinesisStreamRecord) => {
const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
// Process data
};
export const handler: KinesisStreamHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, { context });

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const recordHandler = async (record: DynamoDBRecord) => {
if (record.dynamodb?.NewImage) {
// Process new image
}
};
export const handler: DynamoDBStreamHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, { context });

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
processInParallel: false,
});

FIFO-specific processors with message group tracking and short-circuit behavior.
Schema validation with Zod and Standard Schema-compatible libraries.
Comprehensive error types: FullBatchFailureError, ParsingError, SqsFifoShortCircuitError, etc.
TypeScript types for configuration and responses.
const recordHandler = async (record: SQSRecord) => {
const payload = JSON.parse(record.body);
if (!payload.id) throw new Error('Missing id');
// Process
};
export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
throwOnFullBatchFailure: false,
});

export const handler: SQSHandler = async (event, context) => {
const response = await processPartialResponse(event, recordHandler, processor, { context });
console.log(`Processed: ${processor.records.length} total, ${processor.successMessages.length} succeeded, ${processor.failureMessages.length} failed`);
return response;
};

type PartialItemFailureResponse = {
batchItemFailures: Array<{ itemIdentifier: string }>;
};

Item Identifiers:
- SQS: messageId
- Kinesis Data Streams: sequenceNumber
- DynamoDB Streams: SequenceNumber

Install with Tessl CLI
npx tessl i tessl/npm-aws-lambda-powertools--batch