# Streaming and Async

Comprehensive streaming support for real-time interactions with immediate validation, delta updates, and event handling. Includes both async and sync streaming interfaces.

## Capabilities

### Agent Streaming

Stream agent responses in real-time with comprehensive event handling.
```python { .api }
class AgentStream[AgentDepsT, OutputDataT]:
    """
    Agent stream interface for real-time response processing.
    """
    async def __anext__(self) -> AgentStreamEvent[AgentDepsT, OutputDataT]:
        """
        Get next event from the stream.

        Returns:
            Next stream event (part delta, tool call, result, etc.)
        """

    async def get_final_result(self) -> FinalResult[OutputDataT]:
        """
        Get the final result after stream completion.

        Returns:
            Final result with complete data and metadata
        """

    async def run_stream(
        self,
        user_prompt: str,
        *,
        message_history: list[ModelMessage] | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None
    ) -> AgentStream[AgentDepsT, OutputDataT]:
        """
        Run agent with streaming response.

        Parameters:
        - user_prompt: User's input message
        - message_history: Previous conversation messages
        - deps: Dependencies to pass to tools and system prompt
        - model_settings: Model settings for this run

        Returns:
            Async iterable stream of agent events
        """
```

### Streaming Results

Result objects for streaming operations.

```python { .api }
class StreamedRunResult[AgentDepsT, OutputDataT]:
    """
    Result from a streamed agent run.
    """
    def __init__(
        self,
        stream: AgentStream[AgentDepsT, OutputDataT],
        agent: AbstractAgent[AgentDepsT, OutputDataT]
    ): ...

    async def stream_events(self) -> AsyncIterator[AgentStreamEvent]: ...
    async def get_final_result(self) -> FinalResult[OutputDataT]: ...

class FinalResult[OutputDataT]:
    """
    Final result marker containing complete response data.
    """
    data: OutputDataT
    usage: RunUsage
    messages: list[ModelMessage]
    cost: float | None
```

### Model-Level Streaming

Direct model streaming interfaces for low-level control.

```python { .api }
class StreamedResponse:
    """
    Streamed model response for real-time processing.
    """
    async def __aiter__(self) -> AsyncIterator[ModelResponseStreamEvent]:
        """Iterate over streaming response events."""

    async def get_final_response(self) -> ModelResponse:
        """Get complete response after streaming finishes."""

class StreamedResponseSync:
    """
    Synchronous streamed response for non-async contexts.
    """
    def __iter__(self) -> Iterator[ModelResponseStreamEvent]:
        """Iterate over streaming response events synchronously."""

    def get_final_response(self) -> ModelResponse:
        """Get complete response after streaming finishes."""
```

### Direct Model Streaming Functions

Functions for direct model interaction with streaming.

```python { .api }
async def model_request_stream(
    model: Model,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None
) -> StreamedResponse:
    """
    Make streaming request directly to model.

    Parameters:
    - model: Model to request from
    - messages: Conversation messages
    - model_settings: Model configuration

    Returns:
        Streaming response with real-time updates
    """

def model_request_stream_sync(
    model: Model,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None
) -> StreamedResponseSync:
    """
    Make streaming request directly to model synchronously.

    Parameters:
    - model: Model to request from
    - messages: Conversation messages
    - model_settings: Model configuration

    Returns:
        Synchronous streaming response
    """
```

### Stream Event Types

Comprehensive event types for different streaming scenarios.

