Streaming

Support for streaming responses from language models in real-time. Enables progressive output display and interactive user experiences.

Capabilities

Streamify

Wrapper that enables streaming for DSPy modules.

def streamify(module):
    """
    Enable streaming for a module.

    Wraps a module to capture and stream LM responses in real-time
    as they are generated. Returns a context manager.

    Args:
        module: Module to enable streaming for

    Returns:
        StreamListener context manager
    """
    pass

Usage:

import dspy

# Configure
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

# Create module
qa = dspy.ChainOfThought("question -> answer")

# Stream responses
with dspy.streamify(qa) as stream:
    # Execute module
    result = qa(question="What is machine learning?")

    # Stream chunks as they arrive
    for chunk in stream:
        print(chunk, end="", flush=True)

# Final result available
print("\n\nFinal answer:", result.answer)

Stream Listener

Listens to streaming responses from a module.

class StreamListener:
    """
    Stream listener for module responses.

    Context manager that captures streaming output from module
    execution. Provides iterator interface for consuming chunks.
    """

    def __iter__(self):
        """
        Iterate over stream chunks.

        Yields:
            str: Text chunks as they arrive
        """
        pass

    def __enter__(self):
        """
        Enter context manager.

        Returns:
            Self for use in with statement
        """
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context manager.

        Args:
            exc_type: Exception type if raised
            exc_val: Exception value if raised
            exc_tb: Exception traceback if raised
        """
        pass

Usage:

import dspy

qa = dspy.ChainOfThought("question -> answer")

# Stream with listener
listener = dspy.streamify(qa)
with listener as stream:
    result = qa(question="Explain quantum computing")

    # Iterate over chunks
    for chunk in stream:
        # Process each chunk
        print(chunk, end="")
        # Could send to frontend, write to file, etc.

print("\n\nStreaming complete")

Stream Response

Container for streaming response chunks.

class StreamResponse:
    """
    Container for streaming response.

    Wraps streaming chunks with metadata about the stream state.
    """

    content: str
    """Current chunk content."""

    is_final: bool
    """Whether this is the final chunk."""

    metadata: dict
    """Additional metadata about the chunk."""

Status Message

Status message in streaming responses.

class StatusMessage:
    """
    Status message during streaming.

    Provides status updates during module execution
    (e.g., "Generating response...", "Calling tool...").
    """

    message: str
    """Status message text."""

    timestamp: float
    """Timestamp of status update."""

Status Message Provider

Provider for status messages during streaming.

class StatusMessageProvider:
    """
    Provider for status messages.

    Emits status updates during streaming to provide
    feedback about module execution state.
    """

    def send_status(self, message: str):
        """
        Send status message.

        Args:
            message (str): Status message to send
        """
        pass
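
A sketch of emitting custom status updates from inside a module via send_status; how the provider is attached to the stream and surfaced to the consumer is not specified above and is assumed here:

import dspy
from dspy.streaming import StatusMessageProvider  # import path is an assumption

class VerboseRAG(dspy.Module):
    def __init__(self, status_provider: StatusMessageProvider):
        super().__init__()
        self.status = status_provider
        self.retrieve = dspy.Retrieve(k=3)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        # Emit status updates that a streaming consumer can display
        self.status.send_status("Retrieving documents...")
        context = self.retrieve(query=question).passages

        self.status.send_status("Generating answer...")
        return self.generate(context=context, question=question)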

Streaming Response Helper

Creates a streaming response handler.

def streaming_response(module):
    """
    Create streaming response handler.

    Args:
        module: Module to stream from

    Returns:
        Streaming response handler
    """
    pass
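
A hedged usage sketch, assuming the returned handler is an iterable of text chunks produced while the module runs; the exact handler interface is not specified above:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
qa = dspy.Predict("question -> answer")

# Assumed: the handler yields text chunks for the module call
handler = dspy.streaming_response(qa)
result = qa(question="What is transfer learning?")

for chunk in handler:
    print(chunk, end="", flush=True)
print()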

Apply Sync Streaming

Apply synchronous streaming to a module.

def apply_sync_streaming(module):
    """
    Apply synchronous streaming to module.

    Args:
        module: Module to make streaming

    Returns:
        Streaming-enabled module
    """
    pass
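
A minimal sketch, assuming the streaming-enabled module is consumed with the same with/iterate pattern used elsewhere on this page and that its chunks are delivered synchronously as they are generated:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
qa = dspy.Predict("question -> answer")

# Assumed: the wrapped module behaves like the original but emits chunks synchronously
streaming_qa = dspy.apply_sync_streaming(qa)

with dspy.streamify(streaming_qa) as stream:
    result = streaming_qa(question="What is gradient descent?")

    for chunk in stream:
        print(chunk, end="", flush=True)
print()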

Streaming Patterns

Basic Streaming

Simple streaming output:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")

# Stream to console
with dspy.streamify(qa) as stream:
    result = qa(question="Write a short poem about coding")

    for chunk in stream:
        print(chunk, end="", flush=True)

print()  # New line after streaming

Web Application Streaming

Stream to web frontend:

import dspy
from flask import Flask, Response, stream_with_context

app = Flask(__name__)
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.ChainOfThought("question -> answer")

@app.route('/stream/<question>')
def stream_answer(question):
    """Stream answer to frontend."""

    def generate():
        with dspy.streamify(qa) as stream:
            # Execute the module (the listener captures chunks as it runs)
            result = qa(question=question)

            # Stream chunks to client
            for chunk in stream:
                yield f"data: {chunk}\n\n"

        # Send completion marker
        yield "data: [DONE]\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream'
    )

# Frontend JavaScript:
# const eventSource = new EventSource('/stream/what-is-ml');
# eventSource.onmessage = (event) => {
#     if (event.data === '[DONE]') {
#         eventSource.close();
#     } else {
#         document.getElementById('output').textContent += event.data;
#     }
# };

Streaming with Progress

Show progress during streaming:

import dspy
import sys

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

class StreamingRAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        # Show retrieval status
        print("[Retrieving documents...]", file=sys.stderr)
        context = self.retrieve(query=question).passages

        # Show generation status
        print("[Generating answer...]", file=sys.stderr)
        return self.generate(context=context, question=question)

rag = StreamingRAG()

# Stream with progress indicators
with dspy.streamify(rag) as stream:
    result = rag(question="What is deep learning?")

    print("\nAnswer: ", end="")
    for chunk in stream:
        print(chunk, end="", flush=True)

Buffered Streaming

Buffer chunks before displaying:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
qa = dspy.Predict("question -> answer")

# Accumulate chunks
with dspy.streamify(qa) as stream:
    result = qa(question="Explain AI safety")

    buffer = []
    for chunk in stream:
        buffer.append(chunk)

        # Display every 10 chunks or at word boundaries
        if len(buffer) >= 10 or chunk.endswith(' '):
            print(''.join(buffer), end="", flush=True)
            buffer.clear()

    # Display remaining
    if buffer:
        print(''.join(buffer), end="", flush=True)

Async Streaming

Asynchronous streaming for concurrent operations:

import dspy
import asyncio

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

async def stream_answer(question):
    """Async streaming."""
    qa = dspy.Predict("question -> answer")

    # Note: true async streaming depends on the LM provider supporting it.
    # The documented StreamListener exposes a synchronous iterator, so chunks
    # are consumed with a plain for loop while the module call is awaited.
    with dspy.streamify(qa) as stream:
        result = await qa.acall(question=question)

        for chunk in stream:
            print(chunk, end="", flush=True)
            await asyncio.sleep(0)  # Yield control back to the event loop

# Run async
asyncio.run(stream_answer("What is machine learning?"))

Multi-Module Streaming

Stream from multiple modules:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

class Pipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.summarize = dspy.Predict("text -> summary")
        self.translate = dspy.Predict("english_text -> french_text")

    def forward(self, text):
        summary = self.summarize(text=text)
        translation = self.translate(english_text=summary.summary)
        return translation

pipeline = Pipeline()

# Stream entire pipeline
with dspy.streamify(pipeline) as stream:
    result = pipeline(text="Long English article...")

    print("Streaming pipeline output:")
    for chunk in stream:
        print(chunk, end="", flush=True)

Streaming with Callbacks

Add callbacks to streaming:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

class StreamCallback:
    """Callback for stream events."""

    def __init__(self):
        self.chunks = []
        self.start_time = None
        self.end_time = None

    def on_start(self):
        """Called when streaming starts."""
        import time
        self.start_time = time.time()
        print("[Streaming started]")

    def on_chunk(self, chunk):
        """Called for each chunk."""
        self.chunks.append(chunk)
        # Could send to analytics, logging, etc.

    def on_end(self):
        """Called when streaming ends."""
        import time
        self.end_time = time.time()
        duration = self.end_time - self.start_time
        print(f"\n[Streaming complete: {duration:.2f}s, {len(self.chunks)} chunks]")

qa = dspy.Predict("question -> answer")
callback = StreamCallback()

callback.on_start()
with dspy.streamify(qa) as stream:
    result = qa(question="What is quantum computing?")

    for chunk in stream:
        callback.on_chunk(chunk)
        print(chunk, end="", flush=True)

callback.on_end()

Rate-Limited Streaming

Control streaming rate:

import dspy
import time

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")

# Stream with rate limit (chars per second)
RATE_LIMIT = 50  # 50 characters per second
CHUNK_SIZE = 5   # Process 5 chars at a time

with dspy.streamify(qa) as stream:
    result = qa(question="Explain artificial intelligence")

    buffer = ""
    for chunk in stream:
        buffer += chunk

        # Release buffer at rate limit
        while len(buffer) >= CHUNK_SIZE:
            print(buffer[:CHUNK_SIZE], end="", flush=True)
            buffer = buffer[CHUNK_SIZE:]
            time.sleep(CHUNK_SIZE / RATE_LIMIT)

    # Flush remaining
    if buffer:
        print(buffer, end="", flush=True)

Error Handling with Streaming

Handle errors during streaming:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.Predict("question -> answer")

accumulated = []

try:
    with dspy.streamify(qa) as stream:
        result = qa(question="Complex question")

        for chunk in stream:
            accumulated.append(chunk)
            print(chunk, end="", flush=True)

except Exception as e:
    print(f"\n[Error during streaming: {e}]")
    # Partial output is still available in accumulated
    print(f"Partial output: {''.join(accumulated)}")
else:
    print("\n[Streaming successful]")

Save Streamed Output

Save streaming output to file:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

qa = dspy.ChainOfThought("question -> answer")

# Stream to file and console
with open("output.txt", "w") as f:
    with dspy.streamify(qa) as stream:
        result = qa(question="Explain machine learning in detail")

        for chunk in stream:
            # Write to file
            f.write(chunk)
            f.flush()

            # Display to console
            print(chunk, end="", flush=True)

print("\n\nOutput saved to output.txt")