Parser Integration

Schema validation and type safety using Standard Schema-compatible libraries (especially Zod).

Configuration

type BatchProcessorConfig =
  | {
      parser: typeof parser;
      schema: StandardSchemaV1;
      logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
    }
  | {
      parser: typeof parser;
      innerSchema: StandardSchemaV1;
      transformer?: 'json' | 'base64' | 'unmarshall';
      logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
    };

  • schema: Complete event schema describing the entire record structure.
  • innerSchema: Payload-only schema (Zod only); use together with transformer.

Transformers

  • json: Parse JSON-stringified payloads (SQS)
  • base64: Decode Base64-encoded payloads (Kinesis)
  • unmarshall: Unmarshall DynamoDB data format
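
Conceptually, each transformer reshapes the raw payload before innerSchema validation runs. The sketch below shows roughly equivalent manual steps (illustrative only; the processor performs this internally and its exact implementation may differ):

import { z } from 'zod';
import { unmarshall } from '@aws-sdk/util-dynamodb';
import type { AttributeValue } from '@aws-sdk/client-dynamodb';

const payloadSchema = z.object({ name: z.string(), age: z.number() });

// json: SQS bodies arrive as JSON strings
const fromSqsBody = (body: string) => payloadSchema.parse(JSON.parse(body));

// base64: Kinesis data arrives Base64-encoded (JSON underneath)
const fromKinesisData = (data: string) =>
  payloadSchema.parse(JSON.parse(Buffer.from(data, 'base64').toString('utf8')));

// unmarshall: DynamoDB Stream images use the AttributeValue wire format
const fromDynamoImage = (image: Record<string, AttributeValue>) =>
  payloadSchema.parse(unmarshall(image));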

Usage Patterns

With innerSchema (Recommended)

import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { z } from 'zod';
import type { SQSHandler } from 'aws-lambda';

const payloadSchema = z.object({
  name: z.string(),
  age: z.number(),
});

const processor = new BatchProcessor(EventType.SQS, {
  parser,
  innerSchema: payloadSchema,
  transformer: 'json',
});

const recordHandler = async (record) => {
  // record.body is typed as { name: string, age: number }
  console.log(record.body.name, record.body.age);
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });

With Complete Schema

import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';

const payloadSchema = z.object({ name: z.string() });

const processor = new BatchProcessor(EventType.SQS, {
  parser,
  schema: SqsRecordSchema.extend({ body: payloadSchema }),
});
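
The record handler then receives a record whose body has already been validated against the extended schema. A short sketch, assuming the extended schema above is bound to a fullSchema variable so its inferred type can be reused:

const fullSchema = SqsRecordSchema.extend({ body: payloadSchema });

const recordHandler = async (record: z.infer<typeof fullSchema>) => {
  // body is typed as { name: string }
  console.log(record.body.name);
};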

Kinesis with Base64

const eventSchema = z.object({
  eventType: z.string(),
  timestamp: z.number(),
});

const processor = new BatchProcessor(EventType.KinesisDataStreams, {
  parser,
  innerSchema: eventSchema,
  transformer: 'base64',
});

const recordHandler = async (record) => {
  // record.kinesis.data is decoded and typed
  console.log(record.kinesis.data.eventType);
};

DynamoDB with Unmarshaller

const itemSchema = z.object({
  id: z.string(),
  status: z.string(),
});

const processor = new BatchProcessor(EventType.DynamoDBStreams, {
  parser,
  innerSchema: itemSchema,
  transformer: 'unmarshall',
});

const recordHandler = async (record) => {
  // record.dynamodb.NewImage and OldImage are unmarshalled and typed
  if (record.dynamodb.NewImage) {
    console.log(record.dynamodb.NewImage.id);
  }
};

Type Safety

ParsedRecord Type

type ParsedRecord<TRecord, TPayload, TOldPayload = TPayload>;

Create typed record handlers:

import type { SQSRecord } from 'aws-lambda';
import type { ParsedRecord } from '@aws-lambda-powertools/batch';

const mySchema = z.object({ name: z.string(), age: z.number() });
type MyPayload = z.infer<typeof mySchema>;
type MySqsRecord = ParsedRecord<SQSRecord, MyPayload>;

const recordHandler = async (record: MySqsRecord) => {
  console.log(record.body.name, record.body.age);
};

DynamoDB with Different OldImage

import type { DynamoDBRecord } from 'aws-lambda';

const newSchema = z.object({ id: z.string(), status: z.string(), updatedAt: z.string() });
const oldSchema = z.object({ id: z.string(), status: z.string() });

type MyDynamoRecord = ParsedRecord<
  DynamoDBRecord,
  z.infer<typeof newSchema>,
  z.infer<typeof oldSchema>
>;
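
A record handler using this type can then treat the new and old images with their respective shapes (a sketch; field access follows the unmarshalled-image pattern shown earlier):

const recordHandler = async (record: MyDynamoRecord) => {
  if (record.dynamodb.NewImage) {
    // NewImage includes updatedAt; OldImage does not
    console.log(record.dynamodb.NewImage.updatedAt);
  }
  if (record.dynamodb.OldImage) {
    console.log(record.dynamodb.OldImage.status);
  }
};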

Error Handling

Schema validation failures throw ParsingError:

class ParsingError extends BatchProcessingError {
  constructor(message: string);
}

The failing record is marked as failed and will be retried according to the event source mapping configuration.
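
When used with processPartialResponse, a record that fails validation is reported in the partial batch response instead of failing the whole batch. The returned shape looks like this (the identifier below is illustrative):

// Shape returned by processPartialResponse when one record fails parsing
const response = {
  batchItemFailures: [{ itemIdentifier: '059f36b4-87a3-44ab-83d2-661975830a7d' }],
};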

Supported Libraries

For schema: Zod or any Standard Schema-compatible library.

For innerSchema: Zod only (required for transformer support).
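
As an illustration of a non-Zod option for schema, here is a hedged sketch using Valibot, which implements Standard Schema. The record shape is deliberately reduced; a real schema would describe the full SQS record structure:

import * as v from 'valibot';
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';

// looseObject keeps record fields that are not listed in the schema
const recordSchema = v.looseObject({
  messageId: v.string(),
  body: v.string(),
});

const processor = new BatchProcessor(EventType.SQS, {
  parser,
  schema: recordSchema,
});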