0
# Schedule Sources
1
2
TaskIQ-Redis provides schedule sources for managing scheduled and recurring tasks with different storage strategies and Redis deployment support. Schedule sources handle task scheduling, execution timing, and cleanup operations.
3
4
## Capabilities
5
6
### List Redis Schedule Source (Recommended)
7
8
Array-based schedule source that provides efficient scheduling with migration support. This is the recommended replacement for the deprecated hash-based `RedisScheduleSource`.
9
10
```python { .api }
11
class ListRedisScheduleSource(ScheduleSource):
12
def __init__(
13
self,
14
url: str,
15
prefix: str = "schedule",
16
max_connection_pool_size: Optional[int] = None,
17
serializer: Optional[TaskiqSerializer] = None,
18
buffer_size: int = 50,
19
skip_past_schedules: bool = False,
20
**connection_kwargs: Any,
21
) -> None:
22
"""
23
Array-based schedule source for Redis.
24
25
Parameters:
26
- url: Redis connection URL
27
- prefix: Prefix for Redis keys (default: "schedule")
28
- max_connection_pool_size: Maximum connections in pool
29
- serializer: Custom serializer (default: PickleSerializer)
30
- buffer_size: Buffer size for retrieving schedules (default: 50)
31
- skip_past_schedules: Skip schedules in the past (default: False)
32
- connection_kwargs: Additional Redis connection arguments
33
"""
34
35
async def startup(self) -> None:
36
"""Initialize the schedule source."""
37
38
async def add_schedule(self, schedule: ScheduledTask) -> None:
39
"""
40
Add a scheduled task.
41
42
Parameters:
43
- schedule: Scheduled task to add
44
"""
45
46
async def get_schedules(self) -> List[ScheduledTask]:
47
"""
48
Retrieve all scheduled tasks.
49
50
Returns:
51
- List[ScheduledTask]: All currently scheduled tasks
52
"""
53
54
async def delete_schedule(self, schedule_id: str) -> None:
55
"""
56
Remove a scheduled task.
57
58
Parameters:
59
- schedule_id: Unique identifier of schedule to remove
60
"""
61
62
async def post_send(self, task: ScheduledTask) -> None:
63
"""
64
Clean up after task execution.
65
66
Parameters:
67
- task: Task that was executed
68
"""
69
70
def with_migrate_from(
71
self,
72
source: ScheduleSource,
73
delete_schedules: bool = True
74
) -> Self:
75
"""
76
Enable migration from another schedule source.
77
78
Parameters:
79
- source: Source schedule source to migrate from
80
- delete_schedules: Delete schedules from source after migration
81
82
Returns:
83
- Self: Schedule source with migration enabled
84
"""
85
```
86
87
**Usage Example:**
88
89
```python
90
from taskiq_redis import ListRedisScheduleSource
91
from taskiq import ScheduledTask
92
from datetime import datetime, timedelta
93
94
# Create schedule source
95
schedule_source = ListRedisScheduleSource(
96
url="redis://localhost:6379",
97
prefix="my_schedules",
98
buffer_size=100
99
)
100
101
# Add a scheduled task
102
schedule = ScheduledTask(
    schedule_id="daily-report",
    task_name="generate_report",
    labels={},
    time=datetime.now() + timedelta(hours=24),
    args=["daily"],
    kwargs={"format": "pdf"},
)
109
110
await schedule_source.add_schedule(schedule)
111
112
# Get all scheduled tasks
113
schedules = await schedule_source.get_schedules()
114
print(f"Found {len(schedules)} scheduled tasks")
115
116
# Delete a schedule
117
await schedule_source.delete_schedule("daily-report")
118
```
119
120
### Migration from Deprecated Schedule Source
121
122
Migrate from the deprecated `RedisScheduleSource` to the recommended `ListRedisScheduleSource`:
123
124
```python
125
from taskiq_redis import ListRedisScheduleSource, RedisScheduleSource
126
127
# Old deprecated schedule source
128
old_source = RedisScheduleSource("redis://localhost:6379")
129
130
# New recommended schedule source with migration
131
new_source = ListRedisScheduleSource(
132
url="redis://localhost:6379"
133
).with_migrate_from(old_source, delete_schedules=True)
134
135
# Migration will happen automatically during startup
136
await new_source.startup()
137
```
138
139
### Redis Schedule Source (Deprecated)
140
141
Hash-based schedule source (deprecated, use `ListRedisScheduleSource` instead).
142
143
```python { .api }
144
class RedisScheduleSource(ScheduleSource):
145
def __init__(
146
self,
147
url: str,
148
prefix: str = "schedule",
149
buffer_size: int = 50,
150
max_connection_pool_size: Optional[int] = None,
151
serializer: Optional[TaskiqSerializer] = None,
152
**connection_kwargs: Any,
153
) -> None:
154
"""
155
Hash-based schedule source for Redis (DEPRECATED).
156
157
Use ListRedisScheduleSource instead.
158
"""
159
160
async def delete_schedule(self, schedule_id: str) -> None:
161
"""Remove a scheduled task."""
162
163
async def add_schedule(self, schedule: ScheduledTask) -> None:
164
"""Add a scheduled task."""
165
166
async def get_schedules(self) -> List[ScheduledTask]:
167
"""Retrieve all scheduled tasks."""
168
169
async def post_send(self, task: ScheduledTask) -> None:
170
"""Clean up after task execution."""
171
172
async def shutdown(self) -> None:
173
"""Shut down the schedule source."""
174
```
175
176
### Redis Cluster Schedule Source
177
178
Schedule source for Redis Cluster deployments.
179
180
```python { .api }
181
class RedisClusterScheduleSource(ScheduleSource):
182
def __init__(
183
self,
184
url: str,
185
prefix: str = "schedule",
186
serializer: Optional[TaskiqSerializer] = None,
187
**connection_kwargs: Any,
188
) -> None:
189
"""
190
Schedule source for Redis Cluster.
191
192
Parameters:
193
- url: Redis cluster connection URL
194
- prefix: Prefix for Redis keys (default: "schedule")
195
- serializer: Custom serializer (default: PickleSerializer)
196
- connection_kwargs: Additional Redis cluster connection arguments
197
"""
198
199
async def delete_schedule(self, schedule_id: str) -> None:
200
"""Remove a scheduled task from Redis cluster."""
201
202
async def add_schedule(self, schedule: ScheduledTask) -> None:
203
"""Add a scheduled task to Redis cluster."""
204
205
async def get_schedules(self) -> List[ScheduledTask]:
206
"""Retrieve all scheduled tasks from Redis cluster."""
207
208
async def post_send(self, task: ScheduledTask) -> None:
209
"""Clean up after task execution in Redis cluster."""
210
211
async def shutdown(self) -> None:
212
"""Shut down the cluster schedule source."""
213
```
214
215
**Usage Example:**
216
217
```python
218
from taskiq_redis import RedisClusterScheduleSource
219
220
# Create cluster schedule source
221
schedule_source = RedisClusterScheduleSource(
222
url="redis://cluster-node1:6379",
223
prefix="cluster_schedules"
224
)
225
226
# Use same API as standard schedule source (`schedule` is a ScheduledTask built as in the earlier example)
227
await schedule_source.add_schedule(schedule)
228
schedules = await schedule_source.get_schedules()
229
```
230
231
### Redis Sentinel Schedule Source
232
233
Schedule source for Redis Sentinel deployments with high availability.
234
235
```python { .api }
236
class RedisSentinelScheduleSource(ScheduleSource):
237
def __init__(
238
self,
239
sentinels: List[Tuple[str, int]],
240
master_name: str,
241
prefix: str = "schedule",
242
buffer_size: int = 50,
243
serializer: Optional[TaskiqSerializer] = None,
244
min_other_sentinels: int = 0,
245
sentinel_kwargs: Optional[Any] = None,
246
**connection_kwargs: Any,
247
) -> None:
248
"""
249
Schedule source for Redis Sentinel.
250
251
Parameters:
252
- sentinels: List of sentinel (host, port) pairs
253
- master_name: Sentinel master name
254
- prefix: Prefix for Redis keys (default: "schedule")
255
- buffer_size: Buffer size for retrieving schedules (default: 50)
256
- serializer: Custom serializer (default: PickleSerializer)
257
- min_other_sentinels: Minimum other sentinels required (default: 0)
258
- sentinel_kwargs: Additional sentinel configuration
259
- connection_kwargs: Additional Redis connection arguments
260
"""
261
262
async def delete_schedule(self, schedule_id: str) -> None:
263
"""Remove a scheduled task via Sentinel."""
264
265
async def add_schedule(self, schedule: ScheduledTask) -> None:
266
"""Add a scheduled task via Sentinel."""
267
268
async def get_schedules(self) -> List[ScheduledTask]:
269
"""Retrieve all scheduled tasks via Sentinel."""
270
271
async def post_send(self, task: ScheduledTask) -> None:
272
"""Clean up after task execution via Sentinel."""
273
274
async def shutdown(self) -> None:
275
"""Shut down the Sentinel schedule source."""
276
```
277
278
**Usage Example:**
279
280
```python
281
from taskiq_redis import RedisSentinelScheduleSource
282
283
# Create high-availability schedule source
284
schedule_source = RedisSentinelScheduleSource(
285
sentinels=[
286
("sentinel1", 26379),
287
("sentinel2", 26379),
288
("sentinel3", 26379)
289
],
290
master_name="mymaster",
291
prefix="ha_schedules",
292
min_other_sentinels=1
293
)
294
295
# Use same API with automatic failover (`schedule` is a ScheduledTask built as in the earlier example)
296
await schedule_source.add_schedule(schedule)
297
schedules = await schedule_source.get_schedules()
298
```
299
300
## Scheduling Patterns
301
302
### One-time Scheduled Tasks
303
304
```python
305
from taskiq_redis import ListRedisScheduleSource
306
from taskiq import ScheduledTask
307
from datetime import datetime, timedelta
308
309
schedule_source = ListRedisScheduleSource("redis://localhost:6379")
310
311
# Schedule task to run in 1 hour
312
future_time = datetime.now() + timedelta(hours=1)
313
schedule = ScheduledTask(
    schedule_id="one-time-report",
    task_name="generate_report",
    labels={},
    time=future_time,
    args=["monthly"],
    kwargs={"format": "excel"},
)
320
321
await schedule_source.add_schedule(schedule)
322
```
323
324
### Recurring Tasks with Custom Logic
325
326
```python
327
from datetime import datetime, timedelta

from taskiq import ScheduledTask

# Schedule daily backups
# (`schedule_source` is a schedule source instance created as in the earlier examples)
async def schedule_daily_backup():
    tomorrow = datetime.now() + timedelta(days=1)
    tomorrow = tomorrow.replace(hour=2, minute=0, second=0, microsecond=0)

    schedule = ScheduledTask(
        schedule_id=f"backup-{tomorrow.strftime('%Y%m%d')}",
        task_name="create_backup",
        labels={},
        time=tomorrow,
        args=["full"],
        kwargs={"compress": True},
    )

    await schedule_source.add_schedule(schedule)

# Add initial schedule
await schedule_daily_backup()
346
```
347
348
### Schedule Management
349
350
```python
351
from datetime import datetime, timedelta

from taskiq_redis import ListRedisScheduleSource

schedule_source = ListRedisScheduleSource("redis://localhost:6379")

# Get all pending schedules
all_schedules = await schedule_source.get_schedules()
print(f"Total schedules: {len(all_schedules)}")

# Find specific schedule
target_schedule = None
for schedule in all_schedules:
    if schedule.task_name == "generate_report":
        target_schedule = schedule
        break

if target_schedule:
    # Update schedule (delete and re-add with new time)
    await schedule_source.delete_schedule(target_schedule.schedule_id)

    # Reschedule for later
    target_schedule.time = datetime.now() + timedelta(hours=2)
    await schedule_source.add_schedule(target_schedule)

# Clean up old schedules
now = datetime.now()
for schedule in all_schedules:
    # Cron-based schedules have no `time` set, so skip them here
    if schedule.time is not None and schedule.time < now - timedelta(days=7):
        await schedule_source.delete_schedule(schedule.schedule_id)
379
```
380
381
## Types
382
383
```python { .api }
384
from typing import List, Optional, Any, Tuple
from typing_extensions import Self  # or `from typing import Self` on Python 3.11+
385
from taskiq.abc.schedule_source import ScheduleSource
386
from taskiq.abc.serializer import TaskiqSerializer
387
from taskiq import ScheduledTask
388
```