Complete async programming infrastructure with F# async workflows, cancellation support, actor model implementation, and integration with JavaScript Promises for comprehensive asynchronous operations.
Core infrastructure for F# async computations providing the foundation for asynchronous workflows with proper cancellation and error handling.
// Continuation function type
type Continuation<T> = (x: T) => void;
// Tuple of continuations for success/error/cancellation
type Continuations<T> = [
Continuation<T>, // Success continuation
Continuation<Error>, // Error continuation
Continuation<OperationCanceledError> // Cancellation continuation
];
// Async computation type
type IAsync<T> = (ctx: IAsyncContext<T>) => void;// Cancellation token for cooperative cancellation
class CancellationToken {
constructor(cancelled?: boolean);
// Properties
readonly isCancelled: boolean;
// Operations
cancel(): void;
addListener(f: () => void): number;
removeListener(id: number): boolean;
register(f: (state?: any) => void, state?: any): IDisposable;
}
// Exception thrown when operation is cancelled
class OperationCanceledError extends Error {
constructor();
message: string;
}
// Usage
import { CancellationToken, OperationCanceledError } from "fable-library/AsyncBuilder.js";
// Create cancellation token
const token = new CancellationToken();
// Register cleanup on cancellation
const cleanup = token.register(() => {
console.log("Operation was cancelled, cleaning up...");
});
// Simulate cancellation after 3 seconds
setTimeout(() => {
token.cancel();
}, 3000);
// Check for cancellation
if (token.isCancelled) {
throw new OperationCanceledError();
}
// Remove cleanup registration
cleanup.Dispose();// Trampoline for preventing stack overflow in deeply nested async operations
class Trampoline {
constructor();
static maxTrampolineCallCount: number;
incrementAndCheck(): boolean;
hijack(f: () => void): void;
}
// Usage
import { Trampoline } from "fable-library/AsyncBuilder.js";
const trampoline = new Trampoline();
// Check if we need to yield control
if (trampoline.incrementAndCheck()) {
// Too many nested calls, yield control
trampoline.hijack(() => {
// Continue operation on next tick
continueAsync();
});
return;
}
// Continue synchronously
continueSync();// Execution context for async operations
interface IAsyncContext<T> {
onSuccess: Continuation<T>;
onError: Continuation<Error>;
onCancel: Continuation<OperationCanceledError>;
cancelToken: CancellationToken;
trampoline: Trampoline;
}
// Usage context is typically created by the async runtime
// and passed to async computation functions
Core builder for F# async workflow syntax.
class AsyncBuilder {
// Monadic bind operation
Bind<T, U>(computation: IAsync<T>, binder: (x: T) => IAsync<U>): IAsync<U>;
// Combine sequential operations
Combine<T>(computation1: IAsync<void>, computation2: IAsync<T>): IAsync<T>;
// Delay execution
Delay<T>(generator: () => IAsync<T>): IAsync<T>;
// Loop over sequence
For<T>(sequence: Iterable<T>, body: (x: T) => IAsync<void>): IAsync<void>;
// Return value
Return<T>(value?: T): IAsync<T>;
// Return from another async computation
ReturnFrom<T>(computation: IAsync<T>): IAsync<T>;
// Try/finally block
TryFinally<T>(computation: IAsync<T>, compensation: () => void): IAsync<T>;
// Try/catch block
TryWith<T>(computation: IAsync<T>, catchHandler: (e: any) => IAsync<T>): IAsync<T>;
// Using block for resource management
Using<T extends IDisposable, U>(resource: T, binder: (x: T) => IAsync<U>): IAsync<U>;
// While loop
While(guard: () => boolean, computation: IAsync<void>): IAsync<void>;
// Return unit (void)
Zero(): IAsync<void>;
}
// Usage (typically used internally by F# compiler-generated code)
import { AsyncBuilder } from "fable-library/AsyncBuilder.js";
const builder = new AsyncBuilder();
// Example of builder operations (normally generated by F# compiler)
const asyncOperation = builder.Bind(
someAsyncComputation,
result => builder.Return(result * 2)
);// Protected continuation wrapper
function protectedCont<T>(f: IAsync<T>): IAsync<T>;
// Protected bind operation
function protectedBind<T, U>(computation: IAsync<T>, binder: (x: T) => IAsync<U>): IAsync<U>;
// Protected return
function protectedReturn<T>(value?: T): IAsync<T>;
// Singleton builder instance
const singleton: AsyncBuilder;
// Usage
import { protectedCont, protectedBind, singleton } from "fable-library/AsyncBuilder.js";
// These functions provide error handling and stack overflow protection
// for async operations, typically used by higher-level async functions
High-level async operations built on the AsyncBuilder infrastructure, providing practical async programming capabilities.
// Async computation class (for type reference)
class Async<T> {}
// Create async computation
function makeAsync<T>(body: IAsync<T>): IAsync<T>;
// Invoke async computation with context
function invoke<T>(computation: IAsync<T>, ctx: IAsyncContext<T>): void;
// Continuation-based operations
function callThenInvoke<T, U>(ctx: IAsyncContext<T>, result1: U, part2: (x: U) => IAsync<T>): void;
function bind<T, U>(ctx: IAsyncContext<T>, part1: IAsync<U>, part2: (x: U) => IAsync<T>): void;
// Usage
import { makeAsync, invoke } from "fable-library/Async.js";
// Create simple async computation
const simpleAsync = makeAsync<number>(ctx => {
// Simulate async operation
setTimeout(() => {
if (ctx.cancelToken.isCancelled) {
ctx.onCancel(new OperationCanceledError());
} else {
ctx.onSuccess(42);
}
}, 1000);
});// Create cancellation tokens
function createCancellationToken(arg?: boolean | number): CancellationToken;
// Cancel token
function cancel(token: CancellationToken): void;
// Cancel after timeout
function cancelAfter(token: CancellationToken, ms: number): void;
// Check cancellation status
function isCancellationRequested(token: CancellationToken): boolean;
// Get current cancellation token in async context
function cancellationToken(): IAsync<CancellationToken>;
// Default cancellation token
const defaultCancellationToken: CancellationToken;
// Usage
import {
createCancellationToken, cancel, cancelAfter,
isCancellationRequested, cancellationToken
} from "fable-library/Async.js";
// Create token that cancels after 5 seconds
const token = createCancellationToken();
cancelAfter(token, 5000);
// Check cancellation in async operation
const cancellableAsync = makeAsync<string>(ctx => {
const checkAndContinue = () => {
if (isCancellationRequested(ctx.cancelToken)) {
ctx.onCancel(new OperationCanceledError());
return;
}
// Continue with operation
setTimeout(() => ctx.onSuccess("Completed"), 1000);
};
checkAndContinue();
});
// Get current token within async workflow
const getCurrentToken = cancellationToken();// Convert Promise to async computation
function awaitPromise<T>(p: Promise<T>): IAsync<T>;
// Convert async computation to Promise
function startAsPromise<T>(computation: IAsync<T>, cancellationToken?: CancellationToken): Promise<T>;
// Usage
import { awaitPromise, startAsPromise } from "fable-library/Async.js";
// Use existing Promise in async workflow
const fetchDataAsync = awaitPromise(
fetch('/api/data').then(response => response.json())
);
// Convert F# async to Promise for JavaScript consumption
const asyncComputation = makeAsync<number>(ctx => {
setTimeout(() => ctx.onSuccess(42), 1000);
});
const promise = startAsPromise(asyncComputation);
promise
.then(result => console.log(`Result: ${result}`))
.catch(error => console.error(`Error: ${error}`));
// With cancellation
const token = createCancellationToken();
const cancellablePromise = startAsPromise(asyncComputation, token);
// Cancel after 500ms (before completion)
setTimeout(() => cancel(token), 500);
cancellablePromise.catch(error => {
if (error instanceof OperationCanceledError) {
console.log("Operation was cancelled");
}
});// Start child computation
function startChild<T>(computation: IAsync<T>): IAsync<IAsync<T>>;
// Catch exceptions and return Choice
function catchAsync<T>(work: IAsync<T>): IAsync<any>; // Returns Choice<T, exn>
// Create from continuations
function fromContinuations<T>(f: (conts: Continuations<T>) => void): IAsync<T>;
// Ignore result
function ignore<T>(computation: IAsync<T>): IAsync<void>;
// Run computations in parallel
function parallel<T>(computations: Iterable<IAsync<T>>): IAsync<T[]>;
// Usage
import {
startChild, catchAsync, fromContinuations,
ignore, parallel
} from "fable-library/Async.js";
// Start child computation for concurrent execution
const parentAsync = makeAsync<string>(ctx => {
const childAsync = makeAsync<number>(childCtx => {
setTimeout(() => childCtx.onSuccess(42), 2000);
});
// Start child and continue immediately
invoke(startChild(childAsync), {
onSuccess: (childHandle) => {
console.log("Child started, continuing parent...");
// Can await child later with: invoke(childHandle, ...)
ctx.onSuccess("Parent completed");
},
onError: ctx.onError,
onCancel: ctx.onCancel,
cancelToken: ctx.cancelToken,
trampoline: ctx.trampoline
});
});
// Catch exceptions safely
const safeAsync = catchAsync(makeAsync<number>(ctx => {
if (Math.random() > 0.5) {
throw new Error("Random failure");
}
ctx.onSuccess(42);
}));
// Create from continuation-passing style
const cpsAsync = fromContinuations<string>(([onSuccess, onError, onCancel]) => {
setTimeout(() => {
if (Math.random() > 0.5) {
onSuccess("Success!");
} else {
onError(new Error("CPS failure"));
}
}, 1000);
});
// Run multiple computations in parallel
const async1 = makeAsync<number>(ctx => setTimeout(() => ctx.onSuccess(1), 1000));
const async2 = makeAsync<number>(ctx => setTimeout(() => ctx.onSuccess(2), 1500));
const async3 = makeAsync<number>(ctx => setTimeout(() => ctx.onSuccess(3), 800));
const parallelResults = parallel([async1, async2, async3]);
// Results will be [1, 2, 3] after ~1500ms (when slowest completes)// Sleep for specified milliseconds
function sleep(millisecondsDueTime: number): IAsync<void>;
// Usage
import { sleep } from "fable-library/Async.js";
// Create async operation with delay.
// The body is a plain (non-async) function: `invoke` returns void, so there is
// nothing to await — completion is reported through the continuations passed
// in the context object.
const delayedOperation = makeAsync<string>(ctx => {
  console.log("Starting operation...");
  // Wait 2 seconds, then finish from the sleep's success continuation
  invoke(sleep(2000), {
    onSuccess: () => {
      console.log("Delay completed, finishing operation...");
      ctx.onSuccess("Operation completed after delay");
    },
    // Errors and cancellation of the sleep are forwarded to the outer computation
    onError: ctx.onError,
    onCancel: ctx.onCancel,
    cancelToken: ctx.cancelToken,
    trampoline: ctx.trampoline
  });
});
// Timeout pattern.
// Wraps `computation` so that it fails with an Error if it has not completed
// within `timeoutMs` milliseconds. Exactly one of the outer continuations is
// called: whichever of success / error / cancellation / timeout happens first
// wins, and the pending timer is cleared as soon as the outcome is known.
const timeoutAsync = <T>(computation: IAsync<T>, timeoutMs: number): IAsync<T> => {
  return makeAsync<T>(ctx => {
    let completed = false;
    let timer: ReturnType<typeof setTimeout> | undefined;
    // Deliver an outcome at most once and release the timer.
    const settle = (deliver: () => void): void => {
      if (completed) {
        return;
      }
      completed = true;
      if (timer !== undefined) {
        clearTimeout(timer);
      }
      deliver();
    };
    // Start the computation
    invoke(computation, {
      onSuccess: (result) => settle(() => ctx.onSuccess(result)),
      onError: (error) => settle(() => ctx.onError(error)),
      // Cancellation also counts as completion, so a later timeout is ignored
      onCancel: (reason) => settle(() => ctx.onCancel(reason)),
      cancelToken: ctx.cancelToken,
      trampoline: ctx.trampoline
    });
    // Start timeout (not needed when the computation already finished synchronously)
    if (!completed) {
      timer = setTimeout(() => {
        settle(() => ctx.onError(new Error(`Operation timed out after ${timeoutMs}ms`)));
      }, timeoutMs);
    }
  });
};
// Start async computation (fire and forget)
function start<T>(computation: IAsync<void>, cancellationToken?: CancellationToken): void;
// Start immediately without trampoline
function startImmediate(computation: IAsync<void>, cancellationToken?: CancellationToken): void;
// Start with explicit continuations
function startWithContinuations<T>(
computation: IAsync<T>,
continuation?: Continuation<T> | CancellationToken,
exceptionContinuation?: Continuation<any>,
cancellationContinuation?: Continuation<any>,
cancelToken?: CancellationToken
): void;
// Usage
import { start, startImmediate, startWithContinuations } from "fable-library/Async.js";
// Fire and forget execution
const backgroundTask = makeAsync<void>(ctx => {
setTimeout(() => {
console.log("Background task completed");
ctx.onSuccess(undefined);
}, 3000);
});
start(backgroundTask);
// Start with explicit error handling
const monitoredTask = makeAsync<string>(ctx => {
setTimeout(() => {
if (Math.random() > 0.5) {
ctx.onSuccess("Success!");
} else {
ctx.onError(new Error("Task failed"));
}
}, 1000);
});
startWithContinuations(
monitoredTask,
result => console.log(`Task result: ${result}`),
error => console.error(`Task error: ${error.message}`),
() => console.log("Task was cancelled")
);
Implementation of F# MailboxProcessor for actor-based concurrent programming, providing type-safe message passing and state management.
// Actor body function type
type MailboxBody<Msg> = (m: MailboxProcessor<Msg>) => IAsync<void>;// Reply channel for request-response patterns
interface AsyncReplyChannel<Reply> {
reply: (r: Reply) => void;
}
// Usage in message types
interface GetValueMessage {
type: 'getValue';
replyChannel: AsyncReplyChannel<number>;
}
interface SetValueMessage {
type: 'setValue';
value: number;
}
type ActorMessage = GetValueMessage | SetValueMessage;// Internal queue cell (implementation detail)
class QueueCell<Msg> {
// Internal queue structure
}
// Thread-safe message queue
class MailboxQueue<Msg> {
// Internal queue operations
}class MailboxProcessor<Msg> {
constructor(body: MailboxBody<Msg>, cancellationToken?: CancellationToken);
// Properties
body: MailboxBody<Msg>;
cancellationToken: CancellationToken;
messages: MailboxQueue<Msg>;
continuation: Continuation<Msg>;
// Message operations
Post(message: Msg): void;
// Request-response operations
PostAndReply<Reply>(buildMessage: (reply: AsyncReplyChannel<Reply>) => Msg): IAsync<Reply>;
PostAndTryAsyncReply<Reply>(
buildMessage: (reply: AsyncReplyChannel<Reply>) => Msg,
timeout?: number
): IAsync<Reply | null>;
// Message receiving
Receive(timeout?: number): IAsync<Msg>;
Scan<T>(scanner: (msg: Msg) => IAsync<T> | null, timeout?: number): IAsync<T>;
// Lifecycle
Start(): void;
// Static factory
static Start<Msg>(body: MailboxBody<Msg>, cancellationToken?: CancellationToken): MailboxProcessor<Msg>;
}
// Usage
import { MailboxProcessor } from "fable-library/MailboxProcessor.js";
// Define message types
interface CounterMessage {
type: 'increment' | 'decrement' | 'get';
replyChannel?: AsyncReplyChannel<number>;
}
// Create actor.
// A MailboxBody must return an IAsync<void>, and an IAsync is a plain function
// rather than a Promise, so it cannot be produced by an `async` function or
// consumed with `await`. The receive loop is therefore composed with the
// AsyncBuilder `singleton` (imported from "fable-library/AsyncBuilder.js").
const counter = MailboxProcessor.Start<CounterMessage>(mailbox => {
  let count = 0;
  // Equivalent of `while true do let! msg = mailbox.Receive() ...`
  return singleton.While(
    () => true,
    // Delay so that Receive() is requested afresh on every iteration
    singleton.Delay(() =>
      singleton.Bind(mailbox.Receive(), msg => {
        switch (msg.type) {
          case 'increment':
            count++;
            break;
          case 'decrement':
            count--;
            break;
          case 'get':
            if (msg.replyChannel) {
              msg.replyChannel.reply(count);
            }
            break;
        }
        return singleton.Zero();
      })
    )
  );
});
// Send messages
counter.Post({ type: 'increment' });
counter.Post({ type: 'increment' });
counter.Post({ type: 'decrement' });
// Request-response.
// PostAndReply yields an IAsync<number>; convert it to a Promise with
// startAsPromise (imported from "fable-library/Async.js") before awaiting.
const currentValue = await startAsPromise(
  counter.PostAndReply<number>(reply => ({
    type: 'get',
    replyChannel: reply
  }))
);
console.log(`Current count: ${currentValue}`); // Current count: 1
interface StateMachineMessage {
type: 'start' | 'stop' | 'pause' | 'resume' | 'getState';
replyChannel?: AsyncReplyChannel<string>;
}
// State machine actor.
// The body returns an IAsync<void> built with the AsyncBuilder `singleton`
// (from "fable-library/AsyncBuilder.js"): an IAsync is a plain function, so it
// cannot be produced by an `async` function or consumed with `await`.
const stateMachine = MailboxProcessor.Start<StateMachineMessage>(mailbox => {
  let state: 'stopped' | 'running' | 'paused' = 'stopped';
  // Equivalent of `while true do let! msg = mailbox.Receive() ...`
  return singleton.While(
    () => true,
    // Delay so that Receive() is requested afresh on every iteration
    singleton.Delay(() =>
      singleton.Bind(mailbox.Receive(), msg => {
        switch (msg.type) {
          case 'start':
            // Only a stopped machine can be started
            if (state === 'stopped') {
              state = 'running';
              console.log("State machine started");
            }
            break;
          case 'stop':
            // Stopping is allowed from any state
            state = 'stopped';
            console.log("State machine stopped");
            break;
          case 'pause':
            if (state === 'running') {
              state = 'paused';
              console.log("State machine paused");
            }
            break;
          case 'resume':
            if (state === 'paused') {
              state = 'running';
              console.log("State machine resumed");
            }
            break;
          case 'getState':
            if (msg.replyChannel) {
              msg.replyChannel.reply(state);
            }
            break;
        }
        return singleton.Zero();
      })
    )
  );
});
// Use the state machine
stateMachine.Post({ type: 'start' });
stateMachine.Post({ type: 'pause' });
stateMachine.Post({ type: 'resume' });
// PostAndReply yields an IAsync<string>; convert it to a Promise with
// startAsPromise (from "fable-library/Async.js") before awaiting.
const currentState = await startAsPromise(
  stateMachine.PostAndReply<string>(reply => ({
    type: 'getState',
    replyChannel: reply
  }))
);
console.log(`Current state: ${currentState}`);
interface WorkItem {
id: number;
data: any;
replyChannel: AsyncReplyChannel<any>;
}
interface PoolMessage {
type: 'addWork' | 'workerAvailable';
workItem?: WorkItem;
workerId?: number;
}
const workerPool = MailboxProcessor.Start<PoolMessage>(async mailbox => {
const pendingWork: WorkItem[] = [];
const availableWorkers: number[] = [];
let nextWorkId = 1;
// Initialize workers
for (let i = 1; i <= 3; i++) {
availableWorkers.push(i);
}
const assignWork = () => {
while (pendingWork.length > 0 && availableWorkers.length > 0) {
const work = pendingWork.shift()!;
const workerId = availableWorkers.shift()!;
console.log(`Assigning work ${work.id} to worker ${workerId}`);
// Simulate work completion
setTimeout(() => {
work.replyChannel.reply(`Work ${work.id} completed by worker ${workerId}`);
// Worker becomes available again
mailbox.Post({ type: 'workerAvailable', workerId });
}, Math.random() * 2000 + 500);
}
};
while (true) {
const msg = await mailbox.Receive();
switch (msg.type) {
case 'addWork':
if (msg.workItem) {
pendingWork.push(msg.workItem);
assignWork();
}
break;
case 'workerAvailable':
if (msg.workerId) {
availableWorkers.push(msg.workerId);
assignWork();
}
break;
}
}
});
// Submit work items
for (let i = 1; i <= 5; i++) {
const result = await workerPool.PostAndReply<string>(reply => ({
type: 'addWork',
workItem: {
id: i,
data: `Task ${i}`,
replyChannel: reply
}
}));
console.log(`Result: ${result}`);
}// Use Scan to selectively process messages
const selectiveProcessor = MailboxProcessor.Start<{type: string, priority: number, data: any}>(async mailbox => {
while (true) {
// Process high-priority messages first
const highPriorityMsg = await mailbox.Scan(msg => {
if (msg.priority >= 5) {
return makeAsync<{type: string, priority: number, data: any}>(ctx => {
ctx.onSuccess(msg);
});
}
return null;
});
console.log(`Processing high-priority message: ${highPriorityMsg.type}`);
// Then process any remaining message
const anyMsg = await mailbox.Receive(100); // 100ms timeout
if (anyMsg) {
console.log(`Processing normal message: ${anyMsg.type}`);
}
}
});
// Send mixed priority messages
selectiveProcessor.Post({ type: 'normal', priority: 2, data: 'low priority' });
selectiveProcessor.Post({ type: 'urgent', priority: 8, data: 'high priority' });
selectiveProcessor.Post({ type: 'regular', priority: 3, data: 'medium priority' });
// Output: "Processing high-priority message: urgent" comes first
.NET-compatible timer implementation with event-based notifications and precise interval control.
// Timer API sketch: raises `Elapsed` with a Date; `Interval` is in milliseconds.
class Timer implements IDisposable {
  constructor(interval?: number);
  // Properties
  Interval: number; // Timer interval in milliseconds
  AutoReset: boolean; // Whether timer repeats
  // Current enabled state. Declared once as a writable property: it is both
  // read and assigned (the usage example starts the timer with
  // `timer.Enabled = true`), so it must not be `readonly`.
  Enabled: boolean;
  readonly Elapsed: Event<Date>; // Elapsed event
  // Methods
  Dispose(): void; // Stop and cleanup timer
  Close(): void; // Alias for Dispose
  Start(): void; // Start the timer
  Stop(): void; // Stop the timer
}
// Usage
import { Timer } from "fable-library/Timer.js";
import { add } from "fable-library/Event.js";
// Create timer that fires every 1 second
const timer = new Timer(1000);
// Subscribe to elapsed events
add(date => {
console.log(`Timer elapsed at: ${date.toISOString()}`);
}, timer.Elapsed);
// Configure timer
timer.AutoReset = true; // Repeat automatically
timer.Enabled = true; // Start timer
// Alternative: use Start() method
// timer.Start();
// Stop after 5 seconds
setTimeout(() => {
timer.Stop();
console.log("Timer stopped");
// Cleanup resources
timer.Dispose();
}, 5000);// Timer that fires only once
const oneShot = new Timer(2000);
oneShot.AutoReset = false; // Don't repeat
add(date => {
console.log("One-shot timer fired!");
oneShot.Dispose(); // Clean up
}, oneShot.Elapsed);
oneShot.Start();// Execute task every 30 seconds
const taskTimer = new Timer(30000);
let taskCount = 0;
add(date => {
taskCount++;
console.log(`Executing periodic task #${taskCount} at ${date}`);
// Simulate task work
performPeriodicTask()
.then(() => console.log(`Task #${taskCount} completed`))
.catch(error => console.error(`Task #${taskCount} failed:`, error));
// Stop after 10 executions
if (taskCount >= 10) {
taskTimer.Stop();
taskTimer.Dispose();
console.log("Periodic task timer stopped");
}
}, taskTimer.Elapsed);
async function performPeriodicTask(): Promise<void> {
// Simulate async work
await new Promise(resolve => setTimeout(resolve, 1000));
// Task logic here
}
taskTimer.Start();// Watchdog timer that must be reset periodically
// One-shot countdown that invokes a callback unless it is reset in time.
class WatchdogTimer {
  private timer: Timer;
  private onTimeout: () => void;

