CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-testtools

Extensions to the Python standard library unit testing framework

Overview
Eval results
Files

docs/test-results.md

Test Result Management

Comprehensive test result handling including extended result APIs, stream results for real-time reporting, result decorators, and multi-format output support for flexible test result processing.

Capabilities

Enhanced Test Results

Extended TestResult class with additional capabilities beyond unittest.TestResult.

class TestResult(unittest.TestResult):
    """
    unittest.TestResult subclass exposing the extended outcome API.

    Each ``add*`` method takes the same arguments as its unittest
    counterpart plus an optional ``details`` dict used for content
    attachment. Because ``details`` can carry the outcome information on
    its own, the ``err`` / ``reason`` arguments default to ``None`` so a
    caller may supply ``details`` alone.
    """

    def addError(self, test, err=None, details=None):
        """
        Record that a test raised an unexpected error.

        Args:
            test: Test case that errored
            err: Exception information (type, value, traceback); may be
                omitted when ``details`` is supplied
            details (dict): Optional detailed error information
        """

    def addFailure(self, test, err=None, details=None):
        """
        Record that a test failed.

        Args:
            test: Test case that failed
            err: Exception information (type, value, traceback); may be
                omitted when ``details`` is supplied
            details (dict): Optional detailed failure information
        """

    def addSuccess(self, test, details=None):
        """
        Record that a test passed.

        Args:
            test: Test case that succeeded
            details (dict): Optional success details
        """

    def addSkip(self, test, reason=None, details=None):
        """
        Record that a test was skipped.

        Args:
            test: Test case that was skipped
            reason (str): Reason for skipping; may be omitted when
                ``details`` is supplied
            details (dict): Optional skip details
        """

    def addExpectedFailure(self, test, err=None, details=None):
        """
        Record a failure that was expected.

        Args:
            test: Test case with expected failure
            err: Exception information; may be omitted when ``details``
                is supplied
            details (dict): Optional failure details
        """

    def addUnexpectedSuccess(self, test, details=None):
        """
        Record a test that passed although a failure was expected.

        Args:
            test: Test case that unexpectedly succeeded
            details (dict): Optional success details
        """

class TextTestResult(TestResult):
    """
    TestResult that renders outcomes as human-readable text.

    How much is printed, and how it is formatted, is controlled through
    the constructor arguments.

    NOTE(review): upstream testtools appears to declare this constructor
    as ``(stream, failfast=False, tb_locals=False)`` -- confirm before
    relying on ``descriptions`` / ``verbosity``.
    """

    def __init__(self, stream, descriptions=True, verbosity=1):
        """
        Set up a text formatter that writes to ``stream``.

        Args:
            stream: Destination the formatted results are written to
            descriptions (bool): Whether test descriptions are included
            verbosity (int): Amount of output to produce, from 0 to 2
        """

class TestResultDecorator(TestResult):
    """
    Foundation for results that wrap another result.

    Subclasses can be chained to build pipelines that process or
    transform results on their way to the wrapped object.
    """

    def __init__(self, decorated):
        """
        Wrap an existing result.

        Args:
            decorated: The TestResult being wrapped
        """

Stream Results

Modern streaming test result system for real-time test reporting and parallel execution support.

class StreamResult:
    """
    Root of the streaming result protocol.

    Results are reported as a sequence of events, which suits real-time
    processing and distributed test execution.
    """

    def startTestRun(self):
        """Mark the beginning of a test run."""

    def stopTestRun(self):
        """Mark the end of a test run."""

    def status(
        self,
        test_id=None,
        test_status=None,
        test_tags=None,
        runnable=True,
        file_name=None,
        file_bytes=None,
        eof=False,
        mime_type=None,
        route_code=None,
        timestamp=None,
    ):
        """
        Emit one status event for a test.

        Args:
            test_id (str): Unique test identifier
            test_status (str): Test status ('inprogress', 'success', 'fail', etc.)
            test_tags (set): Tags associated with the test
            runnable (bool): Whether the test is runnable
            file_name (str): Name of an attached file
            file_bytes (bytes): Content of the attached file
            eof (bool): Marks the end of the attached file
            mime_type (str): MIME type of the attached content
            route_code (str): Routing information
            timestamp: Time of the event
        """

