# Synchronization Primitives

DiskCache provides thread-safe and process-safe synchronization primitives: locks, re-entrant locks, bounded semaphores, and a running average calculator, all implemented on top of the underlying cache for coordination. Because all state lives in the cache, the primitives coordinate any threads and processes that share a cache directory, and even separate machines when that directory sits on a shared filesystem.
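The three locking primitives share the `acquire`/`release` methods and context-manager protocol described below; the averager exposes `add`/`get`/`pop`. As a quick orientation, all four are constructed the same way, from a shared cache instance plus a key (the directory and key names here are arbitrary):

```python
import diskcache

# Any process that opens the same directory participates in the same
# coordination, because all state lives in the cache itself.
cache = diskcache.FanoutCache('/tmp/sync_primitives')

lock = diskcache.Lock(cache, 'lock_key')
rlock = diskcache.RLock(cache, 'rlock_key')
semaphore = diskcache.BoundedSemaphore(cache, 'semaphore_key', value=4)
averager = diskcache.Averager(cache, 'averager_key')
```
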
## Capabilities

### Lock - Distributed Lock

A process-safe lock implementation using a spin-lock algorithm with cache-based coordination.

```python { .api }
class Lock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize distributed lock.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """

    def acquire(self):
        """
        Acquire the lock using a spin-lock algorithm.

        Blocks until the lock is acquired. Uses polling with small delays
        to detect when the lock becomes available.

        Returns:
            bool: True when lock is acquired
        """

    def release(self):
        """
        Release the lock by deleting the lock key.

        Returns:
            bool: True if lock was held and released
        """

    def locked(self):
        """
        Check if lock is currently held (by any process).

        Returns:
            bool: True if lock is currently held
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""
```
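
The spin-lock coordination can be pictured with `Cache.add`, which inserts a key only when it is absent, so exactly one contender wins each round. Here is a minimal sketch of the idea, not the library's literal implementation (function names are illustrative):

```python
import time
import diskcache

cache = diskcache.Cache('/tmp/lock_sketch')

def spin_acquire(key, expire=None):
    # add() succeeds only if the key does not exist yet, so at most
    # one process can "own" the key at a time.
    while not cache.add(key, None, expire=expire):
        time.sleep(0.001)  # Busy key: back off briefly, then retry

def spin_release(key):
    # Releasing the lock is simply deleting the key.
    cache.delete(key)

spin_acquire('sketch_lock', expire=30)
try:
    print("holding the lock")
finally:
    spin_release('sketch_lock')
```

The polling delay is what makes this a spin lock: waiters burn a little CPU in exchange for not needing any notification mechanism beyond the cache itself.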

### RLock - Re-entrant Distributed Lock

A re-entrant lock that can be acquired multiple times by the same process/thread, with a counter to track acquisition depth.

```python { .api }
class RLock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize re-entrant distributed lock.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """

    def acquire(self):
        """
        Acquire the re-entrant lock.

        If already held by the current process, increments the acquisition
        count. Blocks if held by another process.

        Returns:
            bool: True when lock is acquired
        """

    def release(self):
        """
        Release the re-entrant lock.

        Decrements the acquisition count. Only fully releases the lock
        when the count reaches zero.

        Returns:
            bool: True if lock count was decremented

        Raises:
            RuntimeError: If attempting to release a lock not held by current process
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""
```
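
Conceptually, re-entrancy requires remembering who holds the lock and how deep. One way to sketch that bookkeeping is to store an `(owner, depth)` pair in the cache and update it inside `Cache.transact`, which provides an atomic read-modify-write. This is an illustrative sketch, not the library's exact internals:

```python
import os
import threading
import time
import diskcache

cache = diskcache.Cache('/tmp/rlock_sketch')

def current_owner():
    # Identify the holder by process id and thread id.
    return f"{os.getpid()}-{threading.get_ident()}"

def rlock_acquire(key):
    me = current_owner()
    while True:
        with cache.transact():  # Atomic read-modify-write
            owner, depth = cache.get(key, default=(None, 0))
            if owner is None or owner == me:
                cache.set(key, (me, depth + 1))
                return
        time.sleep(0.001)  # Held by someone else: wait and retry

def rlock_release(key):
    me = current_owner()
    with cache.transact():
        owner, depth = cache.get(key, default=(None, 0))
        if owner != me:
            raise RuntimeError("cannot release un-acquired lock")
        if depth == 1:
            cache.delete(key)                # Depth hits zero: fully release
        else:
            cache.set(key, (me, depth - 1))  # Just decrement the depth
```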

### BoundedSemaphore - Distributed Semaphore

A distributed semaphore that limits the number of concurrent accesses to a resource across processes.

```python { .api }
class BoundedSemaphore:
    def __init__(self, cache, key, value=1, expire=None, tag=None):
        """
        Initialize bounded semaphore.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the semaphore
            value (int): Initial semaphore count. Default 1.
            expire (float, optional): Semaphore expiration time in seconds
            tag (str, optional): Tag for grouping related semaphores
        """

    def acquire(self):
        """
        Acquire the semaphore (decrement count).

        Blocks until semaphore count is greater than zero, then
        atomically decrements the count.

        Returns:
            bool: True when semaphore is acquired
        """

    def release(self):
        """
        Release the semaphore (increment count).

        Atomically increments the semaphore count, potentially
        allowing other processes to acquire it.

        Returns:
            bool: True if semaphore was released

        Raises:
            ValueError: If attempting to release beyond initial value
        """

    def __enter__(self):
        """Context manager entry - acquire semaphore."""

    def __exit__(self, *exc_info):
        """Context manager exit - release semaphore."""
```
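
The semaphore reduces to an integer counter in the cache whose check-and-decrement must happen atomically. A sketch of `acquire`/`release` using `Cache.transact` (illustrative only, with a hypothetical fixed capacity):

```python
import time
import diskcache

cache = diskcache.Cache('/tmp/semaphore_sketch')

INITIAL = 3  # Hypothetical capacity for this sketch

def sem_acquire(key):
    while True:
        with cache.transact():  # Check-and-decrement must be atomic
            count = cache.get(key, default=INITIAL)
            if count > 0:
                cache.set(key, count - 1)
                return
        time.sleep(0.001)  # No slots free: wait and retry

def sem_release(key):
    with cache.transact():
        count = cache.get(key, default=INITIAL)
        if count >= INITIAL:
            # Mirrors the documented ValueError on over-release.
            raise ValueError("cannot release beyond initial value")
        cache.set(key, count + 1)
```

The "bounded" part is the over-release check: releasing more times than acquiring raises an error instead of silently growing the capacity.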

### Averager - Running Average Calculator

A distributed running average calculator that maintains total and count across processes.

```python { .api }
class Averager:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize running average calculator.

        Args:
            cache (Cache or FanoutCache): Cache instance for storage
            key (str): Unique key for the average data
            expire (float, optional): Data expiration time in seconds
            tag (str, optional): Tag for grouping related averages
        """

    def add(self, value):
        """
        Add value to the running average.

        Atomically updates both the total sum and count,
        maintaining consistency across concurrent operations.

        Args:
            value (float): Value to add to average
        """

    def get(self):
        """
        Get current average value.

        Returns:
            float: Current average (total/count), or None if no values added
        """

    def pop(self):
        """
        Get current average and delete the data.

        Returns:
            float: Current average (total/count), or None if no values added
        """
```
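
Under the hood a running average only needs two numbers. A sketch that keeps a `(total, count)` pair in the cache and updates both fields atomically (illustrative, not the exact internals):

```python
import diskcache

cache = diskcache.Cache('/tmp/averager_sketch')

def avg_add(key, value):
    with cache.transact():  # Update total and count together
        total, count = cache.get(key, default=(0.0, 0))
        cache.set(key, (total + value, count + 1))

def avg_get(key):
    total, count = cache.get(key, default=(0.0, 0))
    return total / count if count else None

avg_add('response_time', 1.0)
avg_add('response_time', 3.0)
print(avg_get('response_time'))  # 2.0
```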

## Usage Examples

### Basic Lock Usage

```python
import diskcache
import threading
import time

cache = diskcache.FanoutCache('/tmp/locks')
lock = diskcache.Lock(cache, 'resource_lock')

def worker(worker_id):
    print(f"Worker {worker_id} trying to acquire lock...")

    with lock:  # Context manager automatically acquires and releases
        print(f"Worker {worker_id} has the lock")
        time.sleep(2)  # Simulate work
        print(f"Worker {worker_id} releasing lock")

# Create multiple threads competing for the same lock
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```

### Manual Lock Operations

```python
import diskcache

cache = diskcache.Cache('/tmp/manual_locks')
lock = diskcache.Lock(cache, 'manual_lock', expire=30)  # Expires in 30 seconds

# Manual acquire/release with guaranteed cleanup
lock.acquire()  # Blocks until the lock is available
try:
    print("Performing critical section work...")
    # Do critical work
finally:
    lock.release()

# locked() reports whether any process currently holds the lock. Checking
# it before acquire() is racy (another process can grab the lock in
# between), so treat it as an advisory status check only.
if lock.locked():
    print("Resource is busy")
```

### Re-entrant Lock Usage

```python
import diskcache

cache = diskcache.Cache('/tmp/rlocks')
rlock = diskcache.RLock(cache, 'reentrant_lock')

def recursive_function(depth):
    if depth <= 0:
        return

    with rlock:  # Can acquire the same lock multiple times
        print(f"Acquired lock at depth {depth}")
        recursive_function(depth - 1)  # Recursive call - will re-acquire same lock
        print(f"Released lock at depth {depth}")

recursive_function(3)
```

### Bounded Semaphore Usage

```python
import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/semaphores')
# Allow maximum 3 concurrent connections
semaphore = diskcache.BoundedSemaphore(cache, 'db_connections', value=3)

def database_worker(worker_id):
    print(f"Worker {worker_id} requesting database connection...")

    with semaphore:  # Only 3 workers can be here simultaneously
        print(f"Worker {worker_id} connected to database")
        time.sleep(2)  # Simulate database work
        print(f"Worker {worker_id} disconnecting from database")

# Create 10 workers, but only 3 can access database at once
threads = []
for i in range(10):
    t = threading.Thread(target=database_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```

### Running Average Usage

```python
import diskcache
import random
import threading

cache = diskcache.Cache('/tmp/averages')
avg = diskcache.Averager(cache, 'response_time_avg')

def simulate_requests(worker_id):
    for i in range(10):
        # Simulate request with random response time
        response_time = random.uniform(0.1, 2.0)
        avg.add(response_time)
        print(f"Worker {worker_id} added {response_time:.3f}s")

# Multiple workers adding response times concurrently
threads = []
for i in range(5):
    t = threading.Thread(target=simulate_requests, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Get final average
final_avg = avg.get()
print(f"Average response time: {final_avg:.3f}s")

# Reset for next measurement period
final_avg = avg.pop()  # Gets average and clears data
print(f"Final average before reset: {final_avg:.3f}s")
print(f"After reset: {avg.get()}")  # Should be None
```

### Cross-Process Coordination

```python
import diskcache
import multiprocessing
import time

def worker_process(process_id, cache_dir):
    # Each process creates its own cache connection
    cache = diskcache.Cache(cache_dir)
    lock = diskcache.Lock(cache, 'shared_resource')
    semaphore = diskcache.BoundedSemaphore(cache, 'resource_pool', value=2)

    # Coordinate access across processes
    with semaphore:
        print(f"Process {process_id} acquired semaphore")

        with lock:
            print(f"Process {process_id} has exclusive access")
            time.sleep(1)
            print(f"Process {process_id} releasing exclusive access")

    print(f"Process {process_id} released semaphore")

if __name__ == '__main__':
    cache_dir = '/tmp/multiprocess_sync'

    # Create multiple processes
    processes = []
    for i in range(6):
        p = multiprocessing.Process(target=worker_process, args=(i, cache_dir))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
```

### Advanced Patterns

A readers-writer pattern can be composed from two locks plus a shared reader count. The count must change atomically; `Cache.incr` and `Cache.decr` provide that, keeping the counter in the cache alongside the locks:

```python
import diskcache
import time
import threading

cache = diskcache.FanoutCache('/tmp/advanced_sync')

# Reader-writer lock pattern using multiple locks
read_lock = diskcache.Lock(cache, 'read_lock')
write_lock = diskcache.Lock(cache, 'write_lock')

def reader(reader_id):
    with read_lock:
        if cache.incr('reader_count', default=0) == 1:  # First reader
            write_lock.acquire()  # Block writers

    # Reading critical section (readers run concurrently)
    print(f"Reader {reader_id} is reading...")
    time.sleep(1)

    with read_lock:
        if cache.decr('reader_count') == 0:  # Last reader
            # Cache-based locks are not owner-checked, so the last
            # reader may release a lock the first reader acquired.
            write_lock.release()  # Allow writers

def writer(writer_id):
    with write_lock:
        print(f"Writer {writer_id} is writing...")
        time.sleep(2)

# Create readers and writers
threads = []
for i in range(3):
    threads.append(threading.Thread(target=reader, args=(i,)))
for i in range(2):
    threads.append(threading.Thread(target=writer, args=(i,)))

for t in threads:
    t.start()
for t in threads:
    t.join()
```

## Best Practices

### Lock Expiration

Always set a reasonable expiration time so that a crashed process cannot hold a lock forever. Choose a value comfortably longer than the worst-case critical section, since the lock can expire out from under a holder that is still working:

```python
# Lock expires after 60 seconds to prevent deadlocks
lock = diskcache.Lock(cache, 'critical_resource', expire=60)
```
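
To see expiration behave as a deadlock safety net, a quick demonstration (directory and key names are arbitrary, and the timing assumes nothing else touches the key):

```python
import time
import diskcache

cache = diskcache.Cache('/tmp/expire_demo')
short_lock = diskcache.Lock(cache, 'short_lived', expire=1)

short_lock.acquire()
print(short_lock.locked())  # True: the lock key exists

# Simulate a holder that crashed without calling release().
time.sleep(1.5)             # The lock key expires in the cache
print(short_lock.locked())  # False: other processes can now acquire it
```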

### Error Handling

Handle exceptions raised in the critical section and always release in a `finally` block:

```python
import diskcache

cache = diskcache.Cache('/tmp/safe_locks')
lock = diskcache.Lock(cache, 'safe_lock', expire=30)

lock.acquire()  # Blocks until the lock is available
try:
    # Critical section
    critical_work()
except Exception as e:
    print(f"Error in critical section: {e}")
finally:
    lock.release()

# If this process dies before release(), the lock still expires
# automatically thanks to the expire parameter.
```

### Semaphore Resource Management

Use semaphores to cap concurrent use of a scarce resource:

```python
import diskcache

cache = diskcache.Cache('/tmp/download_limits')

# Limit concurrent file downloads
download_semaphore = diskcache.BoundedSemaphore(
    cache, 'downloads', value=5, expire=300
)

def download_file(url):
    with download_semaphore:
        # Only 5 downloads can happen simultaneously
        perform_download(url)  # Placeholder for the actual transfer
```