or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.mdcluster-support.mdconnection-management.mdcore-client.mddistributed-locking.mderror-handling.mdhigh-availability.mdindex.mdpipelines-transactions.mdpubsub-messaging.md

distributed-locking.mddocs/

0

# Distributed Locking

1

2

Redis distributed locks provide mutual exclusion across multiple processes or servers using Redis as a coordination service. The Lock implementation uses SET with NX and EX options for atomic lock acquisition with automatic expiration.

3

4

## Capabilities

5

6

### Lock Operations

7

8

Redis Lock class for creating and managing distributed locks with automatic expiration.

9

10

```python { .api }

11

def lock(

12

self,

13

name: str,

14

timeout: Optional[float] = None,

15

sleep: float = 0.1,

16

blocking: bool = True,

17

blocking_timeout: Optional[float] = None,

18

thread_local: bool = True

19

) -> "Lock": ...

20

21

class Lock:

22

def __init__(

23

self,

24

redis: Redis,

25

name: str,

26

timeout: Optional[float] = None,

27

sleep: float = 0.1,

28

blocking: bool = True,

29

blocking_timeout: Optional[float] = None,

30

thread_local: bool = True

31

): ...

32

33

def acquire(

34

self,

35

blocking: Optional[bool] = None,

36

blocking_timeout: Optional[float] = None,

37

token: Optional[bytes] = None

38

) -> bool: ...

39

40

def release(self) -> None: ...

41

42

def extend(self, additional_time: float, replace_ttl: bool = False) -> bool: ...

43

44

def locked(self) -> bool: ...

45

46

def owned(self) -> bool: ...

47

48

def reacquire(self) -> bool: ...

49

50

def __enter__(self) -> "Lock": ...

51

52

def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

53

54

@property

55

def token(self) -> Optional[bytes]: ...

56

```

57

58

## Usage Examples

59

60

### Basic Lock Usage

61

62

```python

63

import redis

64

import time

65

import threading

66

67

r = redis.Redis(host='localhost', port=6379, db=0)

68

69

def worker(worker_id):

70

"""Worker function that acquires a lock before processing"""

71

lock = r.lock("shared_resource", timeout=10)

72

73

try:

74

# Try to acquire the lock

75

if lock.acquire(blocking=False):

76

print(f"Worker {worker_id}: Acquired lock")

77

78

# Simulate work with shared resource

79

print(f"Worker {worker_id}: Processing shared resource...")

80

time.sleep(2)

81

82

print(f"Worker {worker_id}: Work completed")

83

else:

84

print(f"Worker {worker_id}: Could not acquire lock")

85

finally:

86

# Always try to release the lock

87

if lock.owned():

88

lock.release()

89

print(f"Worker {worker_id}: Released lock")

90

91

# Run multiple workers

92

threads = []

93

for i in range(3):

94

t = threading.Thread(target=worker, args=(i,))

95

threads.append(t)

96

t.start()

97

98

for t in threads:

99

t.join()

100

```

101

102

### Lock with Context Manager

103

104

```python

105

import redis

106

import time

107

108

r = redis.Redis(host='localhost', port=6379, db=0)

109

110

def process_with_lock():

111

"""Use lock as context manager for automatic cleanup"""

112

with r.lock("critical_section", timeout=5) as lock:

113

print("Entered critical section")

114

115

# Simulate critical work

116

time.sleep(2)

117

118

print("Exiting critical section")

119

# Lock is automatically released here

120

121

# Call the function

122

process_with_lock()

123

```

124

125

### Blocking Lock with Timeout

126

127

```python

128

import redis

129

import time

130

import threading

131

132

r = redis.Redis(host='localhost', port=6379, db=0)

133

134

def blocking_worker(worker_id):

135

"""Worker that waits for lock with timeout"""

136

lock = r.lock("shared_counter", timeout=10)

137

138

try:

139

# Block for up to 5 seconds waiting for the lock

140

acquired = lock.acquire(blocking=True, blocking_timeout=5)

141

142

if acquired:

143

print(f"Worker {worker_id}: Got the lock after waiting")

144

145

# Increment counter

146

current = r.get("counter") or b"0"

147

new_value = int(current) + 1

148

r.set("counter", new_value)

149

150

print(f"Worker {worker_id}: Counter = {new_value}")

151

time.sleep(1) # Hold lock briefly

152

153

else:

154

print(f"Worker {worker_id}: Timeout waiting for lock")

155

156

finally:

157

if lock.owned():

158

lock.release()

159

160

# Initialize counter

161

r.set("counter", 0)

162

163

# Start multiple workers

164

threads = []

165

for i in range(5):

166

t = threading.Thread(target=blocking_worker, args=(i,))

167

threads.append(t)

168

t.start()

169

170

for t in threads:

171

t.join()

172

173

final_count = r.get("counter")

174

print(f"Final counter value: {final_count}")

175

```

