# Synchronization Primitives

DiskCache provides thread-safe and process-safe synchronization primitives that coordinate across multiple threads and processes. These include locks, re-entrant locks, bounded semaphores, and a running average calculator, all implemented using the underlying cache for coordination.

## Capabilities

### Lock - Distributed Lock

A process-safe lock implementation that uses a spin-lock algorithm with cache-based coordination.

```python { .api }
class Lock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize distributed lock.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """

    def acquire(self):
        """
        Acquire the lock using a spin-lock algorithm.

        Blocks until the lock is acquired. Uses polling with small delays
        to detect when the lock becomes available.

        Returns:
            bool: True when lock is acquired
        """

    def release(self):
        """
        Release the lock by deleting the lock key.

        Returns:
            bool: True if lock was held and released
        """

    def locked(self):
        """
        Check if the lock is currently held (by any process).

        Returns:
            bool: True if lock is currently held
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""
```

### RLock - Re-entrant Distributed Lock

A re-entrant lock that can be acquired multiple times by the same process and thread, with a counter tracking acquisition depth.

```python { .api }
class RLock:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize re-entrant distributed lock.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the lock
            expire (float, optional): Lock expiration time in seconds
            tag (str, optional): Tag for grouping related locks
        """

    def acquire(self):
        """
        Acquire the re-entrant lock.

        If already held by the current process, increments the acquisition
        count. Blocks if held by another process.

        Returns:
            bool: True when lock is acquired
        """

    def release(self):
        """
        Release the re-entrant lock.

        Decrements the acquisition count. Only fully releases the lock
        when the count reaches zero.

        Returns:
            bool: True if lock count was decremented

        Raises:
            RuntimeError: If attempting to release a lock not held by current process
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, *exc_info):
        """Context manager exit - release lock."""
```

### BoundedSemaphore - Distributed Semaphore

A distributed semaphore that limits the number of concurrent accesses to a resource across processes.

```python { .api }
class BoundedSemaphore:
    def __init__(self, cache, key, value=1, expire=None, tag=None):
        """
        Initialize bounded semaphore.

        Args:
            cache (Cache or FanoutCache): Cache instance for coordination
            key (str): Unique key for the semaphore
            value (int): Initial semaphore count. Default 1.
            expire (float, optional): Semaphore expiration time in seconds
            tag (str, optional): Tag for grouping related semaphores
        """

    def acquire(self):
        """
        Acquire the semaphore (decrement count).

        Blocks until semaphore count is greater than zero, then
        atomically decrements the count.

        Returns:
            bool: True when semaphore is acquired
        """

    def release(self):
        """
        Release the semaphore (increment count).

        Atomically increments the semaphore count, potentially
        allowing other processes to acquire it.

        Returns:
            bool: True if semaphore was released

        Raises:
            ValueError: If attempting to release beyond initial value
        """

    def __enter__(self):
        """Context manager entry - acquire semaphore."""

    def __exit__(self, *exc_info):
        """Context manager exit - release semaphore."""
```

### Averager - Running Average Calculator

A distributed running average calculator that maintains total and count across processes.

```python { .api }
class Averager:
    def __init__(self, cache, key, expire=None, tag=None):
        """
        Initialize running average calculator.

        Args:
            cache (Cache or FanoutCache): Cache instance for storage
            key (str): Unique key for the average data
            expire (float, optional): Data expiration time in seconds
            tag (str, optional): Tag for grouping related averages
        """

    def add(self, value):
        """
        Add value to the running average.

        Atomically updates both the total sum and count,
        maintaining consistency across concurrent operations.

        Args:
            value (float): Value to add to average
        """

    def get(self):
        """
        Get current average value.

        Returns:
            float: Current average (total/count), or None if no values added
        """

    def pop(self):
        """
        Get current average and delete the data.

        Returns:
            float: Current average (total/count), or None if no values added
        """
```

## Usage Examples

### Basic Lock Usage

```python
import diskcache
import threading
import time

cache = diskcache.FanoutCache('/tmp/locks')
lock = diskcache.Lock(cache, 'resource_lock')

def worker(worker_id):
    print(f"Worker {worker_id} trying to acquire lock...")

    with lock:  # Context manager automatically acquires and releases
        print(f"Worker {worker_id} has the lock")
        time.sleep(2)  # Simulate work
        print(f"Worker {worker_id} releasing lock")

# Create multiple threads competing for the same lock
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```

### Manual Lock Operations

```python
import diskcache

cache = diskcache.Cache('/tmp/manual_locks')
lock = diskcache.Lock(cache, 'manual_lock', expire=30)  # Expires in 30 seconds

# Manual lock operations. Note the locked() check is advisory: another
# process may grab the lock between locked() and acquire(), in which
# case acquire() simply blocks until the lock becomes available.
if not lock.locked():
    lock.acquire()
    try:
        print("Performing critical section work...")
        # Do critical work
    finally:
        lock.release()
else:
    print("Resource is busy")
```

### Re-entrant Lock Usage

