Rate limiting utilities for Python with multiple strategies and storage backends
—
Storage backends handle the persistence and distribution of rate limit data across different storage systems. The choice of storage backend determines whether rate limiting is local to a single process or distributed across multiple instances.
Factory function for creating storage instances from URI strings, supporting both synchronous and asynchronous storage backends.
def storage_from_string(storage_string: str, **options: float | str | bool) -> Storage:
    """
    Create a storage instance from a URI string.

    Supports various storage schemes including memory, Redis, Valkey,
    Memcached, MongoDB, and etcd. Can create both sync and async storage
    instances based on the URI scheme (async schemes carry an ``async+``
    prefix).

    Args:
        storage_string: URI like "redis://localhost:6379" or "memory://"
        options: Additional options passed to the storage constructor

    Returns:
        Storage instance matching the URI scheme

    Raises:
        ConfigurationError: If the storage scheme is unknown or unsupported

    Examples:
        memory = storage_from_string("memory://")
        redis = storage_from_string("redis://localhost:6379")
        async_redis = storage_from_string("async+redis://localhost:6379")
    """


# Abstract base classes defining the storage interface and common functionality.
from abc import ABC, abstractmethod
from limits.util import LazyDependency
class Storage(LazyDependency, ABC):
    """
    Base class for all storage backends.

    Provides the common interface for storing and retrieving rate limit data.
    Extends LazyDependency to handle optional dependencies for specific backends.
    """

    STORAGE_SCHEME: list[str] | None  # Supported URI schemes

    def __init__(
        self,
        uri: str | None = None,
        wrap_exceptions: bool = False,
        **options: float | str | bool,
    ):
        """
        Initialize storage backend.

        Args:
            uri: Connection URI for the storage backend
            wrap_exceptions: Whether to wrap storage exceptions in StorageError
            options: Additional backend-specific options
        """

    @property
    @abstractmethod
    def base_exceptions(self) -> type[Exception] | tuple[type[Exception], ...]:
        """Base exception types that this storage backend can raise"""

    @abstractmethod
    def incr(self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1) -> int:
        """
        Increment counter for key.

        Args:
            key: Storage key for the rate limit
            expiry: Expiration time in seconds
            elastic_expiry: Whether to use elastic expiry behavior
            amount: Amount to increment by

        Returns:
            New counter value after increment
        """

    @abstractmethod
    def get(self, key: str) -> int:
        """
        Get current counter value for key.

        Args:
            key: Storage key

        Returns:
            Current counter value, 0 if key doesn't exist
        """

    @abstractmethod
    def get_expiry(self, key: str) -> float:
        """
        Get expiration time for key.

        Args:
            key: Storage key

        Returns:
            Expiration timestamp, or 0 if key doesn't exist
        """

    @abstractmethod
    def check(self) -> bool:
        """
        Health check for storage backend.

        Returns:
            True if storage is accessible and working
        """

    @abstractmethod
    def reset(self) -> int | None:
        """Reset/clear all stored data"""

    @abstractmethod
    def clear(self, key: str) -> None:
        """
        Clear data for specific key.

        Args:
            key: Storage key to clear
        """
class MovingWindowSupport(ABC):
    """Interface for storage backends supporting moving window strategy"""

    @abstractmethod
    def acquire_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:
        """
        Acquire entry in moving window.

        Args:
            key: Storage key
            limit: Rate limit amount
            expiry: Window duration in seconds
            amount: Number of entries to acquire

        Returns:
            True if entries were acquired, False if limit exceeded
        """

    @abstractmethod
    def get_moving_window(self, key: str, limit: int, expiry: int) -> tuple[float, int]:
        """
        Get current moving window state.

        Args:
            key: Storage key
            limit: Rate limit amount
            expiry: Window duration in seconds

        Returns:
            Tuple of (window_start_time, current_count)
        """
class SlidingWindowCounterSupport(ABC):
    """Interface for storage backends supporting sliding window counter strategy"""

    @abstractmethod
    def get_sliding_window(self, key: str, expiry: int) -> tuple[int, float, int, float]:
        """
        Get sliding window counter state.

        Args:
            key: Storage key
            expiry: Window duration in seconds

        Returns:
            Tuple of (previous_count, previous_expires_in, current_count, current_expires_in)
        """

    @abstractmethod
    def acquire_sliding_window_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:
        """
        Acquire entry using sliding window counter.

        Args:
            key: Storage key
            limit: Rate limit amount
            expiry: Window duration in seconds
            amount: Number of entries to acquire (defaults to 1, matching
                MovingWindowSupport.acquire_entry)

        Returns:
            True if entries were acquired, False if limit exceeded
        """


