Advanced request management for handling concurrent operations while maintaining proper ordering. Provides stream-based interfaces and queue implementations for producer/consumer scenarios.
Manages concurrent fetching of sequential data with proper ordering, retry logic, and throttling.
/**
* Manages concurrent fetching of sequential data with proper ordering
*/
class ParallelRequests<T> {
/**
* @param from - Starting index for data range
* @param to - Ending index for data range (undefined for open-ended)
* @param payloadSize - Size of each request batch
* @param logger - Telemetry logger for monitoring
* @param requestCallback - Function to make individual requests
* @param responseCallback - Function to handle responses
*/
constructor(
from: number,
to: number | undefined,
payloadSize: number,
logger: ITelemetryLoggerExt,
requestCallback: (
request: number,
from: number,
to: number,
strongTo: boolean,
props: ITelemetryBaseProperties
) => Promise<{ partial: boolean; cancel: boolean; payload: T[] }>,
responseCallback: (payload: T[]) => void
);
/**
* Execute parallel requests with specified concurrency
* @param concurrency - Maximum number of concurrent requests
* @returns Promise that resolves when all requests complete
*/
run(concurrency: number): Promise<void>;
/**
* Cancel all pending and in-flight requests
*/
cancel(): void;
}Usage Example:
import { ParallelRequests } from "@fluidframework/driver-utils";
// Fetch ops in parallel while maintaining order
const parallelRequests = new ParallelRequests(
0, // Start from op 0
1000, // Fetch through op 1000
50, // 50 ops per request
logger,
// Request callback
async (from, to, telemetryProps) => {
return await storageService.getMessages(from, to);
},
// Response callback - called in order
(from, messages) => {
processMessages(messages);
}
);
// Run with concurrency of 3
await parallelRequests.run(3);Async-safe queue implementation for producer/consumer scenarios with stream interface.
/**
* Async-safe queue implementation for producer/consumer scenarios
*/
class Queue<T> implements IStream<T> {
/**
* Add a value to the queue
* @param value - Value to add
*/
pushValue(value: T): void;
/**
* Add an error to the queue, terminating the stream
* @param error - Error to propagate to consumers
*/
pushError(error: any): void;
/**
* Mark the queue as complete, no more values will be added
*/
pushDone(): void;
/**
* Read the next value from the queue
* @returns Promise resolving to stream result
*/
read(): Promise<IStreamResult<T>>;
}Usage Example:
import { Queue } from "@fluidframework/driver-utils";
const queue = new Queue<string>();
// Producer
queue.pushValue("item1");
queue.pushValue("item2");
queue.pushDone();
// Consumer
while (true) {
const result = await queue.read();
if (result.done) break;
if (result.value) {
console.log("Received:", result.value);
}
}High-level functions for common request patterns with built-in concurrency and error handling.
/**
* High-level function to fetch ops from storage with concurrency and retry logic
* @param get - Function to fetch ops for a given range
* @param concurrency - Maximum concurrent requests
* @param fromTotal - Starting op sequence number
* @param toTotal - Ending op sequence number (undefined for all available)
* @param payloadSize - Number of ops per request
* @param logger - Telemetry logger
* @param signal - Optional abort signal for cancellation
* @param scenarioName - Optional scenario name for telemetry
* @returns Stream of message arrays in correct order
*/
function requestOps(
get: (from: number, to: number, telemetryProps: ITelemetryBaseProperties) => Promise<ISequencedDocumentMessage[]>,
concurrency: number,
fromTotal: number,
toTotal: number | undefined,
payloadSize: number,
logger: ITelemetryLoggerExt,
signal?: AbortSignal,
scenarioName?: string
): IStream<ISequencedDocumentMessage[]>;Usage Example:
import { requestOps } from "@fluidframework/driver-utils";
// Fetch ops 0-999 with concurrency 3, 100 ops per request
const opsStream = requestOps(
async (from, to, telemetryProps) => {
return await storage.getMessages(from, to);
},
3, // concurrency
0, // from
999, // to
100, // payload size
logger,
abortSignal,
"loadDocument"
);
// Process the stream
while (true) {
const result = await opsStream.read();
if (result.done) break;
if (result.value) {
processOps(result.value);
}
}Utility functions for working with streams and converting between different data formats.
/**
* Pre-configured empty stream that immediately returns done
*/
const emptyMessageStream: IStream<ISequencedDocumentMessage[]>;
/**
* Converts promise of messages array to single-use stream
* @param messagesArg - Promise resolving to message array
* @returns Stream yielding the message array once
*/
function streamFromMessages(
messagesArg: Promise<ISequencedDocumentMessage[]>
): IStream<ISequencedDocumentMessage[]>;
/**
* Stream decorator that calls handler for each read operation
* @param stream - Source stream to observe
* @param handler - Function called for each stream result
* @returns New stream that forwards all operations while calling handler
*/
function streamObserver<T>(
stream: IStream<T>,
handler: (value: IStreamResult<T>) => void
): IStream<T>;Usage Examples:
import {
emptyMessageStream,
streamFromMessages,
streamObserver
} from "@fluidframework/driver-utils";
// Use empty stream for no-op scenarios
const noOpsStream = emptyMessageStream;
// Convert promise to stream
const messagesPromise = fetchMessagesFromAPI();
const messageStream = streamFromMessages(messagesPromise);
// Observe stream operations
const observedStream = streamObserver(messageStream, (result) => {
if (result.value) {
console.log(`Received ${result.value.length} messages`);
} else if (result.done) {
console.log("Stream completed");
}
});import { ParallelRequests, Queue } from "@fluidframework/driver-utils";
class DocumentLoader {
private async loadDocumentOps(
from: number,
to: number,
concurrency: number = 3
): Promise<ISequencedDocumentMessage[]> {
const resultQueue = new Queue<ISequencedDocumentMessage[]>();
const allMessages: ISequencedDocumentMessage[] = [];
const requests = new ParallelRequests(
from, to, 50, this.logger,
// Request function
async (rangeFrom, rangeTo, telemetryProps) => {
try {
return await this.storage.getMessages(rangeFrom, rangeTo);
} catch (error) {
this.logger.sendErrorEvent({ eventName: "LoadOpsFailed" }, error);
throw error;
}
},
// Response handler - maintains order
(rangeFrom, messages) => {
allMessages.push(...messages);
resultQueue.pushValue(messages);
}
);
try {
await requests.run(concurrency);
resultQueue.pushDone();
return allMessages;
} catch (error) {
resultQueue.pushError(error);
throw error;
}
}
}import {
Queue,
streamObserver,
IStream,
IStreamResult
} from "@fluidframework/driver-utils";
class StreamProcessor<T, U> {
constructor(
private transformer: (input: T) => U,
private batchSize: number = 10
) {}
process(inputStream: IStream<T>): IStream<U[]> {
const outputQueue = new Queue<U[]>();
let batch: U[] = [];
const observedStream = streamObserver(inputStream, (result) => {
if (result.value !== undefined) {
batch.push(this.transformer(result.value));
if (batch.length >= this.batchSize) {
outputQueue.pushValue([...batch]);
batch = [];
}
} else if (result.done) {
if (batch.length > 0) {
outputQueue.pushValue(batch);
}
outputQueue.pushDone();
} else if (result.error) {
outputQueue.pushError(result.error);
}
});
// Start processing
this.consumeStream(observedStream);
return outputQueue;
}
private async consumeStream(stream: IStream<T>): Promise<void> {
try {
while (true) {
const result = await stream.read();
if (result.done) break;
}
} catch (error) {
// Error handling is done in the observer
}
}
}