# Scheduling

Task scheduling system for executing tasks at specific times, on recurring schedules, or based on custom triggers. Supports cron-like expressions, one-time scheduling, and extensible schedule sources.

## Capabilities

### Scheduler Management

Core scheduler class that coordinates between schedule sources and task brokers to execute scheduled tasks.

```python { .api }
class TaskiqScheduler:
    """
    Main scheduler class that manages scheduled task execution.

    Coordinates between schedule sources and brokers to execute tasks
    at the appropriate times based on cron expressions, fixed times,
    or custom scheduling logic.
    """

    def __init__(
        self,
        broker: AsyncBroker,
        sources: List[ScheduleSource],
    ) -> None:
        """
        Initialize scheduler with broker and schedule sources.

        Args:
            broker: Broker instance for task execution
            sources: List of schedule sources providing tasks
        """

    async def startup(self) -> None:
        """
        Start the scheduler and initialize all components.

        Calls startup on the broker and prepares schedule sources
        for task execution.
        """

    async def shutdown(self) -> None:
        """
        Shut down the scheduler and clean up resources.

        Stops all scheduled tasks and shuts down the broker.
        """

    async def run_forever(self) -> None:
        """
        Run the scheduler continuously until shutdown.

        Main scheduler loop that checks schedule sources
        and executes tasks when they become ready.
        """

    async def on_ready(
        self,
        source: ScheduleSource,
        task: ScheduledTask,
    ) -> None:
        """
        Handler called when a scheduled task is ready for execution.

        Args:
            source: Schedule source that triggered the task
            task: Scheduled task to be executed
        """
```
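
The coordination described above can be pictured as a polling loop. The following is a minimal standard-library sketch of that idea, not Taskiq's implementation: `Task`, `Source`, `MiniScheduler`, and `tick` are hypothetical stand-ins, and only one-time `time` schedules are handled.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class Task:
    """Hypothetical stand-in for a scheduled task with a one-time run time."""
    task_name: str
    time: datetime


@dataclass
class Source:
    """Hypothetical stand-in for a schedule source."""
    tasks: List[Task] = field(default_factory=list)

    async def get_schedules(self) -> List[Task]:
        # Return a copy so callers can mutate self.tasks while iterating.
        return list(self.tasks)


class MiniScheduler:
    """Polls every source and dispatches tasks whose time has arrived."""

    def __init__(self, sources: List[Source]) -> None:
        self.sources = sources
        self.dispatched: List[str] = []

    async def on_ready(self, source: Source, task: Task) -> None:
        # A real scheduler would hand the task to a broker here.
        self.dispatched.append(task.task_name)
        source.tasks.remove(task)  # one-time tasks run only once

    async def tick(self, now: datetime) -> None:
        """One loop iteration: collect schedules and dispatch the due ones."""
        for source in self.sources:
            for task in await source.get_schedules():
                if task.time <= now:
                    await self.on_ready(source, task)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
source = Source([
    Task("due_task", now - timedelta(seconds=5)),
    Task("future_task", now + timedelta(hours=1)),
])
scheduler = MiniScheduler([source])
asyncio.run(scheduler.tick(now))
print(scheduler.dispatched)
```

Passing `now` into `tick` keeps the due-check deterministic, which makes the loop easy to test without waiting on a real clock.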

### Scheduled Tasks

Representation of tasks with scheduling metadata including timing, arguments, and execution context.

```python { .api }
class ScheduledTask:
    """
    Represents a task scheduled for future execution.

    Contains task identification, scheduling information,
    and execution parameters.
    """

    task_name: str
    """Name of the task to execute."""

    cron: Optional[str]
    """Cron expression for recurring schedules (e.g., '0 0 * * *' for daily)."""

    time: Optional[datetime]
    """Specific datetime for one-time execution."""

    labels: Dict[str, Any]
    """Additional labels and metadata for the task."""

    args: Tuple[Any, ...]
    """Positional arguments to pass to the task function."""

    kwargs: Dict[str, Any]
    """Keyword arguments to pass to the task function."""

    def __init__(
        self,
        task_name: str,
        cron: Optional[str] = None,
        time: Optional[datetime] = None,
        labels: Optional[Dict[str, Any]] = None,
        args: Optional[Tuple[Any, ...]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> None: ...
```

### Schedule Sources

Abstract interface and implementations for providing scheduled tasks to the scheduler.

```python { .api }
class ScheduleSource:
    """
    Abstract base class for schedule sources.

    Schedule sources provide scheduled tasks to the scheduler
    and can implement custom scheduling logic.
    """

    async def startup(self) -> None:
        """Initialize the schedule source."""

    async def shutdown(self) -> None:
        """Clean up the schedule source."""

    async def get_schedules(self) -> List[ScheduledTask]:
        """
        Get list of scheduled tasks.

        Returns:
            List of scheduled tasks from this source
        """

    async def pre_send(self, task: ScheduledTask) -> None:
        """
        Pre-processing hook before task execution.

        Can modify task or raise ScheduledTaskCancelledError to cancel.

        Args:
            task: Scheduled task about to be executed

        Raises:
            ScheduledTaskCancelledError: To cancel task execution
        """

    async def post_send(self, task: ScheduledTask) -> None:
        """
        Post-processing hook after task is sent.

        Args:
            task: Scheduled task that was sent
        """

class LabelBasedScheduleSource(ScheduleSource):
    """
    Schedule source that discovers tasks based on labels.

    Automatically finds tasks with scheduling labels in the broker's
    task registry and creates appropriate scheduled tasks.
    """

    def __init__(
        self,
        broker: AsyncBroker,
        schedule_label: str = "schedule",
    ) -> None:
        """
        Initialize label-based schedule source.

        Args:
            broker: Broker containing tasks to schedule
            schedule_label: Label name containing schedule information
        """

    async def get_schedules(self) -> List[ScheduledTask]:
        """Extract scheduled tasks from broker task registry."""
```
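
The order of the `pre_send` and `post_send` hooks around a send can be illustrated with a small standard-library sketch. It is not Taskiq's code: `TaskCancelled` stands in for `ScheduledTaskCancelledError`, `RecordingSource` and `dispatch` are hypothetical names, and it assumes that a cancelled task is neither sent nor passed to `post_send`.

```python
import asyncio
from typing import List, Tuple


class TaskCancelled(Exception):
    """Hypothetical stand-in for ScheduledTaskCancelledError."""


class RecordingSource:
    """Hypothetical source that records hook calls and vetoes one task."""

    def __init__(self, blocked: str) -> None:
        self.blocked = blocked
        self.events: List[str] = []

    async def pre_send(self, task_name: str) -> None:
        self.events.append(f"pre_send:{task_name}")
        if task_name == self.blocked:
            raise TaskCancelled(task_name)

    async def post_send(self, task_name: str) -> None:
        self.events.append(f"post_send:{task_name}")


async def dispatch(source: RecordingSource, task_name: str, sent: List[str]) -> None:
    """Run pre_send, send the task, then run post_send; skip on cancellation."""
    try:
        await source.pre_send(task_name)
    except TaskCancelled:
        source.events.append(f"cancelled:{task_name}")
        return
    sent.append(task_name)  # stands in for handing the task to the broker
    await source.post_send(task_name)


async def main() -> Tuple[List[str], List[str]]:
    source = RecordingSource(blocked="backup")
    sent: List[str] = []
    await dispatch(source, "report", sent)
    await dispatch(source, "backup", sent)
    return source.events, sent


events, sent = asyncio.run(main())
print(events)
print(sent)
```

Raising from `pre_send` gives a source a single place to veto a task, so the dispatch path needs no extra flag or return value to signal cancellation.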

## Usage Examples

### Basic Cron Scheduling

```python
import asyncio
from datetime import datetime

from taskiq import InMemoryBroker
from taskiq.scheduler import TaskiqScheduler
from taskiq.schedule_sources import LabelBasedScheduleSource

broker = InMemoryBroker()


# Define scheduled tasks using labels
@broker.task(schedule="0 8 * * *")  # Daily at 8 AM
async def daily_report() -> None:
    print(f"Generating daily report at {datetime.now()}")
    # Generate and send report


@broker.task(schedule="*/15 * * * *")  # Every 15 minutes
async def health_check() -> None:
    print(f"Health check at {datetime.now()}")
    # Check system health


# Set up scheduler
schedule_source = LabelBasedScheduleSource(broker)
scheduler = TaskiqScheduler(broker, [schedule_source])


async def run_scheduler() -> None:
    await scheduler.startup()
    try:
        await scheduler.run_forever()
    finally:
        # Always release broker and source resources, even on errors
        await scheduler.shutdown()


# Run scheduler
if __name__ == "__main__":
    asyncio.run(run_scheduler())
```

### One-time Scheduled Tasks

```python
from datetime import datetime, timedelta
from typing import List

# The ScheduleSource import path is an assumption; adjust it to your installed version.
from taskiq import ScheduleSource
from taskiq.scheduler import ScheduledTask, TaskiqScheduler
from taskiq.schedule_sources import LabelBasedScheduleSource

# Schedule task for specific time
future_time = datetime.now() + timedelta(hours=2)
scheduled_task = ScheduledTask(
    task_name="my_module:delayed_task",
    time=future_time,
    args=("important_data",),
    kwargs={"priority": "high"},
    labels={"category": "one-time"},
)


# Custom schedule source for one-time tasks
class OneTimeScheduleSource(ScheduleSource):
    def __init__(self) -> None:
        self.tasks: List[ScheduledTask] = []

    def add_task(self, task: ScheduledTask) -> None:
        self.tasks.append(task)

    async def get_schedules(self) -> List[ScheduledTask]:
        # Return a copy so post_send can remove entries safely
        return list(self.tasks)

    async def post_send(self, task: ScheduledTask) -> None:
        # Remove one-time tasks after execution
        if task.time and task in self.tasks:
            self.tasks.remove(task)


# Use custom source
one_time_source = OneTimeScheduleSource()
one_time_source.add_task(scheduled_task)

# `broker` is the InMemoryBroker created in the previous example
scheduler = TaskiqScheduler(
    broker,
    [LabelBasedScheduleSource(broker), one_time_source],
)
```

### Advanced Scheduling with Custom Logic

```python
from datetime import datetime, timedelta
from typing import Dict, List

import psutil  # third-party dependency

# Import paths are assumptions; adjust them to your installed version.
from taskiq import AsyncBroker, ScheduleSource
from taskiq.exceptions import ScheduledTaskCancelledError
from taskiq.scheduler import ScheduledTask

BACKUP_TASK = "my_module:backup_data"


class ConditionalScheduleSource(ScheduleSource):
    """Schedule source with custom conditions."""

    def __init__(self, broker: AsyncBroker) -> None:
        self.broker = broker
        # Maps task name to the time the task was last sent
        self.last_execution: Dict[str, datetime] = {}

    async def get_schedules(self) -> List[ScheduledTask]:
        schedules: List[ScheduledTask] = []
        now = datetime.now()

        # Only schedule backup task if it's been more than 6 hours
        last_backup = self.last_execution.get(BACKUP_TASK)
        if last_backup is None or now - last_backup > timedelta(hours=6):
            schedules.append(ScheduledTask(
                task_name=BACKUP_TASK,
                time=now + timedelta(minutes=1),
                labels={"type": "maintenance"},
            ))

        return schedules

    async def pre_send(self, task: ScheduledTask) -> None:
        # Check system load before executing maintenance tasks
        if task.labels.get("type") == "maintenance":
            if await self._system_load_too_high():
                raise ScheduledTaskCancelledError("System load too high")

    async def post_send(self, task: ScheduledTask) -> None:
        # Track execution time under the full task name, the same key
        # that get_schedules looks up
        self.last_execution[task.task_name] = datetime.now()

    async def _system_load_too_high(self) -> bool:
        # Custom system load check. Without an interval, cpu_percent()
        # measures since the previous call, so the first reading is 0.0.
        return psutil.cpu_percent() > 80.0
```

### Integration with Task Labels

```python
# Define tasks with various scheduling options
@broker.task(
    schedule="0 2 * * 0",  # Weekly on Sunday at 2 AM
    priority="low",
    timeout=3600,  # 1 hour timeout
)
async def weekly_cleanup() -> None:
    """Weekly maintenance task."""
    print("Running weekly cleanup")
    # Cleanup old data


@broker.task(
    schedule="*/5 * * * *",  # Every 5 minutes
    max_retries=3,
    retry_delay=30,
)
async def monitoring_task() -> None:
    """Frequent monitoring task with retry logic."""
    # Monitor system metrics
    pass


# Schedule source automatically discovers these tasks
schedule_source = LabelBasedScheduleSource(
    broker,
    schedule_label="schedule",  # Look for 'schedule' label
)
```
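
Label-based discovery boils down to scanning a task registry for entries that carry the schedule label. The sketch below shows that idea with the standard library only; `registry`, `task`, and `discover` are hypothetical names, and the broker's real registry and label format may differ.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical registry: task name -> (function, labels)
registry: Dict[str, Tuple[Callable[..., Any], Dict[str, Any]]] = {}


def task(**labels: Any) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator that stores a function together with its labels."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        registry[f"{func.__module__}:{func.__name__}"] = (func, labels)
        return func

    return decorator


def discover(schedule_label: str = "schedule") -> List[Tuple[str, str]]:
    """Return (task_name, cron) pairs for every task carrying the label."""
    return [
        (name, labels[schedule_label])
        for name, (_, labels) in sorted(registry.items())
        if schedule_label in labels
    ]


@task(schedule="0 2 * * 0", priority="low")
def weekly_cleanup() -> None:
    """Scheduled: carries the 'schedule' label."""


@task(max_retries=3)
def unscheduled() -> None:
    """Not scheduled: has labels, but no 'schedule' label."""


found = discover()
print(found)
```

Tasks without the label stay registered and can still be called normally; they are only skipped by the discovery step.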

## Types

```python { .api }
ScheduledTaskCancelledError = Exception
"""Exception raised to cancel scheduled task execution."""
```

## Cron Expression Format

Taskiq uses the standard five-field cron expression format:

```
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *
```

Common examples:
- `"0 0 * * *"` - Daily at midnight
- `"30 8 * * 1-5"` - Weekdays (Monday to Friday) at 8:30 AM
- `"0 */4 * * *"` - Every 4 hours, on the hour
- `"15 2 1 * *"` - First day of month at 2:15 AM
- `"0 9-17 * * 1-5"` - Hourly from 9 AM to 5 PM on weekdays
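
To make the field semantics concrete, here is a simplified standard-library matcher for five-field expressions. It is an illustrative sketch, not the parser Taskiq uses: `expand_field` and `cron_matches` are hypothetical helpers that support only `*`, single values, ranges, steps, and comma lists, and they require every field to match (classic cron instead treats day of month and day of week as alternatives when both are restricted).

```python
from datetime import datetime
from typing import Set


def expand_field(field: str, low: int, high: int) -> Set[int]:
    """Expand one cron field ('*', '*/n', 'a', 'a-b', 'a-b/n', comma lists)."""
    values: Set[int] = set()
    for part in field.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if rng == "*":
            start, end = low, high
        elif "-" in rng:
            start_text, end_text = rng.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(rng)
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expr: str, moment: datetime) -> bool:
    """Return True if `moment` falls on a minute selected by `expr`."""
    minute, hour, dom, month, dow = expr.split()
    # isoweekday() is 1-7 (Monday-Sunday); cron uses 0-6 (Sunday-Saturday).
    weekday = moment.isoweekday() % 7
    return (
        moment.minute in expand_field(minute, 0, 59)
        and moment.hour in expand_field(hour, 0, 23)
        and moment.day in expand_field(dom, 1, 31)
        and moment.month in expand_field(month, 1, 12)
        and weekday in expand_field(dow, 0, 6)
    )


# 2024-01-01 is a Monday, 2024-01-06 a Saturday.
print(cron_matches("30 8 * * 1-5", datetime(2024, 1, 1, 8, 30)))  # True
print(cron_matches("30 8 * * 1-5", datetime(2024, 1, 6, 8, 30)))  # False
print(cron_matches("0 */4 * * *", datetime(2024, 1, 1, 16, 0)))   # True
print(cron_matches("0 9-17 * * 1-5", datetime(2024, 1, 1, 18, 0)))  # False
```

Expanding each field into a set keeps the match itself a plain membership test, at the cost of re-parsing the expression on every call; a production parser would typically cache the expansion.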