Library to easily interface with LLM API providers
—
Advanced routing system for intelligent load balancing, automatic fallbacks, and retry logic across multiple model deployments. The Router class provides enterprise-grade reliability features including health monitoring, cost optimization, and performance tracking.
Main router class that manages multiple model deployments with intelligent routing strategies and automatic failover capabilities.
class Router:
def __init__(
self,
model_list: Optional[List[DeploymentTypedDict]] = None,
# Caching configuration
redis_url: Optional[str] = None,
redis_host: Optional[str] = None,
redis_port: Optional[int] = None,
redis_password: Optional[str] = None,
cache_responses: Optional[bool] = False,
cache_kwargs: dict = {},
caching_groups: Optional[List[tuple]] = None,
client_ttl: int = 3600,
# Reliability settings
num_retries: Optional[int] = None,
max_fallbacks: Optional[int] = None,
timeout: Optional[float] = None,
stream_timeout: Optional[float] = None,
default_litellm_params: Optional[dict] = None,
default_max_parallel_requests: Optional[int] = None,
set_verbose: bool = False,
debug_level: Literal["DEBUG", "INFO"] = "INFO",
# Fallback configuration
default_fallbacks: Optional[List[str]] = None,
fallbacks: List = [],
context_window_fallbacks: List = [],
content_policy_fallbacks: List = [],
# Routing strategy
routing_strategy: Literal[
"simple-shuffle",
"least-busy",
"usage-based-routing",
"latency-based-routing",
"cost-based-routing"
] = "simple-shuffle",
# Authentication and validation
enable_pre_call_checks: bool = False,
allowed_fails: int = 3,
cooldown_time: float = 1,
retry_policy: Optional[Dict[str, Any]] = None,
**kwargs
)
"""
Initialize Router with multiple model deployments and routing configuration.
Args:
model_list (Optional[List[DeploymentTypedDict]]): List of model deployment configurations
routing_strategy (str): Strategy for selecting deployments ("simple-shuffle", "least-busy", etc.)
num_retries (Optional[int]): Number of retries per deployment
max_fallbacks (Optional[int]): Maximum fallback deployments to try
timeout (Optional[float]): Request timeout in seconds
cache_responses (Optional[bool]): Enable response caching
fallbacks (List): Global fallback model list
enable_pre_call_checks (bool): Validate deployments before requests
"""

Router provides the same completion interfaces as global functions but with intelligent routing and fallback capabilities.
def completion(
self,
model: str,
messages: List[Dict[str, Any]],
# All standard completion parameters
**kwargs
) -> Union[ModelResponse, Iterator[ModelResponseStream]]
"""
Route completion request through configured deployments with fallbacks.
Args:
Same as litellm.completion() but routes through multiple deployments
Returns:
Union[ModelResponse, Iterator[ModelResponseStream]]: Routed completion response
"""
async def acompletion(
self,
model: str,
messages: List[Dict[str, Any]],
**kwargs
) -> Union[ModelResponse, AsyncIterator[ModelResponseStream]]
"""
Async version of router completion with intelligent routing.
"""
def text_completion(
self,
model: str,
prompt: str,
**kwargs
) -> Union[TextCompletionResponse, Iterator[TextCompletionResponse]]
"""
Route text completion request through configured deployments.
"""
async def atext_completion(
self,
model: str,
prompt: str,
**kwargs
) -> Union[TextCompletionResponse, AsyncIterator[TextCompletionResponse]]
"""
Async text completion with routing.
"""
def embedding(
self,
model: str,
input: Union[str, List[str], List[int], List[List[int]]],
**kwargs
) -> EmbeddingResponse
"""
Route embedding request through configured deployments.
"""
async def aembedding(
self,
model: str,
input: Union[str, List[str], List[int], List[List[int]]],
**kwargs
) -> EmbeddingResponse
"""
Async embedding with routing.
"""
def image_generation(
self,
prompt: str,
**kwargs
) -> ImageResponse
"""
Route image generation through configured deployments.
"""
def transcription(
self,
model: str,
file: Union[str, bytes, IO],
**kwargs
) -> TranscriptionResponse
"""
Route transcription through configured deployments.
"""
def speech(
self,
model: str,
input: str,
voice: str,
**kwargs
) -> bytes
"""
Route speech synthesis through configured deployments.
"""
def moderation(
self,
input: Union[str, List[str]],
**kwargs
) -> ModerationCreateResponse
"""
Route moderation through configured deployments.
"""

