# Algorithm Operators

Batch and stream processing operators for building custom ML algorithms. They provide operator linking and integrate with Flink's Table API for scalable data processing workflows.

## Capabilities

### AlgoOperator Base Class

Abstract base class for all algorithm operators. It holds the operator's parameters and exposes its primary and side output tables together with their column names and types.

```java { .api }
/**
12
* Base class for algorithm operators with parameter support
13
* @param <T> The concrete operator type for method chaining
14
*/
15
public abstract class AlgoOperator<T extends AlgoOperator<T>>
16
implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
17
18
/** Create operator with empty parameters */
19
public AlgoOperator();
20
21
/** Create operator with initial parameters */
22
public AlgoOperator(Params params);
23
24
/** Get all operator parameters */
25
public Params getParams();
26
27
/** Get primary output table */
28
public Table getOutput();
29
30
/** Get all side output tables */
31
public Table[] getSideOutputs();
32
33
/** Get output column names */
34
public String[] getColNames();
35
36
/** Get output column types */
37
public TypeInformation<?>[] getColTypes();
38
39
/** Get side output column names by index */
40
public String[] getSideOutputColNames(int index);
41
42
/** Get side output column types by index */
43
public TypeInformation<?>[] getSideOutputColTypes(int index);
44
45
/** Get output table schema */
46
public TableSchema getSchema();
47
48
/** Set side output tables (protected) */
49
protected void setSideOutputs(Table[] sideOutputs);
50
51
/** Set primary output table (protected) */
52
protected void setOutput(Table output);
53
54
/** Validate minimum number of input operators */
55
public static void checkMinOpSize(int size, AlgoOperator<?>... inputs);
56
57
/** Validate exact number of input operators */
58
public static void checkOpSize(int size, AlgoOperator<?>... inputs);
59
}
```

### BatchOperator Class

Base class for batch algorithm operators. It adds `link`/`linkFrom` for chaining operators and `fromTable` for wrapping an existing table.

```java { .api }
/**
68
* Base class for batch algorithm operators
69
* Provides operator linking and batch-specific functionality
70
* @param <T> The concrete batch operator type
71
*/
72
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
73
74
/** Create batch operator with empty parameters */
75
public BatchOperator();
76
77
/** Create batch operator with initial parameters */
78
public BatchOperator(Params params);
79
80
/** Link this operator to the next operator in chain */
81
public <B extends BatchOperator<?>> B link(B next);
82
83
/** Link this operator from input operators (abstract) */
84
public abstract T linkFrom(BatchOperator<?>... inputs);
85
86
/** Create batch operator from existing table */
87
public static BatchOperator<?> fromTable(Table table);
88
89
/** Get first input operator, validating inputs exist */
90
public static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);
91
}
```

**Usage Examples:**

```java
import org.apache.flink.ml.operator.batch.BatchOperator;
98
import org.apache.flink.table.api.Table;
99
100
// Create operators from tables
101
Table inputTable = // ... your input table
102
BatchOperator<?> source = BatchOperator.fromTable(inputTable);
103
104
// Chain operators
105
BatchOperator<?> result = source
106
.link(new MyBatchTransformer())
107
.link(new MyBatchEstimator())
108
.link(new MyBatchSink());
109
110
// Custom batch operator implementation
111
public class MyBatchTransformer extends BatchOperator<MyBatchTransformer> {
112
@Override
113
public MyBatchTransformer linkFrom(BatchOperator<?>... inputs) {
114
BatchOperator.checkOpSize(1, inputs);
115
BatchOperator<?> input = inputs[0];
116
117
// Process input table
118
Table processedTable = // ... transformation logic
119
this.setOutput(processedTable);
120
121
return this;
122
}
123
}
```

### StreamOperator Class

Base class for stream algorithm operators. It offers the same `link`/`linkFrom`/`fromTable` API as `BatchOperator`, applied to streaming tables.

