or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-tools.mdconfiguration.mdevents.mdexecution.mdindex.mdintegrations.mdparameters.mdscheduler.mdtargets.mdtasks.md

events.mddocs/

0

# Events & Monitoring

1

2

Luigi's event system lets you attach callbacks to points in a task's lifecycle, for example when a task starts, succeeds, or fails. These hooks are the usual place to add logging, metrics, and alerting, or to integrate with external monitoring systems.

3

4

## Capabilities

5

6

### Base Event Class

7

8

Foundation class for Luigi's event system that enables monitoring and tracking of task execution lifecycle.

9

10

```python { .api }

11

class Event:
    """Names of the events Luigi fires during a task's lifecycle.

    Events are plain string constants, not classes or instances. Register a
    callback with ``luigi.Task.event_handler(Event.X)`` and fire an event with
    ``task.trigger_event(event, *args)``; ``Event`` itself has no
    ``trigger_event`` method. Any other string may be used as a custom event.
    """

    DEPENDENCY_DISCOVERED = "event.core.dependency.discovered"
    DEPENDENCY_MISSING = "event.core.dependency.missing"
    DEPENDENCY_PRESENT = "event.core.dependency.present"
    BROKEN_TASK = "event.core.task.broken"
    START = "event.core.start"
    # May be fired by a running task itself to report progress or metadata.
    PROGRESS = "event.core.progress"
    FAILURE = "event.core.failure"
    SUCCESS = "event.core.success"
    PROCESSING_TIME = "event.core.processing_time"
    TIMEOUT = "event.core.timeout"
    PROCESS_FAILURE = "event.core.process_failure"

27

```

28

29

### Task Lifecycle Events

30

31

Events that are triggered during different phases of task execution.

32

33

```python { .api }

34

# Luigi has no per-event classes such as TaskStartEvent or TaskFailureEvent.
# Lifecycle events are the ``Event`` string constants, and handlers are plain
# callables registered through these two ``luigi.Task`` members.

class Task:  # luigi.Task -- event-related members only
    @classmethod
    def event_handler(cls, event):
        """Return a decorator that registers a callback for ``event``.

        The callback fires for instances of ``cls`` and of its subclasses, so
        registering on ``luigi.Task`` observes every task.

        Args:
            event: Event name, e.g. ``luigi.Event.SUCCESS`` or a custom string.
        """

    def trigger_event(self, event, *args, **kwargs):
        """Call every callback registered for ``event`` with ``*args, **kwargs``.

        The task is not passed implicitly; callers include it in ``args``.
        Exceptions raised by a callback are logged rather than propagated.
        """


# Arguments the worker passes to callbacks for the core lifecycle events:
#   Event.START                 -> callback(task)
#   Event.SUCCESS               -> callback(task)
#   Event.FAILURE               -> callback(task, exception)
#   Event.PROCESSING_TIME       -> callback(task, seconds)
#   Event.DEPENDENCY_DISCOVERED -> callback(task, dependency)
#   Event.DEPENDENCY_MISSING    -> callback(task)
#   Event.DEPENDENCY_PRESENT    -> callback(task)

92

```

93

94

### Execution Status Codes

95

96

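Enumeration of the possible overall outcomes of a Luigi run. It is reported as `LuigiRunResult.status` when `luigi.build()` or `luigi.run()` is called with `detailed_summary=True`.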

97

98

```python { .api }

99

import enum


class LuigiStatusCode(enum.Enum):
    """Overall outcome of a run, exposed as ``LuigiRunResult.status``.

    Only available when ``luigi.build()`` / ``luigi.run()`` is called with
    ``detailed_summary=True``. Each value is a ``(smiley, description)``
    tuple, not an integer; compare members directly
    (``status == LuigiStatusCode.SUCCESS``).
    """

    SUCCESS = (":)", "there were no failed tasks or missing dependencies")
    SUCCESS_WITH_RETRY = (":)", "there were failed tasks but they all succeeded in a retry")
    FAILED = (":(", "there were failed tasks")
    FAILED_AND_SCHEDULING_FAILED = (":(", "there were failed tasks and tasks whose scheduling failed")
    SCHEDULING_FAILED = (":(", "there were tasks whose scheduling failed")
    NOT_RUN = (":|", "there were tasks that were not granted run permission by the scheduler")
    MISSING_EXT = (":|", "there were missing external dependencies")

134

```

135

136

### Execution Summary

137

138

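Helpers for generating the end-of-run execution summary and inspecting its results. Note that `luigi.build()` and `luigi.run()` return a `LuigiRunResult` only when called with `detailed_summary=True`; by default they return a plain `bool`.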

139

140

```python { .api }

141

class execution_summary:  # luigi.Config; options live in the [execution_summary] section
    """Configuration for the summary printed at the end of a run."""

    summary_length: int = 5  # luigi.IntParameter(default=5)
    # Note: no_configure_logging is a [core] option, not part of this section.


def summary(worker) -> str:
    """Return a human-readable summary of what ``worker`` has done.

    Args:
        worker: The worker that ran the tasks, e.g. ``LuigiRunResult.worker``.

    Returns:
        str: Multi-line summary text (not a dict).
    """


class LuigiRunResult:
    """Result of ``luigi.build()`` / ``luigi.run()`` called with ``detailed_summary=True``.

    Without ``detailed_summary=True`` those functions return a plain ``bool``
    (whether scheduling succeeded) instead of this object.

    Attributes:
        worker: Worker instance that executed the tasks.
        summary_text (str): The same text ``summary(worker)`` returns.
        one_line_summary (str): Short one-line form of the outcome.
        status (LuigiStatusCode): Overall outcome of the run.
        scheduling_succeeded (bool): True if every task was scheduled without errors.
    """

    def __init__(self, worker, worker_add_run_status=True):
        """Build the result from a finished worker.

        Args:
            worker: Worker instance that executed the tasks.
            worker_add_run_status: Whether scheduling succeeded; stored as
                ``scheduling_succeeded``.
        """

187

```

188

189

## Usage Examples

190

191

### Basic Event Handling

192

193

```python

194

import luigi

# Luigi event names are plain strings, so an application can define its own
# alongside the built-in ``luigi.Event`` constants.
CUSTOM_EVENT = "event.custom.message"


@luigi.Task.event_handler(CUSTOM_EVENT)
def handle_custom_event(task, message, data=None):
    """Print custom events triggered by any task.

    Args:
        task: Task that triggered the event (passed explicitly by the caller).
        message: Human-readable description of what happened.
        data: Optional dict of extra details.
    """
    print(f"Custom event: {message}")
    if data:
        print(f"Event data: {data}")


class MonitoredTask(luigi.Task):
    """Task that emits custom events."""

    def run(self):
        # trigger_event forwards its extra arguments unchanged to every handler;
        # it does not add the task itself, so pass ``self`` explicitly.
        self.trigger_event(CUSTOM_EVENT, self, "Task started", {"task_id": self.task_id})

        try:
            with self.output().open('w') as f:
                f.write("Task completed")
        except Exception as e:
            self.trigger_event(CUSTOM_EVENT, self, "Task failed", {"error": str(e)})
            raise
        else:
            self.trigger_event(CUSTOM_EVENT, self, "Task completed successfully")

    def output(self):
        return luigi.LocalTarget("monitored_output.txt")

236

```

237

238

### Task Lifecycle Monitoring

239

240

```python

241

import logging
import time

import luigi
from luigi.event import Event

# Set up logging for events
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('luigi.events')


# Handlers are registered on luigi.Task, so they fire for every task.
@luigi.Task.event_handler(Event.START)
def log_task_start(task):
    """Log when tasks start execution."""
    logger.info("STARTED: %s", task.task_id)


@luigi.Task.event_handler(Event.SUCCESS)
def log_task_success(task):
    """Log when tasks complete successfully."""
    logger.info("SUCCESS: %s", task.task_id)


@luigi.Task.event_handler(Event.FAILURE)
def log_task_failure(task, exception):
    """Log when tasks fail; Luigi passes the raised exception as the second argument."""
    logger.error("FAILED: %s - %s", task.task_id, exception)


class LifecycleTask(luigi.Task):
    """Task with comprehensive lifecycle monitoring."""

    task_name = luigi.Parameter()
    should_fail = luigi.BoolParameter(default=False)

    def run(self):
        logger.info("Executing %s", self.task_name)

        # Simulate work
        time.sleep(1)

        if self.should_fail:
            raise Exception(f"Task {self.task_name} configured to fail")

        with self.output().open('w') as f:
            f.write(f"Completed: {self.task_name}")

    def output(self):
        return luigi.LocalTarget(f"output_{self.task_name}.txt")


# Run tasks with lifecycle monitoring
if __name__ == '__main__':
    tasks = [
        LifecycleTask(task_name="task1"),
        LifecycleTask(task_name="task2"),
        LifecycleTask(task_name="task3", should_fail=True),
    ]

    # detailed_summary=True makes build() return a LuigiRunResult (which has
    # .status) instead of a plain bool.
    result = luigi.build(tasks, local_scheduler=True, detailed_summary=True)
    print(f"Final status: {result.status}")

300

```

301

302

### Execution Summary Analysis

303

304

```python

305

import json

import luigi
from luigi.execution_summary import LuigiStatusCode, summary


class SummaryAnalysisTask(luigi.Task):
    """Build a batch of SimpleTasks and write an analysis of the run as JSON.

    NOTE(review): calling ``luigi.build`` inside ``run()`` starts a second,
    nested worker; yielding the tasks as dynamic dependencies is the usual
    alternative inside a task.
    """

    def output(self):
        return luigi.LocalTarget("execution_analysis.json")

    def run(self):
        test_tasks = [SimpleTask(name=f"task_{i}") for i in range(5)]

        # detailed_summary=True returns a LuigiRunResult instead of a bool.
        result = luigi.build(test_tasks, local_scheduler=True, detailed_summary=True)

        # Check each task once; complete() may have to inspect the filesystem.
        completed = sum(1 for task in test_tasks if task.complete())

        analysis = {
            'execution_status': result.status.name,
            'scheduling_succeeded': result.scheduling_succeeded,
            # summary() takes the worker and returns human-readable text.
            'summary': summary(result.worker),
            'task_analysis': {
                'total_tasks': len(test_tasks),
                'completed_tasks': completed,
                # Incomplete is not necessarily failed (e.g. never run).
                'incomplete_tasks': len(test_tasks) - completed,
            },
            'status_interpretation': self.interpret_status(result.status),
        }

        with self.output().open('w') as f:
            json.dump(analysis, f, indent=2)

    def interpret_status(self, status: LuigiStatusCode) -> str:
        """Translate a status code into a human-readable description."""
        interpretations = {
            LuigiStatusCode.SUCCESS: "All tasks completed successfully",
            LuigiStatusCode.SUCCESS_WITH_RETRY: "Tasks completed after retries",
            LuigiStatusCode.FAILED: "Some tasks failed",
            LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED: "Some tasks failed and some could not be scheduled",
            LuigiStatusCode.SCHEDULING_FAILED: "Task scheduling failed",
            LuigiStatusCode.NOT_RUN: "Tasks were not executed",
            LuigiStatusCode.MISSING_EXT: "Missing external dependencies",
        }
        return interpretations.get(status, f"Unknown status: {status}")


class SimpleTask(luigi.Task):
    name = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"simple_{self.name}.txt")

    def run(self):
        with self.output().open('w') as f:
            f.write(f"Simple task: {self.name}")

366

```

367

368

### External Monitoring Integration

369

370

```python

371

import time
from datetime import datetime, timezone

import luigi
import requests
from luigi.event import Event


class MetricsCollector:
    """Buffer metrics in memory and send them to an external monitoring system.

    NOTE(review): the buffer is per-process. With ``workers > 1`` Luigi runs
    tasks (and the event handlers they trigger) in child processes, so metrics
    recorded there do not reach this instance -- confirm before relying on it.
    """

    def __init__(self, metrics_endpoint: str):
        self.endpoint = metrics_endpoint
        self.metrics = []

    def record_metric(self, metric_name: str, value: float, tags: dict = None):
        """Record a metric for later sending, timestamped in UTC."""
        self.metrics.append({
            'name': metric_name,
            'value': value,
            # Timezone-aware; datetime.utcnow() is deprecated since Python 3.12.
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'tags': tags or {},
        })

    def send_metrics(self):
        """POST buffered metrics; on failure keep them so a later call can retry."""
        if not self.metrics:
            return

        try:
            response = requests.post(
                self.endpoint,
                json={'metrics': self.metrics},
                timeout=10,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Failed to send metrics: {e}")
            return

        print(f"Sent {len(self.metrics)} metrics")
        self.metrics.clear()


# Global metrics collector
metrics = MetricsCollector("http://monitoring.example.com/api/metrics")


def collect_task_metrics(task, metric_name):
    """Record a count of 1 for ``metric_name``, tagged with the task's identity."""
    metrics.record_metric(
        metric_name,
        1,
        {'task_family': task.task_family, 'task_id': task.task_id},
    )


# Luigi calls SUCCESS handlers with (task) and FAILURE handlers with
# (task, exception); registering on luigi.Task covers every task.
@luigi.Task.event_handler(Event.SUCCESS)
def _record_success(task):
    collect_task_metrics(task, 'luigi.task.success')


@luigi.Task.event_handler(Event.FAILURE)
def _record_failure(task, exception):
    collect_task_metrics(task, 'luigi.task.failure')


class MonitoredWorkflow(luigi.WrapperTask):
    """Workflow with external monitoring integration."""

    def requires(self):
        return [ProcessDataTask(dataset=name) for name in ("A", "B", "C")]

    def run(self):
        # Runs after all requirements are done. This task's own SUCCESS event
        # fires after run() returns, so it is not part of this batch.
        metrics.send_metrics()


class ProcessDataTask(luigi.Task):
    dataset = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"processed_{self.dataset}.txt")

    def run(self):
        # Record custom business metrics
        metrics.record_metric('luigi.dataset.processed', 1, {'dataset': self.dataset})

        start_time = time.time()
        with self.output().open('w') as f:
            f.write(f"Processed dataset {self.dataset}")

        # Record processing time
        metrics.record_metric(
            'luigi.processing.duration',
            time.time() - start_time,
            {'dataset': self.dataset},
        )

480

```

481

482

### Health Check and Alerting

483

484

```python

485

import logging
import os
import random
import smtplib
from email.mime.text import MIMEText

import luigi
from luigi.execution_summary import LuigiRunResult, LuigiStatusCode

logger = logging.getLogger(__name__)


class HealthMonitor:
    """Track consecutive failed runs and send e-mail alerts.

    Each failed run sends a warning. Reaching ``max_failures`` consecutive
    failures sends a critical alert. The first healthy run after a failure
    sends a recovery notice.
    """

    # Run outcomes treated as healthy / as failures. NOT_RUN and MISSING_EXT
    # leave the failure streak unchanged.
    SUCCESS_STATUSES = (LuigiStatusCode.SUCCESS, LuigiStatusCode.SUCCESS_WITH_RETRY)
    FAILURE_STATUSES = (
        LuigiStatusCode.FAILED,
        LuigiStatusCode.FAILED_AND_SCHEDULING_FAILED,
        LuigiStatusCode.SCHEDULING_FAILED,
    )

    def __init__(self, alert_email: str, smtp_config: dict, max_failures: int = 3):
        self.alert_email = alert_email
        self.smtp_config = smtp_config
        self.failure_count = 0
        self.max_failures = max_failures

    def check_health(self, result: LuigiRunResult):
        """Update the failure streak from ``result`` and send alerts if needed.

        ``result`` must come from ``luigi.build(..., detailed_summary=True)``;
        without that flag build() returns a bool, which has no ``status``.
        """
        if result.status in self.SUCCESS_STATUSES:
            # Notify before resetting: the recovery mail is sent only while
            # failure_count still records the earlier failures.
            self.send_success_notification()
            self.failure_count = 0

        elif result.status in self.FAILURE_STATUSES:
            self.failure_count += 1

            if self.failure_count >= self.max_failures:
                self.send_alert(f"Workflow has failed {self.failure_count} times")
            else:
                self.send_warning(f"Workflow failed (attempt {self.failure_count})")

    def send_alert(self, message: str):
        """Send critical alert email."""
        self._send_email(
            subject="🚨 CRITICAL: Luigi Workflow Alert",
            body=f"CRITICAL ALERT: {message}\n\nImmediate attention required."
        )

    def send_warning(self, message: str):
        """Send warning email."""
        self._send_email(
            subject="⚠️ WARNING: Luigi Workflow Warning",
            body=f"WARNING: {message}\n\nMonitoring situation."
        )

    def send_success_notification(self):
        """Send success notification if recovering from failures."""
        if self.failure_count > 0:
            self._send_email(
                subject="✅ SUCCESS: Luigi Workflow Recovered",
                body="Workflow has recovered and is running successfully."
            )

    def _send_email(self, subject: str, body: str):
        """Send an e-mail notification; errors are logged, not raised (best effort)."""
        try:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.smtp_config['from']
            msg['To'] = self.alert_email

            # The context manager closes the connection even if login/send fails.
            with smtplib.SMTP(self.smtp_config['host'], self.smtp_config['port']) as server:
                server.starttls()
                server.login(self.smtp_config['user'], self.smtp_config['password'])
                server.send_message(msg)
        except Exception:
            logger.exception("Failed to send alert: %s", subject)
            return

        logger.info("Alert sent: %s", subject)


# Configure health monitor
health_monitor = HealthMonitor(
    alert_email="admin@example.com",
    smtp_config={
        'host': 'smtp.example.com',
        'port': 587,
        'user': 'luigi@example.com',
        # Read credentials from the environment rather than hard-coding them.
        'password': os.environ.get('LUIGI_SMTP_PASSWORD', ''),
        'from': 'luigi@example.com',
    },
)


class MonitoredPipeline(luigi.WrapperTask):
    """Pipeline with health monitoring and alerting."""

    def requires(self):
        return [CriticalTask(name=f"task_{i}") for i in range(3)]


class CriticalTask(luigi.Task):
    # Deliberately not named ``task_id``: luigi.Task assigns its own
    # ``task_id`` attribute in __init__, which would overwrite the parameter.
    name = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(f"critical_{self.name}.txt")

    def run(self):
        # Simulate occasional failures for testing
        if random.random() < 0.2:  # 20% failure rate
            raise Exception(f"Simulated failure in {self.name}")

        with self.output().open('w') as f:
            f.write(f"Critical task {self.name} completed")


# Execute with health monitoring
if __name__ == '__main__':
    # detailed_summary=True makes build() return a LuigiRunResult with .status.
    result = luigi.build([MonitoredPipeline()], local_scheduler=True, detailed_summary=True)
    health_monitor.check_health(result)

597

```