CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-nameko

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability

Pending
Overview
Eval results
Files

timer-scheduling.mddocs/

Timer and Scheduling

Built-in timer decorator for running periodic tasks and scheduled operations within services, supporting flexible scheduling patterns and robust execution management.

Capabilities

Timer Decorator

Decorator that schedules service methods to run at regular intervals with configurable timing and error handling.

def timer(interval, eager=False):
    """
    Decorator to run a service method at regular intervals.
    
    Parameters:
    - interval: Time interval between executions in seconds (int or float)
    - eager: If True, fire the first execution immediately when the service
      starts instead of waiting one full interval first (default False)
    
    The decorated method will be called repeatedly at the specified interval
    from when the service starts until it stops.
    """

Usage Example:

from nameko.timer import timer
from nameko.rpc import rpc
import time
import logging

class ScheduledTaskService:
    """Example service whose periodic maintenance work is driven by @timer."""

    name = "scheduled_task_service"

    def __init__(self):
        # NOTE(review): nameko typically creates a fresh service instance per
        # worker call, so state set here may not persist between timer
        # firings — confirm against the running framework version.
        self.logger = logging.getLogger(__name__)
        self.last_cleanup = time.time()

    @timer(interval=60)  # fires once a minute
    def cleanup_expired_sessions(self):
        """Clean up expired user sessions every minute"""
        self.logger.info("Starting session cleanup task")
        expired_count = self._cleanup_sessions()
        self.logger.info(f"Cleaned up {expired_count} expired sessions")
        self.last_cleanup = time.time()

    @timer(interval=300)  # fires every five minutes
    def health_check(self):
        """Perform system health checks"""
        self.logger.info("Performing health check")
        db_healthy = self._check_database_health()
        api_healthy = self._check_external_apis()
        if db_healthy and api_healthy:
            self.logger.info("System health check passed")
        else:
            # An alert could also be dispatched from here.
            self.logger.warning("System health check failed")

    @timer(interval=3600)  # fires once an hour
    def generate_hourly_report(self):
        """Generate and send hourly system report"""
        self.logger.info("Generating hourly report")
        report_data = {
            'timestamp': time.time(),
            'active_users': self._count_active_users(),
            'processed_requests': self._count_processed_requests(),
            'system_load': self._get_system_load(),
        }
        # Hand the snapshot off to the monitoring system.
        self._send_report(report_data)
        self.logger.info("Hourly report generated and sent")

    def _cleanup_sessions(self):
        # Stand-in for real session-cleanup logic.
        return 42

    def _check_database_health(self):
        # Stand-in for a database connectivity probe.
        return True

    def _check_external_apis(self):
        # Stand-in for an external API availability probe.
        return True

    def _count_active_users(self):
        return 150

    def _count_processed_requests(self):
        return 1000

    def _get_system_load(self):
        return 0.3

    def _send_report(self, data):
        # Stand-in for report delivery.
        pass

Sub-second Timing

Timer decorator supports sub-second intervals for high-frequency tasks.

class HighFrequencyService:
    """Service demonstrating sub-second timer intervals.

    Fix: the original example called self._process_realtime_data() and
    self._refresh_cache() without defining them, so every timer firing
    raised AttributeError; stub implementations are now provided.
    """

    name = "high_frequency_service"

    def __init__(self):
        # Counts timer firings so progress can be logged periodically.
        self.counter = 0

    @timer(interval=0.1)  # Run every 100 milliseconds
    def high_frequency_task(self):
        """Task that runs 10 times per second"""
        self.counter += 1

        # Example: process real-time data stream
        self._process_realtime_data()

        if self.counter % 100 == 0:  # Log every 10 seconds
            print(f"Processed {self.counter} iterations")

    @timer(interval=0.5)  # Run every 500 milliseconds
    def semi_frequent_task(self):
        """Task that runs twice per second"""
        current_time = time.time()

        # Example: update cache or refresh data
        self._refresh_cache()

        print(f"Cache refreshed at {current_time}")

    def _process_realtime_data(self):
        # Stand-in for real-time stream processing.
        pass

    def _refresh_cache(self):
        # Stand-in for cache refresh logic.
        pass

Error Handling in Timers

Timer methods should catch and handle their own exceptions so that a failure in one run is logged and recovered from deliberately, and subsequent scheduled runs proceed normally.

import traceback
from nameko.timer import timer

