CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-minos-microservice-networks

The networks core of the Minos Framework providing networking components for reactive microservices

Pending
Overview
Eval results
Files

scheduling.mddocs/

Task Scheduling

Cron-based periodic task scheduling with async execution, lifecycle management, and integration with the decorator system. Supports complex scheduling patterns and error handling.

Capabilities

Cron Expression Parsing

Parse and execute cron expressions for scheduling periodic tasks.

class CronTab:
    def __init__(self, pattern: Union[str, CrontTabImpl]): ...
    impl: Optional[CrontTabImpl]
    repetitions: Union[int, float]  # 1 for @reboot, inf for others
    async def __aiter__(self) -> AsyncIterator[datetime]: ...
    async def sleep_until_next(self, *args, **kwargs) -> None: ...
    def get_delay_until_next(self, now: Optional[datetime] = None) -> float: ...
    def __hash__(self) -> int: ...
    def __eq__(self, other: Any) -> bool: ...

Usage Examples:

from minos.networks import CronTab
import asyncio

# Create cron for every 5 minutes
cron = CronTab("0 */5 * * * *")

# Sleep until next execution
await cron.sleep_until_next()

# Get delay until next run
delay_seconds = cron.get_delay_until_next()
print(f"Next run in {delay_seconds} seconds")

# Iterate over scheduled times
async for scheduled_time in cron:
    print(f"Executing at {scheduled_time}")
    break  # Exit after first execution

Periodic Task Management

Manage individual periodic tasks with lifecycle control.

class PeriodicTask:
    def __init__(self, crontab: Union[str, CronTab, CronTabImpl], fn: Callable[[ScheduledRequest], Awaitable[None]]): ...
    crontab: CronTab
    fn: Callable[[ScheduledRequest], Awaitable[None]]
    started: bool
    running: bool
    task: asyncio.Task
    async def start(self) -> None: ...
    async def stop(self, timeout: Optional[float] = None) -> None: ...
    async def run_forever(self) -> NoReturn: ...
    async def run_once(self, now: Optional[datetime] = None) -> None: ...

Usage Examples:

from minos.networks import PeriodicTask, ScheduledRequest

async def cleanup_handler(request: ScheduledRequest) -> None:
    print(f"Running cleanup at {request.scheduled_at}")
    # Perform cleanup logic

# Create periodic task
task = PeriodicTask(
    crontab="0 0 * * *",  # Daily at midnight
    fn=cleanup_handler
)

# Start the task
await task.start()
print(f"Task started: {task.started}")

# Task runs automatically based on cron schedule
await asyncio.sleep(3600)  # Let it run for an hour

# Stop the task
await task.stop(timeout=30)

Task Scheduler Service

Manage multiple periodic tasks as a service.

class PeriodicTaskScheduler:
    def __init__(self, tasks: set[PeriodicTask]): ...
    tasks: set[PeriodicTask]
    @classmethod  
    def _from_config(cls, config: Config, **kwargs) -> PeriodicTaskScheduler: ...
    @classmethod
    def _tasks_from_config(cls, config: Config, **kwargs) -> set[PeriodicTask]: ...
    async def start(self) -> None: ...
    async def stop(self, timeout: Optional[float] = None) -> None: ...

Usage Examples:

from minos.networks import PeriodicTaskScheduler, PeriodicTask

# Create multiple tasks
daily_task = PeriodicTask("0 0 * * *", daily_cleanup)
hourly_task = PeriodicTask("0 * * * *", hourly_report) 
minute_task = PeriodicTask("* * * * *", health_check)

# Create scheduler with tasks
scheduler = PeriodicTaskScheduler(
    tasks={daily_task, hourly_task, minute_task}
)

# Start all tasks
await scheduler.start()

# All tasks run automatically
# Stop all tasks
await scheduler.stop()

Periodic Port Service

Port implementation for periodic task scheduling with lifecycle management.

class PeriodicPort:
    scheduler: PeriodicTaskScheduler
    async def _start(self) -> None: ...
    async def _stop(self, exception: Exception = None) -> None: ...

class PeriodicTaskSchedulerService:
    """Deprecated - use PeriodicPort instead"""

Usage Examples:

from minos.networks import PeriodicPort
from minos.common import Config

# Create port from configuration
config = Config("config.yml")
port = PeriodicPort._from_config(config)

# Start periodic services
await port.start()

# Port manages scheduler lifecycle
# Stop periodic services
await port.stop()

Scheduled Request Interface

Request implementation for scheduled tasks.

class ScheduledRequest(Request):
    def __init__(self, scheduled_at: datetime): ...
    user: Optional[UUID]  # Always None for system requests
    has_content: bool     # Always True
    has_params: bool      # Always False
    async def _content(self, **kwargs) -> ScheduledRequestContent: ...
    def __eq__(self, other: Request) -> bool: ...
    def __repr__(self) -> str: ...

class ScheduledRequestContent:
    scheduled_at: datetime

class ScheduledResponseException:
    """Exception for scheduled task responses"""

Usage Examples:

from minos.networks import ScheduledRequest, enroute
from datetime import datetime

@enroute.periodic.event("0 */15 * * * *")  # Every 15 minutes
async def monitor_system(request: ScheduledRequest) -> Response:
    # Get scheduling information
    content = await request.content()
    scheduled_time = content.scheduled_at
    
    print(f"System monitor ran at {scheduled_time}")
    
    # Perform monitoring logic
    system_health = check_system_health()
    
    if not system_health:
        raise ScheduledResponseException("System unhealthy", status=500)
    
    return Response({"status": "healthy", "checked_at": scheduled_time})

Advanced Usage

