or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-iteration.mddebug-system.mdevent-emission.mdevent-subscription.mdindex.mdutilities.md
tile.json

async-iteration.mddocs/

Async Iteration

Modern async iterator patterns for consuming event streams with for await loops and manual iteration control.

Capabilities

Event Stream Iteration

Get an async iterator that buffers event data for consumption with modern iteration patterns.

/**
 * Get an async iterator which buffers data each time an event is emitted
 * @param eventName - Single event name or array of event names to iterate over
 * @returns AsyncIterableIterator that yields event data
 */
events<Name extends keyof EventData>(
  eventName: Name | readonly Name[]
): AsyncIterableIterator<EventData[Name]>;

Usage Examples:

import Emittery from "emittery";

const emitter = new Emittery();

// Single event iteration with for-await
const eventIterator = emitter.events('message');

// Consume events as they arrive
for await (const message of eventIterator) {
  console.log('Received message:', message);
  if (message.type === 'quit') {
    break; // This automatically unsubscribes
  }
}

// Multiple event iteration
const activityIterator = emitter.events(['user-login', 'user-logout']);
for await (const activity of activityIterator) {
  console.log('User activity:', activity);
}

// Manual iteration control
const dataIterator = emitter.events('data-chunk');
const firstChunk = await dataIterator.next();
if (!firstChunk.done) {
  console.log('First chunk:', firstChunk.value);
}

// Clean up when done
await dataIterator.return();

Any Event Stream Iteration

Get an async iterator for all events emitted by the instance, yielding tuples of event names and data.

/**
 * Get an async iterator which buffers tuples of event name and data for any event
 * @returns AsyncIterableIterator that yields [eventName, eventData] tuples
 */
anyEvent(): AsyncIterableIterator<[keyof EventData, EventData[keyof EventData]]>;

Usage Examples:

import Emittery from "emittery";

const emitter = new Emittery();

// Monitor all events
const allEventsIterator = emitter.anyEvent();

for await (const [eventName, eventData] of allEventsIterator) {
  console.log(`Event "${eventName}":`, eventData);
  
  // Stop monitoring after specific event
  if (eventName === 'shutdown') {
    break;
  }
}

// Manual control with any events
const monitor = emitter.anyEvent();

// Get first event
const firstEvent = await monitor.next();
if (!firstEvent.done) {
  const [name, data] = firstEvent.value;
  console.log(`First event was "${name}":`, data);
}

// Clean up
await monitor.return();

Iterator Control

All async iterators support manual control through the standard AsyncIterator interface:

interface AsyncIterator<T> {
  next(): Promise<IteratorResult<T>>;
  return?(value?: any): Promise<IteratorResult<T>>;
}

interface IteratorResult<T> {
  done: boolean;
  value: T;
}

Usage Examples:

import Emittery from "emittery";

const emitter = new Emittery();

// Manual iteration control
const iterator = emitter.events('task-complete');

// Start some async operations that will emit events
startAsyncTasks();

// Process exactly 3 events
for (let i = 0; i < 3; i++) {
  const result = await iterator.next();
  if (result.done) {
    break;
  }
  console.log(`Task ${i + 1} completed:`, result.value);
}

// Clean up subscription
await iterator.return();

// Conditional iteration
const dataIterator = emitter.events('data');
let result = await dataIterator.next();

while (!result.done) {
  const data = result.value;
  
  if (data.isComplete) {
    console.log('Processing complete');
    break;
  }
  
  console.log('Processing data chunk:', data);
  result = await dataIterator.next();
}

// Always clean up
await dataIterator.return();

Buffering Behavior

Event iterators buffer events that are emitted before the iterator is consumed:

import Emittery from "emittery";

const emitter = new Emittery();
const iterator = emitter.events('buffered-event');

// Events emitted before iteration are buffered
emitter.emit('buffered-event', 'data1');
emitter.emit('buffered-event', 'data2');
emitter.emit('buffered-event', 'data3');

// All buffered events are available when we start iterating
const first = await iterator.next(); // {done: false, value: 'data1'}
const second = await iterator.next(); // {done: false, value: 'data2'}
const third = await iterator.next(); // {done: false, value: 'data3'}

await iterator.return();

Practical Patterns

Event Processing Pipeline

import Emittery from "emittery";

const emitter = new Emittery();

async function processEventStream() {
  const events = emitter.events('process-item');
  
  for await (const item of events) {
    try {
      const processed = await processItem(item);
      await emitter.emit('item-processed', processed);
    } catch (error) {
      await emitter.emit('processing-error', {item, error});
    }
  }
}

// Start processing
processEventStream();

// Feed items to process
await emitter.emit('process-item', {id: 1, data: 'test'});
await emitter.emit('process-item', {id: 2, data: 'test2'});

Event Logging

import Emittery from "emittery";

const emitter = new Emittery();

async function logAllEvents() {
  const allEvents = emitter.anyEvent();
  
  for await (const [eventName, eventData] of allEvents) {
    const timestamp = new Date().toISOString();
    console.log(`[${timestamp}] ${eventName}:`, eventData);
  }
}

// Start logging
logAllEvents();

Event Batching

import Emittery from "emittery";

const emitter = new Emittery();

async function batchEvents(batchSize = 10) {
  const events = emitter.events('batch-item');
  const batch = [];
  
  for await (const item of events) {
    batch.push(item);
    
    if (batch.length >= batchSize) {
      await processBatch([...batch]);
      batch.length = 0;
    }
  }
  
  // Process remaining items
  if (batch.length > 0) {
    await processBatch(batch);
  }
}

Cleanup and Resource Management

Always ensure iterators are properly cleaned up to prevent memory leaks:

import Emittery from "emittery";

const emitter = new Emittery();

async function monitorEvents() {
  const iterator = emitter.events('monitor');
  
  try {
    for await (const event of iterator) {
      await processEvent(event);
      
      if (shouldStop(event)) {
        break; // Automatically calls return()
      }
    }
  } catch (error) {
    console.error('Event processing error:', error);
  } finally {
    // Ensure cleanup even if loop breaks early
    await iterator.return();
  }
}

// Or with timeout
async function monitorWithTimeout(timeoutMs = 30000) {
  const iterator = emitter.events('data');
  const timeout = setTimeout(() => iterator.return(), timeoutMs);
  
  try {
    for await (const data of iterator) {
      await processData(data);
    }
  } finally {
    clearTimeout(timeout);
  }
}