or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

appsync-events.md
appsync-graphql.md
bedrock-agent.md
http-middleware.md
http-routing.md
index.md
tile.json

docs/appsync-events.md

AppSync Events API

Real-time publish/subscribe event handling for AWS AppSync with automatic channel routing, wildcard patterns, and parallel event processing.

AppSyncEventsResolver

/**
 * Resolver that routes AppSync Events publish and subscribe requests to the
 * handlers registered for each channel path.
 *
 * Declared as an ambient class (`declare`) because this listing describes the
 * public surface only; bodiless signatures are not valid in a regular class.
 */
declare class AppSyncEventsResolver {
  /**
   * @param options - Optional settings; `logger` supplies a custom logger.
   */
  constructor(options?: { logger?: GenericLogger });

  /**
   * Entry point to call from the Lambda handler with the incoming event.
   *
   * @param event - The raw Lambda event.
   * @param context - The Lambda invocation context.
   * @param options - Optional settings such as `scope` (used with decorators).
   * @returns The publish output, or nothing.
   */
  resolve(
    event: unknown,
    context: Context,
    options?: ResolveOptions
  ): Promise<void | OnPublishOutput | OnPublishAggregateOutput>;

  /**
   * Registers a handler for events published to `path`.
   *
   * @param path - Channel path; may contain a `*` wildcard (e.g. `/default/*`).
   * @param handler - Per-event handler, or a batch handler when `aggregate` is true.
   * @param options - Set `aggregate: true` to receive all events in one call.
   */
  onPublish<T extends boolean = false>(
    path: string,
    handler: OnPublishHandler<T>,
    options?: { aggregate?: T }
  ): void;

  /** Decorator form of `onPublish`; apply it to a class method. */
  onPublish<T extends boolean = false>(
    path: string,
    options?: { aggregate?: T }
  ): MethodDecorator;

  /**
   * Registers a handler for subscription requests on `path`.
   *
   * @param path - Channel path to match.
   * @param handler - Called with the subscribe event; throw
   *   `UnauthorizedException` to reject the subscription.
   */
  onSubscribe(path: string, handler: OnSubscribeHandler): void;

  /** Decorator form of `onSubscribe`; apply it to a class method. */
  onSubscribe(path: string): MethodDecorator;
}

Setup

import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';
// Create the resolver once at module scope; routes are registered on this instance.
const app = new AppSyncEventsResolver();

/** Lambda entry point: forwards the event and context to the resolver. */
export const handler = async (event, context) => {
  return app.resolve(event, context);
};

Publish Handlers

Route and process events published to channels.

Individual Processing (default: the handler is called once per event)

// Handle each event published to /default/chat individually and return the
// processed result.
app.onPublish('/default/chat', async (message) => processMessage(message));

Aggregate Processing (all events at once)

// Aggregate mode: the handler receives every event of the request in one call.
app.onPublish(
  '/default/batch',
  async (payloads) => {
    const processed = [];
    // Items are awaited one after another, preserving input order.
    for (let index = 0; index < payloads.length; index += 1) {
      processed.push(await processBatch(payloads[index]));
    }
    return processed;
  },
  { aggregate: true }
);

Wildcard Patterns

// The trailing `*` matches any channel under the /default namespace.
app.onPublish(
  '/default/*',
  async (incoming) => {
    console.log('Received event on any /default channel');
    return incoming;
  }
);

Decorator API

// Decorator expressions are evaluated while the class is being defined, so the
// resolver has to exist at module scope. An instance field cannot be referenced
// from a decorator.
const resolver = new AppSyncEventsResolver();

class MyEventHandler {
  /** Handles events published to /default/notifications and marks them processed. */
  @resolver.onPublish('/default/notifications')
  async handleNotification(payload: Record<string, unknown>) {
    return { ...payload, processed: true };
  }

  /**
   * Lambda entry point. `scope: this` hands this instance to the resolver for
   * decorator binding.
   */
  handler = async (event, context) => resolver.resolve(event, context, { scope: this });
}
export const handler = new MyEventHandler().handler;

Subscribe Handlers

Validate or authorize subscription requests. Throw UnauthorizedException to reject.

import { UnauthorizedException } from '@aws-lambda-powertools/event-handler/appsync-events';

// Private channel: require an authenticated identity that passes the access check.
app.onSubscribe('/private/messages', async (subscription) => {
  const subject = subscription.identity?.sub;
  if (!subject) {
    throw new UnauthorizedException('Authentication required');
  }

  const allowed = await checkUserAccess(subject);
  if (!allowed) {
    throw new UnauthorizedException('Access denied');
  }
});

// Public channel: nothing is checked, so every subscription is accepted.
app.onSubscribe('/public/announcements', async (_event) => {
  // Intentionally empty.
});

Error Handling

Individual Event Processing (errors included in response)

