# Recipe Functions

DiskCache provides decorator functions for advanced caching patterns including throttling, serialization barriers, and memoization with cache stampede protection. These functions return decorators that can be applied to other functions to add caching behavior.

## Capabilities

### Throttle Decorator

Rate-limiting decorator that restricts function calls to a specified frequency. Calls that exceed the limit block (via sleep_func) until the rate allows them.

```python { .api }
def throttle(cache, count, seconds, name=None, expire=None, tag=None,
             time_func=time.time, sleep_func=time.sleep):
    """
    Create throttling decorator that limits function calls to the specified rate.

    Args:
        cache (Cache or FanoutCache): Cache instance for rate tracking
        count (int): Maximum number of calls allowed
        seconds (float): Time window in seconds for the call limit
        name (str, optional): Name for throttle key. Default uses function name.
        expire (float, optional): Expiration time for throttle data
        tag (str, optional): Tag for grouping related throttle data
        time_func (callable): Function to get current time. Default time.time.
        sleep_func (callable): Function for sleeping/waiting. Default time.sleep.

    Returns:
        Decorator function that enforces the specified call rate

    Usage:
        @throttle(cache, count=5, seconds=60)
        def api_call():
            # Called at most 5 times per minute; excess calls block
            pass
    """
```
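
Under the hood, throttle-style rate limiting keeps a timestamped tally (a leaky bucket) and sleeps whenever the budget is exhausted. The sketch below shows that core logic with in-memory state only; diskcache keeps the tally in the cache inside a transaction so the limit is shared across threads and processes. This is an illustrative sketch, not diskcache's exact internals.

```python
import time

def local_throttle(count, seconds, time_func=time.time, sleep_func=time.sleep):
    """In-memory sketch of leaky-bucket throttling (single process only)."""
    rate = count / float(seconds)
    state = {'last': time_func(), 'tally': float(count)}

    def decorator(func):
        def wrapper(*args, **kwargs):
            while True:
                now = time_func()
                # Refill the budget in proportion to elapsed time, capped at `count`.
                state['tally'] = min(float(count),
                                     state['tally'] + (now - state['last']) * rate)
                state['last'] = now
                if state['tally'] >= 1:
                    state['tally'] -= 1
                    return func(*args, **kwargs)
                # Budget exhausted: wait until one call's worth accumulates.
                sleep_func((1 - state['tally']) / rate)
        return wrapper
    return decorator
```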

### Barrier Decorator

Serialization decorator that ensures only one instance of the decorated function runs at a time, using a provided lock factory.

```python { .api }
def barrier(cache, lock_factory, name=None, expire=None, tag=None):
    """
    Create barrier decorator that serializes access to a function using locks.

    Args:
        cache (Cache or FanoutCache): Cache instance for lock coordination
        lock_factory (callable): Function that creates lock instances
        name (str, optional): Name for barrier key. Default uses function name.
        expire (float, optional): Expiration time for lock
        tag (str, optional): Tag for grouping related locks

    Returns:
        Decorator function that serializes function execution

    Usage:
        @barrier(cache, diskcache.Lock)
        def critical_function():
            # Only one instance of this function runs at a time
            pass
    """
```
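
Conceptually, the returned decorator acquires one cache-backed lock around every call. A minimal sketch of that shape, assuming `lock_factory` returns a context manager such as `diskcache.Lock`:

```python
import functools

def simple_barrier(cache, lock_factory, name=None, expire=None, tag=None):
    """Sketch: serialize all calls to the decorated function with one lock."""
    def decorator(func):
        # One lock per decorated function, keyed by its qualified name.
        key = name if name is not None else f'{func.__module__}.{func.__qualname__}'
        lock = lock_factory(cache, key, expire=expire, tag=tag)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with lock:  # blocks until no other caller holds the lock
                return func(*args, **kwargs)
        return wrapper
    return decorator
```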

### Memoize Stampede Decorator

Memoization decorator with cache stampede protection using early expiration and probabilistic refresh.

```python { .api }
def memoize_stampede(cache, expire, name=None, typed=False, tag=None,
                     beta=1, ignore=()):
    """
    Create memoization decorator with cache stampede protection.

    Uses probabilistic early expiration to prevent cache stampede - the
    "thundering herd" problem where many processes simultaneously try to
    regenerate an expired cached value.

    Args:
        cache (Cache or FanoutCache): Cache instance for memoization
        expire (float): Base expiration time in seconds
        name (str, optional): Name for memoized function. Default function name.
        typed (bool): Distinguish arguments by type. Default False.
        tag (str, optional): Tag for grouping cached results
        beta (float): Early expiration factor. Default 1. Higher values
            increase the probability of early expiration.
        ignore (tuple): Argument positions/names to ignore in cache key

    Returns:
        Memoization decorator with stampede protection

    Usage:
        @memoize_stampede(cache, expire=3600, beta=1.5)
        def expensive_computation(x, y):
            # Cached with stampede protection
            return x ** y
    """
```
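
The `beta` parameter controls a probabilistic early-expiration rule in the style of "optimal probabilistic cache stampede prevention" (XFetch): alongside each value, the cache records `delta`, how long the value took to compute, and each read may trigger recomputation early with a probability that rises as expiration approaches and as `delta` grows. A minimal sketch of the check, assuming `ttl` is the remaining time to expiration:

```python
import math
import random

def should_recompute_early(ttl, delta, beta=1.0):
    """XFetch-style early-expiration check.

    ttl   -- seconds remaining until the cached value expires
    delta -- seconds the last computation took
    beta  -- >1 recomputes earlier, <1 recomputes later
    """
    rand = random.random() or 1e-16  # uniform in (0, 1); avoid log(0)
    # -log(rand) is positive and occasionally large, so expensive values
    # (large delta) tend to be refreshed well before their deadline.
    return -delta * beta * math.log(rand) >= ttl
```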

## Usage Examples

### Throttling API Calls

```python
import diskcache
import time
import requests

cache = diskcache.Cache('/tmp/throttle')

# Limit API calls to 10 per minute
@diskcache.throttle(cache, count=10, seconds=60)
def call_api(endpoint):
    """API calls are automatically throttled to 10 per minute."""
    response = requests.get(f"https://api.example.com/{endpoint}")
    return response.json()

# Calls beyond the limit block (sleep) until the rate allows them;
# the try/except guards against request failures, not throttling
for i in range(20):
    try:
        result = call_api(f"endpoint_{i}")
        print(f"Call {i}: Success")
    except Exception as e:
        print(f"Call {i}: {e}")
    time.sleep(1)
```

### Custom Throttling Parameters

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/custom_throttle')

# Custom throttle with explicit key name, expiration, tag, and timing functions
@diskcache.throttle(
    cache,
    count=3,
    seconds=10,
    name='custom_function',
    expire=3600,
    tag='rate_limited',
    time_func=time.time,
    sleep_func=lambda x: time.sleep(x * 0.5)  # Wake early; the throttle re-checks and sleeps again if still limited
)
def custom_throttled_function():
    print(f"Function called at {time.time()}")
    return "result"

# Test throttling behavior: after the first 3 calls, each call blocks
for i in range(6):
    print(f"Attempt {i + 1}")
    result = custom_throttled_function()
    print(f"Result: {result}")
```
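
Because `time_func` and `sleep_func` are injection points, throttled code can be exercised in tests with a simulated clock instead of real waiting. A hypothetical sketch (the `fake_*` names are illustrative, not part of the API):

```python
import diskcache

cache = diskcache.Cache('/tmp/fake_clock_throttle')

# A simulated clock: "sleeping" advances time instantly, so throttle
# behavior can be tested without real delays.
fake_now = [0.0]

def fake_time():
    return fake_now[0]

def fake_sleep(seconds):
    fake_now[0] += seconds

@diskcache.throttle(cache, count=2, seconds=10,
                    time_func=fake_time, sleep_func=fake_sleep)
def tick():
    return fake_time()

for _ in range(5):
    print(tick())  # runs instantly; the fake clock jumps when throttled
```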

### Barrier for Critical Sections

```python
import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/barrier')

# Use Lock as the lock factory for barriers
@diskcache.barrier(cache, diskcache.Lock, expire=60)
def critical_file_operation(filename):
    """Only one thread can perform file operations at a time."""
    print(f"Starting file operation on {filename}")
    time.sleep(2)  # Simulate file I/O
    with open(f"/tmp/{filename}", 'w') as f:
        f.write(f"Data written at {time.time()}")
    print(f"Completed file operation on {filename}")
    return f"Processed {filename}"

# Multiple threads trying to access the critical section
def worker(worker_id):
    result = critical_file_operation(f"file_{worker_id}.txt")
    print(f"Worker {worker_id}: {result}")

threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```
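
Because the barrier hands out a single lock keyed on the function, the five workers above enter the critical section strictly one at a time, so the whole batch takes roughly five times the duration of a single operation.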

### Custom Barrier with RLock

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/rlock_barrier')

# Use RLock for re-entrant barriers
@diskcache.barrier(cache, diskcache.RLock, name='reentrant_critical')
def recursive_critical_function(depth):
    """Re-entrant critical function using RLock barrier."""
    if depth <= 0:
        return "Done"

    print(f"In critical section at depth {depth}")
    time.sleep(0.5)

    # The recursive call re-acquires the same lock (re-entrant)
    result = recursive_critical_function(depth - 1)
    return f"Depth {depth}: {result}"

result = recursive_critical_function(3)
print(result)
```

### Memoization with Stampede Protection

```python
import diskcache
import time
import random
import threading

cache = diskcache.Cache('/tmp/memoize_stampede')

@diskcache.memoize_stampede(
    cache,
    expire=10,  # Base expiration of 10 seconds
    beta=1.5,   # beta > 1 shifts the probabilistic refresh earlier
    tag='expensive_computation'
)
def expensive_computation(n):
    """Expensive computation with stampede protection."""
    print(f"Computing expensive_computation({n}) - this should happen rarely")
    time.sleep(2)  # Simulate expensive computation
    return n ** 2 + random.randint(1, 100)

def worker(worker_id, n):
    result = expensive_computation(n)
    print(f"Worker {worker_id}: expensive_computation({n}) = {result}")

# Simulate many workers requesting the same computation. On a cold cache
# each miss computes; once a value is cached, stampede protection lets at
# most one caller recompute early while the rest keep being served
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i, 42))  # All workers use same input
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("\nWaiting for potential early expiration...")
time.sleep(8)  # Wait close to expiration time

# These calls may trigger an early background refresh due to the beta factor
for i in range(3):
    result = expensive_computation(42)
    print(f"Late call {i}: {result}")
```
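
Note that only the refresh path is protected: on a cold cache every concurrent caller misses and computes. Once a value exists, the probabilistic check lets at most one caller kick off an early recomputation, which runs in a background daemon thread while the stale value continues to be served, so readers are not blocked during refresh.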

### Advanced Memoization Options

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/advanced_memoize')

@diskcache.memoize_stampede(
    cache,
    expire=300,   # 5 minutes
    typed=True,   # Distinguish between int(1) and float(1.0)
    ignore=(2,),  # Ignore the third positional argument in the cache key
    tag='advanced_function',
    beta=2.0      # Higher early-expiration probability
)
def advanced_function(x, y, debug_info, *args, **kwargs):
    """
    Function with advanced memoization options.

    - typed=True: f(1, 2.0) and f(1.0, 2.0) are cached separately
    - ignore=(2,): the debug_info argument doesn't affect caching
    - Supports *args and **kwargs
    """
    print(f"Computing advanced_function({x}, {y}, ignored={debug_info})")
    time.sleep(1)
    return x * y + sum(args) + sum(kwargs.values())

# These calls are cached based on x, y, args, and kwargs only;
# debug_info is ignored due to ignore=(2,)
result1 = advanced_function(2, 3, "debug1", 10, extra=5)
result2 = advanced_function(2, 3, "debug2", 10, extra=5)      # Cache hit (debug_info ignored)
result3 = advanced_function(2.0, 3.0, "debug3", 10, extra=5)  # Separate entry due to typed=True

print(f"Result 1: {result1}")
print(f"Result 2: {result2}")  # Same as result1, served from cache
print(f"Result 3: {result3}")  # Same numeric value, computed separately
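
Like `Cache.memoize`, the wrapper returned by `memoize_stampede` exposes a `__cache_key__` helper that maps call arguments to the underlying cache key, which is useful for targeted invalidation:

```python
# Compute the key for one specific call and evict just that entry
key = advanced_function.__cache_key__(2, 3, "debug1", 10, extra=5)
cache.delete(key)
```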

### Combining Recipe Functions

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/combined')

# Combine throttling and memoization. Note the order: throttle is the
# outer decorator, so every call - cache hit or miss - counts against
# the rate limit and may block.
@diskcache.throttle(cache, count=5, seconds=60, name='throttled_api')
@diskcache.memoize_stampede(cache, expire=300, name='memoized_api', beta=1.0)
def api_with_caching_and_throttling(query):
    """
    API function with both throttling and memoization.
    - Throttled to 5 calls per minute
    - Results cached for 5 minutes with stampede protection
    """
    print(f"Making actual API call for query: {query}")
    time.sleep(1)  # Simulate API delay
    return f"API result for {query}"

# First calls - computed and cached
print("First batch of calls:")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Call {i}: {result}")

print("\nSecond batch - hits the cache (but still counts toward the rate limit):")
for i in range(3):
    result = api_with_caching_and_throttling(f"query_{i}")
    print(f"Cached call {i}: {result}")

print("\nMany new calls - these block once the rate limit is exhausted:")
for i in range(10):
    result = api_with_caching_and_throttling(f"new_query_{i}")
    print(f"New call {i}: {result}")
```
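
If cached results should bypass the rate limit, reverse the order so memoization is the outer decorator (the names below are illustrative):

```python
# Memoize outer, throttle inner: cache hits return immediately, and only
# cache misses (real API calls) are rate limited.
@diskcache.memoize_stampede(cache, expire=300, name='memoized_api2')
@diskcache.throttle(cache, count=5, seconds=60, name='throttled_api2')
def api_throttled_on_miss(query):
    print(f"Making actual API call for query: {query}")
    return f"API result for {query}"
```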

### Custom Lock Factory

```python
import diskcache
import time

cache = diskcache.Cache('/tmp/custom_lock')

# Custom lock factory with its own default expiration and tag
def custom_lock_factory(cache, key, expire=None, tag=None):
    return diskcache.RLock(cache, key, expire=expire or 120, tag=tag or 'custom')

@diskcache.barrier(cache, custom_lock_factory, expire=180, tag='critical_ops')
def critical_operation_with_custom_lock():
    """Uses the custom lock factory; barrier passes expire and tag through to it."""
    print("Performing critical operation with custom lock")
    time.sleep(1)
    return "Operation completed"

result = critical_operation_with_custom_lock()
print(result)
```

## Best Practices

### Throttling Best Practices

```python
import diskcache

cache = diskcache.Cache('/tmp/throttle_practices')

# Set reasonable limits; here the throttle data itself expires after 2 hours
@diskcache.throttle(cache, count=100, seconds=3600, expire=7200)  # 100 calls/hour
def rate_limited_operation():
    pass

# Use distinct names so each operation gets its own rate limit
@diskcache.throttle(cache, count=10, seconds=60, name='api_writes')
def write_api():
    pass

@diskcache.throttle(cache, count=100, seconds=60, name='api_reads')
def read_api():
    pass
```
384
385
### Memoization Best Practices
386
387
```python
388
# Use appropriate expiration times
389
@diskcache.memoize_stampede(cache, expire=3600, beta=1.2) # 1 hour with 20% early refresh
390
def hourly_report():
391
pass
392
393
@diskcache.memoize_stampede(cache, expire=86400, beta=1.5) # 1 day with 50% early refresh
394
def daily_summary():
395
pass
396
397
# Ignore volatile arguments
398
@diskcache.memoize_stampede(cache, expire=300, ignore=('timestamp', 'request_id'))
399
def process_request(data, timestamp=None, request_id=None):
400
# timestamp and request_id don't affect the computation
401
pass
402
```

### Error Handling

```python
import diskcache
import logging

cache = diskcache.Cache('/tmp/error_handling')

@diskcache.throttle(cache, count=5, seconds=60)
def fragile_operation():
    try:
        # Operation that might fail (risky_computation is a placeholder)
        risky_computation()
        return "success"
    except Exception as e:
        logging.error(f"Operation failed: {e}")
        # The call still counts against the rate limit even when it raises
        raise

# Graceful degradation when the cache is not available. Decoration itself
# rarely fails, so guard the step that actually touches the disk - creating
# the cache - and fall back to an uncached version on failure.
try:
    memo_cache = diskcache.Cache('/tmp/error_handling')

    @diskcache.memoize_stampede(memo_cache, expire=300)
    def cached_operation(x):
        return expensive_computation(x)  # expensive_computation is a placeholder

except Exception as e:
    logging.warning(f"Cache not available: {e}")

    # Fall back to an uncached version
    def cached_operation(x):
        return expensive_computation(x)
```