CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dspy

A declarative framework for programming foundation models through compositional Python code, enabling modular AI systems with automated optimization algorithms that synthesize examples, generate instructions, and finetune weights based on user-defined metrics.

Overview
Eval results
Files

utilities.mddocs/

Utilities

Helper functions for async/sync conversion, module serialization, usage tracking, and logging configuration.

Capabilities

Async/Sync Conversion

Asyncify

Convert synchronous functions to async.

def asyncify(func: callable):
    """
    Convert a synchronous function into an awaitable async wrapper.

    The wrapped function can be awaited from async programs without blocking
    the event loop (presumably by delegating to a worker thread — confirm
    against the dspy implementation).

    Args:
        func (callable): Synchronous function to convert

    Returns:
        Async (coroutine-returning) version of ``func`` with the same
        call signature
    """
    # Documentation stub — the concrete implementation ships in the dspy package.
    pass

Usage:

import dspy
import time

# Synchronous function
def slow_computation(x):
    time.sleep(2)
    return x * 2

# Convert to async
async_computation = dspy.asyncify(slow_computation)

# Use in async context
import asyncio

async def main():
    result = await async_computation(21)
    print(result)  # 42

asyncio.run(main())

Syncify

Convert async functions to sync.

def syncify(func: callable):
    """
    Convert an async function into a blocking synchronous wrapper.

    The wrapper drives the coroutine to completion, so async code (including
    async DSPy modules via ``acall``) can be invoked from ordinary
    synchronous contexts.

    Args:
        func (callable): Async function to convert

    Returns:
        Sync version of ``func`` that returns the awaited result directly
    """
    # Documentation stub — the concrete implementation ships in the dspy package.
    pass

Usage:

import dspy
import asyncio

# Async function
async def async_fetch(url):
    await asyncio.sleep(1)
    return f"Data from {url}"

# Convert to sync
sync_fetch = dspy.syncify(async_fetch)

# Use in sync context
result = sync_fetch("https://example.com")
print(result)  # "Data from https://example.com"

# Useful for integrating async DSPy modules in sync code
async def async_qa(question):
    qa = dspy.ChainOfThought("question -> answer")
    return await qa.acall(question=question)

sync_qa = dspy.syncify(async_qa)
result = sync_qa("What is ML?")

Saving and Loading

Load Module

Load a saved DSPy module from disk.

def load(path: str, allow_pickle: bool = False):
    """
    Load a saved DSPy module from disk.

    Args:
        path (str): Directory path of a previously saved module
        allow_pickle (bool): Permit loading pickled data. Defaults to False
            for security — unpickling an untrusted file can execute
            arbitrary code.

    Returns:
        The reconstructed Module instance

    Raises:
        FileNotFoundError: If ``path`` doesn't exist
        SecurityError: If pickled data is found while ``allow_pickle`` is False
    """
    # Documentation stub — the concrete implementation ships in the dspy package.
    pass

Usage:

import dspy

# Save module
module = MyModule()
module.save("my_module/")

# Load module
loaded = dspy.load("my_module/")
result = loaded(input=data)

# Load with pickle support (use cautiously)
loaded = dspy.load("my_module/", allow_pickle=True)

Module Save Method:

The save functionality is part of the Module class:

import dspy

class MyRAG(dspy.Module):
    """Retrieve-then-generate pipeline: top-3 passages feed a CoT answerer."""

    def __init__(self):
        super().__init__()
        # Fixed retrieval depth of 3 passages per question.
        self.retrieve = dspy.Retrieve(k=3)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        """Answer `question` grounded in the retrieved passages."""
        passages = self.retrieve(query=question).passages
        return self.generate(context=passages, question=question)

# Create and optimize
rag = MyRAG()
optimizer = dspy.BootstrapFewShot(metric=my_metric)
compiled = optimizer.compile(rag, trainset=trainset)

# Save optimized module
compiled.save("optimized_rag/")

# Load later
loaded_rag = dspy.load("optimized_rag/")
result = loaded_rag(question="What is AI?")

Usage Tracking

Track token usage within a context.

def track_usage():
    """
    Context manager that records LM token usage within its scope.

    Every LM call made inside the ``with`` block contributes to the returned
    tracker's accumulated statistics.

    Returns:
        UsageTracker context manager exposing token-count accessors
    """
    # Documentation stub — the concrete implementation ships in the dspy package.
    pass

