The protocol and quorum management system handles Fluid protocol operations, client synchronization, and consensus mechanisms within Fluid containers.
`IProtocolHandler` is the main interface for handling Fluid protocol operations; it extends the base protocol handler with audience management and signal processing.
/**
 * Protocol handler contract: everything declared by `IBaseProtocolHandler`
 * plus an owned audience and a signal-processing entry point.
 */
interface IProtocolHandler extends IBaseProtocolHandler {
  /** Audience object exposed by this handler (read-only reference). */
  readonly audience: IAudienceOwner;
  /** Processes one signal message. The result type is declared as `any`. */
  processSignal(message: ISignalMessage): any;
}
The foundational interface providing core protocol handling capabilities.
/**
 * Core protocol-handling contract: exposes the document attributes and the
 * quorum, processes sequenced messages, tracks connection state, and can
 * report its state both as a quorum snapshot and as scribe protocol state.
 */
interface IBaseProtocolHandler {
  /** Document attributes associated with this handler (read-only reference). */
  readonly attributes: IDocumentAttributes;
  /** Quorum managed by this handler (read-only reference). */
  readonly quorum: IQuorum;
  /** Closes the handler. */
  close(): void;
  /** Returns the current state in the `IScribeProtocolState` shape. */
  getProtocolState(): IScribeProtocolState;
  /**
   * Processes one sequenced document message.
   * `local` presumably marks messages that originated from this client — TODO confirm.
   */
  processMessage(message: ISequencedDocumentMessage, local: boolean): IProcessMessageResult;
  /** Records the connection state; `clientId` may be `undefined`. */
  setConnectionState(connected: boolean, clientId: string | undefined): any;
  /** Returns the current state in the `IQuorumSnapshot` shape. */
  snapshot(): IQuorumSnapshot;
}
A function type for building custom protocol handlers.
/**
 * Factory signature for creating an `IProtocolHandler`.
 *
 * @param attributes - Document attributes handed to the new handler.
 * @param snapshot - Quorum snapshot the handler is built from.
 * @param sendProposal - Callback taking a proposal key and value and returning a number.
 * @returns The constructed protocol handler.
 */
type ProtocolHandlerBuilder = (
  attributes: IDocumentAttributes,
  snapshot: IQuorumSnapshot,
  sendProposal: (key: string, value: any) => number
) => IProtocolHandler;
// Type-only imports used by the custom builder example that follows.
import type {
  ProtocolHandlerBuilder,
  IDocumentAttributes,
  IQuorumSnapshot,
  IProtocolHandler
} from "@fluidframework/container-loader";
/**
 * Example `ProtocolHandlerBuilder` that backs the handler with a `CustomQuorum`.
 *
 * `CustomQuorum` is not defined in this snippet; it is assumed to be supplied
 * by the application and to expose the methods called below.
 *
 * @param attributes - Document attributes exposed unchanged on the handler.
 * @param snapshot - Quorum snapshot passed to the `CustomQuorum` constructor.
 * @param sendProposal - Proposal callback passed to the `CustomQuorum` constructor.
 * @returns A protocol handler whose methods delegate to the quorum and its audience.
 */
const customProtocolHandlerBuilder: ProtocolHandlerBuilder = (
  attributes: IDocumentAttributes,
  snapshot: IQuorumSnapshot,
  sendProposal: (key: string, value: any) => number
): IProtocolHandler => {
  // Create the collaborators before the object literal. Inside a property
  // initializer `this` is NOT the object being constructed, so writing
  // `audience: this.quorum.getAudience()` in the literal throws at build time.
  const quorum = new CustomQuorum(snapshot, sendProposal);
  const audience = quorum.getAudience();

  // Every method closes over `quorum` / `audience` instead of reading them
  // through `this`, so the methods keep working when passed around detached.
  return {
    // IBaseProtocolHandler members
    attributes,
    quorum,
    close() {
      // Cleanup protocol handler resources
      quorum.dispose();
    },
    getProtocolState(): IScribeProtocolState {
      return {
        members: quorum.getMembers().map(member => [member.id, member]),
        minimumSequenceNumber: quorum.getMinimumSequenceNumber(),
        proposals: quorum.getProposals(),
        sequenceNumber: quorum.getSequenceNumber(),
        values: quorum.getValues()
      };
    },
    processMessage(message, local) {
      // Handle incoming protocol messages
      return quorum.processMessage(message, local);
    },
    setConnectionState(connected, clientId) {
      quorum.setConnectionState(connected, clientId);
    },
    snapshot(): IQuorumSnapshot {
      return {
        members: quorum.getMembers().map(member => [member.id, member]),
        proposals: quorum.getProposals(),
        values: quorum.getValues()
      };
    },
    // IProtocolHandler-specific members
    audience,
    processSignal(message) {
      // Handle signal messages
      return audience.processSignal(message);
    }
  };
};
// Use in loader configuration
// `myCodeLoader`, `myDocumentServiceFactory` and `myUrlResolver` are not defined
// in this snippet; they are assumed to be provided by the host application.
const loaderProps: ILoaderProps = {
  codeLoader: myCodeLoader,
  documentServiceFactory: myDocumentServiceFactory,
  urlResolver: myUrlResolver,
  // Plug the custom builder into the loader properties.
  protocolHandlerBuilder: customProtocolHandlerBuilder
};
Interface representing a snapshot of the quorum state.
/**
 * Quorum snapshot: the member list plus the `proposals` and `values`
 * portions of `QuorumProposalsSnapshot`.
 */
