or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-programming.md
character-operations.md
collections.md
configuration.md
core-infrastructure.md
data-encoding.md
date-time.md
external-integration.md
index.md
numeric-types.md
reactive-programming.md
string-operations.md
type-system.md
tile.json

docs/reactive-programming.md

Reactive Programming

Observable pattern and event system implementation providing reactive extensions for asynchronous data streams and event-driven programming with .NET event model compatibility.

Observable Module - Observable Pattern

Reactive extensions implementation with Observer pattern for handling asynchronous data streams and event sequences.

Core Interfaces

// Observer interface for receiving notifications
interface IObserver<T> {
    OnNext: (x: T) => void;
    OnError: (e: any) => void;
    OnCompleted: () => void;
}

// Observable interface for subscribing to notifications
interface IObservable<T> {
    Subscribe: (o: IObserver<T>) => IDisposable;
}

Observer Implementation

// Concrete observer implementation
class Observer<T> implements IObserver<T> {
    constructor(
        onNext: (x: T) => void, 
        onError?: (e: any) => void, 
        onCompleted?: () => void
    );
    
    OnNext: (x: T) => void;
    OnError: (e: any) => void;
    OnCompleted: () => void;
}

// Usage
import { Observer } from "fable-library/Observable.js";

const observer = new Observer<number>(
    value => console.log(`Next: ${value}`),
    error => console.log(`Error: ${error}`),
    () => console.log("Completed")
);

// The observer can be used with any observable
// observer.OnNext(42); // Prints: "Next: 42"
// observer.OnCompleted(); // Prints: "Completed"

Utility Functions

// Protected execution wrapper
function protect<T>(f: () => T, succeed: (x: T) => void, fail: (e: any) => void): void;

// Usage
import { protect } from "fable-library/Observable.js";

// Safely execute potentially throwing functions
protect(
    () => JSON.parse('{"valid": "json"}'),
    result => console.log("Parsed:", result),
    error => console.log("Parse failed:", error)
);

protect(
    () => JSON.parse('invalid json'),
    result => console.log("Parsed:", result),
    error => console.log("Parse failed:", error) // This will be called
);

Subscription Operations

// Simple subscription with callback
function add<T>(callback: (x: T) => void, source: IObservable<T>): void;

// Subscription with disposable return
function subscribe<T>(callback: (x: T) => void, source: IObservable<T>): IDisposable;

// Usage
import { add, subscribe } from "fable-library/Observable.js";

// Example observable (would be created by some other mechanism)
declare const numberStream: IObservable<number>;

// Simple subscription
add(value => console.log(`Received: ${value}`), numberStream);

// Subscription with cleanup capability
const subscription = subscribe(
    value => console.log(`Subscribed: ${value}`), 
    numberStream
);

// Later, unsubscribe
subscription.Dispose();

Transformation Operations

// Transform values
function map<T, U>(mapping: (x: T) => U, source: IObservable<T>): IObservable<U>;

// Choose/filter with transformation
function choose<T, U>(chooser: (x: T) => U, source: IObservable<T>): IObservable<U>;

// Filter values
function filter<T>(predicate: (x: T) => boolean, source: IObservable<T>): IObservable<T>;

// Usage
// `subscribe` is called at the end of this snippet, so it must be imported too.
import { map, choose, filter, subscribe } from "fable-library/Observable.js";

declare const numberStream: IObservable<number>;

// Transform all values
const doubled = map(x => x * 2, numberStream);

// Transform and filter in one operation
const evenDoubled = choose(x => x % 2 === 0 ? x * 2 : null, numberStream);

// Filter only positive numbers
const positiveOnly = filter(x => x > 0, numberStream);

// Chain operations
const processed = map(
    x => x.toString(), 
    filter(x => x > 10, doubled)
);

// Subscribe to processed stream
subscribe(value => console.log(`Final: ${value}`), processed);

Combination Operations

// Merge two observables
function merge<T>(source1: IObservable<T>, source2: IObservable<T>): IObservable<T>;

// Usage
// `subscribe` is used below to observe the merged stream, so it must be imported too.
import { merge, map, subscribe } from "fable-library/Observable.js";

declare const stream1: IObservable<number>;
declare const stream2: IObservable<number>;

// Combine multiple streams
const merged = merge(stream1, stream2);

// All values from both streams will be emitted
subscribe(value => console.log(`Merged: ${value}`), merged);

