or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-processing-summary.mdindex.mdnetwork-utilities.mdprotocol-message-utilities.mdrequest-management.mdretry-rate-limiting.mdstorage-services.mdtree-blob-utilities.mdurl-compression.md
tile.json

protocol-message-utilities.mddocs/

Protocol and Message Utilities

Message recognition and Git protocol operations for Fluid Framework's underlying protocols. Handles message classification and Git tree operations.

Capabilities

Message Classification

Functions for identifying and classifying different types of messages in the Fluid Framework protocol.

/**
 * Identifies if message was sent by container runtime
 * @param message - Message with type property
 * @returns true if message type is MessageType.Operation
 */
function isRuntimeMessage(message: {type: string}): boolean;

/**
 * Determines if message can be coalesced by relay service
 * @param message - Sequenced or unsequenced document message
 * @returns true if message can be coalesced (NoOp or Accept messages)
 */
function canBeCoalescedByService(
  message: ISequencedDocumentMessage | IDocumentMessage
): boolean;

Usage Examples:

import { 
  isRuntimeMessage, 
  canBeCoalescedByService 
} from "@fluidframework/driver-utils";

// Filter runtime messages
const messages = await getMessages();
const runtimeMessages = messages.filter(isRuntimeMessage);

// Check if messages can be coalesced
const coalescableMessages = messages.filter(canBeCoalescedByService);
console.log(`${coalescableMessages.length} messages can be coalesced`);

Git Protocol Operations

Functions for working with Git tree structures and converting between Fluid Framework formats and Git protocol formats.

/**
 * Converts flat git tree structure to hierarchical snapshot tree
 * @param flatTree - Flat git tree from service
 * @param blobsShaToPathCache - Optional cache mapping blob SHAs to paths
 * @param removeAppTreePrefix - Whether to remove .app/ prefix from paths
 * @returns Hierarchical snapshot tree with extended metadata
 */
function buildGitTreeHierarchy(
  flatTree: IGitTree,
  blobsShaToPathCache?: Map<string, string>,
  removeAppTreePrefix?: boolean
): ISnapshotTreeEx;

/**
 * Returns git file mode string for summary objects
 * @param value - Summary object (tree or blob)
 * @returns Git file mode (FileMode.File or FileMode.Directory)
 */
function getGitMode(value: SummaryObject): string;

/**
 * Returns git object type for summary objects
 * @param value - Summary object (tree or blob)
 * @returns "blob" for files, "tree" for directories
 */
function getGitType(value: SummaryObject): "blob" | "tree";

Usage Examples:

import { 
  buildGitTreeHierarchy, 
  getGitMode, 
  getGitType 
} from "@fluidframework/driver-utils";

// Convert flat git tree to hierarchical structure
const flatGitTree: IGitTree = {
  entries: [
    { path: ".app/data.json", sha: "abc123", mode: "100644", type: "blob" },
    { path: ".app/config/settings.json", sha: "def456", mode: "100644", type: "blob" },
    { path: ".protocol/quorum.json", sha: "ghi789", mode: "100644", type: "blob" }
  ]
};

const blobCache = new Map<string, string>();
const hierarchicalTree = buildGitTreeHierarchy(
  flatGitTree, 
  blobCache, 
  true // Remove .app prefix
);

// Get git metadata for summary objects
const summaryBlob = { type: SummaryType.Blob, content: "data" };
const summaryTree = { type: SummaryType.Tree, tree: {} };

console.log(getGitMode(summaryBlob)); // "100644"
console.log(getGitType(summaryBlob)); // "blob"
console.log(getGitMode(summaryTree)); // "040000" 
console.log(getGitType(summaryTree)); // "tree"

Advanced Usage Patterns

Message Processing Pipeline

import { 
  isRuntimeMessage, 
  canBeCoalescedByService 
} from "@fluidframework/driver-utils";

class MessageProcessor {
  processMessageBatch(messages: ISequencedDocumentMessage[]): ProcessedMessages {
    const runtimeMessages: ISequencedDocumentMessage[] = [];
    const systemMessages: ISequencedDocumentMessage[] = [];
    const coalescableMessages: ISequencedDocumentMessage[] = [];
    
    for (const message of messages) {
      if (isRuntimeMessage(message)) {
        runtimeMessages.push(message);
      } else {
        systemMessages.push(message);
      }
      
      if (canBeCoalescedByService(message)) {
        coalescableMessages.push(message);
      }
    }
    
    return {
      runtime: runtimeMessages,
      system: systemMessages,
      coalescable: coalescableMessages,
      total: messages.length
    };
  }
  
  optimizeMessageBatch(messages: ISequencedDocumentMessage[]): ISequencedDocumentMessage[] {
    // Remove coalescable messages that can be optimized away
    return messages.filter(message => !canBeCoalescedByService(message));
  }
}