```python { .api }
class PartStartEvent:
    """Event fired when a new response part starts."""
    part: ModelResponsePart
    kind: Literal['part-start']

class PartDeltaEvent:
    """Event fired for incremental updates to response parts."""
    delta: ModelResponsePartDelta
    kind: Literal['part-delta']

class FinalResultEvent:
    """Event fired when final result is ready."""
    result: Any
    kind: Literal['final-result']

class FunctionToolCallEvent:
    """Event fired when function tool is called."""
    tool_name: str
    args: dict[str, Any]
    tool_id: str | None
    kind: Literal['function-tool-call']

class FunctionToolResultEvent:
    """Event fired when function tool returns result."""
    tool_name: str
    result: Any
    tool_id: str | None
    kind: Literal['function-tool-result']

class BuiltinToolCallEvent:
    """Event fired when built-in tool is called."""
    tool_name: str
    args: dict[str, Any]
    tool_id: str | None
    kind: Literal['builtin-tool-call']

class BuiltinToolResultEvent:
    """Event fired when built-in tool returns result."""
    tool_name: str
    result: Any
    tool_id: str | None
    kind: Literal['builtin-tool-result']

ModelResponseStreamEvent = PartStartEvent | PartDeltaEvent

HandleResponseEvent = (
    FunctionToolCallEvent |
    FunctionToolResultEvent |
    BuiltinToolCallEvent |
    BuiltinToolResultEvent
)

AgentStreamEvent = (
    ModelResponseStreamEvent |
    HandleResponseEvent |
    FinalResultEvent
)
```

### Delta Types

Delta update types for incremental streaming updates.

```python { .api }
class TextPartDelta:
    """Incremental text content update."""
    content: str
    kind: Literal['text']

class ThinkingPartDelta:
    """Incremental thinking content update (for reasoning models)."""
    content: str
    kind: Literal['thinking']

class ToolCallPartDelta:
    """Incremental tool call update."""
    tool_name: str | None
    args: dict[str, Any] | None
    tool_id: str | None
    kind: Literal['tool-call']

ModelResponsePartDelta = (
    TextPartDelta |
    ThinkingPartDelta |
    ToolCallPartDelta
)
```

### Stream Event Handlers

Event handler interfaces for processing streaming events.

```python { .api }
class EventStreamHandler[AgentDepsT]:
    """
    Handler for streaming events during agent execution.
    """
    async def on_model_request(
        self,
        messages: list[ModelMessage]
    ) -> None:
        """Called when model request is about to be made."""

    async def on_model_response_start(self) -> None:
        """Called when model response starts streaming."""

    async def on_model_response_part(
        self,
        part: ModelResponsePart
    ) -> None:
        """Called for each response part."""

    async def on_tool_call(
        self,
        tool_name: str,
        args: dict[str, Any]
    ) -> None:
        """Called when tool is about to be called."""

    async def on_tool_result(
        self,
        tool_name: str,
        result: Any
    ) -> None:
        """Called when tool returns result."""
```

## Usage Examples

### Basic Agent Streaming

```python
import asyncio
from pydantic_ai import Agent

agent = Agent(
    model='gpt-4',
    system_prompt='You are a helpful assistant.'
)

async def stream_response():
    stream = await agent.run_stream('Tell me a story about a robot')

    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            # Print each text chunk as it arrives
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'final-result':
            print(f"\n\nFinal result ready: {len(event.result)} characters")
            break

asyncio.run(stream_response())
```

### Streaming with Tool Calls

```python
import asyncio
from pydantic_ai import Agent, tool

@tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: Sunny, 22°C"

agent = Agent(
    model='gpt-4',
    tools=[get_weather],
    system_prompt='You can check weather using tools.'
)

async def stream_with_tools():
    stream = await agent.run_stream('What is the weather in Paris?')

    async for event in stream:
        if event.kind == 'part-delta':
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'function-tool-call':
            print(f"\n[Calling tool: {event.tool_name}({event.args})]")
        elif event.kind == 'function-tool-result':
            print(f"[Tool result: {event.result}]")
        elif event.kind == 'final-result':
            print(f"\n\nComplete response: {event.result}")
            break

asyncio.run(stream_with_tools())
```

### Direct Model Streaming

