# Core Task Queue Operations

The core task queue operations form the foundation of Huey's asynchronous task execution system. They cover task creation, enqueueing, result handling, and basic queue management.

## Capabilities

### Task Creation and Decoration

Convert regular Python functions into asynchronous tasks that can be queued and executed by worker processes.

```python { .api }
def task(self, retries=0, retry_delay=0, priority=None, context=False,
         name=None, expires=None, **kwargs):
    """
    Decorator to convert a function into a task.

    Parameters:
    - retries (int): Number of times to retry failed tasks (default: 0)
    - retry_delay (int): Delay in seconds between retries (default: 0)
    - priority (int): Task priority, higher values processed first (optional)
    - context (bool): Pass task instance as 'task' keyword argument (default: False)
    - name (str): Custom task name (default: function name)
    - expires (datetime/int): Task expiration time or seconds from now (optional)
    - **kwargs: Additional task configuration options

    Returns:
    TaskWrapper instance that can be called to enqueue tasks
    """

def context_task(self, obj, as_argument=False, **kwargs):
    """
    Decorator for tasks that require context management.

    Parameters:
    - obj: Context manager object to use
    - as_argument (bool): Pass context as first argument (default: False)
    - **kwargs: Additional task configuration options

    Returns:
    Task decorator function
    """
```

### Task Enqueueing and Execution

Add tasks to the queue and manage their execution lifecycle.

```python { .api }
def enqueue(self, task):
    """
    Add a task to the execution queue.

    Parameters:
    - task (Task): Task instance to enqueue

    Returns:
    Result instance if results are enabled, None otherwise
    """

def dequeue(self):
    """
    Remove and return the next task from the queue.

    Returns:
    Task instance or None if queue is empty
    """

def execute(self, task, timestamp=None):
    """
    Execute a task immediately.

    Parameters:
    - task (Task): Task to execute
    - timestamp (datetime): Execution timestamp (default: current time)

    Returns:
    Task result value
    """
```

### Queue Status and Management

Monitor queue status and perform maintenance operations.

```python { .api }
def pending_count(self):
    """
    Get the number of pending tasks in the queue.

    Returns:
    int: Number of tasks waiting to be processed
    """

def pending(self, limit=None):
    """
    Get list of pending tasks.

    Parameters:
    - limit (int): Maximum number of tasks to return (optional)

    Returns:
    List of Task instances
    """

def flush(self):
    """
    Remove all tasks and data from all storage areas.

    Returns:
    None
    """

def __len__(self):
    """
    Get queue length (same as pending_count).

    Returns:
    int: Number of pending tasks
    """
```

### Data Storage Operations

Low-level key-value storage operations for custom data persistence.

```python { .api }
def put(self, key, data):
    """
    Store arbitrary data with a key.

    Parameters:
    - key (str): Storage key
    - data: Data to store (will be serialized)

    Returns:
    bool: Success status
    """

def get(self, key, peek=False):
    """
    Retrieve data by key.

    Parameters:
    - key (str): Storage key
    - peek (bool): Don't remove data if True (default: False)

    Returns:
    Stored data or None if not found
    """

def delete(self, key):
    """
    Delete data by key.

    Parameters:
    - key (str): Storage key to delete

    Returns:
    bool: True if key existed and was deleted
    """

def put_if_empty(self, key, data):
    """
    Store data only if key doesn't exist.

    Parameters:
    - key (str): Storage key
    - data: Data to store

    Returns:
    bool: True if data was stored, False if key already existed
    """
```

### Task Serialization

Low-level task serialization methods for custom storage implementations and debugging.

```python { .api }
def serialize_task(self, task):
    """
    Serialize a task to bytes for storage.

    Parameters:
    - task (Task): Task instance to serialize

    Returns:
    bytes: Serialized task data
    """

def deserialize_task(self, data):
    """
    Deserialize bytes to a task instance.

    Parameters:
    - data (bytes): Serialized task data

    Returns:
    Task: Deserialized task instance
    """
```

### Raw Storage Operations

Direct storage access methods for advanced use cases and custom implementations.

```python { .api }
def get_raw(self, key, peek=False):
    """
    Get raw data from storage without deserialization.

    Parameters:
    - key (str): Storage key
    - peek (bool): Don't remove data if True (default: False)

    Returns:
    bytes or None: Raw stored data
    """

def put_result(self, key, data):
    """
    Store result data directly in result storage.

    Parameters:
    - key (str): Result key
    - data: Result data to store

    Returns:
    bool: Success status
    """
```

### Consumer Management

Create and configure consumer processes for task execution.

```python { .api }
def create_consumer(self, **options):
    """
    Create a consumer instance for processing tasks.

    Parameters:
    - **options: Consumer configuration options

    Returns:
    Consumer instance
    """
```

## Usage Examples

### Basic Task Definition and Execution

```python
from huey import RedisHuey

huey = RedisHuey('my-app')

@huey.task()
def send_email(to, subject, body):
    # Send email logic here
    return f"Email sent to {to}"

# Calling the task enqueues it and returns a Result handle
result = send_email('user@example.com', 'Hello', 'Message body')

# Block until a running consumer has executed the task.
# Without blocking=True the call returns None if the result is not ready.
message = result(blocking=True)
print(message)  # "Email sent to user@example.com"
```

### Task with Retry Configuration

```python
import random

@huey.task(retries=3, retry_delay=60)
def process_payment(amount, card_token):
    # Payment processing that might fail
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Payment gateway error")
    return f"Processed ${amount}"

# This task will retry up to 3 times with 60-second delays
result = process_payment(100.00, 'tok_123')
```

### Context Task with Resource Management

```python
import json
import sqlite3

# check_same_thread=False lets worker threads use a connection that was
# created in the main thread.
db = sqlite3.connect('app.db', check_same_thread=False)

@huey.context_task(db, as_argument=True)
def update_user_stats(db_conn, user_id, stats):
    cursor = db_conn.cursor()
    # SQLite cannot bind a dict directly, so store the stats as JSON text
    cursor.execute("UPDATE users SET stats=? WHERE id=?",
                   (json.dumps(stats), user_id))
    return f"Updated user {user_id}"

# Each call runs inside the connection's context manager, which commits
# on success and rolls back on error. It does not close the connection.
result = update_user_stats(123, {'views': 50, 'clicks': 5})
```