A declarative framework for programming foundation models through compositional Python code, enabling modular AI systems with automated optimization algorithms that synthesize examples, generate instructions, and finetune weights based on user-defined metrics.
Helper functions for async/sync conversion, module serialization, usage tracking, and logging configuration.
Convert synchronous functions to async.
def asyncify(func: callable):
"""
Convert synchronous function to async.
Wraps a synchronous function to run in an async context without
blocking the event loop. Useful for running sync code in async programs.
Args:
func (callable): Synchronous function to convert
Returns:
Async version of function
"""
pass

Usage:
import dspy
import time
# Synchronous function
def slow_computation(x):
time.sleep(2)
return x * 2
# Convert to async
async_computation = dspy.asyncify(slow_computation)
# Use in async context
import asyncio
async def main():
result = await async_computation(21)
print(result) # 42
asyncio.run(main())

Convert async functions to sync.
def syncify(func: callable):
"""
Convert async function to sync.
Wraps an async function to run synchronously. Useful for calling
async code from synchronous contexts.
Args:
func (callable): Async function to convert
Returns:
Sync version of function
"""
pass

Usage:
import dspy
import asyncio
# Async function
async def async_fetch(url):
await asyncio.sleep(1)
return f"Data from {url}"
# Convert to sync
sync_fetch = dspy.syncify(async_fetch)
# Use in sync context
result = sync_fetch("https://example.com")
print(result) # "Data from https://example.com"
# Useful for integrating async DSPy modules in sync code
async def async_qa(question):
qa = dspy.ChainOfThought("question -> answer")
return await qa.acall(question=question)
sync_qa = dspy.syncify(async_qa)
result = sync_qa("What is ML?")

Load a saved DSPy module from disk.
def load(path: str, allow_pickle: bool = False):
"""
Load saved module from disk.
Args:
path (str): Directory path to saved module
allow_pickle (bool): Allow loading pickled data (default: False for security)
Returns:
Module instance
Raises:
FileNotFoundError: If path doesn't exist
SecurityError: If pickle data found but allow_pickle=False
"""
pass

Usage:
import dspy
# Save module
module = MyModule()
module.save("my_module/")
# Load module
loaded = dspy.load("my_module/")
result = loaded(input=data)
# Load with pickle support (use cautiously)
loaded = dspy.load("my_module/", allow_pickle=True)

Module Save Method:
The save functionality is part of Module class:
import dspy
class MyRAG(dspy.Module):
def __init__(self):
super().__init__()
self.retrieve = dspy.Retrieve(k=3)
self.generate = dspy.ChainOfThought("context, question -> answer")
def forward(self, question):
context = self.retrieve(query=question).passages
return self.generate(context=context, question=question)
# Create and optimize
rag = MyRAG()
optimizer = dspy.BootstrapFewShot(metric=my_metric)
compiled = optimizer.compile(rag, trainset=trainset)
# Save optimized module
compiled.save("optimized_rag/")
# Load later
loaded_rag = dspy.load("optimized_rag/")
result = loaded_rag(question="What is AI?")

Track token usage within a context.
def track_usage():
"""
Track token usage in context.
Context manager that tracks LM token usage within its scope.
Provides methods to access accumulated usage statistics.
Returns:
UsageTracker context manager
"""
pass

UsageTracker:
class UsageTracker:
"""
Token usage tracker.
Tracks token consumption across multiple LM calls within a context.
"""
def get_total_tokens(self) -> int:
"""
Get total tokens used.
Returns:
Total token count
"""
pass
def get_prompt_tokens(self) -> int:
"""
Get prompt tokens used.
Returns:
Prompt token count
"""
pass
def get_completion_tokens(self) -> int:
"""
Get completion tokens used.
Returns:
Completion token count
"""
pass
def get_usage_stats(self) -> dict:
"""
Get detailed usage statistics.
Returns:
Dictionary with detailed token usage
"""
pass

Usage:
import dspy
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
# Track usage in context
with dspy.track_usage() as tracker:
qa = dspy.ChainOfThought("question -> answer")
result1 = qa(question="What is ML?")
result2 = qa(question="What is DL?")
result3 = qa(question="What is AI?")
# Get usage statistics
total = tracker.get_total_tokens()
prompt = tracker.get_prompt_tokens()
completion = tracker.get_completion_tokens()
print(f"Total tokens: {total}")
print(f"Prompt tokens: {prompt}")
print(f"Completion tokens: {completion}")
# Detailed stats
stats = tracker.get_usage_stats()
print(f"Number of calls: {stats['num_calls']}")
print(f"Average tokens per call: {stats['avg_tokens']}")

