Real-time publish/subscribe event handling for AWS AppSync with automatic channel routing, wildcard patterns, and parallel event processing.
class AppSyncEventsResolver {
constructor(options?: { logger?: GenericLogger });
resolve(event, context, options?: ResolveOptions): Promise<void | OnPublishOutput | OnPublishAggregateOutput>;
onPublish<T extends boolean = false>(path: string, handler: OnPublishHandler<T>, options?: { aggregate?: T }): void;
onPublish<T extends boolean = false>(path: string, options?: { aggregate?: T }): MethodDecorator;
onSubscribe(path: string, handler: OnSubscribeHandler): void;
onSubscribe(path: string): MethodDecorator;
}
Setup
import { AppSyncEventsResolver } from '@aws-lambda-powertools/event-handler/appsync-events';
const app = new AppSyncEventsResolver();
export const handler = async (event, context) => app.resolve(event, context);
Route and process events published to channels.
Individual Processing (default: one event at a time)
app.onPublish('/default/chat', async (payload) => {
const processed = await processMessage(payload);
return processed;
});
Aggregate Processing (all events at once)
app.onPublish('/default/batch', async (payloads) => {
const results = [];
for (const payload of payloads) {
results.push(await processBatch(payload));
}
return results;
}, { aggregate: true });
Wildcard Patterns
app.onPublish('/default/*', async (payload) => {
console.log('Received event on any /default channel');
return payload;
});
Decorator API
// Decorator expressions are evaluated once, when the class is defined, in the
// scope that encloses the class. An instance field is not visible there, so
// the resolver used by `@...onPublish(...)` must be created at module scope.
const eventsResolver = new AppSyncEventsResolver();

/**
 * Example handler class that registers its publish route with the decorator
 * form of `onPublish`.
 */
class MyEventHandler {
  /** The shared module-level resolver, exposed on the instance. */
  resolver = eventsResolver;

  /**
   * Handles events published to `/default/notifications` and returns the
   * payload with a `processed: true` marker added.
   *
   * @param payload - the payload of a single published event
   * @returns a copy of the payload with `processed` set to `true`
   */
  @eventsResolver.onPublish('/default/notifications')
  async handleNotification(payload: Record<string, unknown>) {
    return { ...payload, processed: true };
  }

  /**
   * Lambda entry point. Defined as an arrow-function property so `this` stays
   * bound to the instance when the function is exported on its own; `scope`
   * hands that instance to the resolver for decorator binding.
   */
  handler = async (event, context) => this.resolver.resolve(event, context, { scope: this });
}
export const handler = new MyEventHandler().handler;
Validate or authorize subscription requests. Throw UnauthorizedException to reject.
import { UnauthorizedException } from '@aws-lambda-powertools/event-handler/appsync-events';
app.onSubscribe('/private/messages', async (event) => {
const userId = event.identity?.sub;
if (!userId) throw new UnauthorizedException('Authentication required');
const hasAccess = await checkUserAccess(userId);
if (!hasAccess) throw new UnauthorizedException('Access denied');
});
app.onSubscribe('/public/announcements', async (event) => {
// No validation - allow all
});
Individual Event Processing (errors included in response)
app.onPublish('/default/process', async (payload) => {
if (!payload.valid) throw new Error('Invalid payload');
return payload;
});
// Response includes errors per event:
// {
// events: [
// { id: '1', payload: {...} },
// { id: '2', error: 'Invalid payload' },
// { id: '3', payload: {...} }
// ]
// }
Aggregate Processing (errors reject entire batch)
app.onPublish('/default/batch', async (payloads) => {
// If this throws, all events fail
return await processAllAtOnce(payloads);
}, { aggregate: true });
Subscription Rejection
app.onSubscribe('/secure/channel', async (event) => {
if (!authorized) {
throw new UnauthorizedException('Not authorized'); // Client receives 401/403
}
});
interface AppSyncEventsEvent {
identity: {
sub?: string;
issuer?: string;
username?: string;
claims?: Record<string, unknown>;
sourceIp?: string[];
defaultAuthStrategy?: string;
};
request: { headers: Record<string, string> };
info: { channel: string; namespace: string };
}
/**
 * Event shape for a publish operation: the common AppSync Events fields plus
 * the list of published events.
 */
interface AppSyncEventsPublishEvent extends AppSyncEventsEvent {
// Each published event carries its own `id` and an untyped `payload`.
events: Array<{ id: string; payload: unknown }>;
}
/**
 * Event shape for a subscribe operation: the common AppSync Events fields
 * with no published events attached.
 */
interface AppSyncEventsSubscribeEvent extends AppSyncEventsEvent {
// Always `null` for this event type.
events: null;
}
/**
 * Publish handler type selected by the boolean type parameter `T`:
 * `true` resolves to the aggregate handler signature, anything else
 * (including the default `false`) to the per-event handler signature.
 */
type OnPublishHandler<T extends boolean = false> = T extends true ? OnPublishHandlerAggregateFn : OnPublishHandlerFn;
/**
 * Per-event publish handler.
 *
 * @param payload - a single payload, untyped
 * @param options - the full publish event and the Lambda `Context`
 * @returns the result, either directly or as a promise
 */
type OnPublishHandlerFn = (
payload: unknown,
options: { event: AppSyncEventsPublishEvent; context: Context }
) => Promise<unknown> | unknown;
/**
 * Aggregate publish handler.
 *
 * @param payloads - an array of payloads, untyped
 * @param options - the full publish event and the Lambda `Context`
 * @returns an array of results, either directly or as a promise
 */
type OnPublishHandlerAggregateFn = (
payloads: unknown[],
options: { event: AppSyncEventsPublishEvent; context: Context }
) => Promise<unknown[]> | unknown[];
/**
 * Subscribe handler.
 *
 * @param event - the subscribe event
 * @param context - the Lambda `Context`
 * @returns any value, either directly or as a promise
 */
type OnSubscribeHandler = (
event: AppSyncEventsSubscribeEvent,
context: Context
) => Promise<unknown> | unknown;
/**
 * Output of a publish operation: one entry per event.
 *
 * @typeParam T - type of the `payload` values; defaults to `unknown`
 */
interface OnPublishOutput<T = unknown> {
events: Array<{
// Identifier of the entry.
id: string;
// Optional payload of type `T`.
payload?: T;
// Optional error string.
error?: string;
}>;
}
/** Alias of `OnPublishOutput<T>`; the two have the same shape. */
type OnPublishAggregateOutput<T = unknown> = OnPublishOutput<T>;
/** Options accepted as the third argument of `AppSyncEventsResolver.resolve`. */
interface ResolveOptions {
// Optional and untyped; the decorator example in this document passes the
// handler class instance here (`{ scope: this }`).
scope?: unknown; // For decorator binding
}
/**
 * Error type that the examples in this document throw from an `onSubscribe`
 * handler to reject a subscription request.
 */
class UnauthorizedException extends Error {
/**
 * @param message - error message
 * @param options - optional `ErrorOptions`
 */
constructor(message: string, options?: ErrorOptions);
}