/**
 * Options that can be supplied when processing a batch.
 *
 * The type parameter `T` defaults to `BasePartialBatchProcessor`; none of the
 * members declared here reference it.
 */
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
  /** Optional `Context` value. */
  context?: Context;
  /** FIFO only. */
  skipGroupOnError?: boolean;
  /** Default: `true`. */
  throwOnFullBatchFailure?: boolean;
  /** Default: `true`; not for FIFO. */
  processInParallel?: boolean;
};

/**
 * Configuration for a batch processor, in one of two shapes:
 *
 * - `parser` together with `schema`, or
 * - `parser` together with `innerSchema` and an optional `transformer`
 *   (`'json'`, `'base64'` or `'unmarshall'`).
 *
 * Both shapes accept an optional `logger` exposing `debug`, `warn` and `error`.
 */
type BatchProcessorConfig =
  | {
      parser: typeof parser;
      schema: StandardSchemaV1;
      logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
    }
  | {
      parser: typeof parser;
      innerSchema: StandardSchemaV1;
      transformer?: 'json' | 'base64' | 'unmarshall';
      logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
    };

/** Response object carrying the list of batch item failures. */
type PartialItemFailureResponse = {
  batchItemFailures: PartialItemFailures[];
};
/** A single failed batch item, referenced by its string identifier. */
type PartialItemFailures = {
  itemIdentifier: string;
};
/**
 * Tuple tagged `'success'`, followed by a value of unknown type and the
 * event source record it belongs to.
 */
type SuccessResponse = ['success', unknown, EventSourceDataClassTypes];

/**
 * Tuple tagged `'fail'`, followed by a string and the event source record it
 * belongs to.
 * NOTE(review): the string is presumably the error message — confirm.
 */
type FailureResponse = ['fail', string, EventSourceDataClassTypes];

/** Union of the record types: SQS, Kinesis stream and DynamoDB. */
type EventSourceDataClassTypes = SQSRecord | KinesisStreamRecord | DynamoDBRecord;
/** Either an arbitrary string-keyed object or one of the event source record types. */
type BaseRecord = Record<string, unknown> | EventSourceDataClassTypes;

// Utility type for typed record handlers with custom payload schemas.
/**
 * Replaces the payload-carrying field of a record type with a parsed payload type.
 *
 * - A record with `body: string` gets `body: TPayload`.
 * - A record with `kinesis.data: string` gets `kinesis.data: TPayload`.
 * - A record with an optional `dynamodb: StreamRecord` gets a required
 *   `dynamodb` whose `NewImage` is `TPayload` and whose `OldImage` is
 *   `TOldPayload`.
 * - Any other record type is returned unchanged.
 *
 * @typeParam TRecord - The original record type.
 * @typeParam TPayload - The parsed payload type.
 * @typeParam TOldPayload - The parsed type for `OldImage`; defaults to `TPayload`.
 */
type ParsedRecord<TRecord, TPayload, TOldPayload = TPayload> =
  TRecord extends { body: string }
    ? Omit<TRecord, 'body'> & { body: TPayload }
    : TRecord extends { kinesis: { data: string } }
      ? Omit<TRecord, 'kinesis'> & {
          kinesis: Omit<TRecord['kinesis'], 'data'> & { data: TPayload };
        }
      : TRecord extends { dynamodb?: StreamRecord }
        ? Omit<TRecord, 'dynamodb'> & {
            dynamodb: Omit<StreamRecord, 'NewImage' | 'OldImage'> & {
              NewImage: TPayload;
              OldImage: TOldPayload;
            };
          }
        : TRecord;

// Example call passing `processInParallel: false` and `throwOnFullBatchFailure: false`.
processPartialResponse(event, recordHandler, processor, {
  context,
  processInParallel: false,
  throwOnFullBatchFailure: false,
});

