
tessl/pypi-bytewax

Python Stateful Stream Processing Framework


docs/testing.md

Testing Utilities

Tools and utilities for testing dataflows including test sources, sinks, and execution helpers. Provides deterministic testing environments and data capture utilities for validating stream processing logic.

Capabilities

Test Execution Functions

Functions for running dataflows in test environments with deterministic behavior.

def run_main(flow: Dataflow, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None): ...

def cluster_main(flow: Dataflow, addresses: List[str], proc_id: int, epoch_interval: Optional[timedelta] = None, recovery_config: Optional[RecoveryConfig] = None, worker_count_per_proc: int = 1): ...

The parameters match the runtime execution functions, but these helpers are designed for testing:

  • Single-threaded execution for predictable ordering
  • Synchronous operation for test assertions
  • No background processes or networking complexity

Usage Examples:

from bytewax.testing import run_main, TestingSource, TestingSink
from bytewax.dataflow import Dataflow
import bytewax.operators as op

def test_simple_dataflow():
    flow = Dataflow("test_flow")
    
    # Build test dataflow
    data = op.input("input", flow, TestingSource([1, 2, 3, 4, 5]))
    doubled = op.map("double", data, lambda x: x * 2)
    
    # Capture output
    captured = []
    op.output("output", doubled, TestingSink(captured))
    
    # Run and verify
    run_main(flow)
    assert captured == [2, 4, 6, 8, 10]

Test Sources

Sources that provide deterministic, controllable data for testing.

class TestingSource:
    def __init__(self, data: Iterable[X]): ...

Parameters:

  • data (Iterable[X]): Static data to emit in order

Usage Examples:

from bytewax.testing import TestingSource

# Simple list data
simple_source = TestingSource([1, 2, 3, 4, 5])

# Complex objects
events = [
    {"user": "alice", "action": "login", "timestamp": "2023-01-01T10:00:00Z"},
    {"user": "bob", "action": "purchase", "timestamp": "2023-01-01T10:05:00Z"},
    {"user": "alice", "action": "logout", "timestamp": "2023-01-01T10:30:00Z"}
]
event_source = TestingSource(events)

# Generator data
def generate_data():
    for i in range(100):
        yield {"id": i, "value": i * i}

generator_source = TestingSource(generate_data())
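Note that a generator can only be consumed once: if the same dataflow is executed a second time, a TestingSource built from an already-exhausted generator emits nothing. A dependency-free sketch of the pitfall and the usual workaround:

```python
# A generator is exhausted after its first full iteration, so a
# generator-backed source is single-use.
def generate_data():
    for i in range(3):
        yield {"id": i, "value": i * i}

gen = generate_data()
first_pass = list(gen)   # consumes the generator
second_pass = list(gen)  # nothing left to yield

assert len(first_pass) == 3
assert second_pass == []

# A factory sidesteps the problem: build a fresh iterable
# (and hence a fresh TestingSource) for every run.
fresh_runs = [list(generate_data()) for _ in range(2)]
assert fresh_runs[0] == fresh_runs[1]
```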

Test Sinks

Sinks that capture output data for test assertions.

class TestingSink:
    def __init__(self, ls: List[X]): ...

Parameters:

  • ls (List[X]): External list that each output item is appended to

Usage Examples:

from bytewax.testing import TestingSink

# Capture to external list
results = []
sink = TestingSink(results)

# Use in dataflow
op.output("test_output", processed_stream, sink)

# Run and assert
run_main(flow)
assert len(results) == 10
assert all(isinstance(item, dict) for item in results)

# Reuse the same list across runs by clearing it first
results.clear()
run_main(flow)

assert results[0]["processed"] is True

Time Testing Utilities

Tools for controlling time in tests, enabling deterministic testing of time-based operations.

class TimeTestingGetter:
    def __init__(self, now: datetime): ...
    def advance(self, td: timedelta) -> None: ...
    def get(self) -> datetime: ...

Methods:

  • advance(td): Move the current time forward by the given duration
  • get(): Get the current mock time

Usage Examples:

from bytewax.testing import TestingSource, TestingSink, TimeTestingGetter, run_main
from bytewax.dataflow import Dataflow
from datetime import datetime, timedelta
import bytewax.operators as op
import bytewax.operators.windowing as win

def test_windowing_with_mock_time():
    # Create mock time starting at a known point
    mock_time = TimeTestingGetter(datetime(2023, 1, 1, 12, 0, 0))
    
    flow = Dataflow("time_test")
    
    # Create events with timestamps
    events = [
        {"value": 1, "time": datetime(2023, 1, 1, 12, 0, 0)},
        {"value": 2, "time": datetime(2023, 1, 1, 12, 0, 30)},
        {"value": 3, "time": datetime(2023, 1, 1, 12, 1, 0)},
    ]
    
    stream = op.input("events", flow, TestingSource(events))
    keyed = op.key_on("key", stream, lambda x: "all")
    
    # Use the mock time as the clock's notion of "now"
    windowed = win.collect_window(
        "windows",
        keyed,
        win.EventClock(
            lambda e: e["time"],
            wait_for_system_duration=timedelta(seconds=0),
            now_getter=mock_time.get,
        ),
        win.TumblingWindower(
            length=timedelta(minutes=1),
            align_to=datetime(2023, 1, 1, 12, 0, 0),
        ),
    )
    
    results = []
    op.output("output", windowed, TestingSink(results))
    
    run_main(flow)
    
    # Verify window contents
    assert len(results) == 2  # Two windows
    assert len(results[0][1]) == 2  # First window has 2 events
    assert len(results[1][1]) == 1  # Second window has 1 event
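The two-window expectation above follows from tumbling-window arithmetic: each event lands in bucket floor((ts - align_to) / length). A standalone check using the timestamps from the example (this is the standard tumbling-window rule, not bytewax code):

```python
from datetime import datetime, timedelta

align_to = datetime(2023, 1, 1, 12, 0, 0)
length = timedelta(minutes=1)
timestamps = [
    datetime(2023, 1, 1, 12, 0, 0),
    datetime(2023, 1, 1, 12, 0, 30),
    datetime(2023, 1, 1, 12, 1, 0),
]

# Tumbling-window rule: bucket index = floor((ts - align_to) / length).
# timedelta // timedelta performs integer floor division.
buckets = [(ts - align_to) // length for ts in timestamps]
assert buckets == [0, 0, 1]  # two events in window 0, one in window 1
```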

Source Testing Utilities

Utilities for testing custom sources and controlling their behavior.

def poll_next_batch(source_part: StatefulSourcePartition, timeout: timedelta = timedelta(seconds=1)): ...

def ffwd_iter(iterator: Iterator[X], skip_to: int) -> Iterator[X]: ...

poll_next_batch Parameters:

  • source_part (StatefulSourcePartition): Source partition to poll
  • timeout (timedelta): Maximum time to wait for data

ffwd_iter Parameters:

  • iterator (Iterator): Iterator to fast-forward
  • skip_to (int): Number of items to skip

Usage Examples:

from bytewax.testing import poll_next_batch, ffwd_iter

def test_custom_source():
    # Test a custom source partition
    partition = MyCustomSourcePartition(test_data)
    
    # Poll for first batch
    batch1 = poll_next_batch(partition, timeout=timedelta(seconds=5))
    assert len(batch1) > 0
    
    # Advance to specific position
    iterator = iter(range(1000))
    advanced = ffwd_iter(iterator, skip_to=500)
    
    # First item should be 500
    assert next(advanced) == 500
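Conceptually, ffwd_iter just drops the first skip_to items and yields the rest lazily. A dependency-free equivalent built on itertools.islice (a sketch of the behavior, not bytewax's implementation):

```python
from itertools import islice
from typing import Iterator, TypeVar

X = TypeVar("X")

def ffwd_iter_sketch(iterator: Iterator[X], skip_to: int) -> Iterator[X]:
    """Skip the first `skip_to` items, then yield the rest lazily."""
    return islice(iterator, skip_to, None)

advanced = ffwd_iter_sketch(iter(range(1000)), 500)
assert next(advanced) == 500
assert next(advanced) == 501
```

Because islice consumes lazily, the skipped prefix is never materialized, which matters when fast-forwarding large or unbounded iterators in recovery tests.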

Testing Patterns

Unit Testing Dataflow Components:

import unittest
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource, TestingSink, run_main
import bytewax.operators as op

class TestDataProcessing(unittest.TestCase):
    
    def test_data_transformation(self):
        """Test basic data transformation logic."""
        flow = Dataflow("transform_test")
        
        # Input data
        input_data = [
            {"value": 10, "type": "A"},
            {"value": 20, "type": "B"},
            {"value": 30, "type": "A"}
        ]
        
        # Build dataflow
        stream = op.input("input", flow, TestingSource(input_data))
        
        # Filter only type A
        type_a = op.filter("filter_a", stream, lambda x: x["type"] == "A")
        
        # Double the values
        doubled = op.map("double", type_a, lambda x: {**x, "value": x["value"] * 2})
        
        # Capture results
        results = []
        op.output("output", doubled, TestingSink(results))
        
        # Execute and verify
        run_main(flow)
        
        expected = [
            {"value": 20, "type": "A"},
            {"value": 60, "type": "A"}
        ]
        self.assertEqual(results, expected)
    
    def test_stateful_processing(self):
        """Test stateful operators maintain state correctly."""
        flow = Dataflow("stateful_test")
        
        # Input events
        events = [
            ("user1", 5),
            ("user2", 3),
            ("user1", 7),
            ("user2", 2),
            ("user1", 1)
        ]
        
        stream = op.input("input", flow, TestingSource(events))
        
        # Running sum per user
        def running_sum(state, value):
            new_state = (state or 0) + value
            return new_state, new_state
        
        sums = op.stateful_map("running_sum", stream, running_sum)
        
        results = []
        op.output("output", sums, TestingSink(results))
        
        run_main(flow)
        
        expected = [
            ("user1", 5),   # 5
            ("user2", 3),   # 3  
            ("user1", 12),  # 5 + 7
            ("user2", 5),   # 3 + 2
            ("user1", 13)   # 12 + 1
        ]
        self.assertEqual(results, expected)

if __name__ == '__main__':
    unittest.main()
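The expected values in test_stateful_processing can be derived by hand: stateful_map keeps one state slot per key and feeds each value through the mapper together with that key's previous state (None on first sight). A pure-Python simulation of that contract (not bytewax internals, just the same bookkeeping):

```python
def running_sum(state, value):
    new_state = (state or 0) + value
    return new_state, new_state

def simulate_stateful_map(mapper, keyed_items):
    """Thread per-key state through the mapper, like stateful_map does."""
    states = {}
    out = []
    for key, value in keyed_items:
        new_state, emit = mapper(states.get(key), value)
        states[key] = new_state
        out.append((key, emit))
    return out

events = [("user1", 5), ("user2", 3), ("user1", 7), ("user2", 2), ("user1", 1)]
assert simulate_stateful_map(running_sum, events) == [
    ("user1", 5), ("user2", 3), ("user1", 12), ("user2", 5), ("user1", 13),
]
```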

Integration Testing with Multiple Sources:

def test_multi_source_integration():
    """Test dataflow with multiple input sources."""
    flow = Dataflow("multi_source_test")
    
    # User events
    user_events = [
        {"user_id": "u1", "event": "login", "timestamp": 1000},
        {"user_id": "u2", "event": "login", "timestamp": 1001},
        {"user_id": "u1", "event": "purchase", "timestamp": 1002}
    ]
    
    # User profiles  
    user_profiles = [
        {"user_id": "u1", "name": "Alice", "tier": "premium"},
        {"user_id": "u2", "name": "Bob", "tier": "basic"}
    ]
    
    # Set up streams
    events = op.input("events", flow, TestingSource(user_events))
    profiles = op.input("profiles", flow, TestingSource(user_profiles))
    
    # Key both streams
    keyed_events = op.key_on("key_events", events, lambda e: e["user_id"])
    keyed_profiles = op.key_on("key_profiles", profiles, lambda p: p["user_id"])
    
    # Join streams
    joined = op.join("join_user_data", keyed_events, keyed_profiles)
    
    # Process joined data
    def enrich_event(joined_data):
        event, profile = joined_data
        return {
            **event,
            "user_name": profile["name"],
            "user_tier": profile["tier"]
        }
    
    enriched = op.map("enrich", joined, lambda x: enrich_event(x[1]))
    
    results = []
    op.output("output", enriched, TestingSink(results))
    
    run_main(flow)
    
    # Verify enriched events
    assert len(results) >= 2  # At least login events should be enriched
    assert all("user_name" in event for event in results)
    assert all("user_tier" in event for event in results)

Error Handling Tests:

def test_error_handling():
    """Test dataflow handles errors gracefully."""
    flow = Dataflow("error_test")
    
    # Data with some invalid items
    mixed_data = [
        {"value": 10},
        {"invalid": "data"},  # Missing "value" key
        {"value": 20},
        None,  # Invalid item
        {"value": 30}
    ]
    
    stream = op.input("input", flow, TestingSource(mixed_data))
    
    # Safe processing with error handling
    def safe_process(item):
        if item is None or "value" not in item:
            return None  # Filter out invalid items
        return item["value"] * 2
    
    processed = op.filter_map("safe_process", stream, safe_process)
    
    results = []
    op.output("output", processed, TestingSink(results))
    
    run_main(flow)
    
    # Should only have valid processed items
    expected = [20, 40, 60]  # 10*2, 20*2, 30*2
    assert results == expected
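filter_map combines map and filter in one step: the callback returns a transformed item, or None to drop it. The same logic in plain Python over the test data above:

```python
mixed_data = [
    {"value": 10},
    {"invalid": "data"},  # missing "value" key
    {"value": 20},
    None,                 # invalid item
    {"value": 30},
]

def safe_process(item):
    if item is None or "value" not in item:
        return None  # signal "drop this item"
    return item["value"] * 2

# filter_map semantics: apply the callback, keep only non-None results.
results = [r for r in map(safe_process, mixed_data) if r is not None]
assert results == [20, 40, 60]
```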

Performance Testing:

import time

def test_throughput():
    """Test dataflow throughput with large dataset."""
    flow = Dataflow("throughput_test")
    
    # Generate large dataset
    large_dataset = list(range(100000))
    
    stream = op.input("input", flow, TestingSource(large_dataset))
    processed = op.map("process", stream, lambda x: x * 2)
    
    results = []
    op.output("output", processed, TestingSink(results))
    
    # Measure execution time
    start_time = time.time()
    run_main(flow)
    end_time = time.time()
    
    # Verify results and performance
    assert len(results) == 100000
    assert results[0] == 0
    assert results[-1] == 199998
    
    duration = end_time - start_time
    throughput = len(results) / duration
    
    print(f"Processed {len(results)} items in {duration:.2f}s ({throughput:.0f} items/sec)")
    
    # Assert minimum throughput (adjust based on requirements)
    assert throughput > 10000  # At least 10k items/sec

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax
