A collection of utility functions for Fluid Framework drivers. It provides the networking, storage, and data processing infrastructure that driver implementations need: network error handling with retry logic, parallel request management, storage service proxies, and data transformations.
npm install @fluidframework/driver-utils

import {
NetworkErrorBasic,
ParallelRequests,
DocumentStorageServiceProxy,
buildSnapshotTree,
runWithRetry,
readAndParse
} from "@fluidframework/driver-utils";

For legacy APIs:
import { /* legacy exports */ } from "@fluidframework/driver-utils/legacy";

For internal APIs:
import { /* internal exports */ } from "@fluidframework/driver-utils/internal";

import {
runWithRetry,
createGenericNetworkError,
ParallelRequests,
DocumentStorageServiceProxy
} from "@fluidframework/driver-utils";
// Retry an operation with exponential backoff
const result = await runWithRetry(
async () => await someNetworkOperation(),
"fetchOperation",
logger,
{ cancel: abortSignal }
);
// Create a network error with retry capability
const error = createGenericNetworkError(
"Network request failed",
{ canRetry: true, retryAfterMs: 5000 },
{ driverVersion: "2.60.0" }
);
// Manage parallel requests for sequential data
const parallelRequests = new ParallelRequests(
0, 100, 50, logger,
requestCallback, responseCallback
);
await parallelRequests.run(3); // Run with concurrency of 3
// Proxy a storage service
const proxy = new DocumentStorageServiceProxy(internalStorageService);
const snapshot = await proxy.getSnapshot();

@fluidframework/driver-utils is built around several key architectural components:
Comprehensive network error handling with typed error classes, retry logic, and network status detection. Essential for robust driver implementations that need to handle various network conditions and service errors.
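As a rough illustration of how retry metadata can travel on an error object, the sketch below models the pattern with local stand-in types. `RetryableErrorLike`, `makeNetworkErrorSketch`, and `canRetrySketch` are hypothetical names for this example, not exports of the package, and the rule that a retryable error with a delay becomes a throttling-style error is an assumption drawn from the signatures in this section.

```typescript
// Hypothetical stand-in for the package's error classes; only the fields
// needed for the retry decision are modeled here.
interface RetryableErrorLike {
  message: string;
  canRetry: boolean;
  retryAfterSeconds?: number;
}

// Assumption: a retryable error that carries a delay is treated as throttling,
// and the delay is converted from milliseconds to seconds.
function makeNetworkErrorSketch(
  message: string,
  retryInfo: { canRetry: boolean; retryAfterMs?: number },
): RetryableErrorLike {
  if (retryInfo.canRetry && retryInfo.retryAfterMs !== undefined) {
    return { message, canRetry: true, retryAfterSeconds: retryInfo.retryAfterMs / 1000 };
  }
  return { message, canRetry: retryInfo.canRetry };
}

// Treats any value as non-retryable unless it explicitly says otherwise.
function canRetrySketch(error: unknown): boolean {
  return (
    typeof error === "object" &&
    error !== null &&
    (error as { canRetry?: unknown }).canRetry === true
  );
}
```

Defaulting to "not retryable" for unknown error shapes keeps a caller from looping on failures that were never marked as transient.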
function isOnline(): OnlineStatus;
function canRetryOnError(error: any): boolean;
function createGenericNetworkError(
message: string,
retryInfo: {canRetry: boolean; retryAfterMs?: number},
props: DriverErrorTelemetryProps
): ThrottlingError | GenericNetworkError;
enum OnlineStatus {
Offline = 0,
Online = 1,
Unknown = 2
}
class GenericNetworkError extends LoggingError implements IDriverErrorBase {
constructor(message: string, canRetry: boolean, props: DriverErrorTelemetryProps);
}
class AuthorizationError extends LoggingError implements IAuthorizationError {
constructor(message: string, claims: string | undefined, tenantId: string | undefined, props: DriverErrorTelemetryProps);
}
class ThrottlingError extends LoggingError implements IThrottlingWarning {
constructor(message: string, retryAfterSeconds: number, props: DriverErrorTelemetryProps);
}

Advanced request management for handling concurrent operations while maintaining proper ordering. Provides stream-based interfaces and queue implementations for producer/consumer scenarios.
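The producer/consumer idea can be sketched with a small in-memory queue. This is a simplified model under stated assumptions, not the package's `Queue` implementation: `QueueSketch`, `StreamResultSketch`, and `EntrySketch` are hypothetical names, and the sketch supports only one outstanding `read()` at a time.

```typescript
// Result shape for a read: either a value or an end-of-stream marker (assumed shape).
type StreamResultSketch<T> = { done: true } | { done: false; value: T };

type EntrySketch<T> =
  | { kind: "value"; value: T }
  | { kind: "error"; error: unknown }
  | { kind: "done" };

class QueueSketch<T> {
  // Entries pushed while no reader is waiting.
  private readonly entries: EntrySketch<T>[] = [];
  // Resolver for a reader that arrived before any entry was available.
  private waiter: ((entry: EntrySketch<T>) => void) | undefined;

  pushValue(value: T): void {
    this.push({ kind: "value", value });
  }

  pushError(error: unknown): void {
    this.push({ kind: "error", error });
  }

  pushDone(): void {
    this.push({ kind: "done" });
  }

  // Returns the next entry in push order, waiting if the queue is empty.
  async read(): Promise<StreamResultSketch<T>> {
    const buffered = this.entries.shift();
    const entry =
      buffered ??
      (await new Promise<EntrySketch<T>>((resolve) => {
        this.waiter = resolve;
      }));
    if (entry.kind === "error") {
      throw entry.error;
    }
    if (entry.kind === "done") {
      return { done: true };
    }
    return { done: false, value: entry.value };
  }

  private push(entry: EntrySketch<T>): void {
    const waiter = this.waiter;
    if (waiter === undefined) {
      this.entries.push(entry);
      return;
    }
    this.waiter = undefined;
    waiter(entry);
  }
}
```

Because entries are handed out strictly in push order, a producer that fetches chunks concurrently can still deliver them to the consumer in sequence by pushing each chunk only once all earlier chunks have been pushed.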
class ParallelRequests<T> {
constructor(
from: number,
to: number | undefined,
payloadSize: number,
logger: ITelemetryLoggerExt,
requestCallback: Function,
responseCallback: Function
);
run(concurrency: number): Promise<void>;
cancel(): void;
}
class Queue<T> implements IStream<T> {
pushValue(value: T): void;
pushError(error: any): void;
pushDone(): void;
read(): Promise<IStreamResult<T>>;
}
function requestOps(
get: Function,
concurrency: number,
fromTotal: number,
toTotal: number | undefined,
payloadSize: number,
logger: ITelemetryLoggerExt,
signal?: AbortSignal,
scenarioName?: string
): IStream<ISequencedDocumentMessage[]>;

Storage service implementations using proxy patterns with optional intelligent prefetching. Provides abstraction layers for document storage operations.
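To make the proxy-plus-prefetch pattern concrete, here is a minimal sketch that wraps a blob reader and caches in-flight reads. `BlobStoreLike`, `BlobStoreProxySketch`, and `CachingBlobStoreSketch` are hypothetical names, and the caching policy is an assumption for illustration; it does not describe how `PrefetchDocumentStorageService` decides what to prefetch.

```typescript
// Hypothetical minimal slice of a storage service: only blob reads are modeled.
interface BlobStoreLike {
  readBlob(id: string): Promise<ArrayBufferLike>;
}

// Plain proxy: forwards every call to the wrapped service unchanged.
class BlobStoreProxySketch implements BlobStoreLike {
  protected readonly inner: BlobStoreLike;

  constructor(inner: BlobStoreLike) {
    this.inner = inner;
  }

  readBlob(id: string): Promise<ArrayBufferLike> {
    return this.inner.readBlob(id);
  }
}

// Proxy subclass that starts reads early and shares the pending promise.
class CachingBlobStoreSketch extends BlobStoreProxySketch {
  private readonly cache = new Map<string, Promise<ArrayBufferLike>>();

  // Kick off reads for blobs that are likely to be needed soon.
  prefetch(ids: string[]): void {
    for (const id of ids) {
      void this.readBlob(id);
    }
  }

  readBlob(id: string): Promise<ArrayBufferLike> {
    let pending = this.cache.get(id);
    if (pending === undefined) {
      pending = this.inner.readBlob(id);
      // Forget failed reads so a later call can try again.
      pending.catch(() => this.cache.delete(id));
      this.cache.set(id, pending);
    }
    return pending;
  }
}
```

Subclassing the plain proxy means only the methods that need extra behavior are overridden, while everything else keeps forwarding to the wrapped service.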
class DocumentStorageServiceProxy implements IDocumentStorageService {
constructor(internalStorageService: IDocumentStorageService);
get policies(): IDocumentStorageServicePolicies | undefined;
set policies(policies: IDocumentStorageServicePolicies | undefined);
getSnapshotTree(version?: IVersion, scenarioName?: string): Promise<ISnapshotTree | null>;
getSnapshot(snapshotFetchOptions?: ISnapshotFetchOptions): Promise<ISnapshot>;
getVersions(versionId: string | null, count: number, scenarioName?: string, fetchSource?: FetchSource): Promise<IVersion[]>;
createBlob(file: ArrayBufferLike): Promise<ICreateBlobResponse>;
readBlob(id: string): Promise<ArrayBufferLike>;
}
class PrefetchDocumentStorageService extends DocumentStorageServiceProxy {
stopPrefetch(): void;
}

Data structure utilities for converting between different tree formats and handling blob operations. Essential for working with Fluid Framework's tree-based data structures.
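One recurring task in this area is accepting either a bare snapshot tree or a richer snapshot object and normalizing to the tree. The sketch below shows that with a type guard. The shapes `SnapshotTreeSketch` and `SnapshotSketch`, including the `snapshotTree` and `snapshotFormatV` field names, are assumptions made for this example rather than the framework's definitions.

```typescript
// Hypothetical, reduced shapes: a tree of blob ids, and a snapshot that wraps a tree.
interface SnapshotTreeSketch {
  blobs: Record<string, string>;
  trees: Record<string, SnapshotTreeSketch>;
}

interface SnapshotSketch {
  snapshotTree: SnapshotTreeSketch;
  blobContents: Map<string, ArrayBufferLike>;
  snapshotFormatV: 1; // assumed format marker used to tell the two shapes apart
}

// Type guard: true only for the wrapping snapshot shape.
function isSnapshotSketch(
  obj: SnapshotTreeSketch | SnapshotSketch | undefined,
): obj is SnapshotSketch {
  return obj !== undefined && "snapshotFormatV" in obj && obj.snapshotFormatV === 1;
}

// Normalizes either input shape to the underlying tree.
function getTreeSketch(input: SnapshotTreeSketch | SnapshotSketch): SnapshotTreeSketch {
  return isSnapshotSketch(input) ? input.snapshotTree : input;
}
```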
class BlobTreeEntry implements ITreeEntry {
constructor(path: string, contents: string, encoding?: "utf-8" | "base64");
readonly mode: FileMode.File;
readonly type: TreeEntry.Blob;
readonly value: IBlob;
}
function buildSnapshotTree(entries: ITreeEntry[], blobMap: Map<string, ArrayBufferLike>): ISnapshotTree;
function convertSummaryTreeToSnapshotITree(summaryTree: ISummaryTree): ITree;
function getSnapshotTree(tree: ISnapshotTree | ISnapshot): ISnapshotTree;
function isInstanceOfISnapshot(obj: ISnapshotTree | ISnapshot | undefined): obj is ISnapshot;

Message recognition and Git protocol operations for Fluid Framework's underlying protocols. Handles message classification and Git tree operations.
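A Git tree operation of this kind typically turns a flat list of slash-separated paths into a nested structure. The following sketch shows one way to do that. `FlatEntrySketch`, `TreeNodeSketch`, and `buildHierarchySketch` are hypothetical names, and the entry fields (`path`, `type`, `sha`) are assumed for the example; the package's `buildGitTreeHierarchy` may differ in both input shape and behavior.

```typescript
// Hypothetical flat entry, as a Git tree listing might provide it.
interface FlatEntrySketch {
  path: string; // slash-separated, e.g. ".app/root/header"
  type: "blob" | "tree";
  sha: string;
}

// Nested result: blob names map to shas, subtree names map to child nodes.
interface TreeNodeSketch {
  blobs: Record<string, string>;
  trees: Record<string, TreeNodeSketch>;
}

function emptyNode(): TreeNodeSketch {
  return { blobs: {}, trees: {} };
}

function buildHierarchySketch(entries: FlatEntrySketch[]): TreeNodeSketch {
  const root = emptyNode();
  for (const entry of entries) {
    const parts = entry.path.split("/");
    const leaf = parts.pop() ?? "";
    // Walk (and create as needed) every intermediate directory.
    let node = root;
    for (const part of parts) {
      let child = node.trees[part];
      if (child === undefined) {
        child = emptyNode();
        node.trees[part] = child;
      }
      node = child;
    }
    if (entry.type === "blob") {
      node.blobs[leaf] = entry.sha;
    } else if (node.trees[leaf] === undefined) {
      // Keep an existing subtree if a child entry already created it.
      node.trees[leaf] = emptyNode();
    }
  }
  return root;
}
```

Creating intermediate nodes on demand means the result does not depend on whether tree entries appear before or after the blobs they contain.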
function isRuntimeMessage(message: {type: string}): boolean;
function canBeCoalescedByService(message: ISequencedDocumentMessage | IDocumentMessage): boolean;
function buildGitTreeHierarchy(
flatTree: IGitTree,
blobsShaToPathCache?: Map<string, string>,
removeAppTreePrefix?: boolean
): ISnapshotTreeEx;
function getGitMode(value: SummaryObject): string;
function getGitType(value: SummaryObject): "blob" | "tree";
Robust retry mechanisms with exponential backoff and rate limiting for managing concurrent operations. Critical for handling transient failures and respecting service limits.
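For readers unfamiliar with the technique, retry with exponential backoff can be sketched in a few lines. This is an illustration only: `retryWithBackoffSketch` and `RetryOptionsSketch` are hypothetical names, the starting delay and cap are assumed values, and the package's `runWithRetry` may choose delays and decide retryability differently.

```typescript
interface RetryOptionsSketch {
  initialDelayMs?: number; // assumed default below
  maxDelayMs?: number; // assumed cap below
  signal?: AbortSignal;
  onRetry?: (delayMs: number, error: unknown) => void;
}

async function retryWithBackoffSketch<T>(
  api: () => Promise<T>,
  options: RetryOptionsSketch = {},
): Promise<T> {
  const { initialDelayMs = 100, maxDelayMs = 8000, signal, onRetry } = options;
  let delayMs = initialDelayMs;
  for (;;) {
    if (signal?.aborted) {
      throw new Error("Retry cancelled");
    }
    try {
      return await api();
    } catch (error) {
      // Only errors explicitly marked retryable are retried.
      const canRetry =
        typeof error === "object" &&
        error !== null &&
        (error as { canRetry?: unknown }).canRetry === true;
      if (!canRetry) {
        throw error;
      }
      onRetry?.(delayMs, error);
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      // Double the wait after each failure, up to the cap.
      delayMs = Math.min(delayMs * 2, maxDelayMs);
    }
  }
}
```

Doubling the delay spreads repeated attempts out over time, and the cap keeps a long outage from producing unbounded waits between attempts.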
function runWithRetry<T>(
api: (cancel?: AbortSignal) => Promise<T>,
fetchCallName: string,
logger: ITelemetryLoggerExt,
progress: IProgress
): Promise<T>;
function calculateMaxWaitTime(delayMs: number, error: unknown): number;
class RateLimiter {
constructor(maxRequests: number);
schedule<T>(work: () => Promise<T>): Promise<T>;
get waitQueueLength(): number;
}
interface IProgress {
cancel?: AbortSignal;
onRetry?(delayInMs: number, error: any): void;
}

Utilities for parsing JSON data from storage and managing summary structures. Provides type-safe data processing and summary format handling.
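Reading a blob and parsing it as JSON can be sketched as follows. `BlobReaderLike` and `readAndParseSketch` are hypothetical names, and the sketch assumes blobs hold UTF-8 encoded JSON. Note that the type parameter is an unchecked cast: nothing here validates that the parsed value matches `T`.

```typescript
// Hypothetical minimal storage slice: anything that can read a blob by id.
interface BlobReaderLike {
  readBlob(id: string): Promise<ArrayBufferLike>;
}

async function readAndParseSketch<T>(storage: BlobReaderLike, id: string): Promise<T> {
  const blob = await storage.readBlob(id);
  // Assumption: blob contents are UTF-8 encoded JSON text.
  const text = new TextDecoder("utf-8").decode(new Uint8Array(blob));
  // The cast is not validated at runtime; callers own the shape check.
  return JSON.parse(text) as T;
}
```

Accepting only the `readBlob` slice of a storage service keeps the helper easy to call from tests with a small in-memory stub.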
function readAndParse<T>(
storage: Pick<IDocumentStorageService, "readBlob">,
id: string
): Promise<T>;
interface CombinedAppAndProtocolSummary extends ISummaryTree {
tree: {
".app": ISummaryTree;
".protocol": ISummaryTree;
};
}
function isCombinedAppAndProtocolSummary(
summary: ISummaryTree | undefined,
...optionalRootTrees: string[]
): summary is CombinedAppAndProtocolSummary;
function getDocAttributesFromProtocolSummary(protocolSummary: ISummaryTree): IDocumentAttributes;
function getQuorumValuesFromProtocolSummary(protocolSummary: ISummaryTree): [string, ICommittedProposal][];
Development utilities for URL resolution and optional compression support for storage operations. Includes compression configuration and factory adapters.
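A compression setting that accepts either a boolean or a full config object needs to be normalized before use. The sketch below shows one plausible way to do that and to apply a minimum-size threshold. All names are hypothetical, the default values are assumptions, and treating the threshold as inclusive is a choice made for this example, not documented behavior of `applyStorageCompression`.

```typescript
// String union used instead of the package's enum to keep the sketch standalone.
type AlgorithmSketch = "none" | "lz4";

interface CompressionConfigSketch {
  algorithm: AlgorithmSketch;
  minSizeToCompress: number; // bytes
}

// Assumed defaults for illustration only.
const defaultConfigSketch: CompressionConfigSketch = {
  algorithm: "lz4",
  minSizeToCompress: 500,
};

// Normalizes `config | boolean | undefined` to a concrete config, or undefined when disabled.
function resolveCompressionConfigSketch(
  config?: CompressionConfigSketch | boolean,
): CompressionConfigSketch | undefined {
  if (config === undefined || config === false) {
    return undefined;
  }
  return config === true ? defaultConfigSketch : config;
}

// Small payloads are left alone because compression overhead can outweigh the savings.
function shouldCompressSketch(
  byteLength: number,
  config: CompressionConfigSketch | undefined,
): boolean {
  return (
    config !== undefined &&
    config.algorithm !== "none" &&
    byteLength >= config.minSizeToCompress
  );
}
```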
class InsecureUrlResolver implements IUrlResolver {
constructor(
hostUrl: string,
ordererUrl: string,
storageUrl: string,
deltaStreamUrl: string,
tenantId: string,
bearer: string,
isForNodeTest?: boolean
);
resolve(request: IRequest): Promise<IResolvedUrl>;
getAbsoluteUrl(resolvedUrl: IResolvedUrl, relativeUrl: string): Promise<string>;
createCreateNewRequest(fileName?: string): IRequest;
}
enum SummaryCompressionAlgorithm {
None = 0,
LZ4 = 1
}
interface ICompressionStorageConfig {
algorithm: SummaryCompressionAlgorithm;
minSizeToCompress: number;
}
const DefaultCompressionStorageConfig: ICompressionStorageConfig;
function applyStorageCompression(
documentServiceFactory: IDocumentServiceFactory,
config?: ICompressionStorageConfig | boolean
): IDocumentServiceFactory;