or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

batches.md beta.md client-initialization.md errors.md index.md messages.md models.md platform-clients.md streaming.md tools-builtin.md tools-decorators.md tools-function.md tools-memory.md tools-runners.md tools.md types.md

docs/streaming.md

0

# Streaming

1

2

Real-time streaming of message responses as they are generated by the Anthropic API. The streaming functionality provides both synchronous and asynchronous interfaces for receiving incremental updates during message generation, including text deltas, thinking processes, tool use, and other content blocks.

3

4

## Core Imports

5

6

```python

7

from anthropic import Anthropic, AsyncAnthropic

8

from anthropic import MessageStream, AsyncMessageStream

9

from anthropic import MessageStreamManager, AsyncMessageStreamManager

10

from anthropic import BetaMessageStream, BetaAsyncMessageStream

11

from anthropic import BetaMessageStreamManager, BetaAsyncMessageStreamManager

12

```

13

14

## Basic Usage

15

16

### Basic Synchronous Streaming

17

18

```python

19

from anthropic import Anthropic

20

21

client = Anthropic()

22

23

with client.messages.stream(

24

model="claude-sonnet-4-5-20250929",

25

max_tokens=1024,

26

messages=[{"role": "user", "content": "Tell me a story"}]

27

) as stream:

28

for event in stream:

29

if event.type == "text":

30

print(event.text, end="", flush=True)

31

32

# Get the complete message after streaming

33

message = stream.get_final_message()

34

```

35

36

### Basic Asynchronous Streaming

37

38

```python

39

from anthropic import AsyncAnthropic

40

import asyncio

41

42

client = AsyncAnthropic()

43

44

async def main():

45

async with client.messages.stream(

46

model="claude-sonnet-4-5-20250929",

47

max_tokens=1024,

48

messages=[{"role": "user", "content": "Tell me a story"}]

49

) as stream:

50

async for event in stream:

51

if event.type == "text":

52

print(event.text, end="", flush=True)

53

54

message = await stream.get_final_message()

55

56

asyncio.run(main())

57

```

58

59

### Text Stream Iterator

60

61

For simple text-only streaming, use the `text_stream` property:

62

63

```python

64

with client.messages.stream(

65

model="claude-sonnet-4-5-20250929",

66

max_tokens=1024,

67

messages=[{"role": "user", "content": "Count to 10"}]

68

) as stream:

69

for text in stream.text_stream:

70

print(text, end="", flush=True)

71

print()

72

```

73

74

## Capabilities

75

76

### Stream Management Classes

77

78

Context managers that wrap streaming responses and handle resource cleanup automatically.

79

80

```python { .api }

81

class MessageStreamManager:

82

"""

83

Synchronous context manager for MessageStream.

84

85

Returned by client.messages.stream() to enable context manager usage.

86

Automatically closes the stream when exiting the context.

87

"""

88

89

def __enter__(self) -> MessageStream: ...

90

def __exit__(

91

self,

92

exc_type: type[BaseException] | None,

93

exc: BaseException | None,

94

exc_tb: TracebackType | None

95

) -> None: ...

96

97

class AsyncMessageStreamManager:

98

"""

99

Asynchronous context manager for AsyncMessageStream.

100

101

Returned by async client.messages.stream() to enable async context manager usage.

102

Automatically closes the stream when exiting the context.

103

"""

104

105

async def __aenter__(self) -> AsyncMessageStream: ...

106

async def __aexit__(

107

self,

108

exc_type: type[BaseException] | None,

109

exc: BaseException | None,

110

exc_tb: TracebackType | None

111

) -> None: ...

112

113

class BetaMessageStreamManager(Generic[ResponseFormatT]):

114

"""

115

Synchronous context manager for BetaMessageStream.

116

117

Supports structured output parsing when ResponseFormatT is provided.

118

"""

119

120

def __enter__(self) -> BetaMessageStream[ResponseFormatT]: ...

121

def __exit__(

122

self,

123

exc_type: type[BaseException] | None,

124

exc: BaseException | None,

125

exc_tb: TracebackType | None

126

) -> None: ...

127

128

class BetaAsyncMessageStreamManager(Generic[ResponseFormatT]):

129

"""

130

Asynchronous context manager for BetaAsyncMessageStream.

131

132

Supports structured output parsing when ResponseFormatT is provided.

133

"""

134

135

async def __aenter__(self) -> BetaAsyncMessageStream[ResponseFormatT]: ...

136

async def __aexit__(

137

self,

138

exc_type: type[BaseException] | None,

139

exc: BaseException | None,

140

exc_tb: TracebackType | None

141

) -> None: ...

142

```

