Base classes for algorithm operators supporting both batch and stream processing with output table management and parameter configuration. The operator system provides the foundation for implementing ML algorithms in Flink.
Base class for all algorithm operators providing common functionality for output table management, parameter handling, and schema operations.
/**
* Base class for algorithm operators with output table management.
* <p>Exposes the operator's {@link Params}, a primary output {@link Table}, an array of side
* output tables, and helpers describing the column names/types of those outputs. Subclasses
* publish their results through the protected {@link #setOutput(Table)} and
* {@link #setSideOutputs(Table[])} setters, typically from their {@code linkFrom} method.
* <p>NOTE(review): this is an API summary — method bodies are omitted, so the declarations
* below are not compilable Java as written.
* @param <T> the concrete operator type (self type), so fluent parameter setters such as
*            {@code set(...)} return the subclass rather than {@code AlgoOperator}
*/
public abstract class AlgoOperator<T extends AlgoOperator<T>>
implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
/** Default constructor: creates an operator without explicitly supplied parameters. */
public AlgoOperator();
/**
* Constructor with parameters.
* @param params Parameters to configure this operator with
*/
public AlgoOperator(Params params);
/**
* Get parameters.
* @return The {@link Params} holding this operator's configuration
*/
public Params getParams();
/**
* Get primary output table.
* NOTE(review): the result before {@link #setOutput(Table)} has been called is not shown here.
* @return Primary output table
*/
public Table getOutput();
/**
* Get side output tables.
* @return Array of side output tables, as stored by {@link #setSideOutputs(Table[])}
*/
public Table[] getSideOutputs();
/**
* Get the column names of the primary output (see {@link #getSideOutputColNames(int)}
* for side outputs).
* @return Array of column names
*/
public String[] getColNames();
/**
* Get the column types of the primary output (see {@link #getSideOutputColTypes(int)}
* for side outputs).
* @return Array of column type information
*/
public TypeInformation<?>[] getColTypes();
/**
* Get side output column names by index.
* @param index Side output index — presumably the position in {@link #getSideOutputs()}; confirm
* @return Array of column names for specified side output
*/
public String[] getSideOutputColNames(int index);
/**
* Get side output column types by index.
* @param index Side output index — presumably the position in {@link #getSideOutputs()}; confirm
* @return Array of column types for specified side output
*/
public TypeInformation<?>[] getSideOutputColTypes(int index);
/**
* Get output table schema.
* @return TableSchema of primary output
*/
public TableSchema getSchema();
/**
* String representation of operator.
* @return String description
*/
public String toString();
/**
* Set the primary output table; for use by subclasses.
* @param output Table to expose as the primary output
*/
protected void setOutput(Table output);
/**
* Set the side output tables; for use by subclasses.
* @param sideOutputs Tables to expose as side outputs
*/
protected void setSideOutputs(Table[] sideOutputs);
}Base class for batch algorithm operators providing linking capabilities and batch-specific functionality.
/**
* Base class for batch algorithm operators.
* <p>Subclasses implement {@link #linkFrom(BatchOperator[])} to compute their output from
* upstream operators; {@link #link(BatchOperator)} offers fluent single-upstream chaining.
* @param <T> the concrete operator type (self type), returned by {@code linkFrom}
*/
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
/** Default constructor: creates an operator without explicitly supplied parameters. */
public BatchOperator();
/**
* Constructor with parameters.
* @param params Parameters to configure this operator with
*/
public BatchOperator(Params params);
/**
* Link to the next batch operator in the chain, using this operator as its input.
* <p>NOTE(review): in Apache Flink ML this is implemented as
* {@code next.linkFrom(this); return next;} — i.e. {@code next} is wired with this operator
* as its ONLY input. So pass an operator that has not already been wired via
* {@code linkFrom} (otherwise it is wired twice), and wire multi-input operators with
* {@code linkFrom(a, b, ...)} directly. Confirm against the library version in use.
* @param next Next operator to link to
* @return The next operator ({@code next}) for method chaining
*/
public <B extends BatchOperator<?>> B link(B next);
/**
* Link from input batch operators (abstract - implemented by subclasses).
* Implementations read the inputs' outputs and publish their own via {@code setOutput}.
* @param inputs Input batch operators
* @return This operator instance configured with inputs
*/
public abstract T linkFrom(BatchOperator<?>... inputs);
/**
* Create batch operator from table.
* NOTE(review): presumably returns a {@code TableSourceBatchOp} wrapping {@code table}; confirm.
* @param table Input table
* @return BatchOperator wrapping the table
*/
public static BatchOperator<?> fromTable(Table table);
/**
* Validate and get first input operator (protected utility for single-input operators).
* NOTE(review): the exact validation (e.g. whether more than one input is rejected) is not
* shown here; in Apache Flink ML it requires exactly one input.
* @param inputs Input operators
* @return First input operator
*/
protected static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);
}Usage Examples:
import org.apache.flink.ml.operator.batch.BatchOperator;
import org.apache.flink.table.api.Table;
// Example custom batch operator implementation
/**
 * Minimal single-input batch operator that forwards every column of its input unchanged.
 * Replace the {@code select("*")} projection with the real processing logic.
 */