  // timeoutMs: countdown length passed to the Timer
  // onTimeout: callback run when the countdown elapses
  constructor(timeoutMs: number, onTimeout: () => void) {
    this.onTimeout = onTimeout;

    const countdown = new Timer(timeoutMs);
    countdown.AutoReset = false; // fire once per Start()
    this.timer = countdown;

    const handleElapsed = (firedAt: Date): void => {
      console.log(`Watchdog timeout at ${firedAt}`);
      this.onTimeout();
    };
    add(handleElapsed, countdown.Elapsed);
  }

  // Begin the countdown.
  start(): void {
    this.timer.Start();
  }

  // Restart the countdown from the beginning.
  reset(): void {
    const countdown = this.timer;
    countdown.Stop();
    countdown.Start();
  }

  // Halt the countdown without disposing the timer.
  stop(): void {
    this.timer.Stop();
  }

  // Release the underlying timer.
  dispose(): void {
    this.timer.Dispose();
  }
}
// Usage
const watchdog = new WatchdogTimer(5000, () => {
console.log("System watchdog timeout - taking recovery action!");
// Recovery logic here
});
watchdog.start();
// Simulate system activity that resets the watchdog
const activitySimulator = setInterval(() => {
console.log("System activity detected, resetting watchdog");
watchdog.reset();
}, Math.random() * 3000 + 1000); // Reset every 1-4 seconds
// Stop simulation after 20 seconds
setTimeout(() => {
clearInterval(activitySimulator);
watchdog.stop();
watchdog.dispose();
console.log("Watchdog simulation ended");
}, 20000);// Rate limiter using timer
// Rate limiter using timer.
// Runs queued actions at most once per interval: the first action enqueued
// while the limiter is idle runs immediately and starts the interval timer;
// every further action waits for a timer tick. When a tick finds the queue
// empty the timer is stopped and the limiter becomes idle again.
// Not intended to be used after dispose().
class RateLimiter {
  private timer: Timer;
  private queue: (() => void)[] = [];
  // True while the current interval's slot has already been used, i.e. newly
  // enqueued actions must wait for the next tick.
  private isThrottled = false;

