# Event System

Comprehensive event system for plugin development and test lifecycle monitoring.

## Capabilities

### Base Event Class

Foundation for all events in the vedro event system.

```python { .api }
class Event:
    """
    Abstract base class for all vedro events.

    Events are used throughout the vedro lifecycle to notify plugins
    about various stages of test execution, allowing for customization
    and extension of framework behavior.
    """

    def __eq__(self, other) -> bool: ...
    def __repr__(self) -> str: ...
```
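
The stub above lists only signatures. As a rough illustration of why an event base class might define `__eq__` and `__repr__`, here is a minimal sketch that assumes events compare by type and attribute values. The `Event` class below is a stand-in written for this example, not vedro's implementation, and `DeployFinishedEvent` is a hypothetical custom event.

```python
class Event:
    """Stand-in for an event base class (an assumption, not vedro's code)."""

    def __eq__(self, other: object) -> bool:
        # Equal when both objects are the same event type with the same attributes
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}()"


class DeployFinishedEvent(Event):
    """Hypothetical custom event carrying a single payload attribute."""

    def __init__(self, environment: str) -> None:
        self.environment = environment

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.environment!r})"


first = DeployFinishedEvent("staging")
second = DeployFinishedEvent("staging")
other = DeployFinishedEvent("production")
```

Value-based equality of this kind makes events easy to assert on in plugin unit tests, since two separately constructed events with the same payload compare equal.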

### Configuration Events

Events related to configuration loading and argument parsing.

```python { .api }
class ConfigLoadedEvent:
    """
    Fired when configuration is loaded from vedro.cfg.py.

    Attributes:
        config_path (Path): Path to the configuration file
        config (ConfigType): The loaded configuration object
    """
    config_path: Path
    config: ConfigType

class ArgParseEvent:
    """
    Fired when command-line arguments are being parsed.

    Allows plugins to add custom command-line arguments.

    Attributes:
        arg_parser (ArgumentParser): The argument parser instance
    """
    arg_parser: ArgumentParser

class ArgParsedEvent:
    """
    Fired after command-line arguments have been parsed.

    Allows plugins to access and react to parsed arguments.

    Attributes:
        args (Namespace): The parsed arguments namespace
    """
    args: Namespace
```
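
The two argument events are typically used as a pair: one handler registers a flag, the other reads its parsed value. The sketch below shows that flow with standard-library `argparse` only. The event classes are stand-ins that mirror the attributes documented above, and `SlowThresholdOptions` and the `--slow-threshold` flag are hypothetical names chosen for illustration.

```python
from argparse import ArgumentParser, Namespace
from dataclasses import dataclass


@dataclass
class ArgParseEvent:
    """Stand-in exposing the parser, mirroring the documented attribute."""
    arg_parser: ArgumentParser


@dataclass
class ArgParsedEvent:
    """Stand-in exposing the parsed namespace."""
    args: Namespace


class SlowThresholdOptions:
    """Hypothetical handler pair: registers a flag, then reads its value."""

    def __init__(self) -> None:
        self.threshold = 5.0

    def on_arg_parse(self, event: ArgParseEvent) -> None:
        # Register a custom command-line flag on the shared parser
        event.arg_parser.add_argument("--slow-threshold", type=float,
                                      default=self.threshold)

    def on_arg_parsed(self, event: ArgParsedEvent) -> None:
        # argparse converts "--slow-threshold" to the attribute "slow_threshold"
        self.threshold = event.args.slow_threshold


parser = ArgumentParser()
options = SlowThresholdOptions()
options.on_arg_parse(ArgParseEvent(parser))
options.on_arg_parsed(ArgParsedEvent(parser.parse_args(["--slow-threshold", "2.5"])))
```

In a real plugin the two handlers would be registered in `subscribe` for `ArgParseEvent` and `ArgParsedEvent` respectively, and the framework would supply the parser and the parsed namespace.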

### Lifecycle Events

Events that mark major phases in the test execution lifecycle.

```python { .api }
class StartupEvent:
    """
    Fired at the beginning of test execution.

    Provides access to the scenario scheduler containing discovered scenarios.

    Attributes:
        scheduler (ScenarioScheduler): The scenario scheduler instance
    """
    scheduler: ScenarioScheduler

class CleanupEvent:
    """
    Fired at the end of test execution.

    Provides access to the complete test execution report.

    Attributes:
        report (Report): Complete test execution report with results and statistics
    """
    report: Report
```

### Scenario Events

Events tracking individual scenario execution states.

```python { .api }
class ScenarioRunEvent:
    """
    Fired when a scenario starts running.

    Attributes:
        scenario_result (ScenarioResult): The scenario result object being populated
    """
    scenario_result: ScenarioResult

class ScenarioPassedEvent:
    """
    Fired when a scenario completes successfully.

    Attributes:
        scenario_result (ScenarioResult): The completed scenario result
    """
    scenario_result: ScenarioResult

class ScenarioFailedEvent:
    """
    Fired when a scenario fails during execution.

    Attributes:
        scenario_result (ScenarioResult): The failed scenario result with error information
    """
    scenario_result: ScenarioResult

class ScenarioSkippedEvent:
    """
    Fired when a scenario is skipped.

    Attributes:
        scenario_result (ScenarioResult): The skipped scenario result
    """
    scenario_result: ScenarioResult

class ScenarioReportedEvent:
    """
    Fired after a scenario is reported (post-processing phase).

    Attributes:
        aggregated_result (AggregatedResult): Extended scenario result with aggregated data
    """
    aggregated_result: AggregatedResult
```

### Step Events

Events monitoring individual step execution within scenarios.

```python { .api }
class StepRunEvent:
    """
    Fired when a step starts running.

    Attributes:
        step_result (StepResult): The step result object being populated
    """
    step_result: StepResult

class StepPassedEvent:
    """
    Fired when a step completes successfully.

    Attributes:
        step_result (StepResult): The completed step result
    """
    step_result: StepResult

class StepFailedEvent:
    """
    Fired when a step fails during execution.

    Attributes:
        step_result (StepResult): The failed step result with error information
    """
    step_result: StepResult
```

### Exception Events

Events for handling exceptions raised during test execution.

```python { .api }
class ExceptionRaisedEvent:
    """
    Fired when any exception is raised during test execution.

    Provides global exception handling and logging capabilities.

    Attributes:
        exc_info (ExcInfo): Exception information including type, value, and traceback
    """
    exc_info: ExcInfo
```
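
Because `exc_info` bundles the exception type, value, and traceback, a handler can pass it straight to the standard `traceback` module. The sketch below uses a stand-in `ExcInfo` named tuple with those three fields; the real class may be structured differently, and `format_exc_info` is a hypothetical helper name.

```python
import traceback
from types import TracebackType
from typing import NamedTuple, Optional, Type


class ExcInfo(NamedTuple):
    """Stand-in with the three documented fields: type, value, traceback."""
    type: Type[BaseException]
    value: BaseException
    traceback: Optional[TracebackType]


def format_exc_info(exc_info: ExcInfo) -> str:
    """Render the exception the way an uncaught error would be printed."""
    lines = traceback.format_exception(exc_info.type, exc_info.value,
                                       exc_info.traceback)
    return "".join(lines)


# Capture a real exception so the traceback field is populated
try:
    raise ValueError("boom")
except ValueError as exc:
    captured = ExcInfo(type(exc), exc, exc.__traceback__)

report = format_exc_info(captured)
```

A handler for `ExceptionRaisedEvent` could send the resulting string to a logger instead of printing it, which keeps the full traceback available without interrupting the run.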

## Usage Examples

### Basic Event Handling

```python
import time

from vedro.core import Plugin, PluginConfig
from vedro.events import ScenarioFailedEvent, ScenarioPassedEvent, ScenarioRunEvent


class TestTimerPlugin(Plugin):
    """Plugin that measures and reports test execution times."""

    def __init__(self, config: "TestTimerConfig"):
        super().__init__(config)
        # Maps scenario unique_id -> wall-clock start time
        self._start_times = {}

    def subscribe(self, dispatcher):
        dispatcher.listen(ScenarioRunEvent, self.on_scenario_start)
        dispatcher.listen(ScenarioPassedEvent, self.on_scenario_end)
        dispatcher.listen(ScenarioFailedEvent, self.on_scenario_end)

    def on_scenario_start(self, event: ScenarioRunEvent):
        scenario_id = event.scenario_result.scenario.unique_id
        self._start_times[scenario_id] = time.time()

    def on_scenario_end(self, event):
        # Shared by the passed and failed events; both expose scenario_result
        scenario_id = event.scenario_result.scenario.unique_id
        if scenario_id in self._start_times:
            duration = time.time() - self._start_times[scenario_id]
            print(f"Scenario {scenario_id} took {duration:.2f} seconds")
            del self._start_times[scenario_id]


class TestTimerConfig(PluginConfig):
    plugin = TestTimerPlugin
    enabled = True
```
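
To make the `listen` calls above less opaque, the following is a simplified, synchronous sketch of the listen/fire pattern. `ToyDispatcher` is written for this illustration and is not vedro's dispatcher, which may differ in important ways (for example in how handlers are ordered or awaited). The stand-in `ScenarioRunEvent` carries only a `subject` string.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Type


class ScenarioRunEvent:
    """Stand-in event; the documented one carries a scenario_result."""

    def __init__(self, subject: str) -> None:
        self.subject = subject


class ToyDispatcher:
    """Illustrative dispatcher: maps event types to lists of handlers."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[Type, List[Callable]] = defaultdict(list)

    def listen(self, event_type: Type, handler: Callable) -> "ToyDispatcher":
        self._handlers[event_type].append(handler)
        return self  # returning self allows chained listen() calls

    def fire(self, event: object) -> None:
        # Deliver the event to every handler registered for its exact type
        for handler in self._handlers[type(event)]:
            handler(event)


seen: List[str] = []
dispatcher = ToyDispatcher()
dispatcher.listen(ScenarioRunEvent, lambda e: seen.append(f"start:{e.subject}"))
dispatcher.listen(ScenarioRunEvent, lambda e: seen.append(f"log:{e.subject}"))
dispatcher.fire(ScenarioRunEvent("login works"))
```

In this sketch, handlers run in registration order and several handlers can observe the same event, which is the property the timer plugin relies on when it shares one handler between the passed and failed events.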

### Advanced Event Processing

```python
import json
import logging
import time

from vedro.core import Plugin, PluginConfig
from vedro.events import (
    ArgParsedEvent, CleanupEvent, ConfigLoadedEvent, ExceptionRaisedEvent,
    ScenarioFailedEvent, ScenarioPassedEvent, ScenarioRunEvent, ScenarioSkippedEvent,
    StartupEvent, StepFailedEvent, StepPassedEvent, StepRunEvent,
)


class ComprehensiveLoggingPlugin(Plugin):
    """Plugin that provides comprehensive logging of all test events."""

    def __init__(self, config: "ComprehensiveLoggingConfig"):
        super().__init__(config)
        self.logger = logging.getLogger("vedro.comprehensive")
        self._test_session = {
            "start_time": None,
            "scenarios": {},
            "global_artifacts": []
        }
        # Step events are attributed to the most recently started scenario.
        # This assumes scenarios run one at a time and avoids relying on a
        # back-reference from the step result to its scenario.
        self._current_scenario_id = None

    def subscribe(self, dispatcher):
        # Configuration events
        dispatcher.listen(ConfigLoadedEvent, self.on_config_loaded)
        dispatcher.listen(ArgParsedEvent, self.on_args_parsed)

        # Lifecycle events
        dispatcher.listen(StartupEvent, self.on_startup)
        dispatcher.listen(CleanupEvent, self.on_cleanup)

        # Scenario events
        dispatcher.listen(ScenarioRunEvent, self.on_scenario_run)
        dispatcher.listen(ScenarioPassedEvent, self.on_scenario_passed)
        dispatcher.listen(ScenarioFailedEvent, self.on_scenario_failed)
        dispatcher.listen(ScenarioSkippedEvent, self.on_scenario_skipped)

        # Step events
        dispatcher.listen(StepRunEvent, self.on_step_run)
        dispatcher.listen(StepPassedEvent, self.on_step_passed)
        dispatcher.listen(StepFailedEvent, self.on_step_failed)

        # Exception events
        dispatcher.listen(ExceptionRaisedEvent, self.on_exception_raised)

    def on_config_loaded(self, event: ConfigLoadedEvent):
        self.logger.info(f"Configuration loaded from {event.config_path}")

    def on_args_parsed(self, event: ArgParsedEvent):
        self.logger.info(f"Command line arguments: {vars(event.args)}")

    def on_startup(self, event: StartupEvent):
        self._test_session["start_time"] = time.time()
        # list() keeps this working whether `discovered` is a list or an iterator
        scenario_count = len(list(event.scheduler.discovered))
        self.logger.info(f"Test session starting with {scenario_count} scenarios")

    def on_scenario_run(self, event: ScenarioRunEvent):
        scenario_id = event.scenario_result.scenario.unique_id
        self._current_scenario_id = scenario_id
        self._test_session["scenarios"][scenario_id] = {
            "status": "running",
            "start_time": time.time(),
            "steps": []
        }
        self.logger.info(f"Scenario started: {event.scenario_result.scenario.subject}")

    def on_step_run(self, event: StepRunEvent):
        scenario_id = self._current_scenario_id
        step_info = {
            "name": event.step_result.step_name,
            "status": "running",
            "start_time": time.time()
        }

        if scenario_id in self._test_session["scenarios"]:
            self._test_session["scenarios"][scenario_id]["steps"].append(step_info)

    def on_step_passed(self, event: StepPassedEvent):
        self._update_step_status(event.step_result, "passed")

    def on_step_failed(self, event: StepFailedEvent):
        self._update_step_status(event.step_result, "failed", event.step_result.exc_info)

    def on_scenario_passed(self, event: ScenarioPassedEvent):
        self._update_scenario_status(event.scenario_result, "passed")

    def on_scenario_failed(self, event: ScenarioFailedEvent):
        self._update_scenario_status(event.scenario_result, "failed")

    def on_scenario_skipped(self, event: ScenarioSkippedEvent):
        self._update_scenario_status(event.scenario_result, "skipped")

    def on_exception_raised(self, event: ExceptionRaisedEvent):
        self.logger.error(f"Exception raised: {event.exc_info.type.__name__}: {event.exc_info.value}")

    def on_cleanup(self, event: CleanupEvent):
        session_duration = time.time() - self._test_session["start_time"]

        summary = {
            "total_duration": session_duration,
            "total_scenarios": len(self._test_session["scenarios"]),
            "passed": len([s for s in self._test_session["scenarios"].values() if s["status"] == "passed"]),
            "failed": len([s for s in self._test_session["scenarios"].values() if s["status"] == "failed"]),
            "skipped": len([s for s in self._test_session["scenarios"].values() if s["status"] == "skipped"])
        }

        self.logger.info(f"Test session completed: {json.dumps(summary, indent=2)}")

    def _update_step_status(self, step_result, status, exc_info=None):
        scenario_id = self._current_scenario_id
        if scenario_id in self._test_session["scenarios"]:
            # Close the first step with a matching name that is still running
            for step in self._test_session["scenarios"][scenario_id]["steps"]:
                if step["name"] == step_result.step_name and step["status"] == "running":
                    step["status"] = status
                    step["end_time"] = time.time()
                    step["duration"] = step["end_time"] - step["start_time"]
                    if exc_info:
                        step["error"] = str(exc_info.value)
                    break

    def _update_scenario_status(self, scenario_result, status):
        scenario_id = scenario_result.scenario.unique_id
        if scenario_id in self._test_session["scenarios"]:
            scenario_data = self._test_session["scenarios"][scenario_id]
            scenario_data["status"] = status
            scenario_data["end_time"] = time.time()
            scenario_data["duration"] = scenario_data["end_time"] - scenario_data["start_time"]


class ComprehensiveLoggingConfig(PluginConfig):
    plugin = ComprehensiveLoggingPlugin
    enabled = False  # Enable when needed for debugging
```

### Event Filtering and Conditional Processing

```python
import traceback

from vedro.core import Plugin, PluginConfig
from vedro.events import (
    ExceptionRaisedEvent, ScenarioFailedEvent, ScenarioPassedEvent, ScenarioReportedEvent,
)


class ConditionalEventPlugin(Plugin):
    """Plugin demonstrating conditional event processing."""

    def __init__(self, config: "ConditionalEventConfig"):
        super().__init__(config)
        self.config = config

    def subscribe(self, dispatcher):
        # Only listen to events if certain conditions are met
        if self.config.monitor_slow_tests:
            dispatcher.listen(ScenarioPassedEvent, self.check_slow_scenario)
            dispatcher.listen(ScenarioFailedEvent, self.check_slow_scenario)

        if self.config.log_exceptions:
            dispatcher.listen(ExceptionRaisedEvent, self.log_exception)

        if self.config.track_artifacts:
            dispatcher.listen(ScenarioReportedEvent, self.analyze_artifacts)

    def check_slow_scenario(self, event):
        """Alert on scenarios that take too long."""
        duration = event.scenario_result.elapsed
        if duration > self.config.slow_threshold:
            print(f"SLOW TEST ALERT: {event.scenario_result.scenario.subject} took {duration:.2f}s")

    def log_exception(self, event: ExceptionRaisedEvent):
        """Log exceptions with context."""
        exc_info = event.exc_info
        print(f"Exception in test: {exc_info.type.__name__}: {exc_info.value}")

        # Only log full traceback for certain exception types
        if exc_info.type in (AssertionError, ValueError, TypeError):
            traceback.print_exception(exc_info.type, exc_info.value, exc_info.traceback)

    def analyze_artifacts(self, event: ScenarioReportedEvent):
        """Analyze artifacts attached to scenarios."""
        artifacts = event.aggregated_result.artifacts
        if len(artifacts) > self.config.max_artifacts:
            print(f"WARNING: Scenario has {len(artifacts)} artifacts (max: {self.config.max_artifacts})")


class ConditionalEventConfig(PluginConfig):
    plugin = ConditionalEventPlugin
    enabled = True

    monitor_slow_tests: bool = True
    slow_threshold: float = 5.0  # seconds

    log_exceptions: bool = True

    track_artifacts: bool = True
    max_artifacts: int = 10
```

## Types and Data Structures

### Event Data Types

Key data structures used in events:

```python { .api }
from pathlib import Path
from argparse import ArgumentParser, Namespace
from typing import Any

# Configuration types
ConfigType = Any  # Actual config class type
Path = Path  # pathlib.Path

# Argument parsing types
ArgumentParser = ArgumentParser
Namespace = Namespace

# Core result types (referenced from other modules)
ScenarioResult = Any  # vedro.core.ScenarioResult
AggregatedResult = Any  # vedro.core.AggregatedResult
StepResult = Any  # vedro.core.StepResult
ExcInfo = Any  # vedro.core.ExcInfo
Report = Any  # vedro.core.Report
ScenarioScheduler = Any  # vedro.core.ScenarioScheduler
```

## Advanced Patterns

### Event Aggregation

Collect and analyze events across multiple scenarios:

```python
import statistics
from collections import defaultdict

from vedro.core import Plugin
from vedro.events import (
    CleanupEvent, ScenarioFailedEvent, ScenarioPassedEvent, ScenarioRunEvent,
    StepFailedEvent, StepPassedEvent, StepRunEvent,
)


class EventAggregatorPlugin(Plugin):
    """Plugin that aggregates events for analysis."""

    def __init__(self, config):
        super().__init__(config)
        self.event_counts = defaultdict(int)
        self.scenario_timings = []
        self.step_timings = defaultdict(list)

    def subscribe(self, dispatcher):
        # Count all event types; `et=event_type` binds the current loop value
        for event_type in [ScenarioRunEvent, ScenarioPassedEvent, ScenarioFailedEvent,
                           StepRunEvent, StepPassedEvent, StepFailedEvent]:
            dispatcher.listen(event_type, lambda event, et=event_type: self._count_event(et))

        dispatcher.listen(ScenarioPassedEvent, self.collect_scenario_timing)
        dispatcher.listen(ScenarioFailedEvent, self.collect_scenario_timing)
        dispatcher.listen(StepPassedEvent, self.collect_step_timing)
        dispatcher.listen(CleanupEvent, self.report_aggregated_data)

    def _count_event(self, event_type):
        self.event_counts[event_type.__name__] += 1

    def collect_scenario_timing(self, event):
        self.scenario_timings.append(event.scenario_result.elapsed)

    def collect_step_timing(self, event):
        step_name = event.step_result.step_name
        self.step_timings[step_name].append(event.step_result.elapsed)

    def report_aggregated_data(self, event: CleanupEvent):
        print("\n=== Event Analysis ===")
        print("Event counts:")
        for event_type, count in self.event_counts.items():
            print(f"  {event_type}: {count}")

        if self.scenario_timings:
            print("\nScenario timing statistics:")
            print(f"  Average: {statistics.mean(self.scenario_timings):.2f}s")
            print(f"  Median: {statistics.median(self.scenario_timings):.2f}s")
            print(f"  Max: {max(self.scenario_timings):.2f}s")
            print(f"  Min: {min(self.scenario_timings):.2f}s")
```
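
The `et=event_type` default argument in the counting lambda is what makes each handler remember its own event type. The short, framework-free sketch below shows the difference between late and early binding in Python closures; the event names are plain strings used only for illustration.

```python
event_names = ["ScenarioRunEvent", "ScenarioPassedEvent", "StepRunEvent"]

# Late binding: each lambda looks up `name` when it is called,
# by which time the loop variable holds the last value.
late_bound = [lambda event: name for name in event_names]

# Early binding: the default argument captures the value at definition time.
early_bound = [lambda event, name=name: name for name in event_names]

late_results = [handler(None) for handler in late_bound]
early_results = [handler(None) for handler in early_bound]

print(late_results)
print(early_results)
```

Without the default argument, every counting handler would increment the counter for the last event type in the list, so the per-type counts would be wrong even though no error is raised.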

### Event Chain Analysis

Track sequences of events for workflow analysis:

```python
import time
from collections import defaultdict

from vedro.core import Plugin
from vedro.events import (
    CleanupEvent, ScenarioFailedEvent, ScenarioPassedEvent, ScenarioRunEvent,
    ScenarioSkippedEvent, StepFailedEvent, StepPassedEvent, StepRunEvent,
)


class EventChainAnalyzer(Plugin):
    """Analyze event sequences for workflow insights."""

    def __init__(self, config):
        super().__init__(config)
        self.event_chains = defaultdict(list)
        self.current_scenario = None

    def subscribe(self, dispatcher):
        # Track all major events in order
        events_to_track = [
            ScenarioRunEvent, StepRunEvent, StepPassedEvent, StepFailedEvent,
            ScenarioPassedEvent, ScenarioFailedEvent, ScenarioSkippedEvent
        ]

        for event_type in events_to_track:
            dispatcher.listen(event_type, lambda event, et=event_type: self._track_event(et, event))

        # Print the collected patterns once the run is over
        dispatcher.listen(CleanupEvent, lambda event: self.analyze_chains())

    def _track_event(self, event_type, event):
        if hasattr(event, 'scenario_result'):
            scenario_id = event.scenario_result.scenario.unique_id
            self.current_scenario = scenario_id
        elif hasattr(event, 'step_result') and self.current_scenario is not None:
            # Step events are attributed to the most recently seen scenario;
            # this assumes scenarios run one at a time.
            scenario_id = self.current_scenario
        else:
            return

        self.event_chains[scenario_id].append({
            "event_type": event_type.__name__,
            "timestamp": time.time(),
            "event": event
        })

    def analyze_chains(self):
        """Analyze common event patterns."""
        patterns = defaultdict(int)

        for scenario_id, chain in self.event_chains.items():
            # Extract event type sequence
            sequence = [event["event_type"] for event in chain]
            pattern = " -> ".join(sequence)
            patterns[pattern] += 1

        print("\n=== Event Chain Patterns ===")
        for pattern, count in sorted(patterns.items(), key=lambda x: x[1], reverse=True):
            print(f"{count}x: {pattern}")
```