Stream Management

The ACP SDK uses bidirectional streams for JSON-RPC 2.0 message exchange. The Stream type and ndJsonStream function provide the foundation for ACP communication.

Stream Type

The Stream type defines a bidirectional message channel.

type Stream = {
  writable: WritableStream<AnyMessage>;
  readable: ReadableStream<AnyMessage>;
};

Properties:

  • writable: WritableStream for sending messages to the other party
  • readable: ReadableStream for receiving messages from the other party

AnyMessage Type: A union of all JSON-RPC 2.0 message types (requests, responses, and notifications).
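
As a rough illustration of the three message shapes in that union, the sketch below classifies a message by the JSON-RPC 2.0 fields it carries. The types and the classifyMessage helper are local stand-ins invented for this example, not the SDK's own definitions; real code would import AnyMessage from the SDK instead.

```typescript
// Local stand-ins for illustration only; the SDK's AnyMessage may differ in detail.
type JsonRpcId = string | number | null;
type RequestMessage = { jsonrpc: "2.0"; id: JsonRpcId; method: string; params?: unknown };
type NotificationMessage = { jsonrpc: "2.0"; method: string; params?: unknown };
type ResponseMessage =
  | { jsonrpc: "2.0"; id: JsonRpcId; result: unknown }
  | { jsonrpc: "2.0"; id: JsonRpcId; error: { code: number; message: string; data?: unknown } };
type Message = RequestMessage | NotificationMessage | ResponseMessage;

type MessageKind = "request" | "notification" | "response";

// A request has both `method` and `id`; a notification has `method` but no `id`;
// a response has `id` plus either `result` or `error`, and no `method`.
function classifyMessage(message: Message): MessageKind {
  if ("method" in message) {
    return "id" in message ? "request" : "notification";
  }
  return "response";
}
```
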

ndJsonStream Function

Creates an ACP Stream from a pair of newline-delimited JSON streams.

/**
 * Creates an ACP Stream from newline-delimited JSON streams
 * @param output - Writable stream to send encoded messages to
 * @param input - Readable stream to receive encoded messages from
 * @returns Stream for bidirectional ACP communication
 */
function ndJsonStream(
  output: WritableStream<Uint8Array>,
  input: ReadableStream<Uint8Array>
): Stream

Parameters:

  • output: WritableStream<Uint8Array> for encoded output (newline-delimited JSON)
  • input: ReadableStream<Uint8Array> for encoded input (newline-delimited JSON)

Returns:

  • Stream object with readable/writable streams of AnyMessage objects

Description: This is the typical way to handle ACP connections over stdio. It converts between AnyMessage objects and newline-delimited JSON. Each outgoing message is:

  1. Serialized to JSON
  2. Appended with a newline character
  3. Encoded to UTF-8 bytes

Incoming messages are parsed by:

  1. Decoding UTF-8 bytes to text
  2. Splitting on newline characters
  3. Parsing each line as JSON
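
The two step lists above can be sketched as a pair of plain functions. This is a minimal illustration under the assumption that a whole buffer is available at once; it is not the SDK's implementation, and encodeMessage and decodeMessages are hypothetical names.

```typescript
const encoder = new TextEncoder();
const decoder = new TextDecoder();

// Serialize to JSON, append a newline, encode as UTF-8 bytes.
function encodeMessage(message: unknown): Uint8Array {
  return encoder.encode(JSON.stringify(message) + "\n");
}

// Decode UTF-8 bytes, split on newlines, parse each non-blank line as JSON.
function decodeMessages(bytes: Uint8Array): unknown[] {
  return decoder
    .decode(bytes)
    .split("\n")
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line));
}
```

Because JSON.stringify escapes newline characters inside strings, a serialized message never contains a raw newline, which is what makes splitting on newlines safe.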

Usage Examples

Node.js stdio Connection

import { ndJsonStream, AgentSideConnection } from '@agentclientprotocol/sdk';
import { Readable, Writable } from 'stream';

// Convert Node.js streams to Web Streams
function nodeStreamToWebReadable(nodeStream: Readable): ReadableStream<Uint8Array> {
  return new ReadableStream({
    start(controller) {
      nodeStream.on('data', (chunk) => {
        controller.enqueue(chunk);
      });

      nodeStream.on('end', () => {
        controller.close();
      });

      nodeStream.on('error', (error) => {
        controller.error(error);
      });
    }
  });
}

function nodeStreamToWebWritable(nodeStream: Writable): WritableStream<Uint8Array> {
  return new WritableStream({
    write(chunk) {
      return new Promise((resolve, reject) => {
        nodeStream.write(chunk, (error) => {
          if (error) reject(error);
          else resolve();
        });
      });
    }
  });
}

// Create ACP stream from stdio
const stream = ndJsonStream(
  nodeStreamToWebWritable(process.stdout),
  nodeStreamToWebReadable(process.stdin)
);

// Use with connection (`agent` is your Agent implementation, defined elsewhere)
const connection = new AgentSideConnection(
  (conn) => agent,
  stream
);

Custom Stream Implementation

You can also implement custom streams for other transports:

import { ClientSideConnection, Stream, AnyMessage } from '@agentclientprotocol/sdk';

// Example: WebSocket-based stream
function webSocketStream(ws: WebSocket): Stream {
  const readable = new ReadableStream<AnyMessage>({
    start(controller) {
      ws.addEventListener('message', (event) => {
        try {
          const message = JSON.parse(event.data) as AnyMessage;
          controller.enqueue(message);
        } catch (error) {
          console.error('Failed to parse message:', error);
        }
      });

      ws.addEventListener('close', () => {
        controller.close();
      });

      ws.addEventListener('error', (error) => {
        controller.error(error);
      });
    }
  });

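  const writable = new WritableStream<AnyMessage>({
    // Hold writes until the socket is open; send() throws while it is still connecting
    start() {
      if (ws.readyState === WebSocket.OPEN) return;
      return new Promise<void>((resolve, reject) => {
        ws.addEventListener('open', () => resolve(), { once: true });
        ws.addEventListener('error', () => reject(new Error('WebSocket failed to open')), { once: true });
      });
    },
    write(message) {
      ws.send(JSON.stringify(message));
    }
  });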

  return { readable, writable };
}

// Use with connection (`client` is your Client implementation, defined elsewhere)
const ws = new WebSocket('ws://localhost:8080');
const stream = webSocketStream(ws);

const connection = new ClientSideConnection(
  (agent) => client,
  stream
);

TCP Socket Connection

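import * as net from 'net';
import { ndJsonStream, AgentSideConnection } from '@agentclientprotocol/sdk';

// Reuses nodeStreamToWebWritable and nodeStreamToWebReadable from the Node.js stdio example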

// Connect to TCP socket
const socket = net.connect({ port: 8080, host: 'localhost' });

const stream = ndJsonStream(
  nodeStreamToWebWritable(socket),
  nodeStreamToWebReadable(socket)
);

const connection = new AgentSideConnection(
  (conn) => agent,
  stream
);

// Handle connection events
socket.on('connect', () => {
  console.log('Socket connected');
});

socket.on('end', () => {
  console.log('Connection closed');
});

socket.on('error', (error) => {
  console.error('Socket error:', error);
});

In-Memory Stream for Testing

import {
  AgentSideConnection,
  ClientSideConnection,
  Stream,
  AnyMessage
} from '@agentclientprotocol/sdk';

// Create paired streams for testing
function createTestStreams(): { agent: Stream; client: Stream } {
  // Message queues
  const agentToClient: AnyMessage[] = [];
  const clientToAgent: AnyMessage[] = [];

  // Agent's readable stream (receives from client)
  const agentReadable = new ReadableStream<AnyMessage>({
    async pull(controller) {
      while (clientToAgent.length === 0) {
        await new Promise(resolve => setTimeout(resolve, 10));
      }
      controller.enqueue(clientToAgent.shift()!);
    }
  });

  // Agent's writable stream (sends to client)
  const agentWritable = new WritableStream<AnyMessage>({
    write(message) {
      agentToClient.push(message);
    }
  });

  // Client's readable stream (receives from agent)
  const clientReadable = new ReadableStream<AnyMessage>({
    async pull(controller) {
      while (agentToClient.length === 0) {
        await new Promise(resolve => setTimeout(resolve, 10));
      }
      controller.enqueue(agentToClient.shift()!);
    }
  });

  // Client's writable stream (sends to agent)
  const clientWritable = new WritableStream<AnyMessage>({
    write(message) {
      clientToAgent.push(message);
    }
  });

  return {
    agent: { readable: agentReadable, writable: agentWritable },
    client: { readable: clientReadable, writable: clientWritable }
  };
}

// Use in tests (`agent` and `client` are your Agent and Client implementations)
const { agent: agentStream, client: clientStream } = createTestStreams();

const agentConn = new AgentSideConnection((conn) => agent, agentStream);
const clientConn = new ClientSideConnection((agent) => client, clientStream);

// Now they can communicate
const initResponse = await clientConn.initialize({
  protocolVersion: 1,
  capabilities: {},
  clientInfo: { name: 'test-client', version: '1.0.0' }
});
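
The polling loops in createTestStreams work, but they add up to 10 ms of latency per message and never end the readable side. One possible alternative, assuming a runtime that provides the standard TransformStream (modern browsers, Node.js 18+), is to cross-wire two identity transforms. createPipePair and Duplex are hypothetical names for this sketch; Duplex<T> has the same shape as Stream when T is AnyMessage.

```typescript
type Duplex<T> = {
  readable: ReadableStream<T>;
  writable: WritableStream<T>;
};

// Build two connected endpoints: whatever `a` writes, `b` reads, and vice versa.
function createPipePair<T>(): { a: Duplex<T>; b: Duplex<T> } {
  // A TransformStream with no transformer passes chunks through unchanged.
  const aToB = new TransformStream<T, T>();
  const bToA = new TransformStream<T, T>();

  return {
    a: { readable: bToA.readable, writable: aToB.writable },
    b: { readable: aToB.readable, writable: bToA.writable },
  };
}
```

With this wiring, closing one endpoint's writable ends the other endpoint's readable, so a test can also exercise connection shutdown.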

Stream Lifecycle

Both AgentSideConnection and ClientSideConnection manage stream lifecycle:

  1. Creation: Stream is provided to connection constructor
  2. Active: Connection reads from readable and writes to writable
  3. Closure: When readable stream ends, connection closes
  4. Cleanup: Connection fires its abort signal and its closed promise resolves

const connection = new AgentSideConnection(
  (conn) => agent,
  stream
);

// Listen for closure
connection.signal.addEventListener('abort', () => {
  console.log('Stream closed, connection ended');
});

// Wait for closure
await connection.closed;
console.log('Connection has closed');
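
To make the four lifecycle steps concrete, here is a minimal sketch of how a read loop could drive an abort signal and a closed promise. It is an assumption-laden illustration, not the SDK's internals: trackLifecycle is a hypothetical helper, and in this sketch a stream error makes closed reject, which may differ from how the connection classes behave.

```typescript
function trackLifecycle<T>(
  readable: ReadableStream<T>,
  onMessage: (message: T) => void
): { signal: AbortSignal; closed: Promise<void> } {
  const controller = new AbortController();

  const closed = (async () => {
    const reader = readable.getReader();
    try {
      // Active: keep reading until the readable side ends.
      while (true) {
        const result = await reader.read();
        if (result.done) break;
        onMessage(result.value);
      }
    } finally {
      // Closure and cleanup: release the stream and fire the abort signal.
      reader.releaseLock();
      controller.abort();
    }
  })();

  return { signal: controller.signal, closed };
}
```
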

Error Handling

Stream errors are handled automatically:

// Parsing errors are logged to console
// Invalid messages are skipped
// Fatal stream errors close the connection

connection.signal.addEventListener('abort', () => {
  // Connection closed due to error or normal termination
  cleanup();
});
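
The log-and-skip behavior described in the comments above can be sketched as follows. This is only an illustration of the policy, assuming lines have already been split; parseLinesSkippingInvalid and ParseOutcome are hypothetical names, and the log message mirrors the WebSocket example rather than any confirmed SDK output.

```typescript
type ParseOutcome = { messages: unknown[]; skipped: number };

// Parse each line independently so that one malformed line does not
// prevent later, valid lines from being delivered.
function parseLinesSkippingInvalid(
  lines: string[],
  log: (text: string, error: unknown) => void = console.error
): ParseOutcome {
  const messages: unknown[] = [];
  let skipped = 0;

  for (const line of lines) {
    try {
      messages.push(JSON.parse(line));
    } catch (error) {
      log("Failed to parse message:", error); // report, then move on
      skipped += 1;
    }
  }

  return { messages, skipped };
}
```
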

Protocol Format

The newline-delimited JSON format:

{"jsonrpc":"2.0","method":"initialize","id":0,"params":{...}}\n
{"jsonrpc":"2.0","id":0,"result":{...}}\n
{"jsonrpc":"2.0","method":"session/update","params":{...}}\n

Each line is a complete JSON-RPC 2.0 message. Empty lines and lines with only whitespace are ignored.
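
One practical consequence of this framing is that byte chunks from a transport rarely line up with line boundaries, so a reader has to buffer partial lines between chunks. The sketch below shows one way to do that; LineFramer is a hypothetical class for illustration and makes no claim about how ndJsonStream is implemented.

```typescript
class LineFramer {
  // `stream: true` below lets the decoder carry a multi-byte UTF-8 character
  // that was split across two chunks.
  private readonly decoder = new TextDecoder();
  private pending = "";

  // Feed one chunk; get back every complete, non-blank line seen so far.
  push(chunk: Uint8Array): string[] {
    this.pending += this.decoder.decode(chunk, { stream: true });
    const parts = this.pending.split("\n");
    // The last part has no trailing newline yet, so keep it for the next chunk.
    this.pending = parts.pop() ?? "";
    return parts.filter((line) => line.trim().length > 0);
  }
}
```

Each returned line can then be passed to JSON.parse as a single message.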

Notes

  • ndJsonStream is the recommended way to create streams for stdio connections
  • Custom stream implementations are supported for other transports
  • Streams must use the Web Streams API (ReadableStream/WritableStream)
  • Message parsing errors are logged but don't terminate the connection
  • The connection automatically manages stream lifecycle
  • Both sides of the connection use the same Stream type