Avro protocol implementation for RPC/IPC communication with support for stateful and stateless emitters/listeners. Provides handshake negotiation, message routing, and bidirectional communication.
Create Avro protocol instances from protocol definitions using the parse function or the protocols module directly.
/**
* Parse a protocol definition into a Protocol instance (main method).
* The usage example marks this as the recommended way to create a protocol
* and calls it as `avro.parse(protocolDef)`.
* @param {Object} protocolDef - Protocol definition (the usage example passes an object with `protocol`, `namespace`, `types` and `messages` keys)
* @param {Object} opts - Options object; the usage example omits it in one call and passes `namespace` and `registry` keys in another
* @returns {Protocol} Protocol instance
*/
function parse(protocolDef, opts);
/**
* Create protocol directly from protocols module
* @param {Object} attrs - Protocol definition
* @param {Object} opts - Options object
* @returns {Protocol} Protocol instance
*/
function createProtocol(attrs, opts); // Available from protocols module

Usage Examples:
const avro = require('avro-js');
// Define a protocol
const protocolDef = {
protocol: 'UserService',
namespace: 'com.example',
types: [
{
type: 'record',
name: 'User',
fields: [
{ name: 'id', type: 'int' },
{ name: 'name', type: 'string' }
]
}
],
messages: {
getUser: {
request: [{ name: 'id', type: 'int' }],
response: 'User'
},
createUser: {
request: [{ name: 'user', type: 'User' }],
response: 'int'
}
}
};
// Method 1: Using parse function (recommended)
const protocol = avro.parse(protocolDef);
// Method 2: Using protocols module directly
const protocols = require('avro-js/lib/protocols');
const protocolDirect = protocols.createProtocol(protocolDef);
// With options
const protocolWithOpts = avro.parse(protocolDef, {
namespace: 'com.example.v2',
registry: {}
});

Main protocol class providing message handling and communication.
/**
* Represents an Avro protocol
*/
class Protocol {
/**
* Create subprotocol for inheritance
* @returns {Protocol} Subprotocol instance
*/
subprotocol();
/**
* Emit message via emitter
* @param {String} name - Message name
* @param {Object} req - Request data
* @param {MessageEmitter} emitter - Message emitter
* @param {Function} cb - Callback function
*/
emit(name, req, emitter, cb);
/**
* Create message emitter
* @param {Object|Function} transport - Transport object or function
* @param {Object} opts - Options object
* @param {Function} cb - Callback function
* @returns {MessageEmitter} Message emitter instance
*/
createEmitter(transport, opts, cb);
/**
* Register message handler
* @param {String} name - Message name
* @param {Function} handler - Handler function
*/
on(name, handler);
/**
* Create message listener
* @param {Object|Function} transport - Transport object or function
* @param {Object} opts - Options object
* @param {Function} cb - Callback function
* @returns {MessageListener} Message listener instance
*/
createListener(transport, opts, cb);
/**
* Get named type from protocol
* @param {String} name - Type name
* @returns {Type} Type instance
*/
getType(name);
/**
* Get protocol name
* @returns {String} Protocol name
*/
getName();
/**
* Get protocol messages
* @returns {Object} Messages object
*/
getMessages();
/**
* Get protocol schema as string
* @returns {String} Protocol schema JSON
*/
toString();
}

Usage Examples:
const avro = require('avro-js');
// Create protocol from a protocol definition object (protocolDef).
// avro.parse is the top-level entry point; createProtocol is only exposed
// by the protocols module (require('avro-js/lib/protocols')).
const protocol = avro.parse(protocolDef);
// Register message handlers: each handler receives the request, the sender
// and a callback to answer with cb(err, response).
protocol.on('getUser', (req, sender, cb) => {
  const userId = req.id;
  // Look up user...
  const user = { id: userId, name: 'Alice' };
  cb(null, user);
});
protocol.on('createUser', (req, sender, cb) => {
  const user = req.user;
  // Save user...
  const newId = 123;
  cb(null, newId);
});
// Create subprotocol
const subProtocol = protocol.subprotocol();