interface IQuorumSnapshot {
  /** Quorum members in the `QuorumClientsSnapshot` shape. */
  members: QuorumClientsSnapshot;
  /** Same type as `QuorumProposalsSnapshot["proposals"]`. */
  proposals: QuorumProposalsSnapshot["proposals"];
  /** Same type as `QuorumProposalsSnapshot["values"]`. */
  values: QuorumProposalsSnapshot["values"];
}
Interface representing the complete protocol state for scribe operations.
/**
 * Protocol state shape: the quorum data (members, proposals, values)
 * together with the sequence number and minimum sequence number.
 */
interface IScribeProtocolState {
  /** `[string, ISequencedClient]` pairs; the string is presumably the client ID — TODO confirm. */
  members: [string, ISequencedClient][];
  minimumSequenceNumber: number;
  /** `[number, ISequencedProposal, string[]]` triples; element meanings are not stated here — verify against the quorum implementation. */
  proposals: [number, ISequencedProposal, string[]][];
  sequenceNumber: number;
  /** `[string, ICommittedProposal]` pairs; the string is presumably the proposal key — TODO confirm. */
  values: [string, ICommittedProposal][];
}
/** Serialized quorum members as `[string, ISequencedClient]` pairs. */
type QuorumClientsSnapshot = [string, ISequencedClient][];
/** Serialized quorum proposals together with the committed values. */
type QuorumProposalsSnapshot = {
  /** `[number, ISequencedProposal, string[]]` triples, one per proposal. */
  proposals: [number, ISequencedProposal, string[]][];
  /** `[string, ICommittedProposal]` pairs, one per value. */
  values: [string, ICommittedProposal][];
};
These types represent the serialized state of quorum members and proposals for snapshot and restoration purposes.
import type {
IProtocolHandler,
ISequencedDocumentMessage,
ISignalMessage
} from "@fluidframework/container-loader";
/**
 * Thin facade over an `IProtocolHandler`: every method forwards to the
 * wrapped handler (or to its quorum) and returns whatever that call returns.
 */
class ContainerManager {
  /**
   * @param protocolHandler - Handler that all operations are delegated to.
   */
  constructor(private protocolHandler: IProtocolHandler) {}

  /**
   * Runs an operational message through the protocol handler.
   * Logs the proposals when the result reports `hasProposals`.
   *
   * @returns The handler's result, unchanged.
   */
  handleIncomingMessage(message: ISequencedDocumentMessage, isLocal: boolean) {
    const outcome = this.protocolHandler.processMessage(message, isLocal);
    if (!outcome.hasProposals) {
      return outcome;
    }
    console.log("Message contains proposals:", outcome.proposals);
    return outcome;
  }

  /** Forwards a signal message to the protocol handler. */
  handleSignalMessage(signal: ISignalMessage) {
    const handler = this.protocolHandler;
    handler.processSignal(signal);
  }

  /** @returns The members currently reported by the handler's quorum. */
  getQuorumMembers() {
    const { quorum } = this.protocolHandler;
    return quorum.getMembers();
  }

  /**
   * Proposes a quorum change and logs the identifier the quorum returned.
   *
   * @returns Whatever `quorum.propose` returned.
   */
  proposeQuorumChange(key: string, value: any) {
    const { quorum } = this.protocolHandler;
    const id = quorum.propose(key, value);
    console.log(`Proposed change for ${key} with ID ${id}`);
    return id;
  }

  /** Passes a connection-state change on to the protocol handler. */
  onConnectionStateChange(connected: boolean, clientId?: string) {
    const handler = this.protocolHandler;
    handler.setConnectionState(connected, clientId);
  }

  /** @returns The handler's snapshot of the current protocol state. */
  createSnapshot() {
    const handler = this.protocolHandler;
    return handler.snapshot();
  }

