Pragmatic Testing Framework for Python with BDD-style syntax and pluggable architecture
49
Pending
Does it follow best practices?
Impact
49%
1.08x — average score across 10 eval scenarios
Pending
The risk profile of this skill
Comprehensive event system for plugin development and test lifecycle monitoring.
Foundation for all events in the vedro event system.
class Event:
    """
    Common ancestor of every event emitted by vedro.

    The framework publishes an event at each stage of a test run;
    plugins subscribe to these notifications to customise and extend
    vedro's behaviour.
    """

    def __eq__(self, other) -> bool: ...

    def __repr__(self) -> str: ...


# Events related to configuration loading and argument parsing.
class ConfigLoadedEvent:
    """
    Emitted once the configuration has been read from vedro.cfg.py.

    Attributes:
        config_path (Path): Path to the configuration file
        config (ConfigType): The loaded configuration object
    """

    config_path: Path
    config: ConfigType
class ArgParseEvent:
    """
    Emitted while command-line arguments are being parsed.

    Gives plugins a hook to register their own command-line options.

    Attributes:
        arg_parser (ArgumentParser): The argument parser instance
    """

    arg_parser: ArgumentParser
class ArgParsedEvent:
    """
    Emitted once command-line parsing has finished.

    Lets plugins inspect and react to the parsed argument values.

    Attributes:
        args (Namespace): The parsed arguments namespace
    """

    args: Namespace


# Events that mark major phases in test execution lifecycle.
class StartupEvent:
    """
    Emitted right before test execution begins.

    Exposes the scheduler that holds all discovered scenarios.

    Attributes:
        scheduler (ScenarioScheduler): The scenario scheduler instance
    """

    scheduler: ScenarioScheduler
class CleanupEvent:
    """
    Emitted once test execution has finished.

    Carries the final report for the entire run.

    Attributes:
        report (Report): Complete test execution report with results and statistics
    """

    report: Report


# Events tracking individual scenario execution states.
class ScenarioRunEvent:
    """
    Emitted as a scenario begins executing.

    Attributes:
        scenario_result (ScenarioResult): The scenario result object being populated
    """

    scenario_result: ScenarioResult
class ScenarioPassedEvent:
    """
    Emitted once a scenario finishes successfully.

    Attributes:
        scenario_result (ScenarioResult): The completed scenario result
    """

    scenario_result: ScenarioResult
class ScenarioFailedEvent:
    """
    Emitted once a scenario errors out during execution.

    Attributes:
        scenario_result (ScenarioResult): The failed scenario result with error information
    """

    scenario_result: ScenarioResult
class ScenarioSkippedEvent:
    """
    Emitted for scenarios that are skipped rather than run.

    Attributes:
        scenario_result (ScenarioResult): The skipped scenario result
    """

    scenario_result: ScenarioResult
class ScenarioReportedEvent:
    """
    Emitted after a scenario has been reported (post-processing phase).

    Attributes:
        aggregated_result (AggregatedResult): Extended scenario result with aggregated data
    """

    aggregated_result: AggregatedResult


# Events monitoring individual step execution within scenarios.
class StepRunEvent:
    """
    Emitted as a step begins executing.

    Attributes:
        step_result (StepResult): The step result object being populated
    """

    step_result: StepResult
class StepPassedEvent:
    """
    Emitted once a step finishes successfully.

    Attributes:
        step_result (StepResult): The completed step result
    """

    step_result: StepResult
class StepFailedEvent:
    """
    Emitted once a step errors out during execution.

    Attributes:
        step_result (StepResult): The failed step result with error information
    """

    step_result: StepResult


# Events for handling exceptions raised during test execution.
class ExceptionRaisedEvent:
    """
    Emitted whenever an exception is raised during test execution.

    Enables centralised exception handling and logging.

    Attributes:
        exc_info (ExcInfo): Exception information including type, value, and traceback
    """

    exc_info: ExcInfo


from vedro.core import Plugin, PluginConfig
import time
from collections import defaultdict

from vedro.events import ScenarioRunEvent, ScenarioPassedEvent, ScenarioFailedEvent
class TestTimerPlugin(Plugin):
    """Plugin that measures and reports test execution times."""

    def __init__(self, config: "TestTimerConfig"):
        super().__init__(config)
        # Maps a scenario's unique_id to its wall-clock start time.
        self._start_times = {}

    def subscribe(self, dispatcher):
        dispatcher.listen(ScenarioRunEvent, self.on_scenario_start)
        dispatcher.listen(ScenarioPassedEvent, self.on_scenario_end)
        dispatcher.listen(ScenarioFailedEvent, self.on_scenario_end)

    def on_scenario_start(self, event: ScenarioRunEvent):
        scenario_id = event.scenario_result.scenario.unique_id
        self._start_times[scenario_id] = time.time()

    def on_scenario_end(self, event):
        scenario_id = event.scenario_result.scenario.unique_id
        # pop() both fetches and cleans up the entry in one step;
        # scenarios we never saw start are silently ignored.
        started = self._start_times.pop(scenario_id, None)
        if started is not None:
            duration = time.time() - started
            print(f"Scenario {scenario_id} took {duration:.2f} seconds")