143

144

### Message Stream Classes

145

146

High-level wrappers that accumulate streaming events and provide access to the final message.

147

148

```python { .api }

149

class MessageStream:

150

"""

151

Synchronous message stream with automatic event accumulation.

152

153

Provides both raw stream events and convenient text_stream iterator.

154

Accumulates content into a final Message object.

155

"""

156

157

text_stream: Iterator[str]

158

"""Iterator over just the text deltas in the stream."""

159

160

@property

161

def response(self) -> httpx.Response:

162

"""The underlying HTTP response object."""

163

...

164

165

@property

166

def request_id(self) -> str | None:

167

"""The request ID from response headers."""

168

...

169

170

@property

171

def current_message_snapshot(self) -> Message:

172

"""Current accumulated message state."""

173

...

174

175

def __iter__(self) -> Iterator[MessageStreamEvent]:

176

"""Iterate over stream events."""

177

...

178

179

def __enter__(self) -> Self:

180

"""Enter context manager."""

181

...

182

183

def __exit__(

184

self,

185

exc_type: type[BaseException] | None,

186

exc: BaseException | None,

187

exc_tb: TracebackType | None

188

) -> None:

189

"""Exit context manager and close stream."""

190

...

191

192

def close(self) -> None:

193

"""

194

Close the response and release the connection.

195

196

Automatically called if the response body is read to completion.

197

"""

198

...

199

200

def get_final_message(self) -> Message:

201

"""

202

Waits until the stream has been read to completion and returns

203

the accumulated Message object.

204

"""

205

...

206

207

def get_final_text(self) -> str:

208

"""

209

Returns all text content blocks concatenated together.

210

211

Raises RuntimeError if no text content blocks were returned.

212

"""

213

...

214

215

def until_done(self) -> None:

216

"""Blocks until the stream has been consumed."""

217

...

218

219

class AsyncMessageStream:

220

"""

221

Asynchronous message stream with automatic event accumulation.

222

223

Async equivalent of MessageStream with the same functionality.

224

"""

225

226

text_stream: AsyncIterator[str]

227

"""Async iterator over just the text deltas in the stream."""

228

229

@property

230

def response(self) -> httpx.Response:

231

"""The underlying HTTP response object."""

232

...

233

234

@property

235

def request_id(self) -> str | None:

236

"""The request ID from response headers."""

237

...

238

239

@property

240

def current_message_snapshot(self) -> Message:

241

"""Current accumulated message state."""

242

...

243

244

def __aiter__(self) -> AsyncIterator[MessageStreamEvent]:

245

"""Async iterate over stream events."""

246

...

247

248

async def __aenter__(self) -> Self:

249

"""Enter async context manager."""

250

...

251

252

async def __aexit__(

253

self,

254

exc_type: type[BaseException] | None,

255

exc: BaseException | None,

256

exc_tb: TracebackType | None

257

) -> None:

258

"""Exit async context manager and close stream."""

259

...

260

261

async def close(self) -> None:

262

"""

263

Close the response and release the connection.

264

265

Automatically called if the response body is read to completion.

266

"""

267

...

268

269

async def get_final_message(self) -> Message:

270

"""

271

Waits until the stream has been read to completion and returns

272

the accumulated Message object.

273

"""

274

...

275

276

async def get_final_text(self) -> str:

277

"""

278

Returns all text content blocks concatenated together.

279

280

Raises RuntimeError if no text content blocks were returned.

281

"""

282

...

283

284

async def until_done(self) -> None:

285

"""Waits until the stream has been consumed."""

286

...

287

```

288

289

### Beta Message Stream Classes

290

291

