Comprehensive machine learning library for Apache Flink that enables scalable ML pipelines on Flink's distributed stream-processing platform.
—
Batch and stream algorithm operators for building custom ML algorithms. Operators can be linked into processing chains and integrate with Flink's Table API for scalable data-processing workflows.
`AlgoOperator`: the abstract base class for all algorithm operators, providing parameter handling, access to output tables and their schemas, and input-count validation.
/**
* Base class for algorithm operators with parameter support.
*
* <p>API summary: members are listed as signatures only; their bodies are omitted.
* Each operator exposes one primary output table ({@link #getOutput()}) and optional
* side-output tables ({@link #getSideOutputs()}). Subclasses publish them through the
* protected {@link #setOutput(Table)} and {@link #setSideOutputs(Table[])} setters.
* Parameters are read and written through the {@code WithParams} interface.
* @param <T> The concrete operator type for method chaining
*/
public abstract class AlgoOperator<T extends AlgoOperator<T>>
implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
/** Create operator with empty parameters */
public AlgoOperator();
/** Create operator with initial parameters */
public AlgoOperator(Params params);
/** Get all operator parameters */
public Params getParams();
/** Get primary output table */
public Table getOutput();
/** Get all side output tables */
public Table[] getSideOutputs();
/** Get output column names */
public String[] getColNames();
/** Get output column types */
public TypeInformation<?>[] getColTypes();
/** Get side output column names by index */
public String[] getSideOutputColNames(int index);
/** Get side output column types by index */
public TypeInformation<?>[] getSideOutputColTypes(int index);
/** Get output table schema */
public TableSchema getSchema();
/** Set side output tables (protected); called by subclasses that produce extra results */
protected void setSideOutputs(Table[] sideOutputs);
/** Set primary output table (protected); called by subclasses at the end of linkFrom */
protected void setOutput(Table output);
/**
* Validate minimum number of input operators.
* @param size the minimum number of inputs required
* @param inputs the input operators, typically the ones passed to {@code linkFrom}
* NOTE(review): failure behaviour (exception type) is not shown in this summary — confirm before relying on it.
*/
public static void checkMinOpSize(int size, AlgoOperator<?>... inputs);
/**
* Validate exact number of input operators.
* @param size the exact number of inputs required
* @param inputs the input operators, typically the ones passed to {@code linkFrom}
*/
public static void checkOpSize(int size, AlgoOperator<?>... inputs);
}Base class for batch algorithm operators with linking and chaining capabilities.
/**
* Base class for batch algorithm operators
* Provides operator linking and batch-specific functionality
*
* <p>Operators are connected in one of two ways:
* <ul>
* <li>{@code upstream.link(next)}, which returns {@code next}'s type so calls can be chained;</li>
* <li>{@code op.linkFrom(inputs...)}, which subclasses implement to consume their inputs and
* publish an output table.</li>
* </ul>
* @param <T> The concrete batch operator type
*/
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
/** Create batch operator with empty parameters */
public BatchOperator();
/** Create batch operator with initial parameters */
public BatchOperator(Params params);
/**
* Link this operator to the next operator in chain.
* NOTE(review): presumably equivalent to {@code next.linkFrom(this)} — confirm.
* @param next the downstream operator that consumes this operator
* @return the next operator (typed as {@code B}), enabling fluent chaining
*/
public <B extends BatchOperator<?>> B link(B next);
/** Link this operator from input operators (abstract); implementations validate inputs, call setOutput and return this */
public abstract T linkFrom(BatchOperator<?>... inputs);
/** Create batch operator from existing table; a table can also be wrapped explicitly with TableSourceBatchOp */
public static BatchOperator<?> fromTable(Table table);
/** Get first input operator, validating inputs exist */
public static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);
}Usage Examples:
import org.apache.flink.ml.operator.batch.BatchOperator;
import org.apache.flink.table.api.Table;
// Create operators from tables
Table inputTable = // ... your input table
BatchOperator<?> source = BatchOperator.fromTable(inputTable);
// Chain operators
// Each link(...) returns an operator of its argument's type, so the chain continues from the
// newly linked operator.
BatchOperator<?> result = source
.link(new MyBatchTransformer())
.link(new MyBatchEstimator())
.link(new MyBatchSink());
// Custom batch operator implementation
// linkFrom validates its inputs, computes an output table, registers it with setOutput and
// returns `this` so the call can be chained.
public class MyBatchTransformer extends BatchOperator<MyBatchTransformer> {
@Override
public MyBatchTransformer linkFrom(BatchOperator<?>... inputs) {
BatchOperator.checkOpSize(1, inputs);
BatchOperator<?> input = inputs[0];
// Process input table
// NOTE(review): `input` is unused in this sketch; the upstream data is input.getOutput().
Table processedTable = // ... transformation logic
this.setOutput(processedTable);
return this;
}
}Base class for stream algorithm operators with real-time processing capabilities.
/**
* Base class for stream algorithm operators
* Provides operator linking and stream-specific functionality
*
* <p>Mirrors the batch operator API: chain operators with {@code link(next)}, or implement
* {@code linkFrom(inputs...)} to consume inputs and publish an output table.
* @param <T> The concrete stream operator type
*/
public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
/** Create stream operator with empty parameters */
public StreamOperator();
/** Create stream operator with initial parameters */
public StreamOperator(Params params);
/**
* Link this operator to the next operator in chain.
* NOTE(review): presumably equivalent to {@code next.linkFrom(this)} — confirm.
* @param next the downstream operator that consumes this operator
* @return the next operator (typed as {@code S}), enabling fluent chaining
*/
public <S extends StreamOperator<?>> S link(S next);
/** Link this operator from input operators (abstract); implementations validate inputs, call setOutput and return this */
public abstract T linkFrom(StreamOperator<?>... inputs);
/** Create stream operator from existing table; a table can also be wrapped explicitly with TableSourceStreamOp */
public static StreamOperator<?> fromTable(Table table);
/** Get first input operator, validating inputs exist */
public static StreamOperator<?> checkAndGetFirst(StreamOperator<?>... inputs);
}Usage Examples:
import org.apache.flink.ml.operator.stream.StreamOperator;
import org.apache.flink.table.api.Table;
// Create operators from streaming tables
Table streamingTable = // ... your streaming table
StreamOperator<?> source = StreamOperator.fromTable(streamingTable);
// Chain streaming operators
// As in the batch API, each link(...) returns an operator of its argument's type.
StreamOperator<?> result = source
.link(new MyStreamTransformer())
.link(new MyStreamProcessor())
.link(new MyStreamSink());
// Custom stream operator implementation
// Same contract as the batch version: validate inputs, setOutput, return `this`.
public class MyStreamTransformer extends StreamOperator<MyStreamTransformer> {
@Override
public MyStreamTransformer linkFrom(StreamOperator<?>... inputs) {
StreamOperator.checkOpSize(1, inputs);
StreamOperator<?> input = inputs[0];
// Process streaming table
// NOTE(review): `input` is unused in this sketch; the upstream data is input.getOutput().
Table processedStream = // ... streaming transformation logic
this.setOutput(processedStream);
return this;
}
}Specialized operators for converting Tables to operator chains.
/**
* Transform Table to batch source operator
* Entry point for batch operator chains
*
* <p>A source has no upstream operators, so start a chain from it with {@code link(...)}
* instead of calling {@code linkFrom}.
*/
public final class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {
/** Create batch source from table */
public TableSourceBatchOp(Table table);
/** Not supported - throws UnsupportedOperationException */
public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);
}/**
* Transform Table to stream source operator
* Entry point for stream operator chains
*
* <p>Like the batch source, it has no upstream operators; start a chain from it with
* {@code link(...)}.
*/
public final class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {
/** Create stream source from table */
public TableSourceStreamOp(Table table);
/** Not supported - throws UnsupportedOperationException */
public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);
}Usage Examples:
import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
// (BatchOperator, StreamOperator and Table are imported as in the earlier examples.)
// Batch processing chain
// Constructing the source directly is an alternative to BatchOperator.fromTable(table).
Table batchData = // ... your batch table
BatchOperator<?> batchChain = new TableSourceBatchOp(batchData)
.link(new FeatureScaler())
.link(new LinearRegression())
.link(new ModelEvaluator());
// Stream processing chain
// Constructing the source directly is an alternative to StreamOperator.fromTable(table).
Table streamData = // ... your streaming table
StreamOperator<?> streamChain = new TableSourceStreamOp(streamData)
.link(new StreamingFeatureTransformer())
.link(new OnlinePredictor())
.link(new AlertSystem());Most common pattern where operators are linked in sequence:
// Simple linear chain
// Each operator receives the previous one as its input.
BatchOperator<?> result = source
.link(preprocessor)
.link(featureExtractor)
.link(classifier);Operators that consume multiple inputs:
// Multi-input operator: inputs arrive in the order given to linkFrom
// (in the usage below, inputs[0] = leftOperator and inputs[1] = rightOperator).
public class JoinOperator extends BatchOperator<JoinOperator> {
@Override
public JoinOperator linkFrom(BatchOperator<?>... inputs) {
BatchOperator.checkOpSize(2, inputs); // Require exactly 2 inputs
BatchOperator<?> left = inputs[0];
BatchOperator<?> right = inputs[1];
// Join logic here
Table joined = // ... join left and right tables
this.setOutput(joined);
return this;
}
}
// Usage
// Multi-input operators are wired with linkFrom; link(...) connects only a single upstream operator.
BatchOperator<?> joined = new JoinOperator()
.linkFrom(leftOperator, rightOperator);Operators that produce multiple outputs:
// Multi-output operator: the primary result goes to setOutput, additional results to setSideOutputs.
public class SplitOperator extends BatchOperator<SplitOperator> {
@Override
public SplitOperator linkFrom(BatchOperator<?>... inputs) {
BatchOperator.checkOpSize(1, inputs);
Table input = inputs[0].getOutput();
// Split logic
Table mainOutput = // ... main result
Table[] sideOutputs = new Table[]{
// ... additional outputs
};
this.setOutput(mainOutput);
this.setSideOutputs(sideOutputs);
return this;
}
}
// Usage
// Read the primary table with getOutput() and the extra tables with getSideOutputs();
// their schemas are available through getSideOutputColNames(i) / getSideOutputColTypes(i).
SplitOperator splitter = new SplitOperator().linkFrom(source);
Table main = splitter.getOutput();
Table[] sides = splitter.getSideOutputs();All operators support parameter configuration through the WithParams interface:
/**
 * Example batch operator configured through typed parameters.
 *
 * <p>Each {@link ParamInfo} declares a parameter name, type and default value. The convenience
 * setters and getters below wrap the generic {@code set}/{@code get} methods from
 * {@code WithParams}, so both styles can be used interchangeably.
 */
