or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mdindex.mdresult-backends.mdschedule-sources.md

schedule-sources.mddocs/

0

# Schedule Sources

1

2

TaskIQ-Redis provides schedule sources for managing scheduled and recurring tasks with different storage strategies and Redis deployment support. Schedule sources handle task scheduling, execution timing, and cleanup operations.

3

4

## Capabilities

5

6

### List Redis Schedule Source (Recommended)

7

8

Array-based schedule source that provides efficient scheduling with migration support. This is the recommended replacement for the deprecated hash-based `RedisScheduleSource`.

9

10

```python { .api }

11

class ListRedisScheduleSource(ScheduleSource):

12

def __init__(

13

self,

14

url: str,

15

prefix: str = "schedule",

16

max_connection_pool_size: Optional[int] = None,

17

serializer: Optional[TaskiqSerializer] = None,

18

buffer_size: int = 50,

19

skip_past_schedules: bool = False,

20

**connection_kwargs: Any,

21

) -> None:

22

"""

23

Array-based schedule source for Redis.

24

25

Parameters:

26

- url: Redis connection URL

27

- prefix: Prefix for Redis keys (default: "schedule")

28

- max_connection_pool_size: Maximum connections in pool

29

- serializer: Custom serializer (default: PickleSerializer)

30

- buffer_size: Buffer size for retrieving schedules (default: 50)

31

- skip_past_schedules: Skip schedules in the past (default: False)

32

- connection_kwargs: Additional Redis connection arguments

33

"""

34

35

async def startup(self) -> None:

36

"""Initialize the schedule source."""

37

38

async def add_schedule(self, schedule: ScheduledTask) -> None:

39

"""

40

Add a scheduled task.

41

42

Parameters:

43

- schedule: Scheduled task to add

44

"""

45

46

async def get_schedules(self) -> List[ScheduledTask]:

47

"""

48

Retrieve all scheduled tasks.

49

50

Returns:

51

- List[ScheduledTask]: All currently scheduled tasks

52

"""

53

54

async def delete_schedule(self, schedule_id: str) -> None:

55

"""

56

Remove a scheduled task.

57

58

Parameters:

59

- schedule_id: Unique identifier of schedule to remove

60

"""

61

62

async def post_send(self, task: ScheduledTask) -> None:

63

"""

64

Clean up after task execution.

65

66

Parameters:

67

- task: Task that was executed

68

"""

69

70

def with_migrate_from(

71

self,

72

source: ScheduleSource,

73

delete_schedules: bool = True

74

) -> Self:

75

"""

76

Enable migration from another schedule source.

77

78

Parameters:

79

- source: Source schedule source to migrate from

80

- delete_schedules: Delete schedules from source after migration

81

82

Returns:

83

- Self: Schedule source with migration enabled

84

"""

85

```

86

87

**Usage Example:**

88

89

```python

90

from taskiq_redis import ListRedisScheduleSource

91

from taskiq import ScheduledTask

92

from datetime import datetime, timedelta

93

94

# Create schedule source

95

schedule_source = ListRedisScheduleSource(

96

url="redis://localhost:6379",

97

prefix="my_schedules",

98

buffer_size=100

99

)

100

101

# Add a scheduled task

102

schedule = ScheduledTask(

103

task_id="daily-report",

104

task_name="generate_report",

105

schedule_time=datetime.now() + timedelta(hours=24),

106

args=["daily"],

107

kwargs={"format": "pdf"}

108

)

109

110

await schedule_source.add_schedule(schedule)

111

112

# Get all scheduled tasks

113

schedules = await schedule_source.get_schedules()

114

print(f"Found {len(schedules)} scheduled tasks")

115

116

# Delete a schedule

117

await schedule_source.delete_schedule("daily-report")

118

```

119

120

### Migration from Deprecated Schedule Source

121

122

Migrate from the deprecated `RedisScheduleSource` to the recommended `ListRedisScheduleSource`:

123

124

```python

125

from taskiq_redis import ListRedisScheduleSource, RedisScheduleSource

126

127

# Old deprecated schedule source

128

old_source = RedisScheduleSource("redis://localhost:6379")

129

130

# New recommended schedule source with migration

131

new_source = ListRedisScheduleSource(

132

url="redis://localhost:6379"

133

).with_migrate_from(old_source, delete_schedules=True)

134

135

# Migration will happen automatically during startup

136

await new_source.startup()

137

```

138

139

### Redis Schedule Source (Deprecated)

140

141

Hash-based schedule source (deprecated, use `ListRedisScheduleSource` instead).

142

143

```python { .api }

144

class RedisScheduleSource(ScheduleSource):

145

def __init__(

146

self,

147

url: str,

148

prefix: str = "schedule",

149

buffer_size: int = 50,

150

max_connection_pool_size: Optional[int] = None,

151

serializer: Optional[TaskiqSerializer] = None,

152

**connection_kwargs: Any,

153

) -> None:

154

"""

155

Hash-based schedule source for Redis (DEPRECATED).

156

157

Use ListRedisScheduleSource instead.

158

"""

159

160

async def delete_schedule(self, schedule_id: str) -> None:

161

"""Remove a scheduled task."""

162

163

async def add_schedule(self, schedule: ScheduledTask) -> None:

164

"""Add a scheduled task."""

165

166

async def get_schedules(self) -> List[ScheduledTask]:

167

"""Retrieve all scheduled tasks."""

168

169

async def post_send(self, task: ScheduledTask) -> None:

170

"""Clean up after task execution."""

171

172

async def shutdown(self) -> None:

173

"""Shut down the schedule source."""

174

```

175

176

### Redis Cluster Schedule Source

177

178

Schedule source for Redis Cluster deployments.

179

180

```python { .api }

181

class RedisClusterScheduleSource(ScheduleSource):

182

def __init__(

183

self,

184

url: str,

185

prefix: str = "schedule",

186

serializer: Optional[TaskiqSerializer] = None,

187

**connection_kwargs: Any,

188

) -> None:

189

"""

190

Schedule source for Redis Cluster.

191

192

Parameters:

193

- url: Redis cluster connection URL

194

- prefix: Prefix for Redis keys (default: "schedule")

195

- serializer: Custom serializer (default: PickleSerializer)

196

- connection_kwargs: Additional Redis cluster connection arguments

197

"""

198

199

async def delete_schedule(self, schedule_id: str) -> None:

200

"""Remove a scheduled task from Redis cluster."""

201

202

async def add_schedule(self, schedule: ScheduledTask) -> None:

203

"""Add a scheduled task to Redis cluster."""

204

205

async def get_schedules(self) -> List[ScheduledTask]:

206

"""Retrieve all scheduled tasks from Redis cluster."""

207

208

async def post_send(self, task: ScheduledTask) -> None:

209

"""Clean up after task execution in Redis cluster."""

210

211

async def shutdown(self) -> None:

212

"""Shut down the cluster schedule source."""

213

```

214

215

**Usage Example:**

216

217

```python

218

from taskiq_redis import RedisClusterScheduleSource

219

220

# Create cluster schedule source

221

schedule_source = RedisClusterScheduleSource(

222

url="redis://cluster-node1:6379",

223

prefix="cluster_schedules"

224

)

225

226

# Use same API as standard schedule source

227

await schedule_source.add_schedule(schedule)

228

schedules = await schedule_source.get_schedules()

229

```

230

231

### Redis Sentinel Schedule Source

232

233

Schedule source for Redis Sentinel deployments with high availability.

234

235

```python { .api }

236

class RedisSentinelScheduleSource(ScheduleSource):

237

def __init__(

238

self,

239

sentinels: List[Tuple[str, int]],

240

master_name: str,

241

prefix: str = "schedule",

242

buffer_size: int = 50,

243

serializer: Optional[TaskiqSerializer] = None,

244

min_other_sentinels: int = 0,

245

sentinel_kwargs: Optional[Any] = None,

246

**connection_kwargs: Any,

247

) -> None:

248

"""

249

Schedule source for Redis Sentinel.

250

251

Parameters:

252

- sentinels: List of sentinel (host, port) pairs

253

- master_name: Sentinel master name

254

- prefix: Prefix for Redis keys (default: "schedule")

255

- buffer_size: Buffer size for retrieving schedules (default: 50)

256

- serializer: Custom serializer (default: PickleSerializer)

257

- min_other_sentinels: Minimum other sentinels required (default: 0)

258

- sentinel_kwargs: Additional sentinel configuration

259

- connection_kwargs: Additional Redis connection arguments

260

"""

261

262

async def delete_schedule(self, schedule_id: str) -> None:

263

"""Remove a scheduled task via Sentinel."""

264

265

async def add_schedule(self, schedule: ScheduledTask) -> None:

266

"""Add a scheduled task via Sentinel."""

267

268

async def get_schedules(self) -> List[ScheduledTask]:

269

"""Retrieve all scheduled tasks via Sentinel."""

270

271

async def post_send(self, task: ScheduledTask) -> None:

272

"""Clean up after task execution via Sentinel."""

273

274

async def shutdown(self) -> None:

275

"""Shut down the Sentinel schedule source."""

276

```

277

278

**Usage Example:**

279

280

```python

281

from taskiq_redis import RedisSentinelScheduleSource

282

283

# Create high-availability schedule source

284

schedule_source = RedisSentinelScheduleSource(

285

sentinels=[

286

("sentinel1", 26379),

287

("sentinel2", 26379),

288

("sentinel3", 26379)

289

],

290

master_name="mymaster",

291

prefix="ha_schedules",

292

min_other_sentinels=1

293

)

294

295

# Use same API with automatic failover

296

await schedule_source.add_schedule(schedule)

297

schedules = await schedule_source.get_schedules()

298

```

299

300

## Scheduling Patterns

301

302

### One-time Scheduled Tasks

303

304

```python

305

from taskiq_redis import ListRedisScheduleSource

306

from taskiq import ScheduledTask

307

from datetime import datetime, timedelta

308

309

schedule_source = ListRedisScheduleSource("redis://localhost:6379")

310

311

# Schedule task to run in 1 hour

312

future_time = datetime.now() + timedelta(hours=1)

313

schedule = ScheduledTask(

314

task_id="one-time-report",

315

task_name="generate_report",

316

schedule_time=future_time,

317

args=["monthly"],

318

kwargs={"format": "excel"}

319

)

320

321

await schedule_source.add_schedule(schedule)

322

```

323

324

### Recurring Tasks with Custom Logic

325

326

```python

327

from datetime import datetime, timedelta

328

329

# Schedule daily backups

330

async def schedule_daily_backup():

331

tomorrow = datetime.now() + timedelta(days=1)

332

tomorrow = tomorrow.replace(hour=2, minute=0, second=0, microsecond=0)

333

334

schedule = ScheduledTask(

335

task_id=f"backup-{tomorrow.strftime('%Y%m%d')}",

336

task_name="create_backup",

337

schedule_time=tomorrow,

338

args=["full"],

339

kwargs={"compress": True}

340

)

341

342

await schedule_source.add_schedule(schedule)

343

344

# Add initial schedule

345

await schedule_daily_backup()

346

```

347

348

### Schedule Management

349

350

```python

351

from taskiq_redis import ListRedisScheduleSource

352

353

schedule_source = ListRedisScheduleSource("redis://localhost:6379")

354

355

# Get all pending schedules

356

all_schedules = await schedule_source.get_schedules()

357

print(f"Total schedules: {len(all_schedules)}")

358

359

# Find specific schedule

360

target_schedule = None

361

for schedule in all_schedules:

362

if schedule.task_name == "generate_report":

363

target_schedule = schedule

364

break

365

366

if target_schedule:

367

# Update schedule (delete and re-add with new time)

368

await schedule_source.delete_schedule(target_schedule.task_id)

369

370

# Reschedule for later

371

target_schedule.schedule_time = datetime.now() + timedelta(hours=2)

372

await schedule_source.add_schedule(target_schedule)

373

374

# Clean up old schedules

375

now = datetime.now()

376

for schedule in all_schedules:

377

if schedule.schedule_time < now - timedelta(days=7):

378

await schedule_source.delete_schedule(schedule.task_id)

379

```

380

381

## Types

382

383

```python { .api }

384

from typing import List, Optional, Any, Tuple

385

from taskiq.abc.schedule_source import ScheduleSource

386

from taskiq.abc.serializer import TaskiqSerializer

387

from taskiq import ScheduledTask

388

```