Apache Flink Machine Learning Library providing linear algebra operations, statistical utilities, pipeline abstractions, and ML algorithms for both batch and stream processing in the Flink ecosystem
npx @tessl/cli install tessl/maven-org-apache-flink--flink-ml-lib_2-11@1.12.0

Apache Flink Machine Learning Library provides comprehensive machine learning capabilities for the Flink stream and batch processing framework. The library includes linear algebra operations, statistical utilities, pipeline abstractions, and ML algorithms optimized for distributed processing.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml-lib_2.11</artifactId>
<version>1.12.7</version>
</dependency>

import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.ml.pipeline.EstimatorBase;
import org.apache.flink.ml.pipeline.TransformerBase;

import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
// Build the ML environment on top of a batch execution environment and its table environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
MLEnvironment mlEnv = new MLEnvironment(batchEnv, batchTableEnv);

// Build two dense vectors from plain arrays and add them
double[] leftValues = {1.0, 2.0, 3.0};
double[] rightValues = {4.0, 5.0, 6.0};
DenseVector left = new DenseVector(leftValues);
DenseVector right = new DenseVector(rightValues);
DenseVector sum = left.plus(right);

// Create a matrix with DenseMatrix.eye(3) and multiply it by the first vector
DenseMatrix identity = DenseMatrix.eye(3);
DenseVector product = identity.multiplies(left);
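// Use in an ML pipeline:
// EstimatorBase and TransformerBase provide the foundation for ML algorithms.

The Flink ML library is built around several key architectural components:

- ML environment management (MLEnvironment, MLEnvironmentFactory)
- Linear algebra (Vector, DenseVector, DenseMatrix)
- Pipeline framework (EstimatorBase, TransformerBase, ModelBase)
- Algorithm operators (AlgoOperator, BatchOperator)
- Table utilities and vector type information (TableUtil, VectorTypes)
- Statistics (MultivariateGaussian)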
Core execution environment management providing centralized context for ML operations in both batch and streaming scenarios.
/**
 * Execution context for ML operations. It carries a batch execution environment with its
 * BatchTableEnvironment and/or a stream execution environment with its StreamTableEnvironment.
 *
 * NOTE(review): whether a getter for an environment that was not passed to the constructor
 * returns null or lazily creates one is not shown in this listing; confirm against the library.
 */
public class MLEnvironment {
/** Creates an environment without caller-supplied execution or table environments. */
public MLEnvironment();
/** Creates an environment for batch jobs from the given execution and table environments. */
public MLEnvironment(ExecutionEnvironment batchEnv, BatchTableEnvironment batchTableEnv);
/** Creates an environment for streaming jobs from the given execution and table environments. */
public MLEnvironment(StreamExecutionEnvironment streamEnv, StreamTableEnvironment streamTableEnv);
/** Returns the batch {@link ExecutionEnvironment}. */
public ExecutionEnvironment getExecutionEnvironment();
/** Returns the {@link StreamExecutionEnvironment}. */
public StreamExecutionEnvironment getStreamExecutionEnvironment();
/** Returns the {@link BatchTableEnvironment}. */
public BatchTableEnvironment getBatchTableEnvironment();
/** Returns the {@link StreamTableEnvironment}. */
public StreamTableEnvironment getStreamTableEnvironment();
}
/**
 * Static registry of {@link MLEnvironment} instances identified by {@code Long} ids, plus a
 * default environment.
 */
public class MLEnvironmentFactory {
/**
 * Returns the environment associated with {@code mlEnvId}.
 * NOTE(review): the result for an id that was never registered is not shown here; confirm.
 */
public static MLEnvironment get(Long mlEnvId);
/** Returns the default environment. */
public static MLEnvironment getDefault();
/** Returns a new environment id. */
public static Long getNewMLEnvironmentId();
/**
 * Registers {@code env} and returns a {@code Long} id for it, presumably the id to pass to
 * {@link #get(Long)} later.
 */
public static Long registerMLEnvironment(MLEnvironment env);
}

Comprehensive linear algebra operations including dense and sparse vectors, dense matrices, and BLAS routines for high-performance mathematical computations.
/**
 * Abstract, serializable vector of {@code double} values. {@link DenseVector} is one concrete
 * subclass, and {@code VectorTypes} also references a {@code SparseVector} type.
 *
 * NOTE(review): whether scale/plus/minus return a new vector or modify this one is not visible
 * in this listing; confirm before relying on either behavior.
 */
public abstract class Vector implements Serializable {
/** Returns the number of elements. */
public abstract int size();
/** Returns the element at index {@code i}. */
public abstract double get(int i);
/** Sets the element at index {@code i} to {@code val}. */
public abstract void set(int i, double val);
/** Returns the L1 norm. */
public abstract double normL1();
/** Returns the L2 norm. */
public abstract double normL2();
/** Returns this vector scaled by {@code v}. */
public abstract Vector scale(double v);
/** Returns the sum of this vector and {@code vec}. */
public abstract Vector plus(Vector vec);
/** Returns this vector minus {@code vec}. */
public abstract Vector minus(Vector vec);
/** Returns the dot product of this vector and {@code vec}. */
public abstract double dot(Vector vec);
}
/**
 * Dense {@link Vector} constructed from, and exposing, a {@code double[]}.
 *
 * NOTE(review): the usage example assigns the result of DenseVector.plus(...) to a DenseVector
 * variable. That only compiles if DenseVector overrides plus(Vector) with a DenseVector return
 * type, and no such override is listed here. Confirm against the library source and list it if
 * present.
 */
public class DenseVector extends Vector {
/** Creates a vector from {@code data}. NOTE(review): confirm whether the array is copied or wrapped. */
public DenseVector(double[] data);
/** Returns a vector of length {@code n} with every element equal to 1. */
public static DenseVector ones(int n);
/** Returns a vector of length {@code n} with every element equal to 0. */
public static DenseVector zeros(int n);
/** Returns a vector of length {@code n} with random elements. NOTE(review): distribution and seeding not shown here. */
public static DenseVector rand(int n);
/** Returns the element array. NOTE(review): confirm whether this is the internal array or a copy. */
public double[] getData();
}
/** Serializable dense matrix of {@code double} values. */
public class DenseMatrix implements Serializable {
/** Creates an {@code m}-by-{@code n} matrix. NOTE(review): initial contents are not shown here. */
public DenseMatrix(int m, int n);
/** Creates a matrix from a 2-D array; presumably {@code data[i]} is row {@code i}. Confirm. */
public DenseMatrix(double[][] data);
/** Returns the {@code n}-by-{@code n} identity matrix. */
public static DenseMatrix eye(int n);
/** Returns an {@code m}-by-{@code n} matrix of zeros. */
public static DenseMatrix zeros(int m, int n);
/** Returns the product of this matrix and the vector {@code x}. */
public DenseVector multiplies(DenseVector x);
/** Returns the product of this matrix and {@code mat}. */
public DenseMatrix multiplies(DenseMatrix mat);
}

Estimator/Transformer pattern for building machine learning workflows with type-safe parameter management and support for both batch and stream processing.
/**
 * Base class for estimators: {@code fit} consumes a {@link Table} and produces a model of type
 * {@code M}. {@code E} is the concrete estimator type (recursive self-type parameter).
 */
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
extends PipelineStageBase<E> implements Estimator<E, M> {
/** Fits a model on {@code input}. */
public M fit(Table input);
/** Fits a model on {@code input} using the given table environment. */
public M fit(TableEnvironment tEnv, Table input);
}
/**
 * Base class for transformers: {@code transform} maps an input {@link Table} to an output
 * {@link Table}. {@code T} is the concrete transformer type (recursive self-type parameter).
 */
public abstract class TransformerBase<T extends TransformerBase<T>>
extends PipelineStageBase<T> implements Transformer<T> {
/** Transforms {@code input} and returns the resulting table. */
public Table transform(Table input);
/** Transforms {@code input} using the given table environment and returns the resulting table. */
public Table transform(TableEnvironment tEnv, Table input);
}
/**
 * Base class for models: a {@link TransformerBase} that also carries its model data as a
 * {@link Table}.
 */
public abstract class ModelBase<M extends ModelBase<M>>
extends TransformerBase<M> implements Model<M> {
/** Returns the model data table. */
public Table getModelData();
/** Sets the model data table and returns {@code M}, presumably this instance for chaining. */
public M setModelData(Table modelData);
}

Base classes for algorithm operators supporting both batch and stream processing with output table management and parameter configuration.
/**
 * Base class for algorithm operators. It exposes a main output {@link Table}, side output tables,
 * and the output's column names, types and schema. Parameters are carried via {@link WithParams},
 * and the ML environment id via {@link HasMLEnvironmentId}.
 */
public abstract class AlgoOperator<T extends AlgoOperator<T>>
implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
/** Returns the main output table. */
public Table getOutput();
/** Returns the side output tables. */
public Table[] getSideOutputs();
/** Returns the output column names (presumably those of {@link #getOutput()}; confirm). */
public String[] getColNames();
/** Returns the output column types. */
public TypeInformation<?>[] getColTypes();
/** Returns the output schema. */
public TableSchema getSchema();
}
/** Batch {@link AlgoOperator}; operators are chained with {@code link} / {@code linkFrom}. */
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
/**
 * Links this operator to {@code next} and returns {@code next} typed as {@code B}, so calls can be
 * chained. NOTE(review): presumably equivalent to {@code next.linkFrom(this)}; confirm.
 */
public <B extends BatchOperator<?>> B link(B next);
/** Connects this operator to the given {@code inputs} (presumably upstream operators) and returns {@code T}. */
public abstract T linkFrom(BatchOperator<?>... inputs);
/** Wraps an existing {@link Table} as a {@code BatchOperator}. */
public static BatchOperator<?> fromTable(Table table);
}

Utility functions for table operations, column management, type checking, and data format conversion between Flink table types.
/**
 * Static helpers for locating, typing and validating columns of a table.
 * NOTE(review): the not-found result of the find* methods and the failure mode (exception type)
 * of the assert* methods are not shown in this listing; confirm against the library.
 */
public class TableUtil {
/** Returns the index of {@code targetCol} within {@code tableCols}. */
public static int findColIndex(String[] tableCols, String targetCol);
/** Returns the indices of {@code targetCols} within {@code tableSchema}. */
public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);
/** Returns the type of {@code targetCol} in {@code tableSchema}. */
public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
/** Checks that every name in {@code selectedCols} is present in {@code tableCols}. */
public static void assertSelectedColExist(String[] tableCols, String... selectedCols);
/** Checks that every column in {@code selectedCols} has a numerical type in {@code tableSchema}. */
public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
/** Returns the names of the string-typed columns of {@code tableSchema}. */
public static String[] getStringCols(TableSchema tableSchema);
/** Returns the names of the numeric-typed columns of {@code tableSchema}. */
public static String[] getNumericCols(TableSchema tableSchema);
}
/** {@link TypeInformation} constants for the vector classes. */
public class VectorTypes {
/** Type information for {@link DenseVector}. */
public static final TypeInformation<DenseVector> DENSE_VECTOR;
/** Type information for {@code SparseVector}. */
public static final TypeInformation<SparseVector> SPARSE_VECTOR;
/** Type information for the abstract {@link Vector} type. */
public static final TypeInformation<Vector> VECTOR;
}

Statistical utilities including multivariate Gaussian distributions for probabilistic machine learning applications.
/** Multivariate Gaussian distribution parameterized by a mean vector and a covariance matrix. */
public class MultivariateGaussian {
/** Creates the distribution with mean {@code mean} and covariance matrix {@code cov}. */
public MultivariateGaussian(DenseVector mean, DenseMatrix cov);
/** Returns the probability density at {@code x}. */
public double pdf(Vector x);
/** Returns the logarithm of the probability density at {@code x}. */
public double logpdf(Vector x);
}
// Parameter interfaces: ML environment id, output column, and selected columns
/**
 * Parameter interface for the ML environment id, presumably the id used with
 * {@code MLEnvironmentFactory}. Both accessors are {@code default} methods; their bodies are
 * omitted in this listing.
 */
public interface HasMLEnvironmentId<T> extends WithParams<T> {
/** Returns the configured ML environment id. */
default Long getMLEnvironmentId();
/** Sets the ML environment id and returns {@code T}, presumably this instance for chaining. */
default T setMLEnvironmentId(Long value);
}
/**
 * Parameter interface for the name of the output column. Both accessors are {@code default}
 * methods; their bodies are omitted in this listing.
 */
public interface HasOutputCol<T> extends WithParams<T> {
/** Returns the configured output column name. */
default String getOutputCol();
/** Sets the output column name and returns {@code T}, presumably this instance for chaining. */
default T setOutputCol(String value);
}
/**
 * Parameter interface for the names of the selected (input) columns. Both accessors are
 * {@code default} methods; their bodies are omitted in this listing.
 */
public interface HasSelectedCols<T> extends WithParams<T> {
/** Returns the configured selected column names. */
default String[] getSelectedCols();
/** Sets the selected column names and returns {@code T}, presumably this instance for chaining. */
default T setSelectedCols(String[] value);
}
// Vector iterator interface
/**
 * Serializable iterator over the entries of a vector, exposing each entry's index and value.
 * {@code next()} returns nothing, so the current entry is read via {@code getIndex()} and
 * {@code getValue()}.
 * NOTE(review): confirm whether {@code next()} must be called before the first read.
 */
public interface VectorIterator extends Serializable {
/** Returns whether another entry is available. */
boolean hasNext();
/** Advances to the next entry. */
void next();
/** Returns the index of the current entry. */
int getIndex();
/** Returns the value of the current entry. */
double getValue();
}
// Mapper base class
/**
 * Serializable base class for row-wise transformations. A mapper is built from the input
 * {@link TableSchema} and {@link Params}, maps one {@link Row} to one {@link Row}, and reports the
 * schema of its output.
 */
public abstract class Mapper implements Serializable {
/** Creates a mapper for input rows described by {@code dataSchema}, configured by {@code params}. */
public Mapper(TableSchema dataSchema, Params params);
/** Maps one input row to one output row. */
public abstract Row map(Row row);
/** Returns the schema of the rows produced by {@link #map(Row)}. */
public abstract TableSchema getOutputSchema();
}
// Model source interface
/**
 * Serializable supplier of model data as a list of {@link Row}s.
 * NOTE(review): where the rows come from (e.g. a broadcast variable or an in-memory list) is not
 * shown in this listing.
 */
public interface ModelSource extends Serializable {
/** Returns the model rows, given the caller's {@link RuntimeContext}. */
List<Row> getModelRows(RuntimeContext runtimeContext);
}