or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-task-queue.mdexception-handling.mdindex.mdlocking-concurrency.mdresult-management.mdscheduling.mdstorage-backends.mdtask-lifecycle.md

core-task-queue.mddocs/

0

# Core Task Queue Operations

1

2

The primary task queue functionality that forms the foundation of Huey's asynchronous task execution system. These operations handle task creation, enqueueing, result handling, and basic queue management.

3

4

## Capabilities

5

6

### Task Creation and Decoration

7

8

Convert regular Python functions into asynchronous tasks that can be queued and executed by worker processes.

9

10

```python { .api }

11

def task(self, retries=0, retry_delay=0, priority=None, context=False,

12

name=None, expires=None, **kwargs):

13

"""

14

Decorator to convert a function into a task.

15

16

Parameters:

17

- retries (int): Number of times to retry failed tasks (default: 0)

18

- retry_delay (int): Delay in seconds between retries (default: 0)

19

- priority (int): Task priority, higher values processed first (optional)

20

- context (bool): Pass task instance as 'task' keyword argument (default: False)

21

- name (str): Custom task name (default: function name)

22

- expires (datetime/int): Task expiration time or seconds from now (optional)

23

- **kwargs: Additional task configuration options

24

25

Returns:

26

TaskWrapper instance that can be called to enqueue tasks

27

"""

28

29

def context_task(self, obj, as_argument=False, **kwargs):

30

"""

31

Decorator for tasks that require context management.

32

33

Parameters:

34

- obj: Context manager object to use

35

- as_argument (bool): Pass context as first argument (default: False)

36

- **kwargs: Additional task configuration options

37

38

Returns:

39

Task decorator function

40

"""

41

```

42

43

### Task Enqueueing and Execution

44

45

Add tasks to the queue and manage their execution lifecycle.

46

47

```python { .api }

48

def enqueue(self, task):

49

"""

50

Add a task to the execution queue.

51

52

Parameters:

53

- task (Task): Task instance to enqueue

54

55

Returns:

56

Result instance if results are enabled, None otherwise

57

"""

58

59

def dequeue(self):

60

"""

61

Remove and return the next task from the queue.

62

63

Returns:

64

Task instance or None if queue is empty

65

"""

66

67

def execute(self, task, timestamp=None):

68

"""

69

Execute a task immediately.

70

71

Parameters:

72

- task (Task): Task to execute

73

- timestamp (datetime): Execution timestamp (default: current time)

74

75

Returns:

76

Task result value

77

"""

78

```

79

80

### Queue Status and Management

81

82

Monitor queue status and perform maintenance operations.

83

84

```python { .api }

85

def pending_count(self):

86

"""

87

Get the number of pending tasks in the queue.

88

89

Returns:

90

int: Number of tasks waiting to be processed

91

"""

92

93

def pending(self, limit=None):

94

"""

95

Get list of pending tasks.

96

97

Parameters:

98

- limit (int): Maximum number of tasks to return (optional)

99

100

Returns:

101

List of Task instances

102

"""

103

104

def flush(self):

105

"""

106

Remove all tasks and data from all storage areas.

107

108

Returns:

109

None

110

"""

111

112

def __len__(self):

113

"""

114

Get queue length (same as pending_count).

115

116

Returns:

117

int: Number of pending tasks

118

"""

119

```

120

121

### Data Storage Operations

122

123

Low-level key-value storage operations for custom data persistence.

124

125

```python { .api }

126

def put(self, key, data):

127

"""

128

Store arbitrary data with a key.

129

130

Parameters:

131

- key (str): Storage key

132

- data: Data to store (will be serialized)

133

134

Returns:

135

bool: Success status

136

"""

137

138

def get(self, key, peek=False):

139

"""

140

Retrieve data by key.

141

142

Parameters:

143

- key (str): Storage key

144

- peek (bool): Don't remove data if True (default: False)

145

146

Returns:

147

Stored data or None if not found

148

"""

149

150

def delete(self, key):

151

"""

152

Delete data by key.

153

154

Parameters:

155

- key (str): Storage key to delete

156

157

Returns:

158

bool: True if key existed and was deleted

159

"""

160

161

def put_if_empty(self, key, data):

162

"""

163

Store data only if key doesn't exist.

164

165

Parameters:

166

- key (str): Storage key

167

- data: Data to store

168

169

Returns:

170

bool: True if data was stored, False if key already existed

171

"""

172

```

173

174

### Task Serialization

175

176

Low-level task serialization methods for custom storage implementations and debugging.

177

178

```python { .api }

179

def serialize_task(self, task):

180

"""

181

Serialize a task to bytes for storage.

182

183

Parameters:

184

- task (Task): Task instance to serialize

185

186

Returns:

187

bytes: Serialized task data

188

"""

189

190

def deserialize_task(self, data):

191

"""

192

Deserialize bytes to a task instance.

193

194

Parameters:

195

- data (bytes): Serialized task data

196

197

Returns:

198

Task: Deserialized task instance

199

"""

200

```

201

202

### Raw Storage Operations

203

204

Direct storage access methods for advanced use cases and custom implementations.

205

206

```python { .api }

207

def get_raw(self, key, peek=False):

208

"""

209

Get raw data from storage without deserialization.

210

211

Parameters:

212

- key (str): Storage key

213

- peek (bool): Don't remove data if True (default: False)

214

215

Returns:

216

bytes or None: Raw stored data

217

"""

218

219

def put_result(self, key, data):

220

"""

221

Store result data directly in result storage.

222

223

Parameters:

224

- key (str): Result key

225

- data: Result data to store

226

227

Returns:

228

bool: Success status

229

"""

230

```

231

232

### Consumer Management

233

234

Create and configure consumer processes for task execution.

235

236

```python { .api }

237

def create_consumer(self, **options):

238

"""

239

Create a consumer instance for processing tasks.

240

241

Parameters:

242

- **options: Consumer configuration options

243

244

Returns:

245

Consumer instance

246

"""

247

```

248

249

## Usage Examples

250

251

### Basic Task Definition and Execution

252

253

```python

254

from huey import RedisHuey

255

256

huey = RedisHuey('my-app')

257

258

@huey.task()

259

def send_email(to, subject, body):

260

# Send email logic here

261

return f"Email sent to {to}"

262

263

# Enqueue task

264

result = send_email('user@example.com', 'Hello', 'Message body')

265

266

# Get result (blocks until ready)

267

message = result()

268

print(message) # "Email sent to user@example.com"

269

```

270

271

### Task with Retry Configuration

272

273

```python

274

@huey.task(retries=3, retry_delay=60)

275

def process_payment(amount, card_token):

276

# Payment processing that might fail

277

if random.random() < 0.3: # 30% failure rate

278

raise Exception("Payment gateway error")

279

return f"Processed ${amount}"

280

281

# This task will retry up to 3 times with 60-second delays

282

result = process_payment(100.00, 'tok_123')

283

```

284

285

### Context Task with Resource Management

286

287

```python

288

import sqlite3

289

290

@huey.context_task(sqlite3.connect('app.db'), as_argument=True)

291

def update_user_stats(db_conn, user_id, stats):

292

cursor = db_conn.cursor()

293

cursor.execute("UPDATE users SET stats=? WHERE id=?", (stats, user_id))

294

db_conn.commit()

295

return f"Updated user {user_id}"

296

297

# Database connection is automatically managed

298

result = update_user_stats(123, {'views': 50, 'clicks': 5})

299

```