```java { .api }
/**
132
* Base class for stream algorithm operators
133
* Provides operator linking and stream-specific functionality
134
* @param <T> The concrete stream operator type
135
*/
136
public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
137
138
/** Create stream operator with empty parameters */
139
public StreamOperator();
140
141
/** Create stream operator with initial parameters */
142
public StreamOperator(Params params);
143
144
/** Link this operator to the next operator in chain */
145
public <S extends StreamOperator<?>> S link(S next);
146
147
/** Link this operator from input operators (abstract) */
148
public abstract T linkFrom(StreamOperator<?>... inputs);
149
150
/** Create stream operator from existing table */
151
public static StreamOperator<?> fromTable(Table table);
152
153
/** Get first input operator, validating inputs exist */
154
public static StreamOperator<?> checkAndGetFirst(StreamOperator<?>... inputs);
155
}
156
```
157
158
**Usage Examples:**
159
160
```java
161
import org.apache.flink.ml.operator.stream.StreamOperator;
162
import org.apache.flink.table.api.Table;
163
164
// Create operators from streaming tables
165
Table streamingTable = // ... your streaming table
166
StreamOperator<?> source = StreamOperator.fromTable(streamingTable);
167
168
// Chain streaming operators
169
StreamOperator<?> result = source
170
.link(new MyStreamTransformer())
171
.link(new MyStreamProcessor())
172
.link(new MyStreamSink());
173
174
// Custom stream operator implementation
175
public class MyStreamTransformer extends StreamOperator<MyStreamTransformer> {
176
@Override
177
public MyStreamTransformer linkFrom(StreamOperator<?>... inputs) {
178
StreamOperator.checkOpSize(1, inputs);
179
StreamOperator<?> input = inputs[0];
180
181
// Process streaming table
182
Table processedStream = // ... streaming transformation logic
183
this.setOutput(processedStream);
184
185
return this;
186
}
187
}
188
```
189
190
### Source Operators
191
192
Specialized operators for converting Tables to operator chains.
193
194
#### TableSourceBatchOp
195
196
```java { .api }
197
/**
198
* Transform Table to batch source operator
199
* Entry point for batch operator chains
200
*/
201
public final class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {
202
203
/** Create batch source from table */
204
public TableSourceBatchOp(Table table);
205
206
/** Not supported - throws UnsupportedOperationException */
207
public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);
208
}
209
```
210
211
#### TableSourceStreamOp
212
213
```java { .api }
214
/**
215
* Transform Table to stream source operator
216
* Entry point for stream operator chains
217
*/
218
public final class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {
219
220
/** Create stream source from table */
221
public TableSourceStreamOp(Table table);
222
223
/** Not supported - throws UnsupportedOperationException */
224
public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);
225
}
226
```
227
228
**Usage Examples:**
229
230
```java
231
import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
232
import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
233
234
// Batch processing chain
235
Table batchData = // ... your batch table
236
BatchOperator<?> batchChain = new TableSourceBatchOp(batchData)
237
.link(new FeatureScaler())
238
.link(new LinearRegression())
239
.link(new ModelEvaluator());
240
241
// Stream processing chain
242
Table streamData = // ... your streaming table
243
StreamOperator<?> streamChain = new TableSourceStreamOp(streamData)
244
.link(new StreamingFeatureTransformer())
245
.link(new OnlinePredictor())
246
.link(new AlertSystem());
247
```
248
249
## Operator Chaining Patterns
250
251
### Linear Chaining
252
253
Most common pattern where operators are linked in sequence:
254
255
```java
256
// Simple linear chain
257
BatchOperator<?> result = source
258
.link(preprocessor)
259
.link(featureExtractor)
260
.link(classifier);
261
```
262
263
### Multi-Input Operators
264
265
Operators that consume multiple inputs:
266
267
```java
268
public class JoinOperator extends BatchOperator<JoinOperator> {
269
@Override
270
public JoinOperator linkFrom(BatchOperator<?>... inputs) {
271
BatchOperator.checkOpSize(2, inputs); // Require exactly 2 inputs
272
273
BatchOperator<?> left = inputs[0];
274
BatchOperator<?> right = inputs[1];
275
276
// Join logic here
277
Table joined = // ... join left and right tables
278
this.setOutput(joined);
279
280
return this;
281
}
282
}
283
284
// Usage
285
BatchOperator<?> joined = new JoinOperator()
286
.linkFrom(leftOperator, rightOperator);
287
```
288
289
### Multi-Output Operators
290
291
Operators that produce multiple outputs:
292
293
```java
294
public class SplitOperator extends BatchOperator<SplitOperator> {
295
@Override
296
public SplitOperator linkFrom(BatchOperator<?>... inputs) {
297
BatchOperator.checkOpSize(1, inputs);
298
299
Table input = inputs[0].getOutput();
300
301
// Split logic
302
Table mainOutput = // ... main result
303
Table[] sideOutputs = new Table[]{
304
// ... additional outputs
305
};
306
307
this.setOutput(mainOutput);
308
this.setSideOutputs(sideOutputs);
309
310
return this;
311
}
312
}
313
314
// Usage
315
SplitOperator splitter = new SplitOperator().linkFrom(source);
316
Table main = splitter.getOutput();
317
Table[] sides = splitter.getSideOutputs();
318
```
319
320
### Parameter Configuration
321
322
All operators support parameter configuration through the WithParams interface:
323
324
```java
325
public class ConfigurableOperator extends BatchOperator<ConfigurableOperator> {
326
// Parameter definitions
327
public static final ParamInfo<Integer> NUM_ITERATIONS = ParamInfoFactory
328
.createParamInfo("numIterations", Integer.class)
329
.setHasDefaultValue(10)
330
.build();
331
332
public static final ParamInfo<Double> LEARNING_RATE = ParamInfoFactory
333
.createParamInfo("learningRate", Double.class)
334
.setHasDefaultValue(0.01)
335
.build();
336
337
// Convenience methods
338
public ConfigurableOperator setNumIterations(int numIter) {
339
return set(NUM_ITERATIONS, numIter);
340
}
341
342
public int getNumIterations() {
343
return get(NUM_ITERATIONS);
344
}
345
346
@Override
347
public ConfigurableOperator linkFrom(BatchOperator<?>... inputs) {
348
// Use parameters in processing
349
int numIter = getNumIterations();
350
double lr = get(LEARNING_RATE);
351
352
// Processing logic using parameters
353
// ...
354
355
return this;
356
}
357
}
358
359
// Usage with parameters
360
BatchOperator<?> configured = new ConfigurableOperator()
361
.setNumIterations(20)
362
.set(ConfigurableOperator.LEARNING_RATE, 0.001)
363
.linkFrom(source);
364
```
365
366
## Integration with Pipeline Framework
367
368
Algorithm operators can be integrated with the higher-level pipeline framework:
369
370
```java
371
public class MyEstimatorFromOperator extends EstimatorBase<MyEstimatorFromOperator, MyModelFromOperator> {
372
373
@Override
374
protected MyModelFromOperator fit(BatchOperator input) {
375
// Use batch operators for training
376
BatchOperator<?> trained = input
377
.link(new FeaturePreprocessor())
378
.link(new TrainingOperator())
379
.link(new ModelExtractor());
380
381
// Extract model data
382
Table modelData = trained.getOutput();
383
384
return new MyModelFromOperator(this.getParams()).setModelData(modelData);
385
}
386
}
387
388
public class MyModelFromOperator extends ModelBase<MyModelFromOperator> {
389
390
@Override
391
protected BatchOperator transform(BatchOperator input) {
392
// Use batch operators for prediction
393
return input
394
.link(new FeaturePreprocessor())
395
.link(new PredictionOperator().setModelData(this.getModelData()));
396
}
397
398
@Override
399
protected StreamOperator transform(StreamOperator input) {
400
// Use stream operators for real-time prediction
401
return input
402
.link(new StreamFeaturePreprocessor())
403
.link(new StreamPredictionOperator().setModelData(this.getModelData()));
404
}
405
}
406
```
407
408
This integration allows you to leverage the low-level operator framework within the high-level pipeline abstractions, providing flexibility for custom algorithm implementations while maintaining compatibility with the broader ML ecosystem.