Python Rate-Limiter using Leaky-Bucket Algorithm for controlling request rates in applications with multiple backend storage options.
81
Clock implementations for various deployment scenarios, including local system time, monotonic time, and remote database-backed time sources. Clocks provide the time foundation for rate-limiting calculations and can be chosen based on accuracy, consistency, and deployment requirements.
Standard clock using system time for basic rate limiting scenarios.
class TimeClock(AbstractClock):
def now(self) -> int:
"""
Get current system time in milliseconds.
Returns:
- int: Current time as milliseconds since epoch
Characteristics:
- Uses system time (time.time())
- Subject to system clock adjustments
- Fast and simple
- Default clock for most scenarios
"""

Usage example:
from pyrate_limiter import TimeClock, Limiter, Rate, Duration
# Default clock (same as not specifying clock)
clock = TimeClock()
limiter = Limiter(Rate(10, Duration.SECOND), clock=clock)
# TimeClock is the default, so this is equivalent:
limiter = Limiter(Rate(10, Duration.SECOND))

Clock using monotonic time that doesn't go backwards, ideal for measuring intervals.
class MonotonicClock(AbstractClock):
def __init__(self):
"""Initialize monotonic clock."""
def now(self) -> int:
"""
Get current monotonic time in milliseconds.
Returns:
- int: Monotonic time in milliseconds
Characteristics:
- Uses monotonic time (time.monotonic())
- Never goes backwards
- Not affected by system clock adjustments
- Ideal for measuring time intervals
- Cannot be compared across system restarts
"""

Usage example:
from pyrate_limiter import MonotonicClock, Limiter, Rate, Duration
# Use monotonic clock for interval stability
clock = MonotonicClock()
limiter = Limiter(Rate(5, Duration.SECOND), clock=clock)
# Good for scenarios where system clock might be adjusted
# but rate limiting should continue smoothly

Asynchronous clock for testing and async environments.
class TimeAsyncClock(AbstractClock):
async def now(self) -> int:
"""
Get current system time asynchronously.
Returns:
- int: Current time as milliseconds since epoch
Characteristics:
- Async version of TimeClock
- Mainly for testing purposes
- Returns awaitable time value
"""

Usage example:
from pyrate_limiter import TimeAsyncClock, BucketAsyncWrapper, InMemoryBucket
from pyrate_limiter import Rate, Duration
import asyncio
async def async_clock_example():
# Use async clock with async bucket
clock = TimeAsyncClock()
bucket = BucketAsyncWrapper(InMemoryBucket([Rate(10, Duration.SECOND)]))
# Clock will be used internally by the bucket/limiter
# This is primarily for testing scenarios

Remote clock using a SQLite database for distributed time consistency.
class SQLiteClock(AbstractClock):
def __init__(self, conn: Union[sqlite3.Connection, SQLiteBucket]):
"""
Initialize SQLite clock with database connection.
Parameters:
- conn: SQLite connection or SQLiteBucket instance
Note: In multiprocessing, use SQLiteBucket to share locks
"""
@classmethod
def default(cls) -> "SQLiteClock":
"""
Create default SQLite clock with temporary database.
Returns:
- SQLiteClock: Clock using temporary SQLite database
"""
def now(self) -> int:
"""
Get current time from SQLite database.
Returns:
- int: Database time in milliseconds since epoch
Characteristics:
- Uses SQLite's datetime functions
- Consistent across processes using same database
- Slightly slower than system clocks
- Good for distributed consistency
"""

Usage example:
from pyrate_limiter import SQLiteClock, SQLiteBucket, Limiter
from pyrate_limiter import Rate, Duration
import sqlite3
# Method 1: Default SQLite clock
clock = SQLiteClock.default()
limiter = Limiter(Rate(10, Duration.SECOND), clock=clock)
# Method 2: SQLite clock with custom connection
conn = sqlite3.connect("timesync.db")
clock = SQLiteClock(conn)
# Method 3: SQLite clock sharing bucket's connection and lock
bucket = SQLiteBucket.init_from_file(
rates=[Rate(5, Duration.SECOND)],
db_path="shared.db"
)
clock = SQLiteClock(bucket) # Shares connection and lock
limiter = Limiter(bucket, clock=clock)