import type { ParsedRecord } from '@aws-lambda-powertools/batch';
import type { SQSRecord } from 'aws-lambda';
import { z } from 'zod';
// SQS example: the record body is typed from a Zod schema.
const mySchema = z.object({
  name: z.string(),
  age: z.number(),
});
type MyPayload = z.infer<typeof mySchema>;
type MySqsRecord = ParsedRecord<SQSRecord, MyPayload>;
const recordHandler = async (record: MySqsRecord) => {
  const { name, age } = record.body;
  console.log(name, age);
};

const kinesisSchema = z.object({
  userId: z.string(),
  action: z.string(),
});
// Kinesis example: `kinesis.data` is typed from the Kinesis schema.
type MyKinesisPayload = z.infer<typeof kinesisSchema>;
type MyKinesisRecord = ParsedRecord<KinesisStreamRecord, MyKinesisPayload>;
const recordHandler = async (record: MyKinesisRecord) => {
  const { userId, action } = record.kinesis.data;
  console.log(userId, action);
};

const newSchema = z.object({
  id: z.string(),
  status: z.string(),
  updatedAt: z.string(),
});
const oldSchema = z.object({
  id: z.string(),
  status: z.string(),
});
// DynamoDB example: `NewImage` and `OldImage` are typed from separate schemas.
type MyDynamoRecord = ParsedRecord<
  DynamoDBRecord,
  z.infer<typeof newSchema>,
  z.infer<typeof oldSchema>
>;
const recordHandler = async (record: MyDynamoRecord) => {
  const { updatedAt } = record.dynamodb.NewImage;
  console.log(updatedAt);
};

// Create custom type guards for event sources:
/**
 * Type guard for SQS records.
 *
 * @param record - The record to inspect.
 * @returns `true` when the record has a `messageId` property.
 */
function isSQSRecord(record: EventSourceDataClassTypes): record is SQSRecord {
  const hasMessageId = 'messageId' in record;
  return hasMessageId;
}
/**
 * Type guard for Kinesis stream records.
 *
 * @param record - The record to inspect.
 * @returns `true` when the record has a `kinesis` property that itself has a
 *   `sequenceNumber` property.
 */
function isKinesisRecord(record: EventSourceDataClassTypes): record is KinesisStreamRecord {
  if (!('kinesis' in record)) {
    return false;
  }
  return 'sequenceNumber' in record.kinesis;
}
/**
 * Type guard for DynamoDB stream records.
 *
 * @param record - The record to inspect.
 * @returns `true` when the record has a `dynamodb` object that has a
 *   `SequenceNumber` property; `false` otherwise, including when `dynamodb`
 *   is present but `undefined` or `null`.
 */
function isDynamoDBRecord(record: EventSourceDataClassTypes): record is DynamoDBRecord {
  if (!('dynamodb' in record)) {
    return false;
  }
  // `dynamodb` is optional, so the key can exist without an object behind it.
  // The `in` operator throws on a non-object right-hand side; check first
  // rather than asserting non-null.
  const streamRecord = record.dynamodb;
  return (
    typeof streamRecord === 'object' &&
    streamRecord !== null &&
    'SequenceNumber' in streamRecord
  );
}

// TypeScript infers types automatically with Zod:
// Schema for the JSON payload carried in each SQS record body.
const mySchema = z.object({
  userId: z.string(),
  action: z.enum(['create', 'update', 'delete']),
  timestamp: z.number(),
});
// SQS processor configured with the parser, the payload schema and the 'json' transformer.
const processor = new BatchProcessor(EventType.SQS, {
  parser,
  innerSchema: mySchema,
  transformer: 'json',
});
// A standalone handler receives no contextual type from `processor`, so an
// unannotated `record` would be implicitly `any`. Annotate it with
// `ParsedRecord` so the body type is derived from the schema.
const recordHandler = async (
  record: ParsedRecord<SQSRecord, z.infer<typeof mySchema>>
): Promise<void> => {
  // record.body is typed as: { userId: string, action: 'create' | 'update' | 'delete', timestamp: number }
  console.log(record.body.userId, record.body.action, record.body.timestamp);
};