Send messages to remote protocol endpoints.
/**
* Emitter for persistent connections (extends EventEmitter).
* The usage example creates its stateful emitter by passing a TCP socket to
* `protocol.createEmitter` and listens for a 'handshake' event on the result
* (presumably an instance of this class — TODO confirm).
*/
class StatefulEmitter extends EventEmitter {
/**
* Create stateful emitter
* @param {Protocol} protocol - Protocol instance
* @param {ReadableStream} readable - Readable stream (presumably the side responses arrive on — confirm)
* @param {WritableStream} writable - Writable stream (presumably the side requests are written to — confirm)
* @param {Object} opts - Options object
*/
constructor(protocol, readable, writable, opts);
}
/**
* Emitter for request/response patterns
*/
class StatelessEmitter extends EventEmitter {
/**
* Create stateless emitter
* @param {Protocol} protocol - Protocol instance
* @param {Function} transport - Transport function
* @param {Object} opts - Options object
*/
constructor(protocol, transport, opts);
}

Usage Examples:
const avro = require('avro-js');
const net = require('net');
// Create stateful emitter with TCP connection
const socket = net.connect(8080, 'localhost');
const emitter = protocol.createEmitter(socket);
emitter.on('handshake', (req, res) => {
console.log('Handshake completed');
});
// Emit messages
protocol.emit('getUser', { id: 1 }, emitter, (err, user) => {
if (err) throw err;
console.log('User:', user);
});
// Create stateless emitter with HTTP-like transport
const httpTransport = (writerBuf, readerCb) => {
// Send HTTP request with writerBuf
// Call readerCb(err, responseBuf) when response received
};
const statelessEmitter = protocol.createEmitter(httpTransport);

Listen for incoming protocol messages.
/**
* Listener for persistent connections (extends EventEmitter).
* The usage example creates its stateful listeners by passing each accepted
* TCP socket to `protocol.createListener` and subscribes to 'handshake' and
* 'error' events on the result (presumably an instance of this class — TODO confirm).
*/
class StatefulListener extends EventEmitter {
/**
* Create stateful listener
* @param {Protocol} protocol - Protocol instance
* @param {ReadableStream} readable - Readable stream (presumably the side requests arrive on — confirm)
* @param {WritableStream} writable - Writable stream (presumably the side responses are written to — confirm)
* @param {Object} opts - Options object
*/
constructor(protocol, readable, writable, opts);
}
/**
* Listener for request/response patterns
*/
class StatelessListener extends EventEmitter {
/**
* Create stateless listener
* @param {Protocol} protocol - Protocol instance
* @param {Function} transport - Transport function
* @param {Object} opts - Options object
*/
constructor(protocol, transport, opts);
}

Usage Examples:
const avro = require('avro-js');
const net = require('net');
// Create TCP server with stateful listeners
const server = net.createServer((socket) => {
const listener = protocol.createListener(socket);
listener.on('handshake', (req, res) => {
console.log('Client connected');
});
listener.on('error', (err) => {
console.error('Listener error:', err);
});
});
server.listen(8080);
// Create stateless listener with HTTP-like transport
const httpListener = (readerBuf, writerCb) => {
// Process request from readerBuf
// Call writerCb(err, responseBuf) with response
};
const statelessListener = protocol.createListener(httpListener);

Internal message representation and handling.
/**
* Represents a protocol message
*/
class Message {
/**
* Create message instance
* @param {String} name - Message name
* @param {Object} attrs - Message attributes
* @param {Object} opts - Options object
*/
constructor(name, attrs, opts);
/**
* Get message name
* @returns {String} Message name
*/
getName();
/**
* Get request type
* @returns {Type} Request type
*/
getRequestType();
/**
* Get response type
* @returns {Type} Response type
*/
getResponseType();
/**
* Get error types
* @returns {Type[]} Array of error types
*/
getErrorTypes();
}

