CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-aws-lambda-powertools--batch

The batch processing package for the Powertools for AWS Lambda (TypeScript) library.

Overview
Eval results
Files

AWS Lambda Powertools Batch Processing

Handles partial failures when processing batches from SQS, Kinesis Data Streams, and DynamoDB Streams. Only failed records return to the queue/stream for retry.

Package: @aws-lambda-powertools/batch@2.29.0 | License: MIT-0

Installation & Imports

// Install
npm install @aws-lambda-powertools/batch

// Core imports
import {
  BatchProcessor,
  BatchProcessorSync,
  EventType,
  processPartialResponse,
  processPartialResponseSync,
} from '@aws-lambda-powertools/batch';

// Parser (optional)
import { parser } from '@aws-lambda-powertools/batch/parser';

// Types
import type { ParsedRecord } from '@aws-lambda-powertools/batch';

Quick Start

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const payload = JSON.parse(record.body);
  // Process record - throw on error to mark for retry
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });

Core APIs

BatchProcessor

Async processor for SQS, Kinesis, and DynamoDB batches with parallel/sequential execution.

class BatchProcessor extends BasePartialBatchProcessor {
  constructor(
    eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
    parserConfig?: BatchProcessorConfig
  );
}

Options: processInParallel (default: true), throwOnFullBatchFailure (default: true)

BatchProcessorSync

Sync processor for sequential processing. Deprecated - use BatchProcessor with processInParallel: false.

class BatchProcessorSync extends BasePartialBatchProcessor {
  constructor(
    eventType: 'SQS' | 'KinesisDataStreams' | 'DynamoDBStreams',
    parserConfig?: BatchProcessorConfig
  );
}

processPartialResponse

Main processing function - handles registration, execution, and response formatting.

function processPartialResponse<T extends BasePartialBatchProcessor>(
  event: { Records: BaseRecord[] },
  recordHandler: CallableFunction,
  processor: T,
  options?: BatchProcessingOptions<T>
): Promise<PartialItemFailureResponse>;

Options:

  • context?: Context - Lambda context
  • processInParallel?: boolean - Parallel execution (default: true, not for FIFO)
  • skipGroupOnError?: boolean - FIFO only: continue other message groups
  • throwOnFullBatchFailure?: boolean - Throw if all fail (default: true)

Event Types

const EventType = {
  SQS: 'SQS',
  KinesisDataStreams: 'KinesisDataStreams',
  DynamoDBStreams: 'DynamoDBStreams',
} as const;

Usage Patterns

Kinesis Data Streams

const processor = new BatchProcessor(EventType.KinesisDataStreams);

const recordHandler = async (record: KinesisStreamRecord) => {
  const data = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString());
  // Process data
};

export const handler: KinesisStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });

DynamoDB Streams

const processor = new BatchProcessor(EventType.DynamoDBStreams);

const recordHandler = async (record: DynamoDBRecord) => {
  if (record.dynamodb?.NewImage) {
    // Process new image
  }
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });

Sequential Processing

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    processInParallel: false,
  });

Advanced Features

SQS FIFO Processing

FIFO-specific processors with message group tracking and short-circuit behavior.

Parser Integration

Schema validation with Zod and Standard Schema-compatible libraries.

Error Handling

Comprehensive error types: FullBatchFailureError, ParsingError, SqsFifoShortCircuitError, etc.

Type Definitions

TypeScript types for configuration and responses.

Common Patterns

With Custom Error Handling

const recordHandler = async (record: SQSRecord) => {
  const payload = JSON.parse(record.body);
  if (!payload.id) throw new Error('Missing id');
  // Process
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    throwOnFullBatchFailure: false,
  });

Accessing Processor State

export const handler: SQSHandler = async (event, context) => {
  const response = await processPartialResponse(event, recordHandler, processor, { context });

  console.log(`Processed: ${processor.records.length} total, ${processor.successMessages.length} succeeded, ${processor.failureMessages.length} failed`);

  return response;
};

Response Format

type PartialItemFailureResponse = {
  batchItemFailures: Array<{ itemIdentifier: string }>;
};

Item Identifiers:

  • SQS: messageId
  • Kinesis: sequenceNumber
  • DynamoDB: SequenceNumber

Install with Tessl CLI

npx tessl i tessl/npm-aws-lambda-powertools--batch
Workspace
tessl
Visibility
Public
Created
Last updated
Describes
npmpkg:npm/@aws-lambda-powertools/batch@2.29.x