or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mdevents-state.mdexceptions.mdindex.mdmiddleware.mdresult-backends.mdscheduling.mdtasks-results.md

scheduling.mddocs/

0

# Scheduling

1

2

Task scheduling system for executing tasks at specific times, on recurring schedules, or based on custom triggers. Supports cron-like expressions, one-time scheduling, and extensible schedule sources.

3

4

## Capabilities

5

6

### Scheduler Management

7

8

Core scheduler class that coordinates between schedule sources and task brokers to execute scheduled tasks.

9

10

```python { .api }

11

class TaskiqScheduler:

12

"""

13

Main scheduler class that manages scheduled task execution.

14

15

Coordinates between schedule sources and brokers to execute tasks

16

at the appropriate times based on cron expressions, fixed times,

17

or custom scheduling logic.

18

"""

19

20

def __init__(

21

self,

22

broker: AsyncBroker,

23

sources: List[ScheduleSource],

24

) -> None:

25

"""

26

Initialize scheduler with broker and schedule sources.

27

28

Args:

29

broker: Broker instance for task execution

30

sources: List of schedule sources providing tasks

31

"""

32

33

async def startup(self) -> None:

34

"""

35

Start the scheduler and initialize all components.

36

37

Calls startup on the broker and prepares schedule sources

38

for task execution.

39

"""

40

41

async def shutdown(self) -> None:

42

"""

43

Shutdown the scheduler and cleanup resources.

44

45

Stops all scheduled tasks and shuts down the broker.

46

"""

47

48

async def run_forever(self) -> None:

49

"""

50

Run the scheduler continuously until shutdown.

51

52

Main scheduler loop that checks schedule sources

53

and executes tasks when they become ready.

54

"""

55

56

async def on_ready(

57

self,

58

source: ScheduleSource,

59

task: ScheduledTask,

60

) -> None:

61

"""

62

Handler called when a scheduled task is ready for execution.

63

64

Args:

65

source: Schedule source that triggered the task

66

task: Scheduled task to be executed

67

"""

68

```

69

70

### Scheduled Tasks

71

72

Representation of tasks with scheduling metadata including timing, arguments, and execution context.

73

74

```python { .api }

75

class ScheduledTask:

76

"""

77

Represents a task scheduled for future execution.

78

79

Contains task identification, scheduling information,

80

and execution parameters.

81

"""

82

83

task_name: str

84

"""Name of the task to execute."""

85

86

cron: Optional[str]

87

"""Cron expression for recurring schedules (e.g., '0 0 * * *' for daily)."""

88

89

time: Optional[datetime]

90

"""Specific datetime for one-time execution."""

91

92

labels: Dict[str, Any]

93

"""Additional labels and metadata for the task."""

94

95

args: Tuple[Any, ...]

96

"""Positional arguments to pass to the task function."""

97

98

kwargs: Dict[str, Any]

99

"""Keyword arguments to pass to the task function."""

100

101

def __init__(

102

self,

103

task_name: str,

104

cron: Optional[str] = None,

105

time: Optional[datetime] = None,

106

labels: Optional[Dict[str, Any]] = None,

107

args: Optional[Tuple[Any, ...]] = None,

108

kwargs: Optional[Dict[str, Any]] = None,

109

) -> None: ...

110

```

111

112

### Schedule Sources

113

114

Abstract interface and implementations for providing scheduled tasks to the scheduler.

115

116

```python { .api }

117

class ScheduleSource:

118

"""

119

Abstract base class for schedule sources.

120

121

Schedule sources provide scheduled tasks to the scheduler

122

and can implement custom scheduling logic.

123

"""

124

125

async def startup(self) -> None:

126

"""Initialize the schedule source."""

127

128

async def shutdown(self) -> None:

129

"""Cleanup the schedule source."""

130

131

async def get_schedules(self) -> List[ScheduledTask]:

132

"""

133

Get list of scheduled tasks.

134

135

Returns:

136

List of scheduled tasks from this source

137

"""

138

139

async def pre_send(self, task: ScheduledTask) -> None:

140

"""

141

Pre-processing hook before task execution.

142

143

Can modify task or raise ScheduledTaskCancelledError to cancel.

144

145

Args:

146

task: Scheduled task about to be executed

147

148

Raises:

149

ScheduledTaskCancelledError: To cancel task execution

150

"""

151

152

async def post_send(self, task: ScheduledTask) -> None:

153

"""

154

Post-processing hook after task is sent.

155

156

Args:

157

task: Scheduled task that was sent

158

"""

159

160

class LabelBasedScheduleSource(ScheduleSource):

161

"""

162

Schedule source that discovers tasks based on labels.

163

164

Automatically finds tasks with scheduling labels in the broker's

165

task registry and creates appropriate scheduled tasks.

166

"""

167

168

def __init__(

169

self,

170

broker: AsyncBroker,

171

schedule_label: str = "schedule",

172

) -> None:

173

"""

174

Initialize label-based schedule source.

175

176

Args:

177

broker: Broker containing tasks to schedule

178

schedule_label: Label name containing schedule information

179

"""

180

181

async def get_schedules(self) -> List[ScheduledTask]:

182

"""Extract scheduled tasks from broker task registry."""

183

```

184

185

## Usage Examples

186

187

### Basic Cron Scheduling

188

189

