or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md · index.md · rate-limit-items.md · storage.md · strategies.md · utilities.md

async.mddocs/

0

# Asynchronous API

1

2

The limits library provides full asynchronous support through the `limits.aio` module. It offers the same functionality as the synchronous API, exposed through async/await, for use in asyncio-based applications and frameworks such as FastAPI and aiohttp.

3

4

## Capabilities

5

6

### Async Rate Limiting Strategies

7

8

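The `limits.aio.strategies` module provides asynchronous versions of all rate limiting strategies. Their interfaces match the synchronous ones, except that the rate limiting methods (`hit`, `test`, `get_window_stats`, `clear`) are coroutines and must be awaited.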

9

10

```python { .api }

11

from abc import ABC, abstractmethod

12

from limits.limits import RateLimitItem

13

from limits.util import WindowStats

14

15

class RateLimiter(ABC):

16

"""Abstract base class for async rate limiting strategies"""

17

18

def __init__(self, storage: "limits.aio.storage.Storage"):

19

"""

20

Initialize async rate limiter with async storage backend.

21

22

Args:

23

storage: Async storage backend instance

24

"""

25

26

@abstractmethod

27

async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

28

"""

29

Asynchronously consume the rate limit.

30

31

Args:

32

item: Rate limit item defining limits

33

identifiers: Unique identifiers for this limit instance

34

cost: Cost of this request (default: 1)

35

36

Returns:

37

True if request is allowed, False if rate limit exceeded

38

"""

39

40

@abstractmethod

41

async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

42

"""

43

Asynchronously check if rate limit allows request without consuming.

44

45

Args:

46

item: Rate limit item defining limits

47

identifiers: Unique identifiers for this limit instance

48

cost: Expected cost to consume (default: 1)

49

50

Returns:

51

True if request would be allowed, False otherwise

52

"""

53

54

@abstractmethod

55

async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:

56

"""

57

Asynchronously get current window statistics.

58

59

Args:

60

item: Rate limit item defining limits

61

identifiers: Unique identifiers for this limit instance

62

63

Returns:

64

WindowStats with reset_time and remaining quota

65

"""

66

67

async def clear(self, item: RateLimitItem, *identifiers: str) -> None:

68

"""

69

Asynchronously clear rate limit data.

70

71

Args:

72

item: Rate limit item defining limits

73

identifiers: Unique identifiers for this limit instance

74

"""

75

76

class FixedWindowRateLimiter(RateLimiter):

77

"""Async fixed window rate limiting strategy"""

78

79

async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

80

"""Async version of fixed window hit"""

81

82

async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

83

"""Async version of fixed window test"""

84

85

async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:

86

"""Async version of fixed window stats"""

87

88

class MovingWindowRateLimiter(RateLimiter):

89

"""Async moving window rate limiting strategy"""

90

91

def __init__(self, storage: "limits.aio.storage.Storage"):

92

"""

93

Initialize async moving window rate limiter.

94

95

Requires async storage backend with MovingWindowSupport.

96

97

Args:

98

storage: Async storage with moving window support

99

100

Raises:

101

NotImplementedError: If storage lacks moving window support

102

"""

103

104

async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

105

"""Async version of moving window hit"""

106

107

async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

108

"""Async version of moving window test"""

109

110

async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:

111

"""Async version of moving window stats"""

112

113

class SlidingWindowCounterRateLimiter(RateLimiter):

114

"""Async sliding window counter rate limiting strategy"""

115

116

def __init__(self, storage: "limits.aio.storage.Storage"):

117

"""

118

Initialize async sliding window counter rate limiter.

119

120

Requires async storage backend with SlidingWindowCounterSupport.

121

122

Args:

123

storage: Async storage with sliding window counter support

124

125

Raises:

126

NotImplementedError: If storage lacks sliding window counter support

127

"""

128

129

async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

130

"""Async version of sliding window counter hit"""

131

132

async def test(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

133

"""Async version of sliding window counter test"""

134

135

async def get_window_stats(self, item: RateLimitItem, *identifiers: str) -> WindowStats:

136

"""Async version of sliding window counter stats"""

137

138

class FixedWindowElasticExpiryRateLimiter(FixedWindowRateLimiter):

139

"""Async fixed window with elastic expiry (deprecated in 4.1)"""

140

141

async def hit(self, item: RateLimitItem, *identifiers: str, cost: int = 1) -> bool:

142

"""Async version of elastic expiry hit"""

143

```

144

145

### Async Storage Backends

146

147

The `limits.aio.storage` module provides asynchronous storage implementations. They expose the same interface as their synchronous counterparts, except that the storage operations are coroutines and must be awaited.

148

149

```python { .api }

150

from abc import ABC, abstractmethod

151

152

class Storage(ABC):

153

"""Base class for async storage backends"""

154

155

@abstractmethod

156

async def incr(self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1) -> int:

157

"""Asynchronously increment counter for key"""

158

159

@abstractmethod

160

async def get(self, key: str) -> int:

161

"""Asynchronously get current counter value"""

162

163

@abstractmethod

164

async def get_expiry(self, key: str) -> float:

165

"""Asynchronously get expiration time for key"""

166

167

@abstractmethod

168

async def check(self) -> bool:

169

"""Asynchronously check storage health"""

170

171

@abstractmethod

172

async def reset(self) -> None:

173

"""Asynchronously reset all stored data"""

174

175

@abstractmethod

176

async def clear(self, key: str) -> None:

177

"""Asynchronously clear data for specific key"""

178

179

class MovingWindowSupport(ABC):

180

"""Interface for async storage supporting moving window"""

181

182

@abstractmethod

183

async def acquire_entry(self, key: str, limit: int, expiry: int, amount: int = 1) -> bool:

184

"""Asynchronously acquire entry in moving window"""

185

186

@abstractmethod

187

async def get_moving_window(self, key: str, limit: int, expiry: int) -> tuple[float, int]:

188

"""Asynchronously get moving window state"""

189

190

class SlidingWindowCounterSupport(ABC):

191

"""Interface for async storage supporting sliding window counter"""

192

193

@abstractmethod

194

async def get_sliding_window(self, key: str, expiry: int) -> tuple[int, float, int, float]:

195

"""Asynchronously get sliding window counter state"""

196

197

@abstractmethod

198

async def acquire_sliding_window_entry(self, key: str, limit: int, expiry: int, amount: int) -> bool:

199

"""Asynchronously acquire sliding window counter entry"""

200

201

class MemoryStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):

202

"""Async in-memory storage backend"""

203

204

def __init__(self):

205

"""Initialize async memory storage"""

206

207

class RedisStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):

208

"""Async Redis storage backend"""

209

210

def __init__(self, uri: str, **options):

211

"""Initialize async Redis storage"""

212

213

class RedisClusterStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):

214

"""Async Redis Cluster storage backend"""

215

216

def __init__(self, uri: str, **options):

217

"""Initialize async Redis Cluster storage"""

218

219

class RedisSentinelStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):

220

"""Async Redis Sentinel storage backend"""

221

222

def __init__(self, uri: str, **options):

223

"""Initialize async Redis Sentinel storage"""

224

225

class MemcachedStorage(Storage):

226

"""Async Memcached storage backend"""

227

228

def __init__(self, uri: str, **options):

229

"""Initialize async Memcached storage"""

230

231

class MongoDBStorage(Storage, MovingWindowSupport, SlidingWindowCounterSupport):

232

"""Async MongoDB storage backend"""

233

234

def __init__(self, uri: str, **options):

235

"""Initialize async MongoDB storage"""

236

237

class EtcdStorage(Storage):

238

"""Async etcd storage backend"""

239

240

def __init__(self, uri: str, **options):

241

"""Initialize async etcd storage"""

242

```

243

244

## Usage Examples

245

246

### Basic Async Rate Limiting

247

248

```python

249

import asyncio

250

from limits import RateLimitItemPerMinute

251

from limits.aio.storage import storage_from_string

252

from limits.aio.strategies import FixedWindowRateLimiter

253

254

async def rate_limit_example():

255

# Create rate limit and async storage

256

rate_limit = RateLimitItemPerMinute(60) # 60 requests per minute

257

storage = storage_from_string("async+redis://localhost:6379")

258

259

# Create async rate limiter

260

limiter = FixedWindowRateLimiter(storage)

261

262

user_id = "user123"

263

264

# Test rate limit asynchronously

265

if await limiter.test(rate_limit, user_id):

266

# Consume rate limit asynchronously

267

success = await limiter.hit(rate_limit, user_id)

268

if success:

269

print("Request allowed")

270

else:

271

print("Rate limit exceeded")

272

else:

273

print("Rate limit would be exceeded")

274

275

# Get window statistics asynchronously

276

stats = await limiter.get_window_stats(rate_limit, user_id)

277

print(f"Remaining: {stats.remaining}")

278

print(f"Reset time: {stats.reset_time}")

279

280

# Run async example

281

asyncio.run(rate_limit_example())

282

```

283

284

### FastAPI Integration

285

286

```python

287

from fastapi import FastAPI, HTTPException, Request

288

from limits import RateLimitItemPerMinute

289

from limits.aio.storage import storage_from_string

290

from limits.aio.strategies import FixedWindowRateLimiter

291

292

app = FastAPI()

293

294

# Initialize async rate limiting components

295

rate_limit = RateLimitItemPerMinute(100) # 100 requests per minute

296

storage = None

297

limiter = None

298

299

@app.on_event("startup")

300

async def setup_rate_limiting():

301

global storage, limiter

302

storage = storage_from_string("async+redis://localhost:6379")

303

limiter = FixedWindowRateLimiter(storage)

304

305

@app.middleware("http")

306

async def rate_limiting_middleware(request: Request, call_next):

307

# Get client identifier (IP address or user ID)

308

client_id = request.client.host

309

310

# Check rate limit

311

if not await limiter.test(rate_limit, client_id):

312

raise HTTPException(

313

status_code=429,

314

detail="Rate limit exceeded",

315

headers={"Retry-After": "60"}

316

)

317

318

# Consume rate limit

319

await limiter.hit(rate_limit, client_id)

320

321

# Process request

322

response = await call_next(request)

323

324

# Add rate limiting headers

325

stats = await limiter.get_window_stats(rate_limit, client_id)

326

response.headers["X-RateLimit-Remaining"] = str(stats.remaining)

327

response.headers["X-RateLimit-Reset"] = str(int(stats.reset_time))

328

329

return response

330

331

@app.get("/api/data")

332

async def get_data():

333

return {"message": "Data retrieved successfully"}

334

```

335

336

### aiohttp Integration

337

338

```python

339

from aiohttp import web, ClientError

340

from limits import RateLimitItemPerSecond

341

from limits.aio.storage import storage_from_string

342

from limits.aio.strategies import SlidingWindowCounterRateLimiter

343

344

async def rate_limiting_middleware(request, handler):

345

# Get rate limiter from app context

346

limiter = request.app['rate_limiter']

347

rate_limit = request.app['rate_limit']

348

349

# Use IP address as identifier

350

client_id = request.remote

351

352

# Check and consume rate limit

353

if await limiter.test(rate_limit, client_id):

354

await limiter.hit(rate_limit, client_id)

355

356

# Add rate limit headers

357

stats = await limiter.get_window_stats(rate_limit, client_id)

358

response = await handler(request)

359

response.headers['X-RateLimit-Remaining'] = str(stats.remaining)

360

response.headers['X-RateLimit-Reset'] = str(int(stats.reset_time))

361

362

return response

363

else:

364

# Rate limit exceeded

365

raise web.HTTPTooManyRequests(

366

text="Rate limit exceeded",

367

headers={'Retry-After': '1'}

368

)

369

370

async def hello_handler(request):

371

return web.json_response({'message': 'Hello, World!'})

372

373

async def create_app():

374

# Setup rate limiting

375

rate_limit = RateLimitItemPerSecond(10) # 10 requests per second

376

storage = storage_from_string("async+memory://")

377

limiter = SlidingWindowCounterRateLimiter(storage)

378

379

# Create app

380

app = web.Application(middlewares=[rate_limiting_middleware])

381

app['rate_limit'] = rate_limit

382

app['rate_limiter'] = limiter

383

384

# Add routes

385

app.router.add_get('/', hello_handler)

386

387

return app

388

389

if __name__ == '__main__':

390

app = create_app()

391

web.run_app(app, host='localhost', port=8080)

392

```

393

394

### Async Context Manager Usage

395

396

```python

397

import asyncio

398

from contextlib import asynccontextmanager

399

from limits import RateLimitItemPerMinute

400

from limits.aio.storage import storage_from_string

401

from limits.aio.strategies import MovingWindowRateLimiter

402

403

@asynccontextmanager

404

async def rate_limiter_context():

405

"""Context manager for async rate limiter lifecycle"""

406

storage = storage_from_string("async+redis://localhost:6379")

407

limiter = MovingWindowRateLimiter(storage)

408

409

try:

410

yield limiter

411

finally:

412

# Cleanup if needed

413

await storage.reset()

414

415

async def process_requests():

416

rate_limit = RateLimitItemPerMinute(50)

417

418

async with rate_limiter_context() as limiter:

419

# Process multiple requests

420

for i in range(100):

421

user_id = f"user_{i % 10}" # 10 different users

422

423

if await limiter.test(rate_limit, user_id):

424

success = await limiter.hit(rate_limit, user_id)

425

if success:

426

print(f"Processing request {i} for {user_id}")

427

# Simulate async work

428

await asyncio.sleep(0.1)

429

else:

430

print(f"Rate limit exceeded for {user_id}")

431

432

asyncio.run(process_requests())

433

```

434

435

### Batch Operations with Async

436

437

```python

438

import asyncio

439

from limits import RateLimitItemPerSecond

440

from limits.aio.storage import storage_from_string

441

from limits.aio.strategies import FixedWindowRateLimiter

442

443

async def process_batch_requests():

444

# Setup

445

rate_limit = RateLimitItemPerSecond(100) # 100 requests per second

446

storage = storage_from_string("async+memory://")

447

limiter = FixedWindowRateLimiter(storage)

448

449

# Simulate batch of requests from different users

450

requests = [

451

("user1", 5), # 5 requests from user1

452

("user2", 10), # 10 requests from user2

453

("user3", 3), # 3 requests from user3

454

("user1", 2), # 2 more requests from user1

455

]

456

457

async def process_user_requests(user_id, count):

458

"""Process multiple requests for a single user"""

459

results = []

460

461

for i in range(count):

462

# Test with cost (each request has cost of 1)

463

if await limiter.test(rate_limit, user_id, cost=1):

464

success = await limiter.hit(rate_limit, user_id, cost=1)

465

results.append(f"{user_id} request {i+1}: {'success' if success else 'failed'}")

466

else:

467

results.append(f"{user_id} request {i+1}: rate limited")

468

469

return results

470

471

# Process all user requests concurrently

472

tasks = [process_user_requests(user_id, count) for user_id, count in requests]

473

results = await asyncio.gather(*tasks)

474

475

# Print results

476

for user_results in results:

477

for result in user_results:

478

print(result)

479

480

# Show final stats for each user

481

unique_users = set(user_id for user_id, _ in requests)

482

for user_id in unique_users:

483

stats = await limiter.get_window_stats(rate_limit, user_id)

484

print(f"{user_id} - Remaining: {stats.remaining}, Reset: {stats.reset_time}")

485

486

asyncio.run(process_batch_requests())

487

```

488

489

## Strategy Registry

490

491

```python { .api }

492

# Async strategy registry

493

STRATEGIES: dict[str, type[RateLimiter]] = {

494

"fixed-window": FixedWindowRateLimiter,

495

"moving-window": MovingWindowRateLimiter,

496

"sliding-window-counter": SlidingWindowCounterRateLimiter,

497

"fixed-window-elastic-expiry": FixedWindowElasticExpiryRateLimiter

498

}

499

```