or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdbrokers.mdcomposition.mdindex.mdmessages.mdmiddleware.mdrate-limiting.mdresults.mdworkers.md

rate-limiting.mddocs/

0

# Rate Limiting

1

2

Rate limiting in Dramatiq provides mechanisms to control task execution rates and implement synchronization patterns. The system supports multiple rate limiting strategies and backends for different use cases, from simple concurrent execution limits to sophisticated token bucket algorithms.

3

4

## Capabilities

5

6

### Rate Limiter Base Classes

7

8

#### RateLimiter

9

10

Base class for all rate limiting implementations.

11

12

```python { .api }

13

class RateLimiter:

14

def __init__(self, backend: RateLimiterBackend, key: str):

15

"""

16

Initialize rate limiter.

17

18

Parameters:

19

- backend: Backend for storing rate limit state

20

- key: Unique key for this rate limiter instance

21

"""

22

23

def acquire(self, *, raise_on_failure: bool = True) -> bool:

24

"""

25

Context manager for acquiring rate limit permission.

26

27

Parameters:

28

- raise_on_failure: Whether to raise exception when limit exceeded

29

30

Returns:

31

True if acquired, False if limit exceeded (when raise_on_failure=False)

32

33

Raises:

34

RateLimitExceeded: When limit is exceeded and raise_on_failure=True

35

36

Usage:

37

with rate_limiter.acquire():

38

# Protected code here

39

pass

40

"""

41

```

42

43

#### RateLimiterBackend

44

45

Abstract backend for storing rate limiter state.

46

47

```python { .api }

48

class RateLimiterBackend:

49

"""

50

Abstract base class for rate limiter backends.

51

52

Backends provide persistent storage for rate limiter state

53

across multiple processes and workers.

54

"""

55

56

def add(self, key: str, value: int, ttl: int) -> bool:

57

"""Add value if key doesn't exist."""

58

59

def incr(self, key: str, amount: int, ttl: int) -> int:

60

"""Increment key by amount."""

61

62

def decr(self, key: str, amount: int, ttl: int) -> int:

63

"""Decrement key by amount."""

64

65

def incr_and_sum(self, keys: List[str], amount: int, ttl: int) -> int:

66

"""Increment multiple keys and return sum."""

67

```

68

69

### Rate Limiter Implementations

70

71

#### Concurrent Rate Limiter

72

73

Limits the number of concurrent executions.

74

75

```python { .api }

76

class ConcurrentRateLimiter(RateLimiter):

77

def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, ttl: int = 900000):

78

"""

79

Create concurrent execution rate limiter.

80

81

Parameters:

82

- backend: Backend for storing state

83

- key: Unique key for this limiter

84

- limit: Maximum number of concurrent executions

85

- ttl: TTL for state in milliseconds (default: 15 minutes)

86

"""

87

```

88

89

**Usage:**

90

91

```python

92

from dramatiq.rate_limits import ConcurrentRateLimiter

93

from dramatiq.rate_limits.backends import RedisBackend

94

95

# Set up backend

96

backend = RedisBackend()

97

98

# Create concurrent rate limiter

99

concurrent_limiter = ConcurrentRateLimiter(

100

backend,

101

"api_calls",

102

limit=5, # Max 5 concurrent executions

103

ttl=60000 # 1 minute TTL

104

)

105

106

@dramatiq.actor

107

def api_call_task(url):

108

with concurrent_limiter.acquire():

109

# Only 5 of these can run concurrently

110

response = requests.get(url)

111

return response.json()

112

113

# Usage

114

for i in range(20):

115

api_call_task.send(f"https://api.example.com/data/{i}")

116

```

117

118

#### Bucket Rate Limiter

119

120

Token bucket algorithm for controlling request rates.

121

122

```python { .api }

123

class BucketRateLimiter(RateLimiter):

124

def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, bucket: int):

125

"""

126

Create token bucket rate limiter.

127

128

Parameters:

129

- backend: Backend for storing state

130

- key: Unique key for this limiter

131

- limit: Number of tokens to add per time window

132

- bucket: Maximum bucket capacity (burst allowance)

133

"""

134

```

135

136

**Usage:**

137

138