  /** @returns The handler's detailed protocol state. */
  getProtocolState() {
    const handler = this.protocolHandler;
    return handler.getProtocolState();
  }

  /** Closes the wrapped protocol handler. */
  dispose() {
    const handler = this.protocolHandler;
    handler.close();
  }
}
import type { IAudienceOwner } from "@fluidframework/container-definitions";
/**
 * Subscribes to audience membership events on the handler's audience and logs
 * the current membership, including this client's own entry when available.
 *
 * @param protocolHandler - Handler whose `audience` is observed.
 */
function setupAudienceHandling(protocolHandler: IProtocolHandler): void {
  const audience = protocolHandler.audience;

  // Listen for audience changes
  audience.on("addMember", (clientId: string, details: any) => {
    console.log(`Client ${clientId} joined the audience`, details);
  });
  audience.on("removeMember", (clientId: string) => {
    console.log(`Client ${clientId} left the audience`);
  });

  // The audience hands out its members as a Map keyed by client ID, so count
  // entries with `.size`. `Object.keys(map).length` is always 0 for a Map.
  const currentMembers = audience.getMembers();
  console.log("Current audience members:", currentMembers.size);

  // NOTE(review): confirm that the audience type in use exposes `getMyself()`
  // returning an object with `id`; the self accessor differs between audience
  // flavours/releases — verify against the installed package version.
  const myClientId = audience.getMyself()?.id;
  if (myClientId) {
    // Map lookups go through `.get`; bracket indexing a Map yields undefined.
    const myDetails = currentMembers.get(myClientId);
    console.log("My client details:", myDetails);
  }
}
import type { IQuorum } from "@fluidframework/protocol-definitions";
/**
 * Walks through the basic quorum operations: propose a value, subscribe to
 * the approval and commit events, then read back and log the current value
 * and the number of pending proposals.
 *
 * @param quorum - Quorum to operate on.
 */
function handleQuorumOperations(quorum: IQuorum) {
  const settingKey = "setting.maxUsers";

  // Logs only approvals that concern the max-users setting.
  const onApproved = (
    sequenceNumber: number,
    key: string,
    value: any,
    approvalSequenceNumber: number
  ) => {
    if (key !== settingKey) {
      return;
    }
    console.log(`Max users setting approved: ${value}`);
  };

  // Logs every committed quorum value.
  const onCommitted = (key: string, value: any) => {
    console.log(`Quorum value committed - ${key}: ${value}`);
  };

  // Propose first, then subscribe — same order as the walkthrough intends.
  quorum.propose(settingKey, 100);
  quorum.on("approveProposal", onApproved);
  quorum.on("commitProposal", onCommitted);

  // Read back the current value.
  const currentLimit = quorum.get(settingKey);
  console.log("Current max users:", currentLimit);

  // Report how many proposals are still pending.
  const pending = quorum.getProposals();
  console.log("Pending proposals:", pending.size);
}

/**
 * Serializes the handler's state to JSON: the snapshot's members, proposals
 * and values, plus the full protocol state under `protocolState`.
 *
 * @param protocolHandler - Handler to capture.
 * @returns The JSON string.
 */
function serializeProtocolState(protocolHandler: IProtocolHandler) {
  // Snapshot is taken before the protocol state is read.
  const { members, proposals, values } = protocolHandler.snapshot();
  const protocolState = protocolHandler.getProtocolState();
  return JSON.stringify({ members, proposals, values, protocolState });
}
/**
 * Rebuilds a protocol handler from a JSON string containing `members`,
 * `proposals` and `values` (any other keys are ignored).
 *
 * @param serializedState - JSON text to restore from.
 * @param protocolHandlerBuilder - Factory used to create the handler.
 * @param attributes - Document attributes forwarded to the factory.
 * @param sendProposal - Proposal callback forwarded to the factory.
 * @returns The handler produced by `protocolHandlerBuilder`.
 * @throws SyntaxError when `serializedState` is not valid JSON.
 * @throws Error when the parsed value is not an object whose `members`,
 * `proposals` and `values` are arrays.
 */
function restoreProtocolState(
  serializedState: string,
  protocolHandlerBuilder: ProtocolHandlerBuilder,
  attributes: IDocumentAttributes,
  sendProposal: (key: string, value: any) => number
): IProtocolHandler {
  // Treat parsed JSON as untrusted until its shape has been checked.
  const state: unknown = JSON.parse(serializedState);
  if (typeof state !== "object" || state === null) {
    throw new Error("Serialized protocol state must be a JSON object");
  }
  const { members, proposals, values } = state as Record<string, unknown>;
  if (!Array.isArray(members) || !Array.isArray(proposals) || !Array.isArray(values)) {
    throw new Error(
      "Serialized protocol state must contain array-valued members, proposals, and values"
    );
  }
  const snapshot: IQuorumSnapshot = { members, proposals, values };
  // Restore protocol handler from snapshot
  return protocolHandlerBuilder(attributes, snapshot, sendProposal);
}