Complex Cron Expressions

from minos.networks import enroute, CronTab

# Business hours only (9 AM to 5 PM, Monday to Friday)
@enroute.periodic.event("0 9-17 * * MON-FRI")
async def business_hours_task(request: ScheduledRequest) -> Response:
    return Response({"executed_during_business_hours": True})

# Every 30 seconds
@enroute.periodic.event("*/30 * * * * *")
async def frequent_check(request: ScheduledRequest) -> Response:
    return Response({"frequent_check": True})

# First day of every month at midnight
@enroute.periodic.event("0 0 1 * *")
async def monthly_report(request: ScheduledRequest) -> Response:
    return Response({"monthly_report": "generated"})

# Using CronTab object for complex patterns
complex_cron = CronTab("0 0 * * SUN")  # Every Sunday at midnight
@enroute.periodic.event(complex_cron)
async def weekly_backup(request: ScheduledRequest) -> Response:
    return Response({"backup": "completed"})

Complete Scheduling Service

from minos.networks import (
    PeriodicPort, PeriodicTask, PeriodicTaskScheduler, 
    enroute, ScheduledRequest, Response
)
from minos.common import Config

class ScheduledServices:
    @enroute.periodic.event("0 0 * * *")  # Daily at midnight
    async def daily_cleanup(self, request: ScheduledRequest) -> Response:
        content = await request.content()
        print(f"Daily cleanup at {content.scheduled_at}")
        
        # Cleanup logic
        cleanup_old_files()
        cleanup_database()
        
        return Response({"cleanup": "completed"})
    
    @enroute.periodic.event("0 */6 * * *")  # Every 6 hours
    async def health_check(self, request: ScheduledRequest) -> Response:
        content = await request.content()
        
        # Health check logic
        services_status = check_all_services()
        
        if not all(services_status.values()):
            alert_administrators(services_status)
        
        return Response({
            "health_check": services_status,
            "checked_at": content.scheduled_at
        })
    
    @enroute.periodic.event("0 0 1 * *")  # First of every month
    async def monthly_report(self, request: ScheduledRequest) -> Response:
        content = await request.content()
        
        # Generate monthly reports
        report = generate_monthly_analytics()
        send_report_to_stakeholders(report)
        
        return Response({
            "report": "generated",
            "period": content.scheduled_at.strftime("%Y-%m")
        })

# Setup and run scheduling service
config = Config("config.yml")
port = PeriodicPort._from_config(config)

# Start all scheduled tasks
await port.start()
print("Scheduled services started")

# Services run automatically
# Stop all scheduled tasks when shutting down
await port.stop()

Error Handling and Retries

from minos.networks import ScheduledResponseException
import asyncio

class RobustScheduledService:
    @enroute.periodic.event("0 */5 * * * *")  # Every 5 minutes
    async def robust_task(self, request: ScheduledRequest) -> Response:
        max_retries = 3
        retry_delay = 5  # seconds
        
        for attempt in range(max_retries):
            try:
                # Potentially failing operation
                result = await perform_critical_operation()
                
                return Response({
                    "result": result,
                    "attempt": attempt + 1
                })
                
            except Exception as e:
                if attempt == max_retries - 1:
                    # Final attempt failed
                    raise ScheduledResponseException(
                        f"Task failed after {max_retries} attempts: {e}",
                        status=500
                    )
                
                # Wait before retry
                await asyncio.sleep(retry_delay)
                retry_delay *= 2  # Exponential backoff
        
        # Should never reach here
        raise ScheduledResponseException("Unexpected error", status=500)

async def perform_critical_operation():
    # Simulate potentially failing operation
    import random
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Simulated failure")
    return "Operation successful"

Dynamic Task Management

class DynamicScheduler:
    def __init__(self):
        self.scheduler = None
        self.dynamic_tasks = {}
    
    async def add_task(self, task_id: str, cron_pattern: str, handler):
        """Dynamically add a new scheduled task"""
        task = PeriodicTask(cron_pattern, handler)
        self.dynamic_tasks[task_id] = task
        
        if self.scheduler:
            # Add to running scheduler
            self.scheduler.tasks.add(task)
            await task.start()
    
    async def remove_task(self, task_id: str):
        """Dynamically remove a scheduled task"""
        if task_id in self.dynamic_tasks:
            task = self.dynamic_tasks[task_id]
            await task.stop()
            
            if self.scheduler:
                self.scheduler.tasks.discard(task)
            
            del self.dynamic_tasks[task_id]
    
    async def start_scheduler(self):
        """Start the dynamic scheduler"""
        self.scheduler = PeriodicTaskScheduler(set(self.dynamic_tasks.values()))
        await self.scheduler.start()
    
    async def stop_scheduler(self):
        """Stop the dynamic scheduler"""
        if self.scheduler:
            await self.scheduler.stop()

# Usage
dynamic_scheduler = DynamicScheduler()

# Add tasks dynamically
await dynamic_scheduler.add_task(
    "cleanup",
    "0 2 * * *",  # 2 AM daily
    lambda req: cleanup_handler(req)
)

await dynamic_scheduler.add_task(
    "heartbeat", 
    "*/30 * * * * *",  # Every 30 seconds
    lambda req: heartbeat_handler(req)
)

# Start scheduling
await dynamic_scheduler.start_scheduler()

# Later, remove a task
await dynamic_scheduler.remove_task("heartbeat")

Install with Tessl CLI

npx tessl i tessl/pypi-minos-microservice-networks

docs

brokers.md

decorators.md

discovery.md

http.md

index.md

requests.md

routers.md

scheduling.md

specs.md

system-utils.md

tile.json