```python

139

from dramatiq.rate_limits import BucketRateLimiter

140

141

# Token bucket: 10 requests per minute, burst up to 20

142

bucket_limiter = BucketRateLimiter(

143

backend,

144

"email_sending",

145

limit=10, # 10 tokens per time window

146

bucket=20 # Burst capacity of 20

147

)

148

149

@dramatiq.actor

150

def send_email_task(to, subject, body):

151

with bucket_limiter.acquire():

152

# Rate limited email sending

153

send_email(to, subject, body)

154

return f"Email sent to {to}"

155

156

# Can send 20 emails quickly, then limited to 10 per time window

157

for user in users:

158

send_email_task.send(user.email, "Newsletter", "Content...")

159

```

160

161

#### Window Rate Limiter

162

163

Sliding window rate limiting.

164

165

```python { .api }

166

class WindowRateLimiter(RateLimiter):

167

def __init__(self, backend: RateLimiterBackend, key: str, *, limit: int, window: int):

168

"""

169

Create sliding window rate limiter.

170

171

Parameters:

172

- backend: Backend for storing state

173

- key: Unique key for this limiter

174

- limit: Maximum operations per window

175

- window: Time window in milliseconds

176

"""

177

```

178

179

**Usage:**

180

181

```python

182

from dramatiq.rate_limits import WindowRateLimiter

183

184

# Sliding window: max 100 requests per hour

185

window_limiter = WindowRateLimiter(

186

backend,

187

"api_requests",

188

limit=100, # 100 requests

189

window=3600000 # per hour (3600 seconds)

190

)

191

192

@dramatiq.actor

193

def external_api_task(endpoint, data):

194

with window_limiter.acquire():

195

# Limited to 100 calls per hour

196

response = external_api.call(endpoint, data)

197

return response

198

199

# Usage

200

for request in api_requests:

201

external_api_task.send(request.endpoint, request.data)

202

```

203

204

### Synchronization Primitives

205

206

#### Barrier

207

208

Synchronization barrier for coordinating multiple tasks.

209

210

```python { .api }

211

class Barrier:

212

def __init__(self, backend: RateLimiterBackend, key: str, *, ttl: int = 900000):

213

"""

214

Create synchronization barrier.

215

216

Parameters:

217

- backend: Backend for coordination state

218

- key: Unique key for this barrier

219

- ttl: TTL for barrier state in milliseconds (default: 15 minutes)

220

"""

221

222

def create(self, size: int):

223

"""

224

Create barrier for specified number of participants.

225

226

Parameters:

227

- size: Number of tasks that must reach barrier

228

"""

229

230

def wait(self, timeout: int = None):

231

"""

232

Wait at barrier until all participants arrive.

233

234

Parameters:

235

- timeout: Timeout in milliseconds

236

237

Raises:

238

BarrierTimeout: If timeout exceeded

239

"""

240

```

241

242

**Usage:**

243

244

```python

245

from dramatiq.rate_limits import Barrier

246

247

# Create barrier for coordinating 5 tasks

248

barrier = Barrier(backend, "processing_barrier")

249

barrier.create(5)

250

251

@dramatiq.actor

252

def coordinated_task(task_id, data):

253

# Do individual processing

254

result = process_data(data)

255

256

# Wait for all tasks to complete processing

257

print(f"Task {task_id} waiting at barrier...")

258

barrier.wait(timeout=60000) # 1 minute timeout

259

260

# All tasks proceed together

261

print(f"Task {task_id} proceeding after barrier")

262

return finalize_result(result)

263

264

# Launch coordinated tasks

265

for i in range(5):

266

coordinated_task.send(i, f"data_{i}")

267

```

268

269

### Rate Limiter Backends

270

271

#### Redis Backend

272

273

Production backend using Redis for distributed rate limiting.

274

275

```python { .api }

276

class RedisBackend(RateLimiterBackend):

277

def __init__(self, client: redis.Redis, *, encoder: Encoder = None):

278

"""

279

Create Redis backend for rate limiting.

280

281

Parameters:

282

- client: Redis client instance

283

- encoder: Message encoder (uses JSON if None)

284

"""

285

```

286

287

**Usage:**

288

289

```python

290

import redis

291

from dramatiq.rate_limits.backends import RedisBackend

292

293

# Create Redis client

294

redis_client = redis.Redis(host="localhost", port=6379, db=1)

295

296

# Create backend

297

redis_backend = RedisBackend(redis_client)

298

299

# Use with rate limiters

300

limiter = ConcurrentRateLimiter(redis_backend, "shared_resource", limit=10)

301

```

302

303

#### Memcached Backend

304

