or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mddecorators.mddiscovery.mdhttp.mdindex.mdrequests.mdrouters.mdscheduling.mdspecs.mdsystem-utils.md

scheduling.mddocs/

0

# Task Scheduling

1

2

Cron-based periodic task scheduling with async execution, lifecycle management, and integration with the decorator system. Supports complex scheduling patterns and error handling.

3

4

## Capabilities

5

6

### Cron Expression Parsing

7

8

Parse and execute cron expressions for scheduling periodic tasks.

9

10

```python { .api }

11

class CronTab:

12

def __init__(self, pattern: Union[str, CrontTabImpl]): ...

13

impl: Optional[CrontTabImpl]

14

repetitions: Union[int, float] # 1 for @reboot, inf for others

15

async def __aiter__(self) -> AsyncIterator[datetime]: ...

16

async def sleep_until_next(self, *args, **kwargs) -> None: ...

17

def get_delay_until_next(self, now: Optional[datetime] = None) -> float: ...

18

def __hash__(self) -> int: ...

19

def __eq__(self, other: Any) -> bool: ...

20

```

21

22

**Usage Examples:**

23

24

```python

25

from minos.networks import CronTab

26

import asyncio

27

28

# Create cron for every 5 minutes

29

cron = CronTab("0 */5 * * * *")

30

31

# Sleep until next execution

32

await cron.sleep_until_next()

33

34

# Get delay until next run

35

delay_seconds = cron.get_delay_until_next()

36

print(f"Next run in {delay_seconds} seconds")

37

38

# Iterate over scheduled times

39

async for scheduled_time in cron:

40

print(f"Executing at {scheduled_time}")

41

break # Exit after first execution

42

```

43

44

### Periodic Task Management

45

46

Manage individual periodic tasks with lifecycle control.

47

48

```python { .api }

49

class PeriodicTask:

50

def __init__(self, crontab: Union[str, CronTab, CronTabImpl], fn: Callable[[ScheduledRequest], Awaitable[None]]): ...

51

crontab: CronTab

52

fn: Callable[[ScheduledRequest], Awaitable[None]]

53

started: bool

54

running: bool

55

task: asyncio.Task

56

async def start(self) -> None: ...

57

async def stop(self, timeout: Optional[float] = None) -> None: ...

58

async def run_forever(self) -> NoReturn: ...

59

async def run_once(self, now: Optional[datetime] = None) -> None: ...

60

```

61

62

**Usage Examples:**

63

64

```python

65

from minos.networks import PeriodicTask, ScheduledRequest

66

67

async def cleanup_handler(request: ScheduledRequest) -> None:

68

print(f"Running cleanup at {request.scheduled_at}")

69

# Perform cleanup logic

70

71

# Create periodic task

72

task = PeriodicTask(

73

crontab="0 0 * * *", # Daily at midnight

74

fn=cleanup_handler

75

)

76

77

# Start the task

78

await task.start()

79

print(f"Task started: {task.started}")

80

81

# Task runs automatically based on cron schedule

82

await asyncio.sleep(3600) # Let it run for an hour

83

84

# Stop the task

85

await task.stop(timeout=30)

86

```

87

88

### Task Scheduler Service

89

90

Manage multiple periodic tasks as a service.

91

92

```python { .api }

93

class PeriodicTaskScheduler:

94

def __init__(self, tasks: set[PeriodicTask]): ...

95

tasks: set[PeriodicTask]

96

@classmethod

97

def _from_config(cls, config: Config, **kwargs) -> PeriodicTaskScheduler: ...

98

@classmethod

99

def _tasks_from_config(cls, config: Config, **kwargs) -> set[PeriodicTask]: ...

100

async def start(self) -> None: ...

101

async def stop(self, timeout: Optional[float] = None) -> None: ...

102

```

103

104

**Usage Examples:**

105

106

```python

107

from minos.networks import PeriodicTaskScheduler, PeriodicTask

108

109

# Create multiple tasks

110

daily_task = PeriodicTask("0 0 * * *", daily_cleanup)

111

hourly_task = PeriodicTask("0 * * * *", hourly_report)

112

minute_task = PeriodicTask("* * * * *", health_check)

113

114

# Create scheduler with tasks

115

scheduler = PeriodicTaskScheduler(

116

tasks={daily_task, hourly_task, minute_task}

117

)

118

119

# Start all tasks

120

await scheduler.start()

121

122

# All tasks run automatically

123

# Stop all tasks

124

await scheduler.stop()

125

```

