CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-limits

Rate limiting utilities for Python with multiple strategies and storage backends

Pending
Overview
Eval results
Files

docs/async.md

Asynchronous API

The limits library provides full asynchronous support through the limits.aio module. It offers the same functionality as the synchronous API, exposed as coroutines for async/await code such as FastAPI, aiohttp, and other asyncio-based applications.

Capabilities

Async Rate Limiting Strategies

Asynchronous versions of all rate limiting strategies with identical interfaces but using async/await patterns.

from abc import ABC, abstractmethod
from limits.limits import RateLimitItem
from limits.util import WindowStats

class RateLimiter(ABC):
    """
    Abstract base class for async rate limiting strategies.

    Subclasses must implement the coroutines ``hit``, ``test`` and
    ``get_window_stats``; ``clear`` is not marked abstract. In every method
    ``identifiers`` is variadic, and ``cost`` (where present) is keyword-only
    because it follows ``*identifiers``.
    """

    def __init__(self, storage: "limits.aio.storage.Storage"):
        """
        Initialize async rate limiter with async storage backend.

        Args:
            storage: Async storage backend instance
        """

    @abstractmethod
    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Asynchronously consume the rate limit.

        Args:
            item: Rate limit item defining limits
            identifiers: Unique identifiers for this limit instance
            cost: Cost of this request, keyword-only (default: 1)

        Returns:
            True if request is allowed, False if rate limit exceeded
        """

    @abstractmethod
    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Asynchronously check if rate limit allows request without consuming.

        Args:
            item: Rate limit item defining limits
            identifiers: Unique identifiers for this limit instance
            cost: Expected cost to consume, keyword-only (default: 1)

        Returns:
            True if request would be allowed, False otherwise
        """

    @abstractmethod
    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """
        Asynchronously get current window statistics.

        Args:
            item: Rate limit item defining limits
            identifiers: Unique identifiers for this limit instance

        Returns:
            WindowStats with reset_time and remaining quota
        """

    async def clear(self, item: RateLimitItem, *identifiers: str) -> None:
        """
        Asynchronously clear rate limit data.

        Unlike the other coroutines on this class, this one is not abstract,
        so subclasses are not required to override it.

        Args:
            item: Rate limit item defining limits
            identifiers: Unique identifiers for this limit instance
        """

class FixedWindowRateLimiter(RateLimiter):
    """
    Async fixed window rate limiting strategy.

    Implements the three abstract coroutines of ``RateLimiter``; the
    constructor and ``clear`` are inherited unchanged.
    """

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Consume ``cost`` from the fixed window for ``item``/``identifiers``.

        Returns:
            True if request is allowed, False if rate limit exceeded
        """

    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Check whether ``cost`` would fit in the fixed window without consuming.

        Returns:
            True if request would be allowed, False otherwise
        """

    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """
        Get fixed window statistics for ``item``/``identifiers``.

        Returns:
            WindowStats with reset_time and remaining quota
        """

class MovingWindowRateLimiter(RateLimiter):
    """
    Async moving window rate limiting strategy.

    Only usable with storage backends that provide ``MovingWindowSupport``.
    """

    def __init__(self, storage: "limits.aio.storage.Storage"):
        """
        Initialize async moving window rate limiter.

        Requires async storage backend with MovingWindowSupport.

        Args:
            storage: Async storage with moving window support

        Raises:
            NotImplementedError: If storage lacks moving window support
        """

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Consume ``cost`` from the moving window for ``item``/``identifiers``.

        Returns:
            True if request is allowed, False if rate limit exceeded
        """

    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Check whether ``cost`` would fit in the moving window without consuming.

        Returns:
            True if request would be allowed, False otherwise
        """

    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """
        Get moving window statistics for ``item``/``identifiers``.

        Returns:
            WindowStats with reset_time and remaining quota
        """

class SlidingWindowCounterRateLimiter(RateLimiter):
    """
    Async sliding window counter rate limiting strategy.

    Only usable with storage backends that provide
    ``SlidingWindowCounterSupport``.
    """

    def __init__(self, storage: "limits.aio.storage.Storage"):
        """
        Initialize async sliding window counter rate limiter.

        Requires async storage backend with SlidingWindowCounterSupport.

        Args:
            storage: Async storage with sliding window counter support

        Raises:
            NotImplementedError: If storage lacks sliding window counter support
        """

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Consume ``cost`` from the sliding window counter for ``item``/``identifiers``.

        Returns:
            True if request is allowed, False if rate limit exceeded
        """

    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Check whether ``cost`` would fit in the sliding window without consuming.

        Returns:
            True if request would be allowed, False otherwise
        """

    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """
        Get sliding window counter statistics for ``item``/``identifiers``.

        Returns:
            WindowStats with reset_time and remaining quota
        """