// Transform each stream before merging
const mappedStream1 = map(x => `A: ${x}`, stream1);
const mappedStream2 = map(x => `B: ${x}`, stream2);
const labeledMerge = merge(mappedStream1, mappedStream2);

Aggregation Operations

// Scan (accumulate with intermediate results)
function scan<U, T>(collector: (u: U, t: T) => U, state: U, source: IObservable<T>): IObservable<U>;

// Pairwise (consecutive pairs)
function pairwise<T>(source: IObservable<T>): IObservable<[T, T]>;

// Usage
// `map` (moving average) and `subscribe` are used below, so they must be imported too.
import { scan, pairwise, map, subscribe } from "fable-library/Observable.js";

declare const numberStream: IObservable<number>;

// Running total
const runningSum = scan((acc, x) => acc + x, 0, numberStream);
subscribe(total => console.log(`Running total: ${total}`), runningSum);

// Running maximum
const runningMax = scan((max, x) => Math.max(max, x), Number.NEGATIVE_INFINITY, numberStream);

// Consecutive pairs for trend analysis
const pairs = pairwise(numberStream);
subscribe(([prev, curr]) => {
    const change = curr - prev;
    console.log(`${prev} → ${curr} (${change > 0 ? '+' : ''}${change})`);
}, pairs);

// Complex aggregation: moving average of last 3 values
const movingAverage = scan(
    (window, value) => {
        const newWindow = [...window, value].slice(-3);
        return newWindow;
    },
    [] as number[],
    numberStream
);

const averages = map(
    window => window.length > 0 ? window.reduce((a, b) => a + b) / window.length : 0,
    movingAverage
);

Partitioning Operations

// Partition into two streams based on predicate
function partition<T>(predicate: (x: T) => boolean, source: IObservable<T>): [IObservable<T>, IObservable<T>];

// Split into different types based on discriminator
function split<T, U1, U2>(splitter: (x: T) => any, source: IObservable<T>): [IObservable<U1>, IObservable<U2>];

// Usage
// `subscribe` is used below on every partitioned stream, so it must be imported too.
import { partition, split, subscribe } from "fable-library/Observable.js";

declare const numberStream: IObservable<number>;

// Partition into even and odd numbers
const [evens, odds] = partition(x => x % 2 === 0, numberStream);

subscribe(x => console.log(`Even: ${x}`), evens);
subscribe(x => console.log(`Odd: ${x}`), odds);

// Split mixed data stream
interface Message {
    type: 'info' | 'error';
    content: string;
}

declare const messageStream: IObservable<Message>;

const [infoMessages, errorMessages] = split(
    msg => msg.type === 'info' ? 0 : 1,
    messageStream
);

subscribe(msg => console.log(`Info: ${msg.content}`), infoMessages);
subscribe(msg => console.error(`Error: ${msg.content}`), errorMessages);

Practical Observable Examples

Creating Custom Observables

// `subscribe` is called in the usage section of this example, so it must be imported.
import { Observer, subscribe } from "fable-library/Observable.js";

/**
 * Create a timer observable.
 *
 * Every subscription starts its own `setInterval` and emits an increasing
 * counter (0, 1, 2, ...) through `observer.OnNext`. If the observer's `OnNext`
 * throws, the thrown value is forwarded to `observer.OnError`.
 *
 * @param interval Delay between ticks, in milliseconds (passed to `setInterval`).
 * @returns An observable whose `Subscribe` returns a disposable; disposing
 *          stops the interval and notifies `OnCompleted` exactly once.
 */
function createTimer(interval: number): IObservable<number> {
    return {
        Subscribe: (observer: IObserver<number>) => {
            let count = 0;
            let disposed = false;
            const timer = setInterval(() => {
                try {
                    observer.OnNext(count++);
                } catch (e) {
                    observer.OnError(e);
                }
            }, interval);

            // Return disposable to cleanup
            return {
                Dispose: () => {
                    // Dispose may be called more than once; only the first call
                    // may tear down the timer and signal completion.
                    if (disposed) {
                        return;
                    }
                    disposed = true;
                    clearInterval(timer);
                    observer.OnCompleted();
                }
            };
        }
    };
}

// Usage
const timer = createTimer(1000); // Emit every second
const subscription = subscribe(
    count => console.log(`Timer tick: ${count}`),
    timer
);

// Stop after 5 seconds
setTimeout(() => subscription.Dispose(), 5000);

Mouse Movement Observable

