Complete Avro RPC implementation with client-server communication, protocol discovery, and transport abstraction for building distributed systems.
Avro RPC service for protocol handling and client-server communication.
class Service {
/**
* Create service from protocol
* @param protocol - Avro protocol object or JSON string
* @param opts - Service creation options
* @returns Service instance
*/
static forProtocol(protocol, opts);
/**
* Check client-server protocol compatibility
* @param client - Client protocol or service
* @param server - Server protocol or service
* @returns Boolean indicating compatibility
*/
static compatible(client, server);
/**
* Check if object is a Service instance
* @param obj - Object to check
* @returns Boolean indicating if object is a Service
*/
static isService(obj);
/**
* Create RPC client
* @param opts - Client creation options
* @returns RPC client instance
*/
createClient(opts);
/**
* Create RPC server
* @param opts - Server creation options
* @returns RPC server instance
*/
createServer(opts);
/**
* Get message definition by name
* @param name - Message name
* @returns Message object
*/
message(name);
/**
* Get type definition by name
* @param name - Type name
* @returns Type instance
*/
type(name);
/**
* Check equality with another service (deprecated)
* @param other - Other service
* @returns Boolean indicating equality
*/
equals(other);
/**
* Get string representation
* @returns String representation
*/
inspect();
}

Usage Examples:
const avsc = require('avsc');
// Create service from protocol
// Protocol definition used by the example below.
// Per the Avro specification, a message's `request` is a list of named,
// typed parameters (same form as record fields), not the name of a type.
const protocol = {
  protocol: 'Calculator',
  namespace: 'com.example',
  types: [
    {
      type: 'record',
      name: 'AddRequest',
      fields: [
        {name: 'a', type: 'int'},
        {name: 'b', type: 'int'}
      ]
    }
  ],
  messages: {
    add: {
      // Parameters `a` and `b` match the `{a: 5, b: 3}` request emitted by the
      // client and the `req.a + req.b` computation in the server handler.
      request: [
        {name: 'a', type: 'int'},
        {name: 'b', type: 'int'}
      ],
      response: 'int'
    }
  }
};
const service = avsc.Service.forProtocol(protocol);
// Create server
const server = service.createServer({
transport: tcpTransport
});
server.onMessage('add', (req, callback) => {
callback(null, req.a + req.b);
});
// Create client
const client = service.createClient({
transport: tcpTransport
});
client.emitMessage('add', {a: 5, b: 3}, (err, result) => {
console.log('Result:', result); // 8
});

Service properties:
interface Service {
/** Documentation string from protocol */
doc: string;
/** Protocol hash for compatibility checking */
hash: Buffer;
/** Array of message definitions */
messages: Message[];
/** Service name from protocol */
name: string;
/** Complete protocol object */
protocol: any;
/** Array of type definitions */
types: Type[];
}

RPC client implementation for making remote procedure calls.
class Client {
/**
* Get active communication channels
* @returns Array of active channels
*/
activeChannels();
/**
* Create communication channel with transport
* @param transport - Transport implementation
* @param opts - Channel options
* @returns Client channel instance
*/
createChannel(transport, opts);
/**
* Destroy all channels
* @param opts - Destruction options
*/
destroyChannels(opts);
/**
* Send RPC message
* @param name - Message name
* @param req - Request data
* @param opts - Call options
* @param callback - Response callback
*/
emitMessage(name, req, opts, callback);
/**
* Get remote protocol information
* @returns Array of remote protocols
*/
remoteProtocols();
/**
* Add middleware
* @param args - Middleware functions
*/
use(...args);
}

RPC server implementation for handling remote procedure calls.
class Server {
/**
* Get active communication channels
* @returns Array of active channels
*/
activeChannels();
/**
* Create communication channel with transport
* @param transport - Transport implementation
* @param opts - Channel options
* @returns Server channel instance
*/
createChannel(transport, opts);
/**
* Register message handler
* @param name - Message name
* @param handler - Handler function
*/
onMessage(name, handler);
/**
* Get remote protocol information
* @returns Array of remote protocols
*/
remoteProtocols();
/**
* Add middleware
* @param args - Middleware functions
*/
use(...args);
}

Communication channels for client-server connections.
/**
 * Client-side communication channel (signature listing only).
 * NOTE(review): presumably the "Client channel instance" returned by
 * `Client.createChannel` — confirm.
 */
