
# Streaming and Async

Comprehensive streaming support for real-time interactions with immediate validation, delta updates, and event handling. Includes both async and sync streaming interfaces.

## Capabilities

### Agent Streaming

Stream agent responses in real-time with comprehensive event handling.

```python { .api }
class AgentStream[AgentDepsT, OutputDataT]:
    """
    Agent stream interface for real-time response processing.
    """
    async def __anext__(self) -> AgentStreamEvent[AgentDepsT, OutputDataT]:
        """
        Get next event from the stream.

        Returns:
            Next stream event (part delta, tool call, result, etc.)
        """

    async def get_final_result(self) -> FinalResult[OutputDataT]:
        """
        Get the final result after stream completion.

        Returns:
            Final result with complete data and metadata
        """

async def run_stream(
    self,
    user_prompt: str,
    *,
    message_history: list[ModelMessage] | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None
) -> AgentStream[AgentDepsT, OutputDataT]:
    """
    Run agent with streaming response.

    Parameters:
    - user_prompt: User's input message
    - message_history: Previous conversation messages
    - deps: Dependencies to pass to tools and system prompt
    - model_settings: Model settings for this run

    Returns:
        Async iterable stream of agent events
    """
```

### Streaming Results

Result objects for streaming operations.

```python { .api }
class StreamedRunResult[AgentDepsT, OutputDataT]:
    """
    Result from a streamed agent run.
    """
    def __init__(
        self,
        stream: AgentStream[AgentDepsT, OutputDataT],
        agent: AbstractAgent[AgentDepsT, OutputDataT]
    ): ...

    async def stream_events(self) -> AsyncIterator[AgentStreamEvent]: ...
    async def get_final_result(self) -> FinalResult[OutputDataT]: ...

class FinalResult[OutputDataT]:
    """
    Final result marker containing complete response data.
    """
    data: OutputDataT
    usage: RunUsage
    messages: list[ModelMessage]
    cost: float | None
```

### Model-Level Streaming

Direct model streaming interfaces for low-level control.

```python { .api }
class StreamedResponse:
    """
    Streamed model response for real-time processing.
    """
    async def __aiter__(self) -> AsyncIterator[ModelResponseStreamEvent]:
        """Iterate over streaming response events."""

    async def get_final_response(self) -> ModelResponse:
        """Get complete response after streaming finishes."""

class StreamedResponseSync:
    """
    Synchronous streamed response for non-async contexts.
    """
    def __iter__(self) -> Iterator[ModelResponseStreamEvent]:
        """Iterate over streaming response events synchronously."""

    def get_final_response(self) -> ModelResponse:
        """Get complete response after streaming finishes."""
```

### Direct Model Streaming Functions

Functions for direct model interaction with streaming.

```python { .api }
async def model_request_stream(
    model: Model,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None
) -> StreamedResponse:
    """
    Make streaming request directly to model.

    Parameters:
    - model: Model to request from
    - messages: Conversation messages
    - model_settings: Model configuration

    Returns:
        Streaming response with real-time updates
    """

def model_request_stream_sync(
    model: Model,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None
) -> StreamedResponseSync:
    """
    Make streaming request directly to model synchronously.

    Parameters:
    - model: Model to request from
    - messages: Conversation messages
    - model_settings: Model configuration

    Returns:
        Synchronous streaming response
    """
```

### Stream Event Types

Comprehensive event types for different streaming scenarios.

```python { .api }
class PartStartEvent:
    """Event fired when a new response part starts."""
    part: ModelResponsePart
    kind: Literal['part-start']

class PartDeltaEvent:
    """Event fired for incremental updates to response parts."""
    delta: ModelResponsePartDelta
    kind: Literal['part-delta']

class FinalResultEvent:
    """Event fired when final result is ready."""
    result: Any
    kind: Literal['final-result']

class FunctionToolCallEvent:
    """Event fired when function tool is called."""
    tool_name: str
    args: dict[str, Any]
    tool_id: str | None
    kind: Literal['function-tool-call']

class FunctionToolResultEvent:
    """Event fired when function tool returns result."""
    tool_name: str
    result: Any
    tool_id: str | None
    kind: Literal['function-tool-result']

class BuiltinToolCallEvent:
    """Event fired when built-in tool is called."""
    tool_name: str
    args: dict[str, Any]
    tool_id: str | None
    kind: Literal['builtin-tool-call']

class BuiltinToolResultEvent:
    """Event fired when built-in tool returns result."""
    tool_name: str
    result: Any
    tool_id: str | None
    kind: Literal['builtin-tool-result']

ModelResponseStreamEvent = PartStartEvent | PartDeltaEvent

HandleResponseEvent = (
    FunctionToolCallEvent |
    FunctionToolResultEvent |
    BuiltinToolCallEvent |
    BuiltinToolResultEvent
)

AgentStreamEvent = (
    ModelResponseStreamEvent |
    HandleResponseEvent |
    FinalResultEvent
)
```

### Delta Types

Delta update types for incremental streaming updates.

```python { .api }
class TextPartDelta:
    """Incremental text content update."""
    content: str
    kind: Literal['text']

class ThinkingPartDelta:
    """Incremental thinking content update (for reasoning models)."""
    content: str
    kind: Literal['thinking']

class ToolCallPartDelta:
    """Incremental tool call update."""
    tool_name: str | None
    args: dict[str, Any] | None
    tool_id: str | None
    kind: Literal['tool-call']

ModelResponsePartDelta = (
    TextPartDelta |
    ThinkingPartDelta |
    ToolCallPartDelta
)
```

### Stream Event Handlers

Event handler interfaces for processing streaming events.

