or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

files.md index.md protocols.md streams.md types.md utils.md
tile.json

docs/protocols.md

Protocol Communication

Avro protocol implementation for RPC/IPC communication with support for stateful and stateless emitters/listeners. Provides handshake negotiation, message routing, and bidirectional communication.

Capabilities

Protocol Creation

Create Avro protocol instances from protocol definitions using the parse function or the protocols module directly.

/**
 * Parse protocol definition (main method).
 * The usage example calls this as `avro.parse(...)` on the package root.
 * @param {Object} protocolDef - Protocol definition
 * @param {Object} [opts] - Options object; the usage example omits it or
 *   passes `namespace` and `registry`
 * @returns {Protocol} Protocol instance
 */
function parse(protocolDef, opts);

/**
 * Create protocol directly from protocols module
 * (required as 'avro-js/lib/protocols' in the usage example).
 * @param {Object} attrs - Protocol definition
 * @param {Object} [opts] - Options object; omitted in the usage example
 * @returns {Protocol} Protocol instance
 */
function createProtocol(attrs, opts); // Available from protocols module

Usage Examples:

const avro = require('avro-js');

// Define a protocol: one record type (User) and two messages that use it
const protocolDef = {
  protocol: 'UserService',
  namespace: 'com.example',
  types: [
    {
      type: 'record',
      name: 'User',
      fields: [
        { name: 'id', type: 'int' },
        { name: 'name', type: 'string' }
      ]
    }
  ],
  messages: {
    // Takes an int id, responds with a User record
    getUser: {
      request: [{ name: 'id', type: 'int' }],
      response: 'User'
    },
    // Takes a User record, responds with an int
    createUser: {
      request: [{ name: 'user', type: 'User' }],
      response: 'int'
    }
  }
};

// Method 1: Using parse function (recommended)
const protocol = avro.parse(protocolDef);

// Method 2: Using protocols module directly
// (createProtocol is reached through the module path, not the package root)
const protocols = require('avro-js/lib/protocols');
const protocolDirect = protocols.createProtocol(protocolDef);

// With options (second argument to parse)
const protocolWithOpts = avro.parse(protocolDef, {
  namespace: 'com.example.v2',
  registry: {}
});

Protocol Class

Main protocol class providing message handling and communication.

/**
 * Represents an Avro protocol: registers message handlers, emits messages,
 * and creates the emitters and listeners that carry them.
 */
class Protocol {
 /**
  * Create subprotocol for inheritance
  * @returns {Protocol} Subprotocol instance
  */
 subprotocol();

 /**
  * Emit message via emitter
  * @param {String} name - Message name (e.g. 'getUser')
  * @param {Object} req - Request data (e.g. { id: 1 })
  * @param {MessageEmitter} emitter - Message emitter, as returned by createEmitter
  * @param {Function} cb - Callback function, called as cb(err, res) in the usage example
  */
 emit(name, req, emitter, cb);

 /**
  * Create message emitter
  * @param {Object|Function} transport - Transport object (e.g. a net socket) or function
  * @param {Object} [opts] - Options object; omitted in the usage examples
  * @param {Function} [cb] - Callback function; omitted in the usage examples
  * @returns {MessageEmitter} Message emitter instance
  */
 createEmitter(transport, opts, cb);

 /**
  * Register message handler
  * @param {String} name - Message name
  * @param {Function} handler - Handler function, called as handler(req, sender, cb);
  *   the usage example replies through cb(err, res)
  */
 on(name, handler);

 /**
  * Create message listener
  * @param {Object|Function} transport - Transport object (e.g. a net socket) or function
  * @param {Object} [opts] - Options object; omitted in the usage examples
  * @param {Function} [cb] - Callback function; omitted in the usage examples
  * @returns {MessageListener} Message listener instance
  */
 createListener(transport, opts, cb);

 /**
  * Get named type from protocol
  * @param {String} name - Type name
  * @returns {Type} Type instance
  */
 getType(name);

 /**
  * Get protocol name
  * @returns {String} Protocol name
  */
 getName();

 /**
  * Get protocol messages
  * @returns {Object} Messages object
  */
 getMessages();

 /**
  * Get protocol schema as string
  * @returns {String} Protocol schema JSON
  */
 toString();
}

Usage Examples:

const avro = require('avro-js');

// Create protocol. The package root exposes `parse`; `createProtocol` is only
// available from the protocols module ('avro-js/lib/protocols').
const protocol = avro.parse(protocolDef);

// Register message handlers. Each handler receives the request, the sender,
// and a callback used to reply as cb(err, response).
protocol.on('getUser', (req, sender, cb) => {
  const userId = req.id;
  // Look up user...
  const user = { id: userId, name: 'Alice' };
  cb(null, user);
});

