A comprehensive machine learning library for Apache Flink that enables scalable ML pipelines on a distributed stream-processing platform.
---
Core ML pipeline abstractions for building, training, and deploying machine learning workflows. The pipeline framework provides type-safe composition of estimators and transformers with support for serialization and complex workflow management.
The main pipeline orchestration class that combines estimators and transformers into executable workflows.
```java
/**
 * Linear workflow for combining estimators and transformers.
 * Supports both training (fit) and transformation (transform) operations.
 */
public final class Pipeline implements Estimator<Pipeline, Pipeline>,
                                       Transformer<Pipeline>,
                                       Model<Pipeline> {

    /** Create an empty pipeline */
    public Pipeline();

    /** Create a pipeline from a JSON representation */
    public Pipeline(String pipelineJson);

    /** Create a pipeline from a list of stages */
    public Pipeline(List<PipelineStage> stages);

    /** Add a stage to the end of the pipeline */
    public Pipeline appendStage(PipelineStage stage);

    /** Get an immutable list of all pipeline stages */
    public List<PipelineStage> getStages();

    /** Check if the pipeline contains any estimators that need training */
    public boolean needFit();

    /** Get the pipeline's parameter configuration */
    public Params getParams();

    /** Train the pipeline on input data, producing a trained pipeline */
    public Pipeline fit(TableEnvironment tEnv, Table input);

    /** Apply the pipeline transformation to input data */
    public Table transform(TableEnvironment tEnv, Table input);

    /** Serialize the pipeline to a JSON string */
    public String toJson();

    /** Load the pipeline from a JSON string */
    public void loadJson(String json);
}
```

Usage Examples:
```java
import org.apache.flink.ml.api.core.Pipeline;
import org.apache.flink.table.api.Table;

// Create and build a pipeline
Pipeline pipeline = new Pipeline()
    .appendStage(new FeatureNormalizer())
    .appendStage(new LinearRegression())
    .appendStage(new PredictionTransformer());

// Check if training is needed
if (pipeline.needFit()) {
    // Train the pipeline
    Pipeline trainedPipeline = pipeline.fit(tableEnv, trainingData);

    // Apply to new data
    Table predictions = trainedPipeline.transform(tableEnv, testData);
}

// Serialize the pipeline
String pipelineJson = pipeline.toJson();

// Load a pipeline from JSON
Pipeline loadedPipeline = new Pipeline(pipelineJson);
```

Base interface for all components that can be included in a pipeline.
```java
/**
 * Base interface for pipeline components.
 * All estimators, transformers, and models implement this interface.
 */
public interface PipelineStage {

    /** Serialize the stage to a JSON representation */
    String toJson();

    /** Load the stage configuration from JSON */
    void loadJson(String json);
}
```

Interface for ML components that can be trained on data to produce models.
```java
/**
 * Machine learning estimators that train models from data.
 * @param <E> the concrete estimator type
 * @param <M> the model type produced by this estimator
 */
public interface Estimator<E extends Estimator<E, M>, M extends Model<M>>
        extends PipelineStage {

    /** Train the estimator on input data and produce a trained model */
    M fit(TableEnvironment tEnv, Table input);
}
```

Usage Example:
```java
public class MyLinearRegression implements Estimator<MyLinearRegression, MyLinearModel> {
    @Override
    public MyLinearModel fit(TableEnvironment tEnv, Table input) {
        // Training logic here
        return new MyLinearModel(/* trained parameters */);
    }
}
```

Interface for components that transform data without requiring training.
```java
/**
 * Data transformation components that modify input data.
 * @param <T> the concrete transformer type
 */
public interface Transformer<T extends Transformer<T>> extends PipelineStage {

    /** Apply the transformation to input data */
    Table transform(TableEnvironment tEnv, Table input);
}
```

Usage Example:
```java
public class FeatureNormalizer implements Transformer<FeatureNormalizer> {
    @Override
    public Table transform(TableEnvironment tEnv, Table input) {
        // Normalization logic here
        return normalizedTable;
    }
}
```

Interface for trained machine learning models that can transform data.
```java
/**
 * Trained machine learning models.
 * Models are transformers that have been produced by training an estimator.
 * @param <M> the concrete model type
 */
public interface Model<M extends Model<M>> extends Transformer<M> {
    // Inherits the transform() method from Transformer
    // Additional model-specific functionality can be added here
}
```

Usage Example:
```java
public class MyLinearModel implements Model<MyLinearModel> {
    private DenseVector weights;
    private double bias;

    public MyLinearModel(DenseVector weights, double bias) {
        this.weights = weights;
        this.bias = bias;
    }

    @Override
    public Table transform(TableEnvironment tEnv, Table input) {
        // Apply the model to make predictions
        return predictionsTable;
    }
}
```

The pipeline framework follows a specific execution pattern:
1. Call `needFit()` to determine if training is required.
2. Call `fit()` to train estimators and produce models.
3. Call `transform()` to apply the pipeline to data.

```java
// Example execution flow
Pipeline pipeline = new Pipeline()
    .appendStage(preprocessor)     // Transformer - no training needed
    .appendStage(featureSelector)  // Estimator - needs training
    .appendStage(classifier);      // Estimator - needs training

// Check if training is needed (true, because the pipeline contains estimators)
boolean needsTraining = pipeline.needFit();

if (needsTraining) {
    // This will:
    // 1. Apply the preprocessor transform
    // 2. Train featureSelector on the preprocessed data
    // 3. Apply the featureSelector transform
    // 4. Train classifier on the selected features
    Pipeline trainedPipeline = pipeline.fit(tEnv, trainingData);

    // Now apply the full trained pipeline
    Table results = trainedPipeline.transform(tEnv, newData);
}
```

The pipeline framework uses generics to maintain type safety:
- `Estimator<E, M>`: ensures estimators produce the correct model type
- `Transformer<T>`: enables method chaining with correct return types
- `Model<M>`: models maintain their specific type information

This prevents runtime errors and enables better IDE support and refactoring capabilities.
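To illustrate why the compiler can enforce these guarantees, here is a minimal, self-contained sketch of the same self-referential (F-bounded) generics pattern. The `Sketch*` interfaces and the `Scaler`/`ScalerModel` classes are hypothetical stand-ins, with `String` used in place of Flink's `Table` so the example runs without any Flink dependency; they are not the real Flink ML signatures.

```java
// Simplified stand-ins for the pipeline interfaces (Strings instead of Tables).
interface SketchTransformer<T extends SketchTransformer<T>> {
    String transform(String input);
}

// A model is just a transformer produced by training.
interface SketchModel<M extends SketchModel<M>> extends SketchTransformer<M> {}

// The bound ties the estimator to its concrete model type, so fit()
// is statically typed as that model, never as a raw SketchModel.
interface SketchEstimator<E extends SketchEstimator<E, M>, M extends SketchModel<M>> {
    M fit(String trainingData);
}

// Concrete model: remembers a scaling factor learned during "training".
final class ScalerModel implements SketchModel<ScalerModel> {
    private final double factor;
    ScalerModel(double factor) { this.factor = factor; }

    @Override
    public String transform(String input) {
        return "scaled(" + input + ", x" + factor + ")";
    }
}

// Concrete estimator: fit() returns ScalerModel, not SketchModel.
final class Scaler implements SketchEstimator<Scaler, ScalerModel> {
    @Override
    public ScalerModel fit(String trainingData) {
        // A real estimator would derive the factor from the data;
        // this sketch just fixes it at 2.0.
        return new ScalerModel(2.0);
    }
}
```

Because `fit()` returns `ScalerModel` rather than the erased interface type, `ScalerModel model = new Scaler().fit(data);` compiles without a cast, and assigning the result to any other model type is a compile-time error rather than a runtime one.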
Install with the Tessl CLI:

```shell
npx tessl i tessl/maven-org-apache-flink--flink-ml-uber-2-11
```