class CopyStreamResult(StreamResult):
    """
    Fan stream results out to several destinations.

    Every destination receives the same results, so multiple processors
    can handle one run side by side.
    """

    def __init__(self, targets):
        """
        Set up the copier.

        Args:
            targets: List of StreamResult objects to copy results to
        """

class StreamSummary(StreamResult):
    """
    Aggregate stream results into a run-level summary.

    Gathers summary statistics so the overall outcome of a test run can
    be reported.

    NOTE(review): upstream testtools appears to expose the summary via
    attributes (``testsRun``, ``failures``, ``errors``, ...) and
    ``wasSuccessful()`` rather than ``get_summary()`` -- confirm.
    """

    def __init__(self):
        """Set up an empty summarizer."""

    def get_summary(self):
        """
        Return the summary of the test run.

        Returns:
            dict: Summary statistics including counts and status
        """

class StreamTagger(StreamResult):
    """
    Adjust the tags carried by stream results.

    Lets tags be applied dynamically, depending on the execution context
    or other conditions.

    NOTE(review): upstream testtools appears to declare this constructor
    as ``(targets, add=None, discard=None)`` -- confirm argument names
    and order.
    """

    def __init__(self, new_tags, gone_tags, target):
        """
        Set up the tagger.

        Args:
            new_tags (set): Tags to add
            gone_tags (set): Tags to remove
            target: StreamResult that receives the tagged results
        """

class StreamFailFast(StreamResult):
    """
    Stream result that implements fail-fast behaviour.

    Execution stops at the first failure, giving rapid feedback during
    development.

    NOTE(review): upstream testtools appears to take an ``on_error``
    callback here rather than a target result -- confirm.
    """

    def __init__(self, target):
        """
        Set up the fail-fast result.

        Args:
            target: Target StreamResult
        """

class StreamResultRouter(StreamResult):
    """
    Dispatch stream results to different destinations by rule.

    Makes it possible to send results for different test types or
    execution contexts to different consumers.

    NOTE(review): upstream testtools appears to declare
    ``__init__(fallback=None, do_start_stop_run=True)`` and
    ``add_rule(sink, policy, do_start_stop_run=False, **policy_args)``
    -- confirm before copying the signatures below.
    """

    def __init__(self):
        """Set up a router with no rules."""

    def add_rule(self, test_case_filter, target):
        """
        Register a routing rule.

        Args:
            test_case_filter: Function used to select test cases
            target: StreamResult that receives the matching tests
        """

Result Converters

Classes for converting between different result formats and APIs.

class ExtendedToOriginalDecorator(TestResultDecorator):
    """
    Adapt extended test results to the standard unittest format.

    Keeps code that expects the plain unittest.TestResult interface
    working.
    """

    def __init__(self, decorated):
        """
        Set up the extended-to-original adapter.

        Args:
            decorated: The original TestResult being wrapped
        """

class ExtendedToStreamDecorator(StreamResult):
    """
    Adapt extended test results to the stream format.

    Connects the extended TestResult API to the StreamResult protocol.

    NOTE(review): in upstream testtools this adapter appears to accept
    extended TestResult calls and forward them to the wrapped
    StreamResult -- confirm which side ``decorated`` is on.
    """

    def __init__(self, decorated):
        """
        Set up the extended-to-stream adapter.

        Args:
            decorated: Extended TestResult to convert
        """

class ResourcedToStreamDecorator(ExtendedToStreamDecorator):
    """
    Adapt resourced test results to the stream format.

    Forwards testresources-related activity to StreamResult objects by
    implementing the resource lifecycle extension of the TestResult
    protocol that the testresources.TestResourceManager class supports.
    """

    def __init__(self, target):
        """
        Set up the resourced-to-stream adapter.

        Args:
            target: StreamResult that receives the resource lifecycle events
        """

    def startMakeResource(self, resource):
        """
        Report that creation of a resource has begun.

        Args:
            resource: Resource being created
        """

    def stopMakeResource(self, resource):
        """
        Report that creation of a resource has finished.

        Args:
            resource: Resource that was created
        """

    def startCleanResource(self, resource):
        """
        Report that cleanup of a resource has begun.

        Args:
            resource: Resource being cleaned up
        """

    def stopCleanResource(self, resource):
        """
        Report that cleanup of a resource has finished.

        Args:
            resource: Resource that was cleaned up
        """

