A microservices framework for Python that lets service developers concentrate on application logic and encourages testability
—
Built-in timer decorator for running periodic tasks and scheduled operations within services, supporting flexible scheduling patterns and robust execution management.
Decorator that schedules service methods to run at regular intervals with configurable timing and error handling.
def timer(interval):
    """Decorator to run a service method at regular intervals.

    This is the API signature only; the usage examples import the real
    decorator from ``nameko.timer``.

    Parameters:
    - interval: Time interval between executions in seconds (int or float)

    The decorated method will be called repeatedly at the specified interval
    from when the service starts until it stops.
    """


# Usage Example:
from nameko.timer import timer
from nameko.rpc import rpc
import time
import logging
class ScheduledTaskService:
    """Example service with three timer-driven maintenance tasks.

    The private ``_``-prefixed helpers are placeholders returning fixed
    values so the example is self-contained.
    """

    name = "scheduled_task_service"

    def __init__(self):
        self.last_cleanup = time.time()
        self.logger = logging.getLogger(__name__)

    @timer(interval=60)  # Run every 60 seconds
    def cleanup_expired_sessions(self):
        """Remove expired user sessions and record when the cleanup ran."""
        log = self.logger
        log.info("Starting session cleanup task")

        # Simulated cleanup; the helper reports how many sessions it removed.
        removed = self._cleanup_sessions()
        log.info(f"Cleaned up {removed} expired sessions")

        self.last_cleanup = time.time()

    @timer(interval=300)  # Run every 5 minutes
    def health_check(self):
        """Check the database and the external APIs, logging the outcome."""
        self.logger.info("Performing health check")

        # Both probes always run, even if the first one already failed.
        database_ok = self._check_database_health()
        apis_ok = self._check_external_apis()

        if database_ok and apis_ok:
            self.logger.info("System health check passed")
            return

        # An alert could be sent from here as well.
        self.logger.warning("System health check failed")

    @timer(interval=3600)  # Run every hour
    def generate_hourly_report(self):
        """Assemble the hourly system report and hand it to the sender."""
        self.logger.info("Generating hourly report")

        generated_at = time.time()
        active = self._count_active_users()
        processed = self._count_processed_requests()
        load = self._get_system_load()

        # Send report to monitoring system
        self._send_report({
            'timestamp': generated_at,
            'active_users': active,
            'processed_requests': processed,
            'system_load': load,
        })
        self.logger.info("Hourly report generated and sent")

    def _cleanup_sessions(self):
        """Placeholder: pretend a fixed number of sessions were removed."""
        return 42

    def _check_database_health(self):
        """Placeholder for the database health probe."""
        return True

    def _check_external_apis(self):
        """Placeholder for the external API health probe."""
        return True

    def _count_active_users(self):
        """Placeholder active-user count."""
        return 150

    def _count_processed_requests(self):
        """Placeholder processed-request count."""
        return 1000

    def _get_system_load(self):
        """Placeholder system load figure."""
        return 0.3

    def _send_report(self, data):
        """Placeholder for delivering ``data`` to the monitoring system."""
        pass


# Timer decorator supports sub-second intervals for high-frequency tasks.
class HighFrequencyService:
    """Example service running sub-second timer tasks."""

    name = "high_frequency_service"

    # Kept on the class rather than assigned in __init__: nameko builds a
    # fresh service instance for every worker (i.e. every timer tick), so a
    # per-instance counter would restart on each tick and the periodic log
    # below would never fire.
    counter = 0

    @timer(interval=0.1)  # Run every 100 milliseconds
    def high_frequency_task(self):
        """Task that runs 10 times per second."""
        service_cls = type(self)
        service_cls.counter += 1

        # Example: process real-time data stream
        self._process_realtime_data()

        # 100 ticks at 0.1 s each, i.e. log roughly every 10 seconds.
        if service_cls.counter % 100 == 0:
            print(f"Processed {service_cls.counter} iterations")

    @timer(interval=0.5)  # Run every 500 milliseconds
    def semi_frequent_task(self):
        """Task that runs twice per second."""
        current_time = time.time()

        # Example: update cache or refresh data
        self._refresh_cache()
        print(f"Cache refreshed at {current_time}")

    def _process_realtime_data(self):
        """Placeholder for the real-time processing step."""
        pass

    def _refresh_cache(self):
        """Placeholder for the cache refresh step."""
        pass


