# Dataflow Construction

Core functionality for building and configuring stream processing dataflows. The `Dataflow` class serves as the main container for defining processing topologies, while `Stream` represents typed data flows between operators.
## Capabilities

### Dataflow Management

The primary container class that manages the overall processing graph and provides the execution context for all operators.

```python { .api }
class Dataflow:
    def __init__(self, flow_id: str): ...
```
**Parameters:**

- `flow_id` (str): Unique identifier for the dataflow, used for logging and monitoring

**Usage Example:**

```python
from bytewax.dataflow import Dataflow

# Create a new dataflow
flow = Dataflow("my_processing_pipeline")
```
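Conceptually, a `Dataflow` is a named container that accumulates uniquely named steps; the `flow_id` disambiguates its logs and metrics. A toy model (illustrative only, not bytewax's implementation):

```python
# Toy model of a dataflow as a named container of uniquely named steps.
# ToyDataflow is a hypothetical name; bytewax's real class is Dataflow.
class ToyDataflow:
    def __init__(self, flow_id: str):
        self.flow_id = flow_id
        self.steps: list[str] = []

    def add_step(self, step_id: str) -> None:
        # Step ids must be unique so log lines and metrics are unambiguous.
        if step_id in self.steps:
            raise ValueError(f"duplicate step_id: {step_id}")
        self.steps.append(step_id)

    def __repr__(self) -> str:
        return f"ToyDataflow({self.flow_id!r}, steps={self.steps})"


flow = ToyDataflow("my_processing_pipeline")
flow.add_step("input")
flow.add_step("map")
```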
25
26
### Stream Representation
27
28
Represents a typed stream of data flowing between operators in the dataflow graph.
29
30
```python { .api }
31
class Stream[X]:
32
def __init__(self, id: str, scope): ...
33
```
34
35
**Type Parameters:**
36
- `X`: The type of items flowing through the stream
37
38
**Parameters:**
39
- `id` (str): Internal identifier for the stream
40
- `scope`: Internal scope management object
41
42
**Note:** Stream objects are typically created by operators rather than instantiated directly.
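The role of a typed stream handle can be sketched with plain `typing.Generic` (a minimal illustration; `ToyStream` and `toy_map` are hypothetical names, not bytewax APIs):

```python
from typing import Generic, TypeVar

X_co = TypeVar("X_co", covariant=True)


class ToyStream(Generic[X_co]):
    """Illustrative stand-in for a typed stream handle.

    Like Stream, it carries only an id and a scope at graph-building
    time; the items themselves flow at execution time.
    """

    def __init__(self, id: str, scope: object) -> None:
        self.id = id
        self.scope = scope


def toy_map(step_id: str, up: "ToyStream[int]") -> "ToyStream[str]":
    # An operator returns a *new* typed handle in the same scope,
    # which is how the output type can differ from the input type.
    return ToyStream(f"{up.id}.{step_id}", up.scope)
```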
43
44
### Keyed Streams
45
46
A specialized stream type for stateful operations that require data to be partitioned by key.
47
48
```python { .api }
49
KeyedStream = Stream[Tuple[str, V]]
50
```
51
52
**Type Definition:**
53
- `KeyedStream[V]`: A stream of (key, value) tuples where keys are strings and values are of type V
54
55
**Usage Pattern:**
56
```python
57
import bytewax.operators as op
58
59
# Convert a regular stream to a keyed stream
60
keyed_stream = op.key_on("add_keys", stream, lambda item: item.user_id)
61
62
# Now can use stateful operators
63
aggregated = op.reduce_final("sum", keyed_stream, lambda acc, val: acc + val.amount)
64
```
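What a keyed reduction computes can be modeled with plain dict grouping (an illustration of the semantics, not bytewax code — a real keyed stream is partitioned and reduced incrementally by the runtime):

```python
from collections import defaultdict

# A keyed stream is a sequence of (key, value) tuples.
keyed_items = [("alice", 10), ("bob", 5), ("alice", 7), ("bob", 1)]

# Per-key fold: values are partitioned by key, then reduced within
# each partition, mirroring reduce_final over a KeyedStream.
totals: dict[str, int] = defaultdict(int)
for key, amount in keyed_items:
    totals[key] += amount  # reducer: lambda acc, amt: acc + amt

# totals == {"alice": 17, "bob": 6}
```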
65
66
### Operator Decorator
67
68
Decorator for creating custom operators that integrate with the dataflow system.
69
70
```python { .api }
71
def operator(_core: bool = False): ...
72
```
73
74
**Parameters:**
75
- `_core` (bool): Internal flag indicating core operators (default: False)
76
77
**Usage Example:**
78
```python
79
from bytewax.dataflow import operator
80
81
@operator
82
def my_custom_operator(step_id: str, up: Stream[int]) -> Stream[str]:
83
"""Convert integers to strings with custom formatting."""
84
return op.map("format", up, lambda x: f"Value: {x}")
85
```
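The registration side of such a decorator can be sketched in plain Python (a toy illustration; `toy_operator` and `_OPERATORS` are hypothetical names, and bytewax's actual `operator` does more, such as wiring step nesting):

```python
from typing import Any, Callable

# Hypothetical registry mapping operator names to their functions.
_OPERATORS: dict[str, Callable[..., Any]] = {}


def toy_operator(fn: Callable[..., Any]) -> Callable[..., Any]:
    # Record the function under its name so a dataflow builder could
    # look it up later; return it unchanged so it stays callable.
    _OPERATORS[fn.__name__] = fn
    return fn


@toy_operator
def double(step_id: str, x: int) -> int:
    """A trivial 'operator' that doubles its input."""
    return x * 2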
86
87
### Type Variables
88
89
Common type variables used throughout the dataflow system for generic type annotations.
90
91
```python { .api }
92
P = ParamSpec("P") # Signature of an operator function
93
R = TypeVar("R") # Return type of an operator function
94
N = TypeVar("N") # Type of name of each stream
95
X_co = TypeVar("X_co", covariant=True) # Type contained within a Stream
96
F = TypeVar("F", bound=Callable[..., Any]) # Function type bound
97
```
98
99
### Utility Functions
100
101
Helper functions for working with dataflow components.
102
103
```python { .api }
104
def f_repr(func: Callable) -> str: ...
105
```
106
107
**Parameters:**
108
- `func` (Callable): Function to get string representation of
109
110
**Returns:**
111
- `str`: Human-readable representation of the function
112
113
**Usage:** Used internally for error messages and debugging output.
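A helper with similar behavior can be sketched using `__module__` and `__qualname__` (an assumption about the general technique; the exact output format of bytewax's `f_repr` is not specified here):

```python
from typing import Any, Callable


def toy_f_repr(func: Callable[..., Any]) -> str:
    # Hypothetical stand-in: prefer a stable dotted name over the
    # default repr, which embeds a memory address and is noisy in
    # error messages.
    module = getattr(func, "__module__", None) or "<unknown>"
    qualname = getattr(func, "__qualname__", None) or repr(func)
    return f"{module}.{qualname}"


def sample() -> None:
    pass
```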