Python Rate-Limiter using Leaky-Bucket Algorithm for controlling request rates in applications with multiple backend storage options.
81
The main Limiter class provides the primary rate limiting functionality with support for both synchronous and asynchronous operations, configurable delays, and exception handling.
The central class for rate limiting operations, orchestrating rate limit checking, delay handling, and exception management.
class Limiter:
def __init__(
self,
argument: Union[BucketFactory, AbstractBucket, Rate, List[Rate]],
clock: Optional[AbstractClock] = None,
raise_when_fail: bool = True,
max_delay: Optional[Union[int, Duration]] = None,
retry_until_max_delay: bool = False,
buffer_ms: int = 50
):
"""
Initialize Limiter with rate limiting configuration.
Parameters:
- argument: Rate configuration, bucket, or bucket factory
- clock: Time source for rate calculations (default: TimeClock)
- raise_when_fail: Raise exception when rate limit exceeded (default: True)
- max_delay: Maximum delay allowed for rate limiting operations
- retry_until_max_delay: Retry operations until max delay reached
- buffer_ms: Buffer time in milliseconds for delay calculations
"""Basic rate limiting operations for synchronous code.
def try_acquire(self, name: str, weight: int = 1) -> Union[bool, Awaitable[bool]]:
"""
Try to acquire permission for a rate-limited operation.
Parameters:
- name: Identifier for the rate-limited resource
- weight: Weight/cost of the operation (default: 1)
Returns:
- bool: True if operation is allowed, False if rate limited
- Awaitable[bool]: For async buckets, returns awaitable boolean
"""Usage example:
from pyrate_limiter import Limiter, Rate, Duration
# Create limiter with 10 requests per minute
limiter = Limiter(Rate(10, Duration.MINUTE))
# Check if request is allowed
if limiter.try_acquire("user123"):
print("Request allowed")
# Proceed with operation
else:
print("Rate limit exceeded")
# Handle rate limiting
# Weighted requests (e.g., expensive operations)
if limiter.try_acquire("bulk_operation", weight=5):
print("Bulk operation allowed")Rate limiting for asynchronous code with proper async/await handling.
async def try_acquire_async(self, name: str, weight: int = 1) -> bool:
"""
Async version of try_acquire with thread-local async locking.
Parameters:
- name: Identifier for the rate-limited resource
- weight: Weight/cost of the operation (default: 1)
Returns:
- bool: True if operation is allowed, False if rate limited
"""Usage example:
import asyncio
from pyrate_limiter import Limiter, Rate, Duration
async def async_example():
limiter = Limiter(Rate(5, Duration.SECOND))
# Async rate limiting
if await limiter.try_acquire_async("async_user"):
print("Async request allowed")
# Proceed with async operation
await some_async_operation()
else:
print("Async rate limit exceeded")Function decoration for automatic rate limiting.
def as_decorator(self) -> Callable[[ItemMapping], DecoratorWrapper]:
"""
Create decorator for automatic rate limiting of functions.
Returns:
- Callable: Decorator factory function
"""Usage example:
from pyrate_limiter import Limiter, Rate, Duration
limiter = Limiter(Rate(10, Duration.MINUTE))
# Decorator with mapping function
@limiter.as_decorator()
def extract_user_info(*args, **kwargs):
# Extract name and weight from function arguments
user_id = args[0] if args else kwargs.get('user_id', 'default')
weight = kwargs.get('weight', 1)
return (user_id, weight)
@extract_user_info
def api_call(user_id, data=None, weight=1):
return f"Processing API call for {user_id}"
# Async decorator support
@limiter.as_decorator()
def async_extract_info(*args, **kwargs):
return (args[0], 1)
@async_extract_info
async def async_api_call(user_id):
await asyncio.sleep(0.1)
return f"Async processing for {user_id}"Managing and inspecting rate limiter state.
def buckets(self) -> List[AbstractBucket]:
"""Get list of active buckets."""
def dispose(self, bucket: Union[int, AbstractBucket]) -> bool:
"""
Dispose/remove a specific bucket.
Parameters:
- bucket: Bucket object or bucket ID to remove
Returns:
- bool: True if bucket was removed successfully
"""
def close(self) -> None:
"""Release resources held by the limiter."""Using Limiter as a context manager for automatic resource cleanup.
def __enter__(self):
"""Enter context manager."""
def __exit__(self, exc_type, exc, tb) -> None:
"""Exit context manager and cleanup resources."""Usage example:
from pyrate_limiter import Limiter, Rate, Duration
# Automatic cleanup with context manager
with Limiter(Rate(5, Duration.SECOND)) as limiter:
success = limiter.try_acquire("user123")
if success:
print("Request processed")
# Resources automatically cleaned up hereConfiguring multiple rate limits for the same resource.
from pyrate_limiter import Limiter, Rate, Duration
# Multiple rate limits: 10/second AND 100/minute
rates = [
Rate(10, Duration.SECOND),
Rate(100, Duration.MINUTE),
Rate(1000, Duration.HOUR)
]
limiter = Limiter(rates)
# All rate limits must be satisfied
if limiter.try_acquire("user123"):
print("All rate limits satisfied")When max_delay is configured, the limiter can wait for rate limits to reset instead of immediately failing.
from pyrate_limiter import Limiter, Rate, Duration
# Configure with delay handling
limiter = Limiter(
Rate(5, Duration.SECOND),
max_delay=Duration.SECOND * 2, # Wait up to 2 seconds
retry_until_max_delay=True # Keep retrying within delay window
)
# This may block/delay if rate limited
success = limiter.try_acquire("user123")Install with Tessl CLI
npx tessl i tessl/pypi-pyrate-limiterdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10