The ACP SDK uses bidirectional streams for JSON-RPC 2.0 message exchange. The Stream type and ndJsonStream function provide the foundation for ACP communication.
The Stream type defines a bidirectional message channel.
type Stream = {
writable: WritableStream<AnyMessage>;
readable: ReadableStream<AnyMessage>;
};Properties:
writable: WritableStream for sending messages to the other partyreadable: ReadableStream for receiving messages from the other partyAnyMessage Type: A union of all JSON-RPC 2.0 message types (requests, responses, and notifications).
Creates an ACP Stream from a pair of newline-delimited JSON streams.
/**
* Creates an ACP Stream from newline-delimited JSON streams
* @param output - Writable stream to send encoded messages to
* @param input - Readable stream to receive encoded messages from
* @returns Stream for bidirectional ACP communication
*/
function ndJsonStream(
output: WritableStream<Uint8Array>,
input: ReadableStream<Uint8Array>
): StreamParameters:
output: WritableStream<Uint8Array> for encoded output (newline-delimited JSON)input: ReadableStream<Uint8Array> for encoded input (newline-delimited JSON)Returns:
Stream object with readable/writable streams of AnyMessage objectsDescription: This is the typical way to handle ACP connections over stdio, converting between AnyMessage objects and newline-delimited JSON. Each message is:
Messages are parsed by:
import { ndJsonStream, AgentSideConnection } from '@agentclientprotocol/sdk';
import { Readable, Writable } from 'stream';
// Convert Node.js streams to Web Streams
function nodeStreamToWebReadable(nodeStream: Readable): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
nodeStream.on('data', (chunk) => {
controller.enqueue(chunk);
});
nodeStream.on('end', () => {
controller.close();
});
nodeStream.on('error', (error) => {
controller.error(error);
});
}
});
}
function nodeStreamToWebWritable(nodeStream: Writable): WritableStream<Uint8Array> {
return new WritableStream({
write(chunk) {
return new Promise((resolve, reject) => {
nodeStream.write(chunk, (error) => {
if (error) reject(error);
else resolve();
});
});
}
});
}
// Create ACP stream from stdio
const stream = ndJsonStream(
nodeStreamToWebWritable(process.stdout),
nodeStreamToWebReadable(process.stdin)
);
// Use with connection
const connection = new AgentSideConnection(
(conn) => agent,
stream
);You can also implement custom streams for other transports:
import { Stream, AnyMessage } from '@agentclientprotocol/sdk';
// Example: WebSocket-based stream
function webSocketStream(ws: WebSocket): Stream {
const readable = new ReadableStream<AnyMessage>({
start(controller) {
ws.addEventListener('message', (event) => {
try {
const message = JSON.parse(event.data) as AnyMessage;
controller.enqueue(message);
} catch (error) {
console.error('Failed to parse message:', error);
}
});
ws.addEventListener('close', () => {
controller.close();
});
ws.addEventListener('error', (error) => {
controller.error(error);
});
}
});
const writable = new WritableStream<AnyMessage>({
write(message) {
ws.send(JSON.stringify(message));
}
});
return { readable, writable };
}
// Use with connection
const ws = new WebSocket('ws://localhost:8080');
const stream = webSocketStream(ws);
const connection = new ClientSideConnection(
(agent) => client,
stream
);import * as net from 'net';
import { ndJsonStream } from '@agentclientprotocol/sdk';
// Connect to TCP socket
const socket = net.connect({ port: 8080, host: 'localhost' });
const stream = ndJsonStream(
nodeStreamToWebWritable(socket),
nodeStreamToWebReadable(socket)
);
const connection = new AgentSideConnection(
(conn) => agent,
stream
);
// Handle connection events
socket.on('connect', () => {
console.log('Connected to agent');
});
socket.on('end', () => {
console.log('Connection closed');
});
socket.on('error', (error) => {
console.error('Socket error:', error);
});import { Stream, AnyMessage } from '@agentclientprotocol/sdk';
// Create paired streams for testing
function createTestStreams(): { agent: Stream; client: Stream } {
// Message queues
const agentToClient: AnyMessage[] = [];
const clientToAgent: AnyMessage[] = [];
// Agent's readable stream (receives from client)
const agentReadable = new ReadableStream<AnyMessage>({
async pull(controller) {
while (clientToAgent.length === 0) {
await new Promise(resolve => setTimeout(resolve, 10));
}
controller.enqueue(clientToAgent.shift()!);
}
});
// Agent's writable stream (sends to client)
const agentWritable = new WritableStream<AnyMessage>({
write(message) {
agentToClient.push(message);
}
});
// Client's readable stream (receives from agent)
const clientReadable = new ReadableStream<AnyMessage>({
async pull(controller) {
while (agentToClient.length === 0) {
await new Promise(resolve => setTimeout(resolve, 10));
}
controller.enqueue(agentToClient.shift()!);
}
});
// Client's writable stream (sends to agent)
const clientWritable = new WritableStream<AnyMessage>({
write(message) {
clientToAgent.push(message);
}
});
return {
agent: { readable: agentReadable, writable: agentWritable },
client: { readable: clientReadable, writable: clientWritable }
};
}
// Use in tests
const { agent: agentStream, client: clientStream } = createTestStreams();
const agentConn = new AgentSideConnection((conn) => agent, agentStream);
const clientConn = new ClientSideConnection((agent) => client, clientStream);
// Now they can communicate
const initResponse = await clientConn.initialize({
protocolVersion: 1,
capabilities: {},
clientInfo: { name: 'test-client', version: '1.0.0' }
});Both AgentSideConnection and ClientSideConnection manage stream lifecycle:
const connection = new AgentSideConnection(
(conn) => agent,
stream
);
// Listen for closure
connection.signal.addEventListener('abort', () => {
console.log('Stream closed, connection ended');
});
// Wait for closure
await connection.closed;
console.log('Connection has closed');Stream errors are handled automatically:
// Parsing errors are logged to console
// Invalid messages are skipped
// Fatal stream errors close the connection
connection.signal.addEventListener('abort', () => {
// Connection closed due to error or normal termination
cleanup();
});The newline-delimited JSON format:
{"jsonrpc":"2.0","method":"initialize","id":0,"params":{...}}\n
{"jsonrpc":"2.0","id":0,"result":{...}}\n
{"jsonrpc":"2.0","method":"session/update","params":{...}}\nEach line is a complete JSON-RPC 2.0 message. Empty lines and lines with only whitespace are ignored.
ndJsonStream is the recommended way to create streams for stdio connections