# Timer methods should handle exceptions gracefully to prevent stopping the timer schedule.
import traceback
from nameko.timer import timer
class RobustTimerService:
    """Example service showing error handling inside timer methods."""

    name = "robust_timer_service"

    # Consecutive-failure counter. Stored on the class because nameko creates
    # a new service instance per worker; a value assigned in __init__ would be
    # reset on every tick and the alert threshold could never be reached.
    error_count = 0

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=30)
    def robust_scheduled_task(self):
        """Run the risky operation, counting consecutive failures.

        Exceptions are logged and swallowed so the failure is handled here
        rather than propagating out of the timer method. From the fifth
        consecutive failure onwards an alert is sent on every failing run.
        """
        service_cls = type(self)
        try:
            # Potentially failing operation
            self._risky_operation()
        except Exception as e:
            service_cls.error_count += 1
            failures = service_cls.error_count
            self.logger.error(f"Scheduled task failed (attempt {failures}): {e}")
            self.logger.error(f"Full traceback: {traceback.format_exc()}")

            # Escalate once failures keep repeating.
            if failures >= 5:
                self.logger.critical("Too many consecutive failures, alerting administrators")
                self._send_alert(f"Timer task failing repeatedly: {e}")
            # Don't re-raise - let timer continue running
        else:
            # Reset error count on success
            service_cls.error_count = 0
            self.logger.info("Scheduled task completed successfully")

    @timer(interval=60)
    def task_with_retry_logic(self):
        """Try the operation up to three times with exponential backoff."""
        max_retries = 3

        for attempt in range(max_retries):
            try:
                self._operation_that_might_fail()
            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")

                if attempt == max_retries - 1:
                    # Final attempt failed
                    self.logger.error(f"All {max_retries} attempts failed")
                    self._handle_final_failure(e)
                else:
                    # Wait 1 s, then 2 s, before the next attempt.
                    time.sleep(2 ** attempt)
            else:
                self.logger.info("Operation succeeded")
                return  # Success, exit retry loop

    def _risky_operation(self):
        """Simulate an operation that fails about 20% of the time."""
        import random

        if random.random() < 0.2:
            raise Exception("Simulated operation failure")

    def _operation_that_might_fail(self):
        """Placeholder for another potentially failing operation."""
        pass

    def _send_alert(self, message):
        """Placeholder for the alert sending logic."""
        pass

    def _handle_final_failure(self, error):
        """Placeholder for final-failure handling (logging, alerting, etc.)."""
        pass


# Timer methods can use all service dependencies like RPC proxies, databases, etc.
from nameko.timer import timer
from nameko.rpc import RpcProxy
from nameko.dependency_providers import Config
class DataSyncService:
    """Example service whose timer methods use injected dependencies."""

    name = "data_sync_service"

    # Service dependencies available to timer methods
    user_service = RpcProxy('user_service')
    config = Config()

    # Timestamp of the last completed sync. Stored on the class because
    # nameko creates a new service instance per worker; a value assigned in
    # __init__ would always be None and every run would sync from scratch.
    last_sync_time = None

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=600)  # Run every 10 minutes
    def sync_user_data(self):
        """Synchronize user data between services.

        Users that fail individually are logged and skipped; any other error
        aborts the run without advancing ``last_sync_time``.
        """
        self.logger.info("Starting user data synchronization")
        service_cls = type(self)

        try:
            # Get sync configuration
            batch_size = self.config.get('SYNC_BATCH_SIZE', 100)

            # Taken before the fetch so that changes made while this run is
            # in progress are picked up by the next run instead of skipped.
            sync_started = time.time()

            # Get users that need syncing from remote service
            users_to_sync = self.user_service.get_users_for_sync(
                since=service_cls.last_sync_time,
                limit=batch_size,
            )

            # Process each user
            synced_count = 0
            for user in users_to_sync:
                try:
                    self._sync_user_to_external_system(user)
                    synced_count += 1
                except Exception as e:
                    self.logger.error(f"Failed to sync user {user['id']}: {e}")

            service_cls.last_sync_time = sync_started
            self.logger.info(f"Synchronized {synced_count}/{len(users_to_sync)} users")
        except Exception as e:
            self.logger.error(f"User data sync failed: {e}")

    @timer(interval=1800)  # Run every 30 minutes
    def cleanup_old_data(self):
        """Ask the user service to delete sessions older than seven days."""
        self.logger.info("Starting cross-service cleanup")

        try:
            # Clean up user service data
            cleanup_before = time.time() - (7 * 24 * 3600)  # 7 days ago
            result = self.user_service.cleanup_old_sessions(cleanup_before)
            self.logger.info(f"Cleaned up {result['deleted_count']} old sessions")

            # Other services could be asked to clean up here as well, e.g.
            # notifications or audit logs older than ``cleanup_before``.
        except Exception as e:
            self.logger.error(f"Cleanup task failed: {e}")

    def _sync_user_to_external_system(self, user):
        """Placeholder for the sync logic to the external system."""
        pass