```python
import asyncio
from pydantic_ai.models import OpenAIModel
from pydantic_ai.direct import model_request_stream
from pydantic_ai.messages import ModelRequest, UserPromptPart

async def direct_stream():
    model = OpenAIModel('gpt-4')
    messages = [ModelRequest([UserPromptPart('Count to 10')])]

    stream = await model_request_stream(model, messages)

    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            print(event.delta.content, end='', flush=True)

    final_response = await stream.get_final_response()
    print(f"\n\nFinal response has {len(final_response.parts)} parts")

asyncio.run(direct_stream())
```

### Synchronous Streaming

```python
from pydantic_ai.models import OpenAIModel
from pydantic_ai.direct import model_request_stream_sync
from pydantic_ai.messages import ModelRequest, UserPromptPart

def sync_stream():
    model = OpenAIModel('gpt-4')
    messages = [ModelRequest([UserPromptPart('Write a haiku')])]

    stream = model_request_stream_sync(model, messages)

    for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            print(event.delta.content, end='', flush=True)

    final_response = stream.get_final_response()
    print(f"\n\nComplete haiku received")

sync_stream()
```

### Streaming with Structured Output

```python
import asyncio
from pydantic_ai import Agent
from pydantic import BaseModel

class StoryInfo(BaseModel):
    title: str
    characters: list[str]
    setting: str
    plot_summary: str

agent = Agent(
    model='gpt-4',
    system_prompt='Create story information.',
    result_type=StoryInfo
)

async def stream_structured():
    stream = await agent.run_stream('Create a sci-fi story about time travel')

    text_content = ""
    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            text_content += event.delta.content
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'final-result':
            print(f"\n\nStructured result:")
            print(f"Title: {event.result.title}")
            print(f"Characters: {', '.join(event.result.characters)}")
            print(f"Setting: {event.result.setting}")
            break

asyncio.run(stream_structured())
```
425
426
### Advanced Event Handling
427
428
```python
429
import asyncio
430
from pydantic_ai import Agent, tool
431
432
@tool
433
def search_database(query: str) -> list[dict]:
434
"""Search database for information."""
435
return [{"id": 1, "title": "Result 1"}, {"id": 2, "title": "Result 2"}]
436
437
agent = Agent(
438
model='gpt-4',
439
tools=[search_database],
440
system_prompt='You can search for information.'
441
)
442
443
async def handle_all_events():
444
stream = await agent.run_stream('Search for Python tutorials')
445
446
tool_calls_made = 0
447
text_chunks_received = 0
448
449
async for event in stream:
450
if event.kind == 'part-start':
451
print(f"New part started: {event.part.kind}")
452
elif event.kind == 'part-delta':
453
text_chunks_received += 1
454
if event.delta.kind == 'text':
455
print(event.delta.content, end='', flush=True)
456
elif event.kind == 'function-tool-call':
457
tool_calls_made += 1
458
print(f"\n[Tool call #{tool_calls_made}: {event.tool_name}]")
459
elif event.kind == 'function-tool-result':
460
print(f"[Got {len(event.result)} results]")
461
elif event.kind == 'final-result':
462
print(f"\n\nStream completed:")
463
print(f"- Text chunks: {text_chunks_received}")
464
print(f"- Tool calls: {tool_calls_made}")
465
print(f"- Final result length: {len(event.result)}")
466
break
467
468
asyncio.run(handle_all_events())
469
```

### Stream Error Handling

```python
import asyncio
from pydantic_ai import Agent
from pydantic_ai.exceptions import ModelHTTPError, AgentRunError

agent = Agent(model='gpt-4')

async def stream_with_error_handling():
    try:
        stream = await agent.run_stream('Generate a very long response')

        async for event in stream:
            if event.kind == 'part-delta':
                print(event.delta.content, end='', flush=True)
            elif event.kind == 'final-result':
                print(f"\n\nSuccess! Result: {event.result[:100]}...")
                break

    except ModelHTTPError as e:
        print(f"Model HTTP error: {e}")
    except AgentRunError as e:
        print(f"Agent run error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

asyncio.run(stream_with_error_handling())
```