Extensions to the Python standard library unit testing framework
Comprehensive test result handling including extended result APIs, stream results for real-time reporting, result decorators, and multi-format output support for flexible test result processing.
Extended TestResult class with additional capabilities beyond unittest.TestResult.
class TestResult(unittest.TestResult):
    """
    Enhanced test result with extended API.

    Extends unittest.TestResult with additional methods for
    detailed test result tracking and content attachment support.

    ``err`` (and ``reason`` for skips) default to ``None`` so that an
    outcome can be reported through ``details`` alone.
    """

    def addError(self, test, err=None, details=None):
        """
        Record a test error with optional details.

        Args:
            test: Test case that errored
            err: Exception information (type, value, traceback), or None
                when the error is described by ``details``
            details (dict): Optional detailed error information
        """

    def addFailure(self, test, err=None, details=None):
        """
        Record a test failure with optional details.

        Args:
            test: Test case that failed
            err: Exception information (type, value, traceback), or None
                when the failure is described by ``details``
            details (dict): Optional detailed failure information
        """

    def addSuccess(self, test, details=None):
        """
        Record a successful test with optional details.

        Args:
            test: Test case that succeeded
            details (dict): Optional success details
        """

    def addSkip(self, test, reason=None, details=None):
        """
        Record a skipped test.

        Args:
            test: Test case that was skipped
            reason (str): Reason for skipping, or None when the reason is
                carried by ``details``
            details (dict): Optional skip details
        """

    def addExpectedFailure(self, test, err=None, details=None):
        """
        Record an expected failure.

        Args:
            test: Test case with expected failure
            err: Exception information, or None when the failure is
                described by ``details``
            details (dict): Optional failure details
        """

    def addUnexpectedSuccess(self, test, details=None):
        """
        Record an unexpected success.

        Args:
            test: Test case that unexpectedly succeeded
            details (dict): Optional success details
        """
class TextTestResult(TestResult):
    """
    Formatter that writes test results as text.

    Produces human-readable output whose verbosity and
    formatting can be configured.
    """

    def __init__(self, stream, descriptions=True, verbosity=1):
        """
        Set up the text result formatter.

        Args:
            stream: Stream the results are written to
            descriptions (bool): Whether test descriptions are included
            verbosity (int): How verbose the output is (0-2)
        """
class TestResultDecorator(TestResult):
"""
Base class for decorating/wrapping test results.
Allows chaining multiple result processors and
implementing result transformation pipelines.
"""
def __init__(self, decorated):
"""
Create result decorator.
Args:
decorated: TestResult to decorate
"""Modern streaming test result system for real-time test reporting and parallel execution support.
class StreamResult:
    """
    Base protocol for streaming test results.

    Reports test results as events, for real-time
    processing and distributed test execution.
    """

    def startTestRun(self):
        """Mark the beginning of a test run."""

    def stopTestRun(self):
        """Mark the end of a test run."""

    def status(self, test_id=None, test_status=None, test_tags=None,
               runnable=True, file_name=None, file_bytes=None,
               eof=False, mime_type=None, route_code=None, timestamp=None):
        """
        Report a change in test status.

        Args:
            test_id (str): Unique identifier of the test
            test_status (str): Status of the test ('inprogress', 'success', 'fail', etc.)
            test_tags (set): Tags of the test
            runnable (bool): Whether the test is runnable
            file_name (str): Name of the attached file
            file_bytes (bytes): Content of the attached file
            eof (bool): End of file marker
            mime_type (str): MIME type of the attached content
            route_code (str): Routing information
            timestamp: Timestamp of the event
        """
class CopyStreamResult(StreamResult):
    """
    Duplicate stream results to several destinations.

    Lets test results be sent to multiple processors
    at once for parallel result handling.
    """

    def __init__(self, targets):
        """
        Set up the stream result copier.

        Args:
            targets: List of StreamResult targets
        """
class StreamSummary(StreamResult):
    """
    Aggregate stream results into a summary.

    Gathers summary statistics and offers
    reporting for the test run as a whole.
    """

    def __init__(self):
        """Set up the stream result summarizer."""

    def get_summary(self):
        """
        Return the summary of the test run.

        Returns:
            dict: Summary statistics including counts and status
        """
class StreamTagger(StreamResult):
    """
    Tag stream results.

    Lets test results be tagged dynamically according to
    the test execution context or conditions.
    """

    def __init__(self, new_tags, gone_tags, target):
        """
        Set up the stream result tagger.

        Args:
            new_tags (set): Tags that are added
            gone_tags (set): Tags that are removed
            target: The target StreamResult
        """
