Comprehensive machine learning library for Apache Flink that enables scalable ML pipelines on Flink's distributed stream processing platform.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-ml-uber-2-11@1.12.0

Apache Flink ML is a comprehensive machine learning library for Apache Flink that enables developers to build and deploy ML pipelines on Flink's distributed stream processing platform. It combines the ML API and the implementation libraries into a single uber jar, offering a complete toolkit for machine learning workflows, including data preprocessing, feature engineering, model training, and inference.
pom.xml:
<!-- Maven coordinates of the Flink ML uber jar (ML API + implementation libraries in one artifact).
     The "_2.11" suffix of the artifactId presumably denotes the Scala binary version - confirm.
     NOTE(review): the tessl install command references version 1.12.0 while this dependency
     declares 1.12.7; confirm which version is intended. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-ml-uber_2.11</artifactId>
    <version>1.12.7</version>
</dependency>

// Core imports: pipeline API (Pipeline, Estimator, Transformer, Model), parameters (Params),
// linear algebra (DenseVector, DenseMatrix), the ML execution environment (MLEnvironment),
// and base classes for custom stages (EstimatorBase; TransformerBase follows).
import org.apache.flink.ml.api.core.Pipeline;
import org.apache.flink.ml.api.core.Estimator;
import org.apache.flink.ml.api.core.Transformer;
import org.apache.flink.ml.api.core.Model;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.pipeline.EstimatorBase;
import org.apache.flink.ml.pipeline.TransformerBase;

// --- Quick start ---
import org.apache.flink.ml.common.MLEnvironmentFactory;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Obtain the default ML environment and its batch table environment.
// Every fit/transform call below runs against this TableEnvironment.
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();
TableEnvironment tEnv = mlEnv.getBatchTableEnvironment();

// Create a pipeline with estimators and transformers.
// MyFeatureTransformer and MyModelEstimator are placeholders for your own stages
// (e.g. subclasses of TransformerBase / EstimatorBase).
Pipeline pipeline = new Pipeline()
    .appendStage(new MyFeatureTransformer())
    .appendStage(new MyModelEstimator());

// Train the pipeline on data.
// Placeholder: "training_data" must name a table registered in tEnv; build the Table
// any other way you like, but the statement must be complete for the snippet to compile.
Table trainingData = tEnv.from("training_data");
Pipeline trainedPipeline = pipeline.fit(tEnv, trainingData);

// Apply the trained pipeline to new data.
Table newData = tEnv.from("new_data"); // placeholder: the table to run predictions on
Table predictions = trainedPipeline.transform(tEnv, newData);

Apache Flink ML is built around several key architectural components:
- Pipeline abstractions (Estimator, Transformer, Model, Pipeline) for building ML workflows

Core ML pipeline abstractions for building, training, and deploying machine learning workflows. Provides type-safe composition of estimators and transformers.
/**
 * A sequence of pipeline stages (signatures only in this listing).
 *
 * <p>Pipeline is declared as an Estimator, a Transformer and a Model of itself. Fitting a
 * Pipeline therefore yields another Pipeline, and a Pipeline can transform tables directly.
 */
public final class Pipeline implements Estimator<Pipeline, Pipeline>,
    Transformer<Pipeline>,
    Model<Pipeline> {
    /** Appends {@code stage}. Returns a Pipeline, which enables the chained calls shown in the quick start. */
    public Pipeline appendStage(PipelineStage stage);
    /** Returns the stages of this pipeline as a List. */
    public List<PipelineStage> getStages();
    /** Presumably reports whether any stage still has to be fit - confirm against the implementation. */
    public boolean needFit();
    /** Fits the pipeline on {@code input} within {@code tEnv}; the result is itself a Pipeline. */
    public Pipeline fit(TableEnvironment tEnv, Table input);
    /** Applies the pipeline to {@code input} within {@code tEnv} and returns the resulting Table. */
    public Table transform(TableEnvironment tEnv, Table input);
}

Type-safe parameter system with validation, default values, and JSON serialization support. Essential for configuring ML algorithms and components.
/**
 * Container of parameter values keyed by {@link ParamInfo} (signatures only).
 * Serializable and Cloneable; can be written to and read from JSON via toJson/loadJson.
 */
