The networks core of the Minos Framework, providing networking components for reactive microservices.
—
Cron-based periodic task scheduling with async execution, lifecycle management, and integration with the decorator system. Supports complex scheduling patterns and error handling.
Parse and execute cron expressions for scheduling periodic tasks.
class CronTab:
def __init__(self, pattern: Union[str, CronTabImpl]): ...
impl: Optional[CronTabImpl]
repetitions: Union[int, float] # 1 for @reboot, inf for others
async def __aiter__(self) -> AsyncIterator[datetime]: ...
async def sleep_until_next(self, *args, **kwargs) -> None: ...
def get_delay_until_next(self, now: Optional[datetime] = None) -> float: ...
def __hash__(self) -> int: ...
def __eq__(self, other: Any) -> bool: ...

Usage Examples:
from minos.networks import CronTab
import asyncio
# Create cron for every 5 minutes
cron = CronTab("0 */5 * * * *")
# Sleep until next execution
await cron.sleep_until_next()
# Get delay until next run
delay_seconds = cron.get_delay_until_next()
print(f"Next run in {delay_seconds} seconds")
# Iterate over scheduled times
async for scheduled_time in cron:
print(f"Executing at {scheduled_time}")
break # Exit after first execution

Manage individual periodic tasks with lifecycle control.
class PeriodicTask:
def __init__(self, crontab: Union[str, CronTab, CronTabImpl], fn: Callable[[ScheduledRequest], Awaitable[None]]): ...
crontab: CronTab
fn: Callable[[ScheduledRequest], Awaitable[None]]
started: bool
running: bool
task: asyncio.Task
async def start(self) -> None: ...
async def stop(self, timeout: Optional[float] = None) -> None: ...
async def run_forever(self) -> NoReturn: ...
async def run_once(self, now: Optional[datetime] = None) -> None: ...

Usage Examples:
from minos.networks import PeriodicTask, ScheduledRequest
async def cleanup_handler(request: ScheduledRequest) -> None:
print(f"Running cleanup at {request.scheduled_at}")
# Perform cleanup logic
# Create periodic task
task = PeriodicTask(
crontab="0 0 * * *", # Daily at midnight
fn=cleanup_handler
)
# Start the task
await task.start()
print(f"Task started: {task.started}")
# Task runs automatically based on cron schedule
await asyncio.sleep(3600) # Let it run for an hour
# Stop the task
await task.stop(timeout=30)

Manage multiple periodic tasks as a service.
class PeriodicTaskScheduler:
def __init__(self, tasks: set[PeriodicTask]): ...
tasks: set[PeriodicTask]
@classmethod
def _from_config(cls, config: Config, **kwargs) -> PeriodicTaskScheduler: ...
@classmethod
def _tasks_from_config(cls, config: Config, **kwargs) -> set[PeriodicTask]: ...
async def start(self) -> None: ...
async def stop(self, timeout: Optional[float] = None) -> None: ...

Usage Examples:
from minos.networks import PeriodicTaskScheduler, PeriodicTask
# Create multiple tasks
daily_task = PeriodicTask("0 0 * * *", daily_cleanup)
hourly_task = PeriodicTask("0 * * * *", hourly_report)
minute_task = PeriodicTask("* * * * *", health_check)
# Create scheduler with tasks
scheduler = PeriodicTaskScheduler(
tasks={daily_task, hourly_task, minute_task}
)
# Start all tasks
await scheduler.start()
# All tasks run automatically
# Stop all tasks
await scheduler.stop()

Port implementation for periodic task scheduling with lifecycle management.
class PeriodicPort:
scheduler: PeriodicTaskScheduler
async def _start(self) -> None: ...
async def _stop(self, exception: Exception = None) -> None: ...
class PeriodicTaskSchedulerService:
"""Deprecated - use PeriodicPort instead"""

Usage Examples:
from minos.networks import PeriodicPort
from minos.common import Config
# Create port from configuration
config = Config("config.yml")
port = PeriodicPort._from_config(config)
# Start periodic services
await port.start()
# Port manages scheduler lifecycle
# Stop periodic services
await port.stop()

