Collection of utility functions for Fluid drivers
Does it follow best practices? Pending.
Impact: Pending — no eval scenarios have been run.
The risk profile of this skill: Pending.
Advanced request management for handling concurrent operations while maintaining proper ordering. Provides stream-based interfaces and queue implementations for producer/consumer scenarios.
Manages concurrent fetching of sequential data with proper ordering, retry logic, and throttling.
/**
 * Manages concurrent fetching of sequential data with proper ordering.
 *
 * Type declaration only — method signatures carry no bodies; the
 * implementation ships in @fluidframework/driver-utils.
 */
class ParallelRequests<T> {
    /**
     * @param from - Starting index for data range
     * @param to - Ending index for data range (undefined for open-ended)
     * @param payloadSize - Size of each request batch
     * @param logger - Telemetry logger for monitoring
     * @param requestCallback - Function to make individual requests. Receives
     *   a request number, the range to fetch, a `strongTo` flag (presumably:
     *   whether `to` is a hard upper bound — TODO confirm against package
     *   docs), and telemetry properties. Must resolve to the fetched payload
     *   plus flags marking a partial result (`partial`) or an aborted run
     *   (`cancel`).
     * @param responseCallback - Function to handle responses; called with
     *   each payload batch.
     */
    constructor(
        from: number,
        to: number | undefined,
        payloadSize: number,
        logger: ITelemetryLoggerExt,
        requestCallback: (
            request: number,
            from: number,
            to: number,
            strongTo: boolean,
            props: ITelemetryBaseProperties
        ) => Promise<{ partial: boolean; cancel: boolean; payload: T[] }>,
        responseCallback: (payload: T[]) => void
    );

    /**
     * Execute parallel requests with specified concurrency
     * @param concurrency - Maximum number of concurrent requests
     * @returns Promise that resolves when all requests complete
     */
    run(concurrency: number): Promise<void>;

    /**
     * Cancel all pending and in-flight requests
     */
    cancel(): void;
}

Usage Example:
import { ParallelRequests } from "@fluidframework/driver-utils";

// Fetch ops in parallel while maintaining order.
const parallelRequests = new ParallelRequests(
    0,    // Start from op 0
    1000, // Fetch through op 1000
    50,   // 50 ops per request
    logger,
    // Request callback: must match the declared signature
    // (request, from, to, strongTo, props) and resolve to
    // { partial, cancel, payload } rather than the raw message array.
    async (request, from, to, strongTo, telemetryProps) => {
        const payload = await storageService.getMessages(from, to);
        return { partial: false, cancel: false, payload };
    },
    // Response callback — receives each payload batch in sequence order.
    (messages) => {
        processMessages(messages);
    }
);

// Run with concurrency of 3
await parallelRequests.run(3);

Async-safe queue implementation for producer/consumer scenarios with stream interface.
/**
 * Async-safe queue implementation for producer/consumer scenarios.
 *
 * Type declaration only — the implementation ships in
 * @fluidframework/driver-utils. Implements IStream<T>, so consumers pull
 * with read() while producers push values, an error, or completion.
 */
class Queue<T> implements IStream<T> {
    /**
     * Add a value to the queue
     * @param value - Value to add
     */
    pushValue(value: T): void;

    /**
     * Add an error to the queue, terminating the stream
     * @param error - Error to propagate to consumers
     */
    pushError(error: any): void;

    /**
     * Mark the queue as complete, no more values will be added
     */
    pushDone(): void;

    /**
     * Read the next value from the queue
     * @returns Promise resolving to stream result
     */
    read(): Promise<IStreamResult<T>>;
}

Usage Example:
import { Queue } from "@fluidframework/driver-utils";

const queue = new Queue<string>();

// Producer
queue.pushValue("item1");
queue.pushValue("item2");
queue.pushDone();

// Consumer — terminate on the `done` discriminant. Testing `result.value`
// for truthiness would silently skip falsy values such as the empty string.
while (true) {
    const result = await queue.read();
    if (result.done) break;
    console.log("Received:", result.value);
}

High-level functions for common request patterns with built-in concurrency and error handling.
/**
 * High-level function to fetch ops from storage with concurrency and retry logic
 * @param get - Function to fetch ops for a given range
 * @param concurrency - Maximum concurrent requests
 * @param fromTotal - Starting op sequence number
 * @param toTotal - Ending op sequence number (undefined for all available)
 * @param payloadSize - Number of ops per request
 * @param logger - Telemetry logger
 * @param signal - Optional abort signal for cancellation
 * @param scenarioName - Optional scenario name for telemetry
 * @returns Stream of message arrays in correct order
 */
function requestOps(
    get: (from: number, to: number, telemetryProps: ITelemetryBaseProperties) => Promise<ISequencedDocumentMessage[]>,
    concurrency: number,
    fromTotal: number,
    toTotal: number | undefined,
    payloadSize: number,
    logger: ITelemetryLoggerExt,
    signal?: AbortSignal,
    scenarioName?: string
): IStream<ISequencedDocumentMessage[]>;

Usage Example:
import { requestOps } from "@fluidframework/driver-utils";

// Fetch ops 0-999 with concurrency 3, 100 ops per request.
const opsStream = requestOps(
    async (from, to, telemetryProps) => {
        return storage.getMessages(from, to);
    },
    3,   // concurrency
    0,   // from
    999, // to
    100, // payload size
    logger,
    abortSignal,
    "loadDocument"
);

// Process the stream until the `done` discriminant is reported. Branching
// on `result.value` truthiness is redundant once `done` has been checked.
while (true) {
    const result = await opsStream.read();
    if (result.done) break;
    processOps(result.value);
}

