# Timer and Scheduling

Built-in timer decorator for running periodic tasks and scheduled operations within services, supporting flexible scheduling patterns and robust execution management.

## Capabilities

### Timer Decorator

Decorator that schedules service methods to run at regular intervals with configurable timing and error handling.

```python { .api }
def timer(interval):
    """
    Decorator to run a service method at regular intervals.

    Parameters:
    - interval: Time interval between executions in seconds (int or float)

    The decorated method will be called repeatedly at the specified interval
    from when the service starts until it stops.
    """
```

**Usage Example:**

```python
from nameko.timer import timer
from nameko.rpc import rpc
import time
import logging

class ScheduledTaskService:
    name = "scheduled_task_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.last_cleanup = time.time()

    @timer(interval=60)  # Run every 60 seconds
    def cleanup_expired_sessions(self):
        """Clean up expired user sessions every minute"""
        self.logger.info("Starting session cleanup task")

        # Simulate cleanup logic
        expired_count = self._cleanup_sessions()

        self.logger.info(f"Cleaned up {expired_count} expired sessions")
        self.last_cleanup = time.time()

    @timer(interval=300)  # Run every 5 minutes
    def health_check(self):
        """Perform system health checks"""
        self.logger.info("Performing health check")

        # Check database connectivity
        db_healthy = self._check_database_health()

        # Check external API availability
        api_healthy = self._check_external_apis()

        if not (db_healthy and api_healthy):
            self.logger.warning("System health check failed")
            # Could send alert here
        else:
            self.logger.info("System health check passed")

    @timer(interval=3600)  # Run every hour
    def generate_hourly_report(self):
        """Generate and send hourly system report"""
        self.logger.info("Generating hourly report")

        report_data = {
            'timestamp': time.time(),
            'active_users': self._count_active_users(),
            'processed_requests': self._count_processed_requests(),
            'system_load': self._get_system_load()
        }

        # Send report to monitoring system
        self._send_report(report_data)
        self.logger.info("Hourly report generated and sent")

    def _cleanup_sessions(self):
        # Simulate cleanup logic
        return 42

    def _check_database_health(self):
        # Database health check logic
        return True

    def _check_external_apis(self):
        # API health check logic
        return True

    def _count_active_users(self):
        return 150

    def _count_processed_requests(self):
        return 1000

    def _get_system_load(self):
        return 0.3

    def _send_report(self, data):
        # Send report logic
        pass
```

### Sub-second Timing

Timer decorator supports sub-second intervals for high-frequency tasks.

```python
import time

from nameko.timer import timer

class HighFrequencyService:
    name = "high_frequency_service"

    def __init__(self):
        self.counter = 0

    @timer(interval=0.1)  # Run every 100 milliseconds
    def high_frequency_task(self):
        """Task that runs 10 times per second"""
        self.counter += 1

        # Example: process real-time data stream
        self._process_realtime_data()

        if self.counter % 100 == 0:  # Log every 10 seconds
            print(f"Processed {self.counter} iterations")

    @timer(interval=0.5)  # Run every 500 milliseconds
    def semi_frequent_task(self):
        """Task that runs twice per second"""
        current_time = time.time()

        # Example: update cache or refresh data
        self._refresh_cache()

        print(f"Cache refreshed at {current_time}")

    def _process_realtime_data(self):
        # Real-time stream processing logic (simplified for example)
        pass

    def _refresh_cache(self):
        # Cache refresh logic (simplified for example)
        pass
```

### Error Handling in Timers

Timer methods should handle exceptions gracefully to prevent stopping the timer schedule.