Request implementation for scheduled tasks.
class ScheduledRequest(Request):
def __init__(self, scheduled_at: datetime): ...
user: Optional[UUID] # Always None for system requests
has_content: bool # Always True
has_params: bool # Always False
async def _content(self, **kwargs) -> ScheduledRequestContent: ...
def __eq__(self, other: Request) -> bool: ...
def __repr__(self) -> str: ...
class ScheduledRequestContent:
scheduled_at: datetime
class ScheduledResponseException:
"""Exception for scheduled task responses"""

Usage Examples:
from minos.networks import ScheduledRequest, enroute
from datetime import datetime
@enroute.periodic.event("0 */15 * * * *") # Every 15 minutes
async def monitor_system(request: ScheduledRequest) -> Response:
# Get scheduling information
content = await request.content()
scheduled_time = content.scheduled_at
print(f"System monitor ran at {scheduled_time}")
# Perform monitoring logic
system_health = check_system_health()
if not system_health:
raise ScheduledResponseException("System unhealthy", status=500)
return Response({"status": "healthy", "checked_at": scheduled_time})

from minos.networks import enroute, CronTab
# Business hours only (9 AM to 5 PM, Monday to Friday)
@enroute.periodic.event("0 9-17 * * MON-FRI")
async def business_hours_task(request: ScheduledRequest) -> Response:
return Response({"executed_during_business_hours": True})
# Every 30 seconds
@enroute.periodic.event("*/30 * * * * *")
async def frequent_check(request: ScheduledRequest) -> Response:
return Response({"frequent_check": True})
# First day of every month at midnight
@enroute.periodic.event("0 0 1 * *")
async def monthly_report(request: ScheduledRequest) -> Response:
return Response({"monthly_report": "generated"})
# Using CronTab object for complex patterns
complex_cron = CronTab("0 0 * * SUN") # Every Sunday at midnight
@enroute.periodic.event(complex_cron)
async def weekly_backup(request: ScheduledRequest) -> Response:
return Response({"backup": "completed"})

from minos.networks import (
PeriodicPort, PeriodicTask, PeriodicTaskScheduler,
enroute, ScheduledRequest, Response
)
from minos.common import Config
class ScheduledServices:
    """Example service grouping three cron-driven handlers.

    Every handler is registered with ``enroute.periodic.event`` and reads the
    trigger time from the ``scheduled_at`` field of the request content.
    """

    @enroute.periodic.event("0 0 * * *")  # Daily at midnight
    async def daily_cleanup(self, request: ScheduledRequest) -> Response:
        """Run the cleanup helpers and report completion."""
        content = await request.content()
        print(f"Daily cleanup at {content.scheduled_at}")

        # Cleanup logic
        cleanup_old_files()
        cleanup_database()

        return Response({"cleanup": "completed"})

    @enroute.periodic.event("0 */6 * * *")  # Every 6 hours
    async def health_check(self, request: ScheduledRequest) -> Response:
        """Collect the status of all services and alert if any is falsy."""
        content = await request.content()

        # Health check logic
        services_status = check_all_services()

        # A single falsy status value is enough to notify the administrators.
        if not all(services_status.values()):
            alert_administrators(services_status)

        return Response({
            "health_check": services_status,
            "checked_at": content.scheduled_at
        })

    @enroute.periodic.event("0 0 1 * *")  # First of every month
    async def monthly_report(self, request: ScheduledRequest) -> Response:
        """Generate the monthly analytics report and send it out."""
        content = await request.content()

        # Generate monthly reports
        report = generate_monthly_analytics()
        send_report_to_stakeholders(report)

        return Response({
            "report": "generated",
            # The period is the year and month of the scheduled trigger time.
            "period": content.scheduled_at.strftime("%Y-%m")
        })
# Setup and run scheduling service
config = Config("config.yml")
port = PeriodicPort._from_config(config)
# Start all scheduled tasks
await port.start()
print("Scheduled services started")
# Services run automatically
# Stop all scheduled tasks when shutting down
await port.stop()