# Complex scheduling patterns using timer with conditional logic.
import calendar
from datetime import datetime, time as dt_time
class AdvancedSchedulerService:
    """Example service building calendar-style schedules on top of timers."""

    name = "advanced_scheduler_service"

    # Date of the last successful end-of-day run. Stored on the class because
    # nameko creates a new service instance per worker, so instance state
    # would not survive from one tick to the next.
    _last_end_of_day_run = None

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=60)  # Check every minute
    def business_hours_only_task(self):
        """Task that only runs during business hours."""
        now = datetime.now()

        # Only run Monday-Friday (weekday 0-4), 9 AM - 5 PM.
        is_weekday = now.weekday() < 5
        in_office_hours = dt_time(9, 0) <= now.time() <= dt_time(17, 0)
        if is_weekday and in_office_hours:
            self.logger.info("Running business hours task")
            self._business_task()

    @timer(interval=60)  # Check every minute
    def end_of_day_task(self):
        """Task that runs once at the end of each business day.

        Ticks are measured from service start, not from the top of the hour,
        so an hourly check combined with a "first few minutes" window could
        miss 6 PM entirely. Checking every minute and remembering the last
        run date gives exactly one run per weekday during the 6 PM hour.
        """
        now = datetime.now()

        # Run at 6 PM on weekdays
        if now.weekday() >= 5 or now.hour != 18:
            return

        service_cls = type(self)
        today = now.date()
        if service_cls._last_end_of_day_run == today:
            return  # Already done for today

        self.logger.info("Running end-of-day task")
        self._generate_daily_reports()
        # Recorded only after success so a failed run is retried next tick.
        service_cls._last_end_of_day_run = today

    @timer(interval=86400)  # Check once per day
    def monthly_task(self):
        """Task that runs on the first day of each month."""
        now = datetime.now()

        if now.day == 1:  # First day of month
            self.logger.info("Running monthly task")
            self._generate_monthly_report()

    @timer(interval=3600)  # Check every hour
    def adaptive_frequency_task(self):
        """Task whose work is chosen according to current system load."""
        current_load = self._get_system_load()

        # Skip during high load periods
        if current_load > 0.8:
            self.logger.info("Skipping task due to high system load")
            return

        # Heavier processing when the system is mostly idle
        if current_load < 0.3:
            self.logger.info("Running intensive task during low load")
            self._intensive_processing()
        else:
            self.logger.info("Running standard task")
            self._standard_processing()

    def _business_task(self):
        """Placeholder for the business-hours work."""
        pass

    def _generate_daily_reports(self):
        """Placeholder for daily report generation."""
        pass

    def _generate_monthly_report(self):
        """Placeholder for monthly report generation."""
        pass

    def _get_system_load(self):
        """Return system load metric (0.0 to 1.0); fixed value here."""
        return 0.5

    def _intensive_processing(self):
        """Placeholder for the heavy processing path."""
        pass

    def _standard_processing(self):
        """Placeholder for the normal processing path."""
        pass


