pypi-pydantic-ai

Description
Agent Framework / shim to use Pydantic with LLMs
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/pypi-pydantic-ai@0.8.0

docs/streaming.md

# Streaming and Async

Comprehensive streaming support for real-time interactions with immediate validation, delta updates, and event handling. Includes both async and sync streaming interfaces.

## Capabilities

### Agent Streaming

Stream agent responses in real-time with comprehensive event handling.

```python { .api }
class AgentStream[AgentDepsT, OutputDataT]:
    """
    Agent stream interface for real-time response processing.
    """
    async def __anext__(self) -> AgentStreamEvent[AgentDepsT, OutputDataT]:
        """
        Get next event from the stream.

        Returns:
            Next stream event (part delta, tool call, result, etc.)
        """

    async def get_final_result(self) -> FinalResult[OutputDataT]:
        """
        Get the final result after stream completion.

        Returns:
            Final result with complete data and metadata
        """

    async def run_stream(
        self,
        user_prompt: str,
        *,
        message_history: list[ModelMessage] | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None
    ) -> AgentStream[AgentDepsT, OutputDataT]:
        """
        Run agent with streaming response.

        Parameters:
        - user_prompt: User's input message
        - message_history: Previous conversation messages
        - deps: Dependencies to pass to tools and system prompt
        - model_settings: Model settings for this run

        Returns:
            Async iterable stream of agent events
        """
```

### Streaming Results

Result objects for streaming operations.

```python { .api }
class StreamedRunResult[AgentDepsT, OutputDataT]:
    """
    Result from a streamed agent run.
    """
    def __init__(
        self,
        stream: AgentStream[AgentDepsT, OutputDataT],
        agent: AbstractAgent[AgentDepsT, OutputDataT]
    ): ...

    async def stream_events(self) -> AsyncIterator[AgentStreamEvent]: ...
    async def get_final_result(self) -> FinalResult[OutputDataT]: ...

class FinalResult[OutputDataT]:
    """
    Final result marker containing complete response data.
    """
    data: OutputDataT
    usage: RunUsage
    messages: list[ModelMessage]
    cost: float | None
```

### Model-Level Streaming

Direct model streaming interfaces for low-level control.

```python { .api }
class StreamedResponse:
    """
    Streamed model response for real-time processing.
    """
    async def __aiter__(self) -> AsyncIterator[ModelResponseStreamEvent]:
        """Iterate over streaming response events."""

    async def get_final_response(self) -> ModelResponse:
        """Get complete response after streaming finishes."""

class StreamedResponseSync:
    """
    Synchronous streamed response for non-async contexts.
    """
    def __iter__(self) -> Iterator[ModelResponseStreamEvent]:
        """Iterate over streaming response events synchronously."""

    def get_final_response(self) -> ModelResponse:
        """Get complete response after streaming finishes."""
```

### Direct Model Streaming Functions

Functions for direct model interaction with streaming.

```python { .api }
async def model_request_stream(
    model: Model,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None
) -> StreamedResponse:
    """
    Make a streaming request directly to a model.

    Parameters:
    - model: Model to request from
    - messages: Conversation messages
    - model_settings: Model configuration

    Returns:
        Streaming response with real-time updates
    """

def model_request_stream_sync(
    model: Model,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None
) -> StreamedResponseSync:
    """
    Make a streaming request directly to a model, synchronously.

    Parameters:
    - model: Model to request from
    - messages: Conversation messages
    - model_settings: Model configuration

    Returns:
        Synchronous streaming response
    """
```

### Stream Event Types

Comprehensive event types for different streaming scenarios.

