# Task Lifecycle and Hooks

This document covers task lifecycle management: pre- and post-execution hooks, consumer startup and shutdown hooks, signal handling, task pipeline chaining, and task revocation. Together these features support task orchestration and monitoring.

## Capabilities

### Execution Hooks

Register callbacks that run before and after task execution for monitoring, logging, and custom processing.

```python { .api }
def pre_execute(self, name=None):
    """
    Decorator to register pre-execution hook.

    Parameters:
    - name (str): Hook name (default: function name)

    Returns:
    Decorator function

    Hook function signature:
    def hook(task): ...

    Hooks can raise CancelExecution to prevent task execution.
    """

def post_execute(self, name=None):
    """
    Decorator to register post-execution hook.

    Parameters:
    - name (str): Hook name (default: function name)

    Returns:
    Decorator function

    Hook function signature:
    def hook(task, task_value, exception): ...

    Parameters:
    - task: Task instance that executed
    - task_value: Return value from task (None if exception occurred)
    - exception: Exception instance if task failed (None if successful)
    """

def unregister_pre_execute(self, name):
    """
    Remove a pre-execution hook.

    Parameters:
    - name (str or function): Hook name or function to remove

    Returns:
    bool: True if hook was removed
    """

def unregister_post_execute(self, name):
    """
    Remove a post-execution hook.

    Parameters:
    - name (str or function): Hook name or function to remove

    Returns:
    bool: True if hook was removed
    """
```
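
The naming rules above (a hook is stored under `name`, falling back to the function's own name, and can be removed by name or by function) can be illustrated with a small standalone model. This is only a sketch: `HookRegistry` and its methods are hypothetical names written for this example and do not reproduce Huey's internals.

```python
class HookRegistry:
    """Toy, name-keyed hook store (hypothetical; not Huey's implementation)."""

    def __init__(self):
        self._hooks = {}

    def register(self, name=None):
        # Decorator factory: store the function under `name`,
        # or under the function's own name when `name` is None.
        def decorator(fn):
            self._hooks[name or fn.__name__] = fn
            return fn
        return decorator

    def unregister(self, name_or_fn):
        # Accept either the registered name or the function itself.
        # In this sketch a function is looked up by its __name__.
        if isinstance(name_or_fn, str):
            key = name_or_fn
        else:
            key = name_or_fn.__name__
        return self._hooks.pop(key, None) is not None

    def run(self, *args):
        # Call every registered hook in registration order.
        for fn in list(self._hooks.values()):
            fn(*args)


registry = HookRegistry()
calls = []

@registry.register()
def audit(task):
    calls.append(("audit", task))

@registry.register("metrics")
def record_metrics(task):
    calls.append(("metrics", task))

registry.run("task-1")
removed_by_fn = registry.unregister(audit)        # removed via the function
removed_by_name = registry.unregister("metrics")  # removed via the name
removed_again = registry.unregister("metrics")    # nothing left to remove
```

The boolean return value makes it easy to tell whether a hook was actually registered before it was removed.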

### Startup and Shutdown Hooks

Register callbacks for consumer process lifecycle events.

```python { .api }
def on_startup(self, name=None):
    """
    Decorator to register startup hook.

    Parameters:
    - name (str): Hook name (default: function name)

    Returns:
    Decorator function

    Hook function signature:
    def hook(): ...
    """

def on_shutdown(self, name=None):
    """
    Decorator to register shutdown hook.

    Parameters:
    - name (str): Hook name (default: function name)

    Returns:
    Decorator function

    Hook function signature:
    def hook(): ...
    """

def unregister_on_startup(self, name):
    """
    Remove a startup hook.

    Parameters:
    - name (str or function): Hook name or function to remove

    Returns:
    bool: True if hook was removed
    """

def unregister_on_shutdown(self, name):
    """
    Remove a shutdown hook.

    Parameters:
    - name (str or function): Hook name or function to remove

    Returns:
    bool: True if hook was removed
    """
```
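
As a rough mental model of where these hooks sit in a worker's life, the sketch below runs startup hooks once before any work and shutdown hooks once afterwards, even if a job raises. All names here (`on_startup`, `on_shutdown`, `run_worker`) are hypothetical stand-ins defined in the example itself; the real consumer is considerably more involved.

```python
startup_hooks = []
shutdown_hooks = []
events = []

def on_startup(fn):
    # Hypothetical stand-in for a startup-hook decorator.
    startup_hooks.append(fn)
    return fn

def on_shutdown(fn):
    # Hypothetical stand-in for a shutdown-hook decorator.
    shutdown_hooks.append(fn)
    return fn

@on_startup
def open_connection():
    events.append("open")

@on_shutdown
def close_connection():
    events.append("close")

def run_worker(jobs):
    # Startup hooks take no arguments and run before any job.
    for hook in startup_hooks:
        hook()
    try:
        for job in jobs:
            events.append(job())
    finally:
        # Shutdown hooks run even when a job raises.
        for hook in shutdown_hooks:
            hook()

run_worker([lambda: "job-1", lambda: "job-2"])
```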

### Signal Handling

Register signal handlers for various task execution events.

```python { .api }
def signal(self, *signals):
    """
    Decorator to register signal handler.

    Parameters:
    - *signals: Signal names to handle

    Available signals:
    - SIGNAL_ENQUEUED: Task was added to queue
    - SIGNAL_EXECUTING: Task execution started
    - SIGNAL_COMPLETE: Task completed successfully
    - SIGNAL_ERROR: Task failed with exception
    - SIGNAL_RETRYING: Task is being retried
    - SIGNAL_REVOKED: Task was revoked
    - SIGNAL_CANCELED: Task was canceled
    - SIGNAL_SCHEDULED: Task was added to schedule
    - SIGNAL_LOCKED: Task could not acquire lock
    - SIGNAL_EXPIRED: Task expired before execution
    - SIGNAL_INTERRUPTED: Task was interrupted

    Returns:
    Decorator function

    Handler function signature:
    def handler(signal, task, *args, **kwargs): ...
    """

def disconnect_signal(self, receiver, *signals):
    """
    Disconnect a signal handler.

    Parameters:
    - receiver: Handler function to disconnect
    - *signals: Signal names to disconnect from

    Returns:
    None
    """
```
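
The handler signature above, `handler(signal, task, *args, **kwargs)`, can be illustrated with a minimal dispatcher. This is a sketch under stated assumptions: `SignalBus` is a hypothetical class, and the two signal constants are placeholder strings defined for the example rather than the library's own values.

```python
from collections import defaultdict

# Placeholder signal names for this sketch only.
SIGNAL_COMPLETE = "complete"
SIGNAL_ERROR = "error"

class SignalBus:
    """Toy signal dispatcher (hypothetical; not Huey's implementation)."""

    def __init__(self):
        self._receivers = defaultdict(list)

    def signal(self, *signals):
        # Decorator: connect the function to each named signal.
        def decorator(fn):
            for name in signals:
                self._receivers[name].append(fn)
            return fn
        return decorator

    def disconnect(self, receiver, *signals):
        # Remove the receiver from the named signals only.
        for name in signals:
            if receiver in self._receivers[name]:
                self._receivers[name].remove(receiver)

    def emit(self, signal, task, *args, **kwargs):
        # Every handler receives the signal name and the task first;
        # extra arguments (such as an exception) follow.
        for fn in list(self._receivers[signal]):
            fn(signal, task, *args, **kwargs)


bus = SignalBus()
seen = []

@bus.signal(SIGNAL_COMPLETE, SIGNAL_ERROR)
def on_finish(signal, task, *args, **kwargs):
    seen.append((signal, task, args))

bus.emit(SIGNAL_COMPLETE, "task-1")
bus.emit(SIGNAL_ERROR, "task-2", ValueError("boom"))
bus.disconnect(on_finish, SIGNAL_ERROR)
bus.emit(SIGNAL_ERROR, "task-3", ValueError("not delivered"))
```

Accepting `*args` and `**kwargs` in a handler keeps it usable for signals that pass extra data, such as the exception that accompanies an error signal.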

### Task Pipeline and Chaining

Create task chains and pipelines for complex workflows.

```python { .api }
def then(self, task, *args, **kwargs):
    """
    Chain another task to run after this one completes successfully.

    Parameters:
    - task (TaskWrapper or Task): Task to run next
    - *args: Arguments to pass to next task
    - **kwargs: Keyword arguments to pass to next task

    Returns:
    Task: Self for method chaining
    """

def error(self, task, *args, **kwargs):
    """
    Chain another task to run if this one fails.

    Parameters:
    - task (TaskWrapper or Task): Task to run on error
    - *args: Arguments to pass to error task
    - **kwargs: Keyword arguments to pass to error task

    Returns:
    Task: Self for method chaining
    """
```
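
Because both methods return `self`, chains can be built fluently. The standalone sketch below models that shape with a hypothetical `Step` class that runs inline, with no queue involved. It assumes that each step's return value (or, on failure, the exception) is handed to the following step as an extra positional argument; treat that as an assumption of the sketch rather than a statement of the library's contract.

```python
class Step:
    """Toy pipeline step (hypothetical; runs inline, no queue involved)."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args
        self.on_complete = None
        self.on_error = None

    def then(self, fn, *args):
        # Append to the end of the success chain and return self
        # so that calls can be chained.
        if self.on_complete is not None:
            self.on_complete.then(fn, *args)
        else:
            self.on_complete = Step(fn, *args)
        return self

    def error(self, fn, *args):
        # Attach an error handler to this step and return self.
        if self.on_error is not None:
            self.on_error.error(fn, *args)
        else:
            self.on_error = Step(fn, *args)
        return self

    def run(self, *extra):
        try:
            value = self.fn(*self.args, *extra)
        except Exception as exc:
            # Failure: hand the exception to the error handler, if any.
            return self.on_error.run(exc) if self.on_error else None
        # Success: hand the return value to the next step, if any.
        return self.on_complete.run(value) if self.on_complete else value


def fetch(url):
    if not url.startswith("http"):
        raise ValueError(f"bad url: {url}")
    return url.rsplit("/", 1)[-1]

def parse(filename):
    return f"parsed_{filename}"

def report(prefix, detail):
    return f"{prefix}: {detail}"

ok = Step(fetch, "http://example.com/data.csv").then(parse).then(report, "done")
ok_result = ok.run()

# The error handler here is attached to the first step only.
bad = Step(fetch, "ftp://example.com/data.csv").then(parse).error(report, "failed")
bad_result = bad.run()
```

Note that `report` accepts two parameters: the argument supplied when the chain was built and the value handed over by the preceding step.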

### Task Revocation and Control

Control task execution with revocation and restoration capabilities.

```python { .api }
def revoke_all(self, task_class, revoke_until=None, revoke_once=False):
    """
    Revoke all instances of a task type.

    Parameters:
    - task_class: Task class to revoke
    - revoke_until (datetime): Revoke until specific time (optional)
    - revoke_once (bool): Revoke only next execution (default: False)

    Returns:
    None
    """

def restore_all(self, task_class):
    """
    Restore all instances of a revoked task type.

    Parameters:
    - task_class: Task class to restore

    Returns:
    bool: True if tasks were revoked and restored
    """

def revoke_by_id(self, id, revoke_until=None, revoke_once=False):
    """
    Revoke specific task by ID.

    Parameters:
    - id (str): Task ID to revoke
    - revoke_until (datetime): Revoke until specific time (optional)
    - revoke_once (bool): Revoke only this execution (default: False)

    Returns:
    None
    """

def restore_by_id(self, id):
    """
    Restore specific task by ID.

    Parameters:
    - id (str): Task ID to restore

    Returns:
    bool: True if task was revoked and restored
    """

def is_revoked(self, task, timestamp=None, peek=True):
    """
    Check if task or task type is revoked.

    Parameters:
    - task: Task instance, task class, or task ID
    - timestamp (datetime): Check time (default: now)
    - peek (bool): Don't consume revocation data (default: True)

    Returns:
    bool: True if revoked
    """
```
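
The interplay of `revoke_once`, `revoke_until`, and `peek` is easier to see in a small model. The sketch below is a simplified illustration only: `RevocationTable` is a hypothetical class, and details such as when an expired entry is discarded are assumptions of this example, not a description of the library's storage behavior.

```python
from datetime import datetime, timedelta

class RevocationTable:
    """Toy revocation store (hypothetical; not Huey's storage format)."""

    def __init__(self):
        self._entries = {}  # key -> (revoke_until, revoke_once)

    def revoke(self, key, revoke_until=None, revoke_once=False):
        self._entries[key] = (revoke_until, revoke_once)

    def restore(self, key):
        # True only if there was a revocation to remove.
        return self._entries.pop(key, None) is not None

    def is_revoked(self, key, timestamp=None, peek=True):
        if key not in self._entries:
            return False
        revoke_until, revoke_once = self._entries[key]
        now = timestamp or datetime.now()
        if revoke_until is not None and now >= revoke_until:
            # The time-limited revocation has lapsed.
            if not peek:
                del self._entries[key]
            return False
        if revoke_once and not peek:
            # A one-shot revocation is consumed by a non-peek check.
            del self._entries[key]
        return True


table = RevocationTable()

table.revoke("job-1", revoke_once=True)
peeked = table.is_revoked("job-1")                 # peek leaves the entry in place
consumed = table.is_revoked("job-1", peek=False)   # consumes the one-shot entry
after = table.is_revoked("job-1")                  # no longer revoked

deadline = datetime(2030, 1, 1)
table.revoke("job-2", revoke_until=deadline)
before_deadline = table.is_revoked("job-2", timestamp=deadline - timedelta(days=1))
past_deadline = table.is_revoked("job-2", timestamp=deadline + timedelta(days=1))
restored = table.restore("job-2")
```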

## Usage Examples

### Pre and Post Execution Hooks

```python
from huey import RedisHuey
import logging
import time

huey = RedisHuey('lifecycle-app')

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('task-hooks')

@huey.pre_execute()
def log_task_start(task):
    logger.info(f"Starting task: {task.name} (ID: {task.id})")
    # Could add authentication, resource checks, etc.

@huey.post_execute()
def log_task_complete(task, task_value, exception):
    if exception:
        logger.error(f"Task {task.name} failed: {exception}")
    else:
        logger.info(f"Task {task.name} completed: {task_value}")

@huey.task()
def process_order(order_id):
    time.sleep(2)  # Simulate processing
    return f"Order {order_id} processed"

# Calling the task enqueues it; the hooks fire when the consumer executes it
result = process_order(12345)
```

### Startup and Shutdown Hooks

```python
import redis

@huey.on_startup()
def initialize_connections():
    logger.info("Consumer starting up - initializing connections")
    # Initialize database connections, cache, etc.
    global redis_client
    redis_client = redis.Redis(host='localhost', port=6379, db=0)

@huey.on_shutdown()
def cleanup_resources():
    logger.info("Consumer shutting down - cleaning up resources")
    # Close connections, save state, etc.
    if 'redis_client' in globals():
        redis_client.close()

@huey.task()
def cache_data(key, value):
    redis_client.set(key, value)
    return f"Cached {key}"
```

### Signal Handling

```python
from huey import signals as S

@huey.signal(S.SIGNAL_ENQUEUED)
def task_enqueued(signal, task):
    logger.info(f"Task enqueued: {task.name}")

@huey.signal(S.SIGNAL_ERROR)
def task_error(signal, task, exception):
    logger.error(f"Task {task.name} failed: {exception}")
    # Could send alerts, update metrics, etc.

@huey.signal(S.SIGNAL_RETRYING)
def task_retrying(signal, task):
    logger.warning(f"Retrying task: {task.name} ({task.retries} retries left)")

@huey.signal(S.SIGNAL_COMPLETE)
def task_complete(signal, task):
    logger.info(f"Task completed: {task.name}")
    # Could update progress tracking, send notifications, etc.
```

### Task Chaining and Pipelines

```python
@huey.task()
def download_file(url):
    # Download file logic
    return f"downloaded_{url.split('/')[-1]}"

@huey.task()
def process_file(filename):
    # Process file logic
    return f"processed_{filename}"

@huey.task()
def cleanup_file(filename):
    # Cleanup logic
    return f"cleaned_{filename}"

@huey.task()
def send_notification(message, detail=None):
    # Send notification. `detail` receives the value handed over by the
    # preceding task in the pipeline (its return value, or the exception
    # when this task is used as an error handler).
    return f"notified: {message}"

# Create task pipeline; each task's return value is passed to the next task
task = download_file.s("http://example.com/data.csv")
task = task.then(process_file)
task = task.then(cleanup_file)
task = task.then(send_notification, "Processing complete")
task = task.error(send_notification, "Processing failed")

# Enqueue the pipeline
result = huey.enqueue(task)
```

### Task Revocation and Control

```python
@huey.task()
def long_running_task(data):
    # Simulate long-running task
    time.sleep(60)
    return f"Processed {data}"

# Start some tasks
results = []
for i in range(5):
    result = long_running_task(f"data_{i}")
    results.append(result)

# Revoke specific task
results[0].revoke()

# Revoke the task type; with revoke_once=True only the next execution is revoked
huey.revoke_all(long_running_task.task_class, revoke_once=True)

# Check if task is revoked
if results[1].is_revoked():
    print("Task was revoked")
    results[1].restore()  # Restore if needed

# Revoke task by ID
task_id = results[2].id
huey.revoke_by_id(task_id, revoke_once=True)
```

### Advanced Hook Patterns

```python
from huey.exceptions import CancelExecution

# Note: get_user() and is_maintenance_mode() are assumed to be helpers
# defined elsewhere in the application; they are not part of Huey.

# Context-aware hooks
current_user = None

@huey.pre_execute()
def set_task_context(task):
    global current_user
    # Extract user context from task data
    if hasattr(task, 'kwargs') and 'user_id' in task.kwargs:
        current_user = get_user(task.kwargs['user_id'])

@huey.post_execute()
def clear_task_context(task, task_value, exception):
    global current_user
    current_user = None

# Performance monitoring hook
task_times = {}

@huey.pre_execute('performance_monitor')
def start_timer(task):
    task_times[task.id] = time.time()

@huey.post_execute('performance_monitor')
def end_timer(task, task_value, exception):
    if task.id in task_times:
        duration = time.time() - task_times[task.id]
        logger.info(f"Task {task.name} took {duration:.2f} seconds")
        del task_times[task.id]

# Conditional execution hook
@huey.pre_execute()
def check_maintenance_mode(task):
    if is_maintenance_mode() and not task.name.startswith('maintenance_'):
        raise CancelExecution("System in maintenance mode")
```