0
# Test Result Management
1
2
Comprehensive test result handling including extended result APIs, stream results for real-time reporting, result decorators, and multi-format output support for flexible test result processing.
3
4
## Capabilities
5
6
### Enhanced Test Results
7
8
Extended TestResult class with additional capabilities beyond unittest.TestResult.
9
10
```python { .api }
11
class TestResult(unittest.TestResult):
12
"""
13
Enhanced test result with extended API.
14
15
Extends unittest.TestResult with additional methods for
16
detailed test result tracking and content attachment support.
17
"""
18
19
def addError(self, test, err, details=None):
20
"""
21
Record a test error with optional details.
22
23
Args:
24
test: Test case that errored
25
err: Exception information (type, value, traceback)
26
details (dict): Optional detailed error information
27
"""
28
29
def addFailure(self, test, err, details=None):
30
"""
31
Record a test failure with optional details.
32
33
Args:
34
test: Test case that failed
35
err: Exception information (type, value, traceback)
36
details (dict): Optional detailed failure information
37
"""
38
39
def addSuccess(self, test, details=None):
40
"""
41
Record a successful test with optional details.
42
43
Args:
44
test: Test case that succeeded
45
details (dict): Optional success details
46
"""
47
48
def addSkip(self, test, reason, details=None):
49
"""
50
Record a skipped test.
51
52
Args:
53
test: Test case that was skipped
54
reason (str): Reason for skipping
55
details (dict): Optional skip details
56
"""
57
58
def addExpectedFailure(self, test, err, details=None):
59
"""
60
Record an expected failure.
61
62
Args:
63
test: Test case with expected failure
64
err: Exception information
65
details (dict): Optional failure details
66
"""
67
68
def addUnexpectedSuccess(self, test, details=None):
69
"""
70
Record an unexpected success.
71
72
Args:
73
test: Test case that unexpectedly succeeded
74
details (dict): Optional success details
75
"""
76
77
class TextTestResult(TestResult):
78
"""
79
Text-based test result output formatter.
80
81
Provides human-readable test result output with
82
configurable verbosity and formatting options.
83
"""
84
85
def __init__(self, stream, descriptions=True, verbosity=1):
86
"""
87
Create text test result formatter.
88
89
Args:
90
stream: Output stream for results
91
descriptions (bool): Include test descriptions
92
verbosity (int): Output verbosity level (0-2)
93
"""
94
95
class TestResultDecorator(TestResult):
96
"""
97
Base class for decorating/wrapping test results.
98
99
Allows chaining multiple result processors and
100
implementing result transformation pipelines.
101
"""
102
103
def __init__(self, decorated):
104
"""
105
Create result decorator.
106
107
Args:
108
decorated: TestResult to decorate
109
"""
110
```
111
112
### Stream Results
113
114
Modern streaming test result system for real-time test reporting and parallel execution support.
115
116
```python { .api }
117
class StreamResult:
118
"""
119
Base streaming test result protocol.
120
121
Provides event-based test result reporting for
122
real-time processing and distributed test execution.
123
"""
124
125
def startTestRun(self):
126
"""Signal start of test run."""
127
128
def stopTestRun(self):
129
"""Signal end of test run."""
130
131
def status(self, test_id=None, test_status=None, test_tags=None,
132
runnable=True, file_name=None, file_bytes=None,
133
eof=False, mime_type=None, route_code=None, timestamp=None):
134
"""
135
Report test status change.
136
137
Args:
138
test_id (str): Unique test identifier
139
test_status (str): Test status ('inprogress', 'success', 'fail', etc.)
140
test_tags (set): Test tags
141
runnable (bool): Whether test is runnable
142
file_name (str): Attached file name
143
file_bytes (bytes): Attached file content
144
eof (bool): End of file marker
145
mime_type (str): MIME type of attached content
146
route_code (str): Routing information
147
timestamp: Event timestamp
148
"""
149
150
class CopyStreamResult(StreamResult):
151
"""
152
Copy/duplicate stream results to multiple destinations.
153
154
Allows sending test results to multiple processors
155
simultaneously for parallel result handling.
156
"""
157
158
def __init__(self, targets):
159
"""
160
Create stream result copier.
161
162
Args:
163
targets: List of StreamResult targets
164
"""
165
166
class StreamSummary(StreamResult):
167
"""
168
Summarize and aggregate stream results.
169
170
Collects summary statistics and provides
171
overall test run reporting.
172
"""
173
174
def __init__(self):
175
"""Create stream result summarizer."""
176
177
def get_summary(self):
178
"""
179
Get test run summary.
180
181
Returns:
182
dict: Summary statistics including counts and status
183
"""
184
185
class StreamTagger(StreamResult):
186
"""
187
Add tags to stream results.
188
189
Allows dynamic tagging of test results based
190
on test execution context or conditions.
191
"""
192
193
def __init__(self, new_tags, gone_tags, target):
194
"""
195
Create stream result tagger.
196
197
Args:
198
new_tags (set): Tags to add
199
gone_tags (set): Tags to remove
200
target: Target StreamResult
201
"""
202
203
class StreamFailFast(StreamResult):
204
"""
205
Fail-fast implementation for stream results.
206
207
Stops test execution on first failure for
208
rapid feedback during development.
209
"""
210
211
def __init__(self, target):
212
"""
213
Create fail-fast stream result.
214
215
Args:
216
target: Target StreamResult
217
"""
218
219
class StreamResultRouter(StreamResult):
220
"""
221
Route stream results to multiple destinations based on criteria.
222
223
Enables conditional result routing for different
224
test types or execution contexts.
225
"""
226
227
def __init__(self):
228
"""Create stream result router."""
229
230
def add_rule(self, test_case_filter, target):
231
"""
232
Add routing rule.
233
234
Args:
235
test_case_filter: Function to filter test cases
236
target: Target StreamResult for matching tests
237
"""
238
```
239
240
### Result Converters
241
242
Classes for converting between different result formats and APIs.
243
244
```python { .api }
245
class ExtendedToOriginalDecorator(TestResultDecorator):
246
"""
247
Convert extended test results to standard unittest format.
248
249
Enables compatibility with code expecting
250
standard unittest.TestResult interface.
251
"""
252
253
def __init__(self, decorated):
254
"""
255
Create extended-to-original converter.
256
257
Args:
258
decorated: Original TestResult to wrap
259
"""
260
261
class ExtendedToStreamDecorator(StreamResult):
262
"""
263
Convert extended test results to stream format.
264
265
Bridges extended TestResult API with
266
modern StreamResult protocol.
267
"""
268
269
def __init__(self, decorated):
270
"""
271
Create extended-to-stream converter.
272
273
Args:
274
decorated: Extended TestResult to convert
275
"""
276
277
class ResourcedToStreamDecorator(ExtendedToStreamDecorator):
278
"""
279
Convert resourced test results to stream format.
280
281
Reports testresources-related activity to StreamResult objects,
282
implementing the resource lifecycle TestResult protocol extension
283
supported by testresources.TestResourceManager class.
284
"""
285
286
def __init__(self, target):
287
"""
288
Create resourced-to-stream converter.
289
290
Args:
291
target: StreamResult to send resource lifecycle events to
292
"""
293
294
def startMakeResource(self, resource):
295
"""
296
Signal start of resource creation.
297
298
Args:
299
resource: Resource being created
300
"""
301
302
def stopMakeResource(self, resource):
303
"""
304
Signal completion of resource creation.
305
306
Args:
307
resource: Resource that was created
308
"""
309
310
def startCleanResource(self, resource):
311
"""
312
Signal start of resource cleanup.
313
314
Args:
315
resource: Resource being cleaned up
316
"""
317
318
def stopCleanResource(self, resource):
319
"""
320
Signal completion of resource cleanup.
321
322
Args:
323
resource: Resource that was cleaned up
324
"""
325
326
class StreamToExtendedDecorator(TestResult):
327
"""
328
Convert stream results to extended result format.
329
330
Allows using StreamResult sources with
331
extended TestResult consumers.
332
"""
333
334
def __init__(self, target):
335
"""
336
Create stream-to-extended converter.
337
338
Args:
339
target: StreamResult to convert from
340
"""
341
342
class StreamToDict(StreamResult):
343
"""
344
Convert stream results to dictionary format.
345
346
Provides structured data representation
347
of test results for analysis and storage.
348
"""
349
350
def __init__(self):
351
"""Create stream-to-dict converter."""
352
353
def get_results(self):
354
"""
355
Get test results as dictionary.
356
357
Returns:
358
dict: Structured test result data
359
"""
360
361
class StreamToQueue(StreamResult):
362
"""
363
Send stream results to a queue for processing.
364
365
Enables asynchronous result processing and
366
inter-process result communication.
367
"""
368
369
def __init__(self, queue):
370
"""
371
Create stream-to-queue forwarder.
372
373
Args:
374
queue: Queue object for result storage
375
"""
376
```
377
378
### Multi-Result Handling
379
380
Classes for combining and managing multiple test results.
381
382
```python { .api }
383
class MultiTestResult(TestResult):
384
"""
385
Combine and aggregate multiple test results.
386
387
Allows sending test results to multiple
388
processors simultaneously.
389
"""
390
391
def __init__(self, *results):
392
"""
393
Create multi-result handler.
394
395
Args:
396
*results: TestResult instances to combine
397
"""
398
399
class TestByTestResult(TestResult):
400
"""
401
Result that provides per-test callbacks.
402
403
Enables custom processing logic for
404
individual test completion events.
405
"""
406
407
def __init__(self, on_test_callback):
408
"""
409
Create per-test callback result.
410
411
Args:
412
on_test_callback: Function called for each test
413
"""
414
415
class ThreadsafeForwardingResult(TestResult):
416
"""
417
Thread-safe test result forwarding.
418
419
Ensures safe result handling in concurrent
420
test execution environments.
421
"""
422
423
def __init__(self, target, semaphore):
424
"""
425
Create thread-safe forwarder.
426
427
Args:
428
target: Target TestResult
429
semaphore: Threading semaphore for synchronization
430
"""
431
```
432
433
### Test Control and Execution
434
435
Classes for controlling test execution flow and lifecycle.
436
437
```python { .api }
438
class TestControl:
439
"""
440
Control test execution flow and lifecycle.
441
442
Provides hooks for test start, stop, and
443
execution control decisions.
444
"""
445
446
def __init__(self):
447
"""Create test control instance."""
448
449
def shouldStop(self):
450
"""
451
Check if test execution should stop.
452
453
Returns:
454
bool: True if execution should stop
455
"""
456
457
class Tagger(TestResultDecorator):
458
"""
459
Add tags to test results for categorization.
460
461
Enables test result filtering and
462
organizational capabilities.
463
"""
464
465
def __init__(self, new_tags, gone_tags, decorated):
466
"""
467
Create result tagger.
468
469
Args:
470
new_tags (set): Tags to add to results
471
gone_tags (set): Tags to remove from results
472
decorated: TestResult to decorate
473
"""
474
475
class TimestampingStreamResult(StreamResult):
476
"""
477
Add timestamps to stream result events.
478
479
Provides timing information for performance
480
analysis and result correlation.
481
"""
482
483
def __init__(self, target):
484
"""
485
Create timestamping stream result.
486
487
Args:
488
target: Target StreamResult
489
"""
490
```
491
492
## Usage Examples
493
494
### Basic Result Handling
495
496
```python
497
import testtools
498
import sys
499
500
# Create enhanced test result
501
result = testtools.TestResult()
502
503
# Run tests with result capture
504
suite = testtools.TestSuite()
505
suite.addTest(MyTest('test_method'))
506
suite.run(result)
507
508
# Check results
509
print(f"Tests run: {result.testsRun}")
510
print(f"Failures: {len(result.failures)}")
511
print(f"Errors: {len(result.errors)}")
512
```
513
514
### Stream Result Processing
515
516
```python
517
import testtools
518
519
# Create stream result pipeline
520
summary = testtools.StreamSummary()
521
tagger = testtools.StreamTagger({'integration'}, set(), summary)
522
router = testtools.StreamResultRouter()
523
524
# Configure routing
525
router.add_rule(
526
lambda test: 'unit' in test.id(),
527
unit_result_handler
528
)
529
router.add_rule(
530
lambda test: 'integration' in test.id(),
531
tagger
532
)
533
534
# Use with test execution
535
result = testtools.StreamToExtendedDecorator(router)
536
suite.run(result)
537
538
# Get summary
539
stats = summary.get_summary()
540
print(f"Total tests: {stats['tests_run']}")
541
```
542
543
### Custom Result Processing
544
545
```python
546
class CustomTestResult(testtools.TestResult):
547
def __init__(self):
548
super().__init__()
549
self.custom_data = []
550
551
def addSuccess(self, test, details=None):
552
super().addSuccess(test, details)
553
# Custom success processing
554
self.custom_data.append({
555
'test': test.id(),
556
'status': 'success',
557
'details': details
558
})
559
560
def addFailure(self, test, err, details=None):
561
super().addFailure(test, err, details)
562
# Custom failure processing
563
self.custom_data.append({
564
'test': test.id(),
565
'status': 'failure',
566
'error': str(err[1]),
567
'details': details
568
})
569
570
# Use custom result
571
result = CustomTestResult()
572
suite.run(result)
573
574
# Process custom data
575
for entry in result.custom_data:
576
print(f"Test {entry['test']}: {entry['status']}")
577
```
578
579
### Multi-Result Distribution
580
581
```python
582
# Create multiple result handlers
583
file_result = FileTestResult('results.log')
584
db_result = DatabaseTestResult(connection)
585
console_result = testtools.TextTestResult(sys.stdout, verbosity=2)
586
587
# Combine results
588
multi_result = testtools.MultiTestResult(
589
file_result,
590
db_result,
591
console_result
592
)
593
594
# Results go to all handlers
595
suite.run(multi_result)
596
```
597
598
### Concurrent Result Handling
599
600
```python
601
import threading
602
603
# Thread-safe result handling
604
semaphore = threading.Semaphore()
605
safe_result = testtools.ThreadsafeForwardingResult(
606
base_result,
607
semaphore
608
)
609
610
# Use with concurrent test execution
611
concurrent_suite = testtools.ConcurrentTestSuite(
612
suite,
613
testtools.fork_runner
614
)
615
concurrent_suite.run(safe_result)
616
```