UsageTracker:

class UsageTracker:
    """
    Token usage tracker.

    Accumulates token consumption across multiple LM calls made inside a
    ``dspy.track_usage()`` context.
    """

    def get_total_tokens(self) -> int:
        """
        Return the combined prompt + completion token count.

        Returns:
            Total token count across all tracked calls
        """
        pass

    def get_prompt_tokens(self) -> int:
        """
        Return tokens consumed by prompts (model input).

        Returns:
            Prompt token count
        """
        pass

    def get_completion_tokens(self) -> int:
        """
        Return tokens produced in completions (model output).

        Returns:
            Completion token count
        """
        pass

    def get_usage_stats(self) -> dict:
        """
        Return detailed usage statistics.

        Returns:
            Dictionary with detailed token usage; observed keys include
            'num_calls' and 'avg_tokens'
        """
        pass

Usage:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

# Track usage in context
with dspy.track_usage() as tracker:
    qa = dspy.ChainOfThought("question -> answer")

    result1 = qa(question="What is ML?")
    result2 = qa(question="What is DL?")
    result3 = qa(question="What is AI?")

    # Get usage statistics
    total = tracker.get_total_tokens()
    prompt = tracker.get_prompt_tokens()
    completion = tracker.get_completion_tokens()

    print(f"Total tokens: {total}")
    print(f"Prompt tokens: {prompt}")
    print(f"Completion tokens: {completion}")

    # Detailed stats
    stats = tracker.get_usage_stats()
    print(f"Number of calls: {stats['num_calls']}")
    print(f"Average tokens per call: {stats['avg_tokens']}")

Track Complex Program:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

class RAG(dspy.Module):
    """Two-stage module: retrieve supporting passages, then reason to an answer."""

    def __init__(self):
        super().__init__()
        # Retrieval depth fixed at 3 passages.
        self.retrieve = dspy.Retrieve(k=3)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        """Produce an answer for `question` using retrieved context."""
        passages = self.retrieve(query=question).passages
        return self.generate(context=passages, question=question)

rag = RAG()

# Track RAG usage
with dspy.track_usage() as tracker:
    questions = [
        "What is machine learning?",
        "What is deep learning?",
        "What is neural networks?"
    ]

    for q in questions:
        result = rag(question=q)

    print(f"Total tokens for {len(questions)} questions: {tracker.get_total_tokens()}")
    print(f"Average tokens per question: {tracker.get_total_tokens() / len(questions):.1f}")

Logging Configuration

Configure DSPy Loggers

Configure DSPy's logging system.

def configure_dspy_loggers(name: str):
    """
    Configure DSPy's logging system for the given logger.

    Args:
        name (str): Name of the logger to configure (e.g. "dspy.predict")
    """
    # Documentation stub — the concrete implementation ships in the dspy package.
    pass

Enable Logging

Enable DSPy logging output.

def enable_logging():
    """Enable DSPy logging output (useful while debugging)."""
    # Documentation stub — implementation provided by the dspy package.
    pass

Disable Logging

Disable DSPy logging output.

def disable_logging():
    """Disable DSPy logging output (typical for production)."""
    # Documentation stub — implementation provided by the dspy package.
    pass

Usage:

import dspy

# Enable logging for debugging
dspy.enable_logging()

# Configure specific logger
dspy.configure_dspy_loggers("dspy.predict")

# Your code here
qa = dspy.Predict("question -> answer")
result = qa(question="What is ML?")

# Disable logging
dspy.disable_logging()

LiteLLM Logging

Control LiteLLM logging output (also available in configuration module).

def enable_litellm_logging():
    """Enable LiteLLM debug logging (verbose; useful when debugging LM calls)."""
    # Documentation stub — implementation provided by the dspy package.
    pass

def disable_litellm_logging():
    """Disable LiteLLM debug logging (recommended default)."""
    # Documentation stub — implementation provided by the dspy package.
    pass

Usage:

import dspy

# Disable verbose LiteLLM logs (recommended)
dspy.disable_litellm_logging()

# Enable for debugging LM calls
dspy.enable_litellm_logging()

# Your code
lm = dspy.LM('openai/gpt-4o-mini')
response = lm(prompt="Hello")

# Disable again
dspy.disable_litellm_logging()