public class MyBatchOperator extends BatchOperator<MyBatchOperator> {

    /** Creates the operator without explicitly supplied parameters. */
    public MyBatchOperator() {
        super();
    }

    /**
     * Creates the operator with the given parameters.
     *
     * @param params parameters to configure this operator with
     */
    public MyBatchOperator(Params params) {
        super(params);
    }

    /**
     * Wires this operator to its upstream and computes its output table.
     *
     * @param inputs upstream operators, validated by {@code checkAndGetFirst}
     * @return this operator, with its output set
     */
    @Override
    public MyBatchOperator linkFrom(BatchOperator<?>... inputs) {
        // Validate the inputs and project all columns of the first one; swap in real logic here.
        final Table projected = checkAndGetFirst(inputs).getOutput().select("*");
        setOutput(projected);
        return this;
    }
}
// Usage - operator chaining
BatchOperator<?> source = BatchOperator.fromTable(inputTable);
// NOTE(review): setParam(String, String) is not the set(ParamInfo, value) form used by the
// other examples in this document; confirm it exists in your Flink ML version.
MyBatchOperator operator1 = new MyBatchOperator()
    .setParam("parameter1", "value1");
MyBatchOperator operator2 = new MyBatchOperator()
    .setParam("parameter2", "value2");
// Chain operators. link(next) wires `next` with the current operator as its input and
// returns `next`, so pass each operator un-wired: calling linkFrom(...) first and then
// link(...) would wire every operator twice.
Table result = source
    .link(operator1)
    .link(operator2)
    .getOutput();
Base class for stream algorithm operators providing linking capabilities and stream-specific functionality.
/**
* Base class for stream algorithm operators.
* <p>Subclasses implement {@link #linkFrom(StreamOperator[])} to compute their output from
* upstream operators; {@link #link(StreamOperator)} offers fluent single-upstream chaining.
* @param <T> the concrete operator type (self type), returned by {@code linkFrom}
*/
public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
/** Default constructor: creates an operator without explicitly supplied parameters. */
public StreamOperator();
/**
* Constructor with parameters.
* @param params Parameters to configure this operator with
*/
public StreamOperator(Params params);
/**
* Link to the next stream operator in the chain, using this operator as its input.
* <p>NOTE(review): in Apache Flink ML this is implemented as
* {@code next.linkFrom(this); return next;} — i.e. {@code next} is wired with this operator
* as its ONLY input. So pass an operator that has not already been wired via
* {@code linkFrom} (otherwise it is wired twice). Confirm against the library version in use.
* @param next Next operator to link to
* @return The next operator ({@code next}) for method chaining
*/
public <S extends StreamOperator<?>> S link(S next);
/**
* Link from input stream operators (abstract - implemented by subclasses).
* Implementations read the inputs' outputs and publish their own via {@code setOutput}.
* @param inputs Input stream operators
* @return This operator instance configured with inputs
*/
public abstract T linkFrom(StreamOperator<?>... inputs);
/**
* Create stream operator from table.
* NOTE(review): presumably returns a {@code TableSourceStreamOp} wrapping {@code table}; confirm.
* @param table Input table
* @return StreamOperator wrapping the table
*/
public static StreamOperator<?> fromTable(Table table);
}Usage Examples:
import org.apache.flink.ml.operator.stream.StreamOperator;
import org.apache.flink.table.api.Table;
// Example custom stream operator implementation
/**
 * Minimal single-input stream operator that forwards every column of its input unchanged.
 * Replace the {@code select("*")} projection with the real streaming logic.
 */
