or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-processing-summary.mdindex.mdnetwork-utilities.mdprotocol-message-utilities.mdrequest-management.mdretry-rate-limiting.mdstorage-services.mdtree-blob-utilities.mdurl-compression.md
tile.json

request-management.mddocs/

Request Management and Parallel Operations

Advanced request management for handling concurrent operations while maintaining proper ordering. Provides stream-based interfaces and queue implementations for producer/consumer scenarios.

Capabilities

Parallel Request Management

Manages concurrent fetching of sequential data with proper ordering, retry logic, and throttling.

/**
 * Manages concurrent fetching of sequential data with proper ordering
 */
class ParallelRequests<T> {
  /**
   * @param from - Starting index for data range
   * @param to - Ending index for data range (undefined for open-ended)
   * @param payloadSize - Size of each request batch
   * @param logger - Telemetry logger for monitoring
   * @param requestCallback - Function to make individual requests
   * @param responseCallback - Function to handle responses
   */
  constructor(
    from: number,
    to: number | undefined,
    payloadSize: number,
    logger: ITelemetryLoggerExt,
    requestCallback: (
      request: number,
      from: number,
      to: number,
      strongTo: boolean,
      props: ITelemetryBaseProperties
    ) => Promise<{ partial: boolean; cancel: boolean; payload: T[] }>,
    responseCallback: (payload: T[]) => void
  );

  /**
   * Execute parallel requests with specified concurrency
   * @param concurrency - Maximum number of concurrent requests
   * @returns Promise that resolves when all requests complete
   */
  run(concurrency: number): Promise<void>;

  /**
   * Cancel all pending and in-flight requests
   */
  cancel(): void;
}

Usage Example:

import { ParallelRequests } from "@fluidframework/driver-utils";

// Fetch ops in parallel while maintaining order
const parallelRequests = new ParallelRequests(
  0,           // Start from op 0
  1000,        // Fetch through op 1000
  50,          // 50 ops per request
  logger,
  // Request callback
  async (from, to, telemetryProps) => {
    return await storageService.getMessages(from, to);
  },
  // Response callback - called in order
  (from, messages) => {
    processMessages(messages);
  }
);

// Run with concurrency of 3
await parallelRequests.run(3);

Stream-based Queue

Async-safe queue implementation for producer/consumer scenarios with stream interface.

/**
 * Async-safe queue implementation for producer/consumer scenarios
 */
class Queue<T> implements IStream<T> {
  /**
   * Add a value to the queue
   * @param value - Value to add
   */
  pushValue(value: T): void;

  /**
   * Add an error to the queue, terminating the stream
   * @param error - Error to propagate to consumers
   */
  pushError(error: any): void;

  /**
   * Mark the queue as complete, no more values will be added
   */
  pushDone(): void;

  /**
   * Read the next value from the queue
   * @returns Promise resolving to stream result
   */
  read(): Promise<IStreamResult<T>>;
}

Usage Example:

import { Queue } from "@fluidframework/driver-utils";

const queue = new Queue<string>();

// Producer
queue.pushValue("item1");
queue.pushValue("item2");
queue.pushDone();

// Consumer
while (true) {
  const result = await queue.read();
  if (result.done) break;
  
  if (result.value) {
    console.log("Received:", result.value);
  }
}

High-level Request Functions

High-level functions for common request patterns with built-in concurrency and error handling.

/**
 * High-level function to fetch ops from storage with concurrency and retry logic
 * @param get - Function to fetch ops for a given range
 * @param concurrency - Maximum concurrent requests
 * @param fromTotal - Starting op sequence number
 * @param toTotal - Ending op sequence number (undefined for all available)
 * @param payloadSize - Number of ops per request
 * @param logger - Telemetry logger
 * @param signal - Optional abort signal for cancellation
 * @param scenarioName - Optional scenario name for telemetry
 * @returns Stream of message arrays in correct order
 */
function requestOps(
  get: (from: number, to: number, telemetryProps: ITelemetryBaseProperties) => Promise<ISequencedDocumentMessage[]>,
  concurrency: number,
  fromTotal: number,
  toTotal: number | undefined,
  payloadSize: number,
  logger: ITelemetryLoggerExt,
  signal?: AbortSignal,
  scenarioName?: string
): IStream<ISequencedDocumentMessage[]>;

Usage Example:

import { requestOps } from "@fluidframework/driver-utils";