/**
 * Create observable from DOM events (browser environment).
 *
 * Every subscription registers its own `mousemove` listener on `document` and
 * emits `{x, y}` taken from the event's `clientX` / `clientY`. If the
 * observer's `OnNext` throws, the thrown value is forwarded to `OnError`.
 *
 * @returns An observable whose `Subscribe` returns a disposable; disposing
 *          removes the listener and notifies `OnCompleted` exactly once.
 */
function createMouseMoveObservable(): IObservable<{x: number, y: number}> {
    return {
        Subscribe: (observer: IObserver<{x: number, y: number}>) => {
            let disposed = false;
            const handler = (event: MouseEvent) => {
                try {
                    observer.OnNext({ x: event.clientX, y: event.clientY });
                } catch (e) {
                    observer.OnError(e);
                }
            };

            document.addEventListener('mousemove', handler);

            return {
                Dispose: () => {
                    // Dispose may be called more than once; only the first call
                    // may detach the listener and signal completion.
                    if (disposed) {
                        return;
                    }
                    disposed = true;
                    document.removeEventListener('mousemove', handler);
                    observer.OnCompleted();
                }
            };
        }
    };
}

// Usage with reactive operations
const mouseMove = createMouseMoveObservable();

// Track only significant movements (> 10 pixels)
const significantMoves = filter(
    pos => Math.abs(pos.x) > 10 || Math.abs(pos.y) > 10,
    mouseMove
);

// Calculate distance from origin
const distances = map(
    pos => Math.sqrt(pos.x * pos.x + pos.y * pos.y),
    significantMoves
);

// Subscribe to distance changes
subscribe(distance => {
    console.log(`Mouse distance from origin: ${distance.toFixed(2)}`);
}, distances);

Event Module - Event System

F# event system with .NET event compatibility, providing type-safe event handling and functional event operations.

Core Types

// Function delegate types
type Delegate<T> = (x: T) => void;
type DotNetDelegate<T> = (sender: any, x: T) => void;

Event Interfaces

// Delegate event interface (.NET style)
interface IDelegateEvent<T> {
    AddHandler(d: DotNetDelegate<T>): void;
    RemoveHandler(d: DotNetDelegate<T>): void;
}

// Full event interface (combines Observable and Delegate)
interface IEvent<T> extends IObservable<T>, IDelegateEvent<T> {
    Publish: IEvent<T>;
    Trigger(x: T): void;
}

Event Implementation

// Event class implementing F# events
class Event<T> implements IEvent<T> {
    constructor(_subscriber?: (o: IObserver<T>) => IDisposable, delegates?: Array<Delegate<T>>);
    
    // Delegate management
    delegates: Array<Delegate<T>>;
    Add(f: Delegate<T>): void;
    
    // Properties
    readonly Publish: IEvent<T>;
    
    // Event operations
    Trigger(value: T): void;
    
    // .NET style handlers
    AddHandler(handler: DotNetDelegate<T>): void;
    RemoveHandler(handler: DotNetDelegate<T>): void;
    
    // Observable interface
    Subscribe(arg: IObserver<T> | Delegate<T>): IDisposable;
}

// Usage
import { Event } from "fable-library/Event.js";

// Create event
const buttonClicked = new Event<string>();

// Add handlers using different methods
buttonClicked.Add(msg => console.log(`Handler 1: ${msg}`));

const handler2 = (msg: string) => console.log(`Handler 2: ${msg}`);
buttonClicked.Add(handler2);

// .NET style handler
const dotNetHandler = (sender: any, msg: string) => {
    console.log(`DotNet Handler - Sender: ${sender}, Message: ${msg}`);
};
buttonClicked.AddHandler(dotNetHandler);

// Trigger event
buttonClicked.Trigger("Button was clicked!");

// Remove handler
buttonClicked.RemoveHandler(dotNetHandler);

Event Operations

// Basic event subscription
function add<T>(callback: (x: T) => void, sourceEvent: IEvent<T>): void;

// Usage
// `Event` is instantiated below (`new Event<...>()`), so it must be imported too.
import { Event, add } from "fable-library/Event.js";

const dataReceived = new Event<{id: number, data: string}>();

// Simple subscription
add(item => console.log(`Received: ${item.id} - ${item.data}`), dataReceived);

// Multiple subscribers
add(item => updateUI(item), dataReceived);
add(item => logToServer(item), dataReceived);
add(item => cacheData(item), dataReceived);

Event Transformation

// Transform event values
function map<T, U>(mapping: (x: T) => U, sourceEvent: IEvent<T>): IEvent<U>;

