or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

batches.mdbeta-features.mdclient.mdcompletions.mderrors.mdfiles.mdindex.mdmessages.mdmodels.mdskills.mdstreaming.mdtools.mdtypes.md
tile.json

streaming.mddocs/

Streaming

The Anthropic SDK provides powerful streaming capabilities for real-time message generation. Streaming allows you to process responses as they arrive, providing better user experience for long-running requests.

Overview

The SDK offers two streaming approaches:

  1. Basic Streaming: Low-level async iteration over raw stream events
  2. MessageStream: Enhanced helper with event handlers, message accumulation, and convenience methods

Basic Streaming

Use the stream: true parameter to get raw stream events:

client.messages.create(
  params: MessageCreateParamsStreaming
): APIPromise<Stream<RawMessageStreamEvent>>;

interface MessageCreateParamsStreaming {
  stream: true;
  // ... other message params
}

Example:

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

const stream = await client.messages.create({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 1024,
  stream: true,  // Enable streaming
  messages: [
    {
      role: 'user',
      content: 'Write a short story about a robot.',
    }
  ],
});

// Async iteration over events
for await (const event of stream) {
  if (event.type === 'content_block_start') {
    console.log('Content block started:', event.content_block);
  } else if (event.type === 'content_block_delta') {
    if (event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text);
    }
  } else if (event.type === 'message_stop') {
    console.log('\nStream ended');
  }
}

Cancellation

Break from the loop or abort the controller:

const stream = await client.messages.create({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 1024,
  stream: true,
  messages: [/* ... */],
});

let tokenCount = 0;
for await (const event of stream) {
  if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
    tokenCount += event.delta.text.length;
    if (tokenCount > 500) {
      break;  // Stop iteration
    }
  }
}

// Or use controller
stream.controller.abort();

MessageStream

Enhanced streaming helper that provides event-based API and automatic message accumulation:

client.messages.stream(params: MessageStreamParams): MessageStream;

class MessageStream implements AsyncIterable<MessageStreamEvent> {
  // Properties
  messages: MessageParam[];           // Sent messages
  receivedMessages: Message[];        // Received messages
  controller: AbortController;        // Abort controller
  currentMessage: Message | undefined;  // Current accumulating message
  response: Response | null;          // Raw HTTP response
  request_id: string | null;          // Request ID

  // Event methods
  on(event: string, listener: Function): this;
  once(event: string, listener: Function): this;
  off(event: string, listener: Function): this;
  emitted(event: string): Promise<any>;

  // Promise methods
  done(): Promise<void>;
  finalMessage(): Promise<Message>;
  finalText(): Promise<string>;

  // Control methods
  abort(): void;
  withResponse(): Promise<{data: Message, response: Response, request_id: string}>;

  // Conversion
  toReadableStream(): ReadableStream;
  [Symbol.asyncIterator](): AsyncIterator<MessageStreamEvent>;

  // Static constructors
  static fromReadableStream(stream: ReadableStream): MessageStream;
  static createMessage(messages: Messages, params: MessageStreamParams, options?: RequestOptions): MessageStream;
}

Example:

const stream = client.messages.stream({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 1024,
  messages: [
    {
      role: 'user',
      content: 'Explain quantum computing.',
    }
  ],
});

stream.on('text', (text) => {
  console.log('Text delta:', text);
});

stream.on('message', (message) => {
  console.log('Complete message:', message);
});

// Wait for completion
const finalMessage = await stream.finalMessage();
console.log('Final:', finalMessage.content);

Events

MessageStream emits the following events:

connect

Fired when connection to the API is established:

stream.on('connect', () => void);

Example:

stream.on('connect', () => {
  console.log('Connected to API');
});

streamEvent

Fired for each stream event with accumulated message snapshot:

stream.on('streamEvent', (event: MessageStreamEvent, snapshot: Message) => void);

Example:

stream.on('streamEvent', (event, snapshot) => {
  console.log('Event type:', event.type);
  console.log('Current message state:', snapshot);
});

text

Fired when text content is generated:

stream.on('text', (textDelta: string, textSnapshot: string) => void);

Parameters:

  • textDelta: The new text added in this event
  • textSnapshot: All text accumulated so far

Example:

let fullText = '';
stream.on('text', (delta, snapshot) => {
  process.stdout.write(delta);  // Stream to console
  fullText = snapshot;          // Keep full text
});

inputJson

Fired when tool input JSON is being generated:

stream.on('inputJson', (partialJson: string, jsonSnapshot: unknown) => void);

