or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-task-queue.md exception-handling.md index.md locking-concurrency.md result-management.md scheduling.md storage-backends.md task-lifecycle.md

docs/locking-concurrency.md

0

# Task Locking and Concurrency

1

2

Huey provides task locking, concurrency control, and synchronization features that prevent duplicate task execution and coordinate access to shared resources. These features ensure proper coordination in multi-worker environments.

3

4

## Capabilities

5

6

### Task Locking

7

8

Prevent multiple workers from executing the same critical section simultaneously.

9

10

```python { .api }

11

def lock_task(self, lock_name):

12

"""

13

Create a task lock for coordinating access to shared resources.

14

15

Parameters:

16

- lock_name (str): Name of the lock

17

18

Returns:

19

TaskLock: Lock instance that can be used as context manager or decorator

20

"""

21

22

def is_locked(self, lock_name):

23

"""

24

Check if a named lock is currently held.

25

26

Parameters:

27

- lock_name (str): Name of the lock to check

28

29

Returns:

30

bool: True if lock is currently held

31

"""

32

33

def flush_locks(self, *names):

34

"""

35

Remove specified locks or all locks if no names given.

36

37

Parameters:

38

- *names: Lock names to remove (optional, removes all if empty)

39

40

Returns:

41

set: Names of locks that were removed

42

"""

43

```

44

45

### TaskLock Class

46

47

Context manager and decorator for task synchronization.

48

49

```python { .api }

50

class TaskLock:

51

def __init__(self, huey, name):

52

"""

53

Initialize task lock.

54

55

Parameters:

56

- huey: Huey instance

57

- name (str): Lock name

58

"""

59

60

def is_locked(self):

61

"""

62

Check if this lock is currently held.

63

64

Returns:

65

bool: True if lock is held

66

"""

67

68

def clear(self):

69

"""

70

Force release this lock.

71

72

Returns:

73

bool: True if lock was held and released

74

"""

75

76

def __call__(self, fn):

77

"""

78

Use lock as a decorator.

79

80

Parameters:

81

- fn: Function to wrap with lock

82

83

Returns:

84

Wrapped function that acquires lock before execution

85

"""

86

87

def __enter__(self):

88

"""

89

Acquire lock (context manager entry).

90

91

Raises:

92

TaskLockedException: If lock cannot be acquired

93

"""

94

95

def __exit__(self, exc_type, exc_val, exc_tb):

96

"""

97

Release lock (context manager exit).

98

"""

99

```

100

101

### Lock Exception Handling

102

103

Exception raised when lock cannot be acquired.

104

105

```python { .api }

106

class TaskLockedException(HueyException):

107

"""

108

Exception raised when a task cannot acquire a required lock.

109

110

This exception is raised when:

111

- A task decorated with @lock cannot acquire the lock

112

- A context manager lock cannot be acquired

113

- A task tries to acquire a lock that's already held

114

"""

115

```

116

117

## Usage Examples

118

119

### Basic Task Locking

120

121

```python

122

from huey import RedisHuey

123

from huey.exceptions import TaskLockedException

124

125

huey = RedisHuey('locking-app')

126

127

@huey.task()

128

def update_user_count():

129

# Use lock as context manager

130

with huey.lock_task('user_count_update'):

131

# Only one worker can execute this block at a time

132

current_count = get_user_count()

133

new_count = recalculate_user_count()

134

update_user_count_in_db(new_count)

135

return new_count

136

137

# Multiple workers can enqueue this task, but only one can hold the lock at a time

138

result1 = update_user_count()

139

result2 = update_user_count() # Does not wait: fails with TaskLockedException if the first call still holds the lock

140

```

141

142

### Lock as Decorator

143

144

```python

145

# Create a reusable lock

146

user_stats_lock = huey.lock_task('user_stats')

147

148

@huey.task()

149

@user_stats_lock

150

def update_user_stats(user_id):

151

# This entire function is protected by the lock

152

stats = calculate_user_stats(user_id)

153

save_user_stats(user_id, stats)

154

return stats

155

156

# Alternative: inline decorator

157

@huey.task()

158

@huey.lock_task('report_generation')

159

def generate_daily_report():

160

# Generate report logic

161

return "Report generated"

162

```

163

164

### Fine-grained Locking

165

166

```python

167

@huey.task()

168

def process_user_data(user_id):

169

# Use user-specific locks

170

lock_name = f'user_{user_id}_processing'

171

172

try:

173

with huey.lock_task(lock_name):

174

# Process user data

175

data = load_user_data(user_id)

176

processed = process_data(data)

177

save_processed_data(user_id, processed)

178

return f"Processed user {user_id}"

179

except TaskLockedException:

180

# Another worker is already processing this user

181

return f"User {user_id} already being processed"

182

183

# Each user can be processed independently

184

results = []

185

for user_id in [1, 2, 3, 1, 2]: # Note: duplicates

186

result = process_user_data(user_id)

187

results.append(result)

188

```

189

190

### Lock Status Monitoring

191

192