public class MyStreamOperator extends StreamOperator<MyStreamOperator> {

    /** Creates the operator without explicitly supplied parameters. */
    public MyStreamOperator() {
        super();
    }

    /**
     * Wires this operator to its single upstream and computes its output table.
     *
     * @param inputs exactly one upstream operator
     * @return this operator, with its output set
     * @throws IllegalArgumentException if {@code inputs} is null or does not hold exactly 1 operator
     */
    @Override
    public MyStreamOperator linkFrom(StreamOperator<?>... inputs) {
        // Validate inputs explicitly: indexing inputs[0] blindly fails with an unhelpful
        // NullPointerException / ArrayIndexOutOfBoundsException on null or empty varargs.
        int count = (inputs == null) ? 0 : inputs.length;
        if (count != 1) {
            throw new IllegalArgumentException(
                "MyStreamOperator requires exactly 1 input, got " + count);
        }
        StreamOperator<?> input = inputs[0];
        // Get input table
        Table inputTable = input.getOutput();
        // Process the streaming table
        Table outputTable = inputTable.select("*"); // Your streaming logic here
        // Set the output
        this.setOutput(outputTable);
        return this;
    }
}
// Usage - stream processing
StreamOperator<?> streamSource = StreamOperator.fromTable(streamingTable);
MyStreamOperator streamProcessor = new MyStreamOperator();
// link(next) wires `next` with streamSource as its input and returns `next`; pass the
// operator itself rather than the result of linkFrom(...), which would wire it twice.
Table streamResult = streamSource
    .link(streamProcessor)
    .getOutput();
Specialized operators for creating operators from table sources in both batch and stream contexts.
/**
* Batch operator for table sources: wraps an existing {@link Table} so it can start a
* chain of batch operators (e.g. {@code new TableSourceBatchOp(t).link(next)}).
*/
public class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {
/**
* Constructor from table.
* @param table Source table, exposed as this operator's output
*/
public TableSourceBatchOp(Table table);
/**
* A source has no upstream operators.
* NOTE(review): presumably rejects any call (Apache Flink ML throws
* UnsupportedOperationException here); confirm before relying on it.
* @param inputs Upstream operators (not expected for a source)
* @return This operator
*/
@Override
public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);
}
/**
* Stream operator for table sources: wraps an existing streaming {@link Table} so it can
* start a chain of stream operators (e.g. {@code new TableSourceStreamOp(t).link(next)}).
*/
public class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {
/**
* Constructor from table.
* @param table Source table, exposed as this operator's output
*/
public TableSourceStreamOp(Table table);
/**
* A source has no upstream operators.
* NOTE(review): presumably rejects any call (Apache Flink ML throws
* UnsupportedOperationException here); confirm before relying on it.
* @param inputs Upstream operators (not expected for a source)
* @return This operator
*/
@Override
public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);
}Usage Examples:
import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
// Create operators from tables
Table batchTable = getBatchTable();
Table streamTable = getStreamTable();
// Batch table source
TableSourceBatchOp batchSource = new TableSourceBatchOp(batchTable);
// Stream table source
TableSourceStreamOp streamSource = new TableSourceStreamOp(streamTable);
// Use in operator chains. link(next) wires `next` with the source as its input and returns
// `next`, so pass each processor directly instead of pre-wiring it with linkFrom(...),
// which would wire it twice.
MyBatchOperator batchProcessor = new MyBatchOperator();
Table batchResult = batchSource
    .link(batchProcessor)
    .getOutput();
MyStreamOperator streamProcessor = new MyStreamOperator();
Table streamResult = streamSource
    .link(streamProcessor)
    .getOutput();
Common patterns for implementing algorithm operators with proper input validation and output management.
Usage Examples:
// Multi-input operator example
/**
 * Joins the outputs of exactly two upstream operators using the JOIN_CONDITION parameter.
 *
 * <p>Because it needs two inputs, wire it with {@code linkFrom(left, right)} rather than the
 * single-upstream {@code link(...)}.
 */