```python

190

import asyncio

191

from datetime import datetime

192

from taskiq import InMemoryBroker

193

from taskiq.scheduler import TaskiqScheduler, ScheduledTask

194

from taskiq.schedule_sources import LabelBasedScheduleSource

195

196

broker = InMemoryBroker()

197

198

# Define scheduled tasks using labels

199

@broker.task(schedule="0 8 * * *") # Daily at 8 AM

200

async def daily_report() -> None:

201

print(f"Generating daily report at {datetime.now()}")

202

# Generate and send report

203

204

@broker.task(schedule="*/15 * * * *") # Every 15 minutes

205

async def health_check() -> None:

206

print(f"Health check at {datetime.now()}")

207

# Check system health

208

209

# Set up scheduler

210

schedule_source = LabelBasedScheduleSource(broker)

211

scheduler = TaskiqScheduler(broker, [schedule_source])

212

213

async def run_scheduler():

214

await scheduler.startup()

215

try:

216

await scheduler.run_forever()

217

finally:

218

await scheduler.shutdown()

219

220

# Run scheduler

221

asyncio.run(run_scheduler())

222

```

223

224

### One-time Scheduled Tasks

225

226

```python

227

from datetime import datetime, timedelta

228

from taskiq.scheduler import ScheduledTask

229

230

# Schedule task for specific time

231

future_time = datetime.now() + timedelta(hours=2)

232

scheduled_task = ScheduledTask(

233

task_name="my_module:delayed_task",

234

time=future_time,

235

args=("important_data",),

236

kwargs={"priority": "high"},

237

labels={"category": "one-time"},

238

)

239

240

# Custom schedule source for one-time tasks

241

class OneTimeScheduleSource(ScheduleSource):

242

def __init__(self):

243

self.tasks = []

244

245

def add_task(self, task: ScheduledTask):

246

self.tasks.append(task)

247

248

async def get_schedules(self) -> List[ScheduledTask]:

249

return self.tasks

250

251

async def post_send(self, task: ScheduledTask) -> None:

252

# Remove one-time tasks after execution

253

if task.time and task in self.tasks:

254

self.tasks.remove(task)

255

256

# Use custom source

257

one_time_source = OneTimeScheduleSource()

258

one_time_source.add_task(scheduled_task)

259

260

scheduler = TaskiqScheduler(

261

broker,

262

[LabelBasedScheduleSource(broker), one_time_source]

263

)

264

```

265

266

### Advanced Scheduling with Custom Logic

267

268

```python

269

class ConditionalScheduleSource(ScheduleSource):

270

"""Schedule source with custom conditions."""

271

272

def __init__(self, broker: AsyncBroker):

273

self.broker = broker

274

self.last_execution = {}

275

276

async def get_schedules(self) -> List[ScheduledTask]:

277

schedules = []

278

279

# Only schedule backup task if it's been more than 6 hours

280

last_backup = self.last_execution.get("backup_task")

281

if (not last_backup or

282

datetime.now() - last_backup > timedelta(hours=6)):

283

schedules.append(ScheduledTask(

284

task_name="my_module:backup_data",

285

time=datetime.now() + timedelta(minutes=1),

286

labels={"type": "maintenance"},

287

))

288

289

return schedules

290

291

async def pre_send(self, task: ScheduledTask) -> None:

292

# Check system load before executing maintenance tasks

293

if task.labels.get("type") == "maintenance":

294

if await self._system_load_too_high():

295

raise ScheduledTaskCancelledError("System load too high")

296

297

async def post_send(self, task: ScheduledTask) -> None:

298

# Track execution time

299

self.last_execution[task.task_name.split(":")[-1]] = datetime.now()

300

301

async def _system_load_too_high(self) -> bool:

302

# Custom system load check

303

import psutil

304

return psutil.cpu_percent() > 80.0

305

```

306

307

### Integration with Task Labels

308

309

```python

310

# Define tasks with various scheduling options

311

@broker.task(

312

schedule="0 2 * * 0", # Weekly on Sunday at 2 AM

313

priority="low",

314

timeout=3600, # 1 hour timeout

315

)

316

async def weekly_cleanup() -> None:

317

"""Weekly maintenance task."""

318

print("Running weekly cleanup")

319

# Cleanup old data

320

321

@broker.task(

322

schedule="*/5 * * * *", # Every 5 minutes

323

max_retries=3,

324

retry_delay=30,

325

)

326

async def monitoring_task() -> None:

327

"""Frequent monitoring task with retry logic."""

328

# Monitor system metrics

329

pass

330

331

# Schedule source automatically discovers these tasks

332

schedule_source = LabelBasedScheduleSource(

333

broker,

334

schedule_label="schedule" # Look for 'schedule' label

335

)

336

```

337

338

## Types

339

340

```python { .api }

341

ScheduledTaskCancelledError = Exception

342

"""Exception raised to cancel scheduled task execution."""

343

```

344

345

## Cron Expression Format

346

347

Taskiq uses standard cron expression format with five fields:

348

349

```

350

┌───────────── minute (0 - 59)

351

│ ┌───────────── hour (0 - 23)

352

│ │ ┌───────────── day of month (1 - 31)

353

│ │ │ ┌───────────── month (1 - 12)

354

│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)

355

│ │ │ │ │

356

* * * * *

357

```

358

359

Common examples:

360

- `"0 0 * * *"` - Daily at midnight

361

- `"30 8 * * 1-5"` - Weekdays at 8:30 AM

362

- `"0 */4 * * *"` - Every 4 hours

363

- `"15 2 1 * *"` - First day of month at 2:15 AM

364

- `"0 9-17 * * 1-5"` - Hourly during business hours