Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware
—
Rate limiting in Dramatiq provides mechanisms to control task execution rates and implement synchronization patterns. The system supports multiple rate limiting strategies and backends for different use cases, from simple concurrent execution limits to sophisticated token bucket algorithms.
Base class for all rate limiting implementations.
class RateLimiter:
def __init__(self, backend: RateLimiterBackend, key: str):
"""
Initialize rate limiter.
Parameters:
- backend: Backend for storing rate limit state
- key: Unique key for this rate limiter instance
"""
def acquire(self, *, raise_on_failure: bool = True) -> bool:
"""
Context manager for acquiring rate limit permission.
Parameters:
- raise_on_failure: Whether to raise exception when limit exceeded
Returns:
True if acquired, False if limit exceeded (when raise_on_failure=False)
Raises:
RateLimitExceeded: When limit is exceeded and raise_on_failure=True
Usage:
with rate_limiter.acquire():
# Protected code here
pass
"""

Abstract backend for storing rate limiter state.
class RateLimiterBackend:
"""
Abstract base class for rate limiter backends.
Backends provide persistent storage for rate limiter state
across multiple processes and workers.
"""
def add(self, key: str, value: int, ttl: int) -> bool:
"""Add value if key doesn't exist."""
def incr(self, key: str, amount: int, ttl: int) -> int:
"""Increment key by amount."""
def decr(self, key: str, amount: int, ttl: int) -> int:
"""Decrement key by amount."""
def incr_and_sum(self, keys: List[str], amount: int, ttl: int) -> int:
"""Increment multiple keys and return sum."""

Limits the number of concurrent executions.
class ConcurrentRateLimiter(RateLimiter):
def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, ttl: int = 900000):
"""
Create concurrent execution rate limiter.
Parameters:
- backend: Backend for storing state
- key: Unique key for this limiter
- limit: Maximum number of concurrent executions
- ttl: TTL for state in milliseconds (default: 15 minutes)
"""

Usage:
from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend
# Set up backend
backend = RedisBackend()
# Create concurrent rate limiter
concurrent_limiter = ConcurrentRateLimiter(
backend,
"api_calls",
limit=5, # Max 5 concurrent executions
ttl=60000 # 1 minute TTL
)
@dramatiq.actor
def api_call_task(url):
with concurrent_limiter.acquire():
# Only 5 of these can run concurrently
response = requests.get(url)
return response.json()
# Usage
for i in range(20):
api_call_task.send(f"https://api.example.com/data/{i}")

Token bucket algorithm for controlling request rates.
class BucketRateLimiter(RateLimiter):
def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, bucket: int):
"""
Create token bucket rate limiter.
Parameters:
- backend: Backend for storing state
- key: Unique key for this limiter
- limit: Number of tokens to add per time window
- bucket: Maximum bucket capacity (burst allowance)
"""

Usage:
from dramatiq.rate_limits import BucketRateLimiter
# Token bucket: 10 requests per minute, burst up to 20
bucket_limiter = BucketRateLimiter(
backend,
"email_sending",
limit=10, # 10 tokens per time window
bucket=20 # Burst capacity of 20
)
@dramatiq.actor
def send_email_task(to, subject, body):
with bucket_limiter.acquire():
# Rate limited email sending
send_email(to, subject, body)
return f"Email sent to {to}"
# Can send 20 emails quickly, then limited to 10 per time window
for user in users:
send_email_task.send(user.email, "Newsletter", "Content...")

Sliding window rate limiting.
class WindowRateLimiter(RateLimiter):
def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, window: int):
"""
Create sliding window rate limiter.
Parameters:
- backend: Backend for storing state
- key: Unique key for this limiter
- limit: Maximum operations per window
- window: Time window in milliseconds
"""

