Python Stateful Stream Processing Framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Tools and utilities for testing dataflows including test sources, sinks, and execution helpers. Provides deterministic testing environments and data capture utilities for validating stream processing logic.
Functions for running dataflows in test environments with deterministic behavior.
def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...
def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...

Parameters are the same as the runtime functions, but these entry points are designed for testing:
Usage Examples:
from bytewax.testing import run_main
from bytewax.dataflow import Dataflow
import bytewax.operators as op
def test_simple_dataflow():
flow = Dataflow("test_flow")
# Build test dataflow
data = op.input("input", flow, TestingSource([1, 2, 3, 4, 5]))
doubled = op.map("double", data, lambda x: x * 2)
# Capture output
captured = []
op.output("output", doubled, TestingSink(captured))
# Run and verify
run_main(flow)
assert captured == [2, 4, 6, 8, 10]

Sources that provide deterministic, controllable data for testing.
class TestingSource:
def __init__(self, data: Iterable[X]): ...

Parameters:
data (Iterable[X]): Static data to emit in order

Usage Examples:
from bytewax.testing import TestingSource
# Simple list data
simple_source = TestingSource([1, 2, 3, 4, 5])
# Complex objects
events = [
{"user": "alice", "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
{"user": "bob", "action": "purchase", "timestamp": "2023-01-01T10:05:00Z"},
{"user": "alice", "action": "logout", "timestamp": "2023-01-01T10:30:00Z"}
]
event_source = TestingSource(events)
# Generator data
def generate_data():
for i in range(100):
yield {"id": i, "value": i * i}
generator_source = TestingSource(generate_data())

Sinks that capture output data for test assertions.
class TestingSink:
def __init__(self, capture_list: Optional[List] = None): ...
def captured(self) -> List: ...

Parameters:
capture_list (List): Optional external list to append captured items to

Usage Examples:
from bytewax.testing import TestingSink
# Capture to external list
results = []
sink = TestingSink(results)
# Use in dataflow
op.output("test_output", processed_stream, sink)
# Run and assert
run_main(flow)
assert len(results) == 10
assert all(isinstance(item, dict) for item in results)
# Capture to internal list
sink = TestingSink()
op.output("test_output", processed_stream, sink)
run_main(flow)
captured_data = sink.captured()
assert captured_data[0]["processed"] is True

Tools for controlling time in tests, enabling deterministic testing of time-based operations.
class TimeTestingGetter:
def __init__(self, now: datetime): ...
def advance(self, td: timedelta) -> None: ...
def get(self) -> datetime: ...

Methods:
advance(td): Move the current time forward by the given duration
get(): Get the current mock time

Usage Examples:
from bytewax.testing import TimeTestingGetter
from datetime import datetime, timedelta
import bytewax.operators.windowing as win
def test_windowing_with_mock_time():
# Create mock time starting at a known point
mock_time = TimeTestingGetter(datetime(2023, 1, 1, 12, 0, 0))
# NOTE(review): mock_time is not wired into the dataflow below — the
# EventClock reads timestamps from the events themselves. Confirm whether
# this example should pass mock_time.get into a clock that accepts it.
flow = Dataflow("time_test")
# Create events with timestamps
events = [
{"value": 1, "time": datetime(2023, 1, 1, 12, 0, 0)},
{"value": 2, "time": datetime(2023, 1, 1, 12, 0, 30)},
{"value": 3, "time": datetime(2023, 1, 1, 12, 1, 0)},
]
stream = op.input("events", flow, TestingSource(events))
keyed = op.key_on("key", stream, lambda x: "all")
# Use mock time in windowing
windowed = win.collect_window(
"windows",
keyed,
win.EventClock(lambda e: e["time"]),
win.TumblingWindower(timedelta(minutes=1))
)
results = []
op.output("output", windowed, TestingSink(results))
run_main(flow)
# Verify window contents
assert len(results) == 2 # Two windows
assert len(results[0][1]) == 2 # First window has 2 events
assert len(results[1][1]) == 1  # Second window has 1 event

Utilities for testing custom sources and controlling their behavior.
def poll_next_batch(source_part: StatefulSourcePartition, timeout: timedelta = timedelta(seconds=1)): ...
def ffwd_iter(iterator: Iterator[X], skip_to: int) -> Iterator[X]: ...

poll_next_batch Parameters:
source_part (StatefulSourcePartition): Source partition to poll
timeout (timedelta): Maximum time to wait for data

ffwd_iter Parameters:
iterator (Iterator): Iterator to fast-forward
skip_to (int): Number of items to skip

Usage Examples:
from bytewax.testing import poll_next_batch, ffwd_iter
def test_custom_source():
# Test a custom source partition
partition = MyCustomSourcePartition(test_data)
# Poll for first batch
batch1 = poll_next_batch(partition, timeout=timedelta(seconds=5))
assert len(batch1) > 0
# Advance to specific position
iterator = iter(range(1000))
advanced = ffwd_iter(iterator, skip_to=500)
# First item should be 500
assert next(advanced) == 500

Unit Testing Dataflow Components:
import unittest
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, TestingSink, run_main
import bytewax.operators as op
class TestDataProcessing(unittest.TestCase):
def test_data_transformation(self):
"""Test basic data transformation logic."""
flow = Dataflow("transform_test")
# Input data
input_data = [
{"value": 10, "type": "A"},
{"value": 20, "type": "B"},
{"value": 30, "type": "A"}
]
# Build dataflow
stream = op.input("input", flow, TestingSource(input_data))
# Filter only type A
type_a = op.filter("filter_a", stream, lambda x: x["type"] == "A")
# Double the values
doubled = op.map("double", type_a, lambda x: {**x, "value": x["value"] * 2})
# Capture results
results = []
op.output("output", doubled, TestingSink(results))
# Execute and verify
run_main(flow)
expected = [
{"value": 20, "type": "A"},
{"value": 60, "type": "A"}
]
self.assertEqual(results, expected)
def test_stateful_processing(self):
"""Test stateful operators maintain state correctly."""
flow = Dataflow("stateful_test")
# Input events
events = [
("user1", 5),
("user2", 3),
("user1", 7),
("user2", 2),
("user1", 1)
]
stream = op.input("input", flow, TestingSource(events))
# Running sum per user
def running_sum(state, value):
new_state = (state or 0) + value
return new_state, new_state
sums = op.stateful_map("running_sum", stream, running_sum)
results = []
op.output("output", sums, TestingSink(results))
run_main(flow)
expected = [
("user1", 5), # 5
("user2", 3), # 3
("user1", 12), # 5 + 7
("user2", 5), # 3 + 2
("user1", 13) # 12 + 1
]
self.assertEqual(results, expected)
if __name__ == '__main__':
unittest.main()

Integration Testing with Multiple Sources:
def test_multi_source_integration():
"""Test dataflow with multiple input sources."""
flow = Dataflow("multi_source_test")
# User events
user_events = [
{"user_id": "u1", "event": "login", "timestamp": 1000},
{"user_id": "u2", "event": "login", "timestamp": 1001},
{"user_id": "u1", "event": "purchase", "timestamp": 1002}
]
# User profiles
user_profiles = [
{"user_id": "u1", "name": "Alice", "tier": "premium"},
{"user_id": "u2", "name": "Bob", "tier": "basic"}
]
# Set up streams
events = op.input("events", flow, TestingSource(user_events))
profiles = op.input("profiles", flow, TestingSource(user_profiles))
# Key both streams
keyed_events = op.key_on("key_events", events, lambda e: e["user_id"])
keyed_profiles = op.key_on("key_profiles", profiles, lambda p: p["user_id"])
# Join streams
joined = op.join("join_user_data", keyed_events, keyed_profiles)
# Process joined data
def enrich_event(joined_data):
event, profile = joined_data
return {
**event,
"user_name": profile["name"],
"user_tier": profile["tier"]
}
enriched = op.map("enrich", joined, lambda x: enrich_event(x[1]))
results = []
op.output("output", enriched, TestingSink(results))
run_main(flow)
# Verify enriched events
assert len(results) >= 2 # At least login events should be enriched
assert all("user_name" in event for event in results)
assert all("user_tier" in event for event in results)

Error Handling Tests:
def test_error_handling():
"""Test dataflow handles errors gracefully."""
flow = Dataflow("error_test")
# Data with some invalid items
mixed_data = [
{"value": 10},
{"invalid": "data"}, # Missing "value" key
{"value": 20},
None, # Invalid item
{"value": 30}
]
stream = op.input("input", flow, TestingSource(mixed_data))
# Safe processing with error handling
def safe_process(item):
if item is None or "value" not in item:
return None # Filter out invalid items
return item["value"] * 2
processed = op.filter_map("safe_process", stream, safe_process)
results = []
op.output("output", processed, TestingSink(results))
run_main(flow)
# Should only have valid processed items
expected = [20, 40, 60] # 10*2, 20*2, 30*2
assert results == expected

Performance Testing:
import time
def test_throughput():
"""Test dataflow throughput with large dataset."""
flow = Dataflow("throughput_test")
# Generate large dataset
large_dataset = list(range(100000))
stream = op.input("input", flow, TestingSource(large_dataset))
processed = op.map("process", stream, lambda x: x * 2)
results = []
op.output("output", processed, TestingSink(results))
# Measure execution time
start_time = time.time()
run_main(flow)
end_time = time.time()
# Verify results and performance
assert len(results) == 100000
assert results[0] == 0
assert results[-1] == 199998
duration = end_time - start_time
throughput = len(results) / duration
print(f"Processed {len(results)} items in {duration:.2f}s ({throughput:.0f} items/sec)")
# Assert minimum throughput (adjust based on requirements)
assert throughput > 10000  # At least 10k items/sec

Install with Tessl CLI
npx tessl i tessl/pypi-bytewax