class FixedWindowElasticExpiryRateLimiter(FixedWindowRateLimiter):
    """
    Async fixed window with elastic expiry (deprecated in 4.1).

    Overrides only ``hit``; ``test`` and ``get_window_stats`` come from
    ``FixedWindowRateLimiter``.
    """

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Consume ``cost`` using the elastic expiry variant of the fixed window.

        Returns:
            True if request is allowed, False if rate limit exceeded
        """

Async Storage Backends

Asynchronous storage implementations providing the same interface as synchronous versions but with async/await support.

from abc import ABC, abstractmethod

class Storage(ABC):
    """
    Base class for async storage backends.

    All six methods are abstract coroutines, so a concrete backend has to
    implement every one of them before it can be instantiated.
    """

    @abstractmethod
    async def incr(self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1) -> int:
        """
        Asynchronously increment counter for key.

        Args:
            key: Counter key to increment
            expiry: Expiry applied to the key (presumably seconds; confirm)
            elastic_expiry: Elastic expiry flag (presumably extends the expiry
                on every increment; confirm against the limits docs)
            amount: Amount to increment by (default: 1)

        Returns:
            Integer counter value (presumably the value after the increment;
            confirm)
        """

    @abstractmethod
    async def get(self, key: str) -> int:
        """
        Asynchronously get current counter value.

        Args:
            key: Counter key to read
        """

    @abstractmethod
    async def get_expiry(self, key: str) -> float:
        """
        Asynchronously get expiration time for key.

        Args:
            key: Counter key to inspect

        Returns:
            Expiration time as a float (presumably a Unix timestamp; confirm)
        """

    @abstractmethod
    async def check(self) -> bool:
        """
        Asynchronously check storage health.

        Returns:
            Boolean health indicator
        """

    @abstractmethod
    async def reset(self) -> None:
        """Asynchronously reset all stored data (not just a single key)"""

    @abstractmethod
    async def clear(self, key: str) -> None:
        """
        Asynchronously clear data for specific key.

        Args:
            key: Key whose data should be removed
        """

class MovingWindowSupport(ABC):
    """
    Interface for async storage supporting moving window.

    ``MovingWindowRateLimiter`` requires a storage backend that implements
    this interface.
    """

    @abstractmethod
    async def acquire_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:
        """
        Asynchronously acquire entry in moving window.

        Args:
            key: Rate limit key
            limit: Maximum number of entries allowed in the window
            expiry: Window expiry (presumably seconds; confirm)
            amount: Number of entries to acquire (default: 1)

        Returns:
            Boolean result (presumably True when the entry was acquired; confirm)
        """

    @abstractmethod
    async def get_moving_window(self, key: str, limit: int, expiry: int) -> tuple[float, int]:
        """
        Asynchronously get moving window state.

        Args:
            key: Rate limit key
            limit: Maximum number of entries allowed in the window
            expiry: Window expiry (presumably seconds; confirm)

        Returns:
            A ``(float, int)`` pair; presumably (window start timestamp,
            number of entries in the window); confirm against the limits docs
        """

class SlidingWindowCounterSupport(ABC):
    """
    Interface for async storage supporting sliding window counter.

    ``SlidingWindowCounterRateLimiter`` requires a storage backend that
    implements this interface.
    """

    @abstractmethod
    async def get_sliding_window(self, key: str, expiry: int) -> tuple[int, float, int, float]:
        """
        Asynchronously get sliding window counter state.

        Args:
            key: Rate limit key
            expiry: Window expiry (presumably seconds; confirm)

        Returns:
            A 4-tuple ``(int, float, int, float)``; presumably (previous
            window count, previous window TTL, current window count, current
            window TTL); confirm against the limits docs
        """

    @abstractmethod
    async def acquire_sliding_window_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:
        """
        Asynchronously acquire sliding window counter entry.

        Args:
            key: Rate limit key
            limit: Maximum number of entries allowed in the window
            expiry: Window expiry (presumably seconds; confirm)
            amount: Number of entries to acquire; defaults to 1, matching
                ``MovingWindowSupport.acquire_entry`` and the strategies'
                ``cost`` default

        Returns:
            Boolean result (presumably True when the entry was acquired; confirm)
        """

class MemoryStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Async in-memory storage backend.

    Implements both window interfaces, so it can back the moving window and
    sliding window counter strategies as well as the fixed window ones.
    """

    def __init__(self):
        """Initialize async memory storage (takes no arguments)"""

class RedisStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Async Redis storage backend.

    Implements both window interfaces, so it can back the moving window and
    sliding window counter strategies as well as the fixed window ones.
    """

    def __init__(self, uri: str, **options):
        """
        Initialize async Redis storage.

        Args:
            uri: Connection URI for the Redis server
            options: Additional keyword options (not described in this
                listing; presumably forwarded to the Redis client; confirm)
        """

class RedisClusterStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Async Redis Cluster storage backend.

    Implements both window interfaces, so it can back the moving window and
    sliding window counter strategies as well as the fixed window ones.
    """

    def __init__(self, uri: str, **options):
        """
        Initialize async Redis Cluster storage.

        Args:
            uri: Connection URI for the Redis Cluster
            options: Additional keyword options (not described in this
                listing; presumably forwarded to the cluster client; confirm)
        """

class RedisSentinelStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Async Redis Sentinel storage backend.

    Implements both window interfaces, so it can back the moving window and
    sliding window counter strategies as well as the fixed window ones.
    """

    def __init__(self, uri: str, **options):
        """
        Initialize async Redis Sentinel storage.

        Args:
            uri: Connection URI for the Redis Sentinel deployment
            options: Additional keyword options (not described in this
                listing; presumably forwarded to the sentinel client; confirm)
        """

class MemcachedStorage(Storage):
    """
    Async Memcached storage backend.

    As listed here it derives from ``Storage`` only, i.e. it does not declare
    ``MovingWindowSupport`` or ``SlidingWindowCounterSupport``.
    NOTE(review): verify against the installed limits version, which may
    declare additional window support for Memcached.
    """

    def __init__(self, uri: str, **options):
        """
        Initialize async Memcached storage.

        Args:
            uri: Connection URI for the Memcached server
            options: Additional keyword options (not described in this
                listing; presumably forwarded to the Memcached client; confirm)
        """

class MongoDBStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Async MongoDB storage backend.

    Implements both window interfaces, so it can back the moving window and
    sliding window counter strategies as well as the fixed window ones.
    """

    def __init__(self, uri: str, **options):
        """
        Initialize async MongoDB storage.

        Args:
            uri: Connection URI for the MongoDB server
            options: Additional keyword options (not described in this
                listing; presumably forwarded to the MongoDB client; confirm)
        """

class EtcdStorage(Storage):
    """
    Async etcd storage backend.

    As listed here it derives from ``Storage`` only, i.e. it does not declare
    ``MovingWindowSupport`` or ``SlidingWindowCounterSupport``.
    """

    def __init__(self, uri: str, **options):
        """
        Initialize async etcd storage.

        Args:
            uri: Connection URI for the etcd server
            options: Additional keyword options (not described in this
                listing; presumably forwarded to the etcd client; confirm)
        """

Usage Examples

Basic Async Rate Limiting

import asyncio
from limits import RateLimitItemPerMinute
from limits.aio.storage import storage_from_string
from limits.aio.strategies import FixedWindowRateLimiter

async def rate_limit_example():
    """
    Demonstrate test(), hit() and get_window_stats() on one identifier.

    Builds a 60-per-minute limit backed by async Redis storage, reports
    whether a single request for "user123" is admitted, then prints the
    remaining quota and reset time of the current window.
    """
    per_minute = RateLimitItemPerMinute(60)  # 60 requests per minute
    limiter = FixedWindowRateLimiter(
        storage_from_string("async+redis://localhost:6379")
    )
    user_id = "user123"

    # test() only peeks at the limit; hit() is what actually consumes quota.
    if not await limiter.test(per_minute, user_id):
        print("Rate limit would be exceeded")
    elif await limiter.hit(per_minute, user_id):
        print("Request allowed")
    else:
        print("Rate limit exceeded")

    # Report the state of the window after the attempt above.
    window = await limiter.get_window_stats(per_minute, user_id)
    print(f"Remaining: {window.remaining}")
    print(f"Reset time: {window.reset_time}")

# Run async example
asyncio.run(rate_limit_example())

FastAPI Integration

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from limits import RateLimitItemPerMinute
from limits.aio.storage import storage_from_string
from limits.aio.strategies import FixedWindowRateLimiter

# Application instance that the startup hook, middleware and route below attach to
app = FastAPI()

