Support for streaming responses from language models in real time, enabling progressive output display and interactive user experiences.
Wraps a DSPy module to enable streaming of its responses.
def streamify(module):
    """
    Enable streaming for a module.

    Wraps a module to capture and stream LM responses in real time
    as they are generated. Returns a context manager.

    Args:
        module: Module to enable streaming for

    Returns:
        StreamListener context manager
    """
    pass

Usage:
import dspy

# Configure
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

# Create module
qa = dspy.ChainOfThought("question -> answer")

# Stream responses
with dspy.streamify(qa) as stream:
    # Execute module
    result = qa(question="What is machine learning?")

    # Stream chunks as they arrive
    for chunk in stream:
        print(chunk, end="", flush=True)

# Final result available
print("\n\nFinal answer:", result.answer)

Listens to streaming responses from a module.
class StreamListener:
    """
    Stream listener for module responses.

    Context manager that captures streaming output from module
    execution. Provides iterator interface for consuming chunks.
    """

    def __iter__(self):
        """
        Iterate over stream chunks.

        Yields:
            str: Text chunks as they arrive
        """
        pass

    def __enter__(self):
        """
        Enter context manager.

        Returns:
            Self for use in with statement
        """
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context manager.

        Args:
            exc_type: Exception type if raised
            exc_val: Exception value if raised
            exc_tb: Exception traceback if raised
        """
        pass

Usage:
import dspy

qa = dspy.ChainOfThought("question -> answer")

# Stream with listener
listener = dspy.streamify(qa)

with listener as stream:
    result = qa(question="Explain quantum computing")

    # Iterate over chunks
    for chunk in stream:
        # Process each chunk
        print(chunk, end="")
        # Could send to frontend, write to file, etc.

print("\n\nStreaming complete")

Container for streaming response chunks.
class StreamResponse:
    """
    Container for streaming response.

    Wraps streaming chunks with metadata about the stream state.
    """

    content: str
    """Current chunk content."""

    is_final: bool
    """Whether this is the final chunk."""

    metadata: dict
    """Additional metadata about the chunk."""
class StatusMessage:
    """
    Status message during streaming.

    Provides status updates during module execution
    (e.g., "Generating response...", "Calling tool...").
    """

    message: str
    """Status message text."""

    timestamp: float
    """Timestamp of status update."""
class StatusMessageProvider:
    """
    Provider for status messages.

    Emits status updates during streaming to provide
    feedback about module execution state.
    """

    def send_status(self, message: str):
        """
        Send status message.

        Args:
            message (str): Status message to send
        """
        pass
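How a provider is attached to a streaming run is not shown above, so this sketch only illustrates overriding send_status in a hypothetical subclass that routes status updates to logging; the import path is an assumption:

import logging
from dspy.streaming import StatusMessageProvider  # import path is an assumption

logging.basicConfig(level=logging.INFO)

class LoggingStatusProvider(StatusMessageProvider):
    """Hypothetical subclass that sends status updates to the logging system."""

    def send_status(self, message: str):
        # Route status updates to a logger instead of stdout.
        logging.info("status: %s", message)

provider = LoggingStatusProvider()
provider.send_status("Generating response...")
provider.send_status("Calling tool...")

Creates a streaming response handler.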
def streaming_response(module):
    """
    Create streaming response handler.

    Args:
        module: Module to stream from

    Returns:
        Streaming response handler
    """
    pass

Apply synchronous streaming to a module.
def apply_sync_streaming(module):
    """
    Apply synchronous streaming to module.

    Args:
        module: Module to make streaming

    Returns:
        Streaming-enabled module
    """
    pass
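A minimal usage sketch following the signature above; the import path, and the idea that the returned module yields text chunks when called directly, are assumptions rather than documented behavior:

import dspy
from dspy.streaming import apply_sync_streaming  # import path is an assumption

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")
streaming_qa = apply_sync_streaming(qa)

# Assumption: calling the streaming-enabled module returns an iterable of text chunks.
for chunk in streaming_qa(question="What is gradient descent?"):
    print(chunk, end="", flush=True)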
Simple streaming output:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")

# Stream to console
with dspy.streamify(qa) as stream:
    result = qa(question="Write a short poem about coding")

    for chunk in stream:
        print(chunk, end="", flush=True)

print()  # New line after streaming
Stream to web frontend:

import dspy
from flask import Flask, Response, stream_with_context

app = Flask(__name__)
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.ChainOfThought("question -> answer")

@app.route('/stream/<question>')
def stream_answer(question):
    """Stream answer to frontend."""
    def generate():
        with dspy.streamify(qa) as stream:
            # Execute module
            result = qa(question=question)

            # Stream chunks to client
            for chunk in stream:
                yield f"data: {chunk}\n\n"

            # Send completion marker
            yield "data: [DONE]\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

# Frontend JavaScript:
# const eventSource = new EventSource('/stream/what-is-ml');
# eventSource.onmessage = (event) => {
#     if (event.data === '[DONE]') {
#         eventSource.close();
#     } else {
#         document.getElementById('output').textContent += event.data;
#     }
# };
Show progress during streaming:

import dspy
import sys

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

class StreamingRAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        # Show retrieval status
        print("[Retrieving documents...]", file=sys.stderr)
        context = self.retrieve(query=question).passages

        # Show generation status
        print("[Generating answer...]", file=sys.stderr)
        return self.generate(context=context, question=question)

rag = StreamingRAG()

# Stream with progress indicators
with dspy.streamify(rag) as stream:
    result = rag(question="What is deep learning?")

    print("\nAnswer: ", end="")
    for chunk in stream:
        print(chunk, end="", flush=True)
Buffer chunks before displaying:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")

# Accumulate chunks
with dspy.streamify(qa) as stream:
    result = qa(question="Explain AI safety")

    buffer = []
    for chunk in stream:
        buffer.append(chunk)

        # Display every 10 chunks or at word boundaries
        if len(buffer) >= 10 or chunk.endswith(' '):
            print(''.join(buffer), end="", flush=True)
            buffer.clear()

    # Display remaining
    if buffer:
        print(''.join(buffer), end="", flush=True)
Asynchronous streaming for concurrent operations:

import dspy
import asyncio

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

async def stream_answer(question):
    """Async streaming."""
    qa = dspy.Predict("question -> answer")

    # Note: actual async streaming implementation depends on
    # LM provider support for async streaming
    with dspy.streamify(qa) as stream:
        result = await qa.acall(question=question)

        async for chunk in stream:
            print(chunk, end="", flush=True)
            await asyncio.sleep(0)  # Yield to event loop

# Run async
asyncio.run(stream_answer("What is machine learning?"))

Stream from multiple modules:
import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

class Pipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.summarize = dspy.Predict("text -> summary")
        self.translate = dspy.Predict("english_text -> french_text")

    def forward(self, text):
        summary = self.summarize(text=text)
        translation = self.translate(english_text=summary.summary)
        return translation

pipeline = Pipeline()

# Stream entire pipeline
with dspy.streamify(pipeline) as stream:
    result = pipeline(text="Long English article...")

    print("Streaming pipeline output:")
    for chunk in stream:
        print(chunk, end="", flush=True)

Add callbacks to streaming:
import dspy
import time

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

class StreamCallback:
    """Callback for stream events."""

    def __init__(self):
        self.chunks = []
        self.start_time = None
        self.end_time = None

    def on_start(self):
        """Called when streaming starts."""
        self.start_time = time.time()
        print("[Streaming started]")

    def on_chunk(self, chunk):
        """Called for each chunk."""
        self.chunks.append(chunk)
        # Could send to analytics, logging, etc.

    def on_end(self):
        """Called when streaming ends."""
        self.end_time = time.time()
        duration = self.end_time - self.start_time
        print(f"\n[Streaming complete: {duration:.2f}s, {len(self.chunks)} chunks]")

qa = dspy.Predict("question -> answer")
callback = StreamCallback()

callback.on_start()
with dspy.streamify(qa) as stream:
    result = qa(question="What is quantum computing?")

    for chunk in stream:
        callback.on_chunk(chunk)
        print(chunk, end="", flush=True)
callback.on_end()

Control streaming rate:
import dspy
import time

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")

# Stream with rate limit (chars per second)
RATE_LIMIT = 50  # 50 characters per second
CHUNK_SIZE = 5   # Process 5 chars at a time

with dspy.streamify(qa) as stream:
    result = qa(question="Explain artificial intelligence")

    buffer = ""
    for chunk in stream:
        buffer += chunk

        # Release buffer at rate limit
        while len(buffer) >= CHUNK_SIZE:
            print(buffer[:CHUNK_SIZE], end="", flush=True)
            buffer = buffer[CHUNK_SIZE:]
            time.sleep(CHUNK_SIZE / RATE_LIMIT)

    # Flush remaining
    if buffer:
        print(buffer, end="", flush=True)

Handle errors during streaming:
import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")

accumulated = []  # Defined up front so partial output survives an early failure
try:
    with dspy.streamify(qa) as stream:
        result = qa(question="Complex question")

        for chunk in stream:
            accumulated.append(chunk)
            print(chunk, end="", flush=True)
except Exception as e:
    print(f"\n[Error during streaming: {e}]")
    # Partial output still available in accumulated
    print(f"Partial output: {''.join(accumulated)}")
else:
    print("\n[Streaming successful]")

Save streaming output to file:
import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.ChainOfThought("question -> answer")

# Stream to file and console
with open("output.txt", "w") as f:
    with dspy.streamify(qa) as stream:
        result = qa(question="Explain machine learning in detail")

        for chunk in stream:
            # Write to file
            f.write(chunk)
            f.flush()

            # Display to console
            print(chunk, end="", flush=True)

print("\n\nOutput saved to output.txt")