```python
import diskcache

cache = diskcache.Cache('/tmp/rlocks')
rlock = diskcache.RLock(cache, 'reentrant_lock')

def recursive_function(depth):
    if depth <= 0:
        return

    with rlock:  # Can acquire the same lock multiple times
        print(f"Acquired lock at depth {depth}")
        recursive_function(depth - 1)  # Recursive call - will re-acquire same lock
        print(f"Released lock at depth {depth}")

recursive_function(3)
```

### Bounded Semaphore Usage

```python
import diskcache
import threading
import time

cache = diskcache.Cache('/tmp/semaphores')
# Allow maximum 3 concurrent connections
semaphore = diskcache.BoundedSemaphore(cache, 'db_connections', value=3)

def database_worker(worker_id):
    print(f"Worker {worker_id} requesting database connection...")

    with semaphore:  # Only 3 workers can be here simultaneously
        print(f"Worker {worker_id} connected to database")
        time.sleep(2)  # Simulate database work
        print(f"Worker {worker_id} disconnecting from database")

# Create 10 workers, but only 3 can access database at once
threads = []
for i in range(10):
    t = threading.Thread(target=database_worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
```

### Running Average Usage

```python
import diskcache
import random
import threading

cache = diskcache.Cache('/tmp/averages')
avg = diskcache.Averager(cache, 'response_time_avg')

def simulate_requests(worker_id):
    for i in range(10):
        # Simulate request with random response time
        response_time = random.uniform(0.1, 2.0)
        avg.add(response_time)
        print(f"Worker {worker_id} added {response_time:.3f}s")

# Multiple workers adding response times concurrently
threads = []
for i in range(5):
    t = threading.Thread(target=simulate_requests, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

# Get final average
final_avg = avg.get()
print(f"Average response time: {final_avg:.3f}s")

# Reset for next measurement period
final_avg = avg.pop()  # Gets average and clears data
print(f"Final average before reset: {final_avg:.3f}s")
print(f"After reset: {avg.get()}")  # Should be None
```

### Cross-Process Coordination

```python
import diskcache
import multiprocessing
import time

def worker_process(process_id, cache_dir):
    # Each process creates its own cache connection
    cache = diskcache.Cache(cache_dir)
    lock = diskcache.Lock(cache, 'shared_resource')
    semaphore = diskcache.BoundedSemaphore(cache, 'resource_pool', value=2)

    # Coordinate access across processes
    with semaphore:
        print(f"Process {process_id} acquired semaphore")

        with lock:
            print(f"Process {process_id} has exclusive access")
            time.sleep(1)
            print(f"Process {process_id} releasing exclusive access")

        print(f"Process {process_id} released semaphore")

if __name__ == '__main__':
    cache_dir = '/tmp/multiprocess_sync'

    # Create multiple processes
    processes = []
    for i in range(6):
        p = multiprocessing.Process(target=worker_process, args=(i, cache_dir))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
```

### Advanced Patterns

```python
import diskcache
import time
import threading

cache = diskcache.FanoutCache('/tmp/advanced_sync')

# Reader-writer lock pattern: read_lock guards the reader count,
# write_lock is held while any readers or a writer are active.
read_lock = diskcache.Lock(cache, 'read_lock')
write_lock = diskcache.Lock(cache, 'write_lock')

def reader(reader_id):
    with read_lock:
        count = cache.incr('reader_count', default=0)  # Atomic increment
        if count == 1:  # First reader
            write_lock.acquire()  # Block writers

    # Reading critical section - readers may overlap here
    print(f"Reader {reader_id} is reading...")
    time.sleep(1)

    with read_lock:
        count = cache.decr('reader_count')  # Atomic decrement
        if count == 0:  # Last reader
            write_lock.release()  # Allow writers

def writer(writer_id):
    with write_lock:
        print(f"Writer {writer_id} is writing...")
        time.sleep(2)

# Create readers and writers
threads = []
for i in range(3):
    threads.append(threading.Thread(target=reader, args=(i,)))
for i in range(2):
    threads.append(threading.Thread(target=writer, args=(i,)))

for t in threads:
    t.start()
for t in threads:
    t.join()
```

## Best Practices

### Lock Expiration

Always set reasonable expiration times to prevent deadlocks from crashed processes:

```python
# Lock expires after 60 seconds to prevent deadlocks
lock = diskcache.Lock(cache, 'critical_resource', expire=60)
```

### Error Handling

Properly handle lock acquisition failures and clean up:

442

443

```python

444

import diskcache

445

446

cache = diskcache.Cache('/tmp/safe_locks')

447

lock = diskcache.Lock(cache, 'safe_lock', expire=30)

448

449

try:

450

if lock.acquire():

451

try:

452

# Critical section

453

critical_work()

454

finally:

455

lock.release()

456

else:

457

print("Could not acquire lock")

458

except Exception as e:

459

print(f"Error in critical section: {e}")

460

# Lock will expire automatically due to expire parameter

461

```

### Semaphore Resource Management

Use semaphores to limit resource usage:

466

467

```python

468

# Limit concurrent file downloads

469

download_semaphore = diskcache.BoundedSemaphore(

470

cache, 'downloads', value=5, expire=300

471

)

472

473

def download_file(url):

474

with download_semaphore:

475

# Only 5 downloads can happen simultaneously

476

perform_download(url)

477

```