interface ProcessedMessages {
  runtime: ISequencedDocumentMessage[];
  system: ISequencedDocumentMessage[];
  coalescable: ISequencedDocumentMessage[];
  total: number;
}

Git Tree Converter

import { 
  buildGitTreeHierarchy, 
  getGitMode, 
  getGitType 
} from "@fluidframework/driver-utils";

class GitTreeConverter {
  convertToSnapshot(
    gitTree: IGitTree, 
    shouldRemoveAppPrefix: boolean = false
  ): ConversionResult {
    const blobPathCache = new Map<string, string>();
    
    const snapshotTree = buildGitTreeHierarchy(
      gitTree, 
      blobPathCache, 
      shouldRemoveAppPrefix
    );
    
    return {
      tree: snapshotTree,
      blobPaths: blobPathCache,
      stats: this.calculateStats(gitTree)
    };
  }
  
  generateGitTreeFromSummary(summary: ISummaryTree): GitTreeEntry[] {
    const entries: GitTreeEntry[] = [];
    this.walkSummaryTree(summary, "", entries);
    return entries;
  }
  
  private walkSummaryTree(
    tree: ISummaryTree, 
    basePath: string, 
    entries: GitTreeEntry[]
  ): void {
    for (const [name, value] of Object.entries(tree.tree)) {
      const fullPath = basePath ? `${basePath}/${name}` : name;
      
      entries.push({
        path: fullPath,
        mode: getGitMode(value),
        type: getGitType(value),
        sha: this.generateSha(value) // Would need actual SHA calculation
      });
      
      if (value.type === SummaryType.Tree) {
        this.walkSummaryTree(value, fullPath, entries);
      }
    }
  }
  
  private calculateStats(gitTree: IGitTree): GitTreeStats {
    const stats: GitTreeStats = {
      totalEntries: gitTree.entries.length,
      blobCount: 0,
      treeCount: 0,
      maxDepth: 0
    };
    
    for (const entry of gitTree.entries) {
      if (entry.type === "blob") {
        stats.blobCount++;
      } else if (entry.type === "tree") {
        stats.treeCount++;
      }
      
      const depth = entry.path.split('/').length;
      stats.maxDepth = Math.max(stats.maxDepth, depth);
    }
    
    return stats;
  }
  
  private generateSha(value: SummaryObject): string {
    // Simplified SHA generation - in practice would use proper hashing
    return `sha-${JSON.stringify(value).length}`;
  }
}

interface ConversionResult {
  tree: ISnapshotTreeEx;
  blobPaths: Map<string, string>;
  stats: GitTreeStats;
}

interface GitTreeStats {
  totalEntries: number;
  blobCount: number;
  treeCount: number;
  maxDepth: number;
}

interface GitTreeEntry {
  path: string;
  mode: string;
  type: "blob" | "tree";
  sha: string;
}

Message Stream Processor

import { 
  isRuntimeMessage, 
  canBeCoalescedByService 
} from "@fluidframework/driver-utils";

class MessageStreamProcessor {
  async processMessageStream(
    messageStream: IStream<ISequencedDocumentMessage[]>,
    onRuntimeMessage: (message: ISequencedDocumentMessage) => void,
    onSystemMessage: (message: ISequencedDocumentMessage) => void
  ): Promise<void> {
    while (true) {
      const result = await messageStream.read();
      
      if (result.done) {
        break;
      }
      
      if (result.value) {
        for (const message of result.value) {
          if (isRuntimeMessage(message)) {
            onRuntimeMessage(message);
          } else {
            onSystemMessage(message);
          }
        }
      }
    }
  }
  
  createOptimizedMessageStream(
    sourceStream: IStream<ISequencedDocumentMessage[]>
  ): IStream<ISequencedDocumentMessage[]> {
    const optimizedQueue = new Queue<ISequencedDocumentMessage[]>();
    
    // Process source stream in background
    this.processSourceStream(sourceStream, optimizedQueue);
    
    return optimizedQueue;
  }
  
  private async processSourceStream(
    source: IStream<ISequencedDocumentMessage[]>,
    output: Queue<ISequencedDocumentMessage[]>
  ): Promise<void> {
    try {
      while (true) {
        const result = await source.read();
        
        if (result.done) {
          output.pushDone();
          break;
        }
        
        if (result.value) {
          // Filter out coalescable messages for optimization
          const optimizedBatch = result.value.filter(
            message => !canBeCoalescedByService(message)
          );
          
          if (optimizedBatch.length > 0) {
            output.pushValue(optimizedBatch);
          }
        }
      }
    } catch (error) {
      output.pushError(error);
    }
  }
}