Track Complex Program:
import dspy
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
class RAG(dspy.Module):
def __init__(self):
super().__init__()
self.retrieve = dspy.Retrieve(k=3)
self.generate = dspy.ChainOfThought("context, question -> answer")
def forward(self, question):
context = self.retrieve(query=question).passages
return self.generate(context=context, question=question)
rag = RAG()
# Track RAG usage
with dspy.track_usage() as tracker:
questions = [
"What is machine learning?",
"What is deep learning?",
"What are neural networks?"
]
for q in questions:
result = rag(question=q)
print(f"Total tokens for {len(questions)} questions: {tracker.get_total_tokens()}")
print(f"Average tokens per question: {tracker.get_total_tokens() / len(questions):.1f}")

Configure DSPy's logging system.
def configure_dspy_loggers(name: str):
"""
Configure DSPy logging system.
Args:
name (str): Logger name to configure
"""
pass

Enable DSPy logging output.
def enable_logging():
"""Enable DSPy logging output."""
pass

Disable DSPy logging output.
def disable_logging():
"""Disable DSPy logging output."""
pass

Usage:
import dspy
# Enable logging for debugging
dspy.enable_logging()
# Configure specific logger
dspy.configure_dspy_loggers("dspy.predict")
# Your code here
qa = dspy.Predict("question -> answer")
result = qa(question="What is ML?")
# Disable logging
dspy.disable_logging()

Control LiteLLM logging output (also available in configuration module).
def enable_litellm_logging():
"""Enable LiteLLM debug logging."""
pass
def disable_litellm_logging():
"""Disable LiteLLM debug logging."""
pass

Usage:
import dspy
# Disable verbose LiteLLM logs (recommended)
dspy.disable_litellm_logging()
# Enable for debugging LM calls
dspy.enable_litellm_logging()
# Your code
lm = dspy.LM('openai/gpt-4o-mini')
response = lm(prompt="Hello")
# Disable again
dspy.disable_litellm_logging()

Save and load optimized modules:
import dspy
# Train and optimize
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
program = MyModule()
optimizer = dspy.BootstrapFewShot(metric=my_metric)
compiled = optimizer.compile(program, trainset=trainset)
# Save to disk
compiled.save("models/optimized_v1/")
# Load in production
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
production_model = dspy.load("models/optimized_v1/")
# Use immediately
result = production_model(input=data)

Manage multiple model versions:
import dspy
import os
from datetime import datetime
class ModelManager:
"""Manage model versions."""
def __init__(self, base_dir="models/"):
self.base_dir = base_dir
os.makedirs(base_dir, exist_ok=True)
def save_model(self, model, name, version=None):
"""Save model with version."""
if version is None:
version = datetime.now().strftime("%Y%m%d_%H%M%S")
path = os.path.join(self.base_dir, name, version)
model.save(path)
print(f"Saved {name} v{version} to {path}")
return path
def load_model(self, name, version="latest"):
"""Load model by version."""
model_dir = os.path.join(self.base_dir, name)
if version == "latest":
# Get most recent version
versions = sorted(os.listdir(model_dir))
version = versions[-1]
path = os.path.join(model_dir, version)
model = dspy.load(path)
print(f"Loaded {name} v{version} from {path}")
return model
# Use manager
manager = ModelManager()
# Save new version
compiled = optimizer.compile(program, trainset=trainset)
manager.save_model(compiled, "rag_model", version="1.0.0")
# Load specific version
model_v1 = manager.load_model("rag_model", version="1.0.0")
# Load latest
latest = manager.load_model("rag_model", version="latest")

Integrate async DSPy modules:
import dspy
import asyncio
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
# Async wrapper for module
async def async_rag(question):
"""Async RAG call."""
rag = RAG()
return await rag.acall(question=question)
# Process multiple questions concurrently
async def process_batch(questions):
"""Process questions in parallel."""
tasks = [async_rag(q) for q in questions]
results = await asyncio.gather(*tasks)
return results
# Run
questions = ["What is ML?", "What is AI?", "What is DL?"]
results = asyncio.run(process_batch(questions))
for q, r in zip(questions, results):
print(f"Q: {q}")
print(f"A: {r.answer}\n")