/**
 * Processes one message through the handler and demonstrates where protocol
 * errors would be handled. Exceptions are caught and logged, never rethrown.
 *
 * @param protocolHandler - Handler that processes the message.
 * @param message - Message to process. It is an explicit parameter because the
 * function previously read an undeclared `message` variable and therefore
 * always failed. The type is derived from `processMessage` itself.
 */
function handleProtocolErrors(
  protocolHandler: IProtocolHandler,
  message: Parameters<IProtocolHandler["processMessage"]>[0]
): void {
  try {
    // Attempt protocol operations
    const result = protocolHandler.processMessage(message, false);
    // NOTE(review): `error` / `errorType` are read from the processing result
    // as-is; confirm the result type actually carries them.
    if (result.error) {
      console.error("Protocol message processing error:", result.error);
      // Handle specific error types
      switch (result.error.errorType) {
        case "invalidSequenceNumber":
          // Handle sequence number conflicts
          break;
        case "proposalRejected":
          // Handle rejected proposals
          break;
        default:
          // Handle other protocol errors
          break;
      }
    }
  } catch (error: unknown) {
    console.error("Unexpected protocol error:", error);
    // Attempt recovery
    try {
      const currentState = protocolHandler.getProtocolState();
      console.log("Current protocol state:", currentState);
      // Potentially reset or reinitialize based on state
    } catch (stateError: unknown) {
      console.error("Cannot recover protocol state:", stateError);
      // Consider full protocol handler reset
    }
  }
}

/**
 * Owns the lifecycle of a protocol handler: builds it, subscribes to quorum
 * errors, mirrors connection events into it, and closes it on disposal.
 */
class ProtocolManager {
  private protocolHandler?: IProtocolHandler;
  // Connection listeners read `this.protocolHandler` lazily, so they only
  // need to be attached once even if initialize() runs again.
  private connectionMonitoringAttached = false;

  /**
   * @param connectionService - Optional event source emitting `"connected"`
   * (with a client ID) and `"disconnected"`. When omitted, connection
   * monitoring is skipped.
   */
  constructor(
    private readonly connectionService?: {
      on(event: string, listener: (...args: any[]) => void): unknown;
    }
  ) {}

  /**
   * Builds the protocol handler and wires up error and connection handling.
   * A handler left over from an earlier call is closed first.
   */
  async initialize(
    attributes: IDocumentAttributes,
    snapshot: IQuorumSnapshot,
    sendProposal: (key: string, value: any) => number,
    protocolHandlerBuilder: ProtocolHandlerBuilder
  ): Promise<void> {
    // Avoid leaking a previously built handler on re-initialization.
    this.dispose();
    // Initialize protocol handler
    this.protocolHandler = protocolHandlerBuilder(
      attributes,
      snapshot,
      sendProposal
    );
    // Set up error handling
    this.setupErrorHandling();
    // Set up connection state monitoring
    this.setupConnectionMonitoring();
  }

  /** Subscribes to the quorum's `"error"` event of the current handler. */
  private setupErrorHandling(): void {
    if (!this.protocolHandler) return;
    // Monitor for protocol errors
    this.protocolHandler.quorum.on("error", (error: unknown) => {
      console.error("Quorum error:", error);
      this.handleQuorumError(error);
    });
  }

  /**
   * Extension point invoked after a quorum error has been logged.
   * The default does nothing; subclasses may override it to recover.
   */
  protected handleQuorumError(error: unknown): void {
    // Intentionally empty: the error was already logged by the caller.
  }

  /** Mirrors connection-service events into the current protocol handler. */
  private setupConnectionMonitoring(): void {
    const service = this.connectionService;
    if (!service || this.connectionMonitoringAttached) return;
    this.connectionMonitoringAttached = true;
    // Monitor connection state and update protocol handler
    service.on("connected", (clientId: string) => {
      this.protocolHandler?.setConnectionState(true, clientId);
    });
    service.on("disconnected", () => {
      this.protocolHandler?.setConnectionState(false, undefined);
    });
  }

  /** Closes and forgets the current handler; safe to call repeatedly. */
  dispose(): void {
    if (this.protocolHandler) {
      this.protocolHandler.close();
      this.protocolHandler = undefined;
    }
  }
}