Enterprise-grade clock using PostgreSQL for high-accuracy distributed time.
class PostgresClock(AbstractClock):
def __init__(self, pool: ConnectionPool):
"""
Initialize PostgreSQL clock with connection pool.
Parameters:
- pool: PostgreSQL connection pool
"""
def now(self) -> int:
"""
Get current time from PostgreSQL database.
Returns:
- int: Database time in milliseconds since epoch
Characteristics:
- Uses PostgreSQL's timestamp functions
- High precision and accuracy
- Consistent across distributed systems
- Requires connection pool management
- Best for enterprise applications
"""

Usage example:
from pyrate_limiter import PostgresClock, PostgresBucket, Limiter
from pyrate_limiter import Rate, Duration
from psycopg_pool import ConnectionPool
# Create connection pool
pool = ConnectionPool(
"host=localhost dbname=mydb user=myuser password=mypass",
min_size=1,
max_size=10
)
# Use PostgreSQL clock with PostgreSQL bucket
clock = PostgresClock(pool)
bucket = PostgresBucket([Rate(100, Duration.MINUTE)], pool)
limiter = Limiter(bucket, clock=clock)
# Both bucket and clock use the same connection pool
# for consistency and efficiency

All clocks implement the same interface for consistent behavior.
class AbstractClock(ABC):
@abstractmethod
def now(self) -> Union[int, Awaitable[int]]:
"""
Get current time in milliseconds.
Returns:
- int: Time in milliseconds (since epoch for wall-clock implementations; monotonic clocks use an arbitrary reference point)
- Awaitable[int]: For async clocks
"""

Abstract factory for creating and managing buckets with custom routing logic.
class BucketFactory(ABC):
leak_interval: int
@abstractmethod
def wrap_item(self, name: str, weight: int = 1) -> Union[RateItem, Awaitable[RateItem]]:
"""
Add timestamp to item using clock backend and create RateItem.
Parameters:
- name: Item identifier
- weight: Item weight (default: 1)
Returns:
- RateItem: Timestamped rate item
- Awaitable[RateItem]: For async factories
"""
@abstractmethod
def get(self, item: RateItem) -> Union[AbstractBucket, Awaitable[AbstractBucket]]:
"""
Get the corresponding bucket for this item.
Parameters:
- item: Rate item to get bucket for
Returns:
- AbstractBucket: Bucket for the item
- Awaitable[AbstractBucket]: For async factories
"""
def create(
self,
clock: AbstractClock,
bucket_class: Type[AbstractBucket],
*args,
**kwargs
) -> AbstractBucket:
"""Create bucket dynamically and schedule leak operations."""
def schedule_leak(self, bucket: AbstractBucket, clock: AbstractClock) -> None:
"""Schedule leak operations for bucket with associated clock."""
def get_buckets(self) -> List[AbstractBucket]:
"""Get list of all buckets in the factory."""
def dispose(self, bucket: Union[int, AbstractBucket]) -> bool:
"""Remove bucket from factory."""
def close(self) -> None:
"""Close factory and clean up resources."""

Using database time ensures consistency across distributed components.
from pyrate_limiter import SQLiteClock, SQLiteBucket, Limiter, Rate, Duration
# All processes use the same database for time and state
bucket = SQLiteBucket.init_from_file(
rates=[Rate(10, Duration.SECOND)],
db_path="/shared/rate_limits.db"
)
clock = SQLiteClock(bucket)
limiter = Limiter(bucket, clock=clock)
# Time and rate limiting state are both synchronized

Ensuring bucket and clock use compatible time sources.
from pyrate_limiter import PostgresClock, PostgresBucket, Limiter
from pyrate_limiter import Rate, Duration
from psycopg_pool import ConnectionPool
pool = ConnectionPool("postgresql://...")
# Use same connection pool for both time and storage
bucket = PostgresBucket([Rate(50, Duration.SECOND)], pool)
clock = PostgresClock(pool)
limiter = Limiter(bucket, clock=clock)
# Time calculations and storage operations use the same database time

Different clocks for different purposes.
from pyrate_limiter import TimeClock, MonotonicClock, InMemoryBucket
from pyrate_limiter import Rate, Duration, Limiter
# Use monotonic clock for stable intervals
# but system time for logging/debugging
monotonic_clock = MonotonicClock()
limiter = Limiter(Rate(10, Duration.SECOND), clock=monotonic_clock)
# System time for external coordination
import time
system_time = int(time.time() * 1000)
print(f"Rate limited at system time: {system_time}")

Install with Tessl CLI
npx tessl i tessl/pypi-pyrate-limiter
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10