# Testing Utilities

Tools and utilities for testing dataflows, including test sources, sinks, and execution helpers. Provides deterministic testing environments and data-capture utilities for validating stream processing logic.

## Capabilities

### Test Execution Functions

Functions for running dataflows in test environments with deterministic behavior.
```python { .api }
def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...
```

**These take the same parameters as the production entry points but are suited to testing:**
- Single-threaded execution for predictable ordering
- Synchronous operation, so assertions can run as soon as the call returns
- No background processes or networking complexity
**Usage Examples:**
```python
from bytewax.testing import TestingSink, TestingSource, run_main
from bytewax.dataflow import Dataflow
import bytewax.operators as op

def test_simple_dataflow():
    flow = Dataflow("test_flow")

    # Build test dataflow
    data = op.input("input", flow, TestingSource([1, 2, 3, 4, 5]))
    doubled = op.map("double", data, lambda x: x * 2)

    # Capture output
    captured = []
    op.output("output", doubled, TestingSink(captured))

    # Run and verify
    run_main(flow)
    assert captured == [2, 4, 6, 8, 10]
```
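Because `run_main` executes the flow synchronously on a single worker, a stateless test like the one above has a simple mental model: the source's items pass through each step in order and land in the capture list. A conceptual reduction of that test to plain Python (a model only, not how the engine executes):

```python
# Conceptual model: TestingSource yields the list in order,
# op.map applies the function to each item, TestingSink appends each result.
data = [1, 2, 3, 4, 5]
captured = [x * 2 for x in data]
assert captured == [2, 4, 6, 8, 10]
```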
### Test Sources

Sources that provide deterministic, controllable data for testing.

```python { .api }
class TestingSource:
    def __init__(self, data: Iterable[X]): ...
```

**Parameters:**
- `data` (Iterable[X]): Static data to emit in order
**Usage Examples:**
```python
from bytewax.testing import TestingSource

# Simple list data
simple_source = TestingSource([1, 2, 3, 4, 5])

# Complex objects
events = [
    {"user": "alice", "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
    {"user": "bob", "action": "purchase", "timestamp": "2023-01-01T10:05:00Z"},
    {"user": "alice", "action": "logout", "timestamp": "2023-01-01T10:30:00Z"},
]
event_source = TestingSource(events)

# Generator data
def generate_data():
    for i in range(100):
        yield {"id": i, "value": i * i}

generator_source = TestingSource(generate_data())
```
### Test Sinks

Sinks that capture output data for test assertions.

```python { .api }
class TestingSink:
    def __init__(self, capture_list: Optional[List] = None): ...
    def captured(self) -> List: ...
```

**Parameters:**
- `capture_list` (Optional[List]): External list to append captured items to; if omitted, items are captured internally and retrieved via `captured()`
**Usage Examples:**
```python
from bytewax.testing import TestingSink

# Capture to external list
results = []
sink = TestingSink(results)

# Use in dataflow
op.output("test_output", processed_stream, sink)

# Run and assert
run_main(flow)
assert len(results) == 10
assert all(isinstance(item, dict) for item in results)

# Capture to internal list
sink = TestingSink()
op.output("test_output", processed_stream, sink)
run_main(flow)

captured_data = sink.captured()
assert captured_data[0]["processed"] is True
```
### Time Testing Utilities

Tools for controlling time in tests, enabling deterministic testing of time-based operations.

```python { .api }
class TimeTestingGetter:
    def __init__(self, now: datetime): ...
    def advance(self, td: timedelta) -> None: ...
    def get(self) -> datetime: ...
```

**Methods:**
- `advance(td)`: Move the current time forward by the given duration
- `get()`: Get the current mock time
**Usage Examples:**
```python
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSink, TestingSource, TimeTestingGetter, run_main
from datetime import datetime, timedelta, timezone
import bytewax.operators as op
import bytewax.operators.windowing as win

def test_windowing_with_mock_time():
    # Create mock time starting at a known point (timezone-aware)
    start = datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
    mock_time = TimeTestingGetter(start)

    flow = Dataflow("time_test")

    # Create events with timestamps
    events = [
        {"value": 1, "time": start},
        {"value": 2, "time": start + timedelta(seconds=30)},
        {"value": 3, "time": start + timedelta(minutes=1)},
    ]

    stream = op.input("events", flow, TestingSource(events))
    keyed = op.key_on("key", stream, lambda x: "all")

    # Use the mock time to align windows to the known start point
    windowed = win.collect_window(
        "windows",
        keyed,
        win.EventClock(lambda e: e["time"]),
        win.TumblingWindower(timedelta(minutes=1), align_to=mock_time.get()),
    )

    results = []
    op.output("output", windowed, TestingSink(results))

    run_main(flow)

    # Verify window contents
    assert len(results) == 2  # Two windows
    assert len(results[0][1]) == 2  # First window has 2 events
    assert len(results[1][1]) == 1  # Second window has 1 event
```
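`TimeTestingGetter` is just a mutable clock: `get` returns the stored time and `advance` moves it forward. A pure-Python stand-in with the same advance/get contract (a sketch for illustration, not the bytewax class):

```python
from datetime import datetime, timedelta, timezone

class MockClock:
    """Minimal stand-in mirroring TimeTestingGetter's advance/get contract."""

    def __init__(self, now):
        self._now = now

    def advance(self, td):
        # Move the mock "current time" forward by td.
        self._now += td

    def get(self):
        # Return the mock "current time".
        return self._now

clock = MockClock(datetime(2023, 1, 1, 12, 0, 0, tzinfo=timezone.utc))
clock.advance(timedelta(minutes=5))
assert clock.get() == datetime(2023, 1, 1, 12, 5, 0, tzinfo=timezone.utc)
```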
### Source Testing Utilities

Utilities for testing custom sources and controlling their behavior.

```python { .api }
def poll_next_batch(source_part: StatefulSourcePartition, timeout: timedelta = timedelta(seconds=1)): ...

def ffwd_iter(iterator: Iterator[X], skip_to: int) -> Iterator[X]: ...
```

**poll_next_batch Parameters:**
- `source_part` (StatefulSourcePartition): Source partition to poll
- `timeout` (timedelta): Maximum time to wait for data

**ffwd_iter Parameters:**
- `iterator` (Iterator): Iterator to fast-forward
- `skip_to` (int): Number of items to skip
**Usage Examples:**
```python
from bytewax.testing import poll_next_batch, ffwd_iter
from datetime import timedelta

def test_custom_source():
    # Test a custom source partition
    # (MyCustomSourcePartition and test_data are your own test fixtures)
    partition = MyCustomSourcePartition(test_data)

    # Poll for first batch
    batch1 = poll_next_batch(partition, timeout=timedelta(seconds=5))
    assert len(batch1) > 0

    # Advance to a specific position
    iterator = iter(range(1000))
    advanced = ffwd_iter(iterator, skip_to=500)

    # First item should be 500
    assert next(advanced) == 500
```
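For a plain iterator, fast-forwarding behaves like `itertools.islice` with a start offset: the first `skip_to` items are consumed and discarded, and the rest are yielded in order. An equivalence sketch (not the library's implementation):

```python
from itertools import islice

def ffwd_sketch(iterator, skip_to):
    # Discard the first skip_to items, then yield the rest in order.
    return islice(iterator, skip_to, None)

advanced = ffwd_sketch(iter(range(1000)), 500)
assert next(advanced) == 500  # items 0..499 were skipped
```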
### Testing Patterns

**Unit Testing Dataflow Components:**
```python
import unittest
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, TestingSink, run_main
import bytewax.operators as op

class TestDataProcessing(unittest.TestCase):

    def test_data_transformation(self):
        """Test basic data transformation logic."""
        flow = Dataflow("transform_test")

        # Input data
        input_data = [
            {"value": 10, "type": "A"},
            {"value": 20, "type": "B"},
            {"value": 30, "type": "A"},
        ]

        # Build dataflow
        stream = op.input("input", flow, TestingSource(input_data))

        # Filter only type A
        type_a = op.filter("filter_a", stream, lambda x: x["type"] == "A")

        # Double the values
        doubled = op.map("double", type_a, lambda x: {**x, "value": x["value"] * 2})

        # Capture results
        results = []
        op.output("output", doubled, TestingSink(results))

        # Execute and verify
        run_main(flow)

        expected = [
            {"value": 20, "type": "A"},
            {"value": 60, "type": "A"},
        ]
        self.assertEqual(results, expected)

    def test_stateful_processing(self):
        """Test stateful operators maintain state correctly."""
        flow = Dataflow("stateful_test")

        # Input events
        events = [
            ("user1", 5),
            ("user2", 3),
            ("user1", 7),
            ("user2", 2),
            ("user1", 1),
        ]

        stream = op.input("input", flow, TestingSource(events))

        # Running sum per user
        def running_sum(state, value):
            new_state = (state or 0) + value
            return new_state, new_state

        sums = op.stateful_map("running_sum", stream, running_sum)

        results = []
        op.output("output", sums, TestingSink(results))

        run_main(flow)

        expected = [
            ("user1", 5),   # 5
            ("user2", 3),   # 3
            ("user1", 12),  # 5 + 7
            ("user2", 5),   # 3 + 2
            ("user1", 13),  # 12 + 1
        ]
        self.assertEqual(results, expected)

if __name__ == '__main__':
    unittest.main()
```
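`stateful_map` keeps one state value per key. The running-sum mapper in the stateful test above is equivalent to this plain-Python per-key fold (a model of the semantics, not the engine):

```python
def running_sum(state, value):
    # state is None the first time a key is seen.
    new_state = (state or 0) + value
    return new_state, new_state

events = [("user1", 5), ("user2", 3), ("user1", 7), ("user2", 2), ("user1", 1)]
states = {}  # one state slot per key, as stateful_map maintains internally
out = []
for key, value in events:
    states[key], emitted = running_sum(states.get(key), value)
    out.append((key, emitted))

assert out == [("user1", 5), ("user2", 3), ("user1", 12), ("user2", 5), ("user1", 13)]
```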
**Integration Testing with Multiple Sources:**
```python
def test_multi_source_integration():
    """Test dataflow with multiple input sources."""
    flow = Dataflow("multi_source_test")

    # User events
    user_events = [
        {"user_id": "u1", "event": "login", "timestamp": 1000},
        {"user_id": "u2", "event": "login", "timestamp": 1001},
        {"user_id": "u1", "event": "purchase", "timestamp": 1002},
    ]

    # User profiles
    user_profiles = [
        {"user_id": "u1", "name": "Alice", "tier": "premium"},
        {"user_id": "u2", "name": "Bob", "tier": "basic"},
    ]

    # Set up streams
    events = op.input("events", flow, TestingSource(user_events))
    profiles = op.input("profiles", flow, TestingSource(user_profiles))

    # Key both streams
    keyed_events = op.key_on("key_events", events, lambda e: e["user_id"])
    keyed_profiles = op.key_on("key_profiles", profiles, lambda p: p["user_id"])

    # Join streams
    joined = op.join("join_user_data", keyed_events, keyed_profiles)

    # Process joined data
    def enrich_event(joined_data):
        event, profile = joined_data
        return {
            **event,
            "user_name": profile["name"],
            "user_tier": profile["tier"],
        }

    enriched = op.map("enrich", joined, lambda x: enrich_event(x[1]))

    results = []
    op.output("output", enriched, TestingSink(results))

    run_main(flow)

    # Verify enriched events
    assert len(results) >= 2  # At least login events should be enriched
    assert all("user_name" in event for event in results)
    assert all("user_tier" in event for event in results)
```
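For static test data like this, the enrichment the join produces can be cross-checked against an ordinary dictionary lookup, which is a handy way to compute expected values for assertions (a reference computation in plain Python, not a bytewax API):

```python
user_events = [
    {"user_id": "u1", "event": "login", "timestamp": 1000},
    {"user_id": "u2", "event": "login", "timestamp": 1001},
    {"user_id": "u1", "event": "purchase", "timestamp": 1002},
]
user_profiles = [
    {"user_id": "u1", "name": "Alice", "tier": "premium"},
    {"user_id": "u2", "name": "Bob", "tier": "basic"},
]

# Index profiles by key, then enrich each event the way the joined map step does.
by_id = {p["user_id"]: p for p in user_profiles}
expected = [
    {**e, "user_name": by_id[e["user_id"]]["name"], "user_tier": by_id[e["user_id"]]["tier"]}
    for e in user_events
]

assert all("user_name" in e and "user_tier" in e for e in expected)
```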
**Error Handling Tests:**
```python
def test_error_handling():
    """Test dataflow handles errors gracefully."""
    flow = Dataflow("error_test")

    # Data with some invalid items
    mixed_data = [
        {"value": 10},
        {"invalid": "data"},  # Missing "value" key
        {"value": 20},
        None,  # Invalid item
        {"value": 30},
    ]

    stream = op.input("input", flow, TestingSource(mixed_data))

    # Safe processing with error handling
    def safe_process(item):
        if item is None or "value" not in item:
            return None  # Filter out invalid items
        return item["value"] * 2

    processed = op.filter_map("safe_process", stream, safe_process)

    results = []
    op.output("output", processed, TestingSink(results))

    run_main(flow)

    # Should only have valid processed items
    expected = [20, 40, 60]  # 10*2, 20*2, 30*2
    assert results == expected
```
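`filter_map` applies the function and drops `None` results in one step, so it behaves like a `map` followed by a `None` filter. An equivalence sketch in plain Python:

```python
def safe_process(item):
    # Return None for invalid items so they are dropped downstream.
    if item is None or "value" not in item:
        return None
    return item["value"] * 2

mixed_data = [{"value": 10}, {"invalid": "data"}, {"value": 20}, None, {"value": 30}]
results = [r for r in map(safe_process, mixed_data) if r is not None]
assert results == [20, 40, 60]
```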
**Performance Testing:**
```python
import time

def test_throughput():
    """Test dataflow throughput with a large dataset."""
    flow = Dataflow("throughput_test")

    # Generate large dataset
    large_dataset = list(range(100000))

    stream = op.input("input", flow, TestingSource(large_dataset))
    processed = op.map("process", stream, lambda x: x * 2)

    results = []
    op.output("output", processed, TestingSink(results))

    # Measure execution time (perf_counter is monotonic and higher resolution than time.time)
    start_time = time.perf_counter()
    run_main(flow)
    end_time = time.perf_counter()

    # Verify results and performance
    assert len(results) == 100000
    assert results[0] == 0
    assert results[-1] == 199998

    duration = end_time - start_time
    throughput = len(results) / duration

    print(f"Processed {len(results)} items in {duration:.2f}s ({throughput:.0f} items/sec)")

    # Assert minimum throughput (adjust based on requirements)
    assert throughput > 10000  # At least 10k items/sec
```
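The timing pattern itself is independent of bytewax; a minimal sketch of measuring throughput around any callable (`measure_throughput` is a hypothetical helper, not a library API):

```python
import time

def measure_throughput(fn, n_items):
    # Time a single run and report items processed per second.
    start = time.perf_counter()
    fn()
    elapsed = time.perf_counter() - start
    return n_items / elapsed if elapsed > 0 else float("inf")

items = list(range(100_000))
rate = measure_throughput(lambda: [x * 2 for x in items], len(items))
assert rate > 0
```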