// Choose/filter with transformation  
function choose<T, U>(chooser: (x: T) => U, sourceEvent: IEvent<T>): IEvent<U>;

// Filter events
function filter<T>(predicate: (x: T) => boolean, sourceEvent: IEvent<T>): IEvent<T>;

// Usage
// `Event` and `add` are both used below, so they must be imported too.
import { Event, add, map, choose, filter } from "fable-library/Event.js";

const mouseEvents = new Event<{x: number, y: number, button: number}>();

// Transform to just coordinates
const coordinates = map(e => `(${e.x}, ${e.y})`, mouseEvents);
add(coord => console.log(`Coordinates: ${coord}`), coordinates);

// Filter for left clicks only
const leftClicks = filter(e => e.button === 0, mouseEvents);
add(e => console.log(`Left click at (${e.x}, ${e.y})`), leftClicks);

// Choose significant movements (> 50px from origin)
const significantMoves = choose(e => {
    const distance = Math.sqrt(e.x * e.x + e.y * e.y);
    return distance > 50 ? distance : null;
}, mouseEvents);

add(distance => console.log(`Significant move: ${distance}`), significantMoves);

Event Combination

// Merge multiple events
function merge<T>(event1: IEvent<T>, event2: IEvent<T>): IEvent<T>;

// Usage
// `Event` and `add` are both used below, so they must be imported too.
import { Event, add, merge, map } from "fable-library/Event.js";

const keyboardEvents = new Event<{key: string, type: 'up' | 'down'}>();
const mouseEvents = new Event<{button: number, type: 'up' | 'down'}>();

// Convert to common format
const keyboardInputs = map(e => `Key ${e.key} ${e.type}`, keyboardEvents);
const mouseInputs = map(e => `Mouse ${e.button} ${e.type}`, mouseEvents);

// Merge all input events
const allInputs = merge(keyboardInputs, mouseInputs);
add(input => console.log(`Input: ${input}`), allInputs);

// Trigger events
keyboardEvents.Trigger({key: 'A', type: 'down'});
mouseEvents.Trigger({button: 0, type: 'down'});
// Output: 
// Input: Key A down
// Input: Mouse 0 down

Event Aggregation

// Scan (accumulate state)
function scan<U, T>(collector: (u: U, t: T) => U, state: U, sourceEvent: IEvent<T>): IEvent<U>;

// Pairwise (consecutive pairs)
function pairwise<T>(sourceEvent: IEvent<T>): IEvent<[T, T]>;

// Usage
// `Event` and `add` are both used below, so they must be imported too.
import { Event, add, scan, pairwise } from "fable-library/Event.js";

const numberEvents = new Event<number>();

// Running total
const runningTotal = scan((total, num) => total + num, 0, numberEvents);
add(total => console.log(`Running total: ${total}`), runningTotal);

// Count events
const eventCount = scan((count, _) => count + 1, 0, numberEvents);
add(count => console.log(`Event count: ${count}`), eventCount);

// Track pairs for trends
const numberPairs = pairwise(numberEvents);
add(([prev, curr]) => {
    const trend = curr > prev ? "↑" : curr < prev ? "↓" : "→";
    console.log(`${prev} ${trend} ${curr}`);
}, numberPairs);

// Trigger some events
numberEvents.Trigger(10);
numberEvents.Trigger(15);
numberEvents.Trigger(12);
// Output:
// Running total: 10, Event count: 1
// Running total: 25, Event count: 2, 10 ↑ 15
// Running total: 37, Event count: 3, 15 ↓ 12

Event Partitioning

// Partition events based on predicate
function partition<T>(predicate: (x: T) => boolean, sourceEvent: IEvent<T>): [IEvent<T>, IEvent<T>];

// Split events into different types
function split<T, U1, U2>(splitter: (x: T) => any, sourceEvent: IEvent<T>): [IEvent<U1>, IEvent<U2>];

// Usage
// `Event` and `add` are both used below, so they must be imported too.
import { Event, add, partition, split } from "fable-library/Event.js";

const networkEvents = new Event<{status: 'success' | 'error', data?: any, error?: string}>();

// Partition into success and error events
const [successes, errors] = partition(e => e.status === 'success', networkEvents);

add(e => console.log(`Success: ${JSON.stringify(e.data)}`), successes);
add(e => console.log(`Error: ${e.error}`), errors);