Parameters:

  • partialJson: The JSON string delta
  • jsonSnapshot: Partially parsed JSON object (best effort)

Example:

stream.on('inputJson', (partialJson, jsonSnapshot) => {
  console.log('Tool input building:', jsonSnapshot);
});

thinking

Fired when extended thinking content is generated:

stream.on('thinking', (thinkingDelta: string, thinkingSnapshot: string) => void);

Example:

stream.on('thinking', (delta, snapshot) => {
  console.log('Reasoning:', delta);
});

citation

Fired when a citation is added:

stream.on('citation', (citation: TextCitation, citationsSnapshot: TextCitation[]) => void);

Example:

stream.on('citation', (citation, allCitations) => {
  console.log('New citation:', citation.cited_text);
  console.log('Total citations:', allCitations.length);
});

signature

Fired when a signature is generated (for extended thinking):

stream.on('signature', (signature: string) => void);

Example:

stream.on('signature', (sig) => {
  console.log('Signature:', sig);
});

contentBlock

Fired when a content block is completed:

stream.on('contentBlock', (content: ContentBlock) => void);

Example:

stream.on('contentBlock', (block) => {
  if (block.type === 'text') {
    console.log('Text block completed:', block.text);
  } else if (block.type === 'tool_use') {
    console.log('Tool use:', block.name, block.input);
  }
});

message

Fired when the complete message is received:

stream.on('message', (message: Message) => void);

Example:

stream.on('message', (message) => {
  console.log('Message ID:', message.id);
  console.log('Stop reason:', message.stop_reason);
  console.log('Usage:', message.usage);
});

finalMessage

Fired after the message event with the final message:

stream.on('finalMessage', (message: Message) => void);

Example:

stream.on('finalMessage', (message) => {
  console.log('Stream complete. Final message:', message);
});

error

Fired when an error occurs:

stream.on('error', (error: AnthropicError) => void);

Example:

stream.on('error', (error) => {
  console.error('Stream error:', error.message);
  if (error instanceof Anthropic.APIError) {
    console.error('Status:', error.status);
  }
});

abort

Fired when the stream is aborted:

stream.on('abort', (error: APIUserAbortError) => void);

Example:

stream.on('abort', (error) => {
  console.log('Stream aborted by user');
});

end

Last event fired when stream processing is complete:

stream.on('end', () => void);

Example:

stream.on('end', () => {
  console.log('Stream processing ended');
});

Methods

on()

Register an event listener:

stream.on(event: keyof MessageStreamEvents, listener: Function): MessageStream;

Example:

stream
  .on('connect', () => console.log('Connected'))
  .on('text', (text) => process.stdout.write(text))
  .on('error', (err) => console.error(err))
  .on('end', () => console.log('\nDone'));

once()

Register a one-time event listener:

stream.once(event: keyof MessageStreamEvents, listener: Function): MessageStream;

Example:

stream.once('message', (message) => {
  console.log('First message received:', message.id);
});

off()

Remove an event listener:

stream.off(event: keyof MessageStreamEvents, listener: Function): MessageStream;

Example:

const textHandler = (text) => console.log(text);

stream.on('text', textHandler);

// Later, remove the listener
stream.off('text', textHandler);

emitted()

Wait for a specific event to fire:

stream.emitted(event: keyof MessageStreamEvents): Promise<EventData>;

Example:

// Wait for first text
const firstText = await stream.emitted('text');
console.log('First text received:', firstText);

// Wait for complete message
const message = await stream.emitted('message');
console.log('Message received:', message);

done()

Wait for stream to complete:

stream.done(): Promise<void>;

Example:

const stream = client.messages.stream({ /* ... */ });

// Register handlers
stream.on('text', (text) => process.stdout.write(text));

// Wait for completion
await stream.done();
console.log('Stream finished');

finalMessage()

Get the complete accumulated message:

stream.finalMessage(): Promise<Message>;

Example:

const stream = client.messages.stream({ /* ... */ });

const message = await stream.finalMessage();
console.log('Complete message:', message.content);
console.log('Usage:', message.usage);

finalText()

Get just the text content from the final message:

stream.finalText(): Promise<string>;

Example:

const stream = client.messages.stream({ /* ... */ });

const text = await stream.finalText();
console.log('Response:', text);

abort()

Abort the stream:

stream.abort(): void;

Example:

const stream = client.messages.stream({ /* ... */ });

// Abort after 5 seconds
setTimeout(() => {
  stream.abort();
}, 5000);

