tessl/pypi-bytewax

Python Stateful Stream Processing Framework

docs/operators.md
Stream Processing Operators

Essential operators for transforming, filtering, and routing data through processing pipelines. These operators provide the building blocks for most stream processing use cases and form the core vocabulary of Bytewax dataflows.

Capabilities

Input/Output Operations

Operators for introducing data into dataflows and writing results to external systems.

def input(step_id: str, flow: Dataflow, source: Source[X]) -> Stream[X]: ...

def output(step_id: str, up: Stream[X], sink: Sink[X]) -> None: ...

input Parameters:

  • step_id (str): Unique identifier for this operator step
  • flow (Dataflow): The dataflow to add input to
  • source (Source[X]): Source to read items from

output Parameters:

  • step_id (str): Unique identifier for this operator step
  • up (Stream[X]): Stream of items to write
  • sink (Sink[X]): Sink to write items to

Usage Example:

from bytewax.dataflow import Dataflow
import bytewax.operators as op
from bytewax.testing import TestingSource
from bytewax.connectors.stdio import StdOutSink

flow = Dataflow("example")

# Add input to dataflow
stream = op.input("data_input", flow, TestingSource([1, 2, 3]))

# Output results
op.output("results", stream, StdOutSink())

Transformation Operations

Core operators for transforming data items one-by-one or one-to-many.

def map(step_id: str, up: Stream[X], mapper: Callable[[X], Y]) -> Stream[Y]: ...

def map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], W]) -> KeyedStream[W]: ...

def flat_map(step_id: str, up: Stream[X], mapper: Callable[[X], Iterable[Y]]) -> Stream[Y]: ...

def flat_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Iterable[W]]) -> KeyedStream[W]: ...

def flat_map_batch(step_id: str, up: Stream[X], mapper: Callable[[List[X]], Iterable[Y]]) -> Stream[Y]: ...

map Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Input stream
  • mapper (Callable[[X], Y]): Function to transform each item

flat_map Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Input stream
  • mapper (Callable[[X], Iterable[Y]]): Function returning iterable of items

Usage Examples:

# Transform each item
doubled = op.map("double", numbers, lambda x: x * 2)

# Transform values in keyed stream
processed = op.map_value("process", keyed_stream, lambda val: val.upper())

# One-to-many transformation
words = op.flat_map("split", sentences, lambda s: s.split())

# Batch processing for efficiency
processed = op.flat_map_batch("batch_process", stream, lambda batch: [expensive_transform(x) for x in batch])
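To make the distinction between the three mapper shapes concrete, here is a plain-Python sketch of their per-item semantics on lists (illustrative only, not the Bytewax API): map emits exactly one output per input, flat_map may emit zero or more, and flat_map_batch hands the mapper a whole list of items in one call.

```python
from itertools import chain

items = ["hello world", "stream processing"]

# map: one output per input
upper = [s.upper() for s in items]

# flat_map: each input expands to zero or more outputs
words = list(chain.from_iterable(s.split() for s in items))

# flat_map_batch: the mapper sees the whole batch in one call,
# which is where per-batch amortization of expensive work comes from
def batch_mapper(batch):
    return [len(w) for w in batch]

lengths = batch_mapper(words)
```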

Filtering Operations

Operators for selectively keeping or transforming items based on predicates.

def filter(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> Stream[X]: ...

def filter_value(step_id: str, up: KeyedStream[V], predicate: Callable[[V], bool]) -> KeyedStream[V]: ...

def filter_map(step_id: str, up: Stream[X], mapper: Callable[[X], Optional[Y]]) -> Stream[Y]: ...

def filter_map_value(step_id: str, up: KeyedStream[V], mapper: Callable[[V], Optional[W]]) -> KeyedStream[W]: ...

filter Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Input stream
  • predicate (Callable[[X], bool]): Function returning True to keep item

filter_map Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Input stream
  • mapper (Callable[[X], Optional[Y]]): Function returning a value to keep and emit the item, or None to discard it

Usage Examples:

# Keep only even numbers
evens = op.filter("evens", numbers, lambda x: x % 2 == 0)

# Filter and transform in one step
valid_data = op.filter_map("validate", raw_data, lambda x: x.value if x.is_valid else None)
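The key point of filter_map is that it fuses a transform and a drop into a single pass: returning None discards the item. A plain-Python sketch of that semantics (illustrative only, not the Bytewax API):

```python
raw = ["1", "2", "oops", "4"]

def parse_or_none(s):
    # Return an int to keep the item, or None to discard it (the filter_map shape)
    try:
        return int(s)
    except ValueError:
        return None

# filter_map: transform and drop Nones in one pass
parsed = [y for y in map(parse_or_none, raw) if y is not None]

# filter: keep only items where the predicate is True
evens = [x for x in parsed if x % 2 == 0]
```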

Stream Management Operations

Operators for splitting, combining, and redistributing streams.

def branch(step_id: str, up: Stream[X], predicate: Callable[[X], bool]) -> BranchOut[X, X]: ...

def merge(step_id: str, *ups: Stream[Any]) -> Stream[Any]: ...

def redistribute(step_id: str, up: Stream[X]) -> Stream[X]: ...

class BranchOut[X, Y]:
    trues: Stream[X]
    falses: Stream[Y]

branch Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Input stream to split
  • predicate (Callable[[X], bool]): Function to determine which branch
merge Parameters:

  • step_id (str): Unique identifier
  • *ups (Stream[Any]): Multiple streams to combine

Usage Examples:

# Split stream based on condition
branches = op.branch("split_data", stream, lambda x: x.priority == "high")
high_priority = branches.trues
low_priority = branches.falses

# Combine multiple streams
combined = op.merge("combine", stream1, stream2, stream3)

# Redistribute for better parallelization
redistributed = op.redistribute("rebalance", stream)
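branch partitions: every item lands in exactly one of the two output streams, so trues and falses together contain the whole input. A plain-Python sketch of that semantics (illustrative, not the Bytewax API):

```python
items = [
    {"priority": "high", "id": 1},
    {"priority": "low", "id": 2},
    {"priority": "high", "id": 3},
]

# Emulate BranchOut: each item goes to exactly one side of the split
trues = [x for x in items if x["priority"] == "high"]
falses = [x for x in items if x["priority"] != "high"]
```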

Key Management Operations

Operators for working with keyed streams required for stateful operations.

def key_on(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[X]: ...

def key_rm(step_id: str, up: KeyedStream[X]) -> Stream[X]: ...

key_on Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Input stream
  • key (Callable[[X], str]): Function to extract key from each item

key_rm Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[X]): Keyed stream to remove keys from

Usage Examples:

# Add keys for stateful operations
keyed = op.key_on("by_user", events, lambda e: e.user_id)

# Remove keys when no longer needed
unkeyed = op.key_rm("remove_keys", keyed_results)
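Conceptually, a KeyedStream is a stream of (str key, value) 2-tuples: key_on wraps each item with its extracted key, and key_rm unwraps it again. A plain-Python sketch (illustrative, not the Bytewax API):

```python
events = [{"user_id": "alice", "n": 1}, {"user_id": "bob", "n": 2}]

# key_on: pair each item with a string key extracted from it
keyed = [(e["user_id"], e) for e in events]

# key_rm: drop the key, keeping only the value
unkeyed = [v for _k, v in keyed]
```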

Flattening Operations

Operators for working with nested data structures.

def flatten(step_id: str, up: Stream[Iterable[X]]) -> Stream[X]: ...

Parameters:

  • step_id (str): Unique identifier
  • up (Stream[Iterable[X]]): Stream of iterables to flatten

Usage Example:

# Flatten nested lists
flattened = op.flatten("flatten", stream_of_lists)
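flatten removes exactly one level of nesting: each inner iterable's elements are emitted individually, and empty iterables contribute nothing. A plain-Python sketch of that semantics:

```python
from itertools import chain

stream_of_lists = [[1, 2], [], [3]]

# One level of nesting removed; empty inner lists vanish
flattened = list(chain.from_iterable(stream_of_lists))
```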

Utility Operations

Operators for debugging, inspection, and error handling.

def inspect(step_id: str, up: Stream[X], inspector: Callable[[str, X], None] = _default_inspector) -> Stream[X]: ...

def inspect_debug(step_id: str, up: Stream[X], inspector: Callable[[str, X, int, int], None] = _default_debug_inspector) -> Stream[X]: ...

def raises(step_id: str, up: Stream[Any]) -> None: ...

inspect Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Stream to observe
  • inspector (Callable): Function called for each item (default prints to stdout)

inspect_debug Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Stream to observe
  • inspector (Callable): Function called with item, epoch, and worker info

Usage Examples:

# Debug stream contents
debugged = op.inspect("debug_point", stream)

# Detailed debugging with worker info
detailed = op.inspect_debug("detailed_debug", stream)

# Crash on any item (for testing)
op.raises("should_be_empty", error_stream)
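The default inspector just prints, but any callable matching the Callable[[str, X], None] shape can be passed in, for example one that collects items for later assertions in tests. A sketch (the loop below stands in for Bytewax calling the inspector once per item flowing past the step):

```python
seen = []

def collecting_inspector(step_id, item):
    # Matches the inspect signature: called with the step id and each item
    seen.append((step_id, item))

# Stand-in for the dataflow invoking the inspector per item
for item in [10, 20]:
    collecting_inspector("debug_point", item)
```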

Enrichment Operations

Operators for joining streams with external data sources.

def enrich_cached(
    step_id: str,
    up: Stream[X],
    getter: Callable[[DK], DV],
    mapper: Callable[[TTLCache[DK, DV], X], Y],
    ttl: timedelta = timedelta.max,
    _now_getter: Callable[[], datetime] = _get_system_utc,
) -> Stream[Y]: ...

class TTLCache[DK, DV]:
    def __init__(self, v_getter: Callable[[DK], DV], now_getter: Callable[[], datetime], ttl: timedelta): ...
    def get(self, k: DK) -> DV: ...
    def remove(self, k: DK) -> None: ...

enrich_cached Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Input stream
  • getter (Callable[[DK], DV]): Function to fetch data for cache misses
  • mapper (Callable[[TTLCache, X], Y]): Function to enrich each item using cache
  • ttl (timedelta): Time-to-live for cached values (defaults to timedelta.max, i.e. entries never expire)

Usage Example:

from datetime import timedelta

def lookup_user_data(user_id):
    # Fetch from database or API
    return user_database.get(user_id)

def enrich_event(cache, event):
    user_data = cache.get(event["user_id"])
    return {**event, "user_name": user_data.name}

enriched = op.enrich_cached("enrich_users", events, lookup_user_data, enrich_event, ttl=timedelta(minutes=5))
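The cache behavior behind enrich_cached can be sketched in a few lines of plain Python (illustrative, not Bytewax's implementation): a miss or an expired entry invokes the getter, a fresh entry is served from the cache. The injectable clock mirrors the _now_getter parameter and makes expiry testable.

```python
from datetime import datetime, timedelta

class TinyTTLCache:
    # Hypothetical minimal sketch of TTL-cache semantics
    def __init__(self, getter, now_getter, ttl):
        self.getter, self.now, self.ttl = getter, now_getter, ttl
        self.store = {}

    def get(self, k):
        hit = self.store.get(k)
        if hit is not None and self.now() - hit[0] < self.ttl:
            return hit[1]  # fresh entry: no getter call
        v = self.getter(k)  # miss or expired: fetch and re-stamp
        self.store[k] = (self.now(), v)
        return v

calls = []
clock = [datetime(2024, 1, 1)]
cache = TinyTTLCache(
    lambda k: calls.append(k) or k.upper(),  # records each real fetch
    lambda: clock[0],
    timedelta(minutes=5),
)

cache.get("a")                    # miss: getter called
cache.get("a")                    # fresh hit: served from cache
clock[0] += timedelta(minutes=6)  # advance past the TTL
cache.get("a")                    # expired: getter called again
```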

Final Aggregation Operations

Operators for performing final aggregations on finite streams.

def count_final(step_id: str, up: Stream[X], key: Callable[[X], str]) -> KeyedStream[int]: ...

def max_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def min_final(step_id: str, up: KeyedStream[V], by: Callable[[V], Any] = _identity) -> KeyedStream[V]: ...

def reduce_final(step_id: str, up: KeyedStream[V], reducer: Callable[[V, V], V]) -> KeyedStream[V]: ...

def fold_final(step_id: str, up: KeyedStream[V], builder: Callable[[], S], folder: Callable[[S, V], S]) -> KeyedStream[S]: ...

count_final Parameters:

  • step_id (str): Unique identifier
  • up (Stream[X]): Stream of items to count
  • key (Callable[[X], str]): Function to convert items to count keys

reduce_final Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[V]): Keyed stream to reduce
  • reducer (Callable[[V, V], V]): Function to combine two values

fold_final Parameters:

  • step_id (str): Unique identifier
  • up (KeyedStream[V]): Keyed stream to fold
  • builder (Callable[[], S]): Function to build initial accumulator
  • folder (Callable[[S, V], S]): Function to combine accumulator with value

Usage Examples:

# Count occurrences by key
counts = op.count_final("count_words", words, lambda w: w)

# Find maximum value per key
max_values = op.max_final("find_max", keyed_numbers)

# Find minimum with custom comparison
min_by_priority = op.min_final("min_priority", tasks, lambda t: t.priority)

# Sum all values per key
sums = op.reduce_final("sum", keyed_numbers, lambda a, b: a + b)

# Build custom accumulator
def build_list():
    return []

def add_to_list(lst, item):
    return lst + [item]

lists = op.fold_final("collect", keyed_items, build_list, add_to_list)
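The shared shape of these operators is one accumulator per key, emitted once the finite stream ends; reduce_final is the special case where the first value seeds the accumulator. A plain-Python sketch over (key, value) pairs (illustrative, not the Bytewax API):

```python
keyed_items = [("a", 1), ("b", 10), ("a", 2), ("a", 3)]

def fold_final_sketch(items, builder, folder):
    # One accumulator per key, built on first sight, folded per value,
    # and "emitted" together at end of stream
    accs = {}
    for k, v in items:
        accs[k] = folder(accs.get(k, builder()), v)
    return accs

sums = fold_final_sketch(keyed_items, lambda: 0, lambda acc, v: acc + v)
lists = fold_final_sketch(keyed_items, list, lambda acc, v: acc + [v])
```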

Install with Tessl CLI

npx tessl i tessl/pypi-bytewax