126

127

### Periodic Port Service

128

129

Port implementation for periodic task scheduling with lifecycle management.

130

131

```python { .api }

132

class PeriodicPort:

133

scheduler: PeriodicTaskScheduler

134

async def _start(self) -> None: ...

135

async def _stop(self, exception: Exception = None) -> None: ...

136

137

class PeriodicTaskSchedulerService:

138

"""Deprecated - use PeriodicPort instead"""

139

```

140

141

**Usage Examples:**

142

143

```python

144

from minos.networks import PeriodicPort

145

from minos.common import Config

146

147

# Create port from configuration

148

config = Config("config.yml")

149

port = PeriodicPort._from_config(config)

150

151

# Start periodic services

152

await port.start()

153

154

# Port manages scheduler lifecycle

155

# Stop periodic services

156

await port.stop()

157

```

158

159

### Scheduled Request Interface

160

161

Request implementation for scheduled tasks.

162

163

```python { .api }

164

class ScheduledRequest(Request):

165

def __init__(self, scheduled_at: datetime): ...

166

user: Optional[UUID] # Always None for system requests

167

has_content: bool # Always True

168

has_params: bool # Always False

169

async def _content(self, **kwargs) -> ScheduledRequestContent: ...

170

def __eq__(self, other: Request) -> bool: ...

171

def __repr__(self) -> str: ...

172

173

class ScheduledRequestContent:

174

scheduled_at: datetime

175

176

class ScheduledResponseException:

177

"""Exception for scheduled task responses"""

178

```

179

180

**Usage Examples:**

181

182

```python

183

from minos.networks import ScheduledRequest, enroute

184

from datetime import datetime

185

186

@enroute.periodic.event("0 */15 * * * *") # Every 15 minutes

187

async def monitor_system(request: ScheduledRequest) -> Response:

188

# Get scheduling information

189

content = await request.content()

190

scheduled_time = content.scheduled_at

191

192

print(f"System monitor ran at {scheduled_time}")

193

194

# Perform monitoring logic

195

system_health = check_system_health()

196

197

if not system_health:

198

raise ScheduledResponseException("System unhealthy", status=500)

199

200

return Response({"status": "healthy", "checked_at": scheduled_time})

201

```

202

203

## Advanced Usage

204

205

### Complex Cron Expressions

206

207

```python

208

from minos.networks import enroute, CronTab

209

210

# Business hours only (9 AM to 5 PM, Monday to Friday)

211

@enroute.periodic.event("0 9-17 * * MON-FRI")

212

async def business_hours_task(request: ScheduledRequest) -> Response:

213

return Response({"executed_during_business_hours": True})

214

215

# Every 30 seconds

216

@enroute.periodic.event("*/30 * * * * *")

217

async def frequent_check(request: ScheduledRequest) -> Response:

218

return Response({"frequent_check": True})

219

220

# First day of every month at midnight

221

@enroute.periodic.event("0 0 1 * *")

222

async def monthly_report(request: ScheduledRequest) -> Response:

223

return Response({"monthly_report": "generated"})

224

225

# Using CronTab object for complex patterns

226

complex_cron = CronTab("0 0 * * SUN") # Every Sunday at midnight

227

@enroute.periodic.event(complex_cron)

228

async def weekly_backup(request: ScheduledRequest) -> Response:

229

return Response({"backup": "completed"})

230

```

231

232

### Complete Scheduling Service

233

234