# Patterns for coordinating timer execution across multiple service instances.
import uuid
from nameko.timer import timer
from nameko.dependency_providers import Config
class CoordinatedTimerService:
    """Example service coordinating timer work across several instances."""

    name = "coordinated_timer_service"

    config = Config()

    # Generated once, when the class is defined. nameko creates a new service
    # instance per worker, so an id assigned in __init__ would change on
    # every tick and could not be used to assign work to this instance.
    instance_id = str(uuid.uuid4())

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=300)  # Run every 5 minutes
    def leader_election_task(self):
        """Task that uses leader election to run on only one instance."""
        # Simple leader election using external coordination service
        if not self._acquire_leadership("cleanup_task", ttl=300):
            self.logger.debug("Another instance is handling cleanup task")
            return

        self.logger.info(f"Instance {self.instance_id} acquired leadership")
        try:
            self._perform_cleanup_task()
        finally:
            # Always hand leadership back, even if the cleanup failed.
            self._release_leadership("cleanup_task")

    @timer(interval=60)
    def distributed_work_task(self):
        """Task that processes the work items assigned to this instance."""
        # Get work items assigned to this instance
        work_items = self._get_work_for_instance(self.instance_id)

        for item in work_items:
            try:
                self._process_work_item(item)
                self._mark_work_completed(item['id'])
            except Exception as e:
                self.logger.error(f"Failed to process work item {item['id']}: {e}")
                self._mark_work_failed(item['id'])

    def _acquire_leadership(self, task_name, ttl):
        """Acquire distributed lock for task leadership."""
        # Implementation would use Redis, etcd, or database for coordination
        return True  # Simplified for example

    def _release_leadership(self, task_name):
        """Release distributed lock."""
        pass

    def _get_work_for_instance(self, instance_id):
        """Get work items assigned to this service instance."""
        # Implementation would use consistent hashing or work queue
        return []  # Simplified for example

    def _perform_cleanup_task(self):
        """Placeholder for the leader-only cleanup work."""
        pass

    def _process_work_item(self, item):
        """Placeholder for processing a single work item."""
        pass

    def _mark_work_completed(self, item_id):
        """Placeholder for recording a completed work item."""
        pass

    def _mark_work_failed(self, item_id):
        """Placeholder for recording a failed work item."""
        pass


# Best practices for timer performance and resource management.
class OptimizedTimerService:
    """Example service showing batching and lightweight monitoring."""

    name = "optimized_timer_service"

    # Batching state is stored on the class because nameko creates a new
    # service instance per worker. With per-instance state the buffer would
    # be recreated on every tick, neither flush condition could ever be met,
    # and collected metrics would be silently discarded.
    batch_buffer = []
    last_batch_time = time.time()
    _last_cpu = 0

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=10)  # Frequent collection
    def collect_metrics_batch(self):
        """Collect metrics and flush them in batches for efficiency."""
        service_cls = type(self)

        # Collect current metrics
        service_cls.batch_buffer.extend(self._collect_current_metrics())

        # Process batch when it reaches size limit or time limit
        buffer_full = len(service_cls.batch_buffer) >= 100
        window_elapsed = time.time() - service_cls.last_batch_time >= 60
        if buffer_full or window_elapsed:
            # The buffer is cleared only after processing succeeds, so a
            # failed flush keeps its metrics for the next attempt.
            self._process_metrics_batch(service_cls.batch_buffer)
            service_cls.batch_buffer = []
            service_cls.last_batch_time = time.time()

    @timer(interval=1)  # High frequency but lightweight
    def lightweight_monitoring(self):
        """High-frequency monitoring with minimal overhead."""
        service_cls = type(self)

        # Only collect essential metrics to minimize impact
        cpu_usage = self._get_cpu_usage()

        # Only log if significant change since the last logged value
        if abs(cpu_usage - service_cls._last_cpu) > 0.1:
            self.logger.debug(f"CPU usage: {cpu_usage:.1%}")
            service_cls._last_cpu = cpu_usage

    def _collect_current_metrics(self):
        """Return list of current metrics (a single fixed sample here)."""
        return [{'timestamp': time.time(), 'value': 1.0}]

    def _process_metrics_batch(self, metrics):
        """Process a batch of metrics; this example only logs its size."""
        self.logger.info(f"Processed batch of {len(metrics)} metrics")

    def _get_cpu_usage(self):
        """Return current CPU usage (mock)."""
        return 0.5


# Install with Tessl CLI
npx tessl i tessl/pypi-nameko