from minos.networks import ScheduledResponseException
import asyncio
class RobustScheduledService:
    """Example service whose periodic handler retries a flaky operation."""

    @enroute.periodic.event("0 */5 * * * *")  # Every 5 minutes
    async def robust_task(self, request: ScheduledRequest) -> Response:
        """Run ``perform_critical_operation`` with up to three attempts.

        Failed attempts are followed by a sleep whose duration starts at
        5 seconds and doubles after each failure.

        Returns:
            A ``Response`` carrying the operation result and the 1-based
            number of the attempt that succeeded.

        Raises:
            ScheduledResponseException: With ``status=500`` once the last
                attempt has failed; the triggering error is chained as the
                cause.
        """
        max_retries = 3
        retry_delay = 5  # seconds

        for attempt in range(max_retries):
            try:
                # Potentially failing operation; only this call is retried.
                result = await perform_critical_operation()
            except Exception as e:
                if attempt == max_retries - 1:
                    # Final attempt failed: keep the original error as the
                    # cause so the traceback shows what actually went wrong.
                    raise ScheduledResponseException(
                        f"Task failed after {max_retries} attempts: {e}",
                        status=500
                    ) from e

                # Wait before retry
                await asyncio.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
            else:
                return Response({
                    "result": result,
                    "attempt": attempt + 1
                })

        # Should never reach here
        raise ScheduledResponseException("Unexpected error", status=500)
async def perform_critical_operation():
    """Simulate an operation that fails on roughly 30% of the calls.

    Returns:
        The string ``"Operation successful"`` when no failure is simulated.

    Raises:
        Exception: With the message ``"Simulated failure"`` when the random
            draw is below 0.3.
    """
    # Simulate potentially failing operation
    import random

    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Simulated failure")
    return "Operation successful"


class DynamicScheduler:
    """Registry of periodic tasks that can be added or removed at runtime.

    Attributes:
        scheduler: The active ``PeriodicTaskScheduler``, or ``None`` while no
            scheduler is running.
        dynamic_tasks: Mapping from task identifier to its ``PeriodicTask``.
    """

    def __init__(self):
        self.scheduler = None
        self.dynamic_tasks = {}

    async def add_task(self, task_id: str, cron_pattern: str, handler):
        """Dynamically add a new scheduled task.

        When ``task_id`` is already registered, the previous task is stopped
        and removed first so it does not keep running without any way to
        reach it by id.

        Args:
            task_id: Identifier under which the task is registered.
            cron_pattern: Cron expression handed to ``PeriodicTask``.
            handler: Callable handed to ``PeriodicTask`` as its function.
        """
        # Build the replacement first: if the pattern is rejected, an
        # existing task registered under the same id is left untouched.
        task = PeriodicTask(cron_pattern, handler)

        if task_id in self.dynamic_tasks:
            await self.remove_task(task_id)

        self.dynamic_tasks[task_id] = task

        if self.scheduler is not None:
            # Add to running scheduler
            self.scheduler.tasks.add(task)
            await task.start()

    async def remove_task(self, task_id: str):
        """Dynamically remove a scheduled task.

        Unknown identifiers are ignored.

        Args:
            task_id: Identifier of the task to stop and forget.
        """
        task = self.dynamic_tasks.get(task_id)
        if task is None:
            return

        await task.stop()

        if self.scheduler is not None:
            self.scheduler.tasks.discard(task)

        # ``pop`` with a default: the entry may already be gone if another
        # removal of the same id completed while this one was awaiting.
        self.dynamic_tasks.pop(task_id, None)

    async def start_scheduler(self):
        """Start the dynamic scheduler with every currently registered task."""
        self.scheduler = PeriodicTaskScheduler(set(self.dynamic_tasks.values()))
        await self.scheduler.start()

    async def stop_scheduler(self):
        """Stop the dynamic scheduler, if one is running.

        The scheduler reference is cleared afterwards so that tasks added
        later are only registered, not started against a stopped scheduler.
        """
        if self.scheduler is None:
            return

        await self.scheduler.stop()
        self.scheduler = None
# Usage
dynamic_scheduler = DynamicScheduler()
# Add tasks dynamically
await dynamic_scheduler.add_task(
"cleanup",
"0 2 * * *", # 2 AM daily
lambda req: cleanup_handler(req)
)
await dynamic_scheduler.add_task(
"heartbeat",
"*/30 * * * * *", # Every 30 seconds
lambda req: heartbeat_handler(req)
)
# Start scheduling
await dynamic_scheduler.start_scheduler()
# Later, remove a task
await dynamic_scheduler.remove_task("heartbeat")

Install with Tessl CLI
npx tessl i tessl/pypi-minos-microservice-networks