```python
import logging
import time
import traceback

from nameko.timer import timer

class RobustTimerService:
    name = "robust_timer_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_count = 0

    @timer(interval=30)
    def robust_scheduled_task(self):
        """Timer with comprehensive error handling"""
        try:
            # Potentially failing operation
            self._risky_operation()

            # Reset error count on success
            self.error_count = 0
            self.logger.info("Scheduled task completed successfully")

        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Scheduled task failed (attempt {self.error_count}): {e}")
            self.logger.error(f"Full traceback: {traceback.format_exc()}")

            # Implement circuit breaker pattern
            if self.error_count >= 5:
                self.logger.critical("Too many consecutive failures, alerting administrators")
                self._send_alert(f"Timer task failing repeatedly: {e}")

            # Don't re-raise - let timer continue running

    @timer(interval=60)
    def task_with_retry_logic(self):
        """Timer with built-in retry logic"""
        max_retries = 3

        for attempt in range(max_retries):
            try:
                self._operation_that_might_fail()
                self.logger.info("Operation succeeded")
                return  # Success, exit retry loop

            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")

                if attempt == max_retries - 1:
                    # Final attempt failed
                    self.logger.error(f"All {max_retries} attempts failed")
                    self._handle_final_failure(e)
                else:
                    # Wait before retry
                    time.sleep(2 ** attempt)  # Exponential backoff

    def _risky_operation(self):
        # Simulate operation that might fail
        import random
        if random.random() < 0.2:  # 20% chance of failure
            raise Exception("Simulated operation failure")

    def _operation_that_might_fail(self):
        # Another potentially failing operation
        pass

    def _send_alert(self, message):
        # Alert sending logic
        pass

    def _handle_final_failure(self, error):
        # Handle final failure (logging, alerting, etc.)
        pass
```

### Timer with Service Dependencies

Timer methods can use all service dependencies like RPC proxies, databases, etc.

```python
import logging
import time

from nameko.timer import timer
from nameko.rpc import RpcProxy
from nameko.dependency_providers import Config

class DataSyncService:
    name = "data_sync_service"

    # Service dependencies available to timer methods
    user_service = RpcProxy('user_service')
    config = Config()

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.last_sync_time = None

    @timer(interval=600)  # Run every 10 minutes
    def sync_user_data(self):
        """Synchronize user data between services"""
        self.logger.info("Starting user data synchronization")

        try:
            # Get sync configuration
            batch_size = self.config.get('SYNC_BATCH_SIZE', 100)

            # Get users that need syncing from remote service
            users_to_sync = self.user_service.get_users_for_sync(
                since=self.last_sync_time,
                limit=batch_size
            )

            # Process each user
            synced_count = 0
            for user in users_to_sync:
                try:
                    self._sync_user_to_external_system(user)
                    synced_count += 1
                except Exception as e:
                    self.logger.error(f"Failed to sync user {user['id']}: {e}")

            self.last_sync_time = time.time()
            self.logger.info(f"Synchronized {synced_count}/{len(users_to_sync)} users")

        except Exception as e:
            self.logger.error(f"User data sync failed: {e}")

    @timer(interval=1800)  # Run every 30 minutes
    def cleanup_old_data(self):
        """Clean up old data across multiple services"""
        self.logger.info("Starting cross-service cleanup")

        try:
            # Clean up user service data
            cleanup_before = time.time() - (7 * 24 * 3600)  # 7 days ago

            result = self.user_service.cleanup_old_sessions(cleanup_before)
            self.logger.info(f"Cleaned up {result['deleted_count']} old sessions")

            # Could call other services for cleanup too
            # notification_service.cleanup_old_notifications(cleanup_before)
            # audit_service.archive_old_logs(cleanup_before)

        except Exception as e:
            self.logger.error(f"Cleanup task failed: {e}")

    def _sync_user_to_external_system(self, user):
        # Sync logic to external system
        pass
```

### Advanced Scheduling Patterns

Complex scheduling patterns using timer with conditional logic.

```python
import calendar
import logging
from datetime import datetime, time as dt_time

from nameko.timer import timer

class AdvancedSchedulerService:
    name = "advanced_scheduler_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=60)  # Check every minute
    def business_hours_only_task(self):
        """Task that only runs during business hours"""
        now = datetime.now()

        # Only run Monday-Friday, 9 AM - 5 PM
        if (now.weekday() < 5 and  # Monday = 0, Friday = 4
                dt_time(9, 0) <= now.time() <= dt_time(17, 0)):
            self.logger.info("Running business hours task")
            self._business_task()
        else:
            # Skip execution outside business hours
            pass

    @timer(interval=3600)  # Check every hour
    def end_of_day_task(self):
        """Task that runs at end of business day"""
        now = datetime.now()

        # Run at 6 PM on weekdays
        if (now.weekday() < 5 and
                now.hour == 18 and
                now.minute < 5):  # Run within first 5 minutes of 6 PM hour
            self.logger.info("Running end-of-day task")
            self._generate_daily_reports()

    @timer(interval=86400)  # Check once per day
    def monthly_task(self):
        """Task that runs on the first day of each month"""
        now = datetime.now()

        if now.day == 1:  # First day of month
            self.logger.info("Running monthly task")
            self._generate_monthly_report()

    @timer(interval=3600)  # Check every hour
    def adaptive_frequency_task(self):
        """Task with adaptive frequency based on load"""
        current_load = self._get_system_load()

        # Skip during high load periods
        if current_load > 0.8:
            self.logger.info("Skipping task due to high system load")
            return

        # More frequent processing during low load
        if current_load < 0.3:
            self.logger.info("Running intensive task during low load")
            self._intensive_processing()
        else:
            self.logger.info("Running standard task")
            self._standard_processing()

    def _business_task(self):
        pass

    def _generate_daily_reports(self):
        pass

    def _generate_monthly_report(self):
        pass

    def _get_system_load(self):
        # Return system load metric (0.0 to 1.0)
        return 0.5

    def _intensive_processing(self):
        pass

    def _standard_processing(self):
        pass
```