```python

235

from minos.networks import (

236

PeriodicPort, PeriodicTask, PeriodicTaskScheduler,

237

enroute, ScheduledRequest, Response

238

)

239

from minos.common import Config

240

241

class ScheduledServices:

242

@enroute.periodic.event("0 0 * * *") # Daily at midnight

243

async def daily_cleanup(self, request: ScheduledRequest) -> Response:

244

content = await request.content()

245

print(f"Daily cleanup at {content.scheduled_at}")

246

247

# Cleanup logic

248

cleanup_old_files()

249

cleanup_database()

250

251

return Response({"cleanup": "completed"})

252

253

@enroute.periodic.event("0 */6 * * *") # Every 6 hours

254

async def health_check(self, request: ScheduledRequest) -> Response:

255

content = await request.content()

256

257

# Health check logic

258

services_status = check_all_services()

259

260

if not all(services_status.values()):

261

alert_administrators(services_status)

262

263

return Response({

264

"health_check": services_status,

265

"checked_at": content.scheduled_at

266

})

267

268

@enroute.periodic.event("0 0 1 * *") # First of every month

269

async def monthly_report(self, request: ScheduledRequest) -> Response:

270

content = await request.content()

271

272

# Generate monthly reports

273

report = generate_monthly_analytics()

274

send_report_to_stakeholders(report)

275

276

return Response({

277

"report": "generated",

278

"period": content.scheduled_at.strftime("%Y-%m")

279

})

280

281

# Setup and run scheduling service

282

config = Config("config.yml")

283

port = PeriodicPort._from_config(config)

284

285

# Start all scheduled tasks

286

await port.start()

287

print("Scheduled services started")

288

289

# Services run automatically

290

# Stop all scheduled tasks when shutting down

291

await port.stop()

292

```

293

294

### Error Handling and Retries

295

296

```python

297

from minos.networks import ScheduledResponseException

298

import asyncio

299

300

class RobustScheduledService:

301

@enroute.periodic.event("0 */5 * * * *") # Every 5 minutes

302

async def robust_task(self, request: ScheduledRequest) -> Response:

303

max_retries = 3

304

retry_delay = 5 # seconds

305

306

for attempt in range(max_retries):

307

try:

308

# Potentially failing operation

309

result = await perform_critical_operation()

310

311

return Response({

312

"result": result,

313

"attempt": attempt + 1

314

})

315

316

except Exception as e:

317

if attempt == max_retries - 1:

318

# Final attempt failed

319

raise ScheduledResponseException(

320

f"Task failed after {max_retries} attempts: {e}",

321

status=500

322

)

323

324

# Wait before retry

325

await asyncio.sleep(retry_delay)

326

retry_delay *= 2 # Exponential backoff

327

328

# Should never reach here

329

raise ScheduledResponseException("Unexpected error", status=500)

330

331

async def perform_critical_operation():

332

# Simulate potentially failing operation

333

import random

334

if random.random() < 0.3: # 30% failure rate

335

raise Exception("Simulated failure")

336

return "Operation successful"

337

```

338

339

### Dynamic Task Management

340

341

```python

342

class DynamicScheduler:

343

def __init__(self):

344

self.scheduler = None

345

self.dynamic_tasks = {}

346

347

async def add_task(self, task_id: str, cron_pattern: str, handler):

348

"""Dynamically add a new scheduled task"""

349

task = PeriodicTask(cron_pattern, handler)

350

self.dynamic_tasks[task_id] = task

351

352

if self.scheduler:

353

# Add to running scheduler

354

self.scheduler.tasks.add(task)

355

await task.start()

356

357

async def remove_task(self, task_id: str):

358

"""Dynamically remove a scheduled task"""

359

if task_id in self.dynamic_tasks:

360

task = self.dynamic_tasks[task_id]

361

await task.stop()

362

363

if self.scheduler:

364

self.scheduler.tasks.discard(task)

365

366

del self.dynamic_tasks[task_id]

367

368

async def start_scheduler(self):

369

"""Start the dynamic scheduler"""

370

self.scheduler = PeriodicTaskScheduler(set(self.dynamic_tasks.values()))

371

await self.scheduler.start()

372

373

async def stop_scheduler(self):

374

"""Stop the dynamic scheduler"""

375

if self.scheduler:

376

await self.scheduler.stop()

377

378

# Usage

379

dynamic_scheduler = DynamicScheduler()

380

381

# Add tasks dynamically

382

await dynamic_scheduler.add_task(

383

"cleanup",

384

"0 2 * * *", # 2 AM daily

385

lambda req: cleanup_handler(req)

386

)

387

388

await dynamic_scheduler.add_task(

389

"heartbeat",

390

"*/30 * * * * *", # Every 30 seconds

391

lambda req: heartbeat_handler(req)

392

)

393

394

# Start scheduling

395

await dynamic_scheduler.start_scheduler()

396

397

# Later, remove a task

398

await dynamic_scheduler.remove_task("heartbeat")

399

```