305

Memcached backend for rate limiting state.

306

307

```python { .api }

308

class MemcachedBackend(RateLimiterBackend):

309

def __init__(self, client, *, encoder: Encoder = None):

310

"""

311

Create Memcached backend for rate limiting.

312

313

Parameters:

314

- client: Memcached client instance

315

- encoder: Message encoder (uses JSON if None)

316

"""

317

```

318

319

**Usage:**

320

321

```python

322

import pylibmc

323

from dramatiq.rate_limits.backends import MemcachedBackend

324

325

# Create Memcached client

326

mc_client = pylibmc.Client(["127.0.0.1:11211"])

327

328

# Create backend

329

mc_backend = MemcachedBackend(mc_client)

330

331

# Use with rate limiters

332

limiter = WindowRateLimiter(mc_backend, "api_calls", limit=1000, window=3600000)

333

```

334

335

#### Stub Backend

336

337

In-memory backend for testing and development.

338

339

```python { .api }

340

class StubBackend(RateLimiterBackend):

341

def __init__(self):

342

"""Create in-memory backend for testing."""

343

```

344

345

**Usage:**

346

347

```python

348

from dramatiq.rate_limits.backends import StubBackend

349

350

# Create stub backend for testing

351

stub_backend = StubBackend()

352

353

# Use in tests

354

test_limiter = ConcurrentRateLimiter(stub_backend, "test_key", limit=2)

355

356

def test_rate_limiting():

357

# Test rate limiting behavior

358

with test_limiter.acquire():

359

assert True # First acquisition succeeds

360

361

with test_limiter.acquire():

362

assert True # Second acquisition succeeds

363

364

# Third acquisition should fail

365

try:

366

with test_limiter.acquire():

367

assert False, "Should have been rate limited"

368

except dramatiq.RateLimitExceeded:

369

assert True # Expected behavior

370

```

371

372

### Advanced Rate Limiting Patterns

373

374

#### Per-User Rate Limiting

375

376

```python

377

def create_user_rate_limiter(user_id, limit=10):

378

"""Create rate limiter per user"""

379

return ConcurrentRateLimiter(

380

backend,

381

f"user:{user_id}:operations",

382

limit=limit,

383

ttl=3600000 # 1 hour

384

)

385

386

@dramatiq.actor

387

def user_operation_task(user_id, operation_data):

388

user_limiter = create_user_rate_limiter(user_id, limit=5)

389

390

with user_limiter.acquire():

391

# User-specific rate limiting

392

result = perform_user_operation(user_id, operation_data)

393

return result

394

395

# Each user gets their own rate limit

396

user_operation_task.send(123, {"action": "update_profile"})

397

user_operation_task.send(456, {"action": "send_message"})

398

```

399

400

#### Hierarchical Rate Limiting

401

402

```python

403

@dramatiq.actor

404

def api_request_task(service, endpoint, data):

405

# Global rate limit for all API calls

406

global_limiter = WindowRateLimiter(

407

backend, "global_api", limit=1000, window=3600000

408

)

409

410

# Service-specific rate limit

411

service_limiter = WindowRateLimiter(

412

backend, f"service:{service}", limit=200, window=3600000

413

)

414

415

# Endpoint-specific rate limit

416

endpoint_limiter = ConcurrentRateLimiter(

417

backend, f"endpoint:{service}:{endpoint}", limit=5

418

)

419

420

# Acquire all limits

421

with global_limiter.acquire():

422

with service_limiter.acquire():

423

with endpoint_limiter.acquire():

424

response = call_api(service, endpoint, data)

425

return response

426

427

# Usage with hierarchical limits

428

api_request_task.send("payments", "process_payment", payment_data)

429

api_request_task.send("users", "get_profile", user_data)

430

```

431

432

#### Rate Limiting with Graceful Degradation

433

434

```python

435

@dramatiq.actor

436

def resilient_task(data, priority="normal"):

437

# Different limits based on priority

438

if priority == "high":

439

limiter = ConcurrentRateLimiter(backend, "high_priority", limit=20)

440

elif priority == "normal":

441

limiter = ConcurrentRateLimiter(backend, "normal_priority", limit=10)

442

else:

443

limiter = ConcurrentRateLimiter(backend, "low_priority", limit=5)

444

445

try:

446

with limiter.acquire():

447

return perform_full_processing(data)

448

except dramatiq.RateLimitExceeded:

449

# Graceful degradation: simplified processing

450

return perform_basic_processing(data)

451

452

# Tasks adapt to rate limiting

453

resilient_task.send(data, priority="high")

454

resilient_task.send(data, priority="normal")

455

```

