Rate limiters control the rate at which requests are made to external services, preventing API quota exhaustion, managing costs, and avoiding rate limit errors. LangChain provides rate limiting utilities through the langchain.rate_limiters module, re-exported from langchain-core.
Rate limiters are particularly useful when working with chat models and embeddings that have API rate limits. They can be attached to models to automatically throttle requests according to configured limits.
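For orientation, here is a minimal end-to-end sketch that attaches a limiter to a model via the rate_limiter parameter of init_chat_model (covered in detail below); it assumes OpenAI credentials are already configured:

from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
from langchain.rate_limiters import InMemoryRateLimiter

# Throttle this model to at most 2 requests per second.
model = init_chat_model(
    "openai:gpt-4o",
    rate_limiter=InMemoryRateLimiter(requests_per_second=2),
)

# invoke() transparently waits on the rate limiter before each request.
response = model.invoke([HumanMessage(content="Hello!")])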
Abstract base class for all rate limiter implementations:
class BaseRateLimiter:
"""
Abstract base class for rate limiting implementations.
Rate limiters control the rate at which requests are made to prevent
exceeding API quotas or rate limits. Implementations must define how
requests are tracked and throttled.
"""
def acquire(self) -> None:
"""
Acquire permission to make a request (synchronous).
Blocks until the request is allowed according to the rate limit policy.
This method should be called before making an API request.
"""
...
async def aacquire(self) -> None:
"""
Acquire permission to make a request (asynchronous).
Awaits until the request is allowed according to the rate limit policy.
This method should be called before making an async API request.
"""
...

Methods:

- acquire() - Block until a request is allowed (synchronous). Called automatically by models before making API requests.
- aacquire() - Await until a request is allowed (asynchronous). Called automatically by models before making async API requests.

Usage Pattern:
Rate limiters implement a token bucket or similar algorithm. When acquire() or aacquire() is called, the rate limiter checks if a request slot is available. If not, it blocks/awaits until one becomes available according to the configured rate limits.
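To make the contract concrete, here is a hypothetical minimal subclass: a fixed-interval limiter that satisfies the acquire()/aacquire() interface described above. The class name FixedIntervalRateLimiter is invented for illustration and is not part of the library:

import asyncio
import threading
import time

from langchain.rate_limiters import BaseRateLimiter


class FixedIntervalRateLimiter(BaseRateLimiter):
    """Hypothetical limiter: at most one request per `interval` seconds."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self._lock = threading.Lock()
        self._next_allowed = time.monotonic()

    def _reserve_slot(self) -> float:
        # Reserve the next request slot and return how long to wait for it.
        with self._lock:
            now = time.monotonic()
            wait = max(0.0, self._next_allowed - now)
            self._next_allowed = max(now, self._next_allowed) + self.interval
            return wait

    def acquire(self) -> None:
        time.sleep(self._reserve_slot())  # block until our slot arrives

    async def aacquire(self) -> None:
        await asyncio.sleep(self._reserve_slot())  # await without blocking the event loop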
In-memory implementation of rate limiting with configurable requests per second/minute/hour/day:
class InMemoryRateLimiter(BaseRateLimiter):
"""
In-memory rate limiter with configurable time-based limits.
Supports rate limiting by requests per second, minute, hour, and day.
Uses a token bucket algorithm to track and enforce limits.
"""
def __init__(
self,
*,
requests_per_second: float | None = None,
requests_per_minute: float | None = None,
requests_per_hour: float | None = None,
requests_per_day: float | None = None,
) -> None:
"""
Initialize an in-memory rate limiter.
At least one rate limit parameter must be specified. Multiple limits
can be combined - the most restrictive limit will apply.
Args:
requests_per_second: Maximum requests per second (optional)
requests_per_minute: Maximum requests per minute (optional)
requests_per_hour: Maximum requests per hour (optional)
requests_per_day: Maximum requests per day (optional)
"""
...

Parameters:

- requests_per_second (float | None): Maximum number of requests allowed per second. Optional.
- requests_per_minute (float | None): Maximum number of requests allowed per minute. Optional.
- requests_per_hour (float | None): Maximum number of requests allowed per hour. Optional.
- requests_per_day (float | None): Maximum number of requests allowed per day. Optional.

Requirements: At least one rate limit parameter must be specified. Fractional values are supported (e.g., requests_per_second=0.5 means one request every 2 seconds).

Basic Usage:
from langchain.rate_limiters import InMemoryRateLimiter
# Limit to 10 requests per second
rate_limiter = InMemoryRateLimiter(requests_per_second=10)
# Limit to 100 requests per minute
rate_limiter = InMemoryRateLimiter(requests_per_minute=100)
# Combine multiple limits (most restrictive applies)
rate_limiter = InMemoryRateLimiter(
requests_per_second=5,
requests_per_minute=100,
requests_per_hour=1000
)

How It Works:
The InMemoryRateLimiter uses a token bucket algorithm: tokens accumulate at the configured rate, each request consumes one token, and a request blocks until a token is available when the bucket is empty (see the detailed walkthrough later in this section).
Create and use a simple rate limiter:
from langchain.rate_limiters import InMemoryRateLimiter
# Create rate limiter: 5 requests per second
rate_limiter = InMemoryRateLimiter(requests_per_second=5)
# Manual usage (typically not needed - models handle this automatically)
rate_limiter.acquire() # Blocks if rate limit would be exceeded
# ... make API request ...
# Async version (call from within an async function)
await rate_limiter.aacquire()
# ... make async API request ...

Attach a rate limiter to a chat model to automatically throttle requests:
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
from langchain.messages import HumanMessage
# Create rate limiter: 10 requests per second
rate_limiter = InMemoryRateLimiter(requests_per_second=10)
# Initialize model with rate limiter
model = init_chat_model(
"openai:gpt-4o",
rate_limiter=rate_limiter
)
# Make requests - automatically rate limited
for i in range(50):
response = model.invoke([HumanMessage(content=f"Question {i}")])
print(f"Response {i}: {response.content}")
# Rate limiter ensures we don't exceed 10 requests/second

Use fractional limits for slow request rates:
from langchain.rate_limiters import InMemoryRateLimiter
# 0.5 requests per second = 1 request every 2 seconds
slow_limiter = InMemoryRateLimiter(requests_per_second=0.5)
# 30 requests per minute = 1 request every 2 seconds
same_limiter = InMemoryRateLimiter(requests_per_minute=30)
# Both configurations produce the same rate limit

Combine different time windows for complex rate limiting:
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
# Enforce multiple limits simultaneously:
# - Max 10 requests per second (burst limit)
# - Max 500 requests per minute (sustained limit)
# - Max 10,000 requests per day (daily quota)
rate_limiter = InMemoryRateLimiter(
requests_per_second=10,
requests_per_minute=500,
requests_per_day=10000
)
model = init_chat_model(
"openai:gpt-4o",
rate_limiter=rate_limiter
)
# The most restrictive limit applies at any given time
# Burst traffic is limited to 10/sec
# Sustained traffic is limited to 500/min
# Total daily traffic is limited to 10,000

Match your rate limits to provider API quotas:
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
# OpenAI free tier: 3 RPM (requests per minute)
openai_free_limiter = InMemoryRateLimiter(requests_per_minute=3)
model_free = init_chat_model(
"openai:gpt-4o",
rate_limiter=openai_free_limiter
)
# OpenAI tier 1: 500 RPM, 10,000 RPD
openai_tier1_limiter = InMemoryRateLimiter(
requests_per_minute=500,
requests_per_day=10000
)
model_tier1 = init_chat_model(
"openai:gpt-4o",
rate_limiter=openai_tier1_limiter
)
# Anthropic tier 1: 5 RPS (requests per second)
anthropic_limiter = InMemoryRateLimiter(requests_per_second=5)
model_anthropic = init_chat_model(
"anthropic:claude-3-5-sonnet-20241022",
rate_limiter=anthropic_limiter
)

Process large batches while respecting rate limits:
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
from langchain.messages import HumanMessage
# Create rate limiter: 100 requests per minute
rate_limiter = InMemoryRateLimiter(requests_per_minute=100)
model = init_chat_model(
"openai:gpt-4o",
rate_limiter=rate_limiter
)
# Process 500 questions
questions = [f"What is {i} + {i}?" for i in range(500)]
responses = []
for question in questions:
# Rate limiter automatically paces requests
# Takes ~5 minutes to complete 500 requests at 100 RPM
response = model.invoke([HumanMessage(content=question)])
responses.append(response.content)
print(f"Processed {len(responses)} questions with rate limiting")Rate limiting works seamlessly with async operations:
import asyncio
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
from langchain.messages import HumanMessage
# Create rate limiter
rate_limiter = InMemoryRateLimiter(requests_per_second=10)
model = init_chat_model(
"openai:gpt-4o",
rate_limiter=rate_limiter
)
async def process_questions():
questions = [f"Question {i}" for i in range(50)]
# Process concurrently, but rate limited
tasks = [
model.ainvoke([HumanMessage(content=q)])
for q in questions
]
# Rate limiter ensures we don't exceed 10 requests/second
# even though we're processing concurrently
responses = await asyncio.gather(*tasks)
return responses
# Run async processing
responses = asyncio.run(process_questions())

Share a single rate limiter across multiple models:
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
from langchain.rate_limiters import InMemoryRateLimiter
# Single rate limiter shared across models
shared_limiter = InMemoryRateLimiter(requests_per_second=10)
# Both models share the same rate limit pool
model_a = init_chat_model("openai:gpt-4o", rate_limiter=shared_limiter)
model_b = init_chat_model("openai:gpt-4o-mini", rate_limiter=shared_limiter)
# Total requests across both models won't exceed 10/second
response_a = model_a.invoke([HumanMessage(content="Hello from A")])
response_b = model_b.invoke([HumanMessage(content="Hello from B")])

Use rate limiting to manage API costs:
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
# Limit daily usage to control costs
# Example: 1,000 requests/day at $0.01/request = $10/day max
cost_limiter = InMemoryRateLimiter(requests_per_day=1000)
model = init_chat_model(
"openai:gpt-4o",
rate_limiter=cost_limiter
)
# Application won't exceed 1,000 requests per day
# Provides predictable daily cost ceiling

Set limits slightly below API quotas for safety:
from langchain.rate_limiters import InMemoryRateLimiter
# If API allows 100 RPM, use 90 RPM to leave safety margin
conservative_limiter = InMemoryRateLimiter(requests_per_minute=90)
# Accounts for:
# - Clock skew between client and server
# - Slight timing variations
# - Other requests from same account

Use different limits for different environments:
import os
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
# Development: Lower limits to avoid exhausting quota during testing
dev_limiter = InMemoryRateLimiter(
requests_per_second=1,
requests_per_minute=50
)
# Production: Higher limits for real traffic
prod_limiter = InMemoryRateLimiter(
requests_per_second=10,
requests_per_minute=500
)
# Select based on environment
rate_limiter = prod_limiter if os.getenv("ENV") == "production" else dev_limiter
model = init_chat_model("openai:gpt-4o", rate_limiter=rate_limiter)Start conservative and increase as needed:
from langchain.rate_limiters import InMemoryRateLimiter
# Phase 1: Initial testing (very conservative)
phase1_limiter = InMemoryRateLimiter(requests_per_minute=10)
# Phase 2: Limited rollout
phase2_limiter = InMemoryRateLimiter(requests_per_minute=100)
# Phase 3: Full production
phase3_limiter = InMemoryRateLimiter(
requests_per_second=10,
requests_per_minute=500,
requests_per_day=10000
)

Create separate rate limiters for different users or tenants:
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
# Dictionary of per-user rate limiters
user_limiters = {}
def get_model_for_user(user_id: str):
"""Get a chat model with per-user rate limiting."""
# Create rate limiter for new users
if user_id not in user_limiters:
# Each user gets 100 requests per hour
user_limiters[user_id] = InMemoryRateLimiter(
requests_per_hour=100
)
return init_chat_model(
"openai:gpt-4o",
rate_limiter=user_limiters[user_id]
)
# Each user has independent rate limits
model_user1 = get_model_for_user("user_1")
model_user2 = get_model_for_user("user_2")

Rate limiters work with streaming responses:
from langchain.chat_models import init_chat_model
from langchain.rate_limiters import InMemoryRateLimiter
from langchain.messages import HumanMessage
rate_limiter = InMemoryRateLimiter(requests_per_second=5)
model = init_chat_model(
"openai:gpt-4o",
rate_limiter=rate_limiter
)
# Rate limiter applies to stream initiation
# (not to individual chunks within a stream)
for chunk in model.stream([HumanMessage(content="Tell me a story")]):
print(chunk.content, end="", flush=True)

Disable rate limiting by not providing a rate limiter:
from langchain.chat_models import init_chat_model
# No rate limiting - useful for:
# - Self-hosted models with no limits
# - Testing without artificial delays
# - When using provider's built-in rate limiting
model = init_chat_model("openai:gpt-4o") # No rate_limiter parameterfrom langchain_core.rate_limiters import BaseRateLimiter, InMemoryRateLimiter
# Base class for custom rate limiter implementations
class BaseRateLimiter:
def acquire(self) -> None: ...
async def aacquire(self) -> None: ...
# In-memory rate limiter with time-based limits
class InMemoryRateLimiter(BaseRateLimiter):
def __init__(
self,
*,
requests_per_second: float | None = None,
requests_per_minute: float | None = None,
requests_per_hour: float | None = None,
requests_per_day: float | None = None,
) -> None: ...

Type Annotations:
from typing import Optional
from langchain.rate_limiters import BaseRateLimiter, InMemoryRateLimiter
# Type hint for rate limiter parameters
rate_limiter: Optional[BaseRateLimiter] = None
# Creating typed rate limiters
limiter: InMemoryRateLimiter = InMemoryRateLimiter(requests_per_second=10)

InMemoryRateLimiter uses a token bucket algorithm:
Example: with requests_per_second=10, the bucket holds up to 10 tokens and refills at 10 tokens per second. Each request consumes one token; when the bucket is empty, the caller waits until the next token arrives. A sketch of this behavior appears below.
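The following is an illustrative sketch of the token bucket mechanics described above, not the library's actual internals (the TokenBucket class is hypothetical):

import time


class TokenBucket:
    """Illustrative token bucket (hypothetical class, not LangChain internals)."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum stored tokens (burst size)
        self.tokens = capacity    # start full
        self.last = time.monotonic()

    def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity.
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1  # one token consumed per request
                return
            # Sleep just long enough for the missing fraction of a token.
            time.sleep((1 - self.tokens) / self.rate)


# requests_per_second=10 corresponds to rate=10 with a burst capacity of 10.
bucket = TokenBucket(rate=10, capacity=10)
bucket.acquire()  # instant while tokens remain; blocks once the bucket is empty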
When multiple limits are specified, separate buckets track each:
rate_limiter = InMemoryRateLimiter(
requests_per_second=10, # Bucket 1: 10 tokens, refills at 10/sec
requests_per_minute=500, # Bucket 2: 500 tokens, refills at 500/min
requests_per_day=10000 # Bucket 3: 10,000 tokens, refills at 10000/day
)
# Each request must acquire tokens from ALL buckets
# Most restrictive bucket determines when request is allowed

InMemoryRateLimiter is thread-safe and can be used across multiple threads:
from concurrent.futures import ThreadPoolExecutor
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
from langchain.rate_limiters import InMemoryRateLimiter
rate_limiter = InMemoryRateLimiter(requests_per_second=10)
model = init_chat_model("openai:gpt-4o", rate_limiter=rate_limiter)
# Safe to use across multiple threads
with ThreadPoolExecutor(max_workers=20) as executor:
futures = [
executor.submit(model.invoke, [HumanMessage(content=f"Q{i}")])
for i in range(100)
]
results = [f.result() for f in futures]

InMemoryRateLimiter has minimal memory overhead: it stores only a token count and a timestamp per configured bucket.
Current limitations of InMemoryRateLimiter: rate limit state lives in process memory, so it is not shared across processes or machines, and it is lost when the process restarts. For distributed systems, consider a custom BaseRateLimiter implementation backed by a shared store such as Redis.
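A hypothetical sketch of such a limiter, assuming a reachable Redis server and the redis-py client; the class name, key scheme, and fixed-window strategy are all illustrative choices, not part of LangChain:

import asyncio
import time

import redis

from langchain.rate_limiters import BaseRateLimiter


class RedisFixedWindowRateLimiter(BaseRateLimiter):
    """Hypothetical fixed-window limiter shared by every process using one Redis."""

    def __init__(self, limit_per_minute: int, client: redis.Redis, key_prefix: str = "rl") -> None:
        self.limit = limit_per_minute
        self.client = client
        self.key_prefix = key_prefix

    def acquire(self) -> None:
        while True:
            window = int(time.time() // 60)   # current one-minute window
            key = f"{self.key_prefix}:{window}"
            count = self.client.incr(key)     # atomic counter shared across processes
            if count == 1:
                self.client.expire(key, 120)  # let old windows expire
            if count <= self.limit:
                return
            self.client.decr(key)             # undo our reservation and retry later
            time.sleep(1)                     # window is full; back off briefly

    async def aacquire(self) -> None:
        await asyncio.to_thread(self.acquire)  # run the blocking wait off the event loop


# Usage sketch: share one limiter across all workers pointing at the same Redis.
limiter = RedisFixedWindowRateLimiter(limit_per_minute=100, client=redis.Redis())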
Set rate limits to match your API tier:
# Research your provider's rate limits first
# Then configure accordingly:
rate_limiter = InMemoryRateLimiter(
requests_per_minute=YOUR_API_TIER_RPM,
requests_per_day=YOUR_API_TIER_RPD
)

Set limits 10-20% below actual quotas:
# API allows 100 RPM, use 80-90 RPM
rate_limiter = InMemoryRateLimiter(requests_per_minute=90)

Use multiple time windows for comprehensive limiting:
# Prevent both burst traffic and quota exhaustion
rate_limiter = InMemoryRateLimiter(
requests_per_second=10, # Burst protection
requests_per_minute=500, # Sustained protection
requests_per_day=10000 # Quota protection
)

Track how rate limiting affects your application:
import time
from langchain.chat_models import init_chat_model
from langchain.messages import HumanMessage
from langchain.rate_limiters import InMemoryRateLimiter
rate_limiter = InMemoryRateLimiter(requests_per_second=5)
model = init_chat_model("openai:gpt-4o", rate_limiter=rate_limiter)
# Measure rate limiting overhead
start = time.time()
for i in range(50):
model.invoke([HumanMessage(content=f"Question {i}")])
elapsed = time.time() - start
print(f"50 requests took {elapsed:.2f} seconds")
print(f"Average rate: {50/elapsed:.2f} requests/second")Choose the right time window for your use case:
- requests_per_second: High-frequency APIs, burst protection
- requests_per_minute: Most common tier, general usage
- requests_per_hour: Medium-term quotas, batch processing
- requests_per_day: Daily quotas, cost management

Include rate limit configuration in documentation:
# Example configuration for deployment
RATE_LIMITS = {
"development": {
"requests_per_second": 1,
"requests_per_minute": 50,
},
"staging": {
"requests_per_second": 5,
"requests_per_minute": 200,
},
"production": {
"requests_per_second": 10,
"requests_per_minute": 500,
"requests_per_day": 10000,
}
}