// Split log events by severity
const logEvents = new Event<{level: 'info' | 'warning' | 'error', message: string}>();
const [infoLogs, criticalLogs] = split(log => log.level === 'info' ? 0 : 1, logEvents);

add(log => console.log(`ℹ️ ${log.message}`), infoLogs);
add(log => console.log(`⚠️ ${log.message}`), criticalLogs);

Practical Event Examples

Form Validation System

// `add` is used below to subscribe to the validation state and the submit event.
import { Event, add, map, filter, scan } from "fable-library/Event.js";

interface FormField {
    name: string;
    value: string;
    isValid: boolean;
}

// Create events for form interactions
const fieldChanged = new Event<FormField>();
const formSubmitted = new Event<void>();

// Track field validity
const validFields = filter(field => field.isValid, fieldChanged);
const invalidFields = filter(field => !field.isValid, fieldChanged);

// Track the names of the fields that are currently valid.
// Each change yields a fresh Set: the changed field's name is added when the
// field is valid and removed when it is not.
const validFieldCount = scan(
    (validNames, changed) => {
        const next = new Set(validNames);
        if (!changed.isValid) {
            next.delete(changed.name);
        } else {
            next.add(changed.name);
        }
        return next;
    },
    new Set<string>(),
    fieldChanged
);

const isFormValid = map(validSet => validSet.size >= 3, validFieldCount); // Need 3+ valid fields

// Subscribe to validation state: enable the submit control only while the form is valid.
add(isValid => {
    const submitButton = document.getElementById('submit');
    // getElementById returns a plain HTMLElement (or null), which has no
    // `disabled` property; narrow to the control types that do before toggling it.
    if (submitButton instanceof HTMLButtonElement || submitButton instanceof HTMLInputElement) {
        submitButton.disabled = !isValid;
    }
}, isFormValid);

// Handle form submission
add(() => {
    console.log("Form submitted!");
}, formSubmitted);

// Simulate field changes
fieldChanged.Trigger({name: 'email', value: 'user@example.com', isValid: true});
fieldChanged.Trigger({name: 'password', value: '12345678', isValid: true});
fieldChanged.Trigger({name: 'name', value: 'John Doe', isValid: true});

Real-time Data Dashboard

// `add` is used below to subscribe to the averages and the alerts.
import { Event, add, map, scan, filter, merge } from "fable-library/Event.js";

interface DataPoint {
    timestamp: number;
    metric: string;
    value: number;
}

// Data source events
const cpuUsage = new Event<DataPoint>();
const memoryUsage = new Event<DataPoint>();
const networkTraffic = new Event<DataPoint>();

// Merge all metrics
const allMetrics = merge(
    merge(cpuUsage.Publish, memoryUsage.Publish),
    networkTraffic.Publish
);

// Calculate rolling averages.
// State is a Map from metric name to {values, average}, where `values` is the
// window of the most recent samples (at most 10) and `average` is their mean.
const rollingAverages = scan(
    (averages, dataPoint) => {
        const key = dataPoint.metric;
        // The map stores {values, average} records, so the previous window is
        // the record's `values` array. Spreading the record itself would throw
        // ("not iterable") on the second sample of any metric.
        const entry = averages.get(key);
        const previous = entry ? entry.values : [];
        const updated = [...previous, dataPoint.value].slice(-10); // Last 10 values
        const average = updated.reduce((a, b) => a + b) / updated.length;
        
        // Copy instead of mutating so every emitted state is its own snapshot.
        return new Map(averages).set(key, {values: updated, average});
    },
    new Map<string, {values: number[], average: number}>(),
    allMetrics
);

// Alert on high usage
const highUsageAlerts = filter(
    dataPoint => dataPoint.metric === 'cpu' && dataPoint.value > 90,
    allMetrics
);

// Subscribe to updates
add(averages => {
    console.log("Current averages:");
    for (const [metric, data] of averages) {
        console.log(`  ${metric}: ${data.average.toFixed(2)}%`);
    }
}, rollingAverages);

add(alert => {
    console.warn(`🚨 High CPU usage: ${alert.value}% at ${new Date(alert.timestamp)}`);
}, highUsageAlerts);

// Simulate data
setInterval(() => {
    const timestamp = Date.now();
    cpuUsage.Trigger({timestamp, metric: 'cpu', value: Math.random() * 100});
    memoryUsage.Trigger({timestamp, metric: 'memory', value: Math.random() * 100});
    networkTraffic.Trigger({timestamp, metric: 'network', value: Math.random() * 1000});
}, 1000);