or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

artifacts-files.mdassertions.mdcli.mdconfiguration.mdcontext-cleanup.mdevents.mdexecution-control.mdindex.mdparameterization.mdtest-definition.md

events.mddocs/

0

# Event System

1

2

Comprehensive event system for plugin development and test lifecycle monitoring.

3

4

## Capabilities

5

6

### Base Event Class

7

8

Foundation for all events in the vedro event system.

9

10

```python { .api }

11

class Event:

12

"""

13

Abstract base class for all vedro events.

14

15

Events are used throughout the vedro lifecycle to notify plugins

16

about various stages of test execution, allowing for customization

17

and extension of framework behavior.

18

"""

19

20

def __eq__(self, other) -> bool: ...

21

def __repr__(self) -> str: ...

22

```

23

24

### Configuration Events

25

26

Events related to configuration loading and argument parsing.

27

28

```python { .api }

29

class ConfigLoadedEvent:

30

"""

31

Fired when configuration is loaded from vedro.cfg.py.

32

33

Attributes:

34

config_path (Path): Path to the configuration file

35

config (ConfigType): The loaded configuration object

36

"""

37

config_path: Path

38

config: ConfigType

39

40

class ArgParseEvent:

41

"""

42

Fired when command-line arguments are being parsed.

43

44

Allows plugins to add custom command-line arguments.

45

46

Attributes:

47

arg_parser (ArgumentParser): The argument parser instance

48

"""

49

arg_parser: ArgumentParser

50

51

class ArgParsedEvent:

52

"""

53

Fired after command-line arguments have been parsed.

54

55

Allows plugins to access and react to parsed arguments.

56

57

Attributes:

58

args (Namespace): The parsed arguments namespace

59

"""

60

args: Namespace

61

```

62

63

### Lifecycle Events

64

65

Events that mark major phases in test execution lifecycle.

66

67

```python { .api }

68

class StartupEvent:

69

"""

70

Fired at the beginning of test execution.

71

72

Provides access to the scenario scheduler containing discovered scenarios.

73

74

Attributes:

75

scheduler (ScenarioScheduler): The scenario scheduler instance

76

"""

77

scheduler: ScenarioScheduler

78

79

class CleanupEvent:

80

"""

81

Fired at the end of test execution.

82

83

Provides access to the complete test execution report.

84

85

Attributes:

86

report (Report): Complete test execution report with results and statistics

87

"""

88

report: Report

89

```

90

91

### Scenario Events

92

93

Events tracking individual scenario execution states.

94

95

```python { .api }

96

class ScenarioRunEvent:

97

"""

98

Fired when a scenario starts running.

99

100

Attributes:

101

scenario_result (ScenarioResult): The scenario result object being populated

102

"""

103

scenario_result: ScenarioResult

104

105

class ScenarioPassedEvent:

106

"""

107

Fired when a scenario completes successfully.

108

109

Attributes:

110

scenario_result (ScenarioResult): The completed scenario result

111

"""

112

scenario_result: ScenarioResult

113

114

class ScenarioFailedEvent:

115

"""

116

Fired when a scenario fails during execution.

117

118

Attributes:

119

scenario_result (ScenarioResult): The failed scenario result with error information

120

"""

121

scenario_result: ScenarioResult

122

123

class ScenarioSkippedEvent:

124

"""

125

Fired when a scenario is skipped.

126

127

Attributes:

128

scenario_result (ScenarioResult): The skipped scenario result

129

"""

130

scenario_result: ScenarioResult

131

132

class ScenarioReportedEvent:

133

"""

134

Fired after a scenario is reported (post-processing phase).

135

136

Attributes:

137

aggregated_result (AggregatedResult): Extended scenario result with aggregated data

138

"""

139

aggregated_result: AggregatedResult

140

```

141

142

### Step Events

143

144

Events monitoring individual step execution within scenarios.

145

146

```python { .api }

147

class StepRunEvent:

148

"""

149

Fired when a step starts running.

150

151

Attributes:

152

step_result (StepResult): The step result object being populated

153

"""

154

step_result: StepResult

155

156

class StepPassedEvent:

157

"""

158

Fired when a step completes successfully.

159

160

Attributes:

161

step_result (StepResult): The completed step result

162

"""

163

step_result: StepResult

164

165

class StepFailedEvent:

166

"""

167

Fired when a step fails during execution.

168

169

Attributes:

170

step_result (StepResult): The failed step result with error information

171

"""

172

step_result: StepResult

173

```

174

175

### Exception Events

176

177

Events for handling exceptions raised during test execution.

178

179

```python { .api }

180

class ExceptionRaisedEvent:

181

"""

182

Fired when any exception is raised during test execution.

183

184

Provides global exception handling and logging capabilities.

185

186

Attributes:

187

exc_info (ExcInfo): Exception information including type, value, and traceback

188

"""

189

exc_info: ExcInfo

190

```