public class Params implements Serializable, Cloneable {
    /** Reads the value stored for {@code info}. Behaviour for an unset parameter (default vs. error) is not shown here. */
    public <V> V get(ParamInfo<V> info);
    /** Stores {@code value} for {@code info}. Returns Params, presumably {@code this}, for chaining. */
    public <V> Params set(ParamInfo<V> info, V value);
    /** Returns whether a value is present for {@code info}. */
    public <V> boolean contains(ParamInfo<V> info);
    /** Serializes these parameters to a JSON string. */
    public String toJson();
    /** Loads parameters from {@code json}, presumably in the format produced by toJson - confirm. */
    public void loadJson(String json);
    /** Merges {@code otherParams}; which side wins on conflicting keys is not shown here - confirm. */
    public Params merge(Params otherParams);
    /** Returns a copy of these parameters (covariant override of Object.clone). */
    public Params clone();
}
/**
 * Descriptor of one typed parameter: name, description, optionality, default value and
 * validator (signatures only). Presumably built via ParamInfoBuilder - confirm.
 *
 * @param <V> the value type of the parameter
 */
public class ParamInfo<V> {
    public String getName();
    public String getDescription();
    public boolean isOptional();
    /** Default value; presumably only meaningful when one was configured (setHasDefaultValue) - confirm. */
    public V getDefaultValue();
    /** Validator applied to values of this parameter. */
    public ParamValidator<V> getValidator();
}

Comprehensive linear algebra library with dense and sparse vectors, matrices, and BLAS operations. Essential for numerical computations in ML algorithms.
/** Abstract int-indexed vector of doubles (signatures only). */
public abstract class Vector implements Serializable {
    /** Number of elements (dimension). */
    public abstract int size();
    /** Element at index {@code i}. */
    public abstract double get(int i);
    /** Sets element {@code i} to {@code val}; returns void, so this mutates the vector. */
    public abstract void set(int i, double val);
    /** Dot product with {@code vec}. */
    public abstract double dot(Vector vec);
    /** Element-wise sum with {@code vec}; whether a new instance is returned is not shown here. */
    public abstract Vector plus(Vector vec);
    /** Vector scaled by {@code v}. */
    public abstract Vector scale(double v);
    /** Euclidean (L2) norm. */
    public abstract double normL2();
}
/** Dense matrix of doubles (signatures only). */
public class DenseMatrix implements Serializable {
    /** Element at ({@code i}, {@code j}); presumably row i, column j - confirm. */
    public double get(int i, int j);
    /** Sets element ({@code i}, {@code j}) to {@code s}. */
    public void set(int i, int j, double s);
    /** Returns the transpose as a DenseMatrix. */
    public DenseMatrix transpose();
    /** Matrix-matrix product, presumably {@code this * mat}. */
    public DenseMatrix multiplies(DenseMatrix mat);
    /** Matrix-vector product, presumably {@code this * x}. */
    public DenseVector multiplies(DenseVector x);
}

Batch and stream processing operators for building custom ML algorithms. Provides linking capabilities and integration with Flink's Table API.
/**
 * Base class for batch algorithm operators (signatures only). The self-referential type
 * parameter T lets linkFrom return the concrete operator type.
 */
public abstract class BatchOperator<T extends BatchOperator<T>>
    extends AlgoOperator<T> {
    /** Links this operator into {@code next}. Returns type B (the type of {@code next}), so link calls can be chained. */
    public <B extends BatchOperator<?>> B link(B next);
    /** Connects this operator to its upstream {@code inputs} and returns the concrete operator type T. */
    public abstract T linkFrom(BatchOperator<?>... inputs);
    /** Wraps a Table API {@code table} as a batch operator. */
    public static BatchOperator<?> fromTable(Table table);
}
/**
 * Streaming counterpart of BatchOperator, with the same linking API (signatures only).
 */