Methods for managing model deployments dynamically during runtime.
def add_deployment(self, deployment: DeploymentTypedDict) -> None:
"""
Add a new model deployment to the router.
Args:
deployment (DeploymentTypedDict): Deployment configuration
"""
def delete_deployment(self, deployment_id: str) -> None:
"""
Remove a deployment from the router.
Args:
deployment_id (str): ID of deployment to remove
"""
def get_deployments(self) -> List[DeploymentTypedDict]:
"""
Get all configured deployments.
Returns:
List[DeploymentTypedDict]: List of all deployments
"""
def set_model_list(self, model_list: List[DeploymentTypedDict]) -> None:
"""
Replace entire model list with new deployments.
Args:
model_list (List[DeploymentTypedDict]): New list of deployments
"""
def update_deployment(
self,
deployment_id: str,
**kwargs
) -> None:
"""
Update configuration of existing deployment.
Args:
deployment_id (str): ID of deployment to update
**kwargs: Updated configuration parameters
"""

Health check and monitoring capabilities for deployment status and performance.
def health_check(
self,
model: Optional[str] = None
) -> Dict[str, Any]:
"""
Check health status of deployments.
Args:
model (Optional[str]): Specific model to check, or all if None
Returns:
Dict[str, Any]: Health status report with deployment statuses
"""
async def ahealth_check(
self,
model: Optional[str] = None
) -> Dict[str, Any]:
"""
Async health check of deployments.
Args:
model (Optional[str]): Specific model to check
Returns:
Dict[str, Any]: Health status report
"""

Cost tracking, usage analytics, and performance metrics for router deployments.
def get_model_cost_map(self) -> Dict[str, Any]:
"""
Get cost information for all configured models.
Returns:
Dict[str, Any]: Model cost mapping with pricing details
"""
def print_deployment_metrics(self) -> None:
"""
Print detailed metrics for all deployments including:
- Request counts and success rates
- Average latency and throughput
- Cost tracking and token usage
- Error rates and failure types
"""
def reset_cost(self) -> None:
"""
Reset accumulated cost tracking data.
"""
def get_usage_stats(self) -> Dict[str, Any]:
"""
Get comprehensive usage statistics.
Returns:
Dict[str, Any]: Usage statistics including tokens, costs, latencies
"""

class DeploymentTypedDict(TypedDict):
"""Model deployment configuration"""
model_name: str
litellm_params: Dict[str, Any]
model_info: Optional[Dict[str, Any]]
class LiteLLMParams(TypedDict):
"""Parameters for LiteLLM model configuration"""
model: str
api_key: Optional[str]
api_base: Optional[str]
api_version: Optional[str]
timeout: Optional[float]
max_retries: Optional[int]
custom_llm_provider: Optional[str]
class ModelInfo(TypedDict):
"""Model metadata and capabilities"""
id: Optional[str]
mode: Optional[Literal["chat", "completion", "embedding"]]
input_cost_per_token: Optional[float]
output_cost_per_token: Optional[float]
max_tokens: Optional[int]
supports_function_calling: Optional[bool]
supports_vision: Optional[bool]

from litellm import Router
# Configure multiple OpenAI deployments
model_list = [
{
"model_name": "gpt-4",
"litellm_params": {
"model": "gpt-4",
"api_key": "sk-key1",
"api_base": "https://api.openai.com/v1"
}
},
{
"model_name": "gpt-4",
"litellm_params": {
"model": "azure/gpt-4",
"api_key": "azure-key",
"api_base": "https://my-azure.openai.azure.com/",
"api_version": "2024-02-01"
}
}
]
router = Router(model_list=model_list)
# Use router like normal completion
response = router.completion(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}]
)

