or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

checkpointing-state.md datastream-operations.md execution-environment.md index.md sources-and-sinks.md stream-operators.md windowing.md

stream-operators.mddocs/

0

# Stream Operators

1

2

Stream operators are the internal execution components that implement the actual stream processing logic in Flink. They handle element processing, state management, and coordination with the runtime system.

3

4

## Core Operator Interfaces

5

6

### StreamOperator<OUT>

7

8

Base interface for all streaming operators.

9

10

```java { .api }

11

public interface StreamOperator<OUT> extends Serializable {

12

// Lifecycle methods

13

void setup(Output<StreamRecord<OUT>> output, RuntimeContext runtimeContext);

14

void open(Configuration parameters) throws Exception;

15

void close() throws Exception;

16

17

// Configuration

18

void setChainingStrategy(ChainingStrategy strategy);

19

ChainingStrategy getChainingStrategy();

20

}

21

```

22

23

### OneInputStreamOperator<IN, OUT>

24

25

Interface for operators that process a single input stream.

26

27

```java { .api }

28

public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

29

void processElement(StreamRecord<IN> element) throws Exception;

30

void processWatermark(Watermark mark) throws Exception;

31

}

32

```

33

34

### TwoInputStreamOperator<IN1, IN2, OUT>

35

36

Interface for operators that process two input streams (e.g., for connected streams).

37

38

```java { .api }

39

public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {

40

void processElement1(StreamRecord<IN1> element) throws Exception;

41

void processElement2(StreamRecord<IN2> element) throws Exception;

42

void processWatermark1(Watermark mark) throws Exception;

43

void processWatermark2(Watermark mark) throws Exception;

44

}

45

```

46

47

## Function-based Operators

48

49

### StreamMap<IN, OUT>

50

51

Operator that applies a MapFunction to each element.

52

53

```java { .api }

54

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>

55

implements OneInputStreamOperator<IN, OUT> {

56

57

public StreamMap(MapFunction<IN, OUT> mapper);

58

public void processElement(StreamRecord<IN> element) throws Exception;

59

}

60

```

61

62

### StreamFlatMap<IN, OUT>

63

64

Operator that applies a FlatMapFunction to each element.

65

66

```java { .api }

67

public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>

68

implements OneInputStreamOperator<IN, OUT> {

69

70

public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper);

71

public void processElement(StreamRecord<IN> element) throws Exception;

72

}

73

```

74

75

### StreamFilter<T>

76

77

Operator that filters elements based on a FilterFunction.

78

79

```java { .api }

80

public class StreamFilter<T> extends AbstractUdfStreamOperator<T, FilterFunction<T>>

81

implements OneInputStreamOperator<T, T> {

82

83

public StreamFilter(FilterFunction<T> filter);

84

public void processElement(StreamRecord<T> element) throws Exception;

85

}

86

```

87

88

## Keyed Stream Operators

89

90

### StreamGroupedReduce<T>

91

92

Operator for keyed reduce operations.

93

94

```java { .api }

95

public class StreamGroupedReduce<T> extends AbstractUdfStreamOperator<T, ReduceFunction<T>>

96

implements OneInputStreamOperator<T, T> {

97

98

public StreamGroupedReduce(ReduceFunction<T> reducer, TypeSerializer<T> serializer);

99

public void processElement(StreamRecord<T> element) throws Exception;

100

}

101

```

102

103

### StreamGroupedFold<IN, OUT>

104

105

Operator for keyed fold operations.

106

107

```java { .api }

108

public class StreamGroupedFold<IN, OUT> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>

109

implements OneInputStreamOperator<IN, OUT> {

110

111

public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue, TypeSerializer<OUT> serializer);

112

public void processElement(StreamRecord<IN> element) throws Exception;

113

}

114

```

115

116

## Source and Sink Operators

117

118

### StreamSource<OUT>

119

120

Operator that wraps a SourceFunction for data ingestion.

121

122

```java { .api }

123

public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>>

124

implements StreamOperator<OUT> {

125

126

public StreamSource(SourceFunction<OUT> sourceFunction);

127

public void run(Object lockingObject, Output<StreamRecord<OUT>> collector) throws Exception;

128

}

129

```

130

131

### StreamSink<IN>

132

133

Operator that wraps a SinkFunction for data output.

134

135

```java { .api }

136

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>

137

implements OneInputStreamOperator<IN, Object> {

138

139

public StreamSink(SinkFunction<IN> sinkFunction);

140

public void processElement(StreamRecord<IN> element) throws Exception;

141

}

142

```

143

144

## Connected Stream Operators

145

146

### CoStreamMap<IN1, IN2, OUT>

147

148

Operator for applying a CoMapFunction to connected streams.

149

150

