Observable pattern and event system implementation providing reactive extensions for asynchronous data streams and event-driven programming with .NET event model compatibility.
Reactive extensions implementation with Observer pattern for handling asynchronous data streams and event sequences.
// Observer interface for receiving notifications
interface IObserver<T> {
OnNext: (x: T) => void;
OnError: (e: any) => void;
OnCompleted: () => void;
}
// Observable interface for subscribing to notifications
interface IObservable<T> {
Subscribe: (o: IObserver<T>) => IDisposable;
}// Concrete observer implementation
class Observer<T> implements IObserver<T> {
constructor(
onNext: (x: T) => void,
onError?: (e: any) => void,
onCompleted?: () => void
);
OnNext: (x: T) => void;
OnError: (e: any) => void;
OnCompleted: () => void;
}
// Usage
import { Observer } from "fable-library/Observable.js";
const observer = new Observer<number>(
value => console.log(`Next: ${value}`),
error => console.log(`Error: ${error}`),
() => console.log("Completed")
);
// The observer can be used with any observable
// observer.OnNext(42); // Prints: "Next: 42"
// observer.OnCompleted(); // Prints: "Completed"// Protected execution wrapper
function protect<T>(f: () => T, succeed: (x: T) => void, fail: (e: any) => void): void;
// Usage
import { protect } from "fable-library/Observable.js";
// Safely execute potentially throwing functions
protect(
() => JSON.parse('{"valid": "json"}'),
result => console.log("Parsed:", result),
error => console.log("Parse failed:", error)
);
protect(
() => JSON.parse('invalid json'),
result => console.log("Parsed:", result),
error => console.log("Parse failed:", error) // This will be called
);// Simple subscription with callback
function add<T>(callback: (x: T) => void, source: IObservable<T>): void;
// Subscription with disposable return
function subscribe<T>(callback: (x: T) => void, source: IObservable<T>): IDisposable;
// Usage
import { add, subscribe } from "fable-library/Observable.js";
// Example observable (would be created by some other mechanism)
declare const numberStream: IObservable<number>;
// Simple subscription
add(value => console.log(`Received: ${value}`), numberStream);
// Subscription with cleanup capability
const subscription = subscribe(
value => console.log(`Subscribed: ${value}`),
numberStream
);
// Later, unsubscribe
subscription.Dispose();// Transform values
function map<T, U>(mapping: (x: T) => U, source: IObservable<T>): IObservable<U>;
// Choose/filter with transformation
function choose<T, U>(chooser: (x: T) => U, source: IObservable<T>): IObservable<U>;
// Filter values
function filter<T>(predicate: (x: T) => boolean, source: IObservable<T>): IObservable<T>;
// Usage
import { map, choose, filter } from "fable-library/Observable.js";
declare const numberStream: IObservable<number>;
// Transform all values
const doubled = map(x => x * 2, numberStream);
// Transform and filter in one operation
const evenDoubled = choose(x => x % 2 === 0 ? x * 2 : null, numberStream);
// Filter only positive numbers
const positiveOnly = filter(x => x > 0, numberStream);
// Chain operations
const processed = map(
x => x.toString(),
filter(x => x > 10, doubled)
);
// Subscribe to processed stream
subscribe(value => console.log(`Final: ${value}`), processed);// Merge two observables
function merge<T>(source1: IObservable<T>, source2: IObservable<T>): IObservable<T>;
// Usage
import { merge, map } from "fable-library/Observable.js";
declare const stream1: IObservable<number>;
declare const stream2: IObservable<number>;
// Combine multiple streams
const merged = merge(stream1, stream2);
// All values from both streams will be emitted
subscribe(value => console.log(`Merged: ${value}`), merged);
// Transform each stream before merging
const mappedStream1 = map(x => `A: ${x}`, stream1);
const mappedStream2 = map(x => `B: ${x}`, stream2);
const labeledMerge = merge(mappedStream1, mappedStream2);// Scan (accumulate with intermediate results)
function scan<U, T>(collector: (u: U, t: T) => U, state: U, source: IObservable<T>): IObservable<U>;
// Pairwise (consecutive pairs)
function pairwise<T>(source: IObservable<T>): IObservable<[T, T]>;
// Usage
import { scan, pairwise } from "fable-library/Observable.js";
declare const numberStream: IObservable<number>;
// Running total
const runningSum = scan((acc, x) => acc + x, 0, numberStream);
subscribe(total => console.log(`Running total: ${total}`), runningSum);
// Running maximum
const runningMax = scan((max, x) => Math.max(max, x), Number.NEGATIVE_INFINITY, numberStream);
// Consecutive pairs for trend analysis
const pairs = pairwise(numberStream);
subscribe(([prev, curr]) => {
const change = curr - prev;
console.log(`${prev} → ${curr} (${change > 0 ? '+' : ''}${change})`);
}, pairs);
// Complex aggregation: moving average of last 3 values
const movingAverage = scan(
(window, value) => {
const newWindow = [...window, value].slice(-3);
return newWindow;
},
[] as number[],
numberStream
);
const averages = map(
window => window.length > 0 ? window.reduce((a, b) => a + b) / window.length : 0,
movingAverage
);// Partition into two streams based on predicate
function partition<T>(predicate: (x: T) => boolean, source: IObservable<T>): [IObservable<T>, IObservable<T>];
// Split into different types based on discriminator
function split<T, U1, U2>(splitter: (x: T) => any, source: IObservable<T>): [IObservable<U1>, IObservable<U2>];
// Usage
import { partition, split } from "fable-library/Observable.js";
declare const numberStream: IObservable<number>;
// Partition into even and odd numbers
const [evens, odds] = partition(x => x % 2 === 0, numberStream);
subscribe(x => console.log(`Even: ${x}`), evens);
subscribe(x => console.log(`Odd: ${x}`), odds);
// Split mixed data stream
interface Message {
type: 'info' | 'error';
content: string;
}
declare const messageStream: IObservable<Message>;
const [infoMessages, errorMessages] = split(
msg => msg.type === 'info' ? 0 : 1,
messageStream
);
subscribe(msg => console.log(`Info: ${msg.content}`), infoMessages);
subscribe(msg => console.error(`Error: ${msg.content}`), errorMessages);import { Observer } from "fable-library/Observable.js";
// Create a timer observable
function createTimer(interval: number): IObservable<number> {
return {
Subscribe: (observer: IObserver<number>) => {
let count = 0;
const timer = setInterval(() => {
try {
observer.OnNext(count++);
} catch (e) {
observer.OnError(e);
}
}, interval);
// Return disposable to cleanup
return {
Dispose: () => {
clearInterval(timer);
observer.OnCompleted();
}
};
}
};
}
// Usage
const timer = createTimer(1000); // Emit every second
const subscription = subscribe(
count => console.log(`Timer tick: ${count}`),
timer
);
// Stop after 5 seconds
setTimeout(() => subscription.Dispose(), 5000);// Create observable from DOM events (browser environment)
function createMouseMoveObservable(): IObservable<{x: number, y: number}> {
return {
Subscribe: (observer: IObserver<{x: number, y: number}>) => {
const handler = (event: MouseEvent) => {
try {
observer.OnNext({ x: event.clientX, y: event.clientY });
} catch (e) {
observer.OnError(e);
}
};
document.addEventListener('mousemove', handler);
return {
Dispose: () => {
document.removeEventListener('mousemove', handler);
observer.OnCompleted();
}
};
}
};
}
// Usage with reactive operations
const mouseMove = createMouseMoveObservable();
// Track only significant movements (> 10 pixels)
const significantMoves = filter(
pos => Math.abs(pos.x) > 10 || Math.abs(pos.y) > 10,
mouseMove
);
// Calculate distance from origin
const distances = map(
pos => Math.sqrt(pos.x * pos.x + pos.y * pos.y),
significantMoves
);
// Subscribe to distance changes
subscribe(distance => {
console.log(`Mouse distance from origin: ${distance.toFixed(2)}`);
}, distances);F# event system with .NET event compatibility, providing type-safe event handling and functional event operations.
// Function delegate types
type Delegate<T> = (x: T) => void;
type DotNetDelegate<T> = (sender: any, x: T) => void;// Delegate event interface (.NET style)
interface IDelegateEvent<T> {
AddHandler(d: DotNetDelegate<T>): void;
RemoveHandler(d: DotNetDelegate<T>): void;
}
// Full event interface (combines Observable and Delegate)
interface IEvent<T> extends IObservable<T>, IDelegateEvent<T> {
Publish: IEvent<T>;
Trigger(x: T): void;
}// Event class implementing F# events
class Event<T> implements IEvent<T> {
constructor(_subscriber?: (o: IObserver<T>) => IDisposable, delegates?: Array<Delegate<T>>);
// Delegate management
delegates: Array<Delegate<T>>;
Add(f: Delegate<T>): void;
// Properties
readonly Publish: IEvent<T>;
// Event operations
Trigger(value: T): void;
// .NET style handlers
AddHandler(handler: DotNetDelegate<T>): void;
RemoveHandler(handler: DotNetDelegate<T>): void;
// Observable interface
Subscribe(arg: IObserver<T> | Delegate<T>): IDisposable;
}
// Usage
import { Event } from "fable-library/Event.js";
// Create event
const buttonClicked = new Event<string>();
// Add handlers using different methods
buttonClicked.Add(msg => console.log(`Handler 1: ${msg}`));
const handler2 = (msg: string) => console.log(`Handler 2: ${msg}`);
buttonClicked.Add(handler2);
// .NET style handler
const dotNetHandler = (sender: any, msg: string) => {
console.log(`DotNet Handler - Sender: ${sender}, Message: ${msg}`);
};
buttonClicked.AddHandler(dotNetHandler);
// Trigger event
buttonClicked.Trigger("Button was clicked!");
// Remove handler
buttonClicked.RemoveHandler(dotNetHandler);// Basic event subscription
function add<T>(callback: (x: T) => void, sourceEvent: IEvent<T>): void;
// Usage
import { add } from "fable-library/Event.js";
const dataReceived = new Event<{id: number, data: string}>();
// Simple subscription
add(item => console.log(`Received: ${item.id} - ${item.data}`), dataReceived);
// Multiple subscribers
add(item => updateUI(item), dataReceived);
add(item => logToServer(item), dataReceived);
add(item => cacheData(item), dataReceived);// Transform event values
function map<T, U>(mapping: (x: T) => U, sourceEvent: IEvent<T>): IEvent<U>;
// Choose/filter with transformation
function choose<T, U>(chooser: (x: T) => U, sourceEvent: IEvent<T>): IEvent<U>;
// Filter events
function filter<T>(predicate: (x: T) => boolean, sourceEvent: IEvent<T>): IEvent<T>;
// Usage
import { map, choose, filter } from "fable-library/Event.js";
const mouseEvents = new Event<{x: number, y: number, button: number}>();
// Transform to just coordinates
const coordinates = map(e => `(${e.x}, ${e.y})`, mouseEvents);
add(coord => console.log(`Coordinates: ${coord}`), coordinates);
// Filter for left clicks only
const leftClicks = filter(e => e.button === 0, mouseEvents);
add(e => console.log(`Left click at (${e.x}, ${e.y})`), leftClicks);
// Choose significant movements (> 50px from origin)
const significantMoves = choose(e => {
const distance = Math.sqrt(e.x * e.x + e.y * e.y);
return distance > 50 ? distance : null;
}, mouseEvents);
add(distance => console.log(`Significant move: ${distance}`), significantMoves);// Merge multiple events
function merge<T>(event1: IEvent<T>, event2: IEvent<T>): IEvent<T>;
// Usage
import { merge, map } from "fable-library/Event.js";
const keyboardEvents = new Event<{key: string, type: 'up' | 'down'}>();
const mouseEvents = new Event<{button: number, type: 'up' | 'down'}>();
// Convert to common format
const keyboardInputs = map(e => `Key ${e.key} ${e.type}`, keyboardEvents);
const mouseInputs = map(e => `Mouse ${e.button} ${e.type}`, mouseEvents);
// Merge all input events
const allInputs = merge(keyboardInputs, mouseInputs);
add(input => console.log(`Input: ${input}`), allInputs);
// Trigger events
keyboardEvents.Trigger({key: 'A', type: 'down'});
mouseEvents.Trigger({button: 0, type: 'down'});
// Output:
// Input: Key A down
// Input: Mouse 0 down// Scan (accumulate state)
function scan<U, T>(collector: (u: U, t: T) => U, state: U, sourceEvent: IEvent<T>): IEvent<U>;
// Pairwise (consecutive pairs)
function pairwise<T>(sourceEvent: IEvent<T>): IEvent<[T, T]>;
// Usage
import { scan, pairwise } from "fable-library/Event.js";
const numberEvents = new Event<number>();
// Running total
const runningTotal = scan((total, num) => total + num, 0, numberEvents);
add(total => console.log(`Running total: ${total}`), runningTotal);
// Count events
const eventCount = scan((count, _) => count + 1, 0, numberEvents);
add(count => console.log(`Event count: ${count}`), eventCount);
// Track pairs for trends
const numberPairs = pairwise(numberEvents);
add(([prev, curr]) => {
const trend = curr > prev ? "↑" : curr < prev ? "↓" : "→";
console.log(`${prev} ${trend} ${curr}`);
}, numberPairs);
// Trigger some events
numberEvents.Trigger(10);
numberEvents.Trigger(15);
numberEvents.Trigger(12);
// Output:
// Running total: 10, Event count: 1
// Running total: 25, Event count: 2, 10 ↑ 15
// Running total: 37, Event count: 3, 15 ↓ 12// Partition events based on predicate
function partition<T>(predicate: (x: T) => boolean, sourceEvent: IEvent<T>): [IEvent<T>, IEvent<T>];
// Split events into different types
function split<T, U1, U2>(splitter: (x: T) => any, sourceEvent: IEvent<T>): [IEvent<U1>, IEvent<U2>];
// Usage
import { partition, split } from "fable-library/Event.js";
const networkEvents = new Event<{status: 'success' | 'error', data?: any, error?: string}>();
// Partition into success and error events
const [successes, errors] = partition(e => e.status === 'success', networkEvents);
add(e => console.log(`Success: ${JSON.stringify(e.data)}`), successes);
add(e => console.log(`Error: ${e.error}`), errors);
// Split log events by severity
const logEvents = new Event<{level: 'info' | 'warning' | 'error', message: string}>();
const [infoLogs, criticalLogs] = split(log => log.level === 'info' ? 0 : 1, logEvents);
add(log => console.log(`ℹ️ ${log.message}`), infoLogs);
add(log => console.log(`⚠️ ${log.message}`), criticalLogs);import { Event, map, filter, scan } from "fable-library/Event.js";
interface FormField {
name: string;
value: string;
isValid: boolean;
}
// Create events for form interactions
const fieldChanged = new Event<FormField>();
const formSubmitted = new Event<void>();
// Track field validity
const validFields = filter(field => field.isValid, fieldChanged);
const invalidFields = filter(field => !field.isValid, fieldChanged);
// Count valid fields
const validFieldCount = scan(
(validFields, field) => {
const newSet = new Set(validFields);
if (field.isValid) {
newSet.add(field.name);
} else {
newSet.delete(field.name);
}
return newSet;
},
new Set<string>(),
fieldChanged
);
const isFormValid = map(validSet => validSet.size >= 3, validFieldCount); // Need 3+ valid fields
// Subscribe to validation state
add(isValid => {
const submitButton = document.getElementById('submit');
if (submitButton) {
submitButton.disabled = !isValid;
}
}, isFormValid);
// Handle form submission
add(() => {
console.log("Form submitted!");
}, formSubmitted);
// Simulate field changes
fieldChanged.Trigger({name: 'email', value: 'user@example.com', isValid: true});
fieldChanged.Trigger({name: 'password', value: '12345678', isValid: true});
fieldChanged.Trigger({name: 'name', value: 'John Doe', isValid: true});import { Event, map, scan, filter, merge } from "fable-library/Event.js";
interface DataPoint {
timestamp: number;
metric: string;
value: number;
}
// Data source events
const cpuUsage = new Event<DataPoint>();
const memoryUsage = new Event<DataPoint>();
const networkTraffic = new Event<DataPoint>();
// Merge all metrics
const allMetrics = merge(
merge(cpuUsage.Publish, memoryUsage.Publish),
networkTraffic.Publish
);
// Calculate rolling averages
const rollingAverages = scan(
(averages, dataPoint) => {
const key = dataPoint.metric;
const current = averages.get(key) || [];
const updated = [...current, dataPoint.value].slice(-10); // Last 10 values
const average = updated.reduce((a, b) => a + b) / updated.length;
return new Map(averages).set(key, {values: updated, average});
},
new Map<string, {values: number[], average: number}>(),
allMetrics
);
// Alert on high usage
const highUsageAlerts = filter(
dataPoint => dataPoint.metric === 'cpu' && dataPoint.value > 90,
allMetrics
);
// Subscribe to updates
add(averages => {
console.log("Current averages:");
for (const [metric, data] of averages) {
console.log(` ${metric}: ${data.average.toFixed(2)}%`);
}
}, rollingAverages);
add(alert => {
console.warn(`🚨 High CPU usage: ${alert.value}% at ${new Date(alert.timestamp)}`);
}, highUsageAlerts);
// Simulate data
setInterval(() => {
const timestamp = Date.now();
cpuUsage.Trigger({timestamp, metric: 'cpu', value: Math.random() * 100});
memoryUsage.Trigger({timestamp, metric: 'memory', value: Math.random() * 100});
networkTraffic.Trigger({timestamp, metric: 'network', value: Math.random() * 1000});
}, 1000);