```python { .api }
class PartStartEvent:
    """Event fired when a new response part starts."""
    part: ModelResponsePart
    kind: Literal['part-start']

class PartDeltaEvent:
    """Event fired for incremental updates to response parts."""
    delta: ModelResponsePartDelta
    kind: Literal['part-delta']

class FinalResultEvent:
    """Event fired when the final result is ready."""
    result: Any
    kind: Literal['final-result']

class FunctionToolCallEvent:
    """Event fired when a function tool is called."""
    tool_name: str
    args: dict[str, Any]
    tool_id: str | None
    kind: Literal['function-tool-call']

class FunctionToolResultEvent:
    """Event fired when a function tool returns a result."""
    tool_name: str
    result: Any
    tool_id: str | None
    kind: Literal['function-tool-result']

class BuiltinToolCallEvent:
    """Event fired when a built-in tool is called."""
    tool_name: str
    args: dict[str, Any]
    tool_id: str | None
    kind: Literal['builtin-tool-call']

class BuiltinToolResultEvent:
    """Event fired when a built-in tool returns a result."""
    tool_name: str
    result: Any
    tool_id: str | None
    kind: Literal['builtin-tool-result']

ModelResponseStreamEvent = PartStartEvent | PartDeltaEvent

HandleResponseEvent = (
    FunctionToolCallEvent |
    FunctionToolResultEvent |
    BuiltinToolCallEvent |
    BuiltinToolResultEvent
)

AgentStreamEvent = (
    ModelResponseStreamEvent |
    HandleResponseEvent |
    FinalResultEvent
)
```

### Delta Types

Delta update types for incremental streaming updates.

```python { .api }
class TextPartDelta:
    """Incremental text content update."""
    content: str
    kind: Literal['text']

class ThinkingPartDelta:
    """Incremental thinking content update (for reasoning models)."""
    content: str
    kind: Literal['thinking']

class ToolCallPartDelta:
    """Incremental tool call update."""
    tool_name: str | None
    args: dict[str, Any] | None
    tool_id: str | None
    kind: Literal['tool-call']

ModelResponsePartDelta = (
    TextPartDelta |
    ThinkingPartDelta |
    ToolCallPartDelta
)
```

### Stream Event Handlers

Event handler interfaces for processing streaming events.

```python { .api }
class EventStreamHandler[AgentDepsT]:
    """
    Handler for streaming events during agent execution.
    """
    async def on_model_request(
        self,
        messages: list[ModelMessage]
    ) -> None:
        """Called when a model request is about to be made."""

    async def on_model_response_start(self) -> None:
        """Called when the model response starts streaming."""

    async def on_model_response_part(
        self,
        part: ModelResponsePart
    ) -> None:
        """Called for each response part."""

    async def on_tool_call(
        self,
        tool_name: str,
        args: dict[str, Any]
    ) -> None:
        """Called when a tool is about to be called."""

    async def on_tool_result(
        self,
        tool_name: str,
        result: Any
    ) -> None:
        """Called when a tool returns a result."""
```

## Usage Examples

### Basic Agent Streaming

```python
import asyncio
from pydantic_ai import Agent

agent = Agent(
    model='gpt-4',
    system_prompt='You are a helpful assistant.'
)

async def stream_response():
    stream = await agent.run_stream('Tell me a story about a robot')

    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            # Print each text chunk as it arrives
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'final-result':
            print(f"\n\nFinal result ready: {len(event.result)} characters")
            break

asyncio.run(stream_response())
```

### Streaming with Tool Calls

```python
import asyncio
from pydantic_ai import Agent, tool

@tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: Sunny, 22°C"

agent = Agent(
    model='gpt-4',
    tools=[get_weather],
    system_prompt='You can check weather using tools.'
)

async def stream_with_tools():
    stream = await agent.run_stream('What is the weather in Paris?')

    async for event in stream:
        if event.kind == 'part-delta':
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'function-tool-call':
            print(f"\n[Calling tool: {event.tool_name}({event.args})]")
        elif event.kind == 'function-tool-result':
            print(f"[Tool result: {event.result}]")
        elif event.kind == 'final-result':
            print(f"\n\nComplete response: {event.result}")
            break

asyncio.run(stream_with_tools())
```

### Direct Model Streaming

