Type-safe event emitters and event forwarding utilities for building reactive systems with strong TypeScript integration and proper resource management.
EventEmitter with typed events providing compile-time safety for event names and handler signatures.
/**
* Union type handling different event emitter event types across environments
*/
type EventEmitterEventType = string | symbol | number;
/**
* Complex type transform for typed event emitter methods with proper typing
* Maps event names to their handler function signatures
*/
type TypedEventTransform<TEvents> = {
[K in keyof TEvents]: TEvents[K] extends (...args: any[]) => any
? TEvents[K]
: (...args: any[]) => any;
};
/**
* EventEmitter with typed events extending Node.js EventEmitter
* Provides type safety for event names and handler signatures
* Implements IEventProvider interface for Fluid Framework compatibility
*/
class TypedEventEmitter<TEvents> implements IEventProvider<TEvents> {
/**
* Adds an event listener for the specified event
* @param event - Event name (strongly typed)
* @param listener - Event handler function (signature must match event type)
* @returns this (for method chaining)
*/
on<K extends keyof TEvents>(event: K, listener: TEvents[K]): this;
/**
* Adds a one-time event listener
* @param event - Event name (strongly typed)
* @param listener - Event handler function (executed once then removed)
* @returns this (for method chaining)
*/
once<K extends keyof TEvents>(event: K, listener: TEvents[K]): this;
/**
* Removes an event listener
* @param event - Event name (strongly typed)
* @param listener - Event handler function to remove (must be same reference)
* @returns this (for method chaining)
*/
off<K extends keyof TEvents>(event: K, listener: TEvents[K]): this;
/**
* Emits an event with typed arguments
* @param event - Event name (strongly typed)
* @param args - Event arguments (types must match event signature)
* @returns true if event had listeners, false otherwise
*/
emit<K extends keyof TEvents>(
event: K,
...args: Parameters<TEvents[K]>
): boolean;
/**
* Gets all listeners for a specific event
* @param event - Event name (strongly typed)
* @returns Array of listener functions
*/
listeners<K extends keyof TEvents>(event: K): Array<TEvents[K]>;
/**
* Removes all listeners for a specific event or all events
* @param event - Optional event name (omit to remove all listeners)
* @returns this (for method chaining)
*/
removeAllListeners<K extends keyof TEvents>(event?: K): this;
}Usage Examples:
import { TypedEventEmitter } from "@fluidframework/common-utils";
// Define event interface with typed signatures
interface MyEvents {
data: (value: string, timestamp: number) => void;
error: (error: Error) => void;
statusChange: (status: "online" | "offline") => void;
userAction: (userId: string, action: string, metadata?: any) => void;
}
// Create typed event emitter
class MyService extends TypedEventEmitter<MyEvents> {
private status: "online" | "offline" = "offline";
connect(): void {
// Simulate connection
setTimeout(() => {
this.status = "online";
this.emit("statusChange", "online"); // Fully typed!
this.emit("data", "Connected successfully", Date.now());
}, 1000);
}
processData(data: string): void {
try {
// Process data logic
this.emit("data", `Processed: ${data}`, Date.now());
} catch (error) {
this.emit("error", error as Error); // Type-safe error emission
}
}
simulateUserAction(userId: string): void {
this.emit("userAction", userId, "click", { button: "submit" });
}
}
// Usage with type safety
const service = new MyService();
// Type-safe event handling
service.on("data", (value, timestamp) => {
// TypeScript knows: value is string, timestamp is number
console.log(`Data received: ${value} at ${timestamp}`);
});
service.on("error", (error) => {
// TypeScript knows: error is Error
console.error("Service error:", error.message);
});
service.on("statusChange", (status) => {
// TypeScript knows: status is "online" | "offline"
console.log(`Status changed to: ${status}`);
});
service.on("userAction", (userId, action, metadata) => {
// TypeScript knows all parameter types
console.log(`User ${userId} performed ${action}`, metadata);
});
// Compile-time error if you use wrong types:
// service.emit("data", 123, "wrong type"); // Error!
// service.on("nonexistent", () => {}); // Error!
service.connect();
service.processData("sample data");
service.simulateUserAction("user123");Base class for forwarding events from one EventEmitter to another with automatic cleanup.
/**
* Base class for forwarding events from source EventEmitter
* Extends TypedEventEmitter and implements IDisposable for resource management
* Automatically manages event subscription lifecycle
*/
class EventForwarder<TEvents> extends TypedEventEmitter<TEvents> implements IDisposable {
/**
* Creates a new EventForwarder
* @param from - Source event emitter to forward events from
* @param to - Optional target event emitter to forward events to
*/
constructor(from: IEventProvider<TEvents>, to?: IEventProvider<TEvents>);
/** Whether the forwarder has been disposed */
readonly disposed: boolean;
/**
* Disposes the forwarder, cleaning up all event subscriptions
* After disposal, no more events will be forwarded
*/
dispose(): void;
}Usage Examples:
import { EventForwarder } from "@fluidframework/common-utils";
// Event forwarding for service composition
interface ServiceEvents {
data: (payload: any) => void;
error: (error: Error) => void;
status: (status: string) => void;
}
class DatabaseService extends TypedEventEmitter<ServiceEvents> {
query(sql: string): void {
// Simulate database query
setTimeout(() => {
this.emit("data", { result: "query result" });
this.emit("status", "query completed");
}, 100);
}
simulateError(): void {
this.emit("error", new Error("Database connection failed"));
}
}
class ApiService extends TypedEventEmitter<ServiceEvents> {
private dbService = new DatabaseService();
private forwarder: EventForwarder<ServiceEvents>;
constructor() {
super();
// Forward database events through API service
this.forwarder = new EventForwarder(this.dbService, this);
}
getData(): void {
this.dbService.query("SELECT * FROM users");
}
shutdown(): void {
// Clean up event forwarding
this.forwarder.dispose();
}
}
// Usage
const apiService = new ApiService();
// Listen to forwarded events
apiService.on("data", (payload) => {
console.log("Received data:", payload);
});
apiService.on("error", (error) => {
console.error("Service error:", error.message);
});
apiService.on("status", (status) => {
console.log("Status:", status);
});
apiService.getData(); // Triggers forwarded events
// Clean shutdown
apiService.shutdown(); // Disposes forwarder, stops event forwarding
// Event aggregation pattern
class EventAggregator<TEvents> extends TypedEventEmitter<TEvents> {
private forwarders: EventForwarder<TEvents>[] = [];
addSource(source: IEventProvider<TEvents>): void {
const forwarder = new EventForwarder(source, this);
this.forwarders.push(forwarder);
}
dispose(): void {
// Clean up all forwarders
this.forwarders.forEach(f => f.dispose());
this.forwarders.length = 0;
}
}
// Aggregate events from multiple sources
const aggregator = new EventAggregator<ServiceEvents>();
const db1 = new DatabaseService();
const db2 = new DatabaseService();
aggregator.addSource(db1);
aggregator.addSource(db2);
// Listen to aggregated events
aggregator.on("data", (payload) => {
console.log("Data from any source:", payload);
});
// Events from both db1 and db2 will be forwarded to aggregator listenersimport { TypedEventEmitter } from "@fluidframework/common-utils";
// Global event bus for application-wide communication
interface GlobalEvents {
userLogin: (user: { id: string; name: string }) => void;
userLogout: (userId: string) => void;
notification: (message: string, type: "info" | "warning" | "error") => void;
dataUpdate: (entityType: string, entityId: string, data: any) => void;
}
class EventBus extends TypedEventEmitter<GlobalEvents> {
private static instance: EventBus | null = null;
static getInstance(): EventBus {
if (!EventBus.instance) {
EventBus.instance = new EventBus();
}
return EventBus.instance;
}
// Convenience methods for common operations
notifyInfo(message: string): void {
this.emit("notification", message, "info");
}
notifyError(message: string): void {
this.emit("notification", message, "error");
}
updateData(entityType: string, entityId: string, data: any): void {
this.emit("dataUpdate", entityType, entityId, data);
}
}
// Usage across application modules
const eventBus = EventBus.getInstance();
// Module 1: User authentication
class AuthModule {
login(credentials: any): void {
// Authentication logic
const user = { id: "123", name: "John Doe" };
eventBus.emit("userLogin", user);
eventBus.notifyInfo(`Welcome, ${user.name}!`);
}
logout(userId: string): void {
eventBus.emit("userLogout", userId);
eventBus.notifyInfo("You have been logged out");
}
}
// Module 2: UI notification handler
class NotificationHandler {
constructor() {
eventBus.on("notification", (message, type) => {
this.showNotification(message, type);
});
eventBus.on("userLogin", (user) => {
this.updateUserDisplay(user);
});
}
private showNotification(message: string, type: string): void {
console.log(`[${type.toUpperCase()}] ${message}`);
}
private updateUserDisplay(user: any): void {
console.log(`UI: User ${user.name} is now logged in`);
}
}import { TypedEventEmitter, Deferred } from "@fluidframework/common-utils";
interface RequestResponseEvents {
request: (requestId: string, data: any, respond: (response: any) => void) => void;
response: (requestId: string, data: any) => void;
}
class RequestResponseBus extends TypedEventEmitter<RequestResponseEvents> {
private pendingRequests = new Map<string, Deferred<any>>();
constructor() {
super();
// Handle responses
this.on("response", (requestId, data) => {
const deferred = this.pendingRequests.get(requestId);
if (deferred) {
deferred.resolve(data);
this.pendingRequests.delete(requestId);
}
});
}
async request<T>(data: any, timeout: number = 5000): Promise<T> {
const requestId = this.generateRequestId();
const deferred = new Deferred<T>();
this.pendingRequests.set(requestId, deferred);
// Set timeout
const timeoutHandle = setTimeout(() => {
if (this.pendingRequests.has(requestId)) {
this.pendingRequests.delete(requestId);
deferred.reject(new Error(`Request ${requestId} timed out`));
}
}, timeout);
// Send request
this.emit("request", requestId, data, (response: T) => {
clearTimeout(timeoutHandle);
this.emit("response", requestId, response);
});
return deferred.promise;
}
onRequest(handler: (data: any) => any): void {
this.on("request", async (requestId, data, respond) => {
try {
const result = await handler(data);
respond(result);
} catch (error) {
respond({ error: error.message });
}
});
}
private generateRequestId(): string {
return `req_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
// Usage
const rrBus = new RequestResponseBus();
// Service that handles requests
rrBus.onRequest(async (data) => {
if (data.type === "getUserData") {
// Simulate async data fetch
await new Promise(resolve => setTimeout(resolve, 100));
return { userId: data.userId, name: "John Doe", email: "john@example.com" };
}
throw new Error("Unknown request type");
});
// Client making requests
async function fetchUserData(userId: string) {
try {
const userData = await rrBus.request({ type: "getUserData", userId }, 3000);
console.log("User data:", userData);
} catch (error) {
console.error("Failed to fetch user data:", error.message);
}
}
fetchUserData("123");These event handling utilities provide the foundation for building reactive, type-safe event-driven architectures in Fluid Framework applications.