public abstract class StreamOperator<T extends StreamOperator<T>>
    extends AlgoOperator<T> {
    /** Links this operator into {@code next}. Returns type S (the type of {@code next}), so link calls can be chained. */
    public <S extends StreamOperator<?>> S link(S next);
    /** Connects this operator to its upstream {@code inputs} and returns the concrete operator type T. */
    public abstract T linkFrom(StreamOperator<?>... inputs);
    /** Wraps a Table API {@code table} as a stream operator. */
    public static StreamOperator<?> fromTable(Table table);
}

ML execution context management for Flink batch and stream environments. Provides centralized access to execution contexts and table environments.
/**
 * Holder of the Flink environments used by ML code (signatures only). It exposes one
 * execution/table environment pair for batch and one pair for streaming.
 */
public class MLEnvironment {
    public ExecutionEnvironment getExecutionEnvironment();
    public StreamExecutionEnvironment getStreamExecutionEnvironment();
    /** Batch table environment; the quick start passes it to Pipeline.fit/transform. */
    public BatchTableEnvironment getBatchTableEnvironment();
    public StreamTableEnvironment getStreamTableEnvironment();
}
/** Static access point to MLEnvironment instances, identified by Long ids (signatures only). */
public class MLEnvironmentFactory {
    /**
     * Id of the default environment. It is a boxed Long, so compare ids with equals(),
     * not ==.
     */
    public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
    /** Returns the environment registered under {@code mlEnvId}; the behaviour for unknown ids is not shown here. */
    public static MLEnvironment get(Long mlEnvId);
    /** Returns the default environment, presumably the one under DEFAULT_ML_ENVIRONMENT_ID - confirm. */
    public static MLEnvironment getDefault();
    /** Registers {@code env} and returns the Long id assigned to it. */
    public static Long registerMLEnvironment(MLEnvironment env);
}

Abstract base classes for implementing custom estimators, transformers, and models. Provides common functionality and integration patterns.
/**
 * Base class for custom estimators (signatures only).
 *
 * @param <E> the concrete estimator type (self type)
 * @param <M> the model type produced by fit, a ModelBase subclass
 */
public abstract class EstimatorBase<E extends EstimatorBase<E, M>,
    M extends ModelBase<M>>
    extends PipelineStageBase<E>
    implements Estimator<E, M> {
    /** Estimator entry point: fits on {@code input} within {@code tEnv}. */
    public M fit(TableEnvironment tEnv, Table input);
    /** Overload without an explicit TableEnvironment. */
    public M fit(Table input);
    /**
     * Training hook that subclasses implement; it takes a (raw-typed) BatchOperator.
     * Presumably the public Table overloads delegate here - confirm.
     */
    protected abstract M fit(BatchOperator input);
}
/**
 * Base class for custom transformers (signatures only). Subclasses supply both a batch and a
 * stream implementation.
 *
 * @param <T> the concrete transformer type (self type)
 */
public abstract class TransformerBase<T extends TransformerBase<T>>
    extends PipelineStageBase<T>
    implements Transformer<T> {
    /** Transformer entry point: transforms {@code input} within {@code tEnv}. */
    public Table transform(TableEnvironment tEnv, Table input);
    /** Overload without an explicit TableEnvironment. */
    public Table transform(Table input);
    /** Batch hook that subclasses implement (raw-typed BatchOperator). */
    protected abstract BatchOperator transform(BatchOperator input);
    /** Streaming hook that subclasses implement (raw-typed StreamOperator). */
    protected abstract StreamOperator transform(StreamOperator input);
}

Comprehensive utility libraries for table operations, vector parsing, data conversion, and statistical operations.
/** Static helpers for table schemas and row data (signatures only). */
public class TableUtil {
    /** Index of column {@code targetCol} in {@code tableSchema}; the not-found result (e.g. -1 vs. exception) is not shown - confirm. */
    public static int findColIndex(TableSchema tableSchema, String targetCol);
    /** Type information of column {@code targetCol}. */
    public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
    /** Asserts that every column in {@code selectedCols} is numerical; presumably throws otherwise. */
    public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
    /** Renders {@code data} under the header {@code colNames} as a String. */
    public static String format(String[] colNames, List<Row> data);
}
/** String conversion helpers for vectors (signatures only). */
public class VectorUtil {
    /** Parses {@code str} into a Vector; the dense-vs-sparse decision depends on the string format. */
    public static Vector parse(String str);
    /** Formats {@code vector} as a String, presumably in a form parse accepts - confirm. */
    public static String toString(Vector vector);
    /** Parses {@code str} as a DenseVector. */
    public static DenseVector parseDense(String str);
    /** Parses {@code str} as a SparseVector. */
    public static SparseVector parseSparse(String str);
}

