
# Storage Backends

Multiple storage backends for persisting rate limit state across application restarts, processes, and distributed deployments. Each backend provides the same interface while offering different persistence and scalability characteristics.

## Capabilities

### In-Memory Bucket

Fast, thread-safe bucket using native Python lists for simple applications.

```python { .api }
class InMemoryBucket(AbstractBucket):
    def __init__(self, rates: List[Rate]):
        """
        Initialize in-memory bucket with rate configurations.

        Parameters:
        - rates: List of rate limit configurations

        Characteristics:
        - Fast and precise
        - Not persistent across restarts
        - Not scalable across processes
        - Suitable for single-process applications
        """
```

Usage example:

```python
from pyrate_limiter import InMemoryBucket, Rate, Duration, Limiter

# Create in-memory bucket
rates = [Rate(10, Duration.SECOND), Rate(100, Duration.MINUTE)]
bucket = InMemoryBucket(rates)

# Use with limiter
limiter = Limiter(bucket)
```

### SQLite Bucket

Persistent, file-based bucket using SQLite for cross-process rate limiting.

```python { .api }
class SQLiteBucket(AbstractBucket):
    def __init__(
        self,
        rates: List[Rate],
        conn: sqlite3.Connection,
        table: str,
        lock=None
    ):
        """
        Initialize SQLite bucket with database connection.

        Parameters:
        - rates: List of rate limit configurations
        - conn: SQLite database connection
        - table: Table name for storing rate data
        - lock: Optional lock for thread safety
        """

    @classmethod
    def init_from_file(
        cls,
        rates: List[Rate],
        table: str = "rate_bucket",
        db_path: Optional[str] = None,
        create_new_table: bool = True,
        use_file_lock: bool = False
    ) -> "SQLiteBucket":
        """
        Create SQLite bucket from file path.

        Parameters:
        - rates: List of rate limit configurations
        - table: Table name for rate data (default: "rate_bucket")
        - db_path: Path to SQLite database file (None for temporary file)
        - create_new_table: Create table if it doesn't exist (default: True)
        - use_file_lock: Enable file locking for multiprocessing

        Returns:
        - SQLiteBucket: Configured bucket instance
        """
```

Usage example:

```python
from pyrate_limiter import SQLiteBucket, Rate, Duration, Limiter
import sqlite3

# Method 1: Direct connection
conn = sqlite3.connect("rate_limits.db")
bucket = SQLiteBucket([Rate(10, Duration.SECOND)], conn, "api_limits")

# Method 2: From file (recommended)
bucket = SQLiteBucket.init_from_file(
    rates=[Rate(5, Duration.SECOND)],
    table="user_limits",
    db_path="rate_limits.db",
    create_new_table=True,
    use_file_lock=True  # For multiprocessing
)

limiter = Limiter(bucket)
```

### Redis Bucket

Distributed bucket using Redis for scalable, cross-instance rate limiting.

```python { .api }
class RedisBucket(AbstractBucket):
    def __init__(
        self,
        rates: List[Rate],
        redis: Union[Redis, AsyncRedis],
        bucket_key: str,
        script_hash: str
    ):
        """
        Initialize Redis bucket with Redis client.

        Parameters:
        - rates: List of rate limit configurations
        - redis: Redis client (sync or async)
        - bucket_key: Key prefix for Redis operations
        - script_hash: Hash of the loaded Lua script

        Note: Use the init() class method for normal initialization.
        """

    @classmethod
    def init(
        cls,
        rates: List[Rate],
        redis: Union[Redis, AsyncRedis],
        bucket_key: str
    ):
        """
        Create Redis bucket with automatic Lua script loading.

        Parameters:
        - rates: List of rate limit configurations
        - redis: Redis client (sync or async)
        - bucket_key: Key prefix for Redis operations

        Returns:
        - RedisBucket: Configured bucket instance (or awaitable for async)

        Characteristics:
        - Distributed and scalable
        - Supports both sync and async operations
        - Requires Redis server
        - Atomic operations using Lua scripts
        """
```

Usage example:

```python
from pyrate_limiter import RedisBucket, Rate, Duration, Limiter
import redis

# Sync Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
bucket = RedisBucket.init(
    rates=[Rate(100, Duration.MINUTE)],
    redis=redis_client,
    bucket_key="api_rate_limits"
)

# Async Redis
import redis.asyncio as aioredis

async def async_example():
    # from_url returns a client synchronously; connections are opened lazily
    redis_client = aioredis.from_url("redis://localhost")
    bucket = await RedisBucket.init(
        rates=[Rate(50, Duration.SECOND)],
        redis=redis_client,
        bucket_key="async_limits"
    )
    limiter = Limiter(bucket)

    success = await limiter.try_acquire_async("user123")
    await redis_client.close()
```

### PostgreSQL Bucket

Enterprise-grade bucket using PostgreSQL for high-performance distributed rate limiting.

```python { .api }
class PostgresBucket(AbstractBucket):
    def __init__(
        self,
        pool: ConnectionPool,
        table: str,
        rates: List[Rate]
    ):
        """
        Initialize PostgreSQL bucket with connection pool.

        Parameters:
        - pool: PostgreSQL connection pool
        - table: Table name for rate data
        - rates: List of rate limit configurations

        Characteristics:
        - High performance and reliability
        - ACID compliance
        - Supports connection pooling
        - Suitable for enterprise applications
        """
```

Usage example:

```python
from pyrate_limiter import PostgresBucket, Rate, Duration, Limiter
from psycopg_pool import ConnectionPool

# Create connection pool
pool = ConnectionPool(
    "host=localhost dbname=mydb user=myuser password=mypass",
    min_size=1,
    max_size=10
)

bucket = PostgresBucket(
    pool=pool,
    table="rate_limits",
    rates=[Rate(1000, Duration.HOUR)]
)

limiter = Limiter(bucket)

# Don't forget to close the pool when done
# pool.close()
```

### Multiprocess Bucket

Wrapper bucket for multiprocessing environments, coordinating processes through shared state and a multiprocessing lock.

```python { .api }
class MultiprocessBucket(AbstractBucket):
    def __init__(self, rates: List[Rate], items: List[RateItem], mp_lock: LockType):
        """
        Initialize multiprocess-safe bucket.

        Parameters:
        - rates: List of rate limit configurations
        - items: Shared list proxy for storing rate items
        - mp_lock: Multiprocessing lock for synchronization

        Note: Use the init() class method for normal initialization.
        """

    @classmethod
    def init(cls, rates: List[Rate]):
        """
        Create multiprocess bucket with shared memory.

        Parameters:
        - rates: List of rate limit configurations

        Returns:
        - MultiprocessBucket: Configured bucket with shared state

        Characteristics:
        - Safe across multiple processes
        - Uses multiprocessing.Manager for shared state
        - Built on InMemoryBucket with process synchronization
        """
```

Usage example:

```python
from pyrate_limiter import MultiprocessBucket, Rate, Duration, Limiter

# For multiprocessing environments
bucket = MultiprocessBucket.init(
    rates=[Rate(20, Duration.SECOND)]
)

limiter = Limiter(bucket)
```

## Abstract Bucket Interface

All buckets implement the same interface for consistent behavior.

```python { .api }
class AbstractBucket(ABC):
    rates: List[Rate]
    failing_rate: Optional[Rate]

    def put(self, item: RateItem) -> Union[bool, Awaitable[bool]]:
        """Put an item in the bucket, return True if successful."""

    def leak(self, current_timestamp: Optional[int] = None) -> Union[int, Awaitable[int]]:
        """Remove outdated items from bucket."""

    def flush(self) -> Union[None, Awaitable[None]]:
        """Flush the entire bucket."""

    def count(self) -> Union[int, Awaitable[int]]:
        """Count number of items in bucket."""

    def peek(self, index: int) -> Union[Optional[RateItem], Awaitable[Optional[RateItem]]]:
        """Peek at item at specific index."""

    def waiting(self, item: RateItem) -> Union[int, Awaitable[int]]:
        """Calculate time until bucket becomes available."""

    def limiter_lock(self) -> Optional[object]:
        """Additional lock for multiprocessing environments."""

    def close(self) -> None:
        """Release resources held by bucket."""

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc, tb) -> None:
        """Exit context manager and cleanup resources."""
```
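To make the contract concrete, here is a minimal, standalone sliding-window bucket that mirrors the synchronous half of this interface. It is a simplified sketch, not the library's implementation: the `Rate`, `RateItem`, and `ListBucket` classes below are illustrative stand-ins (intervals in milliseconds, as in the library), and the async variants, `waiting()`, and the lock/context-manager methods are omitted.

```python
import time
from typing import List, Optional


class Rate:
    """Illustrative stand-in: allow `limit` items per `interval` milliseconds."""
    def __init__(self, limit: int, interval: int):
        self.limit = limit
        self.interval = interval


class RateItem:
    """Illustrative stand-in: a named event with a millisecond timestamp."""
    def __init__(self, name: str, timestamp: int, weight: int = 1):
        self.name = name
        self.timestamp = timestamp
        self.weight = weight


class ListBucket:
    """Minimal synchronous bucket mirroring the AbstractBucket interface."""

    def __init__(self, rates: List[Rate]):
        self.rates = sorted(rates, key=lambda r: r.interval)
        self.failing_rate: Optional[Rate] = None
        self.items: List[RateItem] = []

    def put(self, item: RateItem) -> bool:
        # Reject if any rate's window is already full
        for rate in self.rates:
            window_start = item.timestamp - rate.interval
            in_window = sum(1 for i in self.items if i.timestamp > window_start)
            if in_window + item.weight > rate.limit:
                self.failing_rate = rate
                return False
        self.failing_rate = None
        self.items.append(item)
        return True

    def leak(self, current_timestamp: Optional[int] = None) -> int:
        # Drop items older than the largest rate interval
        now = current_timestamp or int(time.time() * 1000)
        cutoff = now - self.rates[-1].interval
        before = len(self.items)
        self.items = [i for i in self.items if i.timestamp > cutoff]
        return before - len(self.items)

    def flush(self) -> None:
        self.items.clear()

    def count(self) -> int:
        return len(self.items)

    def peek(self, index: int) -> Optional[RateItem]:
        return self.items[index] if 0 <= index < len(self.items) else None


# 3 requests per second: the 4th put in the same window is rejected
bucket = ListBucket([Rate(3, 1000)])
now = int(time.time() * 1000)
results = [bucket.put(RateItem(f"req-{i}", now)) for i in range(4)]
print(results)         # [True, True, True, False]
print(bucket.count())  # 3
```

The real backends differ mainly in where `items` lives (a Python list, a SQLite table, a Redis sorted set, a PostgreSQL table, or a shared-memory proxy) and in how `put`/`leak` are made atomic for their environment.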

## Bucket Wrapper

For converting synchronous buckets to an asynchronous interface.

```python { .api }
class BucketAsyncWrapper(AbstractBucket):
    def __init__(self, bucket: AbstractBucket):
        """
        Wrap any bucket to provide async interface.

        Parameters:
        - bucket: Synchronous bucket to wrap
        """
```

Usage example:

```python
from pyrate_limiter import BucketAsyncWrapper, InMemoryBucket, Rate, RateItem, Duration

# Wrap sync bucket for async usage
sync_bucket = InMemoryBucket([Rate(10, Duration.SECOND)])
async_bucket = BucketAsyncWrapper(sync_bucket)

# Use in async context
async def async_rate_limiting():
    success = await async_bucket.put(RateItem("user123", 1640995200000))
    count = await async_bucket.count()
```

## Choosing a Storage Backend

- **InMemoryBucket**: Single-process applications, temporary rate limiting
- **SQLiteBucket**: Cross-process applications, persistent rate limiting, moderate scale
- **RedisBucket**: Distributed applications, high scale, shared rate limiting
- **PostgresBucket**: Enterprise applications, ACID compliance, complex queries
- **MultiprocessBucket**: Multiprocessing applications coordinating via shared state and process locks
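The guidance above can be condensed into a small decision helper. This is a toy sketch, not a library API: `choose_backend` and its parameters are hypothetical names for the deployment questions each bullet answers.

```python
def choose_backend(persistent: bool, distributed: bool, multiprocess: bool) -> str:
    """Map deployment requirements to the backend suggested above."""
    if distributed:
        # State shared across hosts: Redis (or Postgres when ACID guarantees matter)
        return "RedisBucket"
    if persistent:
        # Survives restarts; works across processes on one host via a shared file
        return "SQLiteBucket"
    if multiprocess:
        # Shared in-memory state across local worker processes
        return "MultiprocessBucket"
    # Default: fastest option for a single process, state lost on restart
    return "InMemoryBucket"


print(choose_backend(persistent=False, distributed=False, multiprocess=False))  # InMemoryBucket
print(choose_backend(persistent=True, distributed=False, multiprocess=False))   # SQLiteBucket
print(choose_backend(persistent=False, distributed=True, multiprocess=False))   # RedisBucket
```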