# In-memory storage backend for single-process applications or testing.
class MemoryStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    In-memory storage backend.

    Stores rate limit data in process memory. Not suitable for distributed
    applications but useful for single-process apps, testing, and development.
    Supports all rate limiting strategies including moving window and
    sliding window counter.
    """

    STORAGE_SCHEME = ["memory"]

    def __init__(
        self,
        uri: str | None = None,
        wrap_exceptions: bool = False,
        **options: float | str | bool,
    ):
        """
        Initialize in-memory storage with empty state.

        All arguments are optional, so ``MemoryStorage()`` keeps working.

        Args:
            uri: Optional storage URI such as "memory://"
            wrap_exceptions: Whether to wrap storage exceptions in StorageError
            options: Additional options, accepted for parity with the
                Storage base class constructor
        """


# Redis-based storage backends supporting various Redis deployment patterns.
class RedisStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Redis storage backend.

    Uses Redis (or a Redis-compatible Valkey server) for distributed rate
    limiting across multiple application instances. Supports all rate
    limiting strategies with high performance and reliability.
    """

    # Includes the Valkey aliases that the scheme table maps to this class.
    STORAGE_SCHEME = [
        "redis",
        "rediss",
        "redis+unix",
        "valkey",
        "valkeys",
        "valkey+unix",
    ]

    def __init__(self, uri: str, **options):
        """
        Initialize Redis storage.

        Args:
            uri: Redis connection URI (redis://host:port/db)
            options: Additional Redis client options
        """
class RedisClusterStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Redis Cluster storage backend.

    Supports Redis Cluster deployments for horizontal scaling and
    high availability scenarios.
    """

    # Includes the Valkey alias that the scheme table maps to this class.
    STORAGE_SCHEME = ["redis+cluster", "valkey+cluster"]

    def __init__(self, uri: str, **options):
        """
        Initialize Redis Cluster storage.

        Args:
            uri: Redis cluster URI (redis+cluster://host:port)
            options: Additional cluster client options
        """
class RedisSentinelStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    Redis Sentinel storage backend.

    Supports Redis Sentinel for automatic failover and high availability.
    """

    # Includes the Valkey alias that the scheme table maps to this class.
    STORAGE_SCHEME = ["redis+sentinel", "valkey+sentinel"]

    def __init__(self, uri: str, **options):
        """
        Initialize Redis Sentinel storage.

        Args:
            uri: Sentinel URI (redis+sentinel://host:port/service)
            options: Additional sentinel options
        """


# Memcached storage backend for distributed caching scenarios.
class MemcachedStorage(Storage):
    """
    Memcached storage backend.

    Uses Memcached for distributed rate limiting. Supports basic rate limiting
    strategies but not moving window or sliding window counter due to
    Memcached's limited data structure support.
    """

    STORAGE_SCHEME = ["memcached"]

    def __init__(self, uri: str, **options):
        """
        Initialize Memcached storage.

        Args:
            uri: Memcached URI (memcached://host:port)
            options: Additional memcached client options
        """


# MongoDB storage backend for document-based persistence.
class MongoDBStorageBase(Storage):
    """Base class for MongoDB storage implementations"""
class MongoDBStorage(MongoDBStorageBase, MovingWindowSupport, SlidingWindowCounterSupport):
    """
    MongoDB storage backend.

    Uses MongoDB for persistent rate limiting data with support for all
    rate limiting strategies. Suitable for applications already using
    MongoDB or requiring persistent rate limit data.
    """

    # Includes the SRV-record scheme that the scheme table maps to this class.
    STORAGE_SCHEME = ["mongodb", "mongodb+srv"]

    def __init__(self, uri: str, **options):
        """
        Initialize MongoDB storage.

        Args:
            uri: MongoDB connection URI (mongodb://host:port/database)
            options: Additional MongoDB client options
        """


# Etcd storage backend for distributed key-value storage.
class EtcdStorage(Storage):
    """
    Etcd storage backend.

    Uses etcd for distributed rate limiting in Kubernetes and other
    cloud-native environments. Provides consistency guarantees but
    with higher latency than Redis.
    """

    STORAGE_SCHEME = ["etcd"]

    def __init__(self, uri: str, **options):
        """
        Initialize etcd storage.

        Args:
            uri: Etcd connection URI (etcd://host:port)
            options: Additional etcd client options
        """


# Helper classes providing common functionality for storage implementations.
class TimestampedSlidingWindow:
    """Helper class for storage that support sliding window counter with timestamp-based keys"""

    @classmethod
    def sliding_window_keys(cls, key: str, expiry: int, at: float) -> tuple[str, str]:
        """
        Generate keys for previous and current sliding window buckets.

        Args:
            key: Base key for the rate limit
            expiry: Window duration in seconds
            at: Timestamp to generate keys for (usually current time)

        Returns:
            Tuple of (previous_window_key, current_window_key)

        Example:
            With key="mykey", expiry=60, at=1738576292.6631825
            Returns ("mykey/28976270", "mykey/28976271")
            (int(at / expiry) == 28976271 is the current bucket; the
            previous bucket is one lower)
        """


from limits.storage import storage_from_string
# In-memory storage (single process)
memory_storage = storage_from_string("memory://")

# Redis storage (distributed)
redis_storage = storage_from_string("redis://localhost:6379")
redis_with_db = storage_from_string("redis://localhost:6379/1")
redis_with_auth = storage_from_string("redis://:password@localhost:6379")

# Valkey storage (Redis-compatible)
valkey_storage = storage_from_string("valkey://localhost:6379")
valkey_with_ssl = storage_from_string("valkeys://localhost:6380")

# Redis Cluster
cluster_storage = storage_from_string("redis+cluster://localhost:7000")

# Redis Sentinel
sentinel_storage = storage_from_string(
    "redis+sentinel://localhost:26379/mymaster"
)

# Memcached
memcached_storage = storage_from_string("memcached://localhost:11211")

# MongoDB
mongodb_storage = storage_from_string("mongodb://localhost:27017/ratelimits")

# Etcd
etcd_storage = storage_from_string("etcd://localhost:2379")

from limits.storage import storage_from_string
# Redis with connection options
redis_storage = storage_from_string(
    "redis://localhost:6379",
    socket_timeout=5,
    socket_connect_timeout=5,
    retry_on_timeout=True,
    health_check_interval=30,
)

# MongoDB with collection name
mongodb_storage = storage_from_string(
    "mongodb://localhost:27017/mydb",
    collection_name="custom_rate_limits",
)

# Memcached with timeout
memcached_storage = storage_from_string(
    "memcached://localhost:11211",
    timeout=10,
)

from limits.storage import storage_from_string
from limits.errors import ConfigurationError, StorageError

try:
    # Create storage instance
    storage = storage_from_string("redis://localhost:6379")

    # Test connectivity
    if storage.check():
        print("Storage is accessible")
    else:
        print("Storage connectivity issues")
except ConfigurationError as e:
    print(f"Configuration error: {e}")
except StorageError as e:
    print(f"Storage error: {e}")

from limits.storage import MemoryStorage
# Work directly with the in-memory backend
storage = MemoryStorage()

# Key and window shared by the calls below
key = "user:12345:api_calls"
expiry = 60 * 60  # one hour, in seconds

# Bump the counter and show the value it now holds
current_count = storage.incr(key, expiry, amount=1)
print(f"Current count: {current_count}")

# Read the counter back without changing it
count = storage.get(key)
print(f"Retrieved count: {count}")

# Look up when the key expires
expire_time = storage.get_expiry(key)
print(f"Expires at: {expire_time}")

# Drop this one key
storage.clear(key)

# Drop everything held by the backend
storage.reset()

# Available storage schemes
SUPPORTED_SCHEMES = {
    # Single-process backend
    "memory": MemoryStorage,
    # Standalone Redis and Redis-compatible Valkey
    "redis": RedisStorage,
    "rediss": RedisStorage,  # Redis over SSL
    "redis+unix": RedisStorage,  # Redis over a Unix socket
    "valkey": RedisStorage,  # Valkey
    "valkeys": RedisStorage,  # Valkey over SSL
    "valkey+unix": RedisStorage,  # Valkey over a Unix socket
    # Clustered deployments
    "redis+cluster": RedisClusterStorage,
    "valkey+cluster": RedisClusterStorage,  # Valkey cluster
    # Sentinel-managed deployments
    "redis+sentinel": RedisSentinelStorage,
    "valkey+sentinel": RedisSentinelStorage,  # Valkey sentinel
    # Other backends
    "memcached": MemcachedStorage,
    "mongodb": MongoDBStorage,
    "mongodb+srv": MongoDBStorage,  # MongoDB via SRV records
    "etcd": EtcdStorage,
}
# Async storage schemes (prefix with "async+")
ASYNC_SCHEMES = {
    "async+memory": "limits.aio.storage.MemoryStorage",
    "async+redis": "limits.aio.storage.RedisStorage",
    "async+valkey": "limits.aio.storage.RedisStorage",  # Valkey async support
    "async+memcached": "limits.aio.storage.MemcachedStorage",
    "async+mongodb": "limits.aio.storage.MongoDBStorage",
    "async+etcd": "limits.aio.storage.EtcdStorage",
}

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-limits