0
# Stream Operators
1
2
Stream operators are the internal execution components that implement the actual stream processing logic in Flink. They handle element processing, state management, and coordination with the runtime system.
3
4
## Core Operator Interfaces
5
6
### StreamOperator<OUT>
7
8
Base interface for all streaming operators.
9
10
```java { .api }
11
public interface StreamOperator<OUT> extends Serializable {
12
// Lifecycle methods
13
void setup(Output<StreamRecord<OUT>> output, RuntimeContext runtimeContext);
14
void open(Configuration parameters) throws Exception;
15
void close() throws Exception;
16
17
// Configuration
18
void setChainingStrategy(ChainingStrategy strategy);
19
ChainingStrategy getChainingStrategy();
20
}
21
```
22
23
### OneInputStreamOperator<IN, OUT>
24
25
Interface for operators that process a single input stream.
26
27
```java { .api }
28
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
29
void processElement(StreamRecord<IN> element) throws Exception;
30
void processWatermark(Watermark mark) throws Exception;
31
}
32
```
33
34
### TwoInputStreamOperator<IN1, IN2, OUT>
35
36
Interface for operators that process two input streams (e.g., for connected streams).
37
38
```java { .api }
39
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
40
void processElement1(StreamRecord<IN1> element) throws Exception;
41
void processElement2(StreamRecord<IN2> element) throws Exception;
42
void processWatermark1(Watermark mark) throws Exception;
43
void processWatermark2(Watermark mark) throws Exception;
44
}
45
```
46
47
## Function-based Operators
48
49
### StreamMap<IN, OUT>
50
51
Operator that applies a MapFunction to each element.
52
53
```java { .api }
54
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
55
implements OneInputStreamOperator<IN, OUT> {
56
57
public StreamMap(MapFunction<IN, OUT> mapper);
58
public void processElement(StreamRecord<IN> element) throws Exception;
59
}
60
```
61
62
### StreamFlatMap<IN, OUT>
63
64
Operator that applies a FlatMapFunction to each element.
65
66
```java { .api }
67
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
68
implements OneInputStreamOperator<IN, OUT> {
69
70
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper);
71
public void processElement(StreamRecord<IN> element) throws Exception;
72
}
73
```
74
75
### StreamFilter<T>
76
77
Operator that filters elements based on a FilterFunction.
78
79
```java { .api }
80
public class StreamFilter<T> extends AbstractUdfStreamOperator<T, FilterFunction<T>>
81
implements OneInputStreamOperator<T, T> {
82
83
public StreamFilter(FilterFunction<T> filter);
84
public void processElement(StreamRecord<T> element) throws Exception;
85
}
86
```
87
88
## Keyed Stream Operators
89
90
### StreamGroupedReduce<T>
91
92
Operator for keyed reduce operations.
93
94
```java { .api }
95
public class StreamGroupedReduce<T> extends AbstractUdfStreamOperator<T, ReduceFunction<T>>
96
implements OneInputStreamOperator<T, T> {
97
98
public StreamGroupedReduce(ReduceFunction<T> reducer, TypeSerializer<T> serializer);
99
public void processElement(StreamRecord<T> element) throws Exception;
100
}
101
```
102
103
### StreamGroupedFold<IN, OUT>
104
105
Operator for keyed fold operations.
106
107
```java { .api }
108
public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
109
implements OneInputStreamOperator<IN, OUT> {
110
111
public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue, TypeSerializer<OUT> serializer);
112
public void processElement(StreamRecord<IN> element) throws Exception;
113
}
114
```
115
116
## Source and Sink Operators
117
118
### StreamSource<OUT>
119
120
Operator that wraps a SourceFunction for data ingestion.
121
122
```java { .api }
123
public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>>
124
implements StreamOperator<OUT> {
125
126
public StreamSource(SourceFunction<OUT> sourceFunction);
127
public void run(Object lockingObject, Output<StreamRecord<OUT>> collector) throws Exception;
128
}
129
```
130
131
### StreamSink<IN>
132
133
Operator that wraps a SinkFunction for data output.
134
135
```java { .api }
136
public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
137
implements OneInputStreamOperator<IN, Object> {
138
139
public StreamSink(SinkFunction<IN> sinkFunction);
140
public void processElement(StreamRecord<IN> element) throws Exception;
141
}
142
```
143
144
## Connected Stream Operators
145
146
### CoStreamMap<IN1, IN2, OUT>
147
148
Operator for applying CoMapFunction to connected streams.
149
150
```java { .api }
151
public class CoStreamMap<IN1, IN2, OUT> extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
152
implements TwoInputStreamOperator<IN1, IN2, OUT> {
153
154
public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper);
155
public void processElement1(StreamRecord<IN1> element) throws Exception;
156
public void processElement2(StreamRecord<IN2> element) throws Exception;
157
}
158
```
159
160
### CoStreamFlatMap<IN1, IN2, OUT>
161
162
Operator for applying CoFlatMapFunction to connected streams.
163
164
```java { .api }
165
public class CoStreamFlatMap<IN1, IN2, OUT> extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
166
implements TwoInputStreamOperator<IN1, IN2, OUT> {
167
168
public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper);
169
public void processElement1(StreamRecord<IN1> element) throws Exception;
170
public void processElement2(StreamRecord<IN2> element) throws Exception;
171
}
172
```
173
174
### CoStreamReduce<T>
175
176
Operator for applying CoReduceFunction to connected streams.
177
178
```java { .api }
179
public class CoStreamReduce<T> extends AbstractUdfStreamOperator<T, CoReduceFunction<T, T, T>>
180
implements TwoInputStreamOperator<T, T, T> {
181
182
public CoStreamReduce(CoReduceFunction<T, T, T> reducer);
183
public void processElement1(StreamRecord<T> element) throws Exception;
184
public void processElement2(StreamRecord<T> element) throws Exception;
185
}
186
```
187
188
## Windowing Operators
189
190
### WindowingOperator<T>
191
192
Operator that implements windowing logic: it consumes elements of type `T` from a single input stream and emits `StreamWindow<T>` results.
193
194
```java { .api }
195
public class WindowingOperator<T> implements OneInputStreamOperator<T, StreamWindow<T>> {
196
public WindowingOperator(WindowingHelper<T> helper, TypeSerializer<T> serializer);
197
public void processElement(StreamRecord<T> element) throws Exception;
198
}
199
```
200
201
## Partitioning Operators
202
203
### StreamPartition<T>
204
205
Base class for partitioning operators that control data distribution.
206
207
```java { .api }
208
public abstract class StreamPartition<T> implements StreamOperator<T> {
209
// Partitioning logic implementation
210
}
211
```
212
213
### ShufflePartitioner<T>
214
215
Randomly distributes elements across parallel instances.
216
217
```java { .api }
218
public class ShufflePartitioner<T> extends StreamPartition<T> {
219
// Random partitioning implementation
220
}
221
```
222
223
### RebalancePartitioner<T>
224
225
Round-robin distribution of elements.
226
227
```java { .api }
228
public class RebalancePartitioner<T> extends StreamPartition<T> {
229
// Round-robin partitioning implementation
230
}
231
```
232
233
### BroadcastPartitioner<T>
234
235
Broadcasts elements to all parallel instances.
236
237
```java { .api }
238
public class BroadcastPartitioner<T> extends StreamPartition<T> {
239
// Broadcast partitioning implementation
240
}
241
```
242
243
## Base Classes
244
245
### AbstractStreamOperator<OUT>
246
247
Abstract base class providing common operator functionality.
248
249
```java { .api }
250
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT> {
251
protected Output<StreamRecord<OUT>> output;
252
protected RuntimeContext runtimeContext;
253
protected ChainingStrategy chainingStrategy;
254
255
public void setup(Output<StreamRecord<OUT>> output, RuntimeContext runtimeContext);
256
public void open(Configuration parameters) throws Exception;
257
public void close() throws Exception;
258
public void setChainingStrategy(ChainingStrategy strategy);
259
public ChainingStrategy getChainingStrategy();
260
}
261
```
262
263
### AbstractUdfStreamOperator<OUT, F>
264
265
Base class for operators that wrap user-defined functions.
266
267
```java { .api }
268
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
269
extends AbstractStreamOperator<OUT> {
270
271
protected final F userFunction;
272
273
public AbstractUdfStreamOperator(F userFunction);
274
public F getUserFunction();
275
}
276
```
277
278
## Types
279
280
```java { .api }
281
// Operator output interface
282
public interface Output<T> {
283
void collect(T record);
284
void emitWatermark(Watermark mark);
285
void close();
286
}
287
288
// Chaining strategy for operator connections
289
public enum ChainingStrategy {
290
ALWAYS, // Always try to chain with neighbors
291
NEVER, // Never chain with neighbors
292
HEAD // Not chained to its predecessor, but successors may chain to it
293
}
294
295
// Stream record wrapper
296
public class StreamRecord<T> {
297
public T getValue();
298
public long getTimestamp();
299
public boolean hasTimestamp();
300
public StreamRecord<T> replace(T element);
301
public StreamRecord<T> replace(T element, long timestamp);
302
}
303
304
// Watermark for event time processing
305
public class Watermark {
306
public long getTimestamp();
307
public static final Watermark MAX_WATERMARK;
308
}
309
```