```java { .api }

151

public class CoStreamMap<IN1, IN2, OUT> extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>

152

implements TwoInputStreamOperator<IN1, IN2, OUT> {

153

154

public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper);

155

public void processElement1(StreamRecord<IN1> element) throws Exception;

156

public void processElement2(StreamRecord<IN2> element) throws Exception;

157

}

158

```

159

160

### CoStreamFlatMap<IN1, IN2, OUT>

161

162

Operator for applying a CoFlatMapFunction to connected streams.

163

164

```java { .api }

165

public class CoStreamFlatMap<IN1, IN2, OUT> extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>

166

implements TwoInputStreamOperator<IN1, IN2, OUT> {

167

168

public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper);

169

public void processElement1(StreamRecord<IN1> element) throws Exception;

170

public void processElement2(StreamRecord<IN2> element) throws Exception;

171

}

172

```

173

174

### CoStreamReduce<T>

175

176

Operator for applying a CoReduceFunction to connected streams.

177

178

```java { .api }

179

public class CoStreamReduce<T> extends AbstractUdfStreamOperator<T, CoReduceFunction<T, T, T>>

180

implements TwoInputStreamOperator<T, T, T> {

181

182

public CoStreamReduce(CoReduceFunction<T, T, T> reducer);

183

public void processElement1(StreamRecord<T> element) throws Exception;

184

public void processElement2(StreamRecord<T> element) throws Exception;

185

}

186

```

187

188

## Windowing Operators

189

190

### WindowingOperator<T>

191

192

Operator that implements windowing logic.

193

194

```java { .api }

195

public class WindowingOperator<T> implements OneInputStreamOperator<T, StreamWindow<T>> {

196

public WindowingOperator(WindowingHelper<T> helper, TypeSerializer<T> serializer);

197

public void processElement(StreamRecord<T> element) throws Exception;

198

}

199

```

200

201

## Partitioning Operators

202

203

### StreamPartition<T>

204

205

Base class for partitioning operators that control data distribution.

206

207

```java { .api }

208

public abstract class StreamPartition<T> implements StreamOperator<T> {

209

// Partitioning logic implementation

210

}

211

```

212

213

### ShufflePartitioner<T>

214

215

Randomly distributes elements across parallel instances.

216

217

```java { .api }

218

public class ShufflePartitioner<T> extends StreamPartition<T> {

219

// Random partitioning implementation

220

}

221

```

222

223

### RebalancePartitioner<T>

224

225

Round-robin distribution of elements.

226

227

```java { .api }

228

public class RebalancePartitioner<T> extends StreamPartition<T> {

229

// Round-robin partitioning implementation

230

}

231

```

232

233

### BroadcastPartitioner<T>

234

235

Broadcasts elements to all parallel instances.

236

237

```java { .api }

238

public class BroadcastPartitioner<T> extends StreamPartition<T> {

239

// Broadcast partitioning implementation

240

}

241

```

242

243

## Base Classes

244

245

### AbstractStreamOperator<OUT>

246

247

Abstract base class providing common operator functionality.

248

249

```java { .api }

250

public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT> {

251

protected Output<StreamRecord<OUT>> output;

252

protected RuntimeContext runtimeContext;

253

protected ChainingStrategy chainingStrategy;

254

255

public void setup(Output<StreamRecord<OUT>> output, RuntimeContext runtimeContext);

256

public void open(Configuration parameters) throws Exception;

257

public void close() throws Exception;

258

public void setChainingStrategy(ChainingStrategy strategy);

259

public ChainingStrategy getChainingStrategy();

260

}

261

```

262

263

### AbstractUdfStreamOperator<OUT, F>

264

265

Base class for operators that wrap user-defined functions.

266

267

```java { .api }

268

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>

269

extends AbstractStreamOperator<OUT> {

270

271

protected final F userFunction;

272

273

public AbstractUdfStreamOperator(F userFunction);

274

public F getUserFunction();

275

}

276

```

277

278

## Types

279

280

```java { .api }

281

// Operator output interface

282

public interface Output<T> {

283

void collect(T record);

284

void emitWatermark(Watermark mark);

285

void close();

286

}

287

288

// Chaining strategy for operator connections

289

public enum ChainingStrategy {

290

ALWAYS, // Always try to chain with neighbors

291

NEVER, // Never chain with neighbors

292

HEAD // Never chained to its predecessor, but successors may chain to it

293

}

294

295

// Stream record wrapper

296

public class StreamRecord<T> {

297

public T getValue();

298

public long getTimestamp();

299

public boolean hasTimestamp();

300

public StreamRecord<T> replace(T element);

301

public StreamRecord<T> replace(T element, long timestamp);

302

}

303

304

// Watermark for event time processing

305

public class Watermark {

306

public long getTimestamp();

307

public static final Watermark MAX_WATERMARK;

308

}

309

```