Low-level streaming interfaces for protocol communication.
/**
* Transform stream (extends stream.Transform) for encoding protocol messages
*/
class MessageEncoder extends stream.Transform {
/**
* Create message encoder
* @param {Protocol} protocol - Protocol instance
* @param {Object} opts - Options object
*/
constructor(protocol, opts);
}
/**
* Stream for decoding protocol messages
*/
class MessageDecoder extends stream.Transform {
/**
* Create message decoder
* @param {Protocol} protocol - Protocol instance
* @param {Object} opts - Options object
*/
constructor(protocol, opts);
}

/** Protocol handshake request type */
const HANDSHAKE_REQUEST_TYPE = {
namespace: 'org.apache.avro.ipc',
name: 'HandshakeRequest',
type: 'record',
fields: [
// The 16-byte fixed type MD5 is declared inline on its first use.
{ name: 'clientHash', type: { name: 'MD5', type: 'fixed', size: 16 } },
// Nullable string; defaults to null.
{ name: 'clientProtocol', type: ['null', 'string'], default: null },
// Refers by fully qualified name to the MD5 type declared on clientHash.
{ name: 'serverHash', type: 'org.apache.avro.ipc.MD5' },
// Nullable map with bytes values; defaults to null.
{ name: 'meta', type: ['null', { type: 'map', values: 'bytes' }], default: null }
]
};
/** Protocol handshake response type */
const HANDSHAKE_RESPONSE_TYPE = {
namespace: 'org.apache.avro.ipc',
name: 'HandshakeResponse',
type: 'record',
fields: [
{
name: 'match',
type: {
name: 'HandshakeMatch',
type: 'enum',
symbols: ['BOTH', 'CLIENT', 'NONE']
}
},
{ name: 'serverProtocol', type: ['null', 'string'], default: null },
{ name: 'serverHash', type: ['null', { name: 'MD5', type: 'fixed', size: 16 }], default: null },
{ name: 'meta', type: ['null', { type: 'map', values: 'bytes' }], default: null }
]
};

interface ProtocolDefinition {
/** Protocol name */
protocol: string;
/** Optional namespace */
namespace?: string;
/** Types used by protocol */
types?: Object[];
/** Protocol messages */
messages?: {
[messageName: string]: {
/** Request parameters */
request: { name: string; type: any }[];
/** Response type */
response: any;
/** Error types */
errors?: any[];
/** One-way message */
'one-way'?: boolean;
};
};
}
/** Options accepted when creating a message emitter. All fields are optional. */
interface EmitterOptions {
/** Timeout for requests in milliseconds */
timeout?: number;
/**
* Scope object.
* NOTE(review): this field was labelled "Maximum concurrent requests", which
* does not fit a field named `scope` of type Object; ListenerOptions describes
* its `scope` as "Scope for handlers" — confirm the intended meaning for emitters.
*/
scope?: Object;
/** Initial handshake metadata */
metadata?: { [key: string]: Buffer };
/** Channel for multiplexing */
channel?: Object;
}
/** Options accepted when creating a message listener. All fields are optional. */
interface ListenerOptions {
  /** Handler scope */
  scope?: Object;
  /** Multiplexing channel */
  channel?: Object;
  /** Metadata supplied by the server: string keys mapped to Buffer values */
  metadata?: Record<string, Buffer>;
}
interface TransportFunction {
  /**
   * Transport function for stateless communication
   * @param {Buffer} reqBuf - Request buffer
   * @param {Function} cb - Callback(err, resBuf); `err` is typed as nullable so
   * the callback can be invoked with `null` when there is no error
   */
  (reqBuf: Buffer, cb: (err: Error | null, resBuf: Buffer) => void): void;
}
interface MessageHandler {
  /**
   * Handler for protocol messages
   * @param {Object} req - Request data
   * @param {Object} sender - Sender information
   * @param {Function} cb - Response callback(err, res); `err` is typed as
   * nullable because handlers report success with `cb(null, res)`
   */
  (req: any, sender: any, cb: (err: Error | null, res: any) => void): void;
}