191

192

## Usage Examples

193

194

### Basic Event Handling

195

196

```python

197

from vedro.core import Plugin, PluginConfig

198

from vedro.events import ScenarioPassedEvent, ScenarioFailedEvent

199

import time

200

201

class TestTimerPlugin(Plugin):

202

"""Plugin that measures and reports test execution times."""

203

204

def __init__(self, config: "TestTimerConfig"):

205

super().__init__(config)

206

self._start_times = {}

207

208

def subscribe(self, dispatcher):

209

dispatcher.listen(ScenarioRunEvent, self.on_scenario_start)

210

dispatcher.listen(ScenarioPassedEvent, self.on_scenario_end)

211

dispatcher.listen(ScenarioFailedEvent, self.on_scenario_end)

212

213

def on_scenario_start(self, event: ScenarioRunEvent):

214

scenario_id = event.scenario_result.scenario.unique_id

215

self._start_times[scenario_id] = time.time()

216

217

def on_scenario_end(self, event):

218

scenario_id = event.scenario_result.scenario.unique_id

219

if scenario_id in self._start_times:

220

duration = time.time() - self._start_times[scenario_id]

221

print(f"Scenario {scenario_id} took {duration:.2f} seconds")

222

del self._start_times[scenario_id]

223

224

class TestTimerConfig(PluginConfig):

225

plugin = TestTimerPlugin

226

enabled = True

227

```

228

229

### Advanced Event Processing

230

231

```python

232

from vedro.events import *

233

import json

234

import logging

235

236

class ComprehensiveLoggingPlugin(Plugin):

237

"""Plugin that provides comprehensive logging of all test events."""

238

239

def __init__(self, config: "ComprehensiveLoggingConfig"):

240

super().__init__(config)

241

self.logger = logging.getLogger("vedro.comprehensive")

242

self._test_session = {

243

"start_time": None,

244

"scenarios": {},

245

"global_artifacts": []

246

}

247

248

def subscribe(self, dispatcher):

249

# Configuration events

250

dispatcher.listen(ConfigLoadedEvent, self.on_config_loaded)

251

dispatcher.listen(ArgParsedEvent, self.on_args_parsed)

252

253

# Lifecycle events

254

dispatcher.listen(StartupEvent, self.on_startup)

255

dispatcher.listen(CleanupEvent, self.on_cleanup)

256

257

# Scenario events

258

dispatcher.listen(ScenarioRunEvent, self.on_scenario_run)

259

dispatcher.listen(ScenarioPassedEvent, self.on_scenario_passed)

260

dispatcher.listen(ScenarioFailedEvent, self.on_scenario_failed)

261

dispatcher.listen(ScenarioSkippedEvent, self.on_scenario_skipped)

262

263

# Step events

264

dispatcher.listen(StepRunEvent, self.on_step_run)

265

dispatcher.listen(StepPassedEvent, self.on_step_passed)

266

dispatcher.listen(StepFailedEvent, self.on_step_failed)

267

268

# Exception events

269

dispatcher.listen(ExceptionRaisedEvent, self.on_exception_raised)

270

271

def on_config_loaded(self, event: ConfigLoadedEvent):

272

self.logger.info(f"Configuration loaded from {event.config_path}")

273

274

def on_args_parsed(self, event: ArgParsedEvent):

275

self.logger.info(f"Command line arguments: {vars(event.args)}")

276

277

def on_startup(self, event: StartupEvent):

278

self._test_session["start_time"] = time.time()

279

scenario_count = len(event.scheduler.discovered)

280

self.logger.info(f"Test session starting with {scenario_count} scenarios")

281

282

def on_scenario_run(self, event: ScenarioRunEvent):

283

scenario_id = event.scenario_result.scenario.unique_id

284

self._test_session["scenarios"][scenario_id] = {

285

"status": "running",

286

"start_time": time.time(),

287

"steps": []

288

}

289

self.logger.info(f"Scenario started: {event.scenario_result.scenario.subject}")

290

291

def on_step_run(self, event: StepRunEvent):

292

scenario_id = event.step_result.scenario_result.scenario.unique_id

293

step_info = {

294

"name": event.step_result.step_name,

295

"status": "running",

296

"start_time": time.time()

297

}

298

299

if scenario_id in self._test_session["scenarios"]:

300

self._test_session["scenarios"][scenario_id]["steps"].append(step_info)

301

302

def on_step_passed(self, event: StepPassedEvent):

303

self._update_step_status(event.step_result, "passed")

304

305

def on_step_failed(self, event: StepFailedEvent):

306

self._update_step_status(event.step_result, "failed", event.step_result.exc_info)

307

308

def on_scenario_passed(self, event: ScenarioPassedEvent):

309

self._update_scenario_status(event.scenario_result, "passed")

310

311

def on_scenario_failed(self, event: ScenarioFailedEvent):

312

self._update_scenario_status(event.scenario_result, "failed")

313

314

def on_scenario_skipped(self, event: ScenarioSkippedEvent):

315

self._update_scenario_status(event.scenario_result, "skipped")

316

317

def on_exception_raised(self, event: ExceptionRaisedEvent):

318

self.logger.error(f"Exception raised: {event.exc_info.type.__name__}: {event.exc_info.value}")

319

320

def on_cleanup(self, event: CleanupEvent):

321

session_duration = time.time() - self._test_session["start_time"]

322

323

summary = {

324

"total_duration": session_duration,

325

"total_scenarios": len(self._test_session["scenarios"]),

326

"passed": len([s for s in self._test_session["scenarios"].values() if s["status"] == "passed"]),

327

"failed": len([s for s in self._test_session["scenarios"].values() if s["status"] == "failed"]),

328

"skipped": len([s for s in self._test_session["scenarios"].values() if s["status"] == "skipped"])

329

}

330

331

self.logger.info(f"Test session completed: {json.dumps(summary, indent=2)}")

332

333

def _update_step_status(self, step_result, status, exc_info=None):

334

scenario_id = step_result.scenario_result.scenario.unique_id

335

if scenario_id in self._test_session["scenarios"]:

336

for step in self._test_session["scenarios"][scenario_id]["steps"]:

337

if step["name"] == step_result.step_name and step["status"] == "running":

338

step["status"] = status

339

step["end_time"] = time.time()

340

step["duration"] = step["end_time"] - step["start_time"]

341

if exc_info:

342

step["error"] = str(exc_info.value)

343

break

344

345

def _update_scenario_status(self, scenario_result, status):

346

scenario_id = scenario_result.scenario.unique_id

347

if scenario_id in self._test_session["scenarios"]:

348

scenario_data = self._test_session["scenarios"][scenario_id]

349

scenario_data["status"] = status

350

scenario_data["end_time"] = time.time()

351

scenario_data["duration"] = scenario_data["end_time"] - scenario_data["start_time"]

352

353

class ComprehensiveLoggingConfig(PluginConfig):

354

plugin = ComprehensiveLoggingPlugin

355

enabled = False # Enable when needed for debugging

356

```

