The Anthropic SDK provides powerful streaming capabilities for real-time message generation. Streaming lets you process responses as they arrive, providing a better user experience for long-running requests.
The SDK offers two streaming approaches:
Use the stream: true parameter to get raw stream events:
client.messages.create(
params: MessageCreateParamsStreaming
): APIPromise<Stream<RawMessageStreamEvent>>;
interface MessageCreateParamsStreaming {
stream: true;
// ... other message params
}Example:
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
const stream = await client.messages.create({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
stream: true, // Enable streaming
messages: [
{
role: 'user',
content: 'Write a short story about a robot.',
}
],
});
// Async iteration over events
for await (const event of stream) {
if (event.type === 'content_block_start') {
console.log('Content block started:', event.content_block);
} else if (event.type === 'content_block_delta') {
if (event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
} else if (event.type === 'message_stop') {
console.log('\nStream ended');
}
}Break from the loop or abort the controller:
const stream = await client.messages.create({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
stream: true,
messages: [/* ... */],
});
// Stop consuming once roughly 500 characters of text have arrived.
// Note: `text.length` counts characters (UTF-16 code units), not model tokens.
let charCount = 0;
for await (const event of stream) {
  if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
    charCount += event.delta.text.length;
    if (charCount > 500) {
      break; // Stop iteration
    }
  }
}
// Or use controller
stream.controller.abort();Enhanced streaming helper that provides event-based API and automatic message accumulation:
client.messages.stream(params: MessageStreamParams): MessageStream;
class MessageStream implements AsyncIterable<MessageStreamEvent> {
// Properties
messages: MessageParam[]; // Sent messages
receivedMessages: Message[]; // Received messages
controller: AbortController; // Abort controller
currentMessage: Message | undefined; // Current accumulating message
response: Response | null; // Raw HTTP response
request_id: string | null; // Request ID
// Event methods
on(event: string, listener: Function): this;
once(event: string, listener: Function): this;
off(event: string, listener: Function): this;
emitted(event: string): Promise<any>;
// Promise methods
done(): Promise<void>;
finalMessage(): Promise<Message>;
finalText(): Promise<string>;
// Control methods
abort(): void;
withResponse(): Promise<{data: MessageStream, response: Response, request_id: string | null}>;
// Conversion
toReadableStream(): ReadableStream;
[Symbol.asyncIterator](): AsyncIterator<MessageStreamEvent>;
// Static constructors
static fromReadableStream(stream: ReadableStream): MessageStream;
static createMessage(messages: Messages, params: MessageStreamParams, options?: RequestOptions): MessageStream;
}Example:
const stream = client.messages.stream({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
messages: [
{
role: 'user',
content: 'Explain quantum computing.',
}
],
});
stream.on('text', (text) => {
console.log('Text delta:', text);
});
stream.on('message', (message) => {
console.log('Complete message:', message);
});
// Wait for completion
const finalMessage = await stream.finalMessage();
console.log('Final:', finalMessage.content);MessageStream emits the following events:
Fired when connection to the API is established:
stream.on('connect', () => void);Example:
stream.on('connect', () => {
console.log('Connected to API');
});Fired for each stream event with accumulated message snapshot:
stream.on('streamEvent', (event: MessageStreamEvent, snapshot: Message) => void);Example:
stream.on('streamEvent', (event, snapshot) => {
console.log('Event type:', event.type);
console.log('Current message state:', snapshot);
});Fired when text content is generated:
stream.on('text', (textDelta: string, textSnapshot: string) => void);Parameters:
- `textDelta`: The new text added in this event
- `textSnapshot`: All text accumulated so far

Example:
let fullText = '';
stream.on('text', (delta, snapshot) => {
process.stdout.write(delta); // Stream to console
fullText = snapshot; // Keep full text
});Fired when tool input JSON is being generated:
stream.on('inputJson', (partialJson: string, jsonSnapshot: unknown) => void);Parameters:
- `partialJson`: The JSON string delta
- `jsonSnapshot`: Partially parsed JSON object (best effort)

