The batch processing package for the Powertools for AWS Lambda (TypeScript) library.
npx @tessl/cli install tessl/npm-aws-lambda-powertools--batch@2.29.0

Handles partial failures when processing batches from SQS, Kinesis Data Streams, and DynamoDB Streams. Only failed records are returned to the queue/stream for retry.
Package: @aws-lambda-powertools/batch@2.29.0 | License: MIT-0
// Install
npm install @aws-lambda-powertools/batch
// Core imports
import {
BatchProcessor,
BatchProcessorSync,
EventType,
processPartialResponse,
processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
// Parser (optional)
import { parser } from '@aws-lambda-powertools/batch/parser';
// Types
import type { ParsedRecord } from '@aws-lambda-powertools/batch';

const processor = new BatchProcessor(EventType.SQS);
const recordHandler = async (record: SQSRecord): Promise<void> => {
const payload = JSON.parse(record.body);
// Process record - throw on error to mark for retry
};
export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, { context });

Async processor for SQS, Kinesis, and DynamoDB batches with parallel/sequential execution.
class BatchProcessor extends BasePartialBatchProcessor {
constructor(
eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
parserConfig?: BatchProcessorConfig
);
}

Options: processInParallel (default: true), throwOnFullBatchFailure (default: true)
Sync processor for sequential processing. Deprecated - use BatchProcessor with processInParallel: false.
class BatchProcessorSync extends BasePartialBatchProcessor {
constructor(
eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
parserConfig?: BatchProcessorConfig
);
}

Main processing function - handles registration, execution, and response formatting.
function processPartialResponse<T extends BasePartialBatchProcessor>(
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: T,
options?: BatchProcessingOptions<T>
): Promise<PartialItemFailureResponse>;

Options:
- context?: Context - Lambda context
- processInParallel?: boolean - Parallel execution (default: true, not for FIFO)
- skipGroupOnError?: boolean - FIFO only: continue other message groups
- throwOnFullBatchFailure?: boolean - Throw if all fail (default: true)

const EventType = {
SQS: 'SQS',
KinesisDataStreams: 'KinesisDataStreams',
DynamoDBStreams: 'DynamoDBStreams',
} as const;

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const recordHandler = async (record: KinesisStreamRecord) => {
const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
// Process data
};
export const handler: KinesisStreamHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, { context });

const processor = new BatchProcessor(EventType.DynamoDBStreams);
const recordHandler = async (record: DynamoDBRecord) => {
if (record.dynamodb?.NewImage) {
// Process new image
}
};
export const handler: DynamoDBStreamHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, { context });

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
processInParallel: false,
});

FIFO-specific processors with message group tracking and short-circuit behavior.
Schema validation with Zod and Standard Schema-compatible libraries.
Comprehensive error types: FullBatchFailureError, ParsingError, SqsFifoShortCircuitError, etc.
TypeScript types for configuration and responses.
const recordHandler = async (record: SQSRecord) => {
const payload = JSON.parse(record.body);
if (!payload.id) throw new Error('Missing id');
// Process
};
export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
throwOnFullBatchFailure: false,
});

export const handler: SQSHandler = async (event, context) => {
const response = await processPartialResponse(event, recordHandler, processor, { context });
console.log(`Processed: ${processor.records.length} total, ${processor.successMessages.length} succeeded, ${processor.failureMessages.length} failed`);
return response;
};

type PartialItemFailureResponse = {
batchItemFailures: Array<{ itemIdentifier: string }>;
};

Item Identifiers:
- SQS: messageId
- Kinesis Data Streams: sequenceNumber
- DynamoDB Streams: SequenceNumber