class RobustTimerService:
    """Timer entrypoints hardened with explicit error handling."""

    name = "robust_timer_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Consecutive-failure counter feeding the circuit breaker below.
        self.error_count = 0

    @timer(interval=30)
    def robust_scheduled_task(self):
        """Timer with comprehensive error handling"""
        try:
            # The operation that may fail.
            self._risky_operation()
        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Scheduled task failed (attempt {self.error_count}): {e}")
            self.logger.error(f"Full traceback: {traceback.format_exc()}")

            # Circuit breaker: escalate after five consecutive failures.
            if self.error_count >= 5:
                self.logger.critical("Too many consecutive failures, alerting administrators")
                self._send_alert(f"Timer task failing repeatedly: {e}")

            # Deliberately swallowed so the schedule keeps running.
        else:
            # A success resets the consecutive-failure counter.
            self.error_count = 0
            self.logger.info("Scheduled task completed successfully")

    @timer(interval=60)
    def task_with_retry_logic(self):
        """Timer with built-in retry logic"""
        max_retries = 3
        attempt = 0

        while attempt < max_retries:
            try:
                self._operation_that_might_fail()
                self.logger.info("Operation succeeded")
                return  # success — stop retrying
            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")

                if attempt == max_retries - 1:
                    # Out of retries; hand off to the failure handler.
                    self.logger.error(f"All {max_retries} attempts failed")
                    self._handle_final_failure(e)
                else:
                    # Exponential backoff before the next attempt.
                    time.sleep(2 ** attempt)
            attempt += 1

    def _risky_operation(self):
        # Fails randomly ~20% of the time to exercise the error paths.
        import random
        if random.random() < 0.2:
            raise Exception("Simulated operation failure")

    def _operation_that_might_fail(self):
        # Stand-in for another operation that may raise.
        pass

    def _send_alert(self, message):
        # Stand-in for alert delivery.
        pass

    def _handle_final_failure(self, error):
        # Stand-in for terminal failure handling (logging, alerting, ...).
        pass

Timer with Service Dependencies

Timer methods have access to all of the service's declared dependencies, such as RPC proxies, configuration providers, and database connections.

from nameko.timer import timer
from nameko.rpc import RpcProxy
from nameko.dependency_providers import Config

class DataSyncService:
    """Shows that timer methods can use any declared service dependency."""

    name = "data_sync_service"

    # Dependencies injected by nameko, available inside timer methods too.
    user_service = RpcProxy('user_service')
    config = Config()

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Timestamp of the last completed sync; None until the first run.
        self.last_sync_time = None

    @timer(interval=600)  # fires every ten minutes
    def sync_user_data(self):
        """Synchronize user data between services"""
        self.logger.info("Starting user data synchronization")

        try:
            # Batch size comes from configuration, with a sane default.
            batch_size = self.config.get('SYNC_BATCH_SIZE', 100)

            # Ask the remote service which users changed since the last run.
            users_to_sync = self.user_service.get_users_for_sync(
                since=self.last_sync_time,
                limit=batch_size,
            )

            # Sync each user individually so one failure doesn't stop the batch.
            synced_count = 0
            for user in users_to_sync:
                try:
                    self._sync_user_to_external_system(user)
                except Exception as e:
                    self.logger.error(f"Failed to sync user {user['id']}: {e}")
                else:
                    synced_count += 1

            self.last_sync_time = time.time()
            self.logger.info(f"Synchronized {synced_count}/{len(users_to_sync)} users")

        except Exception as e:
            self.logger.error(f"User data sync failed: {e}")

    @timer(interval=1800)  # fires every thirty minutes
    def cleanup_old_data(self):
        """Clean up old data across multiple services"""
        self.logger.info("Starting cross-service cleanup")

        try:
            # Anything older than one week is eligible for cleanup.
            cleanup_before = time.time() - (7 * 24 * 3600)

            result = self.user_service.cleanup_old_sessions(cleanup_before)
            self.logger.info(f"Cleaned up {result['deleted_count']} old sessions")

            # Other services could be asked to clean up here as well, e.g.
            # notification_service.cleanup_old_notifications(cleanup_before)
            # audit_service.archive_old_logs(cleanup_before)

        except Exception as e:
            self.logger.error(f"Cleanup task failed: {e}")

    def _sync_user_to_external_system(self, user):
        # Stand-in for pushing one user record to an external system.
        pass

Advanced Scheduling Patterns

Complex schedules (business hours, end of day, monthly) can be approximated by combining a fixed-interval timer with conditional logic that decides whether each firing should actually do work.

import calendar
from datetime import datetime, time as dt_time

class AdvancedSchedulerService:
    """Conditional logic layered on top of fixed-interval timers."""

    name = "advanced_scheduler_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=60)  # poll once a minute
    def business_hours_only_task(self):
        """Task that only runs during business hours"""
        now = datetime.now()

        # Weekdays only (Monday == 0 ... Friday == 4), 09:00-17:00.
        on_weekday = now.weekday() < 5
        in_window = dt_time(9, 0) <= now.time() <= dt_time(17, 0)
        if not (on_weekday and in_window):
            return  # outside the business-hours window — do nothing

        self.logger.info("Running business hours task")
        self._business_task()

    @timer(interval=3600)  # poll once an hour
    def end_of_day_task(self):
        """Task that runs at end of business day"""
        now = datetime.now()

        # 6 PM on weekdays, within the first five minutes of the hour.
        if now.weekday() >= 5 or now.hour != 18 or now.minute >= 5:
            return

        self.logger.info("Running end-of-day task")
        self._generate_daily_reports()

    @timer(interval=86400)  # poll once a day
    def monthly_task(self):
        """Task that runs on the first day of each month"""
        if datetime.now().day != 1:
            return  # not the first of the month

        self.logger.info("Running monthly task")
        self._generate_monthly_report()

    @timer(interval=3600)  # poll once an hour
    def adaptive_frequency_task(self):
        """Task with adaptive frequency based on load"""
        current_load = self._get_system_load()

        if current_load > 0.8:
            # Back off entirely while the system is busy.
            self.logger.info("Skipping task due to high system load")
        elif current_load < 0.3:
            # Plenty of headroom: run the heavy variant.
            self.logger.info("Running intensive task during low load")
            self._intensive_processing()
        else:
            self.logger.info("Running standard task")
            self._standard_processing()

    def _business_task(self):
        pass

    def _generate_daily_reports(self):
        pass

    def _generate_monthly_report(self):
        pass

    def _get_system_load(self):
        # Stand-in load metric in [0.0, 1.0].
        return 0.5

    def _intensive_processing(self):
        pass

    def _standard_processing(self):
        pass

