# Recipe Functions

DiskCache provides decorator functions for advanced caching patterns including throttling, serialization barriers, and memoization with cache stampede protection. These functions return decorators that can be applied to other functions to add caching behavior.
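
Because each recipe is a decorator factory, the `@` syntax is optional; the returned decorator can also be applied by hand. A minimal sketch (the cache path and `fetch` function are illustrative):

```python
import diskcache

cache = diskcache.Cache('/tmp/recipes_demo')

def fetch(url):
    ...

# throttle() returns a decorator; applying it yields the wrapped function.
# These two lines are equivalent to decorating fetch with
# @diskcache.throttle(cache, count=5, seconds=60).
decorator = diskcache.throttle(cache, count=5, seconds=60)
throttled_fetch = decorator(fetch)
```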

## Capabilities

### Throttle Decorator

Rate limiting decorator that restricts function calls to a specified frequency.

```python { .api }
def throttle(cache, count, seconds, name=None, expire=None, tag=None,
             time_func=time.time, sleep_func=time.sleep):
    """
    Create throttling decorator that limits function calls to specified rate.

    Args:
        cache (Cache or FanoutCache): Cache instance for rate tracking
        count (int): Maximum number of calls allowed
        seconds (float): Time window in seconds for the call limit
        name (str, optional): Name for throttle key. Default uses function name.
        expire (float, optional): Expiration time for throttle data
        tag (str, optional): Tag for grouping related throttle data
        time_func (callable): Function to get current time. Default time.time.
        sleep_func (callable): Function for sleeping/waiting. Default time.sleep.

    Returns:
        Decorator function that enforces the specified call rate

    Usage:
        @throttle(cache, count=5, seconds=60)
        def api_call():
            # This function can only be called 5 times per minute
            pass
    """
```
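
Note that the decorator enforces the rate by waiting (via `sleep_func`) rather than by raising an error, so calls past the limit simply block until a slot opens. A small timing sketch, assuming that blocking behavior (the cache path and numbers are illustrative):

```python
import time
import diskcache

cache = diskcache.Cache('/tmp/throttle_timing')

@diskcache.throttle(cache, count=2, seconds=1)
def ping():
    return time.time()

start = time.time()
# Expect roughly two completions per second: the surplus calls
# sleep inside the decorator until the rate window allows them.
offsets = [round(ping() - start, 2) for _ in range(6)]
print(offsets)
```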

### Barrier Decorator

Serialization decorator that ensures only one instance of the decorated function runs at a time using a provided lock factory.

```python { .api }
def barrier(cache, lock_factory, name=None, expire=None, tag=None):
    """
    Create barrier decorator that serializes access to function using locks.

    Args:
        cache (Cache or FanoutCache): Cache instance for lock coordination
        lock_factory (callable): Function that creates lock instances
        name (str, optional): Name for barrier key. Default uses function name.
        expire (float, optional): Expiration time for lock
        tag (str, optional): Tag for grouping related locks

    Returns:
        Decorator function that serializes function execution

    Usage:
        @barrier(cache, diskcache.Lock)
        def critical_function():
            # Only one instance of this function runs at a time
            pass
    """
```
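
Because the lock state lives in the cache on disk rather than in process memory, a barrier serializes callers across processes as well as threads. A hedged sketch using `multiprocessing` (cache path and function names are illustrative; run as a script so worker processes can import the decorated function):

```python
import multiprocessing
import time
import diskcache

cache = diskcache.Cache('/tmp/barrier_procs')

@diskcache.barrier(cache, diskcache.Lock)
def exclusive_step(worker_id):
    # Only one process at a time executes this body.
    print(f"worker {worker_id} entered")
    time.sleep(1)
    print(f"worker {worker_id} leaving")

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=exclusive_step, args=(i,))
             for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```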

### Memoize Stampede Decorator

Memoization decorator with cache stampede protection using early expiration and probabilistic refresh.

```python { .api }
def memoize_stampede(cache, expire, name=None, typed=False, tag=None,
                     beta=1, ignore=()):
    """
    Create memoization decorator with cache stampede protection.

    Uses probabilistic early expiration to prevent cache stampede - the
    "thundering herd" problem where many processes simultaneously try to
    regenerate an expired cached value.

    Args:
        cache (Cache or FanoutCache): Cache instance for memoization
        expire (float): Base expiration time in seconds
        name (str, optional): Name for memoized function. Default function name.
        typed (bool): Distinguish arguments by type. Default False.
        tag (str, optional): Tag for grouping cached results
        beta (float): Early expiration factor. Default 1. Higher values
            increase probability of early expiration.
        ignore (tuple): Argument positions/names to ignore in cache key

    Returns:
        Memoization decorator with stampede protection

    Usage:
        @memoize_stampede(cache, expire=3600, beta=1.5)
        def expensive_computation(x, y):
            # Cached with stampede protection
            return x ** y
    """
```
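
The early-expiration rule follows the probabilistic approach from the cache stampede literature (sometimes called XFetch): each read may volunteer to recompute before the value actually expires, with a probability that rises sharply as expiration approaches and scales with `beta`. A sketch of the decision rule, with illustrative names rather than diskcache internals:

```python
import math
import random
import time

def should_recompute_early(expire_time, delta, beta):
    """Decide whether this read should refresh the value ahead of expiry.

    expire_time: absolute time at which the cached value expires
    delta:       how long the value took to compute when it was cached
    beta:        tuning factor; larger values refresh earlier

    -delta * beta * log(random()) is a positive amount, usually small but
    occasionally large, so typically one caller refreshes the value just
    before expiry while the rest keep reading the cached copy.
    """
    return time.time() - delta * beta * math.log(random.random()) >= expire_time
```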

## Usage Examples

### Throttling API Calls

```python
import diskcache
import time
import requests

cache = diskcache.Cache('/tmp/throttle')

# Limit API calls to 10 per minute
@diskcache.throttle(cache, count=10, seconds=60)
def call_api(endpoint):
    """API calls are automatically throttled to 10 per minute."""
    response = requests.get(f"https://api.example.com/{endpoint}")
    return response.json()

# Calls beyond the rate limit block (sleep) until a slot is available
for i in range(20):
    try:
        result = call_api(f"endpoint_{i}")
        print(f"Call {i}: Success")
    except Exception as e:  # network errors, not throttling
        print(f"Call {i}: {e}")
    time.sleep(1)
```

### Custom Throttling Parameters

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/custom_throttle')

# Custom throttle with different time and sleep functions
@diskcache.throttle(
    cache,
    count=3,
    seconds=10,
    name='custom_function',
    expire=3600,
    tag='rate_limited',
    time_func=time.time,
    sleep_func=lambda x: time.sleep(x * 0.5)  # Sleep for half the required time
)
def custom_throttled_function():
    print(f"Function called at {time.time()}")
    return "result"

# Test throttling behavior
for i in range(6):
    print(f"Attempt {i + 1}")
    result = custom_throttled_function()
    print(f"Result: {result}")
```
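
Because the decorator re-checks the rate after each sleep, a shorter `sleep_func` like the one above only makes it poll more often; it does not weaken the limit itself.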

### Barrier for Critical Sections

```python
import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/barrier')

# Use Lock as the lock factory for barriers
@diskcache.barrier(cache, diskcache.Lock, expire=60)
def critical_file_operation(filename):
    """Only one thread can perform file operations at a time."""
    print(f"Starting file operation on {filename}")
    time.sleep(2)  # Simulate file I/O
    with open(f"/tmp/{filename}", 'w') as f:
        f.write(f"Data written at {time.time()}")
    print(f"Completed file operation on {filename}")
    return f"Processed {filename}"

# Multiple threads trying to access the critical section
def worker(worker_id):
    result = critical_file_operation(f"file_{worker_id}.txt")
    print(f"Worker {worker_id}: {result}")

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```

### Custom Barrier with RLock

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/rlock_barrier')

# Use RLock for re-entrant barriers
@diskcache.barrier(cache, diskcache.RLock, name='reentrant_critical')
def recursive_critical_function(depth):
    """Re-entrant critical function using RLock barrier."""
    if depth <= 0:
        return "Done"

    print(f"In critical section at depth {depth}")
    time.sleep(0.5)

    # This will re-acquire the same lock (re-entrant)
    result = recursive_critical_function(depth - 1)
    return f"Depth {depth}: {result}"

result = recursive_critical_function(3)
print(result)
```

### Memoization with Stampede Protection

```python
import diskcache
import time
import random
import threading

cache = diskcache.Cache('/tmp/memoize_stampede')

@diskcache.memoize_stampede(
    cache,
    expire=10,  # Base expiration of 10 seconds
    beta=1.5,   # beta > 1 biases toward earlier refresh than the default
    tag='expensive_computation'
)
def expensive_computation(n):
    """Expensive computation with stampede protection."""
    print(f"Computing expensive_computation({n}) - this should happen rarely")
    time.sleep(2)  # Simulate expensive computation
    return n ** 2 + random.randint(1, 100)

def worker(worker_id, n):
    result = expensive_computation(n)
    print(f"Worker {worker_id}: expensive_computation({n}) = {result}")

# Simulate many workers requesting the same computation.
# Stampede protection reduces duplicate recomputation as the cached
# value nears expiration (see the note after this example).
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i, 42))  # All workers use same input
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("\nWaiting for potential early expiration...")
time.sleep(8)  # Wait close to expiration time

# These calls might trigger early refresh due to beta factor
for i in range(3):
    result = expensive_computation(42)
    print(f"Late call {i}: {result}")
```
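
Note that stampede protection helps most around expiration: on a completely cold cache, concurrent callers may still each compute the value once. When an early refresh does trigger, diskcache recomputes in a background thread while the cached value continues to be served, so readers are not blocked by the refresh.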

### Advanced Memoization Options

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/advanced_memoize')

@diskcache.memoize_stampede(
    cache,
    expire=300,   # 5 minutes
    typed=True,   # Distinguish between int(1) and float(1.0)
    ignore=(2,),  # Ignore third argument in cache key
    tag='advanced_function',
    beta=2.0      # Higher early expiration probability
)
def advanced_function(x, y, debug_info, *args, **kwargs):
    """
    Function with advanced memoization options.

    - typed=True: f(1, 2.0) and f(1.0, 2.0) are cached separately
    - ignore=(2,): debug_info parameter doesn't affect caching
    - Supports *args and **kwargs
    """
    print(f"Computing advanced_function({x}, {y}, ignored={debug_info})")
    time.sleep(1)
    return x * y + sum(args) + sum(kwargs.values())

# These calls are cached based on x, y, args, and kwargs only;
# debug_info is ignored due to ignore=(2,)
result1 = advanced_function(2, 3, "debug1", 10, extra=5)
result2 = advanced_function(2, 3, "debug2", 10, extra=5)      # Cache hit (debug_info ignored)
result3 = advanced_function(2.0, 3.0, "debug3", 10, extra=5)  # Separate key due to typed=True

print(f"Result 1: {result1}")
print(f"Result 2: {result2}")  # Same as result1, served from cache
print(f"Result 3: {result3}")  # Same value, but computed separately
```

### Combining Recipe Functions

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/combined')

# Combine throttling and memoization. Note the order: throttle is
# outermost, so even cache hits count against the rate limit.
@diskcache.throttle(cache, count=5, seconds=60, name='throttled_api')
@diskcache.memoize_stampede(cache, expire=300, name='memoized_api', beta=1.0)
def api_with_caching_and_throttling(query):
    """
    API function with both throttling and memoization.

    - Throttled to 5 calls per minute
    - Results cached for 5 minutes with stampede protection
    """
    print(f"Making actual API call for query: {query}")
    time.sleep(1)  # Simulate API delay
    return f"API result for {query}"

# First calls - will be computed and cached
print("First batch of calls:")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Call {i}: {result}")

print("\nSecond batch - should hit cache:")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Cached call {i}: {result}")

# Throttling does not raise; calls beyond the limit block (sleep)
# until the rate window allows them.
print("\nMany new calls - the throttle makes them wait:")
for i in range(10):
    result = api_with_caching_and_throttling(f"new_query_{i}")
    print(f"New call {i}: {result}")
```

### Custom Lock Factory

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/custom_lock')

# Custom lock factory with specific settings
def custom_lock_factory(cache, key, expire=None, tag=None):
    return diskcache.RLock(cache, key, expire=expire or 120, tag=tag or 'custom')

@diskcache.barrier(cache, custom_lock_factory, expire=180, tag='critical_ops')
def critical_operation_with_custom_lock():
    """Uses custom lock factory with 2-minute default expiration."""
    print("Performing critical operation with custom lock")
    time.sleep(1)
    return "Operation completed"

result = critical_operation_with_custom_lock()
print(result)
```

## Best Practices

### Throttling Best Practices

```python
# Set reasonable limits; expire removes stale throttle bookkeeping
@diskcache.throttle(cache, count=100, seconds=3600, expire=7200)  # 100/hour, data expires in 2 hours
def rate_limited_operation():
    pass

# Use different names for different rate limits
@diskcache.throttle(cache, count=10, seconds=60, name='api_writes')
def write_api():
    pass

@diskcache.throttle(cache, count=100, seconds=60, name='api_reads')
def read_api():
    pass
```

### Memoization Best Practices

```python
# Use appropriate expiration times
@diskcache.memoize_stampede(cache, expire=3600, beta=1.2)  # 1 hour, slight early-refresh bias
def hourly_report():
    pass

@diskcache.memoize_stampede(cache, expire=86400, beta=1.5)  # 1 day, stronger early-refresh bias
def daily_summary():
    pass

# Ignore volatile arguments
@diskcache.memoize_stampede(cache, expire=300, ignore=('timestamp', 'request_id'))
def process_request(data, timestamp=None, request_id=None):
    # timestamp and request_id don't affect the computation
    pass
```

### Error Handling

```python
import diskcache
import logging

cache = diskcache.Cache('/tmp/error_handling')

@diskcache.throttle(cache, count=5, seconds=60)
def fragile_operation():
    try:
        # Operation that might fail
        risky_computation()
        return "success"
    except Exception as e:
        logging.error(f"Operation failed: {e}")
        # Throttling still applies even if function raises exception
        raise

# Graceful degradation when cache is not available
try:
    @diskcache.memoize_stampede(cache, expire=300)
    def cached_operation(x):
        return expensive_computation(x)
except Exception as e:
    logging.warning(f"Cache not available: {e}")
    # Fallback to uncached version
    def cached_operation(x):
        return expensive_computation(x)
```