176

177

### Lock Extension

178

179

```python

180

import redis

181

import time

182

import threading

183

184

r = redis.Redis(host='localhost', port=6379, db=0)

185

186

def long_running_task():

187

"""Task that may need to extend lock duration"""

188

lock = r.lock("long_task", timeout=5) # Initial 5 second timeout

189

190

try:

191

if lock.acquire():

192

print("Started long running task")

193

194

for i in range(10):

195

# Simulate work

196

time.sleep(1)

197

print(f"Working... step {i+1}/10")

198

199

# Extend lock if we're halfway through and need more time

200

if i == 4: # After 5 seconds of work

201

extended = lock.extend(additional_time=5)

202

if extended:

203

print("Extended lock by 5 seconds")

204

else:

205

print("Failed to extend lock - it may have expired")

206

break

207

208

print("Task completed")

209

finally:

210

if lock.owned():

211

lock.release()

212

213

long_running_task()

214

```

215

216

### Lock Reacquisition

217

218

```python

219

import redis

220

import time

221

222

r = redis.Redis(host='localhost', port=6379, db=0)

223

224

def task_with_reacquisition():

225

"""Task that releases and reacquires lock"""

226

lock = r.lock("reacquire_demo", timeout=10)

227

228

try:

229

# Initial acquisition

230

if lock.acquire():

231

print("Phase 1: Acquired lock")

232

time.sleep(2)

233

234

# Release lock temporarily

235

lock.release()

236

print("Released lock for other processes")

237

238

# Do some work that doesn't need the lock

239

time.sleep(1)

240

241

# Reacquire the same lock

242

if lock.reacquire():

243

print("Phase 2: Reacquired lock")

244

time.sleep(2)

245

print("Phase 2 complete")

246

else:

247

print("Failed to reacquire lock")

248

249

finally:

250

if lock.owned():

251

lock.release()

252

253

task_with_reacquisition()

254

```

255

256

### Lock Status Checking

257

258

```python

259

import redis

260

import time

261

import threading

262

263

r = redis.Redis(host='localhost', port=6379, db=0)

264

265

def monitor_lock():

266

"""Monitor lock status"""

267

lock = r.lock("monitored_lock", timeout=8)

268

269

def lock_holder():

270

"""Function that holds the lock"""

271

with lock:

272

print("Lock holder: Acquired lock")

273

time.sleep(5)

274

print("Lock holder: Releasing lock")

275

276

def lock_monitor():

277

"""Function that monitors the lock"""

278

time.sleep(0.5) # Let lock holder start first

279

280

for i in range(10):

281

is_locked = lock.locked()

282

is_owned = lock.owned()

283

284

print(f"Monitor: Locked={is_locked}, Owned by us={is_owned}")

285

time.sleep(1)

286

287

# Start both threads

288

holder_thread = threading.Thread(target=lock_holder)

289

monitor_thread = threading.Thread(target=lock_monitor)

290

291

holder_thread.start()

292

monitor_thread.start()

293

294

holder_thread.join()

295

monitor_thread.join()

296

297

monitor_lock()

298

```

299

300

### Distributed Counter with Lock

301

302

```python

303

import redis

304

import time

305

import threading

306

import random

307

308

class DistributedCounter:

309

def __init__(self, redis_client, counter_name):

310

self.r = redis_client

311

self.counter_name = counter_name

312

self.lock_name = f"{counter_name}:lock"

313

314

def increment(self, amount=1):

315

"""Thread-safe increment operation"""

316

with self.r.lock(self.lock_name, timeout=5) as lock:

317

# Get current value

318

current = self.r.get(self.counter_name)

319

current_value = int(current) if current else 0

320

321

# Simulate some processing time

322

time.sleep(random.uniform(0.01, 0.1))

323

324

# Set new value

325

new_value = current_value + amount

326

self.r.set(self.counter_name, new_value)

327

328

return new_value

329

330

def get_value(self):

331

"""Get current counter value"""

332

value = self.r.get(self.counter_name)

333

return int(value) if value else 0

334

335

# Usage example

336

r = redis.Redis(host='localhost', port=6379, db=0)

337

counter = DistributedCounter(r, "global_counter")

338

339

# Initialize counter

340

r.set("global_counter", 0)

341

342

def worker(worker_id, counter, iterations):

343

"""Worker that increments counter multiple times"""

344

for i in range(iterations):

345

value = counter.increment()

346

print(f"Worker {worker_id}: Incremented to {value}")

347

time.sleep(random.uniform(0.1, 0.3))

348

349

# Run multiple workers concurrently

350

threads = []

351

for i in range(5):

352

t = threading.Thread(target=worker, args=(i, counter, 3))

353

threads.append(t)

354

t.start()

355

356

for t in threads:

357

t.join()

358

359

print(f"Final counter value: {counter.get_value()}")

360

```

