0
# Asynchronous API
1
2
The limits library provides complete asynchronous support through the `limits.aio` module. It offers the same functionality as the synchronous API, but is designed for async/await code: frameworks such as FastAPI and aiohttp, and any other asyncio-based application.
3
4
## Capabilities
5
6
### Async Rate Limiting Strategies
7
8
The module provides asynchronous versions of all rate limiting strategies. Their interfaces are identical to the synchronous ones, except that the rate limiting operations are coroutines and must be awaited.
9
10
```python { .api }
11
from abc import ABC, abstractmethod
12
from limits.limits import RateLimitItem
13
from limits.util import WindowStats
14
15
class RateLimiter(ABC):
    """Common interface shared by all asynchronous rate limiting strategies."""

    def __init__(self, storage: "limits.aio.storage.Storage"):
        """
        Set up the strategy on top of an asynchronous storage backend.

        Args:
            storage: The async storage backend instance used by this limiter
        """

    @abstractmethod
    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Consume from the rate limit (coroutine).

        Args:
            item: The rate limit item that defines the limit
            identifiers: Unique identifiers selecting this limit instance
            cost: How much this request costs (default: 1)

        Returns:
            True when the request is allowed, False when the rate limit
            has been exceeded
        """

    @abstractmethod
    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """
        Check whether a request would be allowed, without consuming (coroutine).

        Args:
            item: The rate limit item that defines the limit
            identifiers: Unique identifiers selecting this limit instance
            cost: The cost that is expected to be consumed (default: 1)

        Returns:
            True when the request would be allowed, False otherwise
        """

    @abstractmethod
    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """
        Fetch the statistics of the current window (coroutine).

        Args:
            item: The rate limit item that defines the limit
            identifiers: Unique identifiers selecting this limit instance

        Returns:
            A WindowStats carrying the reset_time and the remaining quota
        """

    async def clear(self, item: RateLimitItem, *identifiers: str) -> None:
        """
        Clear the rate limit data (coroutine).

        Args:
            item: The rate limit item that defines the limit
            identifiers: Unique identifiers selecting this limit instance
        """
75
76
class FixedWindowRateLimiter(RateLimiter):
    """Fixed window rate limiting strategy (async)."""

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """Coroutine form of the fixed window ``hit`` operation."""

    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """Coroutine form of the fixed window ``test`` operation."""

    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """Coroutine form of the fixed window statistics lookup."""
87
88
class MovingWindowRateLimiter(RateLimiter):
    """Moving window rate limiting strategy (async)."""

    def __init__(self, storage: "limits.aio.storage.Storage"):
        """
        Set up the async moving window rate limiter.

        The async storage backend must provide MovingWindowSupport.

        Args:
            storage: An async storage that supports the moving window

        Raises:
            NotImplementedError: When the storage has no moving window support
        """

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """Coroutine form of the moving window ``hit`` operation."""

    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """Coroutine form of the moving window ``test`` operation."""

    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """Coroutine form of the moving window statistics lookup."""
112
113
class SlidingWindowCounterRateLimiter(RateLimiter):
    """Sliding window counter rate limiting strategy (async)."""

    def __init__(self, storage: "limits.aio.storage.Storage"):
        """
        Set up the async sliding window counter rate limiter.

        The async storage backend must provide SlidingWindowCounterSupport.

        Args:
            storage: An async storage that supports the sliding window counter

        Raises:
            NotImplementedError: When the storage has no sliding window
                counter support
        """

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """Coroutine form of the sliding window counter ``hit`` operation."""

    async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """Coroutine form of the sliding window counter ``test`` operation."""

    async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:
        """Coroutine form of the sliding window counter statistics lookup."""
137
138
class FixedWindowElasticExpiryRateLimiter(FixedWindowRateLimiter):
    """Fixed window strategy with elastic expiry, async (deprecated in 4.1)."""

    async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:
        """Coroutine form of the elastic expiry ``hit`` operation."""
143
```
144
145
### Async Storage Backends
146
147
The asynchronous storage implementations provide the same interface as their synchronous counterparts, with the storage operations exposed as coroutines.
148
149
```python { .api }
150
from abc import ABC, abstractmethod
151
152
class Storage(ABC):
    """Abstract base that every async storage backend derives from."""

    @abstractmethod
    async def incr(self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1) -> int:
        """Increment the counter stored under ``key`` (coroutine)."""

    @abstractmethod
    async def get(self, key: str) -> int:
        """Return the current counter value for ``key`` (coroutine)."""

    @abstractmethod
    async def get_expiry(self, key: str) -> float:
        """Return the expiration time of ``key`` (coroutine)."""

    @abstractmethod
    async def check(self) -> bool:
        """Report on the health of the storage (coroutine)."""

    @abstractmethod
    async def reset(self) -> None:
        """Reset all of the stored data (coroutine)."""

    @abstractmethod
    async def clear(self, key: str) -> None:
        """Clear the data held for one specific ``key`` (coroutine)."""
178
179
class MovingWindowSupport(ABC):
    """Interface implemented by async storages that support the moving window."""

    @abstractmethod
    async def acquire_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:
        """Acquire an entry in the moving window (coroutine)."""

    @abstractmethod
    async def get_moving_window(self, key: str, limit: int, expiry: int) -> tuple[float, int]:
        """Return the state of the moving window (coroutine)."""
189
190
class SlidingWindowCounterSupport(ABC):
    """Interface for async storage supporting sliding window counter"""

    @abstractmethod
    async def get_sliding_window(self, key: str, expiry: int) -> tuple[int, float, int, float]:
        """Asynchronously get sliding window counter state"""

    @abstractmethod
    async def acquire_sliding_window_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:
        """
        Asynchronously acquire sliding window counter entry

        Args:
            key: Key of the rate limit to acquire an entry for
            limit: Limit that applies to the key
            expiry: Expiry associated with the limit
            amount: Number of entries to acquire (default: 1, matching
                ``MovingWindowSupport.acquire_entry``)

        Returns:
            bool result of the acquisition attempt
        """
200
201
class MemoryStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """In-memory async storage backend.

    Declares both the moving window and the sliding window counter interfaces.
    """

    def __init__(self):
        """Set up the async in-memory storage."""
206
207
class RedisStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """Redis async storage backend.

    Declares both the moving window and the sliding window counter interfaces.
    """

    def __init__(self, uri: str, **options):
        """
        Set up the async Redis storage.

        Args:
            uri: Storage URI for the Redis backend
            options: Additional keyword options for the backend
        """
212
213
class RedisClusterStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """Redis Cluster async storage backend.

    Declares both the moving window and the sliding window counter interfaces.
    """

    def __init__(self, uri: str, **options):
        """
        Set up the async Redis Cluster storage.

        Args:
            uri: Storage URI for the Redis Cluster backend
            options: Additional keyword options for the backend
        """
218
219
class RedisSentinelStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """Redis Sentinel async storage backend.

    Declares both the moving window and the sliding window counter interfaces.
    """

    def __init__(self, uri: str, **options):
        """
        Set up the async Redis Sentinel storage.

        Args:
            uri: Storage URI for the Redis Sentinel backend
            options: Additional keyword options for the backend
        """
224
225
class MemcachedStorage(Storage):
    """Memcached async storage backend."""

    def __init__(self, uri: str, **options):
        """
        Set up the async Memcached storage.

        Args:
            uri: Storage URI for the Memcached backend
            options: Additional keyword options for the backend
        """
230
231
class MongoDBStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):
    """MongoDB async storage backend.

    Declares both the moving window and the sliding window counter interfaces.
    """

    def __init__(self, uri: str, **options):
        """
        Set up the async MongoDB storage.

        Args:
            uri: Storage URI for the MongoDB backend
            options: Additional keyword options for the backend
        """
236
237
class EtcdStorage(Storage):
    """etcd async storage backend."""

    def __init__(self, uri: str, **options):
        """
        Set up the async etcd storage.

        Args:
            uri: Storage URI for the etcd backend
            options: Additional keyword options for the backend
        """
242
```
243
244
## Usage Examples
245
246
### Basic Async Rate Limiting
247
248
```python
249
import asyncio
250
from limits import RateLimitItemPerMinute
251
from limits.aio.storage import storage_from_string
252
from limits.aio.strategies import FixedWindowRateLimiter
253
254
async def rate_limit_example():
    """Walk through test, hit and window statistics for a single user."""
    # The limit itself: 60 requests per minute
    per_minute = RateLimitItemPerMinute(60)

    # Async limiter on top of an async Redis storage
    limiter = FixedWindowRateLimiter(
        storage_from_string("async+redis://localhost:6379")
    )

    who = "user123"

    # First probe without consuming, then consume only if the probe passed
    if not await limiter.test(per_minute, who):
        print("Rate limit would be exceeded")
    elif await limiter.hit(per_minute, who):
        print("Request allowed")
    else:
        print("Rate limit exceeded")

    # Report what is left in the current window
    window = await limiter.get_window_stats(per_minute, who)
    print(f"Remaining: {window.remaining}")
    print(f"Reset time: {window.reset_time}")


# Drive the coroutine from synchronous code
asyncio.run(rate_limit_example())
282
```
283
284
### FastAPI Integration
285
286
```python
287
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
288
from limits import RateLimitItemPerMinute
289
from limits.aio.storage import storage_from_string
290
from limits.aio.strategies import FixedWindowRateLimiter
291
292
# Initialize async rate limiting components
rate_limit = RateLimitItemPerMinute(100)  # 100 requests per minute
storage = None
limiter = None


async def setup_rate_limiting():
    """Create the module-level async storage and limiter used by the middleware."""
    global storage, limiter
    storage = storage_from_string("async+redis://localhost:6379")
    limiter = FixedWindowRateLimiter(storage)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the rate limiting setup once, before the application starts serving."""
    await setup_rate_limiting()
    yield


# `@app.on_event("startup")` is deprecated in FastAPI; a lifespan handler is
# the supported way to run startup code.
app = FastAPI(lifespan=lifespan)
304
305
@app.middleware("http")
async def rate_limiting_middleware(request: Request, call_next):
    """
    Apply the per-client rate limit to every HTTP request.

    Args:
        request: The incoming request
        call_next: Callable that passes the request on to the rest of the app

    Returns:
        A 429 JSON response when the limit is exceeded, otherwise the
        downstream response with rate limiting headers added
    """
    # Get client identifier (IP address); request.client can be None when the
    # server cannot determine the peer, so fall back to a shared bucket.
    client_id = request.client.host if request.client else "unknown"

    # Check and consume in a single call. A separate test() followed by hit()
    # leaves an await point in between, where concurrent requests from the
    # same client can all pass the check before any of them is counted.
    if not await limiter.hit(rate_limit, client_id):
        # Return the response directly: an HTTPException raised inside an HTTP
        # middleware is not handled by FastAPI's exception handlers and would
        # surface as a 500 instead of a 429.
        return JSONResponse(
            status_code=429,
            content={"detail": "Rate limit exceeded"},
            headers={"Retry-After": "60"},
        )

    # Process request
    response = await call_next(request)

    # Add rate limiting headers
    stats = await limiter.get_window_stats(rate_limit, client_id)
    response.headers["X-RateLimit-Remaining"] = str(stats.remaining)
    response.headers["X-RateLimit-Reset"] = str(int(stats.reset_time))

    return response
330
331
@app.get("/api/data")
async def get_data():
    """Example endpoint that sits behind the rate limiting middleware."""
    payload = {"message": "Data retrieved successfully"}
    return payload
334
```
335
336
### aiohttp Integration
337
338
```python
339
from aiohttp import web, ClientError
340
from limits import RateLimitItemPerSecond
341
from limits.aio.storage import storage_from_string
342
from limits.aio.strategies import SlidingWindowCounterRateLimiter
343
344
# Without @web.middleware, aiohttp 3.x treats the function as a deprecated
# old-style middleware factory and calls it with (app, handler) instead of
# (request, handler).
@web.middleware
async def rate_limiting_middleware(request, handler):
    """
    Apply the application's rate limit to every request.

    Args:
        request: The incoming aiohttp request
        handler: The next handler in the middleware chain

    Returns:
        The handler's response with rate limiting headers added

    Raises:
        web.HTTPTooManyRequests: When the rate limit is exceeded
    """
    # Get rate limiter from app context
    limiter = request.app['rate_limiter']
    rate_limit = request.app['rate_limit']

    # Use IP address as identifier; request.remote can be None, so fall back
    # to a shared bucket.
    client_id = request.remote or 'unknown'

    # Check and consume in a single call. A separate test() followed by hit()
    # leaves an await point in between, where concurrent requests from the
    # same client can all pass the check before any of them is counted.
    if not await limiter.hit(rate_limit, client_id):
        # Rate limit exceeded
        raise web.HTTPTooManyRequests(
            text="Rate limit exceeded",
            headers={'Retry-After': '1'}
        )

    # Add rate limit headers
    stats = await limiter.get_window_stats(rate_limit, client_id)
    response = await handler(request)
    response.headers['X-RateLimit-Remaining'] = str(stats.remaining)
    response.headers['X-RateLimit-Reset'] = str(int(stats.reset_time))

    return response
369
370
async def hello_handler(request):
    """Reply to every request with a fixed JSON greeting."""
    greeting = {'message': 'Hello, World!'}
    return web.json_response(greeting)
372
373
async def create_app():
    """Build the aiohttp application with rate limiting wired in."""
    # Rate limiting pieces: 10 requests per second, kept in async memory storage
    per_second = RateLimitItemPerSecond(10)
    sliding_limiter = SlidingWindowCounterRateLimiter(
        storage_from_string("async+memory://")
    )

    # Application with the middleware installed; the middleware reads both
    # entries below from the app context
    application = web.Application(middlewares=[rate_limiting_middleware])
    application['rate_limit'] = per_second
    application['rate_limiter'] = sliding_limiter

    # Routes
    application.router.add_get('/', hello_handler)

    return application
388
389
if __name__ == '__main__':
    # create_app() is a coroutine function; its result is passed to run_app as-is
    web.run_app(create_app(), host='localhost', port=8080)
392
```
393
394
### Async Context Manager Usage
395
396
```python
397
import asyncio
398
from contextlib import asynccontextmanager
399
from limits import RateLimitItemPerMinute
400
from limits.aio.storage import storage_from_string
401
from limits.aio.strategies import MovingWindowRateLimiter
402
403
@asynccontextmanager
async def rate_limiter_context(reset_on_exit: bool = False):
    """
    Context manager for async rate limiter lifecycle

    Args:
        reset_on_exit: When True, reset the storage when the context exits.
            Off by default: reset() resets all stored data, which on a shared
            Redis instance would discard every limit tracked there, not only
            the ones created inside this context.

    Yields:
        A MovingWindowRateLimiter backed by async Redis storage
    """
    storage = storage_from_string("async+redis://localhost:6379")
    limiter = MovingWindowRateLimiter(storage)

    try:
        yield limiter
    finally:
        # Cleanup only on request; see reset_on_exit above
        if reset_on_exit:
            await storage.reset()
414
415
async def process_requests():
    """Process 100 simulated requests spread over 10 users under a shared limit."""
    rate_limit = RateLimitItemPerMinute(50)

    async with rate_limiter_context() as limiter:
        # Process multiple requests
        for i in range(100):
            user_id = f"user_{i % 10}"  # 10 different users

            # Check and consume in a single call: hit() already reports
            # whether the request was allowed, so a preceding test() only
            # adds a second storage round trip and a check-then-act gap.
            if await limiter.hit(rate_limit, user_id):
                print(f"Processing request {i} for {user_id}")
                # Simulate async work
                await asyncio.sleep(0.1)
            else:
                print(f"Rate limit exceeded for {user_id}")


asyncio.run(process_requests())
433
```
434
435
### Batch Operations with Async
436
437
```python
438
import asyncio
439
from limits import RateLimitItemPerSecond
440
from limits.aio.storage import storage_from_string
441
from limits.aio.strategies import FixedWindowRateLimiter
442
443
async def process_batch_requests():
    """Process several per-user request batches concurrently under one limit."""
    # Setup
    rate_limit = RateLimitItemPerSecond(100)  # 100 requests per second
    storage = storage_from_string("async+memory://")
    limiter = FixedWindowRateLimiter(storage)

    # Simulate batch of requests from different users
    requests = [
        ("user1", 5),   # 5 requests from user1
        ("user2", 10),  # 10 requests from user2
        ("user3", 3),   # 3 requests from user3
        ("user1", 2),   # 2 more requests from user1
    ]

    async def process_user_requests(user_id, count):
        """Process multiple requests for a single user"""
        results = []

        for i in range(count):
            # Check and consume in a single call (each request has cost of 1).
            # The batches run concurrently and "user1" appears in two of them,
            # so a separate test() followed by hit() would leave an await
            # point where the other batch could use up the remaining quota.
            allowed = await limiter.hit(rate_limit, user_id, cost=1)
            outcome = "success" if allowed else "rate limited"
            results.append(f"{user_id} request {i+1}: {outcome}")

        return results

    # Process all user requests concurrently
    tasks = [process_user_requests(user_id, count) for user_id, count in requests]
    results = await asyncio.gather(*tasks)

    # Print results
    for user_results in results:
        for result in user_results:
            print(result)

    # Show final stats for each user; sorted so the output order is stable
    # from run to run (plain set iteration order is not).
    unique_users = sorted({user_id for user_id, _ in requests})
    for user_id in unique_users:
        stats = await limiter.get_window_stats(rate_limit, user_id)
        print(f"{user_id} - Remaining: {stats.remaining}, Reset: {stats.reset_time}")


asyncio.run(process_batch_requests())
487
```
488
489
## Strategy Registry
490
491
```python { .api }
492
# Registry mapping each strategy name to its async rate limiter class
STRATEGIES: dict[str, type[RateLimiter]] = {
    "fixed-window": FixedWindowRateLimiter,
    "moving-window": MovingWindowRateLimiter,
    "sliding-window-counter": SlidingWindowCounterRateLimiter,
    "fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter,
}
499
```