public class ConfigurableOperator extends BatchOperator<ConfigurableOperator> {
    /** Number of iterations; defaults to 10. */
    public static final ParamInfo<Integer> NUM_ITERATIONS = ParamInfoFactory
        .createParamInfo("numIterations", Integer.class)
        .setHasDefaultValue(10)
        .build();

    /** Learning rate; defaults to 0.01. */
    public static final ParamInfo<Double> LEARNING_RATE = ParamInfoFactory
        .createParamInfo("learningRate", Double.class)
        .setHasDefaultValue(0.01)
        .build();

    /** Sets {@link #NUM_ITERATIONS}; returns this operator for chaining. */
    public ConfigurableOperator setNumIterations(int numIter) {
        return set(NUM_ITERATIONS, numIter);
    }

    /** Returns {@link #NUM_ITERATIONS}, or its default when unset. */
    public int getNumIterations() {
        return get(NUM_ITERATIONS);
    }

    /** Sets {@link #LEARNING_RATE}; returns this operator for chaining. */
    public ConfigurableOperator setLearningRate(double learningRate) {
        return set(LEARNING_RATE, learningRate);
    }

    /** Returns {@link #LEARNING_RATE}, or its default when unset. */
    public double getLearningRate() {
        return get(LEARNING_RATE);
    }