class StreamToExtendedDecorator(TestResult):
    """
    Adapt stream results to the extended result format.

    Lets a StreamResult source feed consumers that expect the extended
    TestResult API.

    NOTE(review): in upstream testtools this adapter appears to be a
    StreamResult that wraps an extended TestResult -- confirm the base
    class and which side ``target`` is on.
    """

    def __init__(self, target):
        """
        Set up the stream-to-extended adapter.

        Args:
            target: StreamResult to convert from
        """

class StreamToDict(StreamResult):
    """
    Adapt stream results to dictionaries.

    Gives test results a structured representation suited to analysis
    and storage.

    NOTE(review): upstream testtools appears to deliver each test dict
    to an ``on_test`` callback passed to the constructor rather than via
    ``get_results()`` -- confirm.
    """

    def __init__(self):
        """Set up the stream-to-dict adapter."""

    def get_results(self):
        """
        Return the test results as a dictionary.

        Returns:
            dict: Structured test result data
        """

class StreamToQueue(StreamResult):
    """
    Forward stream results into a queue.

    Supports asynchronous processing of results and passing results
    between processes.

    NOTE(review): upstream testtools appears to require a second
    ``routing_code`` constructor argument -- confirm.
    """

    def __init__(self, queue):
        """
        Set up the stream-to-queue forwarder.

        Args:
            queue: Queue object the results are stored in
        """

Multi-Result Handling

Classes for combining and managing multiple test results.

class MultiTestResult(TestResult):
    """
    Bundle several test results behind one result object.

    Results reported here are delivered to all of the bundled
    processors.
    """

    def __init__(self, *results):
        """
        Set up the multi-result handler.

        Args:
            *results: The TestResult instances to combine
        """

class TestByTestResult(TestResult):
    """
    Result that invokes a callback per test.

    Lets callers attach their own processing to the completion of each
    individual test.
    """

    def __init__(self, on_test_callback):
        """
        Set up the per-test callback result.

        Args:
            on_test_callback: Function invoked for each test
        """

class ThreadsafeForwardingResult(TestResult):
    """
    Forward test results in a thread-safe way.

    Keeps result handling safe when tests execute concurrently.
    """

    def __init__(self, target, semaphore):
        """
        Set up the thread-safe forwarder.

        Args:
            target: TestResult the results are forwarded to
            semaphore: Threading semaphore used for synchronization
        """

Test Control and Execution

Classes for controlling test execution flow and lifecycle.

class TestControl:
    """
    Steer the flow and lifecycle of test execution.

    Offers hooks for starting and stopping tests and for deciding
    whether execution continues.

    NOTE(review): upstream testtools appears to expose ``shouldStop`` as
    a boolean attribute set by a ``stop()`` method, not as a method --
    confirm.
    """

    def __init__(self):
        """Set up a test control instance."""

    def shouldStop(self):
        """
        Report whether test execution should stop.

        Returns:
            bool: True if execution should stop
        """

class Tagger(TestResultDecorator):
    """
    Categorize test results by adjusting their tags.

    Tagged results can then be filtered and organized.

    NOTE(review): upstream testtools appears to declare this constructor
    as ``(decorated, new_tags, gone_tags)`` -- confirm argument order.
    """

    def __init__(self, new_tags, gone_tags, decorated):
        """
        Set up the result tagger.

        Args:
            new_tags (set): Tags to add to results
            gone_tags (set): Tags to remove from results
            decorated: The TestResult being wrapped
        """

class TimestampingStreamResult(StreamResult):
    """
    Stamp stream result events with a time.

    The timing information supports performance analysis and
    correlating results.
    """

    def __init__(self, target):
        """
        Set up the timestamping stream result.

        Args:
            target: StreamResult that receives the events
        """

Usage Examples

Basic Result Handling

import testtools
import sys

# Create enhanced test result
result = testtools.TestResult()

