Schema validation and type safety using Standard Schema-compatible libraries (especially Zod).
type BatchProcessorConfig =
| {
parser: typeof parser;
schema: StandardSchemaV1;
logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
}
| {
parser: typeof parser;
innerSchema: StandardSchemaV1;
transformer?: 'json' | 'base64' | 'unmarshall';
logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
};

- `schema`: Complete event schema (entire record structure).
- `innerSchema`: Payload-only schema (Zod only). Use with `transformer`.

Supported `transformer` values:

- `json`: Parse JSON-stringified payloads (SQS).
- `base64`: Decode Base64-encoded payloads (Kinesis).
- `unmarshall`: Unmarshall the DynamoDB data format.

import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { z } from 'zod';
const payloadSchema = z.object({
name: z.string(),
age: z.number(),
});
const processor = new BatchProcessor(EventType.SQS, {
parser,
innerSchema: payloadSchema,
transformer: 'json',
});
const recordHandler = async (record) => {
// record.body is typed as { name: string, age: number }
console.log(record.body.name, record.body.age);
};
export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, { context });

import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
const payloadSchema = z.object({ name: z.string() });
const processor = new BatchProcessor(EventType.SQS, {
parser,
schema: SqsRecordSchema.extend({ body: payloadSchema }),
});

const eventSchema = z.object({
eventType: z.string(),
timestamp: z.number(),
});
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
parser,
innerSchema: eventSchema,
transformer: 'base64',
});
const recordHandler = async (record) => {
// record.kinesis.data is decoded and typed
console.log(record.kinesis.data.eventType);
};

const itemSchema = z.object({
id: z.string(),
status: z.string(),
});
const processor = new BatchProcessor(EventType.DynamoDBStreams, {
parser,
innerSchema: itemSchema,
transformer: 'unmarshall',
});
const recordHandler = async (record) => {
// record.dynamodb.NewImage and OldImage are unmarshalled and typed
if (record.dynamodb.NewImage) {
console.log(record.dynamodb.NewImage.id);
}
};

type ParsedRecord<TRecord, TPayload, TOldPayload = TPayload>;

Create typed record handlers:
import type { ParsedRecord } from '@aws-lambda-powertools/batch';
const mySchema = z.object({ name: z.string(), age: z.number() });
type MyPayload = z.infer<typeof mySchema>;
type MySqsRecord = ParsedRecord<SQSRecord, MyPayload>;
const recordHandler = async (record: MySqsRecord) => {
console.log(record.body.name, record.body.age);
};

const newSchema = z.object({ id: z.string(), status: z.string(), updatedAt: z.string() });
const oldSchema = z.object({ id: z.string(), status: z.string() });
type MyDynamoRecord = ParsedRecord<
DynamoDBRecord,
z.infer<typeof newSchema>,
z.infer<typeof oldSchema>
>;

Schema validation failures throw `ParsingError`:
class ParsingError extends BatchProcessingError {
constructor(message: string);
}

The record is marked as failed and will be retried according to the event source mapping configuration.

- For `schema`, use Zod or any Standard Schema-compatible library.
- For `innerSchema`, use Zod only (required for transformer support).