class TestTimerConfig(PluginConfig):
    """Registration/config entry for TestTimerPlugin."""
    plugin = TestTimerPlugin
    enabled = True


from vedro.events import *
import json
import logging
class ComprehensiveLoggingPlugin(Plugin):
    """Plugin that provides comprehensive logging of all test events.

    Subscribes to configuration, lifecycle, scenario, step and exception
    events and keeps an in-memory session record that is summarised as
    JSON when the run finishes.
    """

    def __init__(self, config: "ComprehensiveLoggingConfig"):
        super().__init__(config)
        self.logger = logging.getLogger("vedro.comprehensive")
        # Session-wide bookkeeping; scenarios are keyed by unique_id.
        self._test_session = {
            "start_time": None,
            "scenarios": {},
            "global_artifacts": []
        }

    def subscribe(self, dispatcher):
        """Register handlers for every event category of interest."""
        # Configuration events
        dispatcher.listen(ConfigLoadedEvent, self.on_config_loaded)
        dispatcher.listen(ArgParsedEvent, self.on_args_parsed)
        # Lifecycle events
        dispatcher.listen(StartupEvent, self.on_startup)
        dispatcher.listen(CleanupEvent, self.on_cleanup)
        # Scenario events
        dispatcher.listen(ScenarioRunEvent, self.on_scenario_run)
        dispatcher.listen(ScenarioPassedEvent, self.on_scenario_passed)
        dispatcher.listen(ScenarioFailedEvent, self.on_scenario_failed)
        dispatcher.listen(ScenarioSkippedEvent, self.on_scenario_skipped)
        # Step events
        dispatcher.listen(StepRunEvent, self.on_step_run)
        dispatcher.listen(StepPassedEvent, self.on_step_passed)
        dispatcher.listen(StepFailedEvent, self.on_step_failed)
        # Exception events
        dispatcher.listen(ExceptionRaisedEvent, self.on_exception_raised)

    def on_config_loaded(self, event: ConfigLoadedEvent):
        """Log where the configuration was loaded from."""
        self.logger.info(f"Configuration loaded from {event.config_path}")

    def on_args_parsed(self, event: ArgParsedEvent):
        """Log the parsed command-line arguments."""
        self.logger.info(f"Command line arguments: {vars(event.args)}")

    def on_startup(self, event: StartupEvent):
        """Record the session start time and announce the scenario count."""
        self._test_session["start_time"] = time.time()
        # NOTE(review): assumes the scheduler exposes a sized `discovered`
        # collection — confirm against the vedro.core.ScenarioScheduler API.
        scenario_count = len(event.scheduler.discovered)
        self.logger.info(f"Test session starting with {scenario_count} scenarios")

    def on_scenario_run(self, event: ScenarioRunEvent):
        """Open a bookkeeping record for the scenario that just started."""
        scenario_id = event.scenario_result.scenario.unique_id
        self._test_session["scenarios"][scenario_id] = {
            "status": "running",
            "start_time": time.time(),
            "steps": []
        }
        self.logger.info(f"Scenario started: {event.scenario_result.scenario.subject}")

    def on_step_run(self, event: StepRunEvent):
        """Attach a 'running' step record to its parent scenario."""
        scenario_id = event.step_result.scenario_result.scenario.unique_id
        step_info = {
            "name": event.step_result.step_name,
            "status": "running",
            "start_time": time.time()
        }
        # Defensive: ignore steps whose scenario we never saw start.
        if scenario_id in self._test_session["scenarios"]:
            self._test_session["scenarios"][scenario_id]["steps"].append(step_info)

    def on_step_passed(self, event: StepPassedEvent):
        self._update_step_status(event.step_result, "passed")

    def on_step_failed(self, event: StepFailedEvent):
        self._update_step_status(event.step_result, "failed", event.step_result.exc_info)

    def on_scenario_passed(self, event: ScenarioPassedEvent):
        self._update_scenario_status(event.scenario_result, "passed")

    def on_scenario_failed(self, event: ScenarioFailedEvent):
        self._update_scenario_status(event.scenario_result, "failed")

    def on_scenario_skipped(self, event: ScenarioSkippedEvent):
        self._update_scenario_status(event.scenario_result, "skipped")

    def on_exception_raised(self, event: ExceptionRaisedEvent):
        """Log any exception observed during the run."""
        self.logger.error(f"Exception raised: {event.exc_info.type.__name__}: {event.exc_info.value}")

    def on_cleanup(self, event: CleanupEvent):
        """Emit a JSON summary of the whole session."""
        started = self._test_session["start_time"]
        # Fix: StartupEvent may never have fired (e.g. an aborted or empty
        # run); subtracting from the initial None would raise TypeError.
        session_duration = time.time() - started if started is not None else 0.0
        statuses = [s["status"] for s in self._test_session["scenarios"].values()]
        summary = {
            "total_duration": session_duration,
            "total_scenarios": len(self._test_session["scenarios"]),
            "passed": statuses.count("passed"),
            "failed": statuses.count("failed"),
            "skipped": statuses.count("skipped")
        }
        self.logger.info(f"Test session completed: {json.dumps(summary, indent=2)}")

    def _update_step_status(self, step_result, status, exc_info=None):
        """Close out the first still-'running' step record with this name."""
        scenario_id = step_result.scenario_result.scenario.unique_id
        if scenario_id in self._test_session["scenarios"]:
            for step in self._test_session["scenarios"][scenario_id]["steps"]:
                if step["name"] == step_result.step_name and step["status"] == "running":
                    step["status"] = status
                    step["end_time"] = time.time()
                    step["duration"] = step["end_time"] - step["start_time"]
                    if exc_info:
                        step["error"] = str(exc_info.value)
                    break

    def _update_scenario_status(self, scenario_result, status):
        """Record the terminal status and duration of a scenario."""
        scenario_id = scenario_result.scenario.unique_id
        if scenario_id in self._test_session["scenarios"]:
            scenario_data = self._test_session["scenarios"][scenario_id]
            scenario_data["status"] = status
            scenario_data["end_time"] = time.time()
            scenario_data["duration"] = scenario_data["end_time"] - scenario_data["start_time"]