# Run tests with result capture
# MyTest is not defined in this snippet; it stands for a test case class
# supplied by the reader.
# NOTE(review): confirm that testtools exports TestSuite; otherwise
# unittest.TestSuite is presumably what is meant.
suite = testtools.TestSuite()
suite.addTest(MyTest('test_method'))
suite.run(result)

# Check results
# The counters are read directly off the result object after the run.
print(f"Tests run: {result.testsRun}")
print(f"Failures: {len(result.failures)}")
print(f"Errors: {len(result.errors)}")

Stream Result Processing

import testtools

# Create stream result pipeline
# The tagger forwards into the summary, so integration results end up there.
summary = testtools.StreamSummary()
tagger = testtools.StreamTagger({'integration'}, set(), summary)
router = testtools.StreamResultRouter()

# Configure routing
# unit_result_handler is not defined in this snippet; it stands for a
# StreamResult supplied by the reader.
router.add_rule(
    lambda test: 'unit' in test.id(),
    unit_result_handler
)
router.add_rule(
    lambda test: 'integration' in test.id(),
    tagger
)

# Use with test execution
# suite.run() reports through the extended TestResult API while the router
# consumes stream events, so the adapter has to convert extended calls
# into stream events: ExtendedToStreamDecorator, not StreamToExtendedDecorator.
result = testtools.ExtendedToStreamDecorator(router)
# Bracket the run so the stream consumers see its start and end.
result.startTestRun()
try:
    suite.run(result)
finally:
    result.stopTestRun()

# Get summary
stats = summary.get_summary()
print(f"Total tests: {stats['tests_run']}")

Custom Result Processing

class CustomTestResult(testtools.TestResult):
    """TestResult that additionally logs successes and failures in ``custom_data``."""

    def __init__(self):
        super().__init__()
        # One dict per recorded outcome, in reporting order.
        self.custom_data = []

    def addSuccess(self, test, details=None):
        """Forward the success to the base class, then log it."""
        super().addSuccess(test, details)
        record = {'test': test.id(), 'status': 'success', 'details': details}
        self.custom_data.append(record)

    def addFailure(self, test, err, details=None):
        """Forward the failure to the base class, then log it with its error text."""
        super().addFailure(test, err, details)
        record = {
            'test': test.id(),
            'status': 'failure',
            # err[1] is the exception value of the (type, value, traceback) triple.
            'error': str(err[1]),
            'details': details,
        }
        self.custom_data.append(record)

# Use custom result
# `suite` is not defined in this snippet; it is assumed to be a test suite
# built as in the basic example.
result = CustomTestResult()
suite.run(result)

# Process custom data
# Each entry is a dict appended by addSuccess / addFailure above.
for entry in result.custom_data:
    print(f"Test {entry['test']}: {entry['status']}")

Multi-Result Distribution

# Create multiple result handlers
# FileTestResult, DatabaseTestResult and `connection` are not defined on
# this page; they stand for reader-supplied result classes and state.
# This snippet also needs `import sys` for sys.stdout.
file_result = FileTestResult('results.log')
db_result = DatabaseTestResult(connection)
console_result = testtools.TextTestResult(sys.stdout, verbosity=2)

# Combine results
multi_result = testtools.MultiTestResult(
    file_result,
    db_result, 
    console_result
)

# Results go to all handlers
suite.run(multi_result)

Concurrent Result Handling

import threading

# Thread-safe result handling
# base_result is not defined in this snippet; it stands for the TestResult
# that ultimately receives the forwarded results.
semaphore = threading.Semaphore()
safe_result = testtools.ThreadsafeForwardingResult(
    base_result, 
    semaphore
)

# Use with concurrent test execution
# NOTE(review): testtools.fork_runner is not described anywhere on this
# page -- confirm it exists, or substitute the callable that
# ConcurrentTestSuite expects as its second argument.
concurrent_suite = testtools.ConcurrentTestSuite(
    suite,
    testtools.fork_runner
)
concurrent_suite.run(safe_result)

Install with Tessl CLI

npx tessl i tessl/pypi-testtools

docs

content.md

helpers.md

index.md

matchers.md

test-cases.md

test-execution.md

test-results.md

twisted-support.md

tile.json