357

358

### Event Filtering and Conditional Processing

359

360

```python

361

class ConditionalEventPlugin(Plugin):

362

"""Plugin demonstrating conditional event processing."""

363

364

def __init__(self, config: "ConditionalEventConfig"):

365

super().__init__(config)

366

self.config = config

367

368

def subscribe(self, dispatcher):

369

# Only listen to events if certain conditions are met

370

if self.config.monitor_slow_tests:

371

dispatcher.listen(ScenarioPassedEvent, self.check_slow_scenario)

372

dispatcher.listen(ScenarioFailedEvent, self.check_slow_scenario)

373

374

if self.config.log_exceptions:

375

dispatcher.listen(ExceptionRaisedEvent, self.log_exception)

376

377

if self.config.track_artifacts:

378

dispatcher.listen(ScenarioReportedEvent, self.analyze_artifacts)

379

380

def check_slow_scenario(self, event):

381

"""Alert on scenarios that take too long."""

382

duration = event.scenario_result.elapsed

383

if duration > self.config.slow_threshold:

384

print(f"SLOW TEST ALERT: {event.scenario_result.scenario.subject} took {duration:.2f}s")

385

386

def log_exception(self, event: ExceptionRaisedEvent):

387

"""Log exceptions with context."""

388

exc_info = event.exc_info

389

print(f"Exception in test: {exc_info.type.__name__}: {exc_info.value}")

390

391

# Only log full traceback for certain exception types

392

if exc_info.type in (AssertionError, ValueError, TypeError):

393

import traceback

394

traceback.print_exception(exc_info.type, exc_info.value, exc_info.traceback)

395

396

def analyze_artifacts(self, event: ScenarioReportedEvent):

397

"""Analyze artifacts attached to scenarios."""

398

artifacts = event.aggregated_result.artifacts

399

if len(artifacts) > self.config.max_artifacts:

400

print(f"WARNING: Scenario has {len(artifacts)} artifacts (max: {self.config.max_artifacts})")

401

402

class ConditionalEventConfig(PluginConfig):

403

plugin = ConditionalEventPlugin

404

enabled = True

405

406

monitor_slow_tests: bool = True

407

slow_threshold: float = 5.0 # seconds

408

409

log_exceptions: bool = True

410

411

track_artifacts: bool = True

412

max_artifacts: int = 10

413

```

414

415

## Types and Data Structures

416

