# Rate Limiting

Rate limiting in Dramatiq provides mechanisms to control task execution rates and to synchronize tasks. It offers several limiter strategies, from simple concurrency limits to bucket and sliding-window limiters, and several storage backends.

## Capabilities

### Rate Limiter Base Classes

#### RateLimiter

Base class for all rate limiting implementations.

```python { .api }
class RateLimiter:
    def __init__(self, backend: RateLimiterBackend, key: str):
        """
        Initialize rate limiter.

        Parameters:
        - backend: Backend for storing rate limit state
        - key: Unique key for this rate limiter instance
        """

    def acquire(self, *, raise_on_failure: bool = True) -> ContextManager[bool]:
        """
        Context manager for acquiring rate limit permission.

        Parameters:
        - raise_on_failure: Whether to raise an exception when the limit is exceeded

        Yields:
            True if acquired, False if limit exceeded (when raise_on_failure=False)

        Raises:
            RateLimitExceeded: When limit is exceeded and raise_on_failure=True

        Usage:
            with rate_limiter.acquire():
                # Protected code here
                pass
        """
```
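
The `acquire` contract described above (yield a flag, optionally raise, release only what was taken) can be sketched with the standard library alone. This is an illustration, not Dramatiq's implementation: `SketchLimiter`, `LimitExceeded`, and the in-process counter are hypothetical stand-ins for a real limiter, `RateLimitExceeded`, and a shared backend.

```python
import threading
from contextlib import contextmanager


class LimitExceeded(Exception):
    """Hypothetical stand-in for dramatiq.RateLimitExceeded."""


class SketchLimiter:
    """Illustrative in-process limiter; state is not shared across processes."""

    def __init__(self, limit):
        self.limit = limit
        self.in_use = 0
        self.lock = threading.Lock()

    def _acquire(self):
        # Take a slot if one is free and report whether that succeeded.
        with self.lock:
            if self.in_use >= self.limit:
                return False
            self.in_use += 1
            return True

    def _release(self):
        with self.lock:
            self.in_use -= 1

    @contextmanager
    def acquire(self, *, raise_on_failure=True):
        acquired = self._acquire()
        if raise_on_failure and not acquired:
            raise LimitExceeded("limit exceeded")
        try:
            yield acquired
        finally:
            # Only give back a slot that was actually taken.
            if acquired:
                self._release()


limiter = SketchLimiter(limit=1)
with limiter.acquire() as first:
    # The only slot is held, so a non-raising attempt yields False.
    with limiter.acquire(raise_on_failure=False) as second:
        outcome = (first, second)
```

With `raise_on_failure=False` the caller must check the yielded flag before doing the protected work; with the default, the `with` body is only reached when the slot was acquired.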

#### RateLimiterBackend

Abstract backend for storing rate limiter state.

```python { .api }
class RateLimiterBackend:
    """
    Abstract base class for rate limiter backends.

    Backends provide shared storage for rate limiter state
    across multiple processes and workers.
    """

    def add(self, key: str, value: int, ttl: int) -> bool:
        """Add value if key doesn't exist. Returns whether the value was added."""

    def incr(self, key: str, amount: int, maximum: int, ttl: int) -> bool:
        """Increment key by amount unless the result would exceed maximum."""

    def decr(self, key: str, amount: int, minimum: int, ttl: int) -> bool:
        """Decrement key by amount unless the result would drop below minimum."""

    def incr_and_sum(self, key: str, keys, amount: int, maximum: int, ttl: int) -> bool:
        """Increment key unless the sum of all the given keys would exceed maximum."""
```
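
To make the backend contract concrete, here is a minimal in-memory sketch. It assumes bounded counters (`incr` takes a `maximum`, `decr` a `minimum`) and TTLs in milliseconds; `DictBackend` and its injectable `clock` are hypothetical names for illustration, and the class is neither thread-safe nor a substitute for a real backend.

```python
import time


class DictBackend:
    """Hypothetical in-memory counter store with expiry; for illustration only."""

    def __init__(self, clock=time.monotonic):
        self.clock = clock
        self.data = {}  # key -> (value, expires_at in seconds)

    def _get(self, key):
        value, expires_at = self.data.get(key, (None, 0.0))
        if value is not None and expires_at <= self.clock():
            del self.data[key]  # drop expired entries lazily
            return None
        return value

    def _put(self, key, value, ttl):
        # ttl is given in milliseconds.
        self.data[key] = (value, self.clock() + ttl / 1000)

    def add(self, key, value, ttl):
        """Store value only if the key is absent or expired."""
        if self._get(key) is not None:
            return False
        self._put(key, value, ttl)
        return True

    def incr(self, key, amount, maximum, ttl):
        """Increment unless the result would exceed maximum."""
        value = (self._get(key) or 0) + amount
        if value > maximum:
            return False
        self._put(key, value, ttl)
        return True

    def decr(self, key, amount, minimum, ttl):
        """Decrement unless the result would drop below minimum."""
        value = (self._get(key) or 0) - amount
        if value < minimum:
            return False
        self._put(key, value, ttl)
        return True


now = [0.0]  # fake clock so expiry can be shown without sleeping
store = DictBackend(clock=lambda: now[0])
results = [
    store.add("slots", 1, ttl=1000),              # key is new
    store.add("slots", 1, ttl=1000),              # key already exists
    store.incr("slots", 1, maximum=2, ttl=1000),  # 1 -> 2
    store.incr("slots", 1, maximum=2, ttl=1000),  # would exceed 2
]
now[0] = 2.0  # two seconds later the key has expired
results.append(store.add("slots", 1, ttl=1000))
```

The TTL is what lets a limiter recover if a worker dies while holding a slot: the stale counter simply expires.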

### Rate Limiter Implementations

#### Concurrent Rate Limiter

Limits the number of concurrent executions.

```python { .api }
class ConcurrentRateLimiter(RateLimiter):
    def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, ttl: int = 900000):
        """
        Create concurrent execution rate limiter.

        Parameters:
        - backend: Backend for storing state
        - key: Unique key for this limiter
        - limit: Maximum number of concurrent executions
        - ttl: TTL for state in milliseconds (default: 15 minutes)
        """
```

**Usage:**

```python
import dramatiq
import requests
from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend

# Set up backend
backend = RedisBackend()

# Create concurrent rate limiter
concurrent_limiter = ConcurrentRateLimiter(
    backend,
    "api_calls",
    limit=5,     # Max 5 concurrent executions
    ttl=60000    # 1 minute TTL
)

@dramatiq.actor
def api_call_task(url):
    with concurrent_limiter.acquire():
        # Only 5 of these can run concurrently
        response = requests.get(url)
        return response.json()

# Usage
for i in range(20):
    api_call_task.send(f"https://api.example.com/data/{i}")
```

#### Bucket Rate Limiter

Limits the number of operations within fixed time buckets. Each bucket spans `bucket` milliseconds and admits at most `limit` operations. This is a fixed-interval counter rather than a token bucket, so there is no separate burst capacity.

```python { .api }
class BucketRateLimiter(RateLimiter):
    def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, bucket: int):
        """
        Create bucket rate limiter.

        Parameters:
        - backend: Backend for storing state
        - key: Unique key for this limiter
        - limit: Maximum number of operations per bucket
        - bucket: Bucket interval in milliseconds
        """
```

**Usage:**

```python
import dramatiq
from dramatiq.rate_limits import BucketRateLimiter

# Bucket limiter: at most 10 operations per one-minute bucket
# (`backend` is a rate limiter backend, such as the RedisBackend created earlier)
bucket_limiter = BucketRateLimiter(
    backend,
    "email_sending",
    limit=10,       # 10 operations per bucket
    bucket=60000    # Bucket interval: 1 minute, in milliseconds
)

@dramatiq.actor
def send_email_task(to, subject, body):
    with bucket_limiter.acquire():
        # Rate limited email sending
        send_email(to, subject, body)
        return f"Email sent to {to}"

# The first 10 emails in each one-minute bucket go out; further attempts
# in the same bucket raise RateLimitExceeded
for user in users:
    send_email_task.send(user.email, "Newsletter", "Content...")
```
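
A rough sketch of how bucketed counting can work, assuming buckets are fixed, clock-aligned intervals measured in milliseconds. `BucketSketch` and `bucket_key` are hypothetical names, timestamps are passed in explicitly to keep the example deterministic, and a real limiter would keep the counters in a shared backend with a TTL.

```python
from collections import Counter


def bucket_key(key, now_ms, bucket_ms):
    """Name the fixed bucket that now_ms falls into."""
    return f"{key}@{now_ms - now_ms % bucket_ms}"


class BucketSketch:
    """Hypothetical in-process fixed-bucket counter; for illustration only."""

    def __init__(self, key, *, limit, bucket_ms):
        self.key, self.limit, self.bucket_ms = key, limit, bucket_ms
        self.counts = Counter()  # a real backend would expire old buckets

    def try_acquire(self, now_ms):
        current = bucket_key(self.key, now_ms, self.bucket_ms)
        if self.counts[current] >= self.limit:
            return False
        self.counts[current] += 1
        return True


limiter = BucketSketch("email_sending", limit=2, bucket_ms=60_000)
# Three attempts in the first minute, then one just after the boundary.
attempts = [limiter.try_acquire(t) for t in (0, 10_000, 59_999, 60_000)]
```

Because buckets are aligned to the clock, up to twice the limit can pass in a short span that straddles a bucket boundary. A sliding window avoids that at the cost of more bookkeeping.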

#### Window Rate Limiter

Sliding window rate limiting. Unlike most durations in this section, the window is given in seconds rather than milliseconds.

```python { .api }
class WindowRateLimiter(RateLimiter):
    def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, window: int):
        """
        Create sliding window rate limiter.

        Parameters:
        - backend: Backend for storing state
        - key: Unique key for this limiter
        - limit: Maximum operations per window
        - window: Time window in seconds
        """
```

**Usage:**

```python
import dramatiq
from dramatiq.rate_limits import WindowRateLimiter

# Sliding window: max 100 requests per hour
# (`backend` is a rate limiter backend, such as the RedisBackend created earlier)
window_limiter = WindowRateLimiter(
    backend,
    "api_requests",
    limit=100,     # 100 requests
    window=3600    # per hour (3600 seconds)
)

@dramatiq.actor
def external_api_task(endpoint, data):
    with window_limiter.acquire():
        # Limited to 100 calls per hour
        response = external_api.call(endpoint, data)
        return response

# Usage
for request in api_requests:
    external_api_task.send(request.endpoint, request.data)
```
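
One way to picture a sliding window is as a sum over per-second counters. The sketch below assumes that approach; `WindowSketch` is a hypothetical in-process class with explicit timestamps, not Dramatiq's implementation.

```python
from collections import Counter


class WindowSketch:
    """Hypothetical sliding-window counter built from one counter per second."""

    def __init__(self, *, limit, window):
        self.limit = limit
        self.window = window  # window length in seconds
        self.per_second = Counter()

    def try_acquire(self, now):
        second = int(now)
        # Sum the current second and the (window - 1) seconds before it.
        recent = sum(self.per_second[second - i] for i in range(self.window))
        if recent >= self.limit:
            return False
        self.per_second[second] += 1
        return True


limiter = WindowSketch(limit=2, window=3)
attempts = [limiter.try_acquire(t) for t in (0, 1, 2, 3, 4)]
```

The attempt at second 2 is rejected because seconds 0 to 2 already hold two operations. By second 3 the operation from second 0 has slid out of the window, so capacity returns gradually instead of all at once.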

### Synchronization Primitives

#### Barrier

Synchronization barrier for coordinating multiple tasks.

```python { .api }
class Barrier:
    def __init__(self, backend: RateLimiterBackend, key: str, *, ttl: int = 900000):
        """
        Create synchronization barrier.

        Parameters:
        - backend: Backend for coordination state
        - key: Unique key for this barrier
        - ttl: TTL for barrier state in milliseconds (default: 15 minutes)
        """

    def create(self, parties: int) -> bool:
        """
        Create barrier for specified number of participants.

        Parameters:
        - parties: Number of tasks that must reach barrier

        Returns:
            True if the barrier was created, False if it already exists
        """

    def wait(self, *, block: bool = True, timeout: int = None) -> bool:
        """
        Signal that a participant has reached the barrier.

        Parameters:
        - block: Whether to block until all participants arrive
        - timeout: Maximum time to block, in milliseconds

        Returns:
            True if all participants have reached the barrier, False otherwise
            (for example, when the timeout elapses first)
        """
```

**Usage:**

```python
import dramatiq
from dramatiq.rate_limits import Barrier

# Create barrier for coordinating 5 tasks
# (`backend` is a rate limiter backend, such as the RedisBackend created earlier)
barrier = Barrier(backend, "processing_barrier")
barrier.create(5)

@dramatiq.actor
def coordinated_task(task_id, data):
    # Do individual processing
    result = process_data(data)

    # Signal arrival, then block until all tasks arrive or the timeout elapses
    print(f"Task {task_id} waiting at barrier...")
    if not barrier.wait(timeout=60000):  # 1 minute timeout
        raise RuntimeError(f"Task {task_id} timed out waiting at barrier")

    # All tasks proceed together
    print(f"Task {task_id} proceeding after barrier")
    return finalize_result(result)

# Launch coordinated tasks
for i in range(5):
    coordinated_task.send(i, f"data_{i}")
```
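
The barrier idea is a shared countdown: each arrival decrements it, and the arrival that brings it to zero releases everyone. A minimal single-process sketch using `threading.Condition` follows. `BarrierSketch` is a hypothetical name, and its keyword-only `block` and `timeout` (in milliseconds) parameters are assumptions of the sketch; a distributed barrier would keep the counter in a shared backend instead.

```python
import threading


class BarrierSketch:
    """Hypothetical in-process countdown barrier; for illustration only."""

    def __init__(self):
        self.remaining = None
        self.cond = threading.Condition()

    def create(self, parties):
        with self.cond:
            if self.remaining is not None:
                return False  # already created
            self.remaining = parties
            return True

    def wait(self, *, block=True, timeout=None):
        """Record one arrival; timeout is in milliseconds."""
        with self.cond:
            self.remaining -= 1
            if self.remaining <= 0:
                self.cond.notify_all()  # last arrival wakes the others
                return True
            if not block:
                return False
            seconds = None if timeout is None else timeout / 1000
            return self.cond.wait_for(lambda: self.remaining <= 0, timeout=seconds)


barrier = BarrierSketch()
barrier.create(3)
results = []
workers = [
    threading.Thread(target=lambda: results.append(barrier.wait(timeout=5000)))
    for _ in range(3)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

Calling `wait(block=False)` turns the barrier into a completion counter: only the last arrival sees `True`, which is handy for running a follow-up step exactly once.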

### Rate Limiter Backends

#### Redis Backend

Production backend using Redis for distributed rate limiting.

```python { .api }
class RedisBackend(RateLimiterBackend):
    def __init__(self, *, client: redis.Redis = None, url: str = None, **parameters):
        """
        Create Redis backend for rate limiting.

        Parameters:
        - client: Redis client instance; when given, other connection parameters are ignored
        - url: Optional Redis connection URL
        - **parameters: Connection parameters passed to the Redis client
        """
```

**Usage:**

```python
import redis
from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend

# Create Redis client
redis_client = redis.Redis(host="localhost", port=6379, db=1)

# Create backend (the client must be passed by keyword)
redis_backend = RedisBackend(client=redis_client)

# Use with rate limiters
limiter = ConcurrentRateLimiter(redis_backend, "shared_resource", limit=10)
```

#### Memcached Backend

Memcached backend for rate limiting state. It requires the `pylibmc` package.

```python { .api }
class MemcachedBackend(RateLimiterBackend):
    def __init__(self, *, pool=None, pool_size: int = 8, **parameters):
        """
        Create Memcached backend for rate limiting.

        Parameters:
        - pool: Existing pylibmc client pool; when given, other connection parameters are ignored
        - pool_size: Size of the connection pool to create
        - **parameters: Connection parameters passed to the pylibmc client
        """
```

**Usage:**

```python
from dramatiq.rate_limits import WindowRateLimiter
from dramatiq.rate_limits.backends import MemcachedBackend

# Create backend; connection parameters are forwarded to the pylibmc client
mc_backend = MemcachedBackend(servers=["127.0.0.1:11211"], binary=True)

# Use with rate limiters (window is in seconds)
limiter = WindowRateLimiter(mc_backend, "api_calls", limit=1000, window=3600)
```

#### Stub Backend

In-memory backend for testing and development.

```python { .api }
class StubBackend(RateLimiterBackend):
    def __init__(self):
        """Create in-memory backend for testing."""
```

**Usage:**

```python
import dramatiq
from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import StubBackend

# Create stub backend for testing
stub_backend = StubBackend()

# Use in tests
test_limiter = ConcurrentRateLimiter(stub_backend, "test_key", limit=2)

def test_rate_limiting():
    # Hold two slots at once; a slot is released when its `with` block exits,
    # so the acquisitions must be nested to count as concurrent
    with test_limiter.acquire():
        with test_limiter.acquire():
            # Third concurrent acquisition should fail
            try:
                with test_limiter.acquire():
                    assert False, "Should have been rate limited"
            except dramatiq.RateLimitExceeded:
                pass  # Expected behavior
```

### Advanced Rate Limiting Patterns

#### Per-User Rate Limiting

```python
def create_user_rate_limiter(user_id, limit=10):
    """Create rate limiter per user"""
    return ConcurrentRateLimiter(
        backend,
        f"user:{user_id}:operations",
        limit=limit,
        ttl=3600000  # 1 hour
    )

@dramatiq.actor
def user_operation_task(user_id, operation_data):
    user_limiter = create_user_rate_limiter(user_id, limit=5)

    with user_limiter.acquire():
        # User-specific rate limiting
        result = perform_user_operation(user_id, operation_data)
        return result

# Each user gets their own rate limit
user_operation_task.send(123, {"action": "update_profile"})
user_operation_task.send(456, {"action": "send_message"})
```

#### Hierarchical Rate Limiting

```python
@dramatiq.actor
def api_request_task(service, endpoint, data):
    # Global rate limit for all API calls (window is in seconds)
    global_limiter = WindowRateLimiter(
        backend, "global_api", limit=1000, window=3600
    )

    # Service-specific rate limit
    service_limiter = WindowRateLimiter(
        backend, f"service:{service}", limit=200, window=3600
    )

    # Endpoint-specific rate limit
    endpoint_limiter = ConcurrentRateLimiter(
        backend, f"endpoint:{service}:{endpoint}", limit=5
    )

    # Acquire all limits, broadest first
    with global_limiter.acquire():
        with service_limiter.acquire():
            with endpoint_limiter.acquire():
                response = call_api(service, endpoint, data)
                return response

# Usage with hierarchical limits
api_request_task.send("payments", "process_payment", payment_data)
api_request_task.send("users", "get_profile", user_data)
```
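
When the number of limiters grows or is only known at runtime, nested `with` blocks become awkward. A possible alternative is `contextlib.ExitStack`, sketched below with a hypothetical `fake_acquire` standing in for `limiter.acquire()` so the enter and exit order is visible.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def fake_acquire(name):
    """Hypothetical stand-in for limiter.acquire(); records enter and exit order."""
    events.append(f"acquire {name}")
    try:
        yield True
    finally:
        events.append(f"release {name}")


def call_with_limits(names, func):
    # Enter every limiter in order; ExitStack releases them in reverse,
    # including when a later acquisition or func itself raises.
    with ExitStack() as stack:
        for name in names:
            stack.enter_context(fake_acquire(name))
        return func()


result = call_with_limits(["global", "service", "endpoint"], lambda: "ok")
```

With real limiters, the loop would call `stack.enter_context(limiter.acquire())` for each limiter, which behaves like the nested form.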

#### Rate Limiting with Graceful Degradation

```python
@dramatiq.actor
def resilient_task(data, priority="normal"):
    # Different limits based on priority
    if priority == "high":
        limiter = ConcurrentRateLimiter(backend, "high_priority", limit=20)
    elif priority == "normal":
        limiter = ConcurrentRateLimiter(backend, "normal_priority", limit=10)
    else:
        limiter = ConcurrentRateLimiter(backend, "low_priority", limit=5)

    try:
        with limiter.acquire():
            return perform_full_processing(data)
    except dramatiq.RateLimitExceeded:
        # Graceful degradation: simplified processing
        return perform_basic_processing(data)

# Tasks adapt to rate limiting
resilient_task.send(data, priority="high")
resilient_task.send(data, priority="normal")
```

#### Time-Based Rate Limiting

```python
import time

def get_time_based_limiter(time_period="business_hours"):
    """Create different limits based on time"""
    current_hour = time.gmtime().tm_hour  # hour of day in UTC

    if time_period == "business_hours" and 9 <= current_hour <= 17:
        # Higher limit during business hours (window is in seconds)
        return WindowRateLimiter(backend, "business_hours", limit=500, window=3600)
    else:
        # Lower limit during off-hours
        return WindowRateLimiter(backend, "off_hours", limit=100, window=3600)

@dramatiq.actor
def time_aware_task(data):
    limiter = get_time_based_limiter()

    with limiter.acquire():
        return process_with_time_awareness(data)
```

#### Rate Limiting with Metrics

```python
import time
from collections import defaultdict
from contextlib import contextmanager

class MetricsRateLimiter:
    def __init__(self, limiter):
        self.limiter = limiter
        self.metrics = defaultdict(int)
        self.last_reset = time.time()

    @contextmanager
    def acquire(self, **kwargs):
        self.metrics["attempts"] += 1
        try:
            # RateLimitExceeded is raised when the context is entered,
            # not when acquire() is called, so it must be caught here
            with self.limiter.acquire(**kwargs) as acquired:
                if not acquired:
                    self.metrics["rate_limited"] += 1
                yield acquired
        except dramatiq.RateLimitExceeded:
            self.metrics["rate_limited"] += 1
            raise
        finally:
            # Reset metrics hourly
            if time.time() - self.last_reset > 3600:
                print(f"Rate limiting metrics: {dict(self.metrics)}")
                self.metrics.clear()
                self.last_reset = time.time()

# Create once at module level so metrics accumulate across messages
# handled by this worker process
monitored_limiter = MetricsRateLimiter(
    ConcurrentRateLimiter(backend, "monitored", limit=10)
)

@dramatiq.actor
def monitored_task(data):
    with monitored_limiter.acquire():
        return process_with_monitoring(data)
```

### Error Handling

```python { .api }
class RateLimitExceeded(Exception):
    """
    Raised when rate limit is exceeded.

    This is the same exception as dramatiq.RateLimitExceeded
    """
```

**Usage:**

```python
@dramatiq.actor
def rate_limited_task(data):
    limiter = ConcurrentRateLimiter(backend, "limited", limit=3)

    try:
        with limiter.acquire():
            return process_data(data)
    except dramatiq.RateLimitExceeded:
        # Handle rate limiting gracefully
        print("Rate limit exceeded, scheduling for later")
        # Could reschedule with delay
        rate_limited_task.send_with_options(
            args=(data,),
            delay=60000  # Retry in 1 minute
        )
        return {"status": "rate_limited", "retry_scheduled": True}
```
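
A fixed one-minute delay can make many rate-limited messages retry at the same moment. One common remedy is an exponential, capped delay with optional jitter. The helper below is a hypothetical sketch (`backoff_delay_ms` and its parameters are not part of Dramatiq); its result could be passed as the `delay` when rescheduling, with the attempt number carried as an extra actor argument.

```python
import random


def backoff_delay_ms(attempt, *, base_ms=1_000, cap_ms=300_000, jitter=0.0, rng=random):
    """Delay before retry number `attempt` (0-based): exponential, capped, optional jitter."""
    delay = min(cap_ms, base_ms * 2 ** attempt)
    if jitter:
        # Spread retries by up to +/- jitter (a fraction of the delay).
        delay *= 1 + rng.uniform(-jitter, jitter)
    return int(delay)


# Without jitter the delays double from one second up to the five-minute cap.
delays = [backoff_delay_ms(n) for n in range(10)]
```

The cap keeps long-waiting messages from being pushed out indefinitely, and jitter spreads retries so they do not all hit the limiter in the same instant.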

### Integration with Actors

Rate limiting can be integrated directly into actor middleware:

```python
class ActorRateLimitMiddleware(dramatiq.Middleware):
    def __init__(self, backend, default_limit=10):
        self.backend = backend
        self.default_limit = default_limit
        # Entered context managers keyed by message ID, so that the same
        # object that was entered can be exited after processing
        self._held = {}

    @property
    def actor_options(self):
        return {"rate_limit", "rate_limit_key"}

    def before_process_message(self, broker, message):
        # Options passed to @dramatiq.actor live on the actor;
        # per-message overrides live on the message
        actor = broker.get_actor(message.actor_name)
        rate_limit = message.options.get("rate_limit") or actor.options.get("rate_limit")
        if rate_limit:
            key = (message.options.get("rate_limit_key")
                   or actor.options.get("rate_limit_key", message.actor_name))
            limiter = ConcurrentRateLimiter(self.backend, key, limit=rate_limit)

            # Note: check how your Dramatiq version handles an exception
            # raised from a before_process_message hook
            cm = limiter.acquire()
            cm.__enter__()
            self._held[message.message_id] = cm

    def after_process_message(self, broker, message, *, result=None, exception=None):
        cm = self._held.pop(message.message_id, None)
        if cm:
            cm.__exit__(None, None, None)

# Add middleware before declaring actors that use its options
broker = dramatiq.get_broker()
rate_limit_middleware = ActorRateLimitMiddleware(backend)
broker.add_middleware(rate_limit_middleware)

# Use with actors
@dramatiq.actor(rate_limit=5, rate_limit_key="email_sending")
def send_email_task(to, subject, body):
    send_email(to, subject, body)
    return f"Email sent to {to}"
```