    /**
     * Consumes exactly one input operator and publishes the processed table.
     *
     * @param inputs the upstream operator (exactly one)
     * @return this operator, so the call can be chained
     */
    @Override
    public ConfigurableOperator linkFrom(BatchOperator<?>... inputs) {
        // Validate the input count first, as the other operator examples do.
        BatchOperator.checkOpSize(1, inputs);
        Table input = inputs[0].getOutput();

        // Use parameters in processing
        int numIter = getNumIterations();
        double lr = getLearningRate();

        // Processing logic using numIter and lr goes here; this sketch passes the input through.
        Table result = input;

        // Publish the result so downstream operators can read it via getOutput().
        this.setOutput(result);
        return this;
    }
}
// Usage with parameters
// Convenience setters (setNumIterations) and the generic WithParams setter set(ParamInfo, value)
// can be mixed; both return the operator, so configuration can be chained before linkFrom.
BatchOperator<?> configured = new ConfigurableOperator()
.setNumIterations(20)
.set(ConfigurableOperator.LEARNING_RATE, 0.001)
.linkFrom(source);Algorithm operators can be integrated with the higher-level pipeline framework:
// Estimator that trains with batch operators: fit(BatchOperator) builds a training chain and
// wraps the resulting table as the model data of a MyModelFromOperator.
public class MyEstimatorFromOperator extends EstimatorBase<MyEstimatorFromOperator, MyModelFromOperator> {
@Override
protected MyModelFromOperator fit(BatchOperator input) {
// Use batch operators for training
BatchOperator<?> trained = input
.link(new FeaturePreprocessor())
.link(new TrainingOperator())
.link(new ModelExtractor());
// Extract model data
Table modelData = trained.getOutput();
// Requires MyModelFromOperator to declare a (Params) constructor: Java constructors are not inherited.
// NOTE(review): the model receives whatever getParams() returns; if that is the estimator's live
// Params object, later changes on either side are shared — confirm whether a copy is intended.
return new MyModelFromOperator(this.getParams()).setModelData(modelData);
}
}
/**
 * Model produced by {@code MyEstimatorFromOperator}. It applies prediction operators to batch
 * and stream inputs, using the model data table attached with {@code setModelData}.
 */