# Initialize async rate limiting components
rate_limit = RateLimitItemPerMinute(100)  # 100 requests per minute
# Placeholders: assigned by setup_rate_limiting() in the startup hook, so they
# stay None until that hook has run.
storage = None
limiter = None

@app.on_event("startup")
async def setup_rate_limiting():
    """
    Create the shared async storage and limiter when the app starts.

    Rebinds the module-level ``storage`` and ``limiter`` names (hence the
    ``global`` statement) so the middleware can use them.
    NOTE(review): newer FastAPI releases recommend lifespan handlers over
    ``on_event``; confirm for the FastAPI version in use.
    """
    global storage, limiter
    storage = storage_from_string("async+redis://localhost:6379")
    limiter = FixedWindowRateLimiter(storage)

@app.middleware("http")
async def rate_limiting_middleware(request: Request, call_next):
    """
    Enforce the per-client rate limit on every HTTP request.

    Args:
        request: Incoming request; the client host is used as the identifier
        call_next: Callable that forwards the request to the rest of the app

    Returns:
        A 429 JSON response when the limit is exceeded, otherwise the
        downstream response with X-RateLimit-* headers added.
    """
    # request.client can be None (no peer address available), so fall back
    # to a shared bucket instead of raising AttributeError.
    client_id = request.client.host if request.client else "unknown"

    # hit() checks and consumes in a single call; a separate test() followed
    # by hit() leaves a gap in which concurrent requests can both pass.
    if not await limiter.hit(rate_limit, client_id):
        # Return the response directly: an HTTPException raised inside an
        # HTTP middleware is not processed by FastAPI's exception handlers
        # and would reach the client as a 500 instead of a 429.
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"},
            headers={"Retry-After": "60"},
        )

    # Process request
    response = await call_next(request)

    # Add rate limiting headers
    stats = await limiter.get_window_stats(rate_limit, client_id)
    response.headers["X-RateLimit-Remaining"] = str(stats.remaining)
    response.headers["X-RateLimit-Reset"] = str(int(stats.reset_time))

    return response

@app.get("/api/data")
async def get_data():
    """Sample endpoint that returns a fixed JSON message."""
    payload = {"message": "Data retrieved successfully"}
    return payload

aiohttp Integration

from aiohttp import web, ClientError
from limits import RateLimitItemPerSecond
from limits.aio.storage import storage_from_string
from limits.aio.strategies import SlidingWindowCounterRateLimiter

@web.middleware
async def rate_limiting_middleware(request, handler):
    """
    Enforce the application's rate limit before calling the handler.

    The ``@web.middleware`` decorator marks this as a new-style middleware;
    without it aiohttp treats the function as an old-style factory and calls
    it with ``(app, handler)``, so ``request.app`` would fail.

    Args:
        request: Incoming aiohttp request
        handler: Next handler in the middleware chain

    Returns:
        The handler's response with X-RateLimit-* headers added.

    Raises:
        web.HTTPTooManyRequests: When the client exceeded its rate limit
    """
    # Get rate limiter from app context
    limiter = request.app['rate_limiter']
    rate_limit = request.app['rate_limit']

    # Use IP address as identifier; request.remote may be None, in which case
    # all such requests share one bucket.
    client_id = request.remote or "unknown"

    # hit() checks and consumes in a single call; a separate test() followed
    # by hit() leaves a gap in which concurrent requests can both pass.
    if not await limiter.hit(rate_limit, client_id):
        # Rate limit exceeded
        raise web.HTTPTooManyRequests(
            text="Rate limit exceeded",
            headers={'Retry-After': '1'}
        )

    # Add rate limit headers
    stats = await limiter.get_window_stats(rate_limit, client_id)
    response = await handler(request)
    response.headers['X-RateLimit-Remaining'] = str(stats.remaining)
    response.headers['X-RateLimit-Reset'] = str(int(stats.reset_time))

    return response

async def hello_handler(request):
    """Sample handler that answers every request with a fixed JSON greeting."""
    greeting = {'message': 'Hello, World!'}
    return web.json_response(greeting)

async def create_app():
    """
    Build the aiohttp application with rate limiting wired in.

    Stores the limit under ``app['rate_limit']`` and the limiter under
    ``app['rate_limiter']``, which is where the middleware looks them up.

    Returns:
        The configured ``web.Application``.
    """
    # 10 requests per second, tracked in process-local async memory storage
    per_second = RateLimitItemPerSecond(10)
    limiter = SlidingWindowCounterRateLimiter(
        storage_from_string("async+memory://")
    )

    app = web.Application(middlewares=[rate_limiting_middleware])
    app.router.add_get('/', hello_handler)

    # Expose the rate limiting objects to the middleware via the app mapping
    app['rate_limit'] = per_second
    app['rate_limiter'] = limiter
    return app