Usage:
from dramatiq.rate_limits import WindowRateLimiter
# Sliding window: max 100 requests per hour
window_limiter = WindowRateLimiter(
backend,
"api_requests",
limit=100, # 100 requests
window=3600000 # per hour (3,600,000 ms = 1 hour)
)
@dramatiq.actor
def external_api_task(endpoint, data):
with window_limiter.acquire():
# Limited to 100 calls per hour
response = external_api.call(endpoint, data)
return response
# Usage
for request in api_requests:
external_api_task.send(request.endpoint, request.data)

Synchronization barrier for coordinating multiple tasks.
class Barrier:
def __init__(self, backend: RateLimiterBackend, key: str, *, ttl: int = 900000):
"""
Create synchronization barrier.
Parameters:
- backend: Backend for coordination state
- key: Unique key for this barrier
- ttl: TTL for barrier state in milliseconds (default: 15 minutes)
"""
def create(self, size: int):
"""
Create barrier for specified number of participants.
Parameters:
- size: Number of tasks that must reach barrier
"""
def wait(self, timeout: int = None):
"""
Wait at barrier until all participants arrive.
Parameters:
- timeout: Timeout in milliseconds
Raises:
BarrierTimeout: If timeout exceeded
"""

Usage:
from dramatiq.rate_limits import Barrier
# Create barrier for coordinating 5 tasks
barrier = Barrier(backend, "processing_barrier")
barrier.create(5)
@dramatiq.actor
def coordinated_task(task_id, data):
# Do individual processing
result = process_data(data)
# Wait for all tasks to complete processing
print(f"Task {task_id} waiting at barrier...")
barrier.wait(timeout=60000) # 1 minute timeout
# All tasks proceed together
print(f"Task {task_id} proceeding after barrier")
return finalize_result(result)
# Launch coordinated tasks
for i in range(5):
coordinated_task.send(i, f"data_{i}")

Production backend using Redis for distributed rate limiting.
class RedisBackend(RateLimiterBackend):
def __init__(self, client: redis.Redis, *, encoder: Encoder = None):
"""
Create Redis backend for rate limiting.
Parameters:
- client: Redis client instance
- encoder: Message encoder (uses JSON if None)
"""

Usage:
import redis
from dramatiq.rate_limits.backends import RedisBackend
# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=1)
# Create backend
redis_backend = RedisBackend(redis_client)
# Use with rate limiters
limiter = ConcurrentRateLimiter(redis_backend, "shared_resource", limit=10)

Memcached backend for rate limiting state.
class MemcachedBackend(RateLimiterBackend):
def __init__(self, client, *, encoder: Encoder = None):
"""
Create Memcached backend for rate limiting.
Parameters:
- client: Memcached client instance
- encoder: Message encoder (uses JSON if None)
"""

Usage:
import pylibmc
from dramatiq.rate_limits.backends import MemcachedBackend
# Create Memcached client
mc_client = pylibmc.Client(["127.0.0.1:11211"])
# Create backend
mc_backend = MemcachedBackend(mc_client)
# Use with rate limiters
limiter = WindowRateLimiter(mc_backend, "api_calls", limit=1000, window=3600000)

In-memory backend for testing and development.
class StubBackend(RateLimiterBackend):
def __init__(self):
"""Create in-memory backend for testing."""

Usage:
from dramatiq.rate_limits.backends import StubBackend
# Create stub backend for testing
stub_backend = StubBackend()
# Use in tests
test_limiter = ConcurrentRateLimiter(stub_backend, "test_key", limit=2)
def test_rate_limiting():
# Test rate limiting behavior
with test_limiter.acquire():
assert True # First acquisition succeeds
with test_limiter.acquire():
assert True # Second acquisition succeeds
# Third acquisition should fail
try:
with test_limiter.acquire():
assert False, "Should have been rate limited"
except dramatiq.RateLimitExceeded:
assert True # Expected behavior