class StreamFailFast(StreamResult):
    """
    Fail-fast behaviour for stream results.

    Halts test execution at the first failure to give
    rapid feedback during development.
    """

    def __init__(self, target):
        """
        Set up the fail-fast stream result.

        Args:
            target: The target StreamResult
        """
class StreamResultRouter(StreamResult):
"""
Route stream results to multiple destinations based on criteria.
Enables conditional result routing for different
test types or execution contexts.
"""
def __init__(self):
"""Create stream result router."""
def add_rule(self, test_case_filter, target):
"""
Add routing rule.
Args:
test_case_filter: Function to filter test cases
target: Target StreamResult for matching tests
"""Classes for converting between different result formats and APIs.
class ExtendedToOriginalDecorator(TestResultDecorator):
    """
    Adapt extended test results to the standard unittest format.

    Provides compatibility with code that expects the
    standard unittest.TestResult interface.
    """

    def __init__(self, decorated):
        """
        Set up the extended-to-original converter.

        Args:
            decorated: The original TestResult being wrapped
        """
class ExtendedToStreamDecorator(StreamResult):
    """
    Convert extended test results to stream format.

    Bridges extended TestResult API with modern StreamResult protocol:
    code that reports through the extended TestResult API can use this
    object to deliver its results to a StreamResult.
    """

    def __init__(self, decorated):
        """
        Create extended-to-stream converter.

        Args:
            decorated: StreamResult that receives the converted results
        """
class ResourcedToStreamDecorator(ExtendedToStreamDecorator):
    """
    Adapt resourced test results to the stream format.

    Forwards testresources-related activity to StreamResult objects by
    implementing the resource lifecycle TestResult protocol extension
    that the testresources.TestResourceManager class supports.
    """

    def __init__(self, target):
        """
        Set up the resourced-to-stream converter.

        Args:
            target: StreamResult that receives the resource lifecycle events
        """

    def startMakeResource(self, resource):
        """
        Mark the beginning of resource creation.

        Args:
            resource: The resource being created
        """

    def stopMakeResource(self, resource):
        """
        Mark the completion of resource creation.

        Args:
            resource: The resource that was created
        """

    def startCleanResource(self, resource):
        """
        Mark the beginning of resource cleanup.

        Args:
            resource: The resource being cleaned up
        """

    def stopCleanResource(self, resource):
        """
        Mark the completion of resource cleanup.

        Args:
            resource: The resource that was cleaned up
        """
# NOTE(review): upstream testtools appears to implement this adapter as a
# StreamResult; the base class is left unchanged here - confirm.
class StreamToExtendedDecorator(TestResult):
    """
    Convert stream results to extended result format.

    Allows using StreamResult sources with
    extended TestResult consumers.
    """

    def __init__(self, target):
        """
        Create stream-to-extended converter.

        Args:
            target: Extended TestResult that consumes the converted results
        """
class StreamToDict(StreamResult):
    """
    Turn stream results into dictionaries.

    Gives a structured data representation of
    test results, for analysis and storage.
    """

    def __init__(self):
        """Set up the stream-to-dict converter."""

    def get_results(self):
        """
        Return the test results as a dictionary.

        Returns:
            dict: Structured test result data
        """
class StreamToQueue(StreamResult):
"""
Send stream results to a queue for processing.
Enables asynchronous result processing and
inter-process result communication.
"""
def __init__(self, queue):
"""
Create stream-to-queue forwarder.
Args:
queue: Queue object for result storage
"""Classes for combining and managing multiple test results.
class MultiTestResult(TestResult):
    """
    Aggregate several test results into one.

    Lets test results be sent to multiple
    processors at the same time.
    """

    def __init__(self, *results):
        """
        Set up the multi-result handler.

        Args:
            *results: The TestResult instances being combined
        """
class TestByTestResult(TestResult):
    """
    Result offering a callback per test.

    Allows custom processing logic to run for
    individual test completion events.
    """

    def __init__(self, on_test_callback):
        """
        Set up the per-test callback result.

        Args:
            on_test_callback: Function that is called for each test
        """
class ThreadsafeForwardingResult(TestResult):
"""
Thread-safe test result forwarding.
Ensures safe result handling in concurrent
test execution environments.
"""
def __init__(self, target, semaphore):
"""
Create thread-safe forwarder.
Args:
target: Target TestResult
semaphore: Threading semaphore for synchronization
"""Classes for controlling test execution flow and lifecycle.
class TestControl:
    """
    Controls the flow and lifecycle of test execution.

    Offers hooks for test start, stop, and
    execution control decisions.
    """

    def __init__(self):
        """Set up the test control instance."""

    def shouldStop(self):
        """
        Tell whether test execution should stop.

        Returns:
            bool: True if execution should stop
        """