if __name__ == '__main__':
    # create_app() is a coroutine function, so ``app`` here is a coroutine
    # object rather than an Application; web.run_app accepts an awaitable and
    # resolves it before serving.
    app = create_app()
    web.run_app(app, host='localhost', port=8080)

Async Context Manager Usage

import asyncio
from contextlib import asynccontextmanager
from limits import RateLimitItemPerMinute
from limits.aio.storage import storage_from_string
from limits.aio.strategies import MovingWindowRateLimiter

@asynccontextmanager
async def rate_limiter_context():
    """
    Yield a moving window limiter and reset its storage on exit.

    The reset runs in ``finally``, so it happens even when the ``async with``
    body raises. Storage ``reset()`` is documented as resetting all stored
    data, so every limit held by this backend is wiped when the context ends.
    """
    backend = storage_from_string("async+redis://localhost:6379")
    moving_window_limiter = MovingWindowRateLimiter(backend)

    try:
        yield moving_window_limiter
    finally:
        # Cleanup: drops everything the backend stored, not only keys that
        # were touched inside this context.
        await backend.reset()

async def process_requests():
    """
    Send 100 simulated requests, spread over 10 users, through the limiter.

    Each user is limited to 50 requests per minute. A request is processed
    only when both test() and hit() succeed; a failed test() is reported,
    while a failed hit() after a successful test() is skipped silently.
    """
    per_minute = RateLimitItemPerMinute(50)

    async with rate_limiter_context() as limiter:
        for request_no in range(100):
            user_id = f"user_{request_no % 10}"  # cycles through 10 users

            if not await limiter.test(per_minute, user_id):
                print(f"Rate limit exceeded for {user_id}")
                continue

            if await limiter.hit(per_minute, user_id):
                print(f"Processing request {request_no} for {user_id}")
                # Simulate async work
                await asyncio.sleep(0.1)

asyncio.run(process_requests())

Batch Operations with Async

import asyncio
from limits import RateLimitItemPerSecond
from limits.aio.storage import storage_from_string
from limits.aio.strategies import FixedWindowRateLimiter

async def process_batch_requests():
    """
    Run several per-user request batches concurrently against one limiter.

    Every batch is handled by its own coroutine and all batches are awaited
    together with asyncio.gather. Afterwards the per-request outcomes are
    printed batch by batch, followed by the window stats of each distinct
    user.
    """
    rate_limit = RateLimitItemPerSecond(100)  # 100 requests per second
    limiter = FixedWindowRateLimiter(storage_from_string("async+memory://"))

    # (user_id, number of requests); user1 appears in two separate batches
    requests = [
        ("user1", 5),
        ("user2", 10),
        ("user3", 3),
        ("user1", 2),
    ]

    async def process_user_requests(user_id, count):
        """Attempt ``count`` requests for ``user_id`` and describe each outcome."""
        outcomes = []

        for attempt in range(1, count + 1):
            # Every request carries a cost of 1
            if not await limiter.test(rate_limit, user_id, cost=1):
                outcomes.append(f"{user_id} request {attempt}: rate limited")
                continue

            allowed = await limiter.hit(rate_limit, user_id, cost=1)
            verdict = 'success' if allowed else 'failed'
            outcomes.append(f"{user_id} request {attempt}: {verdict}")

        return outcomes

    # gather() returns the batches in the same order as ``requests``
    batches = await asyncio.gather(
        *[process_user_requests(user_id, count) for user_id, count in requests]
    )

    for batch in batches:
        for line in batch:
            print(line)

    # Final stats, once per distinct user
    for user_id in {user_id for user_id, _ in requests}:
        stats = await limiter.get_window_stats(rate_limit, user_id)
        print(f"{user_id} - Remaining: {stats.remaining}, Reset: {stats.reset_time}")

asyncio.run(process_batch_requests())

Strategy Registry

# Async strategy registry: maps each strategy's string name to the async
# RateLimiter subclass that implements it.
STRATEGIES: dict[str, type[RateLimiter]] = {
    "fixed-window": FixedWindowRateLimiter,
    "moving-window": MovingWindowRateLimiter,
    "sliding-window-counter": SlidingWindowCounterRateLimiter, 
    "fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter
}

Install with Tessl CLI

npx tessl i tessl/pypi-limits

docs

async.md

index.md

rate-limit-items.md

storage.md

strategies.md

utilities.md

tile.json