def create_user_rate_limiter(user_id, limit=10):
"""Create rate limiter per user"""
return ConcurrentRateLimiter(
backend,
f"user:{user_id}:operations",
limit=limit,
ttl=3600000 # 1 hour
)
@dramatiq.actor
def user_operation_task(user_id, operation_data):
    """Run one user operation under that user's personal limiter (max 5 concurrent)."""
    limiter = create_user_rate_limiter(user_id, limit=5)
    with limiter.acquire():
        # Per-user rate limiting: only this user's slots are consumed.
        return perform_user_operation(user_id, operation_data)
# Each user gets their own rate limit
user_operation_task.send(123, {"action": "update_profile"})
user_operation_task.send(456, {"action": "send_message"})

@dramatiq.actor
def api_request_task(service, endpoint, data):
    """Call an external API while honouring three stacked rate limits.

    Hierarchy, broadest to narrowest: a global hourly window across all
    services, a per-service hourly window, and a per-endpoint
    concurrency cap.
    """
    global_limiter = WindowRateLimiter(
        backend, "global_api", limit=1000, window=3600000
    )
    service_limiter = WindowRateLimiter(
        backend, f"service:{service}", limit=200, window=3600000
    )
    endpoint_limiter = ConcurrentRateLimiter(
        backend, f"endpoint:{service}:{endpoint}", limit=5
    )
    # Acquire broadest-first; all three must be held before the call runs.
    with global_limiter.acquire(), service_limiter.acquire(), endpoint_limiter.acquire():
        return call_api(service, endpoint, data)
# Usage with hierarchical limits
api_request_task.send("payments", "process_payment", payment_data)
api_request_task.send("users", "get_profile", user_data)

@dramatiq.actor
def resilient_task(data, priority="normal"):
    """Process *data* under a priority-dependent concurrency cap.

    When the limit is exhausted, degrade to a simplified processing path
    instead of failing the task.
    """
    # Priority -> (limiter key, concurrent limit); anything else is "low".
    tiers = {
        "high": ("high_priority", 20),
        "normal": ("normal_priority", 10),
    }
    key, cap = tiers.get(priority, ("low_priority", 5))
    limiter = ConcurrentRateLimiter(backend, key, limit=cap)
    try:
        with limiter.acquire():
            return perform_full_processing(data)
    except dramatiq.RateLimitExceeded:
        # Graceful degradation: cheap fallback rather than dropping the task.
        return perform_basic_processing(data)
# Tasks adapt to rate limiting
resilient_task.send(data, priority="high")
resilient_task.send(data, priority="normal")

import time
def get_time_based_limiter(time_period="business_hours"):
    """Return a window limiter sized for the current time of day.

    When *time_period* is "business_hours" and the current hour is
    within 09:00-17:00 (inclusive), allow 500 requests/hour; otherwise
    fall back to 100 requests/hour.

    NOTE(review): the hour comes from time.gmtime(), i.e. UTC — confirm
    that matches the intended business-hours timezone.
    """
    hour_utc = time.gmtime().tm_hour
    within_business_hours = time_period == "business_hours" and 9 <= hour_utc <= 17
    if within_business_hours:
        return WindowRateLimiter(backend, "business_hours", limit=500, window=3600000)
    return WindowRateLimiter(backend, "off_hours", limit=100, window=3600000)
@dramatiq.actor
def time_aware_task(data):
limiter = get_time_based_limiter()
with limiter.acquire():
return process_with_time_awareness(data)

import time
from collections import defaultdict
class MetricsRateLimiter:
    """Wrap a RateLimiter and record acquisition metrics.

    Counts every acquisition attempt and every rejection, printing and
    resetting the counters roughly once per hour.

    Fix over the naive version: ``RateLimiter.acquire()`` only *builds*
    a context manager — ``RateLimitExceeded`` is raised when that
    manager is entered — so metrics must be recorded in ``__enter__``
    of a proxy, not in a try/except around the ``acquire()`` call
    (which could never catch anything).
    """

    def __init__(self, limiter):
        # The wrapped dramatiq RateLimiter.
        self.limiter = limiter
        # Counters: "attempts" and "rate_limited".
        self.metrics = defaultdict(int)
        self.last_reset = time.time()

    def acquire(self, **kwargs):
        """Return a context manager proxying the wrapped limiter's acquire."""
        return _MetricsGuard(self, self.limiter.acquire(**kwargs))

    def _record(self, *, limited):
        """Update counters; dump and reset them once an hour."""
        self.metrics["attempts"] += 1
        if limited:
            self.metrics["rate_limited"] += 1
        if time.time() - self.last_reset > 3600:
            print(f"Rate limiting metrics: {dict(self.metrics)}")
            self.metrics.clear()
            self.last_reset = time.time()