try {
  await stream.done();
} catch (error) {
  if (error instanceof Anthropic.APIUserAbortError) {
    console.log('Stream was aborted');
  }
}

withResponse()

Get the stream, response, and request ID:

stream.withResponse(): Promise<{
  data: MessageStream;
  response: Response;
  request_id: string | null;
}>;

Example:

const { data: stream, response, request_id } = await client.messages
  .stream({ /* ... */ })
  .withResponse();

console.log('Request ID:', request_id);
console.log('Status:', response.status);

stream.on('text', (text) => console.log(text));
await stream.done();

toReadableStream()

Convert to a Web ReadableStream:

stream.toReadableStream(): ReadableStream;

Example:

const stream = client.messages.stream({ /* ... */ });
const readableStream = stream.toReadableStream();

// Use with Response for server-side streaming
return new Response(readableStream, {
  headers: { 'Content-Type': 'text/event-stream' },
});

Async Iteration

MessageStream is async iterable:

for await (const event of stream) {
  // Process event
}

Example:

const stream = client.messages.stream({ /* ... */ });

for await (const event of stream) {
  if (event.type === 'content_block_delta') {
    if (event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text);
    }
  }
}

Stream Event Types

type MessageStreamEvent =
  | MessageStartEvent
  | MessageDeltaEvent
  | MessageStopEvent
  | ContentBlockStartEvent
  | ContentBlockDeltaEvent
  | ContentBlockStopEvent
  ;

interface MessageStartEvent {
  type: 'message_start';
  message: Message;
}

interface MessageDeltaEvent {
  type: 'message_delta';
  delta: {
    stop_reason: StopReason | null;
    stop_sequence: string | null;
  };
  usage: MessageDeltaUsage;
}

interface MessageStopEvent {
  type: 'message_stop';
}

interface ContentBlockStartEvent {
  type: 'content_block_start';
  index: number;
  content_block: ContentBlock;
}

interface ContentBlockDeltaEvent {
  type: 'content_block_delta';
  index: number;
  delta: RawContentBlockDelta;
}

interface ContentBlockStopEvent {
  type: 'content_block_stop';
  index: number;
}

Delta Types

type RawContentBlockDelta =
  | TextDelta
  | ThinkingDelta
  | InputJSONDelta
  | CitationsDelta
  | SignatureDelta
  ;

interface TextDelta {
  type: 'text_delta';
  text: string;
}

interface ThinkingDelta {
  type: 'thinking_delta';
  thinking: string;
}

interface InputJSONDelta {
  type: 'input_json_delta';
  partial_json: string;
}

interface CitationsDelta {
  type: 'citations_delta';
  citations: TextCitation[];
}

interface SignatureDelta {
  type: 'signature_delta';
  signature: string;
}

Usage Patterns

Simple Text Streaming

const stream = client.messages.stream({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 1024,
  messages: [
    {
      role: 'user',
      content: 'Write a poem about the ocean.',
    }
  ],
});

stream.on('text', (text) => {
  process.stdout.write(text);
});

await stream.done();

Accumulating Response

let accumulatedText = '';

const stream = client.messages.stream({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 1024,
  messages: [
    {
      role: 'user',
      content: 'Explain machine learning.',
    }
  ],
});

stream.on('text', (delta, snapshot) => {
  accumulatedText = snapshot;
});

await stream.done();
console.log('Complete response:', accumulatedText);

Tool Use with Streaming

const stream = client.messages.stream({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 1024,
  tools: [
    {
      name: 'calculator',
      description: 'Perform calculations',
      input_schema: {
        type: 'object',
        properties: {
          expression: { type: 'string' },
        },
        required: ['expression'],
      },
    },
  ],
  messages: [
    {
      role: 'user',
      content: 'What is 15 * 23?',
    }
  ],
});

stream.on('contentBlock', (block) => {
  if (block.type === 'tool_use') {
    console.log('Tool called:', block.name);
    console.log('Input:', block.input);
  }
});

const message = await stream.finalMessage();

if (message.stop_reason === 'tool_use') {
  // Handle tool execution
  const toolBlock = message.content.find(b => b.type === 'tool_use');
  // Execute tool and continue conversation
}

Extended Thinking

const stream = client.messages.stream({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 4096,
  thinking: {
    type: 'enabled',
    budget_tokens: 2000,
  },
  messages: [
    {
      role: 'user',
      content: 'Solve this logic puzzle: ...',
    }
  ],
});