/**
 * Common supertype of pipeline stages; Estimator and Transformer extend it.
 *
 * <p>NOTE(review): upstream Flink 1.12 appears to declare this generically as
 * {@code PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable}, with
 * default toJson/loadJson. Confirm before relying on the raw form shown here.
 */
public interface PipelineStage {
    /** Serializes this stage (presumably its parameters) to JSON. */
    String toJson();
    /** Restores this stage from {@code json}. */
    void loadJson(String json);
}
/**
 * A pipeline stage that is fit on a Table to produce a Model.
 *
 * @param <E> the concrete estimator type (self type)
 * @param <M> the model type returned by fit
 */
public interface Estimator<E extends Estimator<E, M>, M extends Model<M>>
    extends PipelineStage {
    /** Trains on {@code input} within {@code tEnv} and returns the resulting model. */
    M fit(TableEnvironment tEnv, Table input);
}
/**
 * A pipeline stage that maps an input Table to an output Table.
 *
 * @param <T> the concrete transformer type (self type)
 */
public interface Transformer<T extends Transformer<T>> extends PipelineStage {
    /** Transforms {@code input} within {@code tEnv} and returns the resulting Table. */
    Table transform(TableEnvironment tEnv, Table input);
}
/**
 * A Transformer that an Estimator's fit returns (Estimator's M extends Model&lt;M&gt;).
 * It declares no methods beyond Transformer.
 *
 * @param <M> the concrete model type (self type)
 */
public interface Model<M extends Model<M>> extends Transformer<M> {
}
/**
 * Mixin for objects that carry a Params instance.
 *
 * @param <T> the type returned by set; presumably the implementing class, for fluent chaining
 */
public interface WithParams<T> {
    /** The backing parameter container. */
    Params getParams();
    /** Sets {@code info} to {@code value}; returns T. */
    <V> T set(ParamInfo<V> info, V value);
    /** Reads the value of {@code info}. */
    <V> V get(ParamInfo<V> info);
}

/** Serializable predicate on parameter values; validate returns whether {@code value} is acceptable. */
public interface ParamValidator<V> extends Serializable {
    boolean validate(V value);
}
/**
 * Fluent builder for ParamInfo (signatures only). Because it is declared {@code static}, it
 * must be nested in an enclosing type that this listing does not show.
 */
public static class ParamInfoBuilder<V> {
    public ParamInfoBuilder<V> setDescription(String description);
    /** Marks the parameter as optional (cf. ParamInfo.isOptional). */
    public ParamInfoBuilder<V> setOptional();
    /** Records {@code defaultValue} as the parameter's default. */
    public ParamInfoBuilder<V> setHasDefaultValue(V defaultValue);
    public ParamInfoBuilder<V> setValidator(ParamValidator<V> validator);
    /** Creates the ParamInfo from the configured settings. */
    public ParamInfo<V> build();
}

/**
 * Cursor over a vector's entries (signatures only). next() returns void, so the current
 * entry is read through getIndex()/getValue(). Presumably next() must be called before the
 * first read - confirm.
 */
public interface VectorIterator extends Serializable {
    boolean hasNext();
    void next();
    int getIndex();
    double getValue();
}
/**
 * Flink TypeInformation constants for the vector types (signatures only; this listing omits
 * the initializers).
 */
public class VectorTypes {
    public static final TypeInformation<DenseVector> DENSE_VECTOR;
    public static final TypeInformation<SparseVector> SPARSE_VECTOR;
    /** Type information for the abstract Vector base type. */
    public static final TypeInformation<Vector> VECTOR;
}