### Timer Coordination

Patterns for coordinating timer execution across multiple service instances.

```python
import logging
import uuid

from nameko.timer import timer
from nameko.dependency_providers import Config

class CoordinatedTimerService:
    name = "coordinated_timer_service"

    config = Config()

    def __init__(self):
        self.instance_id = str(uuid.uuid4())
        self.logger = logging.getLogger(__name__)

    @timer(interval=300)  # Run every 5 minutes
    def leader_election_task(self):
        """Task that uses leader election to run on only one instance"""

        # Simple leader election using external coordination service
        if self._acquire_leadership("cleanup_task", ttl=300):
            self.logger.info(f"Instance {self.instance_id} acquired leadership")

            try:
                self._perform_cleanup_task()
            finally:
                self._release_leadership("cleanup_task")
        else:
            self.logger.debug("Another instance is handling cleanup task")

    @timer(interval=60)
    def distributed_work_task(self):
        """Task that distributes work across instances"""

        # Get work items assigned to this instance
        work_items = self._get_work_for_instance(self.instance_id)

        for item in work_items:
            try:
                self._process_work_item(item)
                self._mark_work_completed(item['id'])
            except Exception as e:
                self.logger.error(f"Failed to process work item {item['id']}: {e}")
                self._mark_work_failed(item['id'])

    def _acquire_leadership(self, task_name, ttl):
        """Acquire distributed lock for task leadership"""
        # Implementation would use Redis, etcd, or database for coordination
        return True  # Simplified for example

    def _release_leadership(self, task_name):
        """Release distributed lock"""
        pass

    def _get_work_for_instance(self, instance_id):
        """Get work items assigned to this service instance"""
        # Implementation would use consistent hashing or work queue
        return []  # Simplified for example

    def _perform_cleanup_task(self):
        pass

    def _process_work_item(self, item):
        pass

    def _mark_work_completed(self, item_id):
        pass

    def _mark_work_failed(self, item_id):
        pass
```

### Performance Considerations

Best practices for timer performance and resource management.

```python
import logging
import time

from nameko.timer import timer

class OptimizedTimerService:
    name = "optimized_timer_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.batch_buffer = []
        self.last_batch_time = time.time()

    @timer(interval=10)  # Frequent collection
    def collect_metrics_batch(self):
        """Collect metrics in batches for efficiency"""

        # Collect current metrics
        current_metrics = self._collect_current_metrics()
        self.batch_buffer.extend(current_metrics)

        # Process batch when it reaches size limit or time limit
        if (len(self.batch_buffer) >= 100 or
                time.time() - self.last_batch_time >= 60):
            self._process_metrics_batch(self.batch_buffer)
            self.batch_buffer = []
            self.last_batch_time = time.time()

    @timer(interval=1)  # High frequency but lightweight
    def lightweight_monitoring(self):
        """High-frequency monitoring with minimal overhead"""

        # Only collect essential metrics to minimize impact
        cpu_usage = self._get_cpu_usage()  # Fast system call

        # Only log if significant change
        if abs(cpu_usage - getattr(self, '_last_cpu', 0)) > 0.1:
            self.logger.debug(f"CPU usage: {cpu_usage:.1%}")
            self._last_cpu = cpu_usage

    def _collect_current_metrics(self):
        # Return list of current metrics
        return [{'timestamp': time.time(), 'value': 1.0}]

    def _process_metrics_batch(self, metrics):
        # Process batch of metrics efficiently
        self.logger.info(f"Processed batch of {len(metrics)} metrics")

    def _get_cpu_usage(self):
        # Return current CPU usage (mock)
        return 0.5
```