# Dataflow Construction

Core functionality for building and configuring stream processing dataflows. The Dataflow class serves as the main container for defining processing topologies, while Stream represents typed data flows between operators.

## Capabilities

### Dataflow Management

The primary container class that manages the overall processing graph and provides the execution context for all operators.

```python { .api }
class Dataflow:
    def __init__(self, flow_id: str): ...
```

**Parameters:**

- `flow_id` (str): Unique identifier for the dataflow, used for logging and monitoring

**Usage Example:**

```python
from bytewax.dataflow import Dataflow

# Create a new dataflow
flow = Dataflow("my_processing_pipeline")
```

### Stream Representation

Represents a typed stream of data flowing between operators in the dataflow graph.

```python { .api }
class Stream[X]:
    def __init__(self, id: str, scope): ...
```

**Type Parameters:**

- `X`: The type of items flowing through the stream

**Parameters:**

- `id` (str): Internal identifier for the stream
- `scope`: Internal scope management object

**Note:** Stream objects are typically created by operators rather than instantiated directly.
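
Streams come back from operator calls that take the flow (or an upstream stream) as an argument. A minimal sketch of this pattern, assuming `op.input` and `TestingSource` from `bytewax.testing`, which are not part of this section:

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow, Stream
from bytewax.testing import TestingSource

flow = Dataflow("stream_demo")

# The input operator creates the Stream; Stream(...) is never called directly
numbers: Stream[int] = op.input("inp", flow, TestingSource([1, 2, 3]))

# Each downstream operator consumes one stream and returns a new one
doubled: Stream[int] = op.map("double", numbers, lambda x: x * 2)
```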

### Keyed Streams

A specialized stream type for stateful operations that require data to be partitioned by key.

```python { .api }
KeyedStream = Stream[Tuple[str, V]]
```

**Type Definition:**

- `KeyedStream[V]`: A stream of (key, value) tuples where keys are strings and values are of type V

**Usage Pattern:**

```python
import bytewax.operators as op

# `stream` is an upstream Stream, e.g. returned by op.input(...)
# Convert a regular stream to a keyed stream; keys must be strings
keyed_stream = op.key_on("add_keys", stream, lambda item: str(item.user_id))

# Keep the key, extract the numeric amount to aggregate
amounts = op.map_value("amounts", keyed_stream, lambda item: item.amount)

# Now stateful operators can be used
aggregated = op.reduce_final("sum", amounts, lambda acc, val: acc + val)
```

### Operator Decorator

Decorator for creating custom operators that integrate with the dataflow system.

```python { .api }
def operator(_core: bool = False): ...
```

**Parameters:**

- `_core` (bool): Internal flag indicating core operators (default: False)

**Usage Example:**

```python
import bytewax.operators as op
from bytewax.dataflow import Stream, operator


@operator
def my_custom_operator(step_id: str, up: Stream[int]) -> Stream[str]:
    """Convert integers to strings with custom formatting."""
    return op.map("format", up, lambda x: f"Value: {x}")
```
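
Once defined, the custom operator is called like any built-in one, with a step ID and an upstream stream. A minimal sketch of wiring it into a flow, assuming the `my_custom_operator` defined above and `TestingSource` from `bytewax.testing`:

```python
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("custom_op_demo")
numbers = op.input("inp", flow, TestingSource([1, 2, 3]))

# Nested steps inside my_custom_operator (like "format") are scoped under this step ID
labels = my_custom_operator("label_values", numbers)
```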

### Type Variables

Common type variables used throughout the dataflow system for generic type annotations.

```python { .api }
P = ParamSpec("P")  # Signature of an operator function
R = TypeVar("R")  # Return type of an operator function
N = TypeVar("N")  # Type of name of each stream
X_co = TypeVar("X_co", covariant=True)  # Type contained within a Stream
F = TypeVar("F", bound=Callable[..., Any])  # Function type bound
```
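
In practice these variables keep operator signatures generic over item types. A minimal sketch using a hypothetical operator and a local type variable (named `X` here; an assumption, not one of the declarations above):

```python
from typing import TypeVar

import bytewax.operators as op
from bytewax.dataflow import Stream, operator

X = TypeVar("X")  # item type flowing through the stream


@operator
def wrap_in_list(step_id: str, up: Stream[X]) -> Stream[list[X]]:
    """Hypothetical operator: wrap each item in a single-element list."""
    return op.map("wrap", up, lambda x: [x])
```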

### Utility Functions

Helper functions for working with dataflow components.

```python { .api }
def f_repr(func: Callable) -> str: ...
```

**Parameters:**

- `func` (Callable): The function to produce a readable representation of

**Returns:**

- `str`: Human-readable representation of the function

**Usage:** Used internally for error messages and debugging output.
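
A minimal sketch of calling it directly; the exact output format is an implementation detail, so treat the result as display-only text:

```python
from bytewax.dataflow import f_repr


def add_one(x: int) -> int:
    return x + 1


# Human-readable names for functions, suitable for error messages
print(f_repr(add_one))
print(f_repr(lambda x: x * 2))
```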