Modern async iterator patterns for consuming event streams with for await loops and manual iteration control.
Get an async iterator that buffers event data for consumption with modern iteration patterns.
/**
* Get an async iterator which buffers data each time an event is emitted
* @param eventName - Single event name or array of event names to iterate over
* @returns AsyncIterableIterator that yields event data
*/
events<Name extends keyof EventData>(
eventName: Name | readonly Name[]
): AsyncIterableIterator<EventData[Name]>;Usage Examples:
import Emittery from "emittery";
const emitter = new Emittery();
// Single event iteration with for-await
const eventIterator = emitter.events('message');
// Consume events as they arrive
for await (const message of eventIterator) {
console.log('Received message:', message);
if (message.type === 'quit') {
break; // This automatically unsubscribes
}
}
// Multiple event iteration
const activityIterator = emitter.events(['user-login', 'user-logout']);
for await (const activity of activityIterator) {
console.log('User activity:', activity);
}
// Manual iteration control
const dataIterator = emitter.events('data-chunk');
const firstChunk = await dataIterator.next();
if (!firstChunk.done) {
console.log('First chunk:', firstChunk.value);
}
// Clean up when done
await dataIterator.return();Get an async iterator for all events emitted by the instance, yielding tuples of event names and data.
/**
* Get an async iterator which buffers tuples of event name and data for any event
* @returns AsyncIterableIterator that yields [eventName, eventData] tuples
*/
anyEvent(): AsyncIterableIterator<[keyof EventData, EventData[keyof EventData]]>;Usage Examples:
import Emittery from "emittery";
const emitter = new Emittery();
// Monitor all events
const allEventsIterator = emitter.anyEvent();
for await (const [eventName, eventData] of allEventsIterator) {
console.log(`Event "${eventName}":`, eventData);
// Stop monitoring after specific event
if (eventName === 'shutdown') {
break;
}
}
// Manual control with any events
const monitor = emitter.anyEvent();
// Get first event
const firstEvent = await monitor.next();
if (!firstEvent.done) {
const [name, data] = firstEvent.value;
console.log(`First event was "${name}":`, data);
}
// Clean up
await monitor.return();All async iterators support manual control through the standard AsyncIterator interface:
interface AsyncIterator<T> {
next(): Promise<IteratorResult<T>>;
return?(value?: any): Promise<IteratorResult<T>>;
}
interface IteratorResult<T> {
done: boolean;
value: T;
}Usage Examples:
import Emittery from "emittery";
const emitter = new Emittery();
// Manual iteration control
const iterator = emitter.events('task-complete');
// Start some async operations that will emit events
startAsyncTasks();
// Process exactly 3 events
for (let i = 0; i < 3; i++) {
const result = await iterator.next();
if (result.done) {
break;
}
console.log(`Task ${i + 1} completed:`, result.value);
}
// Clean up subscription
await iterator.return();
// Conditional iteration
const dataIterator = emitter.events('data');
let result = await dataIterator.next();
while (!result.done) {
const data = result.value;
if (data.isComplete) {
console.log('Processing complete');
break;
}
console.log('Processing data chunk:', data);
result = await dataIterator.next();
}
// Always clean up
await dataIterator.return();Event iterators buffer events that are emitted before the iterator is consumed:
import Emittery from "emittery";
const emitter = new Emittery();
const iterator = emitter.events('buffered-event');
// Events emitted before iteration are buffered
emitter.emit('buffered-event', 'data1');
emitter.emit('buffered-event', 'data2');
emitter.emit('buffered-event', 'data3');
// All buffered events are available when we start iterating
const first = await iterator.next(); // {done: false, value: 'data1'}
const second = await iterator.next(); // {done: false, value: 'data2'}
const third = await iterator.next(); // {done: false, value: 'data3'}
await iterator.return();import Emittery from "emittery";
const emitter = new Emittery();
async function processEventStream() {
const events = emitter.events('process-item');
for await (const item of events) {
try {
const processed = await processItem(item);
await emitter.emit('item-processed', processed);
} catch (error) {
await emitter.emit('processing-error', {item, error});
}
}
}
// Start processing
processEventStream();
// Feed items to process
await emitter.emit('process-item', {id: 1, data: 'test'});
await emitter.emit('process-item', {id: 2, data: 'test2'});import Emittery from "emittery";
const emitter = new Emittery();
async function logAllEvents() {
const allEvents = emitter.anyEvent();
for await (const [eventName, eventData] of allEvents) {
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] ${eventName}:`, eventData);
}
}
// Start logging
logAllEvents();import Emittery from "emittery";
const emitter = new Emittery();
async function batchEvents(batchSize = 10) {
const events = emitter.events('batch-item');
const batch = [];
for await (const item of events) {
batch.push(item);
if (batch.length >= batchSize) {
await processBatch([...batch]);
batch.length = 0;
}
}
// Process remaining items
if (batch.length > 0) {
await processBatch(batch);
}
}Always ensure iterators are properly cleaned up to prevent memory leaks:
import Emittery from "emittery";
const emitter = new Emittery();
async function monitorEvents() {
const iterator = emitter.events('monitor');
try {
for await (const event of iterator) {
await processEvent(event);
if (shouldStop(event)) {
break; // Automatically calls return()
}
}
} catch (error) {
console.error('Event processing error:', error);
} finally {
// Ensure cleanup even if loop breaks early
await iterator.return();
}
}
// Or with timeout
async function monitorWithTimeout(timeoutMs = 30000) {
const iterator = emitter.events('data');
const timeout = setTimeout(() => iterator.return(), timeoutMs);
try {
for await (const data of iterator) {
await processData(data);
}
} finally {
clearTimeout(timeout);
}
}