Message recognition and Git protocol operations for Fluid Framework's underlying protocols. Handles message classification and Git tree operations.
Functions for identifying and classifying different types of messages in the Fluid Framework protocol.
/**
* Identifies if message was sent by container runtime
* @param message - Message with type property
* @returns true if message type is MessageType.Operation
*/
function isRuntimeMessage(message: {type: string}): boolean;
/**
* Determines if message can be coalesced by relay service
* @param message - Sequenced or unsequenced document message
* @returns true if message can be coalesced (NoOp or Accept messages)
*/
function canBeCoalescedByService(
message: ISequencedDocumentMessage | IDocumentMessage
): boolean;Usage Examples:
import {
isRuntimeMessage,
canBeCoalescedByService
} from "@fluidframework/driver-utils";
// Filter runtime messages
const messages = await getMessages();
const runtimeMessages = messages.filter(isRuntimeMessage);
// Check if messages can be coalesced
const coalescableMessages = messages.filter(canBeCoalescedByService);
console.log(`${coalescableMessages.length} messages can be coalesced`);Functions for working with Git tree structures and converting between Fluid Framework formats and Git protocol formats.
/**
* Converts flat git tree structure to hierarchical snapshot tree
* @param flatTree - Flat git tree from service
* @param blobsShaToPathCache - Optional cache mapping blob SHAs to paths
* @param removeAppTreePrefix - Whether to remove .app/ prefix from paths
* @returns Hierarchical snapshot tree with extended metadata
*/
function buildGitTreeHierarchy(
flatTree: IGitTree,
blobsShaToPathCache?: Map<string, string>,
removeAppTreePrefix?: boolean
): ISnapshotTreeEx;
/**
* Returns git file mode string for summary objects
* @param value - Summary object (tree or blob)
* @returns Git file mode (FileMode.File or FileMode.Directory)
*/
function getGitMode(value: SummaryObject): string;
/**
* Returns git object type for summary objects
* @param value - Summary object (tree or blob)
* @returns "blob" for files, "tree" for directories
*/
function getGitType(value: SummaryObject): "blob" | "tree";Usage Examples:
import {
buildGitTreeHierarchy,
getGitMode,
getGitType
} from "@fluidframework/driver-utils";
// Convert flat git tree to hierarchical structure
const flatGitTree: IGitTree = {
entries: [
{ path: ".app/data.json", sha: "abc123", mode: "100644", type: "blob" },
{ path: ".app/config/settings.json", sha: "def456", mode: "100644", type: "blob" },
{ path: ".protocol/quorum.json", sha: "ghi789", mode: "100644", type: "blob" }
]
};
const blobCache = new Map<string, string>();
const hierarchicalTree = buildGitTreeHierarchy(
flatGitTree,
blobCache,
true // Remove .app prefix
);
// Get git metadata for summary objects
const summaryBlob = { type: SummaryType.Blob, content: "data" };
const summaryTree = { type: SummaryType.Tree, tree: {} };
console.log(getGitMode(summaryBlob)); // "100644"
console.log(getGitType(summaryBlob)); // "blob"
console.log(getGitMode(summaryTree)); // "040000"
console.log(getGitType(summaryTree)); // "tree"import {
isRuntimeMessage,
canBeCoalescedByService
} from "@fluidframework/driver-utils";
class MessageProcessor {
processMessageBatch(messages: ISequencedDocumentMessage[]): ProcessedMessages {
const runtimeMessages: ISequencedDocumentMessage[] = [];
const systemMessages: ISequencedDocumentMessage[] = [];
const coalescableMessages: ISequencedDocumentMessage[] = [];
for (const message of messages) {
if (isRuntimeMessage(message)) {
runtimeMessages.push(message);
} else {
systemMessages.push(message);
}
if (canBeCoalescedByService(message)) {
coalescableMessages.push(message);
}
}
return {
runtime: runtimeMessages,
system: systemMessages,
coalescable: coalescableMessages,
total: messages.length
};
}
optimizeMessageBatch(messages: ISequencedDocumentMessage[]): ISequencedDocumentMessage[] {
// Remove coalescable messages that can be optimized away
return messages.filter(message => !canBeCoalescedByService(message));
}
}
interface ProcessedMessages {
runtime: ISequencedDocumentMessage[];
system: ISequencedDocumentMessage[];
coalescable: ISequencedDocumentMessage[];
total: number;
}import {
buildGitTreeHierarchy,
getGitMode,
getGitType
} from "@fluidframework/driver-utils";
class GitTreeConverter {
convertToSnapshot(
gitTree: IGitTree,
shouldRemoveAppPrefix: boolean = false
): ConversionResult {
const blobPathCache = new Map<string, string>();
const snapshotTree = buildGitTreeHierarchy(
gitTree,
blobPathCache,
shouldRemoveAppPrefix
);
return {
tree: snapshotTree,
blobPaths: blobPathCache,
stats: this.calculateStats(gitTree)
};
}
generateGitTreeFromSummary(summary: ISummaryTree): GitTreeEntry[] {
const entries: GitTreeEntry[] = [];
this.walkSummaryTree(summary, "", entries);
return entries;
}
private walkSummaryTree(
tree: ISummaryTree,
basePath: string,
entries: GitTreeEntry[]
): void {
for (const [name, value] of Object.entries(tree.tree)) {
const fullPath = basePath ? `${basePath}/${name}` : name;
entries.push({
path: fullPath,
mode: getGitMode(value),
type: getGitType(value),
sha: this.generateSha(value) // Would need actual SHA calculation
});
if (value.type === SummaryType.Tree) {
this.walkSummaryTree(value, fullPath, entries);
}
}
}
private calculateStats(gitTree: IGitTree): GitTreeStats {
const stats: GitTreeStats = {
totalEntries: gitTree.entries.length,
blobCount: 0,
treeCount: 0,
maxDepth: 0
};
for (const entry of gitTree.entries) {
if (entry.type === "blob") {
stats.blobCount++;
} else if (entry.type === "tree") {
stats.treeCount++;
}
const depth = entry.path.split('/').length;
stats.maxDepth = Math.max(stats.maxDepth, depth);
}
return stats;
}
private generateSha(value: SummaryObject): string {
// Simplified SHA generation - in practice would use proper hashing
return `sha-${JSON.stringify(value).length}`;
}
}
interface ConversionResult {
tree: ISnapshotTreeEx;
blobPaths: Map<string, string>;
stats: GitTreeStats;
}
interface GitTreeStats {
totalEntries: number;
blobCount: number;
treeCount: number;
maxDepth: number;
}
interface GitTreeEntry {
path: string;
mode: string;
type: "blob" | "tree";
sha: string;
}import {
isRuntimeMessage,
canBeCoalescedByService
} from "@fluidframework/driver-utils";
class MessageStreamProcessor {
async processMessageStream(
messageStream: IStream<ISequencedDocumentMessage[]>,
onRuntimeMessage: (message: ISequencedDocumentMessage) => void,
onSystemMessage: (message: ISequencedDocumentMessage) => void
): Promise<void> {
while (true) {
const result = await messageStream.read();
if (result.done) {
break;
}
if (result.value) {
for (const message of result.value) {
if (isRuntimeMessage(message)) {
onRuntimeMessage(message);
} else {
onSystemMessage(message);
}
}
}
}
}
createOptimizedMessageStream(
sourceStream: IStream<ISequencedDocumentMessage[]>
): IStream<ISequencedDocumentMessage[]> {
const optimizedQueue = new Queue<ISequencedDocumentMessage[]>();
// Process source stream in background
this.processSourceStream(sourceStream, optimizedQueue);
return optimizedQueue;
}
private async processSourceStream(
source: IStream<ISequencedDocumentMessage[]>,
output: Queue<ISequencedDocumentMessage[]>
): Promise<void> {
try {
while (true) {
const result = await source.read();
if (result.done) {
output.pushDone();
break;
}
if (result.value) {
// Filter out coalescable messages for optimization
const optimizedBatch = result.value.filter(
message => !canBeCoalescedByService(message)
);
if (optimizedBatch.length > 0) {
output.pushValue(optimizedBatch);
}
}
}
} catch (error) {
output.pushError(error);
}
}
}