public class MyModelFromOperator extends ModelBase<MyModelFromOperator> {
    /** Creates a model with empty parameters. */
    public MyModelFromOperator() {
        super();
    }

    /**
     * Creates a model with the given parameters.
     *
     * <p>Must be declared explicitly because Java constructors are not inherited; the estimator's
     * {@code fit} calls {@code new MyModelFromOperator(this.getParams())}.
     *
     * @param params initial model parameters
     */
    public MyModelFromOperator(Params params) {
        super(params);
    }

    /**
     * Batch prediction: preprocesses the input, then applies the prediction operator configured
     * with this model's data. Raw types are kept as in the example's overridden signatures.
     */
    @Override
    protected BatchOperator transform(BatchOperator input) {
        // Use batch operators for prediction
        return input
            .link(new FeaturePreprocessor())
            .link(new PredictionOperator().setModelData(this.getModelData()));
    }

    /** Streaming prediction: the real-time counterpart of the batch transform. */
    @Override
    protected StreamOperator transform(StreamOperator input) {
        // Use stream operators for real-time prediction
        return input
            .link(new StreamFeaturePreprocessor())
            .link(new StreamPredictionOperator().setModelData(this.getModelData()));
    }
}
This integration allows you to leverage the low-level operator framework within the high-level pipeline abstractions, providing flexibility for custom algorithm implementations while maintaining compatibility with the broader ML ecosystem.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-ml-uber-2-11