0
# Task Scheduling
1
2
Cron-based periodic task scheduling with async execution, lifecycle management, and integration with the decorator system. Supports complex scheduling patterns and error handling.
3
4
## Capabilities
5
6
### Cron Expression Parsing
7
8
Parse and execute cron expressions for scheduling periodic tasks.
9
10
```python { .api }
11
class CronTab:
12
def __init__(self, pattern: Union[str, CrontTabImpl]): ...
13
impl: Optional[CrontTabImpl]
14
repetitions: Union[int, float] # 1 for @reboot, inf for others
15
async def __aiter__(self) -> AsyncIterator[datetime]: ...
16
async def sleep_until_next(self, *args, **kwargs) -> None: ...
17
def get_delay_until_next(self, now: Optional[datetime] = None) -> float: ...
18
def __hash__(self) -> int: ...
19
def __eq__(self, other: Any) -> bool: ...
20
```
21
22
**Usage Examples:**
23
24
```python
25
from minos.networks import CronTab
26
import asyncio
27
28
# Create cron for every 5 minutes
29
cron = CronTab("0 */5 * * * *")
30
31
# Sleep until next execution
32
await cron.sleep_until_next()
33
34
# Get delay until next run
35
delay_seconds = cron.get_delay_until_next()
36
print(f"Next run in {delay_seconds} seconds")
37
38
# Iterate over scheduled times
39
async for scheduled_time in cron:
40
print(f"Executing at {scheduled_time}")
41
break # Exit after first execution
42
```
43
44
### Periodic Task Management
45
46
Manage individual periodic tasks with lifecycle control.
47
48
```python { .api }
49
class PeriodicTask:
50
def __init__(self, crontab: Union[str, CronTab, CronTabImpl], fn: Callable[[ScheduledRequest], Awaitable[None]]): ...
51
crontab: CronTab
52
fn: Callable[[ScheduledRequest], Awaitable[None]]
53
started: bool
54
running: bool
55
task: asyncio.Task
56
async def start(self) -> None: ...
57
async def stop(self, timeout: Optional[float] = None) -> None: ...
58
async def run_forever(self) -> NoReturn: ...
59
async def run_once(self, now: Optional[datetime] = None) -> None: ...
60
```
61
62
**Usage Examples:**
63
64
```python
65
from minos.networks import PeriodicTask, ScheduledRequest
66
67
async def cleanup_handler(request: ScheduledRequest) -> None:
68
print(f"Running cleanup at {request.scheduled_at}")
69
# Perform cleanup logic
70
71
# Create periodic task
72
task = PeriodicTask(
73
crontab="0 0 * * *", # Daily at midnight
74
fn=cleanup_handler
75
)
76
77
# Start the task
78
await task.start()
79
print(f"Task started: {task.started}")
80
81
# Task runs automatically based on cron schedule
82
await asyncio.sleep(3600) # Let it run for an hour
83
84
# Stop the task
85
await task.stop(timeout=30)
86
```
87
88
### Task Scheduler Service
89
90
Manage multiple periodic tasks as a service.
91
92
```python { .api }
93
class PeriodicTaskScheduler:
94
def __init__(self, tasks: set[PeriodicTask]): ...
95
tasks: set[PeriodicTask]
96
@classmethod
97
def _from_config(cls, config: Config, **kwargs) -> PeriodicTaskScheduler: ...
98
@classmethod
99
def _tasks_from_config(cls, config: Config, **kwargs) -> set[PeriodicTask]: ...
100
async def start(self) -> None: ...
101
async def stop(self, timeout: Optional[float] = None) -> None: ...
102
```
103
104
**Usage Examples:**
105
106
```python
107
from minos.networks import PeriodicTaskScheduler, PeriodicTask
108
109
# Create multiple tasks
110
daily_task = PeriodicTask("0 0 * * *", daily_cleanup)
111
hourly_task = PeriodicTask("0 * * * *", hourly_report)
112
minute_task = PeriodicTask("* * * * *", health_check)
113
114
# Create scheduler with tasks
115
scheduler = PeriodicTaskScheduler(
116
tasks={daily_task, hourly_task, minute_task}
117
)
118
119
# Start all tasks
120
await scheduler.start()
121
122
# All tasks run automatically
123
# Stop all tasks
124
await scheduler.stop()
125
```
126
127
### Periodic Port Service
128
129
Port implementation for periodic task scheduling with lifecycle management.
130
131
```python { .api }
132
class PeriodicPort:
133
scheduler: PeriodicTaskScheduler
134
async def _start(self) -> None: ...
135
async def _stop(self, exception: Exception = None) -> None: ...
136
137
class PeriodicTaskSchedulerService:
138
"""Deprecated - use PeriodicPort instead"""
139
```
140
141
**Usage Examples:**
142
143
```python
144
from minos.networks import PeriodicPort
145
from minos.common import Config
146
147
# Create port from configuration
148
config = Config("config.yml")
149
port = PeriodicPort._from_config(config)
150
151
# Start periodic services
152
await port.start()
153
154
# Port manages scheduler lifecycle
155
# Stop periodic services
156
await port.stop()
157
```
158
159
### Scheduled Request Interface
160
161
Request implementation for scheduled tasks.
162
163
```python { .api }
164
class ScheduledRequest(Request):
165
def __init__(self, scheduled_at: datetime): ...
166
user: Optional[UUID] # Always None for system requests
167
has_content: bool # Always True
168
has_params: bool # Always False
169
async def _content(self, **kwargs) -> ScheduledRequestContent: ...
170
def __eq__(self, other: Request) -> bool: ...
171
def __repr__(self) -> str: ...
172
173
class ScheduledRequestContent:
174
scheduled_at: datetime
175
176
class ScheduledResponseException:
177
"""Exception for scheduled task responses"""
178
```
179
180
**Usage Examples:**
181
182
```python
183
from minos.networks import ScheduledRequest, enroute
184
from datetime import datetime
185
186
@enroute.periodic.event("0 */15 * * * *") # Every 15 minutes
187
async def monitor_system(request: ScheduledRequest) -> Response:
188
# Get scheduling information
189
content = await request.content()
190
scheduled_time = content.scheduled_at
191
192
print(f"System monitor ran at {scheduled_time}")
193
194
# Perform monitoring logic
195
system_health = check_system_health()
196
197
if not system_health:
198
raise ScheduledResponseException("System unhealthy", status=500)
199
200
return Response({"status": "healthy", "checked_at": scheduled_time})
201
```
202
203
## Advanced Usage
204
205
### Complex Cron Expressions
206
207
```python
208
from minos.networks import enroute, CronTab
209
210
# Business hours only (9 AM to 5 PM, Monday to Friday)
211
@enroute.periodic.event("0 9-17 * * MON-FRI")
212
async def business_hours_task(request: ScheduledRequest) -> Response:
213
return Response({"executed_during_business_hours": True})
214
215
# Every 30 seconds
216
@enroute.periodic.event("*/30 * * * * *")
217
async def frequent_check(request: ScheduledRequest) -> Response:
218
return Response({"frequent_check": True})
219
220
# First day of every month at midnight
221
@enroute.periodic.event("0 0 1 * *")
222
async def monthly_report(request: ScheduledRequest) -> Response:
223
return Response({"monthly_report": "generated"})
224
225
# Using CronTab object for complex patterns
226
complex_cron = CronTab("0 0 * * SUN") # Every Sunday at midnight
227
@enroute.periodic.event(complex_cron)
228
async def weekly_backup(request: ScheduledRequest) -> Response:
229
return Response({"backup": "completed"})
230
```
231
232
### Complete Scheduling Service
233
234
```python
235
from minos.networks import (
236
PeriodicPort, PeriodicTask, PeriodicTaskScheduler,
237
enroute, ScheduledRequest, Response
238
)
239
from minos.common import Config
240
241
class ScheduledServices:
242
@enroute.periodic.event("0 0 * * *") # Daily at midnight
243
async def daily_cleanup(self, request: ScheduledRequest) -> Response:
244
content = await request.content()
245
print(f"Daily cleanup at {content.scheduled_at}")
246
247
# Cleanup logic
248
cleanup_old_files()
249
cleanup_database()
250
251
return Response({"cleanup": "completed"})
252
253
@enroute.periodic.event("0 */6 * * *") # Every 6 hours
254
async def health_check(self, request: ScheduledRequest) -> Response:
255
content = await request.content()
256
257
# Health check logic
258
services_status = check_all_services()
259
260
if not all(services_status.values()):
261
alert_administrators(services_status)
262
263
return Response({
264
"health_check": services_status,
265
"checked_at": content.scheduled_at
266
})
267
268
@enroute.periodic.event("0 0 1 * *") # First of every month
269
async def monthly_report(self, request: ScheduledRequest) -> Response:
270
content = await request.content()
271
272
# Generate monthly reports
273
report = generate_monthly_analytics()
274
send_report_to_stakeholders(report)
275
276
return Response({
277
"report": "generated",
278
"period": content.scheduled_at.strftime("%Y-%m")
279
})
280
281
# Setup and run scheduling service
282
config = Config("config.yml")
283
port = PeriodicPort._from_config(config)
284
285
# Start all scheduled tasks
286
await port.start()
287
print("Scheduled services started")
288
289
# Services run automatically
290
# Stop all scheduled tasks when shutting down
291
await port.stop()
292
```
293
294
### Error Handling and Retries
295
296
```python
297
from minos.networks import ScheduledResponseException
298
import asyncio
299
300
class RobustScheduledService:
301
@enroute.periodic.event("0 */5 * * * *") # Every 5 minutes
302
async def robust_task(self, request: ScheduledRequest) -> Response:
303
max_retries = 3
304
retry_delay = 5 # seconds
305
306
for attempt in range(max_retries):
307
try:
308
# Potentially failing operation
309
result = await perform_critical_operation()
310
311
return Response({
312
"result": result,
313
"attempt": attempt + 1
314
})
315
316
except Exception as e:
317
if attempt == max_retries - 1:
318
# Final attempt failed
319
raise ScheduledResponseException(
320
f"Task failed after {max_retries} attempts: {e}",
321
status=500
322
)
323
324
# Wait before retry
325
await asyncio.sleep(retry_delay)
326
retry_delay *= 2 # Exponential backoff
327
328
# Should never reach here
329
raise ScheduledResponseException("Unexpected error", status=500)
330
331
async def perform_critical_operation():
332
# Simulate potentially failing operation
333
import random
334
if random.random() < 0.3: # 30% failure rate
335
raise Exception("Simulated failure")
336
return "Operation successful"
337
```
338
339
### Dynamic Task Management
340
341
```python
342
class DynamicScheduler:
343
def __init__(self):
344
self.scheduler = None
345
self.dynamic_tasks = {}
346
347
async def add_task(self, task_id: str, cron_pattern: str, handler):
348
"""Dynamically add a new scheduled task"""
349
task = PeriodicTask(cron_pattern, handler)
350
self.dynamic_tasks[task_id] = task
351
352
if self.scheduler:
353
# Add to running scheduler
354
self.scheduler.tasks.add(task)
355
await task.start()
356
357
async def remove_task(self, task_id: str):
358
"""Dynamically remove a scheduled task"""
359
if task_id in self.dynamic_tasks:
360
task = self.dynamic_tasks[task_id]
361
await task.stop()
362
363
if self.scheduler:
364
self.scheduler.tasks.discard(task)
365
366
del self.dynamic_tasks[task_id]
367
368
async def start_scheduler(self):
369
"""Start the dynamic scheduler"""
370
self.scheduler = PeriodicTaskScheduler(set(self.dynamic_tasks.values()))
371
await self.scheduler.start()
372
373
async def stop_scheduler(self):
374
"""Stop the dynamic scheduler"""
375
if self.scheduler:
376
await self.scheduler.stop()
377
378
# Usage
379
dynamic_scheduler = DynamicScheduler()
380
381
# Add tasks dynamically
382
await dynamic_scheduler.add_task(
383
"cleanup",
384
"0 2 * * *", # 2 AM daily
385
lambda req: cleanup_handler(req)
386
)
387
388
await dynamic_scheduler.add_task(
389
"heartbeat",
390
"*/30 * * * * *", # Every 30 seconds
391
lambda req: heartbeat_handler(req)
392
)
393
394
# Start scheduling
395
await dynamic_scheduler.start_scheduler()
396
397
# Later, remove a task
398
await dynamic_scheduler.remove_task("heartbeat")
399
```