Utility Patterns

Module Serialization

Save and load optimized modules:

import dspy

# Train and optimize
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

program = MyModule()
optimizer = dspy.BootstrapFewShot(metric=my_metric)
compiled = optimizer.compile(program, trainset=trainset)

# Save to disk
compiled.save("models/optimized_v1/")

# Load in production
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
production_model = dspy.load("models/optimized_v1/")

# Use immediately
result = production_model(input=data)

Version Management

Manage multiple model versions:

import dspy
import os
from datetime import datetime

class ModelManager:
    """Manage versioned saves/loads of DSPy modules on disk.

    Layout: one directory per version, <base_dir>/<name>/<version>/.
    """

    def __init__(self, base_dir="models/"):
        """
        Args:
            base_dir (str): Root directory holding all named models.
        """
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)

    def save_model(self, model, name, version=None):
        """Save `model` under <base_dir>/<name>/<version> and return the path.

        Args:
            model: Object exposing a ``save(path)`` method (e.g. dspy.Module).
            name (str): Model name (subdirectory).
            version (str | None): Version label; defaults to a timestamp.

        Returns:
            str: Directory path the model was saved to.
        """
        if version is None:
            # Timestamp versions sort lexicographically in chronological order,
            # which is what load_model("latest") relies on.
            version = datetime.now().strftime("%Y%m%d_%H%M%S")

        path = os.path.join(self.base_dir, name, version)
        model.save(path)
        print(f"Saved {name} v{version} to {path}")
        return path

    def load_model(self, name, version="latest"):
        """Load a model by name and version.

        Args:
            name (str): Model name (subdirectory).
            version (str): Version label, or "latest" for the most recent.

        Returns:
            Loaded module instance (via ``dspy.load``).

        Raises:
            FileNotFoundError: If the model directory has no saved versions
                (or, from os.listdir, if it does not exist at all).
        """
        model_dir = os.path.join(self.base_dir, name)

        if version == "latest":
            # Only subdirectories are versions: stray files in the model dir
            # (e.g. .DS_Store) must not be mistaken for one. Also fail with a
            # clear error instead of IndexError when nothing has been saved.
            versions = sorted(
                entry for entry in os.listdir(model_dir)
                if os.path.isdir(os.path.join(model_dir, entry))
            )
            if not versions:
                raise FileNotFoundError(f"No saved versions found in {model_dir}")
            version = versions[-1]

        path = os.path.join(model_dir, version)
        model = dspy.load(path)
        print(f"Loaded {name} v{version} from {path}")
        return model

# Use manager
manager = ModelManager()

# Save new version
compiled = optimizer.compile(program, trainset=trainset)
manager.save_model(compiled, "rag_model", version="1.0.0")

# Load specific version
model_v1 = manager.load_model("rag_model", version="1.0.0")

# Load latest
latest = manager.load_model("rag_model", version="latest")

Async Integration

Integrate async DSPy modules:

import dspy
import asyncio

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

# Async wrapper for module
async def async_rag(question):
    """Async RAG call."""
    rag = RAG()
    return await rag.acall(question=question)

# Process multiple questions concurrently
async def process_batch(questions):
    """Process questions in parallel."""
    tasks = [async_rag(q) for q in questions]
    results = await asyncio.gather(*tasks)
    return results

# Run
questions = ["What is ML?", "What is AI?", "What is DL?"]
results = asyncio.run(process_batch(questions))

for q, r in zip(questions, results):
    print(f"Q: {q}")
    print(f"A: {r.answer}\n")

Cost Tracking

Track costs across operations:

import dspy

dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

# Cost tracking with usage
class CostTracker:
    """Accumulate LM spend computed from token-usage statistics."""

    # Cost in dollars per 1M tokens, keyed by model name.
    PRICES = {
        'gpt-4o-mini': {'prompt': 0.15, 'completion': 0.60},
        'gpt-4o': {'prompt': 2.50, 'completion': 10.00},
    }

    def __init__(self, model_name):
        """
        Args:
            model_name: Key into PRICES; unknown models are priced at zero.
        """
        self.model_name = model_name
        self.total_cost = 0.0

    def calculate_cost(self, tracker):
        """Price the usage in `tracker` and add it to the running total.

        Args:
            tracker: Object exposing ``get_prompt_tokens()`` and
                ``get_completion_tokens()`` (e.g. a dspy UsageTracker).

        Returns:
            float: Cost of this tracker's usage alone (total_cost keeps
            the cumulative figure).
        """
        # Unknown models fall back to zero-cost rates rather than raising.
        rates = self.PRICES.get(self.model_name, {'prompt': 0, 'completion': 0})

        per_million = 1_000_000
        cost = (
            (tracker.get_prompt_tokens() / per_million) * rates['prompt']
            + (tracker.get_completion_tokens() / per_million) * rates['completion']
        )
        self.total_cost += cost
        return cost

