Rate limiting utilities for Python with multiple strategies and storage backends
—
The limits library provides complete asynchronous support through the limits.aio module, offering the same functionality as the synchronous API but optimized for async/await applications and frameworks like FastAPI, aiohttp, and asyncio-based applications.
Asynchronous versions of all rate limiting strategies with identical interfaces but using async/await patterns.
from abc import ABC, abstractmethod
from limits.limits import RateLimitItem
from limits.util import WindowStats
class RateLimiter(ABC):
"""Abstract base class for async rate limiting strategies"""
def __init__(self, storage: "limits.aio.storage.Storage"):
"""
Initialize async rate limiter with async storage backend.
Args:
storage: Async storage backend instance
"""
@abstractmethod
async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""
Asynchronously consume the rate limit.
Args:
item: Rate limit item defining limits
identifiers: Unique identifiers for this limit instance
cost: Cost of this request (default: 1)
Returns:
True if request is allowed, False if rate limit exceeded
"""
@abstractmethod
async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""
Asynchronously check if rate limit allows request without consuming.
Args:
item: Rate limit item defining limits
identifiers: Unique identifiers for this limit instance
cost: Expected cost to consume (default: 1)
Returns:
True if request would be allowed, False otherwise
"""
@abstractmethod
async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
"""
Asynchronously get current window statistics.
Args:
item: Rate limit item defining limits
identifiers: Unique identifiers for this limit instance
Returns:
WindowStats with reset_time and remaining quota
"""
async def clear(self, item: RateLimitItem, *identifiers: str) -> None:
"""
Asynchronously clear rate limit data.
Args:
item: Rate limit item defining limits
identifiers: Unique identifiers for this limit instance
"""
class FixedWindowRateLimiter(RateLimiter):
"""Async fixed window rate limiting strategy"""
async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""Async version of fixed window hit"""
async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""Async version of fixed window test"""
async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
"""Async version of fixed window stats"""
class MovingWindowRateLimiter(RateLimiter):
"""Async moving window rate limiting strategy"""
def __init__(self, storage: "limits.aio.storage.Storage"):
"""
Initialize async moving window rate limiter.
Requires async storage backend with MovingWindowSupport.
Args:
storage: Async storage with moving window support
Raises:
NotImplementedError: If storage lacks moving window support
"""
async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""Async version of moving window hit"""
async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""Async version of moving window test"""
async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
"""Async version of moving window stats"""
class SlidingWindowCounterRateLimiter(RateLimiter):
"""Async sliding window counter rate limiting strategy"""
def __init__(self, storage: "limits.aio.storage.Storage"):
"""
Initialize async sliding window counter rate limiter.
Requires async storage backend with SlidingWindowCounterSupport.
Args:
storage: Async storage with sliding window counter support
Raises:
NotImplementedError: If storage lacks sliding window counter support
"""
async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""Async version of sliding window counter hit"""
async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""Async version of sliding window counter test"""
async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
"""Async version of sliding window counter stats"""
class FixedWindowElasticExpiryRateLimiter(FixedWindowRateLimiter):
"""Async fixed window with elastic expiry (deprecated in 4.1)"""
async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
"""Async version of elastic expiry hit"""Asynchronous storage implementations providing the same interface as synchronous versions but with async/await support.
from abc import ABC, abstractmethod
class Storage(ABC):
"""Base class for async storage backends"""
@abstractmethod
async def incr(self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1) -> int:
"""Asynchronously increment counter for key"""
@abstractmethod
async def get(self, key: str) -> int:
"""Asynchronously get current counter value"""
@abstractmethod
async def get_expiry(self, key: str) -> float:
"""Asynchronously get expiration time for key"""
@abstractmethod
async def check(self) -> bool:
"""Asynchronously check storage health"""
@abstractmethod
async def reset(self) -> None:
"""Asynchronously reset all stored data"""
@abstractmethod
async def clear(self, key: str) -> None:
"""Asynchronously clear data for specific key"""
class MovingWindowSupport(ABC):
"""Interface for async storage supporting moving window"""
@abstractmethod
async def acquire_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:
"""Asynchronously acquire entry in moving window"""
@abstractmethod
async def get_moving_window(self, key: str, limit: int, expiry: int) -> tuple[float, int]:
"""Asynchronously get moving window state"""
class SlidingWindowCounterSupport(ABC):
"""Interface for async storage supporting sliding window counter"""
@abstractmethod
async def get_sliding_window(self, key: str, expiry: int) -> tuple[int, float, int, float]:
"""Asynchronously get sliding window counter state"""
@abstractmethod
async def acquire_sliding_window_entry(self, key: str, limit: int, expiry: int, amount: int) -> bool:
"""Asynchronously acquire sliding window counter entry"""
class MemoryStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
"""Async in-memory storage backend"""
def __init__(self):
"""Initialize async memory storage"""
class RedisStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
"""Async Redis storage backend"""
def __init__(self, uri: str, **options):
"""Initialize async Redis storage"""
class RedisClusterStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
"""Async Redis Cluster storage backend"""
def __init__(self, uri: str, **options):
"""Initialize async Redis Cluster storage"""
class RedisSentinelStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
"""Async Redis Sentinel storage backend"""
def __init__(self, uri: str, **options):
"""Initialize async Redis Sentinel storage"""
class MemcachedStorage(Storage):
"""Async Memcached storage backend"""
def __init__(self, uri: str, **options):
"""Initialize async Memcached storage"""
class MongoDBStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
"""Async MongoDB storage backend"""
def __init__(self, uri: str, **options):
"""Initialize async MongoDB storage"""
class EtcdStorage(Storage):
"""Async etcd storage backend"""
def __init__(self, uri: str, **options):
"""Initialize async etcd storage"""import asyncio
from limits import RateLimitItemPerMinute
from limits.aio.storage import storage_from_string
from limits.aio.strategies import FixedWindowRateLimiter
async def rate_limit_example():
# Create rate limit and async storage
rate_limit = RateLimitItemPerMinute(60) # 60 requests per minute
storage = storage_from_string("async+redis://localhost:6379")
# Create async rate limiter
limiter = FixedWindowRateLimiter(storage)
user_id = "user123"
# Test rate limit asynchronously
if await limiter.test(rate_limit, user_id):
# Consume rate limit asynchronously
success = await limiter.hit(rate_limit, user_id)
if success:
print("Request allowed")
else:
print("Rate limit exceeded")
else:
print("Rate limit would be exceeded")
# Get window statistics asynchronously
stats = await limiter.get_window_stats(rate_limit, user_id)
print(f"Remaining: {stats.remaining}")
print(f"Reset time: {stats.reset_time}")
# Run async example
asyncio.run(rate_limit_example())from fastapi import FastAPI, HTTPException, Request
from limits import RateLimitItemPerMinute
from limits.aio.storage import storage_from_string
from limits.aio.strategies import FixedWindowRateLimiter
app = FastAPI()
# Initialize async rate limiting components
rate_limit = RateLimitItemPerMinute(100) # 100 requests per minute
storage = None
limiter = None
@app.on_event("startup")
async def setup_rate_limiting():
global storage, limiter
storage = storage_from_string("async+redis://localhost:6379")
limiter = FixedWindowRateLimiter(storage)
@app.middleware("http")
async def rate_limiting_middleware(request: Request, call_next):
# Get client identifier (IP address or user ID)
client_id = request.client.host
# Check rate limit
if not await limiter.test(rate_limit, client_id):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded",
headers={"Retry-After": "60"}
)
# Consume rate limit
await limiter.hit(rate_limit, client_id)
# Process request
response = await call_next(request)
# Add rate limiting headers
stats = await limiter.get_window_stats(rate_limit, client_id)
response.headers["X-RateLimit-Remaining"] = str(stats.remaining)
response.headers["X-RateLimit-Reset"] = str(int(stats.reset_time))
return response
@app.get("/api/data")
async def get_data():
return {"message": "Data retrieved successfully"}from aiohttp import web, ClientError
from limits import RateLimitItemPerSecond
from limits.aio.storage import storage_from_string
from limits.aio.strategies import SlidingWindowCounterRateLimiter
async def rate_limiting_middleware(request, handler):
# Get rate limiter from app context
limiter = request.app['rate_limiter']
rate_limit = request.app['rate_limit']
# Use IP address as identifier
client_id = request.remote
# Check and consume rate limit
if await limiter.test(rate_limit, client_id):
await limiter.hit(rate_limit, client_id)
# Add rate limit headers
stats = await limiter.get_window_stats(rate_limit, client_id)
response = await handler(request)
response.headers['X-RateLimit-Remaining'] = str(stats.remaining)
response.headers['X-RateLimit-Reset'] = str(int(stats.reset_time))
return response
else:
# Rate limit exceeded
raise web.HTTPTooManyRequests(
text="Rate limit exceeded",
headers={'Retry-After': '1'}
)
async def hello_handler(request):
return web.json_response({'message': 'Hello, World!'})
async def create_app():
# Setup rate limiting
rate_limit = RateLimitItemPerSecond(10) # 10 requests per second
storage = storage_from_string("async+memory://")
limiter = SlidingWindowCounterRateLimiter(storage)
# Create app
app = web.Application(middlewares=[rate_limiting_middleware])
app['rate_limit'] = rate_limit
app['rate_limiter'] = limiter
# Add routes
app.router.add_get('/', hello_handler)
return app
if __name__ == '__main__':
app = create_app()
web.run_app(app, host='localhost', port=8080)import asyncio
from contextlib import asynccontextmanager
from limits import RateLimitItemPerMinute
from limits.aio.storage import storage_from_string
from limits.aio.strategies import MovingWindowRateLimiter
@asynccontextmanager
async def rate_limiter_context():
"""Context manager for async rate limiter lifecycle"""
storage = storage_from_string("async+redis://localhost:6379")
limiter = MovingWindowRateLimiter(storage)
try:
yield limiter
finally:
# Cleanup if needed
await storage.reset()
async def process_requests():
rate_limit = RateLimitItemPerMinute(50)
async with rate_limiter_context() as limiter:
# Process multiple requests
for i in range(100):
user_id = f"user_{i % 10}" # 10 different users
if await limiter.test(rate_limit, user_id):
success = await limiter.hit(rate_limit, user_id)
if success:
print(f"Processing request {i} for {user_id}")
# Simulate async work
await asyncio.sleep(0.1)
else:
print(f"Rate limit exceeded for {user_id}")
asyncio.run(process_requests())import asyncio
from limits import RateLimitItemPerSecond
from limits.aio.storage import storage_from_string
from limits.aio.strategies import FixedWindowRateLimiter
async def process_batch_requests():
# Setup
rate_limit = RateLimitItemPerSecond(100) # 100 requests per second
storage = storage_from_string("async+memory://")
limiter = FixedWindowRateLimiter(storage)
# Simulate batch of requests from different users
requests = [
("user1", 5), # 5 requests from user1
("user2", 10), # 10 requests from user2
("user3", 3), # 3 requests from user3
("user1", 2), # 2 more requests from user1
]
async def process_user_requests(user_id, count):
"""Process multiple requests for a single user"""
results = []
for i in range(count):
# Test with cost (each request has cost of 1)
if await limiter.test(rate_limit, user_id, cost=1):
success = await limiter.hit(rate_limit, user_id, cost=1)
results.append(f"{user_id} request {i+1}: {'success' if success else 'failed'}")
else:
results.append(f"{user_id} request {i+1}: rate limited")
return results
# Process all user requests concurrently
tasks = [process_user_requests(user_id, count) for user_id, count in requests]
results = await asyncio.gather(*tasks)
# Print results
for user_results in results:
for result in user_results:
print(result)
# Show final stats for each user
unique_users = set(user_id for user_id, _ in requests)
for user_id in unique_users:
stats = await limiter.get_window_stats(rate_limit, user_id)
print(f"{user_id} - Remaining: {stats.remaining}, Reset: {stats.reset_time}")
asyncio.run(process_batch_requests())# Async strategy registry
STRATEGIES: dict[str, type[RateLimiter]] = {
"fixed-window": FixedWindowRateLimiter,
"moving-window": MovingWindowRateLimiter,
"sliding-window-counter": SlidingWindowCounterRateLimiter,
"fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter
}Install with Tessl CLI
npx tessl i tessl/pypi-limits