protocol.on('createUser', (req, sender, cb) => {
  const user = req.user;
  // Save user...
  const newId = 123;
  cb(null, newId);
});

// Create subprotocol
const subProtocol = protocol.subprotocol();

Message Emitters

Send messages to remote protocol endpoints.

/**
 * Emitter for persistent connections.
 * The usage example obtains one by passing a TCP socket to
 * protocol.createEmitter.
 */
class StatefulEmitter extends EventEmitter {
 /**
  * Create stateful emitter
  * @param {Protocol} protocol - Protocol instance
  * @param {ReadableStream} readable - Readable stream
  * @param {WritableStream} writable - Writable stream
  * @param {Object} opts - Options object
  */
 constructor(protocol, readable, writable, opts);
}

/**
 * Emitter for request/response patterns.
 * The usage example obtains one by passing a transport function to
 * protocol.createEmitter.
 */
class StatelessEmitter extends EventEmitter {
 /**
  * Create stateless emitter
  * @param {Protocol} protocol - Protocol instance
  * @param {Function} transport - Transport function
  * @param {Object} opts - Options object
  */
 constructor(protocol, transport, opts);
}

Usage Examples:

const avro = require('avro-js');
const net = require('net');

// Stateful emitter: the transport is a TCP socket
const socket = net.connect(8080, 'localhost');
const emitter = protocol.createEmitter(socket);

// Log when a handshake event arrives
emitter.on('handshake', function (request, response) {
  console.log('Handshake completed');
});

// Send a getUser request through the emitter and print the reply
protocol.emit('getUser', { id: 1 }, emitter, function (error, user) {
  if (error) {
    throw error;
  }
  console.log('User:', user);
});

// Stateless emitter: the transport is a function instead of a stream
// NOTE(review): confirm this function-transport signature against the avro-js source.
const httpTransport = function (writerBuf, readerCb) {
  // Send HTTP request with writerBuf
  // Call readerCb(err, responseBuf) when response received
};

const statelessEmitter = protocol.createEmitter(httpTransport);

Message Listeners

Listen for incoming protocol messages.

/**
 * Listener for persistent connections.
 * The usage example obtains one per accepted TCP socket via
 * protocol.createListener.
 */
class StatefulListener extends EventEmitter {
 /**
  * Create stateful listener
  * @param {Protocol} protocol - Protocol instance
  * @param {ReadableStream} readable - Readable stream
  * @param {WritableStream} writable - Writable stream
  * @param {Object} opts - Options object
  */
 constructor(protocol, readable, writable, opts);
}

/**
 * Listener for request/response patterns.
 * The usage example obtains one by passing a transport function to
 * protocol.createListener.
 */
class StatelessListener extends EventEmitter {
 /**
  * Create stateless listener
  * @param {Protocol} protocol - Protocol instance
  * @param {Function} transport - Transport function
  * @param {Object} opts - Options object
  */
 constructor(protocol, transport, opts);
}

Usage Examples:

const avro = require('avro-js');
const net = require('net');

// TCP server: every accepted socket is wrapped in its own stateful listener
const server = net.createServer(function (socket) {
  const listener = protocol.createListener(socket);

  // Log when a handshake event arrives
  listener.on('handshake', function (request, response) {
    console.log('Client connected');
  });

  // Report listener errors
  listener.on('error', function (error) {
    console.error('Listener error:', error);
  });
});

server.listen(8080);

// Stateless listener: the transport is a function instead of a stream
// NOTE(review): confirm this function-transport signature against the avro-js source.
const httpListener = function (readerBuf, writerCb) {
  // Process request from readerBuf
  // Call writerCb(err, responseBuf) with response
};

const statelessListener = protocol.createListener(httpListener);

Protocol Messages

Internal message representation and handling.

/**
 * Represents a protocol message.
 * NOTE(review): presumably each entry under `messages` in a protocol
 * definition becomes one Message — confirm against the implementation.
 */
class Message {
 /**
  * Create message instance
  * @param {String} name - Message name
  * @param {Object} attrs - Message attributes (presumably the request /
  *   response / errors entry from the protocol definition — TODO confirm)
  * @param {Object} opts - Options object
  */
 constructor(name, attrs, opts);

 /**
  * Get message name
  * @returns {String} Message name
  */
 getName();

 /**
  * Get request type
  * @returns {Type} Request type
  */
 getRequestType();

 /**
  * Get response type
  * @returns {Type} Response type
  */
 getResponseType();

 /**
  * Get error types
  * @returns {Type[]} Array of error types
  */
 getErrorTypes();
}

Protocol Streams

Low-level streaming interfaces for protocol communication.

/**
 * Stream for encoding protocol messages (a stream.Transform subclass).
 */