// Fetch ops 0-999 with concurrency 3, 100 ops per request
const opsStream = requestOps(
  async (from, to, telemetryProps) => {
    return await storage.getMessages(from, to);
  },
  3,      // concurrency
  0,      // from
  999,    // to
  100,    // payload size
  logger,
  abortSignal,
  "loadDocument"
);

// Process the stream
while (true) {
  const result = await opsStream.read();
  if (result.done) break;
  
  if (result.value) {
    processOps(result.value);
  }
}

Stream Utilities

Utility functions for working with streams and converting between different data formats.

/**
 * Pre-configured empty stream that immediately returns done
 */
const emptyMessageStream: IStream<ISequencedDocumentMessage[]>;

/**
 * Converts promise of messages array to single-use stream
 * @param messagesArg - Promise resolving to message array
 * @returns Stream yielding the message array once
 */
function streamFromMessages(
  messagesArg: Promise<ISequencedDocumentMessage[]>
): IStream<ISequencedDocumentMessage[]>;

/**
 * Stream decorator that calls handler for each read operation
 * @param stream - Source stream to observe
 * @param handler - Function called for each stream result
 * @returns New stream that forwards all operations while calling handler
 */
function streamObserver<T>(
  stream: IStream<T>,
  handler: (value: IStreamResult<T>) => void
): IStream<T>;

Usage Examples:

import { 
  emptyMessageStream, 
  streamFromMessages, 
  streamObserver 
} from "@fluidframework/driver-utils";

// Use empty stream for no-op scenarios
const noOpsStream = emptyMessageStream;

// Convert promise to stream
const messagesPromise = fetchMessagesFromAPI();
const messageStream = streamFromMessages(messagesPromise);

// Observe stream operations
const observedStream = streamObserver(messageStream, (result) => {
  if (result.value) {
    console.log(`Received ${result.value.length} messages`);
  } else if (result.done) {
    console.log("Stream completed");
  }
});

Advanced Patterns

Request Coordination with Error Handling

import { ParallelRequests, Queue } from "@fluidframework/driver-utils";

class DocumentLoader {
  private async loadDocumentOps(
    from: number, 
    to: number, 
    concurrency: number = 3
  ): Promise<ISequencedDocumentMessage[]> {
    const resultQueue = new Queue<ISequencedDocumentMessage[]>();
    const allMessages: ISequencedDocumentMessage[] = [];
    
    const requests = new ParallelRequests(
      from, to, 50, this.logger,
      // Request function
      async (rangeFrom, rangeTo, telemetryProps) => {
        try {
          return await this.storage.getMessages(rangeFrom, rangeTo);
        } catch (error) {
          this.logger.sendErrorEvent({ eventName: "LoadOpsFailed" }, error);
          throw error;
        }
      },
      // Response handler - maintains order
      (rangeFrom, messages) => {
        allMessages.push(...messages);
        resultQueue.pushValue(messages);
      }
    );
    
    try {
      await requests.run(concurrency);
      resultQueue.pushDone();
      return allMessages;
    } catch (error) {
      resultQueue.pushError(error);
      throw error;
    }
  }
}

Stream Processing Pipeline

import { 
  Queue, 
  streamObserver, 
  IStream, 
  IStreamResult 
} from "@fluidframework/driver-utils";

class StreamProcessor<T, U> {
  constructor(
    private transformer: (input: T) => U,
    private batchSize: number = 10
  ) {}
  
  process(inputStream: IStream<T>): IStream<U[]> {
    const outputQueue = new Queue<U[]>();
    let batch: U[] = [];
    
    const observedStream = streamObserver(inputStream, (result) => {
      if (result.value !== undefined) {
        batch.push(this.transformer(result.value));
        
        if (batch.length >= this.batchSize) {
          outputQueue.pushValue([...batch]);
          batch = [];
        }
      } else if (result.done) {
        if (batch.length > 0) {
          outputQueue.pushValue(batch);
        }
        outputQueue.pushDone();
      } else if (result.error) {
        outputQueue.pushError(result.error);
      }
    });
    
    // Start processing
    this.consumeStream(observedStream);
    
    return outputQueue;
  }
  
  private async consumeStream(stream: IStream<T>): Promise<void> {
    try {
      while (true) {
        const result = await stream.read();
        if (result.done) break;
      }
    } catch (error) {
      // Error handling is done in the observer
    }
  }
}