class ComprehensiveLoggingConfig(PluginConfig):
    """Registration/config entry for ComprehensiveLoggingPlugin."""
    plugin = ComprehensiveLoggingPlugin
    enabled = False  # Enable when needed for debugging


class ConditionalEventPlugin(Plugin):
    """Plugin demonstrating conditional event processing."""

    def __init__(self, config: "ConditionalEventConfig"):
        super().__init__(config)
        self.config = config

    def subscribe(self, dispatcher):
        # Register handlers only for the features the config enables.
        if self.config.monitor_slow_tests:
            dispatcher.listen(ScenarioPassedEvent, self.check_slow_scenario)
            dispatcher.listen(ScenarioFailedEvent, self.check_slow_scenario)
        if self.config.log_exceptions:
            dispatcher.listen(ExceptionRaisedEvent, self.log_exception)
        if self.config.track_artifacts:
            dispatcher.listen(ScenarioReportedEvent, self.analyze_artifacts)

    def check_slow_scenario(self, event):
        """Print an alert for scenarios exceeding the slow threshold."""
        duration = event.scenario_result.elapsed
        if duration > self.config.slow_threshold:
            print(f"SLOW TEST ALERT: {event.scenario_result.scenario.subject} took {duration:.2f}s")

    def log_exception(self, event: ExceptionRaisedEvent):
        """Print exceptions, with a full traceback for selected types."""
        exc_info = event.exc_info
        print(f"Exception in test: {exc_info.type.__name__}: {exc_info.value}")
        # Full tracebacks only for this small whitelist of exception types.
        if exc_info.type in (AssertionError, ValueError, TypeError):
            import traceback
            traceback.print_exception(exc_info.type, exc_info.value, exc_info.traceback)

    def analyze_artifacts(self, event: ScenarioReportedEvent):
        """Warn when a scenario carries more artifacts than allowed."""
        artifacts = event.aggregated_result.artifacts
        if len(artifacts) > self.config.max_artifacts:
            print(f"WARNING: Scenario has {len(artifacts)} artifacts (max: {self.config.max_artifacts})")
class ConditionalEventConfig(PluginConfig):
    """Registration/config entry for ConditionalEventPlugin."""
    plugin = ConditionalEventPlugin
    enabled = True
    # Feature toggles consumed by ConditionalEventPlugin.subscribe().
    monitor_slow_tests: bool = True
    slow_threshold: float = 5.0  # seconds
    log_exceptions: bool = True
    track_artifacts: bool = True
    max_artifacts: int = 10


