# Storage Backends

Storage backends persist rate limit state across application restarts, processes, and distributed deployments. Every backend implements the same bucket interface; they differ in persistence and scalability.

## Capabilities

### In-Memory Bucket

Fast, thread-safe bucket backed by a native Python list, suited to simple single-process applications.

```python { .api }
class InMemoryBucket(AbstractBucket):
    def __init__(self, rates: List[Rate]):
        """
        Initialize in-memory bucket with rate configurations.

        Parameters:
        - rates: List of rate limit configurations

        Characteristics:
        - Fast and precise
        - Not persistent across restarts
        - Not scalable across processes
        - Suitable for single-process applications
        """
```

Usage example:

```python
from pyrate_limiter import InMemoryBucket, Rate, Duration, Limiter

# Create in-memory bucket
rates = [Rate(10, Duration.SECOND), Rate(100, Duration.MINUTE)]
bucket = InMemoryBucket(rates)

# Use with limiter
limiter = Limiter(bucket)
```

### SQLite Bucket

Persistent, file-based bucket using SQLite for cross-process rate limiting.

```python { .api }
class SQLiteBucket(AbstractBucket):
    def __init__(
        self,
        rates: List[Rate],
        conn: sqlite3.Connection,
        table: str,
        lock=None
    ):
        """
        Initialize SQLite bucket with database connection.

        Parameters:
        - rates: List of rate limit configurations
        - conn: SQLite database connection
        - table: Table name for storing rate data
        - lock: Optional lock for thread safety
        """

    @classmethod
    def init_from_file(
        cls,
        rates: List[Rate],
        table: str = "rate_bucket",
        db_path: Optional[str] = None,
        create_new_table: bool = True,
        use_file_lock: bool = False
    ) -> "SQLiteBucket":
        """
        Create SQLite bucket from file path.

        Parameters:
        - rates: List of rate limit configurations
        - table: Table name for rate data (default: "rate_bucket")
        - db_path: Path to SQLite database file (None for temporary file)
        - create_new_table: Create table if it doesn't exist (default: True)
        - use_file_lock: Enable file locking for multiprocessing

        Returns:
        - SQLiteBucket: Configured bucket instance
        """
```

Usage example:

```python
from pyrate_limiter import SQLiteBucket, Rate, Duration, Limiter
import sqlite3

# Method 1: Direct connection
# (assumes the "api_limits" table already exists; init_from_file below
# can create the table for you via create_new_table)
conn = sqlite3.connect("rate_limits.db")
bucket = SQLiteBucket([Rate(10, Duration.SECOND)], conn, "api_limits")

# Method 2: From file (recommended)
bucket = SQLiteBucket.init_from_file(
    rates=[Rate(5, Duration.SECOND)],
    table="user_limits",
    db_path="rate_limits.db",
    create_new_table=True,
    use_file_lock=True  # For multiprocessing
)

limiter = Limiter(bucket)
```
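
The idea behind file-based persistence can be sketched with the standard library alone: rows written through one connection remain visible to a later connection, which stands in here for another process or a restart. The table name `toy_bucket`, its columns, and the file name are hypothetical and chosen only for illustration; pyrate_limiter manages its own schema.

```python
import os
import sqlite3
import tempfile

# Hypothetical database file and table layout, for illustration only.
db_path = os.path.join(tempfile.mkdtemp(), "toy_limits.db")

# First connection: record one rate item and close.
writer = sqlite3.connect(db_path)
writer.execute(
    "CREATE TABLE IF NOT EXISTS toy_bucket (name TEXT, item_timestamp INTEGER)"
)
writer.execute("INSERT INTO toy_bucket VALUES (?, ?)", ("user123", 1640995200000))
writer.commit()
writer.close()

# Second connection (another process, or the same app after a restart)
# reads the state back from the file.
reader = sqlite3.connect(db_path)
stored = reader.execute("SELECT COUNT(*) FROM toy_bucket").fetchone()[0]
reader.close()
```

Because the state lives in a file rather than in process memory, any process that can open the file can take part in the same limit, which is why the SQLite backend pairs naturally with the `use_file_lock` option.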

### Redis Bucket

Distributed bucket using Redis for scalable, cross-instance rate limiting.

```python { .api }
class RedisBucket(AbstractBucket):
    def __init__(
        self,
        rates: List[Rate],
        redis: Union[Redis, AsyncRedis],
        bucket_key: str,
        script_hash: str
    ):
        """
        Initialize Redis bucket with Redis client.

        Parameters:
        - rates: List of rate limit configurations
        - redis: Redis client (sync or async)
        - bucket_key: Key prefix for Redis operations
        - script_hash: Hash of the loaded Lua script

        Note: Use the init() class method for normal initialization.
        """

    @classmethod
    def init(
        cls,
        rates: List[Rate],
        redis: Union[Redis, AsyncRedis],
        bucket_key: str
    ):
        """
        Create Redis bucket with automatic Lua script loading.

        Parameters:
        - rates: List of rate limit configurations
        - redis: Redis client (sync or async)
        - bucket_key: Key prefix for Redis operations

        Returns:
        - RedisBucket: Configured bucket instance (or awaitable for async)

        Characteristics:
        - Distributed and scalable
        - Supports both sync and async operations
        - Requires Redis server
        - Atomic operations using Lua scripts
        """
```

Usage example:

```python
from pyrate_limiter import RedisBucket, Rate, Duration, Limiter
import redis

# Sync Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
bucket = RedisBucket.init(
    rates=[Rate(100, Duration.MINUTE)],
    redis=redis_client,
    bucket_key="api_rate_limits"
)

# Async Redis
import redis.asyncio as aioredis

async def async_example():
    redis_client = await aioredis.from_url("redis://localhost")
    bucket = await RedisBucket.init(
        rates=[Rate(50, Duration.SECOND)],
        redis=redis_client,
        bucket_key="async_limits"
    )
    limiter = Limiter(bucket)

    success = await limiter.try_acquire_async("user123")
    await redis_client.close()
```

### PostgreSQL Bucket

Bucket that stores rate limit state in PostgreSQL through a connection pool, for distributed rate limiting.

```python { .api }
class PostgresBucket(AbstractBucket):
    def __init__(
        self,
        pool: ConnectionPool,
        table: str,
        rates: List[Rate]
    ):
        """
        Initialize PostgreSQL bucket with connection pool.

        Parameters:
        - pool: PostgreSQL connection pool
        - table: Table name for rate data
        - rates: List of rate limit configurations

        Characteristics:
        - High performance and reliability
        - ACID compliance
        - Supports connection pooling
        - Suitable for enterprise applications
        """
```

Usage example:

```python
from pyrate_limiter import PostgresBucket, Rate, Duration, Limiter
from psycopg_pool import ConnectionPool

# Create connection pool
pool = ConnectionPool(
    "host=localhost dbname=mydb user=myuser password=mypass",
    min_size=1,
    max_size=10
)

bucket = PostgresBucket(
    pool=pool,
    table="rate_limits",
    rates=[Rate(1000, Duration.HOUR)]
)

limiter = Limiter(bucket)

# Don't forget to close the pool when done
# pool.close()
```

### Multiprocess Bucket

Wrapper bucket for multiprocessing environments that shares state through a managed list and a multiprocessing lock.

```python { .api }
class MultiprocessBucket(AbstractBucket):
    def __init__(self, rates: List[Rate], items: List[RateItem], mp_lock: LockType):
        """
        Initialize multiprocess-safe bucket.

        Parameters:
        - rates: List of rate limit configurations
        - items: Shared list proxy for storing rate items
        - mp_lock: Multiprocessing lock for synchronization

        Note: Use the init() class method for normal initialization.
        """

    @classmethod
    def init(cls, rates: List[Rate]):
        """
        Create multiprocess bucket with shared memory.

        Parameters:
        - rates: List of rate limit configurations

        Returns:
        - MultiprocessBucket: Configured bucket with shared state

        Characteristics:
        - Safe across multiple processes
        - Uses multiprocessing.Manager for shared state
        - Built on InMemoryBucket with process synchronization
        """
```

Usage example:

```python
from pyrate_limiter import MultiprocessBucket, Rate, Duration, Limiter

# For multiprocessing environments
bucket = MultiprocessBucket.init(
    rates=[Rate(20, Duration.SECOND)]
)

limiter = Limiter(bucket)
```

## Abstract Bucket Interface

All buckets implement the same interface for consistent behavior. As the signatures show, each operation returns either a plain value or an awaitable of that value, depending on whether the backend is synchronous or asynchronous.

```python { .api }
class AbstractBucket(ABC):
    rates: List[Rate]
    failing_rate: Optional[Rate]

    def put(self, item: RateItem) -> Union[bool, Awaitable[bool]]:
        """Put an item in the bucket, return True if successful."""

    def leak(self, current_timestamp: Optional[int] = None) -> Union[int, Awaitable[int]]:
        """Remove outdated items from bucket."""

    def flush(self) -> Union[None, Awaitable[None]]:
        """Flush the entire bucket."""

    def count(self) -> Union[int, Awaitable[int]]:
        """Count number of items in bucket."""

    def peek(self, index: int) -> Union[Optional[RateItem], Awaitable[Optional[RateItem]]]:
        """Peek at item at specific index."""

    def waiting(self, item: RateItem) -> Union[int, Awaitable[int]]:
        """Calculate time until bucket becomes available."""

    def limiter_lock(self) -> Optional[object]:
        """Additional lock for multiprocessing environments."""

    def close(self) -> None:
        """Release resources held by bucket."""

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc, tb) -> None:
        """Exit context manager and cleanup resources."""
```
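
To make the contract more concrete, here is a minimal standard-library sketch of how `put`, `leak`, `count`, and `flush` might behave for a single rate. `ToyItem` and `ToyBucket` are hypothetical names invented for this illustration; this is not pyrate_limiter's implementation, and it leaves out `peek`, `waiting`, locking, and multiple rates.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyItem:
    """Hypothetical stand-in for RateItem: name, millisecond timestamp, weight."""
    name: str
    timestamp: int
    weight: int = 1


class ToyBucket:
    """Illustrative single-rate bucket (assumption: a simple sliding window)."""

    def __init__(self, limit: int, interval_ms: int) -> None:
        self.limit = limit
        self.interval_ms = interval_ms
        self.items: List[ToyItem] = []

    def put(self, item: ToyItem) -> bool:
        """Accept the item only if the window ending at its timestamp has room."""
        window_start = item.timestamp - self.interval_ms
        used = sum(i.weight for i in self.items if i.timestamp > window_start)
        if used + item.weight > self.limit:
            return False
        self.items.append(item)
        return True

    def leak(self, current_timestamp: int) -> int:
        """Drop items that fell out of the window; return how many were removed."""
        cutoff = current_timestamp - self.interval_ms
        kept = [i for i in self.items if i.timestamp > cutoff]
        removed = len(self.items) - len(kept)
        self.items = kept
        return removed

    def count(self) -> int:
        """Number of items currently stored."""
        return len(self.items)

    def flush(self) -> None:
        """Remove every stored item."""
        self.items.clear()


# Two items per 1000 ms: the third put inside the window is rejected.
bucket = ToyBucket(limit=2, interval_ms=1000)
results = [bucket.put(ToyItem("user123", t)) for t in (0, 100, 200)]

# Once both accepted items are older than the window, leak removes them.
removed = bucket.leak(current_timestamp=1150)
```

A real backend swaps the Python list for its own storage (a table, a Redis key, a shared list) while keeping the same operations, which is what lets the limiter treat every bucket uniformly.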

## Bucket Wrapper

Wraps a synchronous bucket so that it exposes an asynchronous interface.

```python { .api }
class BucketAsyncWrapper(AbstractBucket):
    def __init__(self, bucket: AbstractBucket):
        """
        Wrap any bucket to provide async interface.

        Parameters:
        - bucket: Synchronous bucket to wrap
        """
```

Usage example:

```python
from pyrate_limiter import BucketAsyncWrapper, InMemoryBucket, Rate, RateItem, Duration

# Wrap sync bucket for async usage
sync_bucket = InMemoryBucket([Rate(10, Duration.SECOND)])
async_bucket = BucketAsyncWrapper(sync_bucket)

# Use in async context
async def async_rate_limiting():
    success = await async_bucket.put(RateItem("user123", 1640995200000))
    count = await async_bucket.count()
```
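
One way such a wrapper could work is sketched below with only the standard library: each method calls the inner bucket and awaits the result only when it is awaitable. `ToySyncBucket` and `ToyAsyncWrapper` are hypothetical classes written for this illustration, and the approach is an assumption, not necessarily how `BucketAsyncWrapper` is implemented.

```python
import asyncio
import inspect
from typing import Any, Tuple


class ToySyncBucket:
    """Hypothetical synchronous bucket that simply counts accepted items."""

    def __init__(self) -> None:
        self.size = 0

    def put(self, item: Any) -> bool:
        self.size += 1
        return True

    def count(self) -> int:
        return self.size


class ToyAsyncWrapper:
    """Illustrative wrapper: every method is a coroutine, whatever the inner bucket returns."""

    def __init__(self, bucket: Any) -> None:
        self.bucket = bucket

    async def _resolve(self, result: Any) -> Any:
        # Await only when the wrapped bucket handed back an awaitable.
        if inspect.isawaitable(result):
            return await result
        return result

    async def put(self, item: Any) -> bool:
        return await self._resolve(self.bucket.put(item))

    async def count(self) -> int:
        return await self._resolve(self.bucket.count())


async def demo() -> Tuple[bool, int]:
    wrapped = ToyAsyncWrapper(ToySyncBucket())
    accepted = await wrapped.put("user123")
    return accepted, await wrapped.count()


demo_result = asyncio.run(demo())
```

Calling code can then `await` every bucket operation without checking which kind of backend sits underneath.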

## Choosing a Storage Backend

- **InMemoryBucket**: Single-process applications, temporary rate limiting
- **SQLiteBucket**: Cross-process applications, persistent rate limiting, moderate scale
- **RedisBucket**: Distributed applications, high scale, shared rate limiting
- **PostgresBucket**: Enterprise applications, ACID compliance, complex queries
- **MultiprocessBucket**: Multiprocessing applications that share state through `multiprocessing.Manager` and a multiprocessing lock
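
The guidance above can be read as a rough decision order. The helper below is a hypothetical sketch of that order, not part of pyrate_limiter; the function name, its flags, and the rule of preferring PostgreSQL only when it is already in use are assumptions made for illustration.

```python
def suggest_bucket(
    distributed: bool = False,
    persistent: bool = False,
    multi_process: bool = False,
    has_postgres: bool = False,
) -> str:
    """Hypothetical helper: return the name of a bucket class matching the deployment."""
    if distributed:
        # State shared across hosts; pick PostgreSQL only when it is already in use.
        return "PostgresBucket" if has_postgres else "RedisBucket"
    if persistent:
        # State must survive restarts and be visible to other local processes.
        return "SQLiteBucket"
    if multi_process:
        # Several local processes, no persistence needed.
        return "MultiprocessBucket"
    # Single process, state may be lost on restart.
    return "InMemoryBucket"


choices = [
    suggest_bucket(),
    suggest_bucket(multi_process=True),
    suggest_bucket(persistent=True, multi_process=True),
    suggest_bucket(distributed=True),
    suggest_bucket(distributed=True, has_postgres=True),
]
```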