class ClientChannel {
/** Client instance this channel is associated with */
client: Client;
/** True once the channel has been destroyed */
destroyed: boolean;
/** True while the channel is draining */
draining: boolean;
/** Number of operations still pending on this channel */
pending: number;
/** Timeout value, in milliseconds */
timeout: number;
/**
* Send a ping to the server
* @param timeout - Ping timeout (NOTE(review): unit not stated here; presumably milliseconds like the `timeout` property — confirm)
* @param callback - Callback invoked with the response
*/
ping(timeout, callback);
/**
* Destroy the channel
* @param noWait - When set, do not wait for pending operations
*/
destroy(noWait);
}
class ServerChannel {
/** Whether channel is destroyed */
destroyed: boolean;
/** Whether channel is draining */
draining: boolean;
/** Number of pending operations */
pending: number;
/** Associated server instance */
server: Server;
/**
* Destroy channel
* @param noWait - Don't wait for pending operations
*/
destroy(noWait);
}

Discover protocol from transport connection.
/**
* Discover protocol from transport
* @param transport - Transport implementation
* @param opts - Discovery options
* @param callback - Protocol callback
*/
function discoverProtocol(transport, opts, callback);

Usage Examples:
const avsc = require('avsc');
// Discover remote protocol
avsc.discoverProtocol(transport, {timeout: 5000}, (err, protocol) => {
if (err) {
console.error('Discovery failed:', err);
return;
}
console.log('Discovered protocol:', protocol.name);
const service = avsc.Service.forProtocol(protocol);
// Use service...
});

Avro RPC uses pluggable transports for communication:
interface Transport {
/** Readable stream interface */
readable: boolean;
/** Writable stream interface */
writable: boolean;
/** Read data from transport */
read(size?: number): Buffer | null;
/** Write data to transport */
write(chunk: Buffer): boolean;
/** End transport connection */
end(): void;
/** Event emitter interface */
on(event: string, listener: Function): this;
emit(event: string, ...args: any[]): boolean;
}

interface ServiceOptions {
/** Custom logical type implementations */
logicalTypes?: {[type: string]: any};
/** Default namespace */
namespace?: string;
/** Type registry */
registry?: {[name: string]: Type};
/** Silent mode for errors */
silent?: boolean;
}
/**
 * Options for creating an RPC client; every field is optional.
 * NOTE(review): presumably the `opts` argument of `Service.createClient` — confirm.
 */
interface ClientOptions {
/** Transport used for communication */
transport?: Transport;
/** Request timeout, in milliseconds */
timeout?: number;
/** Maximum number of concurrent requests */
concurrency?: number;
/** Buffer pool used for message encoding */
bufferPool?: any;
}
/**
 * Options for creating an RPC server; every field is optional.
 * NOTE(review): presumably the `opts` argument of `Service.createServer` — confirm.
 */
interface ServerOptions {
/** Transport used for communication */
transport?: Transport;
/** Buffer pool used for message encoding */
bufferPool?: any;
/** Silent mode for errors */
silent?: boolean;
}
/**
 * Options for creating a communication channel; every field is optional.
 * NOTE(review): presumably the `opts` argument of `createChannel` on clients and servers — confirm.
 */
interface ChannelOptions {
/** Request timeout (NOTE(review): unit not stated here; presumably milliseconds — confirm) */
timeout?: number;
/** Buffer pool */
bufferPool?: any;
/** Silent mode */
silent?: boolean;
}
/**
 * Per-call options.
 * NOTE(review): presumably the `opts` argument of `Client.emitMessage` — confirm.
 */
interface CallOptions {
/** Overrides the request timeout for this call */
timeout?: number;
}
/**
 * Options for protocol discovery; every field is optional.
 * NOTE(review): presumably the `opts` argument of `discoverProtocol` — confirm.
 */
interface DiscoveryOptions {
/** Discovery timeout (the usage example passes `5000`; presumably milliseconds — confirm) */
timeout?: number;
/** Silent mode */
silent?: boolean;
}
/**
 * Definition of a single RPC message; the element type of the `messages`
 * array listed on `Service`.
 */
interface Message {
/** Message name */
name: string;
/** Type of the request */
requestType: Type;
/** Type of the response */
responseType: Type;
/** Error union type (optional) */
errorType?: Type;
/** Documentation string (optional) */
doc?: string;
/** Flag marking the message as one-way (optional) */
oneWay?: boolean;
}
/** Node-style callback: receives an error (or `null`) and, optionally, the response. */
type MessageCallback = (err: Error | null, response?: any) => void;

/** Handler invoked with a request and the callback through which the outcome is reported. */
type MessageHandler = (request: any, callback: MessageCallback) => void;