# Key data structures used in events:
from pathlib import Path
from argparse import ArgumentParser, Namespace
from typing import Any

# Configuration types.
ConfigType = Any  # Actual config class type

# NOTE: `Path`, `ArgumentParser` and `Namespace` come straight from the
# stdlib imports above — the previous no-op self-assignments
# (`Path = Path`, etc.) were dead code and have been removed.

# Core result types (referenced from other modules); opaque placeholders
# here — the real classes live in vedro.core.
ScenarioResult = Any     # vedro.core.ScenarioResult
AggregatedResult = Any   # vedro.core.AggregatedResult
StepResult = Any         # vedro.core.StepResult
ExcInfo = Any            # vedro.core.ExcInfo
Report = Any             # vedro.core.Report
ScenarioScheduler = Any  # vedro.core.ScenarioScheduler

# Collect and analyze events across multiple scenarios:
class EventAggregatorPlugin(Plugin):
    """Plugin that aggregates events for analysis."""

    def __init__(self, config):
        super().__init__(config)
        self.event_counts = defaultdict(int)
        self.scenario_timings = []
        self.step_timings = defaultdict(list)

    def subscribe(self, dispatcher):
        # Count every scenario/step lifecycle event type; the default
        # argument pins each event class at lambda-creation time.
        counted = (ScenarioRunEvent, ScenarioPassedEvent, ScenarioFailedEvent,
                   StepRunEvent, StepPassedEvent, StepFailedEvent)
        for event_type in counted:
            dispatcher.listen(event_type, lambda event, et=event_type: self._count_event(et))
        dispatcher.listen(ScenarioPassedEvent, self.collect_scenario_timing)
        dispatcher.listen(ScenarioFailedEvent, self.collect_scenario_timing)
        dispatcher.listen(StepPassedEvent, self.collect_step_timing)
        dispatcher.listen(CleanupEvent, self.report_aggregated_data)

    def _count_event(self, event_type):
        # Keyed by class name so the final report reads naturally.
        self.event_counts[event_type.__name__] += 1

    def collect_scenario_timing(self, event):
        self.scenario_timings.append(event.scenario_result.elapsed)

    def collect_step_timing(self, event):
        self.step_timings[event.step_result.step_name].append(event.step_result.elapsed)

    def report_aggregated_data(self, event: CleanupEvent):
        """Print event counts and scenario timing statistics."""
        print("\n=== Event Analysis ===")
        print("Event counts:")
        for event_type, count in self.event_counts.items():
            print(f" {event_type}: {count}")
        if self.scenario_timings:
            import statistics
            print("\nScenario timing statistics:")
            print(f" Average: {statistics.mean(self.scenario_timings):.2f}s")
            print(f" Median: {statistics.median(self.scenario_timings):.2f}s")
            print(f" Max: {max(self.scenario_timings):.2f}s")
            print(f" Min: {min(self.scenario_timings):.2f}s")


# Track sequences of events for workflow analysis:
class EventChainAnalyzer(Plugin):
    """Analyze event sequences for workflow insights."""

    def __init__(self, config):
        super().__init__(config)
        # Per-scenario ordered lists of observed events.
        self.event_chains = defaultdict(list)
        self.current_scenario = None

    def subscribe(self, dispatcher):
        # Track all major events in order of arrival; the default argument
        # binds each event class at lambda-creation time.
        for event_type in (ScenarioRunEvent, StepRunEvent, StepPassedEvent,
                           StepFailedEvent, ScenarioPassedEvent,
                           ScenarioFailedEvent, ScenarioSkippedEvent):
            dispatcher.listen(event_type, lambda event, et=event_type: self._track_event(et, event))

    def _track_event(self, event_type, event):
        # Resolve the owning scenario from whichever result the event carries;
        # events with neither attribute are ignored.
        if hasattr(event, 'scenario_result'):
            scenario_id = event.scenario_result.scenario.unique_id
        elif hasattr(event, 'step_result'):
            scenario_id = event.step_result.scenario_result.scenario.unique_id
        else:
            return
        self.event_chains[scenario_id].append({
            "event_type": event_type.__name__,
            "timestamp": time.time(),
            "event": event
        })

    def analyze_chains(self):
        """Print the observed event-type sequences, most frequent first."""
        patterns = defaultdict(int)
        for chain in self.event_chains.values():
            pattern = " -> ".join(entry["event_type"] for entry in chain)
            patterns[pattern] += 1
        print("\n=== Event Chain Patterns ===")
        for pattern, count in sorted(patterns.items(), key=lambda x: x[1], reverse=True):
            print(f"{count}x: {pattern}")
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10