Track costs across operations:
import dspy
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
# Cost tracking with usage
class CostTracker:
"""Track costs based on token usage."""
# Pricing (per 1M tokens)
PRICES = {
'gpt-4o-mini': {'prompt': 0.15, 'completion': 0.60},
'gpt-4o': {'prompt': 2.50, 'completion': 10.00},
}
def __init__(self, model_name):
self.model_name = model_name
self.total_cost = 0.0
def calculate_cost(self, tracker):
"""Calculate cost from usage tracker."""
prices = self.PRICES.get(self.model_name, {'prompt': 0, 'completion': 0})
prompt_cost = (tracker.get_prompt_tokens() / 1_000_000) * prices['prompt']
completion_cost = (tracker.get_completion_tokens() / 1_000_000) * prices['completion']
cost = prompt_cost + completion_cost
self.total_cost += cost
return cost
# Use cost tracker
cost_tracker = CostTracker('gpt-4o-mini')
with dspy.track_usage() as usage_tracker:
qa = dspy.ChainOfThought("question -> answer")
for question in questions:
result = qa(question=question)
cost = cost_tracker.calculate_cost(usage_tracker)
print(f"Cost: ${cost:.4f}")
print(f"Total cost: ${cost_tracker.total_cost:.4f}")

Configure for debugging:
import dspy
import logging
# Set up logging
dspy.enable_logging()
dspy.enable_litellm_logging()
# Configure Python logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Disable caching for debugging
dspy.configure_cache(
enable_disk_cache=False,
enable_memory_cache=False
)
# Configure with tracing
trace = []
dspy.configure(
lm=dspy.LM('openai/gpt-4o-mini'),
trace=trace
)
# Run program
qa = dspy.ChainOfThought("question -> answer")
result = qa(question="What is ML?")
# Inspect trace
print(f"\nTrace has {len(trace)} entries")
for i, entry in enumerate(trace):
print(f"\nStep {i}:")
print(entry)
# Inspect history
qa.inspect_history(n=1)

Configure for production:
import dspy
# Disable debug logging
dspy.disable_logging()
dspy.disable_litellm_logging()
# Enable caching for performance
dspy.configure_cache(
enable_disk_cache=True,
enable_memory_cache=True,
disk_cache_dir="/var/cache/dspy",
disk_size_limit_bytes=10_000_000_000, # 10GB
memory_max_entries=10000
)
# Configure with production settings
dspy.configure(
lm=dspy.LM(
'openai/gpt-4o-mini',
temperature=0.0, # Deterministic
num_retries=3, # Retry on failure
cache=True
),
track_usage=True # Monitor costs
)
# Load optimized model
model = dspy.load("models/production/v1.0.0/")
# Use with error handling
try:
with dspy.track_usage() as tracker:
result = model(input=data)
# Log usage
tokens = tracker.get_total_tokens()
print(f"Request used {tokens} tokens")
except Exception as e:
# Log error
print(f"Error: {e}")
# Handle gracefully

Utilities for testing:
import dspy
import unittest
class TestDSPyModule(unittest.TestCase):
"""Test DSPy module."""
@classmethod
def setUpClass(cls):
"""Set up test environment."""
# Disable logging
dspy.disable_logging()
dspy.disable_litellm_logging()
# Configure with test LM
dspy.configure(
lm=dspy.LM('openai/gpt-4o-mini', temperature=0.0),
track_usage=False
)
def test_module_output(self):
"""Test module produces valid output."""
qa = dspy.Predict("question -> answer")
result = qa(question="Test question")
self.assertIsNotNone(result.answer)
self.assertIsInstance(result.answer, str)
def test_module_serialization(self):
"""Test save/load."""
qa = dspy.ChainOfThought("question -> answer")
# Save
qa.save("/tmp/test_module/")
# Load
loaded = dspy.load("/tmp/test_module/")
# Compare behavior
result1 = qa(question="Test")
result2 = loaded(question="Test")
self.assertEqual(result1.answer, result2.answer)
if __name__ == '__main__':
unittest.main()

Install with Tessl CLI
npx tessl i tessl/pypi-dspy