Example:
stream.on('inputJson', (partialJson, jsonSnapshot) => {
console.log('Tool input building:', jsonSnapshot);
});Fired when extended thinking content is generated:
stream.on('thinking', (thinkingDelta: string, thinkingSnapshot: string) => void);Example:
stream.on('thinking', (delta, snapshot) => {
console.log('Reasoning:', delta);
});Fired when a citation is added:
stream.on('citation', (citation: TextCitation, citationsSnapshot: TextCitation[]) => void);Example:
stream.on('citation', (citation, allCitations) => {
console.log('New citation:', citation.cited_text);
console.log('Total citations:', allCitations.length);
});Fired when a signature is generated (for extended thinking):
stream.on('signature', (signature: string) => void);Example:
stream.on('signature', (sig) => {
console.log('Signature:', sig);
});Fired when a content block is completed:
stream.on('contentBlock', (content: ContentBlock) => void);Example:
stream.on('contentBlock', (block) => {
if (block.type === 'text') {
console.log('Text block completed:', block.text);
} else if (block.type === 'tool_use') {
console.log('Tool use:', block.name, block.input);
}
});Fired when the complete message is received:
stream.on('message', (message: Message) => void);Example:
stream.on('message', (message) => {
console.log('Message ID:', message.id);
console.log('Stop reason:', message.stop_reason);
console.log('Usage:', message.usage);
});Fired after the message event with the final message:
stream.on('finalMessage', (message: Message) => void);Example:
stream.on('finalMessage', (message) => {
console.log('Stream complete. Final message:', message);
});Fired when an error occurs:
stream.on('error', (error: AnthropicError) => void);Example:
stream.on('error', (error) => {
console.error('Stream error:', error.message);
if (error instanceof Anthropic.APIError) {
console.error('Status:', error.status);
}
});Fired when the stream is aborted:
stream.on('abort', (error: APIUserAbortError) => void);Example:
stream.on('abort', (error) => {
console.log('Stream aborted by user');
});Last event fired when stream processing is complete:
stream.on('end', () => void);Example:
stream.on('end', () => {
console.log('Stream processing ended');
});Register an event listener:
stream.on(event: keyof MessageStreamEvents, listener: Function): MessageStream;Example:
stream
.on('connect', () => console.log('Connected'))
.on('text', (text) => process.stdout.write(text))
.on('error', (err) => console.error(err))
.on('end', () => console.log('\nDone'));Register a one-time event listener:
stream.once(event: keyof MessageStreamEvents, listener: Function): MessageStream;Example:
stream.once('message', (message) => {
console.log('First message received:', message.id);
});Remove an event listener:
stream.off(event: keyof MessageStreamEvents, listener: Function): MessageStream;Example:
// Keep a reference to the handler so the same function can later be passed to off().
const textHandler = (text: string): void => console.log(text);
stream.on('text', textHandler);
// Later, remove the listener
stream.off('text', textHandler);Wait for a specific event to fire:
stream.emitted(event: keyof MessageStreamEvents): Promise<EventData>;Example:
// Wait for first text
const firstText = await stream.emitted('text');
console.log('First text received:', firstText);
// Wait for complete message
const message = await stream.emitted('message');
console.log('Message received:', message);Wait for stream to complete:
stream.done(): Promise<void>;Example:
const stream = client.messages.stream({ /* ... */ });
// Register handlers
stream.on('text', (text) => process.stdout.write(text));
// Wait for completion
await stream.done();
console.log('Stream finished');Get the complete accumulated message:
stream.finalMessage(): Promise<Message>;Example:
const stream = client.messages.stream({ /* ... */ });
const message = await stream.finalMessage();
console.log('Complete message:', message.content);
console.log('Usage:', message.usage);Get just the text content from the final message:
stream.finalText(): Promise<string>;Example:
const stream = client.messages.stream({ /* ... */ });
const text = await stream.finalText();
console.log('Response:', text);Abort the stream:
stream.abort(): void;Example:
const stream = client.messages.stream({ /* ... */ });
// Abort after 5 seconds
setTimeout(() => {
stream.abort();
}, 5000);
try {
await stream.done();
} catch (error) {
if (error instanceof Anthropic.APIUserAbortError) {
console.log('Stream was aborted');
}
}Get the stream, response, and request ID:
stream.withResponse(): Promise<{
data: MessageStream;
response: Response;
request_id: string | null;
}>;Example:
const { data: stream, response, request_id } = await client.messages
.stream({ /* ... */ })
.withResponse();
console.log('Request ID:', request_id);
console.log('Status:', response.status);
stream.on('text', (text) => console.log(text));
await stream.done();Convert to a Web ReadableStream:
stream.toReadableStream(): ReadableStream;Example:
const stream = client.messages.stream({ /* ... */ });
const readableStream = stream.toReadableStream();
// Use with Response for server-side streaming
return new Response(readableStream, {
headers: { 'Content-Type': 'text/event-stream' },
});MessageStream is async iterable:
for await (const event of stream) {
// Process event
}Example:
const stream = client.messages.stream({ /* ... */ });
for await (const event of stream) {
if (event.type === 'content_block_delta') {
if (event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
}
}type MessageStreamEvent =
| MessageStartEvent
| MessageDeltaEvent
| MessageStopEvent
| ContentBlockStartEvent
| ContentBlockDeltaEvent
| ContentBlockStopEvent
;
interface MessageStartEvent {
type: 'message_start';
message: Message;
}
interface MessageDeltaEvent {
type: 'message_delta';
delta: {
stop_reason: StopReason | null;
stop_sequence: string | null;
};
usage: MessageDeltaUsage;
}
interface MessageStopEvent {
type: 'message_stop';
}
interface ContentBlockStartEvent {
type: 'content_block_start';
index: number;
content_block: ContentBlock;
}
interface ContentBlockDeltaEvent {
type: 'content_block_delta';
index: number;
delta: RawContentBlockDelta;
}
interface ContentBlockStopEvent {
type: 'content_block_stop';
index: number;
}type RawContentBlockDelta =
| TextDelta
| ThinkingDelta
| InputJSONDelta
| CitationsDelta
| SignatureDelta
;
interface TextDelta {
type: 'text_delta';
text: string;
}
interface ThinkingDelta {
type: 'thinking_delta';
thinking: string;
}
interface InputJSONDelta {
type: 'input_json_delta';
partial_json: string;
}
/** Delta announcing one newly added citation for the current content block. */
interface CitationsDelta {
  type: 'citations_delta';
  // Each delta carries a single citation, not an array.
  citation: TextCitation;
}
interface SignatureDelta {
type: 'signature_delta';
signature: string;
}const stream = client.messages.stream({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
messages: [
{
role: 'user',
content: 'Write a poem about the ocean.',
}
],
});
stream.on('text', (text) => {
process.stdout.write(text);
});
await stream.done();let accumulatedText = '';
const stream = client.messages.stream({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
messages: [
{
role: 'user',
content: 'Explain machine learning.',
}
],
});
stream.on('text', (delta, snapshot) => {
accumulatedText = snapshot;
});
await stream.done();
console.log('Complete response:', accumulatedText);const stream = client.messages.stream({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
tools: [
{
name: 'calculator',
description: 'Perform calculations',
input_schema: {
type: 'object',
properties: {
expression: { type: 'string' },
},
required: ['expression'],
},
},
],
messages: [
{
role: 'user',
content: 'What is 15 * 23?',
}
],
});
stream.on('contentBlock', (block) => {
if (block.type === 'tool_use') {
console.log('Tool called:', block.name);
console.log('Input:', block.input);
}
});
const message = await stream.finalMessage();
if (message.stop_reason === 'tool_use') {
// Handle tool execution
const toolBlock = message.content.find(b => b.type === 'tool_use');
// Execute tool and continue conversation
}const stream = client.messages.stream({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 4096,
thinking: {
type: 'enabled',
budget_tokens: 2000,
},
messages: [
{
role: 'user',
content: 'Solve this logic puzzle: ...',
}
],
});
stream.on('thinking', (delta, snapshot) => {
console.log('Reasoning:', delta);
});
stream.on('text', (delta) => {
console.log('Answer:', delta);
});
await stream.done();let totalTokens = 0;
const stream = client.messages.stream({
model: 'claude-sonnet-4-5-20250929',
max_tokens: 1024,
messages: [{ role: 'user', content: 'Tell me a story.' }],
});
stream.on('text', (delta) => {
  // delta.length is a character count, not a token count; it is only a rough
  // progress indicator. Real token usage is reported on message.usage.
  totalTokens += delta.length;
  console.log(`Characters so far: ${totalTokens}`);
});
stream.on('message', (message) => {
console.log('Actual usage:', message.usage);
});
await stream.done();// Backend (Express/Node.js)
// Streams model events to the browser as Server-Sent Events so that an
// EventSource client can consume them via `onmessage`.
app.get('/api/stream', async (req, res) => {
  // Express query values may be missing or arrays; only accept a non-empty string.
  const prompt = req.query.prompt;
  if (typeof prompt !== 'string' || prompt.length === 0) {
    res.status(400).json({ error: 'Query parameter "prompt" is required' });
    return;
  }

  const stream = client.messages.stream({
    model: 'claude-sonnet-4-5-20250929',
    max_tokens: 1024,
    messages: [{ role: 'user', content: prompt }],
  });

  // Stop generating if the client disconnects before the stream finishes.
  res.on('close', () => stream.abort());

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  try {
    // EventSource only understands SSE framing ("data: <payload>\n\n"), so each
    // event is framed explicitly instead of piping toReadableStream(), which
    // emits newline-delimited JSON.
    for await (const event of stream) {
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    }
  } catch (error) {
    // An abort caused by a client disconnect is expected; log anything else.
    if (!(error instanceof Anthropic.APIUserAbortError)) {
      console.error('Stream failed:', error);
    }
  } finally {
    res.end();
  }
});
// Frontend
const eventSource = new EventSource('/api/stream?prompt=Hello');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(data);
};Or reconstruct MessageStream on frontend:
// Backend
// Forwards the SDK's serialized stream to the client so the frontend can
// rebuild it with MessageStream.fromReadableStream().
app.get('/api/stream', async (req, res) => {
  const stream = client.messages.stream({ /* ... */ });

  // Stop generating if the client disconnects before the stream finishes.
  res.on('close', () => stream.abort());

  // toReadableStream() yields newline-delimited JSON, not SSE frames.
  res.setHeader('Content-Type', 'application/x-ndjson');

  try {
    // Express ignores a returned fetch Response, so write the chunks to `res` directly.
    for await (const chunk of stream.toReadableStream()) {
      res.write(chunk);
    }
  } catch (error) {
    // An abort caused by a client disconnect is expected; log anything else.
    if (!(error instanceof Anthropic.APIUserAbortError)) {
      console.error('Stream failed:', error);
    }
  } finally {
    res.end();
  }
});
// Frontend
const response = await fetch('/api/stream');
const stream = Anthropic.MessageStream.fromReadableStream(response.body);
stream.on('text', (text) => {
console.log(text);
});
await stream.finalMessage();const stream = client.messages.stream({ /* ... */ });
stream.on('error', (error) => {
console.error('Stream error:', error);
if (error instanceof Anthropic.APIError) {
if (error.status === 429) {
console.log('Rate limited, retry later');
}
}
});
stream.on('abort', () => {
console.log('Stream was aborted');
});
await stream.done().catch(err => {
console.error('Stream failed:', err);
});try {
const stream = client.messages.stream({ /* ... */ });
for await (const event of stream) {
// Process events
}
} catch (error) {
if (error instanceof Anthropic.APIUserAbortError) {
console.log('User aborted');
} else if (error instanceof Anthropic.APIConnectionError) {
console.log('Network error');
} else {
throw error;
}
}const stream = client.messages.stream({ /* ... */ });
// Abort after condition
stream.on('text', (delta) => {
if (delta.includes('STOP')) {
stream.abort();
}
});
await stream.done();const stream = client.messages.stream({ /* ... */ });
const timeout = setTimeout(() => {
stream.abort();
}, 30000); // 30 seconds
try {
await stream.done();
clearTimeout(timeout);
} catch (error) {
if (error instanceof Anthropic.APIUserAbortError) {
console.log('Timed out');
}
}const abortController = new AbortController();
const stream = client.messages.stream(
{ /* ... */ },
{ signal: abortController.signal }
);
// Abort externally
setTimeout(() => abortController.abort(), 10000);
await stream.done();// ✅ Good: Use stream.on for processing
const stream = client.messages.stream({ /* ... */ });
stream.on('text', (text) => {
// Process immediately, don't accumulate
sendToClient(text);
});
await stream.done();
// ❌ Bad: Accumulating everything in memory
let allText = '';
stream.on('text', (delta, snapshot) => {
allText = snapshot; // Keeps growing
});// ✅ Good: Handle errors
const stream = client.messages.stream({ /* ... */ });
stream.on('error', (error) => {
logger.error('Stream error', { error, requestId: stream.request_id });
notifyUser('Generation failed, please retry');
});
await stream.done().catch(() => {
// Already handled in error event
});// ✅ Good: Ensure cleanup
const stream = client.messages.stream({ /* ... */ });
try {
stream.on('text', (text) => saveToDatabase(text));
await stream.done();
} finally {
// Cleanup resources
closeConnections();
}