from litellm import Router
model_list = [
{
"model_name": "gpt-4-primary",
"litellm_params": {
"model": "gpt-4",
"api_key": "primary-key"
},
"model_info": {
"id": "primary-deployment"
}
},
{
"model_name": "gpt-4-fallback",
"litellm_params": {
"model": "azure/gpt-4",
"api_key": "azure-key",
"api_base": "https://backup.openai.azure.com/",
"api_version": "2024-02-01"
},
"model_info": {
"id": "backup-deployment"
}
}
]
router = Router(
model_list=model_list,
routing_strategy="least-busy",
num_retries=3,
max_fallbacks=2,
timeout=30,
enable_pre_call_checks=True,
fallbacks=["gpt-3.5-turbo", "claude-3-haiku-20240307"]
)

router = Router(
model_list=model_list,
redis_url="redis://localhost:6379",
cache_responses=True,
client_ttl=3600, # 1 hour cache TTL
cache_kwargs={
"ttl": 600, # 10 minute default TTL
"namespace": "litellm_cache"
}
)
# Cached responses for identical requests
response1 = router.completion(
model="gpt-4",
messages=[{"role": "user", "content": "What is 2+2?"}]
)
# This will return cached response
response2 = router.completion(
model="gpt-4",
messages=[{"role": "user", "content": "What is 2+2?"}]
)

model_list = [
{
"model_name": "gpt-4",
"litellm_params": {"model": "gpt-4"},
"model_info": {
"input_cost_per_token": 0.00003,
"output_cost_per_token": 0.00006
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-3.5-turbo"},
"model_info": {
"input_cost_per_token": 0.000001,
"output_cost_per_token": 0.000002
}
}
]
router = Router(
model_list=model_list,
routing_strategy="cost-based-routing"
)
# Router will prefer cheaper models when possible
response = router.completion(
model="gpt-4", # Will route to gpt-3.5-turbo if suitable
messages=[{"role": "user", "content": "Simple question"}]
)

# Check overall health
health = router.health_check()
print("Router Health:", health)
# Check specific model
gpt4_health = router.health_check(model="gpt-4")
print("GPT-4 Health:", gpt4_health)
# Print detailed metrics
router.print_deployment_metrics()
# Get cost information
costs = router.get_model_cost_map()
print("Cost Map:", costs)

# Add new deployment at runtime
new_deployment = {
"model_name": "claude-3",
"litellm_params": {
"model": "claude-3-sonnet-20240229",
"api_key": "anthropic-key"
},
"model_info": {
"id": "claude-deployment"
}
}
router.add_deployment(new_deployment)
# Update existing deployment
router.update_deployment(
deployment_id="primary-deployment",
api_key="new-primary-key"
)
# Remove deployment
router.delete_deployment("backup-deployment")
# Get current deployments
deployments = router.get_deployments()
print(f"Active deployments: {len(deployments)}")

router = Router(
model_list=model_list,
# Global fallbacks for any model
fallbacks=["gpt-3.5-turbo", "claude-3-haiku-20240307"],
# Context window fallbacks
context_window_fallbacks=[
{"gpt-4": ["claude-3-sonnet-20240229"]}, # If gpt-4 context exceeded
{"claude-3-opus-20240229": ["gpt-4"]} # If claude opus context exceeded
],
# Content policy fallbacks
content_policy_fallbacks=[
{"gpt-4": ["claude-3-sonnet-20240229"]} # If content policy violation
]
)
try:
response = router.completion(
model="gpt-4",
messages=[{"role": "user", "content": "Very long prompt..."}]
)
except Exception as e:
print(f"All fallbacks exhausted: {e}")

import asyncio
async def concurrent_requests():
router = Router(model_list=model_list)
tasks = []
for i in range(10):
task = router.acompletion(
model="gpt-4",
messages=[{"role": "user", "content": f"Request {i}"}]
)
tasks.append(task)
responses = await asyncio.gather(*tasks)
return responses
responses = asyncio.run(concurrent_requests())

retry_policy = {
"max_retries": 5,
"base_delay": 1.0, # Base delay between retries
"max_delay": 60.0, # Maximum delay between retries
"backoff_factor": 2.0, # Exponential backoff multiplier
"jitter": True # Add random jitter to prevent thundering herd
}
router = Router(
model_list=model_list,
retry_policy=retry_policy,
allowed_fails=2, # Deployments marked unhealthy after 2 failures
cooldown_time=300 # 5 minute cooldown for unhealthy deployments
)

Install with Tessl CLI
npx tessl i tessl/pypi-litellm