// A throw inside a per-event handler only fails that one event.
app.onPublish('/default/process', async (payload) => {
  if (payload.valid) {
    return payload;
  }
  throw new Error('Invalid payload');
});

// The response reports the outcome of each event separately:
// {
//   events: [
//     { id: '1', payload: {...} },
//     { id: '2', error: 'Invalid payload' },
//     { id: '3', payload: {...} }
//   ]
// }
// NOTE(review): the library may prefix the message with the error name
// (e.g. 'Error - Invalid payload') — confirm against the installed version.

Aggregate Processing (errors reject entire batch)

app.onPublish(
  '/default/batch',
  // A throw or rejection here fails every event in the batch.
  async (batch) => processAllAtOnce(batch),
  { aggregate: true }
);

Subscription Rejection

// `authorized` stands in for the application's own authorization check.
app.onSubscribe('/secure/channel', async (event) => {
  if (authorized) {
    return;
  }
  // Rejects the subscription; the client is expected to see a 401/403 — TODO confirm exact status.
  throw new UnauthorizedException('Not authorized');
});

Types

/**
 * Fields common to publish and subscribe events; both event interfaces extend
 * this one.
 *
 * NOTE(review): the subscribe example reads `event.identity?.sub`, which
 * suggests `identity` can be null or undefined, yet it is declared as required
 * here — confirm.
 * NOTE(review): the AppSync Events Lambda payload appears to use
 * `info.channel` as `{ path, segments }` and `info.channelNamespace.name`
 * rather than two plain strings — verify against the AWS documentation.
 */
interface AppSyncEventsEvent {
  identity: {
    sub?: string;
    issuer?: string;
    username?: string;
    claims?: Record<string, unknown>;
    sourceIp?: string[];
    defaultAuthStrategy?: string;
  };
  request: { headers: Record<string, string> };
  info: { channel: string; namespace: string };
}

/**
 * Event received for a publish request: the common fields plus the list of
 * published events, each with an `id` and a `payload`.
 */
interface AppSyncEventsPublishEvent extends AppSyncEventsEvent {
  events: Array<{ id: string; payload: unknown }>;
}

/**
 * Event received for a subscription request: the common fields, with `events`
 * always `null`.
 */
interface AppSyncEventsSubscribeEvent extends AppSyncEventsEvent {
  events: null;
}

/**
 * Picks the publish handler signature from the aggregate flag `T`:
 * `true` selects `OnPublishHandlerAggregateFn`, anything else selects
 * `OnPublishHandlerFn`.
 */
type OnPublishHandler<T extends boolean = false> = T extends true ? OnPublishHandlerAggregateFn : OnPublishHandlerFn;

/**
 * Per-event publish handler: receives one event's payload and may return a
 * value synchronously or as a promise.
 *
 * NOTE(review): the upstream library may pass `event` and `context` as
 * separate positional arguments rather than one options object — verify.
 */
type OnPublishHandlerFn = (
  payload: unknown,
  options: { event: AppSyncEventsPublishEvent; context: Context }
) => Promise<unknown> | unknown;

/**
 * Aggregate publish handler: receives all payloads of the request in one call
 * and returns an array, synchronously or as a promise.
 *
 * NOTE(review): the upstream library may pass `{ id, payload }` entries, and
 * `event`/`context` as separate positional arguments — verify.
 */
type OnPublishHandlerAggregateFn = (
  payloads: unknown[],
  options: { event: AppSyncEventsPublishEvent; context: Context }
) => Promise<unknown[]> | unknown[];

/**
 * Subscribe handler: receives the subscribe event and the Lambda context.
 * Throw `UnauthorizedException` to reject the subscription.
 */
type OnSubscribeHandler = (
  event: AppSyncEventsSubscribeEvent,
  context: Context
) => Promise<unknown> | unknown;

/**
 * Publish response: one entry per event, keyed by `id`. In the error-handling
 * example, a successful entry carries `payload` and a failed entry carries
 * `error` instead.
 */
interface OnPublishOutput<T = unknown> {
  events: Array<{
    id: string;
    payload?: T;
    error?: string;
  }>;
}

/** Response of an aggregate publish handler; an alias with the same shape as `OnPublishOutput`. */
type OnPublishAggregateOutput<T = unknown> = OnPublishOutput<T>;

/** Options accepted by `resolve`. */
interface ResolveOptions {
  scope?: unknown; // For decorator binding: the decorator example passes the class instance (`{ scope: this }`)
}

/**
 * Error thrown from a subscribe handler to reject the subscription request.
 *
 * Declared as an ambient class (`declare`) because this listing shows the
 * signature only; a bodiless constructor is not valid in a regular class.
 */
declare class UnauthorizedException extends Error {
  /**
   * @param message - Reason for the rejection.
   * @param options - Standard `Error` options (e.g. `cause`).
   */
  constructor(message: string, options?: ErrorOptions);
}