417

### Event Data Types

418

419

Key data structures used in events:

420

421

```python { .api }

422

from pathlib import Path

423

from argparse import ArgumentParser, Namespace

424

from typing import Any

425

426

# Configuration types

427

ConfigType = Any # Actual config class type

428

Path = Path # pathlib.Path

429

430

# Argument parsing types

431

ArgumentParser = ArgumentParser

432

Namespace = Namespace

433

434

# Core result types (referenced from other modules)

435

ScenarioResult = Any # vedro.core.ScenarioResult

436

AggregatedResult = Any # vedro.core.AggregatedResult

437

StepResult = Any # vedro.core.StepResult

438

ExcInfo = Any # vedro.core.ExcInfo

439

Report = Any # vedro.core.Report

440

ScenarioScheduler = Any # vedro.core.ScenarioScheduler

441

```

442

443

## Advanced Patterns

444

445

### Event Aggregation

446

447

Collect and analyze events across multiple scenarios:

448

449

```python

450

class EventAggregatorPlugin(Plugin):

451

"""Plugin that aggregates events for analysis."""

452

453

def __init__(self, config):

454

super().__init__(config)

455

self.event_counts = defaultdict(int)

456

self.scenario_timings = []

457

self.step_timings = defaultdict(list)

458

459

def subscribe(self, dispatcher):

460

# Count all event types

461

for event_type in [ScenarioRunEvent, ScenarioPassedEvent, ScenarioFailedEvent,

462

StepRunEvent, StepPassedEvent, StepFailedEvent]:

463

dispatcher.listen(event_type, lambda event, et=event_type: self._count_event(et))

464

465

dispatcher.listen(ScenarioPassedEvent, self.collect_scenario_timing)

466

dispatcher.listen(ScenarioFailedEvent, self.collect_scenario_timing)

467

dispatcher.listen(StepPassedEvent, self.collect_step_timing)

468

dispatcher.listen(CleanupEvent, self.report_aggregated_data)

469

470

def _count_event(self, event_type):

471

self.event_counts[event_type.__name__] += 1

472

473

def collect_scenario_timing(self, event):

474

self.scenario_timings.append(event.scenario_result.elapsed)

475

476

def collect_step_timing(self, event):

477

step_name = event.step_result.step_name

478

self.step_timings[step_name].append(event.step_result.elapsed)

479

480

def report_aggregated_data(self, event: CleanupEvent):

481

print("\n=== Event Analysis ===")

482

print("Event counts:")

483

for event_type, count in self.event_counts.items():

484

print(f" {event_type}: {count}")

485

486

if self.scenario_timings:

487

import statistics

488

print(f"\nScenario timing statistics:")

489

print(f" Average: {statistics.mean(self.scenario_timings):.2f}s")

490

print(f" Median: {statistics.median(self.scenario_timings):.2f}s")

491

print(f" Max: {max(self.scenario_timings):.2f}s")

492

print(f" Min: {min(self.scenario_timings):.2f}s")

493

```

494

495

### Event Chain Analysis

496

497

Track sequences of events for workflow analysis:

498

499

```python

500

class EventChainAnalyzer(Plugin):

501

"""Analyze event sequences for workflow insights."""

502

503

def __init__(self, config):

504

super().__init__(config)

505

self.event_chains = defaultdict(list)

506

self.current_scenario = None

507

508

def subscribe(self, dispatcher):

509

# Track all major events in order

510

events_to_track = [

511

ScenarioRunEvent, StepRunEvent, StepPassedEvent, StepFailedEvent,

512

ScenarioPassedEvent, ScenarioFailedEvent, ScenarioSkippedEvent

513

]

514

515

for event_type in events_to_track:

516

dispatcher.listen(event_type, lambda event, et=event_type: self._track_event(et, event))

517

518

def _track_event(self, event_type, event):

519

if hasattr(event, 'scenario_result'):

520

scenario_id = event.scenario_result.scenario.unique_id

521

elif hasattr(event, 'step_result'):

522

scenario_id = event.step_result.scenario_result.scenario.unique_id

523

else:

524

return

525

526

self.event_chains[scenario_id].append({

527

"event_type": event_type.__name__,

528

"timestamp": time.time(),

529

"event": event

530

})

531

532

def analyze_chains(self):

533

"""Analyze common event patterns."""

534

patterns = defaultdict(int)

535

536

for scenario_id, chain in self.event_chains.items():

537

# Extract event type sequence

538

sequence = [event["event_type"] for event in chain]

539

pattern = " -> ".join(sequence)

540

patterns[pattern] += 1

541

542

print("\n=== Event Chain Patterns ===")

543

for pattern, count in sorted(patterns.items(), key=lambda x: x[1], reverse=True):

544

print(f"{count}x: {pattern}")

545

```