Extended streaming classes with support for beta features like structured outputs and advanced content types.

292

293

```python { .api }

294

class BetaMessageStream(Generic[ResponseFormatT]):

295

"""

296

Synchronous beta message stream with structured output support.

297

298

Supports parsing of structured outputs when output_format is provided.

299

"""

300

301

text_stream: Iterator[str]

302

"""Iterator over just the text deltas in the stream."""

303

304

@property

305

def response(self) -> httpx.Response:

306

"""The underlying HTTP response object."""

307

...

308

309

@property

310

def request_id(self) -> str | None:

311

"""The request ID from response headers."""

312

...

313

314

@property

315

def current_message_snapshot(self) -> ParsedBetaMessage[ResponseFormatT]:

316

"""Current accumulated message state with parsed output."""

317

...

318

319

def __iter__(self) -> Iterator[ParsedBetaMessageStreamEvent[ResponseFormatT]]:

320

"""Iterate over stream events."""

321

...

322

323

def __enter__(self) -> Self:

324

"""Enter context manager."""

325

...

326

327

def __exit__(

328

self,

329

exc_type: type[BaseException] | None,

330

exc: BaseException | None,

331

exc_tb: TracebackType | None

332

) -> None:

333

"""Exit context manager and close stream."""

334

...

335

336

def close(self) -> None:

337

"""

338

Close the response and release the connection.

339

340

Automatically called if the response body is read to completion.

341

"""

342

...

343

344

def get_final_message(self) -> ParsedBetaMessage[ResponseFormatT]:

345

"""

346

Waits until the stream has been read to completion and returns

347

the accumulated ParsedBetaMessage object with structured output.

348

"""

349

...

350

351

def get_final_text(self) -> str:

352

"""

353

Returns all text content blocks concatenated together.

354

355

Raises RuntimeError if no text content blocks were returned.

356

"""

357

...

358

359

def until_done(self) -> None:

360

"""Blocks until the stream has been consumed."""

361

...

362

363

class BetaAsyncMessageStream(Generic[ResponseFormatT]):

364

"""

365

Asynchronous beta message stream with structured output support.

366

367

Async equivalent of BetaMessageStream with the same functionality.

368

"""

369

370

text_stream: AsyncIterator[str]

371

"""Async iterator over just the text deltas in the stream."""

372

373

@property

374

def response(self) -> httpx.Response:

375

"""The underlying HTTP response object."""

376

...

377

378

@property

379

def request_id(self) -> str | None:

380

"""The request ID from response headers."""

381

...

382

383

@property

384

def current_message_snapshot(self) -> ParsedBetaMessage[ResponseFormatT]:

385

"""Current accumulated message state with parsed output."""

386

...

387

388

def __aiter__(self) -> AsyncIterator[ParsedBetaMessageStreamEvent[ResponseFormatT]]:

389

"""Async iterate over stream events."""

390

...

391

392

async def __aenter__(self) -> Self:

393

"""Enter async context manager."""

394

...

395

396

async def __aexit__(

397

self,

398

exc_type: type[BaseException] | None,

399

exc: BaseException | None,

400

exc_tb: TracebackType | None

401

) -> None:

402

"""Exit async context manager and close stream."""

403

...

404

405

async def close(self) -> None:

406

"""

407

Close the response and release the connection.

408

409

Automatically called if the response body is read to completion.

410

"""

411

...

412

413

async def get_final_message(self) -> ParsedBetaMessage[ResponseFormatT]:

414

"""

415

Waits until the stream has been read to completion and returns

416

the accumulated ParsedBetaMessage object with structured output.

417

"""

418

...

419

420

async def get_final_text(self) -> str:

421

"""

422

Returns all text content blocks concatenated together.

423

424

Raises RuntimeError if no text content blocks were returned.

425

"""

426

...

427

428

async def until_done(self) -> None:

429

"""Waits until the stream has been consumed."""

430

...

431

```

432

433

### Stream Event Types

434

435

Event objects emitted during streaming that represent different types of content and state changes.

436

437