Utility functions for working with streams and converting between different data formats.
/**
 * Pre-configured empty stream that immediately returns done
 */
const emptyMessageStream: IStream<ISequencedDocumentMessage[]>;

/**
 * Converts promise of messages array to single-use stream
 * @param messagesArg - Promise resolving to message array
 * @returns Stream yielding the message array once
 */
function streamFromMessages(
    messagesArg: Promise<ISequencedDocumentMessage[]>
): IStream<ISequencedDocumentMessage[]>;

/**
 * Stream decorator that calls handler for each read operation.
 * NOTE(review): handler receives each successful IStreamResult; whether a
 * rejected read() is surfaced to the handler is not shown here — confirm
 * against the package before relying on it for error handling.
 * @param stream - Source stream to observe
 * @param handler - Function called for each stream result
 * @returns New stream that forwards all operations while calling handler
 */
function streamObserver<T>(
    stream: IStream<T>,
    handler: (value: IStreamResult<T>) => void
): IStream<T>;

Usage Examples:
import {
    emptyMessageStream,
    streamFromMessages,
    streamObserver
} from "@fluidframework/driver-utils";

// Use empty stream for no-op scenarios
const noOpsStream = emptyMessageStream;

// Convert promise to stream
const messagesPromise = fetchMessagesFromAPI();
const messageStream = streamFromMessages(messagesPromise);

// Observe stream operations. Check the `done` discriminant first so that
// completion is never shadowed by the value branch.
const observedStream = streamObserver(messageStream, (result) => {
    if (result.done) {
        console.log("Stream completed");
    } else if (result.value) {
        console.log(`Received ${result.value.length} messages`);
    }
});

import { ParallelRequests, Queue } from "@fluidframework/driver-utils";
class DocumentLoader {
    /**
     * Loads ops in the range [from, to] in parallel batches of 50 while
     * preserving sequence order.
     * @param from - First op sequence number to load
     * @param to - Last op sequence number to load
     * @param concurrency - Maximum in-flight requests (default 3)
     * @returns All loaded messages, in order
     */
    private async loadDocumentOps(
        from: number,
        to: number,
        concurrency: number = 3
    ): Promise<ISequencedDocumentMessage[]> {
        // Mirrors each ordered batch; could be handed to a streaming
        // consumer while `allMessages` accumulates the full result.
        const resultQueue = new Queue<ISequencedDocumentMessage[]>();
        const allMessages: ISequencedDocumentMessage[] = [];

        const requests = new ParallelRequests<ISequencedDocumentMessage>(
            from, to, 50, this.logger,
            // Request callback — must match the declared signature
            // (request, from, to, strongTo, props) and resolve to
            // { partial, cancel, payload }, not the raw message array.
            async (request, rangeFrom, rangeTo, strongTo, telemetryProps) => {
                try {
                    const payload = await this.storage.getMessages(rangeFrom, rangeTo);
                    return { partial: false, cancel: false, payload };
                } catch (error) {
                    this.logger.sendErrorEvent({ eventName: "LoadOpsFailed" }, error);
                    throw error;
                }
            },
            // Response handler — invoked with each batch in sequence order.
            (messages) => {
                allMessages.push(...messages);
                resultQueue.pushValue(messages);
            }
        );

        try {
            await requests.run(concurrency);
            resultQueue.pushDone();
            return allMessages;
        } catch (error) {
            // Terminate the queue so any streaming consumer sees the failure.
            resultQueue.pushError(error);
            throw error;
        }
    }
}

import {
Queue,
streamObserver,
IStream,
IStreamResult
} from "@fluidframework/driver-utils";
/**
 * Transforms values from an input stream and re-emits them in fixed-size
 * batches on an output stream.
 */
class StreamProcessor<T, U> {
    /**
     * @param transformer - Maps each input value to an output value
     * @param batchSize - Number of transformed values per emitted batch
     */
    constructor(
        private transformer: (input: T) => U,
        private batchSize: number = 10
    ) {}

    /**
     * Starts consuming `inputStream` and returns a stream of transformed
     * batches. A trailing partial batch is flushed when the input completes.
     */
    process(inputStream: IStream<T>): IStream<U[]> {
        const outputQueue = new Queue<U[]>();
        let batch: U[] = [];

        const observedStream = streamObserver(inputStream, (result) => {
            if (result.value !== undefined) {
                batch.push(this.transformer(result.value));
                if (batch.length >= this.batchSize) {
                    outputQueue.pushValue([...batch]);
                    batch = [];
                }
            } else if (result.done) {
                // Flush the final partial batch before signalling completion.
                if (batch.length > 0) {
                    outputQueue.pushValue(batch);
                }
                outputQueue.pushDone();
            } else if (result.error) {
                outputQueue.pushError(result.error);
            }
        });

        // Fire-and-forget driver loop; `void` marks the promise as
        // intentionally unawaited. Rejections are forwarded inside
        // consumeStream so consumers of outputQueue cannot hang.
        void this.consumeStream(observedStream, outputQueue);
        return outputQueue;
    }

    /**
     * Drains `stream` so the observer above sees every result.
     * @param output - Queue to terminate if the source read() rejects
     */
    private async consumeStream(stream: IStream<T>, output: Queue<U[]>): Promise<void> {
        try {
            while (true) {
                const result = await stream.read();
                if (result.done) break;
            }
        } catch (error) {
            // A rejected read() never reaches the streamObserver handler
            // (it only sees resolved results), so without this push the
            // output stream would never terminate and consumers awaiting
            // read() would hang forever.
            output.pushError(error);
        }
    }
}