- Spec files
pypi-pydantic-ai
Describes: pkg:pypi/pydantic-ai@0.8.x
- Description
- Agent Framework / shim to use Pydantic with LLMs
- Author
- tessl
- Last updated
streaming.md docs/
1# Streaming and Async23Comprehensive streaming support for real-time interactions with immediate validation, delta updates, and event handling. Includes both async and sync streaming interfaces.45## Capabilities67### Agent Streaming89Stream agent responses in real-time with comprehensive event handling.1011```python { .api }12class AgentStream[AgentDepsT, OutputDataT]:13"""14Agent stream interface for real-time response processing.15"""16async def __anext__(self) -> AgentStreamEvent[AgentDepsT, OutputDataT]:17"""18Get next event from the stream.1920Returns:21Next stream event (part delta, tool call, result, etc.)22"""2324async def get_final_result(self) -> FinalResult[OutputDataT]:25"""26Get the final result after stream completion.2728Returns:29Final result with complete data and metadata30"""3132async def run_stream(33self,34user_prompt: str,35*,36message_history: list[ModelMessage] | None = None,37deps: AgentDepsT = None,38model_settings: ModelSettings | None = None39) -> AgentStream[AgentDepsT, OutputDataT]:40"""41Run agent with streaming response.4243Parameters:44- user_prompt: User's input message45- message_history: Previous conversation messages46- deps: Dependencies to pass to tools and system prompt47- model_settings: Model settings for this run4849Returns:50Async iterable stream of agent events51"""52```5354### Streaming Results5556Result objects for streaming operations.5758```python { .api }59class StreamedRunResult[AgentDepsT, OutputDataT]:60"""61Result from a streamed agent run.62"""63def __init__(64self,65stream: AgentStream[AgentDepsT, OutputDataT],66agent: AbstractAgent[AgentDepsT, OutputDataT]67): ...6869async def stream_events(self) -> AsyncIterator[AgentStreamEvent]: ...70async def get_final_result(self) -> FinalResult[OutputDataT]: ...7172class FinalResult[OutputDataT]:73"""74Final result marker containing complete response data.75"""76data: OutputDataT77usage: RunUsage78messages: list[ModelMessage]79cost: float | None80```8182### Model-Level Streaming8384Direct model streaming interfaces for low-level control.8586```python { .api }87class StreamedResponse:88"""89Streamed model response for real-time processing.90"""91async def __aiter__(self) -> AsyncIterator[ModelResponseStreamEvent]:92"""Iterate over streaming response events."""9394async def get_final_response(self) -> ModelResponse:95"""Get complete response after streaming finishes."""9697class StreamedResponseSync:98"""99Synchronous streamed response for non-async contexts.100"""101def __iter__(self) -> Iterator[ModelResponseStreamEvent]:102"""Iterate over streaming response events synchronously."""103104def get_final_response(self) -> ModelResponse:105"""Get complete response after streaming finishes."""106```107108### Direct Model Streaming Functions109110Functions for direct model interaction with streaming.111112```python { .api }113async def model_request_stream(114model: Model,115messages: list[ModelMessage],116*,117model_settings: ModelSettings | None = None118) -> StreamedResponse:119"""120Make streaming request directly to model.121122Parameters:123- model: Model to request from124- messages: Conversation messages125- model_settings: Model configuration126127Returns:128Streaming response with real-time updates129"""130131def model_request_stream_sync(132model: Model,133messages: list[ModelMessage],134*,135model_settings: ModelSettings | None = None136) -> StreamedResponseSync:137"""138Make streaming request directly to model synchronously.139140Parameters:141- model: Model to request from142- messages: Conversation messages143- model_settings: Model configuration144145Returns:146Synchronous streaming response147"""148```149150### Stream Event Types151152Comprehensive event types for different streaming scenarios.153154```python { .api }155class PartStartEvent:156"""Event fired when a new response part starts."""157part: ModelResponsePart158kind: Literal['part-start']159160class PartDeltaEvent:161"""Event fired for incremental updates to response parts."""162delta: ModelResponsePartDelta163kind: Literal['part-delta']164165class FinalResultEvent:166"""Event fired when final result is ready."""167result: Any168kind: Literal['final-result']169170class FunctionToolCallEvent:171"""Event fired when function tool is called."""172tool_name: str173args: dict[str, Any]174tool_id: str | None175kind: Literal['function-tool-call']176177class FunctionToolResultEvent:178"""Event fired when function tool returns result."""179tool_name: str180result: Any181tool_id: str | None182kind: Literal['function-tool-result']183184class BuiltinToolCallEvent:185"""Event fired when built-in tool is called."""186tool_name: str187args: dict[str, Any]188tool_id: str | None189kind: Literal['builtin-tool-call']190191class BuiltinToolResultEvent:192"""Event fired when built-in tool returns result."""193tool_name: str194result: Any195tool_id: str | None196kind: Literal['builtin-tool-result']197198ModelResponseStreamEvent = PartStartEvent | PartDeltaEvent199200HandleResponseEvent = (201FunctionToolCallEvent |202FunctionToolResultEvent |203BuiltinToolCallEvent |204BuiltinToolResultEvent205)206207AgentStreamEvent = (208ModelResponseStreamEvent |209HandleResponseEvent |210FinalResultEvent211)212```213214### Delta Types215216Delta update types for incremental streaming updates.217218```python { .api }219class TextPartDelta:220"""Incremental text content update."""221content: str222kind: Literal['text']223224class ThinkingPartDelta:225"""Incremental thinking content update (for reasoning models)."""226content: str227kind: Literal['thinking']228229class ToolCallPartDelta:230"""Incremental tool call update."""231tool_name: str | None232args: dict[str, Any] | None233tool_id: str | None234kind: Literal['tool-call']235236ModelResponsePartDelta = (237TextPartDelta |238ThinkingPartDelta |239ToolCallPartDelta240)241```242243### Stream Event Handlers244245Event handler interfaces for processing streaming events.246247```python { .api }248class EventStreamHandler[AgentDepsT]:249"""250Handler for streaming events during agent execution.251"""252async def on_model_request(253self,254messages: list[ModelMessage]255) -> None:256"""Called when model request is about to be made."""257258async def on_model_response_start(self) -> None:259"""Called when model response starts streaming."""260261async def on_model_response_part(262self,263part: ModelResponsePart264) -> None:265"""Called for each response part."""266267async def on_tool_call(268self,269tool_name: str,270args: dict[str, Any]271) -> None:272"""Called when tool is about to be called."""273274async def on_tool_result(275self,276tool_name: str,277result: Any278) -> None:279"""Called when tool returns result."""280```281282## Usage Examples283284### Basic Agent Streaming285286```python287import asyncio288from pydantic_ai import Agent289290agent = Agent(291model='gpt-4',292system_prompt='You are a helpful assistant.'293)294295async def stream_response():296stream = await agent.run_stream('Tell me a story about a robot')297298async for event in stream:299if event.kind == 'part-delta' and event.delta.kind == 'text':300# Print each text chunk as it arrives301print(event.delta.content, end='', flush=True)302elif event.kind == 'final-result':303print(f"\n\nFinal result ready: {len(event.result)} characters")304break305306asyncio.run(stream_response())307```308309### Streaming with Tool Calls310311```python312import asyncio313from pydantic_ai import Agent, tool314315@tool316def get_weather(location: str) -> str:317"""Get weather for a location."""318return f"Weather in {location}: Sunny, 22°C"319320agent = Agent(321model='gpt-4',322tools=[get_weather],323system_prompt='You can check weather using tools.'324)325326async def stream_with_tools():327stream = await agent.run_stream('What is the weather in Paris?')328329async for event in stream:330if event.kind == 'part-delta':331print(event.delta.content, end='', flush=True)332elif event.kind == 'function-tool-call':333print(f"\n[Calling tool: {event.tool_name}({event.args})]")334elif event.kind == 'function-tool-result':335print(f"[Tool result: {event.result}]")336elif event.kind == 'final-result':337print(f"\n\nComplete response: {event.result}")338break339340asyncio.run(stream_with_tools())341```342343### Direct Model Streaming344345```python346import asyncio347from pydantic_ai.models import OpenAIModel348from pydantic_ai.direct import model_request_stream349from pydantic_ai.messages import ModelRequest, UserPromptPart350351async def direct_stream():352model = OpenAIModel('gpt-4')353messages = [ModelRequest([UserPromptPart('Count to 10')])]354355stream = await model_request_stream(model, messages)356357async for event in stream:358if event.kind == 'part-delta' and event.delta.kind == 'text':359print(event.delta.content, end='', flush=True)360361final_response = await stream.get_final_response()362print(f"\n\nFinal response has {len(final_response.parts)} parts")363364asyncio.run(direct_stream())365```366367### Synchronous Streaming368369```python370from pydantic_ai.models import OpenAIModel371from pydantic_ai.direct import model_request_stream_sync372from pydantic_ai.messages import ModelRequest, UserPromptPart373374def sync_stream():375model = OpenAIModel('gpt-4')376messages = [ModelRequest([UserPromptPart('Write a haiku')])]377378stream = model_request_stream_sync(model, messages)379380for event in stream:381if event.kind == 'part-delta' and event.delta.kind == 'text':382print(event.delta.content, end='', flush=True)383384final_response = stream.get_final_response()385print(f"\n\nComplete haiku received")386387sync_stream()388```389390### Streaming with Structured Output391392```python393import asyncio394from pydantic_ai import Agent395from pydantic import BaseModel396397class StoryInfo(BaseModel):398title: str399characters: list[str]400setting: str401plot_summary: str402403agent = Agent(404model='gpt-4',405system_prompt='Create story information.',406result_type=StoryInfo407)408409async def stream_structured():410stream = await agent.run_stream('Create a sci-fi story about time travel')411412text_content = ""413async for event in stream:414if event.kind == 'part-delta' and event.delta.kind == 'text':415text_content += event.delta.content416print(event.delta.content, end='', flush=True)417elif event.kind == 'final-result':418print(f"\n\nStructured result:")419print(f"Title: {event.result.title}")420print(f"Characters: {', '.join(event.result.characters)}")421print(f"Setting: {event.result.setting}")422break423424asyncio.run(stream_structured())425```426427### Advanced Event Handling428429```python430import asyncio431from pydantic_ai import Agent, tool432433@tool434def search_database(query: str) -> list[dict]:435"""Search database for information."""436return [{"id": 1, "title": "Result 1"}, {"id": 2, "title": "Result 2"}]437438agent = Agent(439model='gpt-4',440tools=[search_database],441system_prompt='You can search for information.'442)443444async def handle_all_events():445stream = await agent.run_stream('Search for Python tutorials')446447tool_calls_made = 0448text_chunks_received = 0449450async for event in stream:451if event.kind == 'part-start':452print(f"New part started: {event.part.kind}")453elif event.kind == 'part-delta':454text_chunks_received += 1455if event.delta.kind == 'text':456print(event.delta.content, end='', flush=True)457elif event.kind == 'function-tool-call':458tool_calls_made += 1459print(f"\n[Tool call #{tool_calls_made}: {event.tool_name}]")460elif event.kind == 'function-tool-result':461print(f"[Got {len(event.result)} results]")462elif event.kind == 'final-result':463print(f"\n\nStream completed:")464print(f"- Text chunks: {text_chunks_received}")465print(f"- Tool calls: {tool_calls_made}")466print(f"- Final result length: {len(event.result)}")467break468469asyncio.run(handle_all_events())470```471472### Stream Error Handling473474```python475import asyncio476from pydantic_ai import Agent477from pydantic_ai.exceptions import ModelHTTPError, AgentRunError478479agent = Agent(model='gpt-4')480481async def stream_with_error_handling():482try:483stream = await agent.run_stream('Generate a very long response')484485async for event in stream:486if event.kind == 'part-delta':487print(event.delta.content, end='', flush=True)488elif event.kind == 'final-result':489print(f"\n\nSuccess! Result: {event.result[:100]}...")490break491492except ModelHTTPError as e:493print(f"Model HTTP error: {e}")494except AgentRunError as e:495print(f"Agent run error: {e}")496except Exception as e:497print(f"Unexpected error: {e}")498499asyncio.run(stream_with_error_handling())500```