Python Rate-Limiter using the Leaky-Bucket Algorithm to control request rates in applications, with multiple backend storage options.
Pre-configured factory functions and patterns for common use cases including multiprocessing support and simplified bucket creation. These functions provide convenient ways to create limiters without manually configuring buckets and clocks.
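For orientation, the sketch below shows the general pattern these factories support: create a limiter with a single call, then guard work with try_acquire. The loop and item names are illustrative only; create_inmemory_limiter and try_acquire are documented later in this section.

from pyrate_limiter import create_inmemory_limiter, Duration

# Allow 5 requests per second using the in-memory factory described below.
limiter = create_inmemory_limiter(rate_per_duration=5, duration=Duration.SECOND)

for task_id in range(10):
    # try_acquire succeeds once the request fits the configured rate
    # (waiting up to max_delay if necessary); the item name is arbitrary.
    if limiter.try_acquire(f"task_{task_id}"):
        print(f"processed task {task_id}")
    else:
        print(f"rate limited: task {task_id}")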
Create persistent limiters using SQLite storage with common configurations.
def create_sqlite_limiter(
rate_per_duration: int = 3,
duration: Union[int, Duration] = Duration.SECOND,
db_path: Optional[str] = None,
table_name: str = "rate_bucket",
max_delay: Union[int, Duration] = Duration.DAY,
buffer_ms: int = 50,
use_file_lock: bool = False,
async_wrapper: bool = False
) -> Limiter:
"""
Create a SQLite-backed rate limiter with configurable rate, persistence, and optional async support.
Parameters:
- rate_per_duration: Number of allowed requests per duration (default: 3)
- duration: Time window for the rate limit (default: Duration.SECOND)
- db_path: Path to the SQLite database file (None for temporary file)
- table_name: Name of the table used for rate buckets (default: "rate_bucket")
- max_delay: Maximum delay before failing requests (default: Duration.DAY)
- buffer_ms: Extra wait time in milliseconds to account for clock drift (default: 50)
- use_file_lock: Enable file locking for multi-process synchronization (default: False)
- async_wrapper: Whether to wrap the bucket for async usage (default: False)
Returns:
- Limiter: Configured SQLite-backed limiter instance
"""Usage example:
from pyrate_limiter import create_sqlite_limiter, Duration
# Basic SQLite limiter
limiter1 = create_sqlite_limiter(5, Duration.SECOND)
# Persistent SQLite limiter with custom database
limiter2 = create_sqlite_limiter(
    rate_per_duration=100,
    duration=Duration.MINUTE,
    db_path="/var/lib/myapp/rate_limits.db",
    table_name="api_limits",
    use_file_lock=True  # For multiprocessing
)
# Async SQLite limiter
limiter3 = create_sqlite_limiter(
    rate_per_duration=10,
    duration=Duration.SECOND,
    db_path="rate_limits.db",
    max_delay=Duration.SECOND * 5,
    async_wrapper=True
)

Create fast in-memory limiters for simple applications.
def create_inmemory_limiter(
rate_per_duration: int = 3,
duration: Union[int, Duration] = Duration.SECOND,
max_delay: Union[int, Duration] = Duration.DAY,
buffer_ms: int = 50,
async_wrapper: bool = False
) -> Limiter:
"""
Create an in-memory rate limiter with configurable rate, duration, delay, and optional async support.
Parameters:
- rate_per_duration: Number of allowed requests per duration (default: 3)
- duration: Time window for the rate limit (default: Duration.SECOND)
- max_delay: Maximum delay before failing requests (default: Duration.DAY)
- buffer_ms: Extra wait time in milliseconds to account for clock drift (default: 50)
- async_wrapper: Whether to wrap the bucket for async usage (default: False)
Returns:
- Limiter: Configured in-memory limiter instance
"""Usage example:
from pyrate_limiter import create_inmemory_limiter, Duration
# Basic in-memory limiter
limiter1 = create_inmemory_limiter() # 3 requests per second
# Custom rate limiter
limiter2 = create_inmemory_limiter(
    rate_per_duration=20,
    duration=Duration.MINUTE,
    max_delay=Duration.SECOND * 10
)
# Async in-memory limiter
limiter3 = create_inmemory_limiter(
    rate_per_duration=50,
    duration=Duration.SECOND,
    async_wrapper=True
)

Create SQLite buckets directly for custom limiter configurations.
def create_sqlite_bucket(
rates: List[Rate],
db_path: Optional[str],
table_name: str = "pyrate_limiter",
use_file_lock: bool = False
):
"""
Create and initialize a SQLite bucket for rate limiting.
Parameters:
- rates: List of rate limit configurations
- db_path: Path to the SQLite database file (or in-memory if None)
- table_name: Name of the table to store rate bucket data (default: "pyrate_limiter")
- use_file_lock: Enable file locking for multi-process synchronization (default: False)
Returns:
- SQLiteBucket: Initialized SQLite-backed bucket
"""Usage example:
from pyrate_limiter import create_sqlite_bucket, Rate, Duration, Limiter
# Create bucket with multiple rates
rates = [
    Rate(10, Duration.SECOND),
    Rate(100, Duration.MINUTE),
    Rate(1000, Duration.HOUR)
]
bucket = create_sqlite_bucket(
    rates=rates,
    db_path="complex_limits.db",
    table_name="user_limits",
    use_file_lock=True
)
# Use with custom limiter configuration
limiter = Limiter(
    bucket,
    max_delay=Duration.SECOND * 10,
    buffer_ms=100
)

Initialize a global limiter for multiprocessing scenarios.
LIMITER: Optional[Limiter] = None
def init_global_limiter(
bucket: AbstractBucket,
max_delay: Union[int, Duration] = Duration.HOUR,
raise_when_fail: bool = False,
retry_until_max_delay: bool = True,
buffer_ms: int = 50
) -> None:
"""
Initialize a global Limiter instance using the provided bucket.
Intended for use as an initializer for ProcessPoolExecutor.
Parameters:
- bucket: The rate-limiting bucket to be used
- max_delay: Maximum delay before failing requests (default: Duration.HOUR)
- raise_when_fail: Whether to raise an exception when a request fails (default: False)
- retry_until_max_delay: Retry until the maximum delay is reached (default: True)
- buffer_ms: Additional buffer time in milliseconds for retries (default: 50)
"""Usage example:
from pyrate_limiter import init_global_limiter, create_sqlite_bucket
from pyrate_limiter import Rate, Duration
from concurrent.futures import ProcessPoolExecutor
import pyrate_limiter

def worker_init():
    """Initialize worker process with shared rate limiter."""
    bucket = create_sqlite_bucket(
        rates=[Rate(10, Duration.SECOND)],
        db_path="/tmp/mp_rate_limits.db",
        use_file_lock=True
    )
    init_global_limiter(bucket)

def worker_task(user_id):
    """Worker function using global rate limiter."""
    # Read LIMITER through the module so we see the instance created by
    # init_global_limiter in this worker process (a from-import taken before
    # initialization would still be bound to None).
    limiter = pyrate_limiter.LIMITER
    if limiter and limiter.try_acquire(f"user_{user_id}"):
        return f"Processed task for user {user_id}"
    else:
        return f"Rate limited for user {user_id}"

# Use with ProcessPoolExecutor
if __name__ == "__main__":
    with ProcessPoolExecutor(
        max_workers=4,
        initializer=worker_init
    ) as executor:
        futures = [executor.submit(worker_task, i) for i in range(20)]
        results = [f.result() for f in futures]
        print(results)

Using MultiprocessBucket for cross-process rate limiting.
from pyrate_limiter import MultiprocessBucket, Limiter, Rate, Duration
from concurrent.futures import ProcessPoolExecutor

LIMITER = None

def init_worker(shared_bucket):
    """Each process builds its own Limiter around the shared bucket."""
    global LIMITER
    LIMITER = Limiter(shared_bucket)

def worker_function(data):
    """Worker function using the process-local limiter."""
    if LIMITER.try_acquire(f"worker_{data}"):
        # Process data
        return f"Processed {data}"
    else:
        return f"Rate limited {data}"

# Create the bucket once in the parent so its storage is shared across
# processes; each worker then wraps it in its own Limiter via the initializer.
if __name__ == "__main__":
    bucket = MultiprocessBucket.init(rates=[Rate(5, Duration.SECOND)])
    with ProcessPoolExecutor(
        max_workers=3,
        initializer=init_worker,
        initargs=(bucket,)
    ) as executor:
        results = list(executor.map(worker_function, range(10)))
        print(results)

| Function | Storage | Async Support | Multiprocessing | Use Case |
|---|---|---|---|---|
| create_inmemory_limiter | In-memory | Optional wrapper | No | Single process, fast access |
| create_sqlite_limiter | SQLite file | Optional wrapper | With file lock | Persistent, cross-process |
| create_sqlite_bucket | SQLite file | No | With file lock | Custom limiter configs |
| init_global_limiter | Any bucket | Depends on bucket | Yes | Process pool patterns |
Factory functions bundle bucket, clock, and limiter configuration behind sensible defaults, so most applications need only a single call instead of the manual wiring shown above; the short comparison below illustrates the difference.
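As a rough, hedged comparison (using only the helpers documented above; the database path is illustrative), the snippet below builds the same SQLite-backed configuration first manually with create_sqlite_bucket plus Limiter, then with the create_sqlite_limiter factory in one call.

from pyrate_limiter import create_sqlite_bucket, create_sqlite_limiter, Limiter, Rate, Duration

# Manual wiring: build the bucket yourself, then configure the Limiter around it.
bucket = create_sqlite_bucket(rates=[Rate(100, Duration.MINUTE)], db_path="rate_limits.db")
manual_limiter = Limiter(bucket, max_delay=Duration.DAY, buffer_ms=50)

# Factory equivalent: one call with the same rate and storage, relying on the
# defaults documented above for everything else.
factory_limiter = create_sqlite_limiter(
    rate_per_duration=100,
    duration=Duration.MINUTE,
    db_path="rate_limits.db"
)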
Install with Tessl CLI
npx tessl i tessl/pypi-pyrate-limiter