class _MetricsGuard:
    """Context-manager proxy that records metrics at entry time."""

    def __init__(self, owner, inner_cm):
        self._owner = owner
        self._cm = inner_cm

    def __enter__(self):
        try:
            value = self._cm.__enter__()
        except dramatiq.RateLimitExceeded:
            # The limit was hit: count the rejection, then propagate.
            self._owner._record(limited=True)
            raise
        self._owner._record(limited=False)
        return value

    def __exit__(self, exc_type, exc_value, tb):
        # Delegate release to the real limiter's context manager.
        return self._cm.__exit__(exc_type, exc_value, tb)
@dramatiq.actor
def monitored_task(data):
limiter = MetricsRateLimiter(
ConcurrentRateLimiter(backend, "monitored", limit=10)
)
with limiter.acquire():
return process_with_monitoring(data)

class RateLimitExceeded(Exception):
"""
Raised when rate limit is exceeded.
This is the same exception as dramatiq.RateLimitExceeded
"""

Usage:
@dramatiq.actor
def rate_limited_task(data):
limiter = ConcurrentRateLimiter(backend, "limited", limit=3)
try:
with limiter.acquire():
return process_data(data)
except dramatiq.RateLimitExceeded:
# Handle rate limiting gracefully
print("Rate limit exceeded, scheduling for later")
# Could reschedule with delay
rate_limited_task.send_with_options(
args=(data,),
delay=60000 # Retry in 1 minute
)
return {"status": "rate_limited", "retry_scheduled": True}

Rate limiting can be integrated directly into actor middleware:
class ActorRateLimitMiddleware(dramatiq.Middleware):
    """Middleware that applies a per-actor concurrency limit.

    Actors opt in via the ``rate_limit`` option (max concurrent
    executions) and may share a limiter across actors with
    ``rate_limit_key``.
    """

    def __init__(self, backend, default_limit=10):
        self.backend = backend
        self.default_limit = default_limit

    @property
    def actor_options(self):
        # Extra options actors are allowed to pass to @dramatiq.actor(...).
        return {"rate_limit", "rate_limit_key"}

    def before_process_message(self, broker, message):
        rate_limit = message.options.get("rate_limit")
        if rate_limit:
            key = message.options.get("rate_limit_key", message.actor_name)
            limiter = ConcurrentRateLimiter(
                self.backend,
                key,
                limit=rate_limit
            )
            # Enter the context manager now and keep THAT object around.
            # Calling acquire() again in after_process_message would build
            # a fresh manager whose __exit__ releases a slot it never held,
            # leaking the one acquired here.
            cm = limiter.acquire()
            cm.__enter__()
            message.options["_rate_limiter"] = cm

    def after_process_message(self, broker, message, *, result=None, exception=None):
        # Release exactly the slot entered in before_process_message, if any.
        cm = message.options.pop("_rate_limiter", None)
        if cm is not None:
            cm.__exit__(None, None, None)
# Add middleware
rate_limit_middleware = ActorRateLimitMiddleware(backend)
broker.add_middleware(rate_limit_middleware)
# Use with actors
@dramatiq.actor(rate_limit=5, rate_limit_key="email_sending")
def send_email_task(to, subject, body):
send_email(to, subject, body)
return f"Email sent to {to}"

Install with Tessl CLI
npx tessl i tessl/pypi-dramatiq