  // intervalMs: minimum spacing between two executed actions
  constructor(intervalMs: number) {
    this.timer = new Timer(intervalMs);
    this.timer.AutoReset = true;
    add(() => {
      this.onTick();
    }, this.timer.Elapsed);
    // The timer is started lazily by the first enqueue().
  }

  // Queue an action; it runs immediately only if no action ran in the current interval.
  enqueue(action: () => void): void {
    this.queue.push(action);
    if (!this.isThrottled) {
      this.isThrottled = true;
      // Start counting the interval from this execution
      this.timer.Start();
      this.processNext();
    }
  }

  // Timer tick: run the next queued action, or go idle when nothing is waiting.
  private onTick(): void {
    if (this.queue.length > 0) {
      this.processNext();
    } else {
      this.timer.Stop();
      this.isThrottled = false;
    }
  }

  // Run one queued action; a failing action is logged and does not stop the limiter.
  private processNext(): void {
    const action = this.queue.shift();
    if (action === undefined) {
      return;
    }
    try {
      action();
    } catch (error) {
      console.error("Rate limited action failed:", error);
    }
  }

  // Dispose the timer and drop any actions still waiting.
  dispose(): void {
    this.timer.Dispose();
    this.queue.length = 0; // Clear queue
  }
}
// Usage: limit API calls to once per second
const apiRateLimiter = new RateLimiter(1000);
// Queue multiple API calls
for (let i = 1; i <= 5; i++) {
apiRateLimiter.enqueue(() => {
console.log(`Making API call #${i} at ${new Date().toISOString()}`);
// Simulate API call
});
}
// Calls will be executed one per second
setTimeout(() => {
apiRateLimiter.dispose();
}, 10000);// Convert Timer to async computation
// Convert a one-shot Timer into an async computation.
// The computation succeeds with the Date delivered by the timer's Elapsed
// event, or is cancelled through ctx.onCancel when the context's token is
// cancelled first. Exactly one continuation is called.
function timerToAsync(intervalMs: number): IAsync<Date> {
  return makeAsync<Date>(ctx => {
    // A token that is already cancelled would never notify a new registration,
    // so report cancellation up front without starting a timer.
    if (ctx.cancelToken.isCancelled) {
      ctx.onCancel(new OperationCanceledError());
      return;
    }
    const timer = new Timer(intervalMs);
    timer.AutoReset = false; // One-shot
    let settled = false;
    // Handle cancellation
    const cancelRegistration = ctx.cancelToken.register(() => {
      if (settled) {
        return;
      }
      settled = true;
      timer.Dispose();
      ctx.onCancel(new OperationCanceledError());
    });
    add(date => {
      if (settled) {
        return;
      }
      settled = true;
      timer.Dispose();
      // Drop the cancellation hook so a later cancel cannot fire onCancel
      // after the computation has already succeeded.
      cancelRegistration.Dispose();
      ctx.onSuccess(date);
    }, timer.Elapsed);
    timer.Start();
  });
}
// Usage in async workflow.
// An IAsync is a plain function, not a thenable, so `await timerToAsync(...)`
// would resolve immediately to the computation itself without waiting. The
// timer computation is run with `invoke` and the outer computation completes
// from its success continuation.
const timedOperation = makeAsync<string>(ctx => {
  console.log("Starting timed operation...");
  // Wait for timer
  invoke(timerToAsync(3000), {
    onSuccess: (elapsedDate) => {
      console.log(`Timer fired at: ${elapsedDate}`);
      ctx.onSuccess("Timed operation completed");
    },
    // Errors and cancellation of the timer are forwarded to the outer computation
    onError: ctx.onError,
    onCancel: ctx.onCancel,
    cancelToken: ctx.cancelToken,
    trampoline: ctx.trampoline
  });
});
// Execute
startAsPromise(timedOperation)
  .then(result => console.log(result))
  .catch(error => console.error(error));