class MessageEncoder extends stream.Transform {
 /**
  * Create message encoder
  * @param {Protocol} protocol - Protocol instance
  * @param {Object} opts - Options object
  */
 constructor(protocol, opts);
}

/**
 * Stream for decoding protocol messages (a stream.Transform subclass).
 */
class MessageDecoder extends stream.Transform {
 /**
  * Create message decoder
  * @param {Protocol} protocol - Protocol instance
  * @param {Object} opts - Options object
  */
 constructor(protocol, opts);
}

Handshake Types

/**
 * Protocol handshake request type: an Avro record schema named
 * HandshakeRequest in the org.apache.avro.ipc namespace.
 */
const HANDSHAKE_REQUEST_TYPE = {
  namespace: 'org.apache.avro.ipc',
  name: 'HandshakeRequest',
  type: 'record',
  fields: [
    // Declares the 16-byte fixed type MD5 inline
    { name: 'clientHash', type: { name: 'MD5', type: 'fixed', size: 16 } },
    // Nullable string, defaults to null
    { name: 'clientProtocol', type: ['null', 'string'], default: null },
    // Refers to the MD5 type declared on clientHash by its namespaced name
    { name: 'serverHash', type: 'org.apache.avro.ipc.MD5' },
    // Nullable map with bytes values, defaults to null
    { name: 'meta', type: ['null', { type: 'map', values: 'bytes' }], default: null }
  ]
};

/**
 * Protocol handshake response type: an Avro record schema named
 * HandshakeResponse in the org.apache.avro.ipc namespace.
 */
const HANDSHAKE_RESPONSE_TYPE = {
  namespace: 'org.apache.avro.ipc',
  name: 'HandshakeResponse',
  type: 'record',
  fields: [
    { 
      // Enum with symbols BOTH, CLIENT and NONE
      name: 'match',
      type: {
        name: 'HandshakeMatch',
        type: 'enum',
        symbols: ['BOTH', 'CLIENT', 'NONE']
      }
    },
    // Nullable string, defaults to null
    { name: 'serverProtocol', type: ['null', 'string'], default: null },
    // Nullable 16-byte fixed MD5, defaults to null
    { name: 'serverHash', type: ['null', { name: 'MD5', type: 'fixed', size: 16 }], default: null },
    // Nullable map with bytes values, defaults to null
    { name: 'meta', type: ['null', { type: 'map', values: 'bytes' }], default: null }
  ]
};

Protocol Options

/**
 * Shape of a protocol definition, as passed to `parse` / `createProtocol`
 * in the usage examples.
 */
interface ProtocolDefinition {
  /** Protocol name */
  protocol: string;
  /** Optional namespace */
  namespace?: string;
  /** Types used by protocol (type definition objects, e.g. records) */
  types?: object[];
  /** Protocol messages, keyed by message name */
  messages?: {
    [messageName: string]: {
      /** Request parameters */
      request: { name: string; type: any }[];
      /** Response type */
      response: any;
      /** Error types */
      errors?: any[];
      /** One-way message */
      'one-way'?: boolean;
    };
  };
}

/** Options accepted when creating a message emitter. */
interface EmitterOptions {
  /** Timeout for requests in milliseconds */
  timeout?: number;
  /**
   * Scope object for this emitter (counterpart of `ListenerOptions.scope`).
   * NOTE(review): exact semantics are not shown here — confirm against the implementation.
   */
  scope?: object;
  /** Initial handshake metadata */
  metadata?: { [key: string]: Buffer };
  /** Channel for multiplexing */
  channel?: object;
}

/** Options accepted when creating a message listener. */
interface ListenerOptions {
  /** Scope for handlers */
  scope?: object;
  /** Channel for multiplexing */
  channel?: object;
  /** Server metadata */
  metadata?: { [key: string]: Buffer };
}

interface TransportFunction {
  /**
   * Transport function for stateless communication.
   *
   * The callback follows the Node error-first convention: `err` is
   * `null`/`undefined` on success, and `resBuf` may be absent when an error
   * is reported.
   *
   * @param reqBuf - Request buffer
   * @param cb - Callback(err, resBuf)
   */
  (reqBuf: Buffer, cb: (err: Error | null | undefined, resBuf?: Buffer) => void): void;
}

interface MessageHandler {
  /**
   * Handler for protocol messages.
   *
   * The response callback is error-first: the usage examples reply with
   * `cb(null, value)` on success, so `err` must accept `null`, and `res`
   * may be omitted when only an error is reported.
   *
   * @param req - Request data
   * @param sender - Sender information
   * @param cb - Response callback
   */
  (req: any, sender: any, cb: (err: Error | null | undefined, res?: any) => void): void;
}