```python

193

@huey.task()

194

def monitor_locks():

195

# Check specific locks

196

critical_locks = ['database_backup', 'user_count_update', 'report_generation']

197

198

lock_status = {}

199

for lock_name in critical_locks:

200

is_locked = huey.is_locked(lock_name)

201

lock_status[lock_name] = is_locked

202

203

return lock_status

204

205

@huey.task()

206

def emergency_unlock():

207

# Force release all locks (use with caution!)

208

released = huey.flush_locks()

209

return f"Released locks: {released}"

210

211

@huey.task()

212

def unlock_specific_locks():

213

# Release specific locks

214

released = huey.flush_locks('stale_lock_1', 'stale_lock_2')

215

return f"Released locks: {released}"

216

```

217

218

### Conditional Locking

219

220

```python

221

@huey.task()

222

def conditional_processing(resource_id):

223

lock_name = f'resource_{resource_id}'

224

225

# Check if already locked before attempting

226

if huey.is_locked(lock_name):

227

return f"Resource {resource_id} is busy, skipping"

228

229

try:

230

with huey.lock_task(lock_name):

231

# Process resource

232

result = process_resource(resource_id)

233

return result

234

except TaskLockedException:

235

# Another worker acquired the lock between the is_locked() check and our acquisition attempt

236

return f"Resource {resource_id} became busy"

237

```

238

239

### Lock with Timeout Pattern

240

241

```python

242

import time

243

from contextlib import contextmanager

244

245

@contextmanager

246

def timed_lock(huey_instance, lock_name, timeout=30, check_interval=0.5):

247

"""Custom lock with timeout capability."""

248

start_time = time.time()

249

250

while time.time() - start_time < timeout:

251

try:

252

with huey_instance.lock_task(lock_name):

253

yield

254

return

255

except TaskLockedException:

256

time.sleep(check_interval)

257

258

raise TimeoutError(f"Could not acquire lock '{lock_name}' within {timeout} seconds")

259

260

@huey.task()

261

def process_with_timeout(data):

262

try:

263

with timed_lock(huey, 'critical_resource', timeout=60):

264

# Process data; the timeout bounds only how long we wait to acquire the lock, not the processing itself

265

result = expensive_processing(data)

266

return result

267

except TimeoutError as e:

268

return f"Processing failed: {e}"

269

```

270

271

### Database Connection Pooling with Locks

272

273

```python

274

import sqlite3

275

import threading

276

277

# Shared resource that needs protection

278

db_connections = {}

279

connection_lock = threading.Lock()

280

281

@huey.task()

282

def database_task(query, db_name='default'):

283

# Use lock to coordinate database access

284

lock_name = f'db_access_{db_name}'

285

286

with huey.lock_task(lock_name):

287

# Get or create database connection

288

if db_name not in db_connections:

289

db_connections[db_name] = sqlite3.connect(f'{db_name}.db')

290

291

conn = db_connections[db_name]

292

cursor = conn.cursor()

293

cursor.execute(query)

294

result = cursor.fetchall()

295

conn.commit()

296

297

return f"Query executed: {len(result)} rows"

298

```

299

300

### Distributed Lock Patterns

301

302

```python

303

@huey.task()

304

def singleton_task():

305

"""Ensure only one instance of this task runs across all workers."""

306

lock_name = 'singleton_task_global'

307

308

try:

309

with huey.lock_task(lock_name):

310

# This code runs on at most one worker at a time across all workers

311

perform_singleton_operation()

312

return "Singleton task completed"

313

except TaskLockedException:

314

return "Singleton task already running"

315

316

@huey.task()

317

def batch_processor(batch_id):

318

"""Process batches with coordination between workers."""

319

# Lock the entire batch

320

batch_lock = f'batch_{batch_id}'

321

322

try:

323

with huey.lock_task(batch_lock):

324

items = get_batch_items(batch_id)

325

326

# Process items with item-level locks for fine-grained control

327

results = []

328

for item_id in items:

329

item_lock = f'item_{item_id}'

330

try:

331

with huey.lock_task(item_lock):

332

result = process_item(item_id)

333

results.append(result)

334

except TaskLockedException:

335

results.append(f"Item {item_id} locked")

336

337

return f"Batch {batch_id}: {len(results)} items processed"

338

except TaskLockedException:

339

return f"Batch {batch_id} already being processed"

340

```

341

342

### Lock Cleanup and Maintenance

343

344

```python

345

@huey.periodic_task(crontab(minute='*/10')) # Every 10 minutes (requires: from huey import crontab)

346

def cleanup_stale_locks():

347

"""Periodic cleanup of potentially stale locks."""

348

# In production, you might want to track lock timestamps

349

# and clean up locks that are older than expected task duration

350

351

# For now, just report on current locks

352

# (Manual cleanup would require custom lock tracking)

353

354

# Check critical locks

355

critical_locks = ['database_backup', 'report_generation']

356

stale_locks = []

357

358

for lock_name in critical_locks:

359

if huey.is_locked(lock_name):

360

# In real implementation, check if lock is truly stale

361

# based on timestamps or other criteria

362

stale_locks.append(lock_name)

363

364

if stale_locks:

365

return f"Warning: Long-running locks detected: {stale_locks}"

366

else:

367

return "All locks appear healthy"

368

369

@huey.task()

370

def force_unlock_emergency(lock_names):

371

"""Emergency lock release (use with extreme caution)."""

372

if not isinstance(lock_names, list):

373

lock_names = [lock_names]

374

375

released = huey.flush_locks(*lock_names)

376

return f"Emergency unlock completed. Released: {released}"

377

```