```python { .api }

438

class TextEvent:

439

"""

440

Emitted when a text delta is received.

441

442

Provides both the incremental delta and complete accumulated text.

443

"""

444

445

type: Literal["text"]

446

text: str

447

"""The text delta"""

448

449

snapshot: str

450

"""The entire accumulated text"""

451

452

class CitationEvent:

453

"""

454

Emitted when a citation is added to text content.

455

"""

456

457

type: Literal["citation"]

458

citation: Citation

459

"""The new citation"""

460

461

snapshot: List[Citation]

462

"""All of the accumulated citations"""

463

464

class ThinkingEvent:

465

"""

466

Emitted when a thinking delta is received during extended thinking.

467

"""

468

469

type: Literal["thinking"]

470

thinking: str

471

"""The thinking delta"""

472

473

snapshot: str

474

"""The accumulated thinking so far"""

475

476

class SignatureEvent:

477

"""

478

Emitted when the signature of a thinking block is received.

479

"""

480

481

type: Literal["signature"]

482

signature: str

483

"""The signature of the thinking block"""

484

485

class InputJsonEvent:

486

"""

487

Emitted when tool input JSON is being streamed.

488

489

Provides partial JSON string and continuously parsed object.

490

"""

491

492

type: Literal["input_json"]

493

partial_json: str

494

"""

495

A partial JSON string delta

496

497

e.g. '"San Francisco,'

498

"""

499

500

snapshot: object

501

"""

502

The currently accumulated parsed object.

503

504

e.g. {'location': 'San Francisco, CA'}

505

"""

506

507

class MessageStopEvent:

508

"""

509

Emitted when message generation completes.

510

511

Contains the final accumulated message.

512

"""

513

514

type: Literal["message_stop"]

515

message: Message

516

517

class ContentBlockStopEvent:

518

"""

519

Emitted when a content block completes.

520

521

Contains the final content block.

522

"""

523

524

type: Literal["content_block_stop"]

525

content_block: ContentBlock

526

527

MessageStreamEvent = Union[

528

TextEvent,

529

CitationEvent,

530

ThinkingEvent,

531

SignatureEvent,

532

InputJsonEvent,

533

RawMessageStartEvent,

534

RawMessageDeltaEvent,

535

MessageStopEvent,

536

RawContentBlockStartEvent,

537

RawContentBlockDeltaEvent,

538

ContentBlockStopEvent,

539

]

540

"""Union of all possible message stream events."""

541

```

542

543

### Beta Stream Event Types

544

545

Extended event types for beta streaming features including structured outputs.

546

547

```python { .api }

548

class ParsedBetaTextEvent:

549

"""

550

Beta text event with structured output parsing support.

551

"""

552

553

type: Literal["text"]

554

text: str

555

"""The text delta"""

556

557

snapshot: str

558

"""The entire accumulated text"""

559

560

def parsed_snapshot(self) -> Dict[str, Any]:

561

"""

562

Parse the accumulated text as JSON.

563

564

Uses partial mode for incomplete JSON structures.

565

"""

566

...

567

568

class BetaCitationEvent:

569

"""

570

Beta citation event (same as standard CitationEvent).

571

"""

572

573

type: Literal["citation"]

574

citation: Citation

575

"""The new citation"""

576

577

snapshot: List[Citation]

578

"""All of the accumulated citations"""

579

580

class BetaThinkingEvent:

581

"""

582

Beta thinking event (same as standard ThinkingEvent).

583

"""

584

585

type: Literal["thinking"]

586

thinking: str

587

"""The thinking delta"""

588

589

snapshot: str

590

"""The accumulated thinking so far"""

591

592

class BetaSignatureEvent:

593

"""

594

Beta signature event (same as standard SignatureEvent).

595

"""

596

597

type: Literal["signature"]

598

signature: str

599

"""The signature of the thinking block"""

600

601

class BetaInputJsonEvent:

602

"""

603

Beta input JSON event (same as standard InputJsonEvent).

604

"""

605

606

type: Literal["input_json"]

607

partial_json: str

608

"""A partial JSON string delta"""

609

610

snapshot: object

611

"""The currently accumulated parsed object."""

612

613

class ParsedBetaMessageStopEvent(Generic[ResponseFormatT]):

614

"""

615

Beta message stop event with structured output.

616

"""

617

618

type: Literal["message_stop"]

619

message: ParsedBetaMessage[ResponseFormatT]

620

621

class ParsedBetaContentBlockStopEvent(Generic[ResponseFormatT]):

622

"""

623

Beta content block stop event with structured output.

624

"""

625

626

type: Literal["content_block_stop"]

627

content_block: ParsedBetaContentBlock[ResponseFormatT]

628

629

ParsedBetaMessageStreamEvent = Union[

630

ParsedBetaTextEvent,

631

BetaCitationEvent,

632

BetaThinkingEvent,

633

BetaSignatureEvent,

634

BetaInputJsonEvent,

635

BetaRawMessageStartEvent,

636

BetaRawMessageDeltaEvent,

637

ParsedBetaMessageStopEvent[ResponseFormatT],

638

BetaRawContentBlockStartEvent,

639

BetaRawContentBlockDeltaEvent,

640

ParsedBetaContentBlockStopEvent[ResponseFormatT],

641

]

642

"""Union of all possible beta message stream events."""

643

```