456

457

#### Time-Based Rate Limiting

458

459

```python

460

import time

461

462

def get_time_based_limiter(time_period="business_hours"):

463

"""Create different limits based on time"""

464

current_hour = time.gmtime().tm_hour

465

466

if time_period == "business_hours" and 9 <= current_hour <= 17:

467

# Higher limit during business hours

468

return WindowRateLimiter(backend, "business_hours", limit=500, window=3600000)

469

else:

470

# Lower limit during off-hours

471

return WindowRateLimiter(backend, "off_hours", limit=100, window=3600000)

472

473

@dramatiq.actor

474

def time_aware_task(data):

475

limiter = get_time_based_limiter()

476

477

with limiter.acquire():

478

return process_with_time_awareness(data)

479

```

480

481

#### Rate Limiting with Metrics

482

483

```python

484

import time

485

from collections import defaultdict

486

487

class MetricsRateLimiter:

488

def __init__(self, limiter):

489

self.limiter = limiter

490

self.metrics = defaultdict(int)

491

self.last_reset = time.time()

492

493

def acquire(self, **kwargs):

494

try:

495

return self.limiter.acquire(**kwargs)

496

except dramatiq.RateLimitExceeded:

497

self.metrics["rate_limited"] += 1

498

raise

499

finally:

500

self.metrics["attempts"] += 1

501

502

# Reset metrics hourly

503

if time.time() - self.last_reset > 3600:

504

print(f"Rate limiting metrics: {dict(self.metrics)}")

505

self.metrics.clear()

506

self.last_reset = time.time()

507

508

@dramatiq.actor

509

def monitored_task(data):

510

limiter = MetricsRateLimiter(

511

ConcurrentRateLimiter(backend, "monitored", limit=10)

512

)

513

514

with limiter.acquire():

515

return process_with_monitoring(data)

516

```

517

518

### Error Handling

519

520

```python { .api }

521

class RateLimitExceeded(Exception):

522

"""

523

Raised when rate limit is exceeded.

524

525

This is the same exception as dramatiq.RateLimitExceeded

526

"""

527

```

528

529

**Usage:**

530

531

```python

532

@dramatiq.actor

533

def rate_limited_task(data):

534

limiter = ConcurrentRateLimiter(backend, "limited", limit=3)

535

536

try:

537

with limiter.acquire():

538

return process_data(data)

539

except dramatiq.RateLimitExceeded:

540

# Handle rate limiting gracefully

541

print("Rate limit exceeded, scheduling for later")

542

# Could reschedule with delay

543

rate_limited_task.send_with_options(

544

args=(data,),

545

delay=60000 # Retry in 1 minute

546

)

547

return {"status": "rate_limited", "retry_scheduled": True}

548

```

549

550

### Integration with Actors

551

552

Rate limiting can be integrated directly into actor middleware:

553

554

```python

555

class ActorRateLimitMiddleware(dramatiq.Middleware):

556

def __init__(self, backend, default_limit=10):

557

self.backend = backend

558

self.default_limit = default_limit

559

560

@property

561

def actor_options(self):

562

return {"rate_limit", "rate_limit_key"}

563

564

def before_process_message(self, broker, message):

565

rate_limit = message.options.get("rate_limit")

566

if rate_limit:

567

key = message.options.get("rate_limit_key", message.actor_name)

568

limiter = ConcurrentRateLimiter(

569

self.backend,

570

key,

571

limit=rate_limit

572

)

573

574

# Store limiter in message options for cleanup

575

message.options["_rate_limiter"] = limiter

576

limiter.acquire().__enter__()

577

578

def after_process_message(self, broker, message, *, result=None, exception=None):

579

limiter = message.options.get("_rate_limiter")

580

if limiter:

581

limiter.acquire().__exit__(None, None, None)

582

583

# Add middleware

584

rate_limit_middleware = ActorRateLimitMiddleware(backend)

585

broker.add_middleware(rate_limit_middleware)

586

587

# Use with actors

588

@dramatiq.actor(rate_limit=5, rate_limit_key="email_sending")

589

def send_email_task(to, subject, body):

590

send_email(to, subject, body)

591

return f"Email sent to {to}"

592

```