public class JoinOperator extends BatchOperator<JoinOperator> {
    /**
     * Joins {@code inputs[0]} (left) with {@code inputs[1]} (right).
     *
     * @param inputs exactly two operators: the left input followed by the right input
     * @return this operator, with its output set to the joined table
     * @throws IllegalArgumentException if {@code inputs} is null or does not hold exactly 2 operators
     */
    @Override
    public JoinOperator linkFrom(BatchOperator<?>... inputs) {
        // Validate multiple inputs; treat a null varargs array as "no inputs" instead of
        // failing with a NullPointerException, and report the actual count.
        int count = (inputs == null) ? 0 : inputs.length;
        if (count != 2) {
            throw new IllegalArgumentException(
                "JoinOperator requires exactly 2 inputs, got " + count);
        }
        BatchOperator<?> left = inputs[0];
        BatchOperator<?> right = inputs[1];
        // Get input tables
        Table leftTable = left.getOutput();
        Table rightTable = right.getOutput();
        // JOIN_CONDITION is assumed to be a ParamInfo<String> declared elsewhere (not shown);
        // its value is passed to Table.join as the join predicate expression.
        String joinCondition = getParams().get(JOIN_CONDITION);
        Table joinedTable = leftTable.join(rightTable, joinCondition);
        // Set output
        this.setOutput(joinedTable);
        return this;
    }
}
// Side output operator example
/**
 * Splits a single input into a main output (rows satisfying SPLIT_CONDITION) and one side
 * output (rows satisfying the negated condition).
 *
 * <p>NOTE(review): under SQL three-valued logic, rows for which the condition evaluates to
 * NULL presumably pass neither filter and appear in neither output — confirm if that matters.
 */
public class SplitOperator extends BatchOperator<SplitOperator> {
    /**
     * Computes the main and side outputs from the validated first input.
     *
     * @param inputs upstream operators, validated by {@code checkAndGetFirst}
     * @return this operator, with its main output and one side output set
     */
    @Override
    public SplitOperator linkFrom(BatchOperator<?>... inputs) {
        final Table source = checkAndGetFirst(inputs).getOutput();
        // SPLIT_CONDITION is assumed to be a ParamInfo<String> declared elsewhere (not shown).
        final String predicate = getParams().get(SPLIT_CONDITION);
        final Table kept = source.filter(predicate);
        final Table rejected = source.filter("!(" + predicate + ")");
        setOutput(kept);
        setSideOutputs(new Table[] {rejected});
        return this;
    }
}
// Parameter-driven operator example
/**
 * Projects the input onto the columns named by the selectedCols parameter; when that
 * parameter is null or empty, every input column is kept.
 */
public class SelectOperator extends BatchOperator<SelectOperator>
        implements HasSelectedCols<SelectOperator> {
    /**
     * Computes the projected output from the validated first input.
     *
     * @param inputs upstream operators, validated by {@code checkAndGetFirst}
     * @return this operator, with its output set to the projection
     */
    @Override
    public SelectOperator linkFrom(BatchOperator<?>... inputs) {
        final Table source = checkAndGetFirst(inputs).getOutput();
        final String[] requested = getSelectedCols();
        final boolean noneRequested = requested == null || requested.length == 0;
        // Fall back to all input columns when nothing was configured.
        final String[] columns = noneRequested ? source.getSchema().getFieldNames() : requested;
        setOutput(source.select(String.join(",", columns)));
        return this;
    }
}
// Usage
SelectOperator selector = new SelectOperator()
.setSelectedCols(new String[]{"col1", "col2", "col3"});
JoinOperator joiner = new JoinOperator()
.set(JOIN_CONDITION, "a.id = b.id");
SplitOperator splitter = new SplitOperator()
.set(SPLIT_CONDITION, "age > 18");
// Build pipeline
Table result = source
.link(selector.linkFrom(source))
.link(joiner.linkFrom(selector, anotherSource))
.link(splitter.linkFrom(joiner))
.getOutput();
// Access side outputs
Table sideOutput = splitter.getSideOutputs()[0];