Timer Coordination

Patterns for coordinating timer execution across multiple service instances.

import uuid
from nameko.timer import timer
from nameko.dependency_providers import Config

class CoordinatedTimerService:
    """Coordinates timer work across multiple running service instances."""

    name = "coordinated_timer_service"

    config = Config()

    def __init__(self):
        # Unique id so instances can be distinguished during coordination.
        self.instance_id = str(uuid.uuid4())
        self.logger = logging.getLogger(__name__)

    @timer(interval=300)  # fires every five minutes
    def leader_election_task(self):
        """Task that uses leader election to run on only one instance"""

        # Only the lock holder does the work; everyone else stands down.
        if not self._acquire_leadership("cleanup_task", ttl=300):
            self.logger.debug("Another instance is handling cleanup task")
            return

        self.logger.info(f"Instance {self.instance_id} acquired leadership")
        try:
            self._perform_cleanup_task()
        finally:
            # Always give the lock back, even if the cleanup failed.
            self._release_leadership("cleanup_task")

    @timer(interval=60)
    def distributed_work_task(self):
        """Task that distributes work across instances"""

        # Process only the items routed to this particular instance.
        for item in self._get_work_for_instance(self.instance_id):
            try:
                self._process_work_item(item)
                self._mark_work_completed(item['id'])
            except Exception as e:
                self.logger.error(f"Failed to process work item {item['id']}: {e}")
                self._mark_work_failed(item['id'])

    def _acquire_leadership(self, task_name, ttl):
        """Acquire distributed lock for task leadership"""
        # A real implementation would coordinate via Redis, etcd or a database.
        return True  # simplified for the example

    def _release_leadership(self, task_name):
        """Release distributed lock"""
        pass

    def _get_work_for_instance(self, instance_id):
        """Get work items assigned to this service instance"""
        # A real implementation would use consistent hashing or a work queue.
        return []  # simplified for the example

    def _perform_cleanup_task(self):
        pass

    def _process_work_item(self, item):
        pass

    def _mark_work_completed(self, item_id):
        pass

    def _mark_work_failed(self, item_id):
        pass

Performance Considerations

Best practices for timer performance and resource management.

class OptimizedTimerService:
    """Timer patterns tuned for low overhead and batched processing."""

    name = "optimized_timer_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Metrics accumulate here until the batch is flushed.
        self.batch_buffer = []
        self.last_batch_time = time.time()

    @timer(interval=10)  # collect often, flush rarely
    def collect_metrics_batch(self):
        """Collect metrics in batches for efficiency"""

        # Append this tick's samples to the pending batch.
        self.batch_buffer.extend(self._collect_current_metrics())

        # Flush once the batch is big enough (100 items) or old enough (60 s).
        if (len(self.batch_buffer) >= 100
                or time.time() - self.last_batch_time >= 60):
            self._process_metrics_batch(self.batch_buffer)
            self.batch_buffer = []
            self.last_batch_time = time.time()

    @timer(interval=1)  # every second, but deliberately cheap
    def lightweight_monitoring(self):
        """High-frequency monitoring with minimal overhead"""

        # Keep per-tick work minimal: one fast metric read.
        cpu_usage = self._get_cpu_usage()
        previous = getattr(self, '_last_cpu', 0)

        # Log only on a meaningful change to keep noise down.
        if abs(cpu_usage - previous) > 0.1:
            self.logger.debug(f"CPU usage: {cpu_usage:.1%}")
            self._last_cpu = cpu_usage

    def _collect_current_metrics(self):
        # Stand-in: a single synthetic metric sample.
        return [{'timestamp': time.time(), 'value': 1.0}]

    def _process_metrics_batch(self, metrics):
        # Stand-in for an efficient bulk write of the whole batch.
        self.logger.info(f"Processed batch of {len(metrics)} metrics")

    def _get_cpu_usage(self):
        # Stand-in CPU usage metric.
        return 0.5

Install with Tessl CLI

npx tessl i tessl/pypi-nameko

docs

cli-interface.md

dependency-injection.md

event-system.md

http-interface.md

index.md

rpc-communication.md

service-management.md

standalone-clients.md

testing-framework.md

timer-scheduling.md

tile.json