# Algorithm Operators

Base classes for algorithm operators in Flink ML. They support both batch and stream processing, manage output tables, and handle parameter configuration; ML algorithms are implemented on top of them.

## Capabilities

### AlgoOperator Abstract Class

Base class for all algorithm operators. It provides common functionality for managing output tables, handling parameters, and inspecting output schemas.

```java { .api }
/**
12
* Base class for algorithm operators with output table management
13
*/
14
public abstract class AlgoOperator<T extends AlgoOperator<T>>
15
implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
16
17
/** Default constructor */
18
public AlgoOperator();
19
20
/** Constructor with parameters */
21
public AlgoOperator(Params params);
22
23
/** Get parameters */
24
public Params getParams();
25
26
/**
27
* Get primary output table
28
* @return Primary output table
29
*/
30
public Table getOutput();
31
32
/**
33
* Get side output tables
34
* @return Array of side output tables
35
*/
36
public Table[] getSideOutputs();
37
38
/**
39
* Get output column names
40
* @return Array of column names
41
*/
42
public String[] getColNames();
43
44
/**
45
* Get output column types
46
* @return Array of column type information
47
*/
48
public TypeInformation<?>[] getColTypes();
49
50
/**
51
* Get side output column names by index
52
* @param index Side output index
53
* @return Array of column names for specified side output
54
*/
55
public String[] getSideOutputColNames(int index);
56
57
/**
58
* Get side output column types by index
59
* @param index Side output index
60
* @return Array of column types for specified side output
61
*/
62
public TypeInformation<?>[] getSideOutputColTypes(int index);
63
64
/**
65
* Get output table schema
66
* @return TableSchema of primary output
67
*/
68
public TableSchema getSchema();
69
70
/**
71
* String representation of operator
72
* @return String description
73
*/
74
public String toString();
75
76
/** Set output table (protected) */
77
protected void setOutput(Table output);
78
79
/** Set side output tables (protected) */
80
protected void setSideOutputs(Table[] sideOutputs);
81
}
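```

### BatchOperator Abstract Class

Base class for batch algorithm operators. It adds operator linking and batch-specific helpers. `a.link(b)` calls `b.linkFrom(a)` and returns `b`, so chains read left to right. Pass `link` an operator that has not been linked yet. Operators that take more than one input must be connected with `linkFrom(...)` directly.

```java { .api }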
/**
90
* Base class for batch algorithm operators
91
*/
92
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
93
94
/** Default constructor */
95
public BatchOperator();
96
97
/** Constructor with parameters */
98
public BatchOperator(Params params);
99
100
/**
101
* Link to next batch operator in chain
102
* @param next Next operator to link to
103
* @return The next operator for method chaining
104
*/
105
public <B extends BatchOperator<?>> B link(B next);
106
107
/**
108
* Link from input batch operators (abstract - implemented by subclasses)
109
* @param inputs Input batch operators
110
* @return This operator instance configured with inputs
111
*/
112
public abstract T linkFrom(BatchOperator<?>... inputs);
113
114
/**
115
* Create batch operator from table
116
* @param table Input table
117
* @return BatchOperator wrapping the table
118
*/
119
public static BatchOperator<?> fromTable(Table table);
120
121
/**
122
* Validate and get first input operator (protected utility)
123
* @param inputs Input operators
124
* @return First input operator
125
*/
126
protected static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);
127
}
```

**Usage Examples:**

```java
import org.apache.flink.ml.operator.batch.BatchOperator;
134
import org.apache.flink.table.api.Table;
135
136
// Example custom batch operator implementation
137
public class MyBatchOperator extends BatchOperator<MyBatchOperator> {
138
139
public MyBatchOperator() {
140
super();
141
}
142
143
public MyBatchOperator(Params params) {
144
super(params);
145
}
146
147
@Override
148
public MyBatchOperator linkFrom(BatchOperator<?>... inputs) {
149
// Validate inputs
150
BatchOperator<?> input = checkAndGetFirst(inputs);
151
152
// Get input table
153
Table inputTable = input.getOutput();
154
155
// Process the table (example transformation)
156
Table outputTable = inputTable.select("*"); // Your processing logic here
157
158
// Set the output
159
this.setOutput(outputTable);
160
161
return this;
162
}
163
}
164
165
// Usage - operator chaining
166
BatchOperator<?> source = BatchOperator.fromTable(inputTable);
167
168
MyBatchOperator operator1 = new MyBatchOperator()
169
.setParam("parameter1", "value1");
170
171
MyBatchOperator operator2 = new MyBatchOperator()
172
.setParam("parameter2", "value2");
173
174
// Chain operators
175
Table result = source
176
.link(operator1.linkFrom(source))
177
.link(operator2.linkFrom(operator1))
178
.getOutput();
```

### StreamOperator Abstract Class

Base class for stream algorithm operators. It uses the same linking model as `BatchOperator` on streaming tables: `a.link(b)` calls `b.linkFrom(a)` and returns `b`.

```java { .api }
/**
187
* Base class for stream algorithm operators
188
*/
189
public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
190
191
/** Default constructor */
192
public StreamOperator();
193
194
/** Constructor with parameters */
195
public StreamOperator(Params params);
196
197
/**
198
* Link to next stream operator in chain
199
* @param next Next operator to link to
200
* @return The next operator for method chaining
201
*/
202
public <S extends StreamOperator<?>> S link(S next);
203
204
/**
205
* Link from input stream operators (abstract - implemented by subclasses)
206
* @param inputs Input stream operators
207
* @return This operator instance configured with inputs
208
*/
209
public abstract T linkFrom(StreamOperator<?>... inputs);
210
211
/**
212
* Create stream operator from table
213
* @param table Input table
214
* @return StreamOperator wrapping the table
215
*/
216
public static StreamOperator<?> fromTable(Table table);
217
}
```

**Usage Examples:**

```java
import org.apache.flink.ml.operator.stream.StreamOperator;
224
import org.apache.flink.table.api.Table;
225
226
// Example custom stream operator implementation
227
public class MyStreamOperator extends StreamOperator<MyStreamOperator> {
228
229
public MyStreamOperator() {
230
super();
231
}
232
233
@Override
234
public MyStreamOperator linkFrom(StreamOperator<?>... inputs) {
235
StreamOperator<?> input = inputs[0]; // Get first input
236
237
// Get input table
238
Table inputTable = input.getOutput();
239
240
// Process the streaming table
241
Table outputTable = inputTable.select("*"); // Your streaming logic here
242
243
// Set the output
244
this.setOutput(outputTable);
245
246
return this;
247
}
248
}
249
250
// Usage - stream processing
251
StreamOperator<?> streamSource = StreamOperator.fromTable(streamingTable);
252
253
MyStreamOperator streamProcessor = new MyStreamOperator();
254
255
Table streamResult = streamSource
256
.link(streamProcessor.linkFrom(streamSource))
257
.getOutput();
```

### Table Source Operators

Source operators that wrap an existing `Table` so that it can start a batch or stream operator chain.

```java { .api }
/**
266
* Batch operator for table sources
267
*/
268
public class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {
269
/**
270
* Constructor from table
271
* @param table Source table
272
*/
273
public TableSourceBatchOp(Table table);
274
275
@Override
276
public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);
277
}
278
279
/**
280
* Stream operator for table sources
281
*/
282
public class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {
283
/**
284
* Constructor from table
285
* @param table Source table
286
*/
287
public TableSourceStreamOp(Table table);
288
289
@Override
290
public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);
291
}
```

**Usage Examples:**

```java
import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
298
import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
299
300
// Create operators from tables
301
Table batchTable = getBatchTable();
302
Table streamTable = getStreamTable();
303
304
// Batch table source
305
TableSourceBatchOp batchSource = new TableSourceBatchOp(batchTable);
306
307
// Stream table source
308
TableSourceStreamOp streamSource = new TableSourceStreamOp(streamTable);
309
310
// Use in operator chains
311
MyBatchOperator batchProcessor = new MyBatchOperator();
312
Table batchResult = batchSource
313
.link(batchProcessor.linkFrom(batchSource))
314
.getOutput();
315
316
MyStreamOperator streamProcessor = new MyStreamOperator();
317
Table streamResult = streamSource
318
.link(streamProcessor.linkFrom(streamSource))
319
.getOutput();
```

### Operator Pattern Examples

Common patterns for implementing algorithm operators: validating multiple inputs, producing side outputs, and reading configuration from parameters.

**Usage Examples:**

```java
// Multi-input operator example
330
public class JoinOperator extends BatchOperator<JoinOperator> {
331
332
@Override
333
public JoinOperator linkFrom(BatchOperator<?>... inputs) {
334
// Validate multiple inputs
335
if (inputs.length != 2) {
336
throw new IllegalArgumentException("JoinOperator requires exactly 2 inputs");
337
}
338
339
BatchOperator<?> left = inputs[0];
340
BatchOperator<?> right = inputs[1];
341
342
// Get input tables
343
Table leftTable = left.getOutput();
344
Table rightTable = right.getOutput();
345
346
// Perform join operation
347
String joinCondition = getParams().get(JOIN_CONDITION);
348
Table joinedTable = leftTable.join(rightTable, joinCondition);
349
350
// Set output
351
this.setOutput(joinedTable);
352
353
return this;
354
}
355
}
356
357
// Side output operator example
358
public class SplitOperator extends BatchOperator<SplitOperator> {
359
360
@Override
361
public SplitOperator linkFrom(BatchOperator<?>... inputs) {
362
BatchOperator<?> input = checkAndGetFirst(inputs);
363
Table inputTable = input.getOutput();
364
365
// Split condition
366
String condition = getParams().get(SPLIT_CONDITION);
367
368
// Create main and side outputs
369
Table mainOutput = inputTable.filter(condition);
370
Table sideOutput = inputTable.filter("!(" + condition + ")");
371
372
// Set outputs
373
this.setOutput(mainOutput);
374
this.setSideOutputs(new Table[]{sideOutput});
375
376
return this;
377
}
378
}
379
380
// Parameter-driven operator example
381
public class SelectOperator extends BatchOperator<SelectOperator>
382
implements HasSelectedCols<SelectOperator> {
383
384
@Override
385
public SelectOperator linkFrom(BatchOperator<?>... inputs) {
386
BatchOperator<?> input = checkAndGetFirst(inputs);
387
Table inputTable = input.getOutput();
388
389
// Get selected columns from parameters
390
String[] selectedCols = getSelectedCols();
391
if (selectedCols == null || selectedCols.length == 0) {
392
selectedCols = inputTable.getSchema().getFieldNames();
393
}
394
395
// Select columns
396
Table selectedTable = inputTable.select(String.join(",", selectedCols));
397
398
this.setOutput(selectedTable);
399
return this;
400
}
401
}
402
403
// Usage
404
SelectOperator selector = new SelectOperator()
405
.setSelectedCols(new String[]{"col1", "col2", "col3"});
406
407
JoinOperator joiner = new JoinOperator()
408
.set(JOIN_CONDITION, "a.id = b.id");
409
410
SplitOperator splitter = new SplitOperator()
411
.set(SPLIT_CONDITION, "age > 18");
412
413
// Build pipeline
414
Table result = source
415
.link(selector.linkFrom(source))
416
.link(joiner.linkFrom(selector, anotherSource))
417
.link(splitter.linkFrom(joiner))
418
.getOutput();
419
420
// Access side outputs
421
Table sideOutput = splitter.getSideOutputs()[0];
```