# Use cost tracker
cost_tracker = CostTracker('gpt-4o-mini')

with dspy.track_usage() as usage_tracker:
    qa = dspy.ChainOfThought("question -> answer")

    for question in questions:
        result = qa(question=question)

    cost = cost_tracker.calculate_cost(usage_tracker)
    print(f"Cost: ${cost:.4f}")
    print(f"Total cost: ${cost_tracker.total_cost:.4f}")

Debugging Setup

Configure for debugging:

import dspy
import logging

# Set up logging
dspy.enable_logging()
dspy.enable_litellm_logging()

# Configure Python logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Disable caching for debugging
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False
)

# Configure with tracing
trace = []
dspy.configure(
    lm=dspy.LM('openai/gpt-4o-mini'),
    trace=trace
)

# Run program
qa = dspy.ChainOfThought("question -> answer")
result = qa(question="What is ML?")

# Inspect trace
print(f"\nTrace has {len(trace)} entries")
for i, entry in enumerate(trace):
    print(f"\nStep {i}:")
    print(entry)

# Inspect history
qa.inspect_history(n=1)

Production Setup

Configure for production:

import dspy

# Disable debug logging
dspy.disable_logging()
dspy.disable_litellm_logging()

# Enable caching for performance
dspy.configure_cache(
    enable_disk_cache=True,
    enable_memory_cache=True,
    disk_cache_dir="/var/cache/dspy",
    disk_size_limit_bytes=10_000_000_000,  # 10GB
    memory_max_entries=10000
)

# Configure with production settings
dspy.configure(
    lm=dspy.LM(
        'openai/gpt-4o-mini',
        temperature=0.0,  # Deterministic
        num_retries=3,    # Retry on failure
        cache=True
    ),
    track_usage=True  # Monitor costs
)

# Load optimized model
model = dspy.load("models/production/v1.0.0/")

# Use with error handling
try:
    with dspy.track_usage() as tracker:
        result = model(input=data)

        # Log usage
        tokens = tracker.get_total_tokens()
        print(f"Request used {tokens} tokens")

except Exception as e:
    # Log error
    print(f"Error: {e}")
    # Handle gracefully

Testing Utilities

Utilities for testing:

import dspy
import unittest

class TestDSPyModule(unittest.TestCase):
    """Unit tests for a DSPy module: output validity and save/load round-trip."""

    @classmethod
    def setUpClass(cls):
        """Silence all logging and configure a deterministic test LM."""
        dspy.disable_logging()
        dspy.disable_litellm_logging()

        # temperature=0.0 keeps outputs repeatable across runs.
        dspy.configure(
            lm=dspy.LM('openai/gpt-4o-mini', temperature=0.0),
            track_usage=False
        )

    def test_module_output(self):
        """The predictor should return a non-None string answer."""
        predictor = dspy.Predict("question -> answer")
        prediction = predictor(question="Test question")

        self.assertIsNotNone(prediction.answer)
        self.assertIsInstance(prediction.answer, str)

    def test_module_serialization(self):
        """A saved-and-reloaded module should behave like the original."""
        original = dspy.ChainOfThought("question -> answer")

        original.save("/tmp/test_module/")
        restored = dspy.load("/tmp/test_module/")

        # Same deterministic input must yield the same answer from both.
        self.assertEqual(
            original(question="Test").answer,
            restored(question="Test").answer,
        )

if __name__ == '__main__':
    unittest.main()

Install with Tessl CLI

npx tessl i tessl/pypi-dspy

docs

adapters.md

configuration.md

datasets.md

evaluation.md

index.md

language-models.md

modules.md

optimization.md

prediction.md

retrieval.md

signatures.md

streaming.md

utilities.md

tile.json