644

645

## Usage Examples

646

647

### Event Filtering and Handling

648

649

Process specific event types from the stream:

650

651

```python

652

from anthropic import Anthropic

653

654

client = Anthropic()

655

656

with client.messages.stream(

657

model="claude-sonnet-4-5-20250929",

658

max_tokens=1024,

659

messages=[{"role": "user", "content": "Explain quantum computing"}]

660

) as stream:

661

for event in stream:

662

if event.type == "text":

663

print(event.text, end="", flush=True)

664

elif event.type == "content_block_stop":

665

print("\n\nContent block finished:", event.content_block.type)

666

elif event.type == "message_stop":

667

print("\n\nMessage complete!")

668

print("Stop reason:", event.message.stop_reason)

669

print("Tokens used:", event.message.usage.output_tokens)

670

```

671

672

### Streaming with Thinking

673

674

Access the extended thinking process during streaming:

675

676

```python

677

from anthropic import Anthropic

678

679

client = Anthropic()

680

681

with client.messages.stream(

682

model="claude-sonnet-4-5-20250929",

683

max_tokens=3200,

684

thinking={"type": "enabled", "budget_tokens": 1600},

685

messages=[{"role": "user", "content": "Create a haiku about Anthropic."}]

686

) as stream:

687

thinking_started = False

688

text_started = False

689

690

for event in stream:

691

if event.type == "thinking":

692

if not thinking_started:

693

print("Thinking:\n---------")

694

thinking_started = True

695

print(event.thinking, end="", flush=True)

696

697

elif event.type == "text":

698

if not text_started:

699

print("\n\nText:\n-----")

700

text_started = True

701

print(event.text, end="", flush=True)

702

```

703

704

### Structured Output Streaming

705

706

Stream structured outputs with incremental parsing:

707

708

```python

709

import pydantic

710

from anthropic import Anthropic

711

712

class Order(pydantic.BaseModel):

713

product_name: str

714

price: float

715

quantity: int

716

717

client = Anthropic()

718

719

prompt = """

720

Extract the product name, price, and quantity from this customer message:

721

"Hi, I'd like to order 2 packs of Green Tea for 5.50 dollars each."

722

"""

723

724

with client.beta.messages.stream(

725

model="claude-sonnet-4-5-20250929-structured-outputs",

726

messages=[{"role": "user", "content": prompt}],

727

betas=["structured-outputs-2025-09-17"],

728

max_tokens=1024,

729

output_format=Order,

730

) as stream:

731

for event in stream:

732

if event.type == "text":

733

# Access incrementally parsed structured output

734

print(event.parsed_snapshot())

735

736

# Get final parsed output

737

final_message = stream.get_final_message()

738

if final_message.parsed_output:

739

order = final_message.parsed_output

740

print(f"Product: {order.product_name}")

741

print(f"Price: ${order.price}")

742

print(f"Quantity: {order.quantity}")

743

```

744

745

### Async Streaming with Error Handling

746

747

Handle errors gracefully in async streaming:

748

749

