# Apache Flink ML Library

Apache Flink Machine Learning Library (flink-ml-lib) adds machine learning building blocks to Flink's batch and stream processing framework: linear algebra operations, statistical utilities, pipeline abstractions, and ML algorithms designed to run on Flink's distributed runtime.

## Package Information

- **Package Name**: flink-ml-lib_2.11 (the `_2.11` suffix is the Scala binary version)
- **Package Type**: Maven
- **Language**: Java
- **Version**: 1.12.7
- **Group ID**: org.apache.flink
- **Installation**: Add the following dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-ml-lib_2.11</artifactId>
    <version>1.12.7</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.ml.pipeline.EstimatorBase;
import org.apache.flink.ml.pipeline.TransformerBase;
```

## Basic Usage

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.MLEnvironmentFactory;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.ml.common.linalg.Vector;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

public class BasicUsage {
    public static void main(String[] args) {
        // Set up an ML environment backed by a batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
        MLEnvironment mlEnv = new MLEnvironment(env, tableEnv);

        // Register it so operators and pipeline stages can refer to it by ID
        Long mlEnvId = MLEnvironmentFactory.registerMLEnvironment(mlEnv);

        // Create vectors and add them element-wise: the sum is (5.0, 7.0, 9.0)
        DenseVector vector1 = new DenseVector(new double[]{1.0, 2.0, 3.0});
        DenseVector vector2 = new DenseVector(new double[]{4.0, 5.0, 6.0});
        Vector result = vector1.plus(vector2);

        // Multiply the 3x3 identity matrix by vector1: the product equals vector1
        DenseMatrix matrix = DenseMatrix.eye(3);
        DenseVector matrixResult = matrix.multiplies(vector1);

        System.out.println("ML environment ID: " + mlEnvId);
        System.out.println(result);
        System.out.println(matrixResult);

        // EstimatorBase and TransformerBase are the base classes for
        // pipeline stages that fit and transform Tables
    }
}
```

## Architecture

The Flink ML library is built around these architectural components:

- **ML Environment**: Holds the batch and stream execution environments and their table environments; `MLEnvironmentFactory` registers each environment under a numeric ID that operators reference.
- **Linear Algebra**: Dense and sparse vectors, dense matrices, and BLAS routines for numerical computation.
- **Pipeline Framework**: The Estimator/Transformer pattern: estimators `fit()` training data into models, and transformers (including fitted models) `transform()` input tables.
- **Operator System**: Algorithm operators with batch and stream variants that share a common API for outputs, schemas, and parameters.
- **Parameter Management**: Type-safe parameters exposed through `WithParams` and shared `Has*` interfaces such as `HasOutputCol`.
- **Mapper System**: Row-to-row transformation abstractions for data preprocessing.
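
The Mapper bullet refers to row-to-row transformations. A mapper receives one input row and returns one output row, and it can report the output schema before any data flows. The plain-Java sketch below shows that contract without Flink on the classpath. All names in it are hypothetical: `RowMapper` stands in for `Mapper`, `Object[]` stands in for `Row`, and a `String[]` of column names stands in for `TableSchema`. The `SumColumnsMapper` example appends one derived column.

```java
import java.util.Arrays;

// Hypothetical stand-in for Flink's Mapper: maps one input row to one output row
// and reports the output schema (column names only in this sketch).
abstract class RowMapper {
    protected final String[] inputCols;

    RowMapper(String[] inputCols) {
        this.inputCols = inputCols;
    }

    abstract Object[] map(Object[] row);

    abstract String[] getOutputCols();
}

// Appends the sum of two numeric columns as a new column named outputCol.
class SumColumnsMapper extends RowMapper {
    private final int left;
    private final int right;
    private final String outputCol;

    SumColumnsMapper(String[] inputCols, String leftCol, String rightCol, String outputCol) {
        super(inputCols);
        this.left = Arrays.asList(inputCols).indexOf(leftCol);
        this.right = Arrays.asList(inputCols).indexOf(rightCol);
        if (left < 0 || right < 0) {
            throw new IllegalArgumentException("selected column not found");
        }
        this.outputCol = outputCol;
    }

    @Override
    Object[] map(Object[] row) {
        // Copy the input row and append the derived value
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = ((Number) row[left]).doubleValue() + ((Number) row[right]).doubleValue();
        return out;
    }

    @Override
    String[] getOutputCols() {
        // Output schema = input columns + the new column
        String[] cols = Arrays.copyOf(inputCols, inputCols.length + 1);
        cols[inputCols.length] = outputCol;
        return cols;
    }
}

class MapperDemo {
    public static void main(String[] args) {
        RowMapper mapper = new SumColumnsMapper(new String[]{"a", "b"}, "a", "b", "a_plus_b");
        System.out.println(Arrays.toString(mapper.getOutputCols())); // [a, b, a_plus_b]
        System.out.println(Arrays.toString(mapper.map(new Object[]{1.5, 2}))); // [1.5, 2, 3.5]
    }
}
```

The mapper resolves column positions once, in its constructor, so `map()` does only per-row arithmetic.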

## Capabilities

### ML Environment Management

`MLEnvironment` bundles the execution environments and table environments that ML operations run on, in both batch and streaming jobs. `MLEnvironmentFactory` maintains a registry of environments keyed by `Long` IDs and provides a default environment.

```java { .api }
public class MLEnvironment {
    public MLEnvironment();
    public MLEnvironment(ExecutionEnvironment batchEnv, BatchTableEnvironment batchTableEnv);
    public MLEnvironment(StreamExecutionEnvironment streamEnv, StreamTableEnvironment streamTableEnv);
    public ExecutionEnvironment getExecutionEnvironment();
    public StreamExecutionEnvironment getStreamExecutionEnvironment();
    public BatchTableEnvironment getBatchTableEnvironment();
    public StreamTableEnvironment getStreamTableEnvironment();
}

public class MLEnvironmentFactory {
    public static MLEnvironment get(Long mlEnvId);
    public static MLEnvironment getDefault();
    public static Long getNewMLEnvironmentId();
    public static Long registerMLEnvironment(MLEnvironment env);
}
```

[ML Environment Management](./ml-environment.md)
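
`MLEnvironmentFactory` keys environments by `Long` IDs, so an operator only needs to carry an ID parameter (`HasMLEnvironmentId`) rather than the environment itself. The plain-Java sketch below mimics that registry pattern so the ID lifecycle can be followed without Flink. `EnvRegistry` is hypothetical and stores generic objects in place of `MLEnvironment`. Three details are assumptions drawn from our reading of the Flink 1.12 sources and should be checked there: the default environment lives under ID `0`, new IDs count up from `1`, and looking up an unregistered ID throws an exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry mirroring the shape of MLEnvironmentFactory's static API.
// E stands in for MLEnvironment so the sketch runs without Flink.
class EnvRegistry<E> {
    static final long DEFAULT_ID = 0L; // assumption: default environment uses ID 0

    private final Map<Long, E> envs = new HashMap<>();
    private final Supplier<E> factory;
    private long nextId = 1L; // assumption: IDs after the default start at 1

    EnvRegistry(Supplier<E> factory) {
        this.factory = factory;
        envs.put(DEFAULT_ID, factory.get()); // default environment created up front
    }

    // Like get(mlEnvId): unknown IDs are an error rather than a silent null
    synchronized E get(Long id) {
        E env = envs.get(id);
        if (env == null) {
            throw new IllegalArgumentException("No environment registered for ID " + id);
        }
        return env;
    }

    // Like getDefault()
    synchronized E getDefault() {
        return get(DEFAULT_ID);
    }

    // Like registerMLEnvironment(env): stores env and returns its new ID
    synchronized Long register(E env) {
        long id = nextId++;
        envs.put(id, env);
        return id;
    }

    // Like getNewMLEnvironmentId(): registers a freshly created environment
    synchronized Long newId() {
        return register(factory.get());
    }
}
```

Synchronizing every method keeps ID allocation and lookup consistent when several threads build pipelines at once.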

### Linear Algebra Operations

Vector and matrix types for numerical work: the abstract `Vector` class with dense and sparse implementations, `DenseMatrix`, and BLAS routines for the underlying arithmetic.

```java { .api }
public abstract class Vector implements Serializable {
    public abstract int size();
    public abstract double get(int i);
    public abstract void set(int i, double val);
    public abstract double normL1();
    public abstract double normL2();
    public abstract Vector scale(double v);
    public abstract Vector plus(Vector vec);
    public abstract Vector minus(Vector vec);
    public abstract double dot(Vector vec);
}

public class DenseVector extends Vector {
    public DenseVector(double[] data);
    public static DenseVector ones(int n);
    public static DenseVector zeros(int n);
    public static DenseVector rand(int n);
    public double[] getData();
}

public class DenseMatrix implements Serializable {
    public DenseMatrix(int m, int n);
    public DenseMatrix(double[][] data);
    public static DenseMatrix eye(int n);
    public static DenseMatrix zeros(int m, int n);
    public DenseVector multiplies(DenseVector x);
    public DenseMatrix multiplies(DenseMatrix mat);
}
```

[Linear Algebra](./linear-algebra.md)
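
To check what the operations in the Basic Usage example should return, the sketch below reproduces their arithmetic on plain `double[]` arrays. The helper class `VecMath` is hypothetical and is not part of flink-ml-lib. It only mirrors the mathematical definitions behind `normL1`, `normL2`, `dot`, `plus`, `DenseMatrix.eye`, and `DenseMatrix.multiplies(DenseVector)`.

```java
// Hypothetical plain-array helpers mirroring the math of Flink's Vector/DenseMatrix methods.
final class VecMath {
    private VecMath() {}

    // Sum of absolute values, as in Vector.normL1()
    static double normL1(double[] v) {
        double s = 0.0;
        for (double x : v) s += Math.abs(x);
        return s;
    }

    // Euclidean length sqrt(v . v), as in Vector.normL2()
    static double normL2(double[] v) {
        return Math.sqrt(dot(v, v));
    }

    // Inner product, as in Vector.dot(Vector)
    static double dot(double[] a, double[] b) {
        if (a.length != b.length) throw new IllegalArgumentException("size mismatch");
        double s = 0.0;
        for (int i = 0; i < a.length; i++) s += a[i] * b[i];
        return s;
    }

    // Element-wise sum into a new array, as in Vector.plus(Vector)
    static double[] plus(double[] a, double[] b) {
        if (a.length != b.length) throw new IllegalArgumentException("size mismatch");
        double[] r = new double[a.length];
        for (int i = 0; i < a.length; i++) r[i] = a[i] + b[i];
        return r;
    }

    // y = A x for a row-major matrix, as in DenseMatrix.multiplies(DenseVector)
    static double[] multiply(double[][] a, double[] x) {
        double[] y = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            y[i] = dot(a[i], x); // row i of A times x
        }
        return y;
    }

    // n x n identity, as in DenseMatrix.eye(n)
    static double[][] eye(int n) {
        double[][] m = new double[n][n];
        for (int i = 0; i < n; i++) m[i][i] = 1.0;
        return m;
    }
}
```

With the Basic Usage inputs (1, 2, 3) and (4, 5, 6), these definitions give `normL1` = 6, `normL2` = √14 ≈ 3.742, `dot` = 32, and `plus` = (5, 7, 9). `eye(3)` times the first vector returns it unchanged.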

### ML Pipeline Framework

Base classes for the Estimator/Transformer pattern. An estimator's `fit()` trains on an input `Table` and returns a model. A transformer's `transform()` maps an input `Table` to an output `Table`. Because `ModelBase` extends `TransformerBase`, a fitted model is itself a transformer. Pipeline stages use type-safe parameters and support both batch and stream processing.

```java { .api }
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
        extends PipelineStageBase<E> implements Estimator<E, M> {
    public M fit(Table input);
    public M fit(TableEnvironment tEnv, Table input);
}

public abstract class TransformerBase<T extends TransformerBase<T>>
        extends PipelineStageBase<T> implements Transformer<T> {
    public Table transform(Table input);
    public Table transform(TableEnvironment tEnv, Table input);
}

public abstract class ModelBase<M extends ModelBase<M>>
        extends TransformerBase<M> implements Model<M> {
    public Table getModelData();
    public M setModelData(Table modelData);
}
```

[ML Pipeline](./ml-pipeline.md)
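
The contract behind these base classes is small: `fit()` consumes training data and returns a model, and the model transforms new data. The plain-Java sketch below shows that contract using hypothetical `SimpleEstimator`, `SimpleTransformer`, and `SimpleModel` interfaces, with `List<double[]>` in place of a Flink `Table`. The mean-centering stage is an illustrative example only and is not an algorithm shipped by the library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free stand-ins for the Estimator/Transformer/Model contract.
// List<double[]> plays the role of a Table; each double[] is one row.
interface SimpleTransformer {
    List<double[]> transform(List<double[]> input);
}

// A model is a transformer produced by fitting an estimator.
interface SimpleModel extends SimpleTransformer {}

interface SimpleEstimator<M extends SimpleModel> {
    M fit(List<double[]> input);
}

// Illustrative estimator: learns the mean of each column.
class MeanCenterer implements SimpleEstimator<MeanCenterModel> {
    @Override
    public MeanCenterModel fit(List<double[]> input) {
        if (input.isEmpty()) throw new IllegalArgumentException("empty training data");
        int n = input.get(0).length;
        double[] means = new double[n];
        for (double[] row : input) {
            for (int j = 0; j < n; j++) means[j] += row[j];
        }
        for (int j = 0; j < n; j++) means[j] /= input.size();
        return new MeanCenterModel(means);
    }
}

// Fitted model: its "model data" is the vector of learned means.
class MeanCenterModel implements SimpleModel {
    private final double[] means;

    MeanCenterModel(double[] means) {
        this.means = means.clone();
    }

    double[] getModelData() {
        return means.clone();
    }

    // Subtracts the learned means from every row
    @Override
    public List<double[]> transform(List<double[]> input) {
        List<double[]> out = new ArrayList<>(input.size());
        for (double[] row : input) {
            double[] centered = new double[row.length];
            for (int j = 0; j < row.length; j++) centered[j] = row[j] - means[j];
            out.add(centered);
        }
        return out;
    }
}
```

Because a model is also a transformer, a fitted model can be dropped into any slot that expects a transformer. The listing reflects the same relationship: `ModelBase` extends `TransformerBase`.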

### Algorithm Operators

`AlgoOperator` is the base class for algorithm operators in both batch and stream processing. It exposes the main output table, any side outputs, and the output schema (column names and types), and it carries parameters such as the ML environment ID. `BatchOperator` adds chaining: `link(next)` connects this operator to `next` and returns `next`, while `linkFrom(...)` consumes one or more upstream operators.

```java { .api }
public abstract class AlgoOperator<T extends AlgoOperator<T>>
        implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
    public Table getOutput();
    public Table[] getSideOutputs();
    public String[] getColNames();
    public TypeInformation<?>[] getColTypes();
    public TableSchema getSchema();
}

public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
    public <B extends BatchOperator<?>> B link(B next);
    public abstract T linkFrom(BatchOperator<?>... inputs);
    public static BatchOperator<?> fromTable(Table table);
}
```

[Algorithm Operators](./algorithm-operators.md)
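
`link(next)` lets operators be chained from left to right. As we understand the Flink 1.12 source, it calls `next.linkFrom(this)` and returns `next`, so each step receives the previous operator's output. The sketch below reproduces that control flow with a hypothetical `Op` class whose output is a `List<String>` instead of a `Table`. The `SourceOp` and `MapOp` operators are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for BatchOperator: holds an output and can be linked to a successor.
abstract class Op<T extends Op<T>> {
    protected List<String> output = new ArrayList<>();

    List<String> getOutput() {
        return output;
    }

    // Mirrors link(next): feed this operator into next, then return next for chaining
    <B extends Op<?>> B link(B next) {
        next.linkFrom(this);
        return next;
    }

    abstract T linkFrom(Op<?>... inputs);
}

// Source operator: its output is fixed at construction time.
class SourceOp extends Op<SourceOp> {
    SourceOp(String... rows) {
        output = new ArrayList<>(Arrays.asList(rows));
    }

    @Override
    SourceOp linkFrom(Op<?>... inputs) {
        throw new UnsupportedOperationException("a source has no inputs");
    }
}

// Row-wise operator: applies fn to every row of its single input.
class MapOp extends Op<MapOp> {
    private final UnaryOperator<String> fn;

    MapOp(UnaryOperator<String> fn) {
        this.fn = fn;
    }

    @Override
    MapOp linkFrom(Op<?>... inputs) {
        List<String> result = new ArrayList<>();
        for (String row : inputs[0].getOutput()) {
            result.add(fn.apply(row));
        }
        output = result;
        return this;
    }
}
```

For example, `new SourceOp("a", "b").link(new MapOp(String::toUpperCase)).getOutput()` yields `[A, B]`. Declaring `link` generically in `B`, as the `<B extends BatchOperator<?>> B link(B next)` signature does, lets the chain keep the concrete type of the last operator.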

### Table Utilities

`TableUtil` provides static helpers for locating columns in a `TableSchema`, checking column types, and validating column selections. `VectorTypes` defines the `TypeInformation` constants used for dense, sparse, and generic vector columns.

```java { .api }
public class TableUtil {
    public static int findColIndex(String[] tableCols, String targetCol);
    public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);
    public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
    public static void assertSelectedColExist(String[] tableCols, String... selectedCols);
    public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
    public static String[] getStringCols(TableSchema tableSchema);
    public static String[] getNumericCols(TableSchema tableSchema);
}

public class VectorTypes {
    public static final TypeInformation<DenseVector> DENSE_VECTOR;
    public static final TypeInformation<SparseVector> SPARSE_VECTOR;
    public static final TypeInformation<Vector> VECTOR;
}
```

[Table Utilities](./table-utilities.md)
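
The lookup helpers resolve column names to positions before an operator reads fields from a `Row`. The sketch below re-implements that lookup over a plain `String[]` so the behavior can be tested without Flink. `ColLookup` is hypothetical and uses exact, case-sensitive matching, which may differ from the library's matching rules. Two further points are assumptions to verify against the sources: a missing column is reported as `-1`, and the multi-column lookup returns one index per requested name, in request order.

```java
import java.util.Arrays;

// Hypothetical column-lookup helpers mirroring the shape of TableUtil's
// findColIndex / findColIndices / assertSelectedColExist over plain String[] schemas.
final class ColLookup {
    private ColLookup() {}

    // Position of targetCol in tableCols, or -1 if absent (assumed convention)
    static int findColIndex(String[] tableCols, String targetCol) {
        for (int i = 0; i < tableCols.length; i++) {
            if (tableCols[i].equals(targetCol)) {
                return i;
            }
        }
        return -1;
    }

    // One index per requested column, in request order
    static int[] findColIndices(String[] tableCols, String[] targetCols) {
        int[] indices = new int[targetCols.length];
        for (int i = 0; i < targetCols.length; i++) {
            indices[i] = findColIndex(tableCols, targetCols[i]);
        }
        return indices;
    }

    // Fails fast when any selected column is missing from the schema
    static void assertSelectedColExist(String[] tableCols, String... selectedCols) {
        for (String col : selectedCols) {
            if (findColIndex(tableCols, col) < 0) {
                throw new IllegalArgumentException(
                    "Column " + col + " not found in " + Arrays.toString(tableCols));
            }
        }
    }
}
```

Validating the selected columns once, when a stage is configured, avoids discovering a misspelled column name only after the job has started processing rows.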

### Statistical Operations

Statistical utilities, including `MultivariateGaussian`, which evaluates the probability density (`pdf`) and log-density (`logpdf`) of a multivariate normal distribution defined by a mean vector and a covariance matrix.

```java { .api }
public class MultivariateGaussian {
    public MultivariateGaussian(DenseVector mean, DenseMatrix cov);
    public double pdf(Vector x);
    public double logpdf(Vector x);
}
```

[Statistical Operations](./statistical-operations.md)
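
For a k-dimensional normal distribution with mean μ and covariance Σ, the log-density is `log p(x) = -(k/2) log(2π) - (1/2) log|Σ| - (1/2) (x - μ)ᵀ Σ⁻¹ (x - μ)`. The sketch below evaluates this textbook formula in plain Java for k = 2, using the closed-form 2×2 determinant and inverse. It assumes Σ is symmetric positive definite. It illustrates the math only and is not how the library computes the density, since a general implementation must handle any dimension and near-singular covariances. The class name `Gaussian2D` is hypothetical.

```java
// Hypothetical 2-D Gaussian density, evaluated directly from the textbook formula
// log p(x) = -(k/2) log(2*pi) - (1/2) log|S| - (1/2) (x - mu)^T S^-1 (x - mu), with k = 2.
final class Gaussian2D {
    private final double mu0, mu1;
    private final double inv00, inv01, inv11; // entries of the symmetric inverse covariance
    private final double logDet;

    // cov = [[a, b], [b, d]]; must be symmetric positive definite
    Gaussian2D(double mu0, double mu1, double a, double b, double d) {
        double det = a * d - b * b;
        if (a <= 0 || det <= 0) {
            throw new IllegalArgumentException("covariance must be positive definite");
        }
        this.mu0 = mu0;
        this.mu1 = mu1;
        // Inverse of [[a, b], [b, d]] is (1/det) * [[d, -b], [-b, a]]
        this.inv00 = d / det;
        this.inv01 = -b / det;
        this.inv11 = a / det;
        this.logDet = Math.log(det);
    }

    double logpdf(double x0, double x1) {
        double d0 = x0 - mu0;
        double d1 = x1 - mu1;
        // Mahalanobis term (x - mu)^T S^-1 (x - mu)
        double quad = d0 * (inv00 * d0 + inv01 * d1) + d1 * (inv01 * d0 + inv11 * d1);
        // -(k/2) log(2*pi) with k = 2 is -log(2*pi)
        return -Math.log(2 * Math.PI) - 0.5 * logDet - 0.5 * quad;
    }

    double pdf(double x0, double x1) {
        return Math.exp(logpdf(x0, x1));
    }
}
```

With an identity covariance, the density at the mean is 1/(2π) ≈ 0.159, and moving one unit along an axis lowers the log-density by exactly 1/2. Working in log space, as `logpdf` does, also avoids underflow when densities become very small.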

## Types

```java { .api }
// Shared parameter interfaces: ML environment ID and column selection
public interface HasMLEnvironmentId<T> extends WithParams<T> {
    default Long getMLEnvironmentId();
    default T setMLEnvironmentId(Long value);
}

public interface HasOutputCol<T> extends WithParams<T> {
    default String getOutputCol();
    default T setOutputCol(String value);
}

public interface HasSelectedCols<T> extends WithParams<T> {
    default String[] getSelectedCols();
    default T setSelectedCols(String[] value);
}

// Iterator over the stored entries of a vector
public interface VectorIterator extends Serializable {
    boolean hasNext();
    void next();
    int getIndex();
    double getValue();
}

// Mapper base class for row-to-row transformations
public abstract class Mapper implements Serializable {
    public Mapper(TableSchema dataSchema, Params params);
    public abstract Row map(Row row);
    public abstract TableSchema getOutputSchema();
}

// Model source interface: loads model rows within a task's runtime context
public interface ModelSource extends Serializable {
    List<Row> getModelRows(RuntimeContext runtimeContext);
}
```
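
`VectorIterator` walks only the entries a vector actually stores, which is what makes it efficient for sparse vectors. Because `next()` returns nothing, the natural loop reads `getIndex()` and `getValue()` at the current position and then advances. This read-then-advance order is our reading of how the library uses the interface, so confirm it against the sources. The sketch below copies the four-method contract under a hypothetical name, `EntryIterator`, and implements it over parallel index/value arrays that stand in for a `SparseVector`. It then uses the iterator to compute a sparse-dense dot product.

```java
// Hypothetical copy of the VectorIterator contract (same four methods).
interface EntryIterator {
    boolean hasNext();
    void next();
    int getIndex();
    double getValue();
}

// Iterates the stored entries of a sparse vector kept as parallel arrays.
final class SparseEntries implements EntryIterator {
    private final int[] indices;
    private final double[] values;
    private int cursor = 0; // position of the current entry

    SparseEntries(int[] indices, double[] values) {
        if (indices.length != values.length) {
            throw new IllegalArgumentException("indices and values differ in length");
        }
        this.indices = indices;
        this.values = values;
    }

    @Override public boolean hasNext() { return cursor < indices.length; }
    @Override public void next() { cursor++; }
    @Override public int getIndex() { return indices[cursor]; }
    @Override public double getValue() { return values[cursor]; }
}

final class SparseOps {
    private SparseOps() {}

    // Dot product touching only stored entries: sum of value * dense[index]
    static double dot(EntryIterator it, double[] dense) {
        double sum = 0.0;
        while (it.hasNext()) {
            sum += it.getValue() * dense[it.getIndex()];
            it.next(); // advance after reading the current entry
        }
        return sum;
    }
}
```

For a sparse vector with entries {0: 2.0, 3: -1.0} and the dense vector (1, 5, 5, 4), the loop visits two entries and returns 2·1 + (-1)·4 = -2, never reading the zeros at indices 1 and 2.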