or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-interface.md · dependency-injection.md · event-system.md · http-interface.md · index.md · rpc-communication.md · service-management.md · standalone-clients.md · testing-framework.md · timer-scheduling.md

timer-scheduling.mddocs/

0

# Timer and Scheduling

1

2

Built-in timer decorator for running periodic tasks and scheduled operations within services, supporting flexible scheduling patterns and robust execution management.

3

4

## Capabilities

5

6

### Timer Decorator

7

8

Decorator that schedules service methods to run at regular intervals with configurable timing and error handling.

9

10

```python { .api }

11

def timer(interval, eager=False):
    """
    Decorator to run a service method at regular intervals.

    Parameters:
    - interval: Time interval between executions in seconds (int or float)
    - eager: If True, fire as soon as the service starts instead of waiting
      for the first interval to elapse (default: False)

    The decorated method will be called repeatedly at the specified interval
    from when the service starts until it stops. A new invocation is not
    started until the previous one has completed, so a method that takes
    longer than `interval` delays the following tick rather than overlapping
    with it.
    """

21

```

22

23

**Usage Example:**

24

25

```python

26

from nameko.timer import timer

27

from nameko.rpc import rpc

28

import time

29

import logging

30

31

class ScheduledTaskService:
    """Example service with three independent periodic jobs.

    Each public method is registered as a timer entrypoint; the private
    helpers are placeholders that return fixed values.
    """

    name = "scheduled_task_service"

    def __init__(self):
        self.last_cleanup = time.time()
        self.logger = logging.getLogger(__name__)

    @timer(interval=60)  # once a minute
    def cleanup_expired_sessions(self):
        """Remove expired user sessions and record when the sweep finished."""
        log = self.logger
        log.info("Starting session cleanup task")

        # Placeholder for the real cleanup work
        removed = self._cleanup_sessions()
        log.info(f"Cleaned up {removed} expired sessions")

        self.last_cleanup = time.time()

    @timer(interval=300)  # once every 5 minutes
    def health_check(self):
        """Probe the database and external APIs and log the overall outcome."""
        self.logger.info("Performing health check")

        # Both probes always run, database first, then external APIs
        database_ok = self._check_database_health()
        apis_ok = self._check_external_apis()

        if database_ok and apis_ok:
            self.logger.info("System health check passed")
        else:
            self.logger.warning("System health check failed")
            # An alert could be raised from here

    @timer(interval=3600)  # once an hour
    def generate_hourly_report(self):
        """Collect system figures into a report and hand it to the sender."""
        self.logger.info("Generating hourly report")

        snapshot = {'timestamp': time.time()}
        snapshot['active_users'] = self._count_active_users()
        snapshot['processed_requests'] = self._count_processed_requests()
        snapshot['system_load'] = self._get_system_load()

        # Forward the report to the monitoring system
        self._send_report(snapshot)
        self.logger.info("Hourly report generated and sent")

    def _cleanup_sessions(self):
        """Placeholder: number of sessions removed."""
        return 42

    def _check_database_health(self):
        """Placeholder: database connectivity probe."""
        return True

    def _check_external_apis(self):
        """Placeholder: external API availability probe."""
        return True

    def _count_active_users(self):
        """Placeholder: current active-user count."""
        return 150

    def _count_processed_requests(self):
        """Placeholder: number of processed requests."""
        return 1000

    def _get_system_load(self):
        """Placeholder: system load figure."""
        return 0.3

    def _send_report(self, data):
        """Placeholder: deliver the report (no-op here)."""
        return None

106

```

107

108

### Sub-second Timing

109

110

Timer decorator supports sub-second intervals for high-frequency tasks.

111

112

```python

113

class HighFrequencyService:
    """Example service running sub-second timers.

    nameko creates a fresh service instance for every worker (that is, for
    every timer tick), so anything that must survive between ticks is kept
    on the class instead of on ``self``.
    """

    name = "high_frequency_service"

    # Tick counter shared by all workers in this process. An attribute set
    # in __init__ would be reset to 0 for every tick.
    counter = 0

    @timer(interval=0.1)  # Run every 100 milliseconds
    def high_frequency_task(self):
        """Task that runs 10 times per second"""
        cls = type(self)
        cls.counter += 1

        # Example: process real-time data stream
        self._process_realtime_data()

        if cls.counter % 100 == 0:  # Log every 10 seconds
            print(f"Processed {cls.counter} iterations")

    @timer(interval=0.5)  # Run every 500 milliseconds
    def semi_frequent_task(self):
        """Task that runs twice per second"""
        current_time = time.time()

        # Example: update cache or refresh data
        self._refresh_cache()

        print(f"Cache refreshed at {current_time}")

    def _process_realtime_data(self):
        """Placeholder for the real-time processing step."""
        pass

    def _refresh_cache(self):
        """Placeholder for the cache refresh step."""
        pass

139

```

140

141

### Error Handling in Timers

142

143

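Timer methods should handle exceptions explicitly. An unhandled exception fails only that single invocation and the timer fires again on the next interval, so without explicit handling repeated failures are easy to miss and cannot be counted, retried, or escalated.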

144

145

```python

146

import traceback

147

from nameko.timer import timer

148

149

class RobustTimerService:
    """Example service showing error handling inside timer methods.

    The consecutive-failure counter lives on the class: nameko builds a new
    service instance for every timer tick, so a counter stored on ``self``
    would restart at zero each time and the alert threshold could never be
    reached.
    """

    name = "robust_timer_service"

    # Consecutive failures of robust_scheduled_task, shared across ticks.
    error_count = 0

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=30)
    def robust_scheduled_task(self):
        """Timer with comprehensive error handling"""
        cls = type(self)
        try:
            # Potentially failing operation
            self._risky_operation()

            # Reset error count on success
            cls.error_count = 0
            self.logger.info("Scheduled task completed successfully")

        except Exception as e:
            cls.error_count += 1
            self.logger.error(f"Scheduled task failed (attempt {cls.error_count}): {e}")
            self.logger.error(f"Full traceback: {traceback.format_exc()}")

            # Escalate once the failure streak reaches the threshold
            if cls.error_count >= 5:
                self.logger.critical("Too many consecutive failures, alerting administrators")
                self._send_alert(f"Timer task failing repeatedly: {e}")

            # Don't re-raise: the failure has been logged and counted here,
            # and the next tick will try again.

    @timer(interval=60)
    def task_with_retry_logic(self):
        """Timer with built-in retry logic"""
        max_retries = 3

        for attempt in range(max_retries):
            try:
                self._operation_that_might_fail()
                self.logger.info("Operation succeeded")
                return  # Success, exit retry loop

            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")

                if attempt == max_retries - 1:
                    # Final attempt failed
                    self.logger.error(f"All {max_retries} attempts failed")
                    self._handle_final_failure(e)
                else:
                    # Wait before retry: 1s, then 2s (exponential backoff)
                    time.sleep(2 ** attempt)

    def _risky_operation(self):
        """Simulate an operation that fails about 20% of the time."""
        import random
        if random.random() < 0.2:  # 20% chance of failure
            raise Exception("Simulated operation failure")

    def _operation_that_might_fail(self):
        """Placeholder for another potentially failing operation."""
        pass

    def _send_alert(self, message):
        """Placeholder for alert delivery."""
        pass

    def _handle_final_failure(self, error):
        """Placeholder for final-failure handling (logging, alerting, etc.)."""
        pass

218

```

219

220

### Timer with Service Dependencies

221

222

Timer methods can use all service dependencies like RPC proxies, databases, etc.

223

224

```python

225

from nameko.timer import timer

226

from nameko.rpc import RpcProxy

227

from nameko.dependency_providers import Config

228

229

class DataSyncService:
    """Example service whose timers use injected dependencies.

    The sync watermark is stored on the class because nameko creates a new
    service instance for every timer tick; kept on ``self`` it would always
    be ``None`` and every run would start again from the beginning.
    """

    name = "data_sync_service"

    # Service dependencies available to timer methods
    user_service = RpcProxy('user_service')
    config = Config()

    # Start time of the last completed sync; None until the first run.
    last_sync_time = None

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=600)  # Run every 10 minutes
    def sync_user_data(self):
        """Synchronize user data between services"""
        self.logger.info("Starting user data synchronization")
        cls = type(self)

        try:
            # Get sync configuration
            batch_size = self.config.get('SYNC_BATCH_SIZE', 100)

            # Take the watermark before querying so changes made while this
            # run is in progress are picked up by the next run.
            sync_started = time.time()

            # Get users that need syncing from remote service
            users_to_sync = self.user_service.get_users_for_sync(
                since=cls.last_sync_time,
                limit=batch_size
            )

            # Process each user; one failure must not abort the whole batch
            synced_count = 0
            for user in users_to_sync:
                try:
                    self._sync_user_to_external_system(user)
                    synced_count += 1
                except Exception as e:
                    self.logger.error(f"Failed to sync user {user['id']}: {e}")

            # NOTE(review): users that failed above, or that did not fit in
            # `limit`, are not retried once the watermark advances - confirm
            # this matches the remote service's semantics.
            cls.last_sync_time = sync_started
            self.logger.info(f"Synchronized {synced_count}/{len(users_to_sync)} users")

        except Exception as e:
            self.logger.error(f"User data sync failed: {e}")

    @timer(interval=1800)  # Run every 30 minutes
    def cleanup_old_data(self):
        """Clean up old data across multiple services"""
        self.logger.info("Starting cross-service cleanup")

        try:
            # Clean up user service data
            cleanup_before = time.time() - (7 * 24 * 3600)  # 7 days ago

            result = self.user_service.cleanup_old_sessions(cleanup_before)
            self.logger.info(f"Cleaned up {result['deleted_count']} old sessions")

            # Could call other services for cleanup too
            # notification_service.cleanup_old_notifications(cleanup_before)
            # audit_service.archive_old_logs(cleanup_before)

        except Exception as e:
            self.logger.error(f"Cleanup task failed: {e}")

    def _sync_user_to_external_system(self, user):
        """Placeholder for the sync to the external system."""
        pass

292

```

293

294

### Advanced Scheduling Patterns

295

296

Complex scheduling patterns using timer with conditional logic.

297

298

```python

299

import calendar

300

from datetime import datetime, time as dt_time

301

302

class AdvancedSchedulerService:
    """Example service that builds calendar-style schedules on fixed timers.

    Each timer ticks at a fixed interval measured from service start, and the
    method decides from the current local time whether to do any work.
    """

    name = "advanced_scheduler_service"

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=60)  # Check every minute
    def business_hours_only_task(self):
        """Task that only runs during business hours"""
        now = datetime.now()

        # Only run Monday-Friday, 9 AM - 5 PM; other ticks are skipped
        if (now.weekday() < 5 and  # Monday = 0, Friday = 4
                dt_time(9, 0) <= now.time() <= dt_time(17, 0)):
            self.logger.info("Running business hours task")
            self._business_task()

    @timer(interval=3600)  # Check every hour
    def end_of_day_task(self):
        """Task that runs once during the 6 PM hour on weekdays"""
        now = datetime.now()

        # The hourly tick lands at an arbitrary minute offset that depends on
        # when the service started, so only the hour is tested. Also
        # requiring a specific minute window would make the task never run
        # for most start times.
        if now.weekday() < 5 and now.hour == 18:
            self.logger.info("Running end-of-day task")
            self._generate_daily_reports()

    @timer(interval=86400)  # Check once per day
    def monthly_task(self):
        """Task that runs on the first day of each month"""
        now = datetime.now()

        if now.day == 1:  # First day of month
            self.logger.info("Running monthly task")
            self._generate_monthly_report()

    @timer(interval=3600)  # Check every hour
    def adaptive_frequency_task(self):
        """Task that adapts the work done on each tick to the system load"""
        current_load = self._get_system_load()

        # Skip during high load periods
        if current_load > 0.8:
            self.logger.info("Skipping task due to high system load")
            return

        # Heavier processing during low load, standard processing otherwise
        if current_load < 0.3:
            self.logger.info("Running intensive task during low load")
            self._intensive_processing()
        else:
            self.logger.info("Running standard task")
            self._standard_processing()

    def _business_task(self):
        """Placeholder for the business-hours work."""
        pass

    def _generate_daily_reports(self):
        """Placeholder for daily report generation."""
        pass

    def _generate_monthly_report(self):
        """Placeholder for monthly report generation."""
        pass

    def _get_system_load(self):
        """Return system load metric (0.0 to 1.0); fixed value here."""
        return 0.5

    def _intensive_processing(self):
        """Placeholder for the low-load processing path."""
        pass

    def _standard_processing(self):
        """Placeholder for the normal processing path."""
        pass

381

```

382

383

### Timer Coordination

384

385

Patterns for coordinating timer execution across multiple service instances.

386

387

```python

388

import uuid

389

from nameko.timer import timer

390

from nameko.dependency_providers import Config

391

392

class CoordinatedTimerService:
    """Example patterns for coordinating timers across service instances.

    ``instance_id`` is generated once, when the class is defined, so it
    identifies the running process. Generating it in ``__init__`` would yield
    a different id on every timer tick, because nameko creates a new service
    instance per worker, and work assigned by id could never be matched.
    """

    name = "coordinated_timer_service"

    config = Config()

    # Stable identifier for this process, shared by all of its workers.
    instance_id = str(uuid.uuid4())

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=300)  # Run every 5 minutes
    def leader_election_task(self):
        """Task that uses leader election to run on only one instance"""

        # Simple leader election using external coordination service
        if self._acquire_leadership("cleanup_task", ttl=300):
            self.logger.info(f"Instance {self.instance_id} acquired leadership")

            try:
                self._perform_cleanup_task()
            finally:
                # Always release, even if the cleanup raised
                self._release_leadership("cleanup_task")
        else:
            self.logger.debug("Another instance is handling cleanup task")

    @timer(interval=60)
    def distributed_work_task(self):
        """Task that distributes work across instances"""

        # Get work items assigned to this instance
        work_items = self._get_work_for_instance(self.instance_id)

        for item in work_items:
            try:
                self._process_work_item(item)
                self._mark_work_completed(item['id'])
            except Exception as e:
                # Record the failure and carry on with the remaining items
                self.logger.error(f"Failed to process work item {item['id']}: {e}")
                self._mark_work_failed(item['id'])

    def _acquire_leadership(self, task_name, ttl):
        """Acquire distributed lock for task leadership"""
        # Implementation would use Redis, etcd, or database for coordination
        return True  # Simplified for example

    def _release_leadership(self, task_name):
        """Release distributed lock"""
        pass

    def _get_work_for_instance(self, instance_id):
        """Get work items assigned to this service instance"""
        # Implementation would use consistent hashing or work queue
        return []  # Simplified for example

    def _perform_cleanup_task(self):
        """Placeholder for the leader-only cleanup work."""
        pass

    def _process_work_item(self, item):
        """Placeholder for processing one work item."""
        pass

    def _mark_work_completed(self, item_id):
        """Placeholder for recording a completed work item."""
        pass

    def _mark_work_failed(self, item_id):
        """Placeholder for recording a failed work item."""
        pass

456

```

457

458

### Performance Considerations

459

460

Best practices for timer performance and resource management.

461

462

```python

463

class OptimizedTimerService:
    """Example of batching and lightweight monitoring with timers.

    The batch buffer, last flush time and last CPU reading are class
    attributes: nameko creates a new service instance for every timer tick,
    so on ``self`` they would be reset each time and neither the size nor the
    age threshold for flushing a batch would ever be reached.
    """

    name = "optimized_timer_service"

    # State shared by all workers in this process
    batch_buffer = []
    last_batch_time = time.time()
    _last_cpu = 0

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @timer(interval=10)  # Frequent collection
    def collect_metrics_batch(self):
        """Collect metrics in batches for efficiency"""
        cls = type(self)

        # Collect current metrics
        current_metrics = self._collect_current_metrics()
        cls.batch_buffer.extend(current_metrics)

        # Process batch when it reaches size limit or time limit
        if (len(cls.batch_buffer) >= 100 or
                time.time() - cls.last_batch_time >= 60):
            self._process_metrics_batch(cls.batch_buffer)
            # Rebind only after processing, so a failed batch is kept and
            # retried on the next tick
            cls.batch_buffer = []
            cls.last_batch_time = time.time()

    @timer(interval=1)  # High frequency but lightweight
    def lightweight_monitoring(self):
        """High-frequency monitoring with minimal overhead"""
        cls = type(self)

        # Only collect essential metrics to minimize impact
        cpu_usage = self._get_cpu_usage()

        # Only log if the reading moved by more than 0.1 since the last
        # logged value
        if abs(cpu_usage - cls._last_cpu) > 0.1:
            self.logger.debug(f"CPU usage: {cpu_usage:.1%}")
            cls._last_cpu = cpu_usage

    def _collect_current_metrics(self):
        """Return list of current metrics (single mock sample here)."""
        return [{'timestamp': time.time(), 'value': 1.0}]

    def _process_metrics_batch(self, metrics):
        """Process a batch of metrics; here it only logs the batch size."""
        self.logger.info(f"Processed batch of {len(metrics)} metrics")

    def _get_cpu_usage(self):
        """Return current CPU usage (mock)."""
        return 0.5

510

```