```python
import asyncio
from pydantic_ai.models import OpenAIModel
from pydantic_ai.direct import model_request_stream
from pydantic_ai.messages import ModelRequest, UserPromptPart

async def direct_stream():
    model = OpenAIModel('gpt-4')
    messages = [ModelRequest([UserPromptPart('Count to 10')])]

    stream = await model_request_stream(model, messages)

    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            print(event.delta.content, end='', flush=True)

    final_response = await stream.get_final_response()
    print(f"\n\nFinal response has {len(final_response.parts)} parts")

asyncio.run(direct_stream())
```

### Synchronous Streaming

```python
from pydantic_ai.models import OpenAIModel
from pydantic_ai.direct import model_request_stream_sync
from pydantic_ai.messages import ModelRequest, UserPromptPart

def sync_stream():
    model = OpenAIModel('gpt-4')
    messages = [ModelRequest([UserPromptPart('Write a haiku')])]

    stream = model_request_stream_sync(model, messages)

    for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            print(event.delta.content, end='', flush=True)

    final_response = stream.get_final_response()
    print("\n\nComplete haiku received")

sync_stream()
```

### Streaming with Structured Output

```python
import asyncio
from pydantic_ai import Agent
from pydantic import BaseModel

class StoryInfo(BaseModel):
    title: str
    characters: list[str]
    setting: str
    plot_summary: str

agent = Agent(
    model='gpt-4',
    system_prompt='Create story information.',
    result_type=StoryInfo
)

async def stream_structured():
    stream = await agent.run_stream('Create a sci-fi story about time travel')

    text_content = ""
    async for event in stream:
        if event.kind == 'part-delta' and event.delta.kind == 'text':
            text_content += event.delta.content
            print(event.delta.content, end='', flush=True)
        elif event.kind == 'final-result':
            print("\n\nStructured result:")
            print(f"Title: {event.result.title}")
            print(f"Characters: {', '.join(event.result.characters)}")
            print(f"Setting: {event.result.setting}")
            break

asyncio.run(stream_structured())
```

### Advanced Event Handling

```python
import asyncio
from pydantic_ai import Agent, tool

@tool
def search_database(query: str) -> list[dict]:
    """Search database for information."""
    return [{"id": 1, "title": "Result 1"}, {"id": 2, "title": "Result 2"}]

agent = Agent(
    model='gpt-4',
    tools=[search_database],
    system_prompt='You can search for information.'
)

async def handle_all_events():
    stream = await agent.run_stream('Search for Python tutorials')

    tool_calls_made = 0
    text_chunks_received = 0

    async for event in stream:
        if event.kind == 'part-start':
            print(f"New part started: {event.part.kind}")
        elif event.kind == 'part-delta':
            text_chunks_received += 1
            if event.delta.kind == 'text':
                print(event.delta.content, end='', flush=True)
        elif event.kind == 'function-tool-call':
            tool_calls_made += 1
            print(f"\n[Tool call #{tool_calls_made}: {event.tool_name}]")
        elif event.kind == 'function-tool-result':
            print(f"[Got {len(event.result)} results]")
        elif event.kind == 'final-result':
            print("\n\nStream completed:")
            print(f"- Text chunks: {text_chunks_received}")
            print(f"- Tool calls: {tool_calls_made}")
            print(f"- Final result length: {len(event.result)}")
            break

asyncio.run(handle_all_events())
```

### Stream Error Handling

```python
import asyncio
from pydantic_ai import Agent
from pydantic_ai.exceptions import ModelHTTPError, AgentRunError

agent = Agent(model='gpt-4')

async def stream_with_error_handling():
    try:
        stream = await agent.run_stream('Generate a very long response')

        async for event in stream:
            if event.kind == 'part-delta':
                print(event.delta.content, end='', flush=True)
            elif event.kind == 'final-result':
                print(f"\n\nSuccess! Result: {event.result[:100]}...")
                break

    except ModelHTTPError as e:
        print(f"Model HTTP error: {e}")
    except AgentRunError as e:
        print(f"Agent run error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

asyncio.run(stream_with_error_handling())
```