stream.on('thinking', (delta, snapshot) => {
  console.log('Reasoning:', delta);
});

stream.on('text', (delta) => {
  console.log('Answer:', delta);
});

await stream.done();

Progress Tracking

let totalTokens = 0;

const stream = client.messages.stream({
  model: 'claude-sonnet-4-5-20250929',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Tell me a story.' }],
});

stream.on('text', (delta) => {
  totalTokens += delta.length;
  console.log(`Tokens: ${totalTokens}`);
});

stream.on('message', (message) => {
  console.log('Actual usage:', message.usage);
});

await stream.done();

Server-Side Streaming (Backend to Frontend)

// Backend (Express/Node.js)
app.get('/api/stream', async (req, res) => {
  const stream = client.messages.stream({
    model: 'claude-sonnet-4-5-20250929',
    max_tokens: 1024,
    messages: [{ role: 'user', content: req.query.prompt }],
  });

  // Convert to ReadableStream and pipe to response
  const readableStream = stream.toReadableStream();

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Stream to client
  for await (const chunk of readableStream) {
    res.write(chunk);
  }

  res.end();
});

// Frontend
const eventSource = new EventSource('/api/stream?prompt=Hello');
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log(data);
};

Or reconstruct MessageStream on frontend:

// Backend
app.get('/api/stream', async (req, res) => {
  const stream = client.messages.stream({ /* ... */ });
  const readableStream = stream.toReadableStream();

  return new Response(readableStream, {
    headers: { 'Content-Type': 'text/event-stream' },
  });
});

// Frontend
const response = await fetch('/api/stream');
const stream = Anthropic.MessageStream.fromReadableStream(response.body);

stream.on('text', (text) => {
  console.log(text);
});

await stream.finalMessage();

Error Handling

With Event Listeners

const stream = client.messages.stream({ /* ... */ });

stream.on('error', (error) => {
  console.error('Stream error:', error);

  if (error instanceof Anthropic.APIError) {
    if (error.status === 429) {
      console.log('Rate limited, retry later');
    }
  }
});

stream.on('abort', () => {
  console.log('Stream was aborted');
});

await stream.done().catch(err => {
  console.error('Stream failed:', err);
});

With Try-Catch

try {
  const stream = client.messages.stream({ /* ... */ });

  for await (const event of stream) {
    // Process events
  }
} catch (error) {
  if (error instanceof Anthropic.APIUserAbortError) {
    console.log('User aborted');
  } else if (error instanceof Anthropic.APIConnectionError) {
    console.log('Network error');
  } else {
    throw error;
  }
}

Cancellation

Manual Abort

const stream = client.messages.stream({ /* ... */ });

// Abort after condition
stream.on('text', (delta) => {
  if (delta.includes('STOP')) {
    stream.abort();
  }
});

await stream.done();

Timeout

const stream = client.messages.stream({ /* ... */ });

const timeout = setTimeout(() => {
  stream.abort();
}, 30000);  // 30 seconds

try {
  await stream.done();
  clearTimeout(timeout);
} catch (error) {
  if (error instanceof Anthropic.APIUserAbortError) {
    console.log('Timed out');
  }
}

With AbortSignal

const abortController = new AbortController();

const stream = client.messages.stream(
  { /* ... */ },
  { signal: abortController.signal }
);

// Abort externally
setTimeout(() => abortController.abort(), 10000);

await stream.done();

Best Practices

Memory Efficiency

// ✅ Good: Use stream.on for processing
const stream = client.messages.stream({ /* ... */ });

stream.on('text', (text) => {
  // Process immediately, don't accumulate
  sendToClient(text);
});

await stream.done();

// ❌ Bad: Accumulating everything in memory
let allText = '';
stream.on('text', (delta, snapshot) => {
  allText = snapshot;  // Keeps growing
});

Error Resilience

// ✅ Good: Handle errors
const stream = client.messages.stream({ /* ... */ });

stream.on('error', (error) => {
  logger.error('Stream error', { error, requestId: stream.request_id });
  notifyUser('Generation failed, please retry');
});

await stream.done().catch(() => {
  // Already handled in error event
});

Resource Cleanup

// ✅ Good: Ensure cleanup
const stream = client.messages.stream({ /* ... */ });

try {
  stream.on('text', (text) => saveToDatabase(text));
  await stream.done();
} finally {
  // Cleanup resources
  closeConnections();
}

See Also

  • Messages API - Core message creation
  • Tools - Tool use with streaming
  • Errors - Error handling reference
  • Types - Complete type definitions