```python

750

from anthropic import AsyncAnthropic

751

import asyncio

752

753

client = AsyncAnthropic()

754

755

async def stream_with_error_handling():

756

try:

757

async with client.messages.stream(

758

model="claude-sonnet-4-5-20250929",

759

max_tokens=1024,

760

messages=[{"role": "user", "content": "Tell me a joke"}]

761

) as stream:

762

async for event in stream:

763

if event.type == "text":

764

print(event.text, end="", flush=True)

765

766

message = await stream.get_final_message()

767

print(f"\n\nFinal message ID: {message.id}")

768

769

except Exception as e:

770

print(f"Error during streaming: {e}")

771

772

asyncio.run(stream_with_error_handling())

773

```

774

775

### Tool Use Streaming

776

777

Stream tool-use blocks and their input JSON as they are generated:

778

779

```python

780

from anthropic import Anthropic

781

782

client = Anthropic()

783

784

tools = [

785

{

786

"name": "get_weather",

787

"description": "Get the weather for a location",

788

"input_schema": {

789

"type": "object",

790

"properties": {

791

"location": {"type": "string"},

792

"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}

793

},

794

"required": ["location"]

795

}

796

}

797

]

798

799

with client.messages.stream(

800

model="claude-sonnet-4-5-20250929",

801

max_tokens=1024,

802

tools=tools,

803

messages=[{"role": "user", "content": "What's the weather in San Francisco?"}]

804

) as stream:

805

for event in stream:

806

if event.type == "content_block_start":

807

if event.content_block.type == "tool_use":

808

print(f"\nTool: {event.content_block.name}")

809

810

elif event.type == "input_json":

811

print(f"\nPartial input: {event.snapshot}")

812

813

elif event.type == "content_block_stop":

814

if event.content_block.type == "tool_use":

815

print(f"\nFinal input: {event.content_block.input}")

816

```

817

818

### Manual Stream Lifecycle

819

820

Control the stream lifecycle without a context manager:

821

822

```python

823

from anthropic import Anthropic

824

825

client = Anthropic()

826

827

# Start stream without context manager

828

stream_manager = client.messages.stream(

829

model="claude-sonnet-4-5-20250929",

830

max_tokens=1024,

831

messages=[{"role": "user", "content": "Count to 5"}]

832

)

833

834

try:

835

stream = stream_manager.__enter__()

836

837

for event in stream:

838

if event.type == "text":

839

print(event.text, end="", flush=True)

840

841

message = stream.get_final_message()

842

print(f"\n\nMessage ID: {message.id}")

843

844

finally:

845

stream_manager.__exit__(None, None, None)

846

```

847

848

### Accessing Response Metadata

849

850

Access HTTP response information during streaming:

851

852

```python

853

from anthropic import Anthropic

854

855

client = Anthropic()

856

857

with client.messages.stream(

858

model="claude-sonnet-4-5-20250929",

859

max_tokens=1024,

860

messages=[{"role": "user", "content": "Hello"}]

861

) as stream:

862

# Access response metadata

863

print(f"Request ID: {stream.request_id}")

864

print(f"Response headers: {stream.response.headers}")

865

866

for event in stream:

867

if event.type == "text":

868

print(event.text, end="", flush=True)

869

870

# Access final snapshot before closing

871

final_snapshot = stream.current_message_snapshot

872

print(f"\n\nFinal token count: {final_snapshot.usage.output_tokens}")

873

```

874

875

## Types

876

877

### Type Aliases and Backward Compatibility

878

879

```python { .api }

880

# Backward compatibility aliases for beta types

881

BetaTextEvent = ParsedBetaTextEvent

882

BetaMessageStopEvent = ParsedBetaMessageStopEvent[object]

883

BetaMessageStreamEvent = ParsedBetaMessageStreamEvent

884

BetaContentBlockStopEvent = ParsedBetaContentBlockStopEvent[object]

885

```

886

887

### Generic Type Parameters

888

889

```python { .api }

890

ResponseFormatT = TypeVar('ResponseFormatT')

891

"""

892

Type variable for structured output format in beta streaming.

893

894

Can be a Pydantic model or other response format definition.

895

"""

896

```

897