```python { .api }
class EventStreamHandler[AgentDepsT]:
    """
    Handler for streaming events during agent execution.
    """
    async def on_model_request(
        self,
        messages: list[ModelMessage]
    ) -> None:
        """Called when model request is about to be made."""

    async def on_model_response_start(self) -> None:
        """Called when model response starts streaming."""

    async def on_model_response_part(
        self,
        part: ModelResponsePart
    ) -> None:
        """Called for each response part."""

    async def on_tool_call(
        self,
        tool_name: str,
        args: dict[str, Any]
    ) -> None:
        """Called when tool is about to be called."""

    async def on_tool_result(
        self,
        tool_name: str,
        result: Any
    ) -> None:
        """Called when tool returns result."""
```

## Usage Examples

### Basic Agent Streaming

```python
import asyncio
from pydantic_ai import Agent

agent = Agent(
    model='gpt-4',
    system_prompt='You are a helpful assistant.'
)

async def stream_response():
    stream = await agent.run_stream('Tell me a story about a robot')

    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            # Print each text chunk as it arrives
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'final-result':
            print(f"\n\nFinal result ready: {len(event.result)} characters")
            break

asyncio.run(stream_response())
```

### Streaming with Tool Calls

```python
import asyncio
from pydantic_ai import Agent, tool

@tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: Sunny, 22°C"

agent = Agent(
    model='gpt-4',
    tools=[get_weather],
    system_prompt='You can check weather using tools.'
)

async def stream_with_tools():
    stream = await agent.run_stream('What is the weather in Paris?')

    async for event in stream:
        if event.kind == 'part-delta':
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'function-tool-call':
            print(f"\n[Calling tool: {event.tool_name}({event.args})]")
        elif event.kind == 'function-tool-result':
            print(f"[Tool result: {event.result}]")
        elif event.kind == 'final-result':
            print(f"\n\nComplete response: {event.result}")
            break

asyncio.run(stream_with_tools())
```

### Direct Model Streaming

```python
import asyncio
from pydantic_ai.models import OpenAIModel
from pydantic_ai.direct import model_request_stream
from pydantic_ai.messages import ModelRequest, UserPromptPart

async def direct_stream():
    model = OpenAIModel('gpt-4')
    messages = [ModelRequest([UserPromptPart('Count to 10')])]

    stream = await model_request_stream(model, messages)

    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            print(event.delta.content, end='', flush=True)

    final_response = await stream.get_final_response()
    print(f"\n\nFinal response has {len(final_response.parts)} parts")

asyncio.run(direct_stream())
```

### Synchronous Streaming

```python
from pydantic_ai.models import OpenAIModel
from pydantic_ai.direct import model_request_stream_sync
from pydantic_ai.messages import ModelRequest, UserPromptPart

def sync_stream():
    model = OpenAIModel('gpt-4')
    messages = [ModelRequest([UserPromptPart('Write a haiku')])]

    stream = model_request_stream_sync(model, messages)

    for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            print(event.delta.content, end='', flush=True)

    final_response = stream.get_final_response()
    print("\n\nComplete haiku received")

sync_stream()
```

### Streaming with Structured Output

```python
import asyncio
from pydantic_ai import Agent
from pydantic import BaseModel

class StoryInfo(BaseModel):
    title: str
    characters: list[str]
    setting: str
    plot_summary: str

agent = Agent(
    model='gpt-4',
    system_prompt='Create story information.',
    result_type=StoryInfo
)

async def stream_structured():
    stream = await agent.run_stream('Create a sci-fi story about time travel')

    text_content = ""
    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            text_content += event.delta.content
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'final-result':
            print("\n\nStructured result:")
            print(f"Title: {event.result.title}")
            print(f"Characters: {', '.join(event.result.characters)}")
            print(f"Setting: {event.result.setting}")
            break

asyncio.run(stream_structured())
```

### Advanced Event Handling

```python
import asyncio
from pydantic_ai import Agent, tool

@tool
def search_database(query: str) -> list[dict]:
    """Search database for information."""
    return [{"id": 1, "title": "Result 1"}, {"id": 2, "title": "Result 2"}]

agent = Agent(
    model='gpt-4',
    tools=[search_database],
    system_prompt='You can search for information.'
)

async def handle_all_events():
    stream = await agent.run_stream('Search for Python tutorials')

    tool_calls_made = 0
    text_chunks_received = 0

    async for event in stream:
        if event.kind == 'part-start':
            print(f"New part started: {event.part.kind}")
        elif event.kind == 'part-delta':
            text_chunks_received += 1
            if event.delta.kind == 'text':
                print(event.delta.content, end='', flush=True)
        elif event.kind == 'function-tool-call':
            tool_calls_made += 1
            print(f"\n[Tool call #{tool_calls_made}: {event.tool_name}]")
        elif event.kind == 'function-tool-result':
            print(f"[Got {len(event.result)} results]")
        elif event.kind == 'final-result':
            print("\n\nStream completed:")
            print(f"- Text chunks: {text_chunks_received}")
            print(f"- Tool calls: {tool_calls_made}")
            print(f"- Final result length: {len(event.result)}")
            break

asyncio.run(handle_all_events())
```

### Stream Error Handling

```python
import asyncio
from pydantic_ai import Agent
from pydantic_ai.exceptions import ModelHTTPError, AgentRunError

agent = Agent(model='gpt-4')

async def stream_with_error_handling():
    try:
        stream = await agent.run_stream('Generate a very long response')

        async for event in stream:
            if event.kind == 'part-delta':
                print(event.delta.content, end='', flush=True)
            elif event.kind == 'final-result':
                print(f"\n\nSuccess! Result: {event.result[:100]}...")
                break

    except ModelHTTPError as e:
        print(f"Model HTTP error: {e}")
    except AgentRunError as e:
        print(f"Agent run error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

asyncio.run(stream_with_error_handling())
```