361

362

### Lock with Custom Token

363

364

```python

365

import redis

366

import uuid

367

import time

368

369

r = redis.Redis(host='localhost', port=6379, db=0)

370

371

def custom_token_lock():

372

"""Use custom token for lock identification"""

373

# Generate custom token

374

custom_token = str(uuid.uuid4()).encode()

375

376

lock = r.lock("custom_token_lock", timeout=10)

377

378

try:

379

# Acquire with custom token

380

acquired = lock.acquire(token=custom_token)

381

382

if acquired:

383

print(f"Acquired lock with token: {custom_token}")

384

print(f"Lock token: {lock.token}")

385

386

# Verify we own the lock

387

print(f"Lock owned: {lock.owned()}")

388

389

time.sleep(2)

390

391

finally:

392

if lock.owned():

393

lock.release()

394

print("Lock released")

395

396

custom_token_lock()

397

```

398

399

### Handling Lock Expiration

400

401

```python

402

import redis

403

import time

404

import threading

405

406

r = redis.Redis(host='localhost', port=6379, db=0)

407

408

def task_with_expiration_handling():

409

"""Handle case where lock expires during processing"""

410

lock = r.lock("expiring_lock", timeout=3) # Short timeout for demo

411

412

try:

413

if lock.acquire():

414

print("Acquired lock")

415

416

# Simulate long-running task

417

for i in range(8):

418

time.sleep(1)

419

420

# Check if we still own the lock

421

if not lock.owned():

422

print(f"Lock expired during processing at step {i+1}")

423

break

424

425

print(f"Processing step {i+1}")

426

else:

427

print("Task completed successfully")

428

429

except Exception as e:

430

print(f"Error during processing: {e}")

431

finally:

432

# Only release if we still own it

433

if lock.owned():

434

lock.release()

435

print("Released lock")

436

else:

437

print("Lock was already expired/released")

438

439

task_with_expiration_handling()

440

```

441

442

### Multi-Resource Locking

443

444

```python

445

import redis

446

import time

447

448

r = redis.Redis(host='localhost', port=6379, db=0)

449

450

class MultiLock:

451

def __init__(self, redis_client, lock_names, timeout=10):

452

self.r = redis_client

453

self.locks = [r.lock(name, timeout=timeout) for name in lock_names]

454

self.acquired_locks = []

455

456

def acquire_all(self, blocking=True, blocking_timeout=None):

457

"""Acquire all locks or none"""

458

try:

459

for lock in self.locks:

460

if lock.acquire(blocking=blocking, blocking_timeout=blocking_timeout):

461

self.acquired_locks.append(lock)

462

else:

463

# Failed to acquire a lock, release all acquired ones

464

self.release_all()

465

return False

466

return True

467

except Exception:

468

self.release_all()

469

raise

470

471

def release_all(self):

472

"""Release all acquired locks"""

473

for lock in self.acquired_locks:

474

if lock.owned():

475

lock.release()

476

self.acquired_locks.clear()

477

478

def __enter__(self):

479

if self.acquire_all():

480

return self

481

else:

482

raise RuntimeError("Could not acquire all locks")

483

484

def __exit__(self, exc_type, exc_val, exc_tb):

485

self.release_all()

486

487

def transfer_between_accounts(from_account, to_account, amount):

488

"""Transfer money between accounts with multi-lock"""

489

# Lock both accounts to prevent race conditions

490

lock_names = [f"account:{from_account}:lock", f"account:{to_account}:lock"]

491

492

with MultiLock(r, lock_names) as multi_lock:

493

print(f"Acquired locks for accounts {from_account} and {to_account}")

494

495

# Get balances

496

from_balance = float(r.get(f"account:{from_account}:balance") or 0)

497

to_balance = float(r.get(f"account:{to_account}:balance") or 0)

498

499

# Check sufficient funds

500

if from_balance < amount:

501

raise ValueError("Insufficient funds")

502

503

# Perform transfer

504

time.sleep(0.1) # Simulate processing time

505

506

r.set(f"account:{from_account}:balance", from_balance - amount)

507

r.set(f"account:{to_account}:balance", to_balance + amount)

508

509

print(f"Transferred ${amount} from account {from_account} to {to_account}")

510

511

# Initialize accounts

512

r.set("account:1001:balance", 1000)

513

r.set("account:1002:balance", 500)

514

515

# Perform transfer

516

try:

517

transfer_between_accounts("1001", "1002", 250)

518

519

# Check final balances

520

balance_1001 = r.get("account:1001:balance")

521

balance_1002 = r.get("account:1002:balance")

522

print(f"Final balances: Account 1001: ${balance_1001}, Account 1002: ${balance_1002}")

523

524

except Exception as e:

525

print(f"Transfer failed: {e}")

526

```