class Tagger(TestResultDecorator):
    """
    Tag test results so they can be categorized.

    Provides test result filtering and
    organizational capabilities.
    """

    def __init__(self, new_tags, gone_tags, decorated):
        """
        Set up the result tagger.

        Args:
            new_tags (set): Tags that are added to results
            gone_tags (set): Tags that are removed from results
            decorated: The TestResult being decorated
        """
class TimestampingStreamResult(StreamResult):
    """
    Attach timestamps to stream result events.

    Supplies timing information for performance
    analysis and result correlation.
    """

    def __init__(self, target):
        """
        Set up the timestamping stream result.

        Args:
            target: The target StreamResult
        """
import testtools
import sys
import unittest

# Create enhanced test result
result = testtools.TestResult()

# Run tests with result capture.
# NOTE(review): testtools does not appear to export a TestSuite class; the
# standard-library suite is used instead - confirm.
suite = unittest.TestSuite()
suite.addTest(MyTest('test_method'))
suite.run(result)

# Check results
print(f"Tests run: {result.testsRun}")
print(f"Failures: {len(result.failures)}")
print(f"Errors: {len(result.errors)}")
import testtools
# Create stream result pipeline
summary = testtools.StreamSummary()
tagger = testtools.StreamTagger({'integration'}, set(), summary)
router = testtools.StreamResultRouter()

# Configure routing
router.add_rule(
    lambda test: 'unit' in test.id(),
    unit_result_handler,
)
router.add_rule(
    lambda test: 'integration' in test.id(),
    tagger,
)

# The suite reports through the TestResult API while the router is a
# StreamResult, so the adapter has to convert extended results *to* the
# stream protocol (not the other way round).
result = testtools.ExtendedToStreamDecorator(router)

# Bracket the run with the StreamResult start/stop signals; the finally
# clause makes sure the end of the run is signalled even if the suite raises.
result.startTestRun()
try:
    suite.run(result)
finally:
    result.stopTestRun()

# Get summary
stats = summary.get_summary()
print(f"Total tests: {stats['tests_run']}")


class CustomTestResult(testtools.TestResult):
    """
    Test result that additionally records one dict per success or failure.

    Each record is appended to ``custom_data`` after the outcome has been
    forwarded to the base class.
    """

    def __init__(self):
        """Initialise the base result and the list of custom records."""
        super().__init__()
        self.custom_data = []

    def addSuccess(self, test, details=None):
        """
        Record a success and append a matching custom record.

        Args:
            test: Test case that succeeded
            details (dict): Optional success details
        """
        super().addSuccess(test, details)
        # Custom success processing
        self.custom_data.append({
            'test': test.id(),
            'status': 'success',
            'details': details,
        })

    def addFailure(self, test, err=None, details=None):
        """
        Record a failure and append a matching custom record.

        Args:
            test: Test case that failed
            err: Exception information (type, value, traceback), or None
                when the failure is described by ``details``
            details (dict): Optional failure details
        """
        super().addFailure(test, err, details)
        # Custom failure processing. The failure may arrive through
        # ``details`` with no exc_info tuple, so err[1] must not be
        # dereferenced unconditionally.
        self.custom_data.append({
            'test': test.id(),
            'status': 'failure',
            'error': str(err[1]) if err is not None else None,
            'details': details,
        })
# Run the suite against the custom result
result = CustomTestResult()
suite.run(result)

# Report every record the custom result collected
for entry in result.custom_data:
    print(f"Test {entry['test']}: {entry['status']}")
# Create multiple result handlers
file_result = FileTestResult('results.log')
db_result = DatabaseTestResult(connection)
console_result = testtools.TextTestResult(sys.stdout, verbosity=2)

# Fan the results out to every handler
handlers = (file_result, db_result, console_result)
multi_result = testtools.MultiTestResult(*handlers)

# A single run feeds all of the handlers
suite.run(multi_result)
import threading
# Guard the shared result with a semaphore
semaphore = threading.Semaphore()
safe_result = testtools.ThreadsafeForwardingResult(
    target=base_result,
    semaphore=semaphore,
)

# Build the suite that runs the tests concurrently.
# NOTE(review): testtools.fork_runner is not described anywhere in this
# document - confirm that it exists and is a valid second argument.
concurrent_suite = testtools.ConcurrentTestSuite(
    suite,
    testtools.fork_runner,
)
concurrent_suite.run(safe_result)
Install with the Tessl CLI:
npx tessl i tessl/pypi-testtools