# Apache Flink ML Library

Apache Flink ML is a machine learning library for Apache Flink that lets developers build and deploy ML pipelines on Flink's distributed stream processing platform. The `flink-ml-uber` artifact bundles the ML API and its implementation libraries into a single uber jar, covering data preprocessing, feature engineering, model training, and inference.

## Package Information

- **Package Name**: flink-ml-uber_2.11
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-ml-uber_2.11</artifactId>
    <version>1.12.7</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.ml.api.core.Pipeline;
import org.apache.flink.ml.api.core.Estimator;
import org.apache.flink.ml.api.core.Transformer;
import org.apache.flink.ml.api.core.Model;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.pipeline.EstimatorBase;
import org.apache.flink.ml.pipeline.TransformerBase;
```

## Basic Usage

```java
import org.apache.flink.ml.api.core.Pipeline;
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.MLEnvironmentFactory;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Obtain the default ML environment and its batch table environment
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();
TableEnvironment tEnv = mlEnv.getBatchTableEnvironment();

// Create a pipeline with estimators and transformers.
// MyFeatureTransformer and MyModelEstimator are placeholders for your own stages.
Pipeline pipeline = new Pipeline()
    .appendStage(new MyFeatureTransformer())
    .appendStage(new MyModelEstimator());

// Train the pipeline on data
Table trainingData = ...; // your training data table
Pipeline trainedPipeline = pipeline.fit(tEnv, trainingData);

// Apply the trained pipeline to new data
Table newData = ...; // new data to transform
Table predictions = trainedPipeline.transform(tEnv, newData);
```

## Architecture

Apache Flink ML is built around several key architectural components:

- **Pipeline Framework**: Core abstractions (`Estimator`, `Transformer`, `Model`, `Pipeline`) for building ML workflows
- **Parameter System**: Type-safe parameter management with validation and JSON serialization
- **Linear Algebra Engine**: Comprehensive vector and matrix operations with native BLAS integration
- **Operator Framework**: Batch and stream processing operators with linking capabilities
- **Environment Management**: ML execution context management for Flink environments
- **Mapper Framework**: Row-wise transformation abstractions for data processing

## Capabilities

### Pipeline Management

Core ML pipeline abstractions for building, training, and deploying machine learning workflows. Provides type-safe composition of estimators and transformers.

```java { .api }
public final class Pipeline implements Estimator<Pipeline, Pipeline>,
        Transformer<Pipeline>,
        Model<Pipeline> {
    public Pipeline appendStage(PipelineStage stage);
    public List<PipelineStage> getStages();
    public boolean needFit();
    public Pipeline fit(TableEnvironment tEnv, Table input);
    public Table transform(TableEnvironment tEnv, Table input);
}
```
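The interplay of `appendStage`, `needFit`, `fit`, and `transform` listed above can be illustrated with a dependency-free sketch. It assumes the usual estimator/transformer convention: fitting replaces each estimator with the model it produces, and every stage sees the output of the stages before it. All types here (`SketchStage`, `SketchTransformer`, `SketchEstimator`, `SketchPipeline`, `MeanCenterer`) are hypothetical stand-ins that work on `double[]` instead of a Flink `Table`; they are not Flink classes and do not claim to reproduce Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT Flink's interfaces.
interface SketchStage {}

interface SketchTransformer extends SketchStage {
    double[] transform(double[] input);
}

interface SketchEstimator extends SketchStage {
    SketchTransformer fit(double[] input);
}

// Estimator that learns the mean of its input and returns a model subtracting it.
class MeanCenterer implements SketchEstimator {
    @Override
    public SketchTransformer fit(double[] input) {
        double mean = Arrays.stream(input).average().orElse(0.0);
        return data -> Arrays.stream(data).map(x -> x - mean).toArray();
    }
}

class SketchPipeline {
    private final List<SketchStage> stages = new ArrayList<>();

    SketchPipeline appendStage(SketchStage stage) {
        stages.add(stage);
        return this;
    }

    // True while any stage is still an unfitted estimator.
    boolean needFit() {
        return stages.stream().anyMatch(s -> s instanceof SketchEstimator);
    }

    // Fits estimators in order; each stage sees the output of the stages before it.
    SketchPipeline fit(double[] input) {
        SketchPipeline fitted = new SketchPipeline();
        double[] current = input;
        for (SketchStage stage : stages) {
            SketchTransformer t = (stage instanceof SketchEstimator)
                    ? ((SketchEstimator) stage).fit(current)
                    : (SketchTransformer) stage;
            fitted.appendStage(t);
            current = t.transform(current);
        }
        return fitted;
    }

    double[] transform(double[] input) {
        if (needFit()) {
            throw new IllegalStateException("Pipeline contains unfitted estimators");
        }
        double[] current = input;
        for (SketchStage stage : stages) {
            current = ((SketchTransformer) stage).transform(current);
        }
        return current;
    }
}

class PipelineSketchDemo {
    static double[] run() {
        SketchTransformer doubler = data -> Arrays.stream(data).map(x -> 2.0 * x).toArray();
        SketchPipeline pipeline = new SketchPipeline()
                .appendStage(doubler)
                .appendStage(new MeanCenterer());
        // Training data {1, 2, 3} is doubled to {2, 4, 6}, so the learned mean is 4.
        SketchPipeline fitted = pipeline.fit(new double[] {1.0, 2.0, 3.0});
        return fitted.transform(new double[] {5.0}); // 2 * 5 - 4
    }

    public static void main(String[] args) {
        System.out.println(run()[0]); // prints 6.0
    }
}
```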

[Pipeline Framework](./pipeline-framework.md)

### Parameter Management

Type-safe parameter system with validation, default values, and JSON serialization support. Essential for configuring ML algorithms and components.

```java { .api }
public class Params implements Serializable, Cloneable {
    public <V> V get(ParamInfo<V> info);
    public <V> Params set(ParamInfo<V> info, V value);
    public <V> boolean contains(ParamInfo<V> info);
    public String toJson();
    public void loadJson(String json);
    public Params merge(Params otherParams);
    public Params clone();
}

public class ParamInfo<V> {
    public String getName();
    public String getDescription();
    public boolean isOptional();
    public V getDefaultValue();
    public ParamValidator<V> getValidator();
}
```
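To make the idea of a typed descriptor with a default value and a validator concrete, here is a small dependency-free sketch. `SketchParamInfo` and `SketchParams` are hypothetical stand-ins, not Flink's `ParamInfo` and `Params`. The sketch validates eagerly in `set` and ignores JSON serialization; both are simplifications chosen for the example, and where the real library performs validation is not covered here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for a typed parameter descriptor; NOT Flink's ParamInfo.
final class SketchParamInfo<V> {
    final String name;
    final V defaultValue;         // null means "no default"
    final Predicate<V> validator; // null means "accept any value"

    SketchParamInfo(String name, V defaultValue, Predicate<V> validator) {
        this.name = name;
        this.defaultValue = defaultValue;
        this.validator = validator;
    }
}

// Hypothetical stand-in for a parameter container; NOT Flink's Params.
final class SketchParams {
    private final Map<String, Object> values = new HashMap<>();

    // Validates eagerly on set; this is a choice made for the sketch.
    <V> SketchParams set(SketchParamInfo<V> info, V value) {
        if (info.validator != null && !info.validator.test(value)) {
            throw new IllegalArgumentException(
                    "Invalid value for '" + info.name + "': " + value);
        }
        values.put(info.name, value);
        return this;
    }

    // Returns the stored value, else the default, else fails for a missing parameter.
    @SuppressWarnings("unchecked")
    <V> V get(SketchParamInfo<V> info) {
        if (values.containsKey(info.name)) {
            return (V) values.get(info.name);
        }
        if (info.defaultValue != null) {
            return info.defaultValue;
        }
        throw new IllegalStateException("Missing parameter '" + info.name + "'");
    }

    <V> boolean contains(SketchParamInfo<V> info) {
        return values.containsKey(info.name);
    }
}

class ParamsSketchDemo {
    // The descriptor's type parameter ties the key to Integer values at compile time.
    static final SketchParamInfo<Integer> MAX_ITER =
            new SketchParamInfo<>("maxIter", 10, v -> v > 0);

    public static void main(String[] args) {
        SketchParams params = new SketchParams();
        System.out.println(params.get(MAX_ITER));      // prints 10 (the default)
        params.set(MAX_ITER, 50);
        System.out.println(params.get(MAX_ITER));      // prints 50
        System.out.println(params.contains(MAX_ITER)); // prints true
    }
}
```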

[Parameter System](./parameter-system.md)

### Linear Algebra Operations

Comprehensive linear algebra library with dense and sparse vectors, matrices, and BLAS operations. Essential for numerical computations in ML algorithms.

```java { .api }
public abstract class Vector implements Serializable {
    public abstract int size();
    public abstract double get(int i);
    public abstract void set(int i, double val);
    public abstract double dot(Vector vec);
    public abstract Vector plus(Vector vec);
    public abstract Vector scale(double v);
    public abstract double normL2();
}

public class DenseMatrix implements Serializable {
    public double get(int i, int j);
    public void set(int i, int j, double s);
    public DenseMatrix transpose();
    public DenseMatrix multiplies(DenseMatrix mat);
    public DenseVector multiplies(DenseVector x);
}
```
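As a worked reference for the arithmetic behind `dot`, `normL2`, and matrix-vector `multiplies`, the following sketch computes the same quantities on plain arrays. `LinalgSketch` and its methods are hypothetical helpers written for this example and are not part of Flink ML; they only show the standard definitions these operation names conventionally denote.

```java
import java.util.Arrays;

// Plain-array arithmetic for illustration; these helpers are NOT part of Flink ML.
final class LinalgSketch {
    // Inner product: sum of a[i] * b[i].
    static double dot(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("Vector sizes differ");
        }
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += a[i] * b[i];
        }
        return sum;
    }

    // Euclidean norm: square root of the vector's inner product with itself.
    static double normL2(double[] a) {
        return Math.sqrt(dot(a, a));
    }

    // Matrix-vector product: entry i is the inner product of row i with x.
    static double[] multiply(double[][] m, double[] x) {
        double[] y = new double[m.length];
        for (int i = 0; i < m.length; i++) {
            y[i] = dot(m[i], x);
        }
        return y;
    }

    public static void main(String[] args) {
        System.out.println(dot(new double[] {1, 2, 3}, new double[] {4, 5, 6})); // prints 32.0
        System.out.println(normL2(new double[] {3, 4}));                         // prints 5.0
        double[][] m = {{1, 2}, {3, 4}};
        System.out.println(Arrays.toString(multiply(m, new double[] {1, 1})));   // prints [3.0, 7.0]
    }
}
```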

[Linear Algebra](./linear-algebra.md)

### Algorithm Operators

Batch and stream processing operators for building custom ML algorithms. Provides linking capabilities and integration with Flink's Table API.

```java { .api }
public abstract class BatchOperator<T extends BatchOperator<T>>
        extends AlgoOperator<T> {
    public <B extends BatchOperator<?>> B link(B next);
    public abstract T linkFrom(BatchOperator<?>... inputs);
    public static BatchOperator<?> fromTable(Table table);
}

public abstract class StreamOperator<T extends StreamOperator<T>>
        extends AlgoOperator<T> {
    public <S extends StreamOperator<?>> S link(S next);
    public abstract T linkFrom(StreamOperator<?>... inputs);
    public static StreamOperator<?> fromTable(Table table);
}
```
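The signatures above suggest a fluent linking style: `link` returns the downstream operator so calls can be chained, while `linkFrom` is where an operator consumes its inputs. The sketch below shows that pattern with the same self-bounded generic shape. `SketchOperator`, `SourceOp`, and `ScaleOp` are hypothetical stand-ins operating on `double[]`, and the assumption that `link` simply delegates to `next.linkFrom(this)` is consistent with the signatures but is not confirmed by this document.

```java
import java.util.Arrays;

// Hypothetical stand-in for an operator base class; NOT Flink's BatchOperator.
abstract class SketchOperator<T extends SketchOperator<T>> {
    protected double[] output;

    double[] getOutput() {
        return output;
    }

    // Wires this operator into `next` and returns `next` so calls can be chained.
    // Assumption for the sketch: linking delegates to next.linkFrom(this).
    <B extends SketchOperator<?>> B link(B next) {
        next.linkFrom(this);
        return next;
    }

    // Consumes the outputs of upstream operators and returns this operator.
    abstract T linkFrom(SketchOperator<?>... inputs);
}

// A source holds data and accepts no upstream operators.
final class SourceOp extends SketchOperator<SourceOp> {
    SourceOp(double[] data) {
        this.output = data.clone();
    }

    @Override
    SourceOp linkFrom(SketchOperator<?>... inputs) {
        throw new UnsupportedOperationException("A source has no upstream operator");
    }
}

// Multiplies every element of its single input by a constant factor.
final class ScaleOp extends SketchOperator<ScaleOp> {
    private final double factor;

    ScaleOp(double factor) {
        this.factor = factor;
    }

    @Override
    ScaleOp linkFrom(SketchOperator<?>... inputs) {
        if (inputs.length != 1) {
            throw new IllegalArgumentException("ScaleOp expects exactly one input");
        }
        double[] in = inputs[0].getOutput();
        double[] out = new double[in.length];
        for (int i = 0; i < in.length; i++) {
            out[i] = in[i] * factor;
        }
        this.output = out;
        return this;
    }
}

class LinkSketchDemo {
    static double[] run() {
        // Each link call returns the downstream operator, typed as ScaleOp.
        return new SourceOp(new double[] {1.0, 2.0})
                .link(new ScaleOp(3.0))
                .link(new ScaleOp(2.0))
                .getOutput();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [6.0, 12.0]
    }
}
```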

[Algorithm Operators](./algorithm-operators.md)

### Environment Management

ML execution context management for Flink batch and stream environments. Provides centralized access to execution contexts and table environments.

```java { .api }
public class MLEnvironment {
    public ExecutionEnvironment getExecutionEnvironment();
    public StreamExecutionEnvironment getStreamExecutionEnvironment();
    public BatchTableEnvironment getBatchTableEnvironment();
    public StreamTableEnvironment getStreamTableEnvironment();
}

public class MLEnvironmentFactory {
    public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
    public static MLEnvironment get(Long mlEnvId);
    public static MLEnvironment getDefault();
    public static Long registerMLEnvironment(MLEnvironment env);
}
```
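The factory signatures above describe an id-based registry: a default environment lives under id `0`, and registering a custom environment returns a new id for later lookup. A minimal sketch of that registry pattern follows. `SketchEnv` and `SketchEnvFactory` are hypothetical stand-ins, and details such as the id allocation scheme and the error raised for unknown ids are assumptions made for the example rather than documented Flink behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an execution context; NOT Flink's MLEnvironment.
final class SketchEnv {
    final String name;

    SketchEnv(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for an id-based registry; NOT Flink's MLEnvironmentFactory.
final class SketchEnvFactory {
    static final Long DEFAULT_ID = 0L;

    private static final Map<Long, SketchEnv> ENVS = new HashMap<>();
    private static long nextId = 1L; // assumption: ids are handed out sequentially

    static {
        // The default environment is always available under DEFAULT_ID.
        ENVS.put(DEFAULT_ID, new SketchEnv("default"));
    }

    static synchronized SketchEnv get(Long id) {
        SketchEnv env = ENVS.get(id);
        if (env == null) {
            throw new IllegalArgumentException("No environment registered for id " + id);
        }
        return env;
    }

    static SketchEnv getDefault() {
        return get(DEFAULT_ID);
    }

    // Stores the environment and returns the id under which it can be fetched.
    static synchronized Long register(SketchEnv env) {
        long id = nextId++;
        ENVS.put(id, env);
        return id;
    }
}

class EnvSketchDemo {
    public static void main(String[] args) {
        System.out.println(SketchEnvFactory.getDefault().name); // prints default
        Long id = SketchEnvFactory.register(new SketchEnv("custom"));
        System.out.println(SketchEnvFactory.get(id).name);      // prints custom
    }
}
```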

[Environment Management](./environment-management.md)

### Pipeline Base Classes

Abstract base classes for implementing custom estimators, transformers, and models. Provides common functionality and integration patterns.

```java { .api }
public abstract class EstimatorBase<E extends EstimatorBase<E, M>,
        M extends ModelBase<M>>
        extends PipelineStageBase<E>
        implements Estimator<E, M> {
    public M fit(TableEnvironment tEnv, Table input);
    public M fit(Table input);
    protected abstract M fit(BatchOperator input);
}

public abstract class TransformerBase<T extends TransformerBase<T>>
        extends PipelineStageBase<T>
        implements Transformer<T> {
    public Table transform(TableEnvironment tEnv, Table input);
    public Table transform(Table input);
    protected abstract BatchOperator transform(BatchOperator input);
    protected abstract StreamOperator transform(StreamOperator input);
}
```

[Pipeline Base Classes](./pipeline-base-classes.md)

### Utility Libraries

Comprehensive utility libraries for table operations, vector parsing, data conversion, and statistical operations.

```java { .api }
public class TableUtil {
    public static int findColIndex(TableSchema tableSchema, String targetCol);
    public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
    public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
    public static String format(String[] colNames, List<Row> data);
}

public class VectorUtil {
    public static Vector parse(String str);
    public static String toString(Vector vector);
    public static DenseVector parseDense(String str);
    public static SparseVector parseSparse(String str);
}
```

[Utility Libraries](./utility-libraries.md)

## Types

### Core Interfaces

```java { .api }
public interface PipelineStage {
    String toJson();
    void loadJson(String json);
}

public interface Estimator<E extends Estimator<E, M>, M extends Model<M>>
        extends PipelineStage {
    M fit(TableEnvironment tEnv, Table input);
}

public interface Transformer<T extends Transformer<T>> extends PipelineStage {
    Table transform(TableEnvironment tEnv, Table input);
}

public interface Model<M extends Model<M>> extends Transformer<M> {
}

public interface WithParams<T> {
    Params getParams();
    <V> T set(ParamInfo<V> info, V value);
    <V> V get(ParamInfo<V> info);
}
```

### Parameter Types

```java { .api }
public interface ParamValidator<V> extends Serializable {
    boolean validate(V value);
}

// Nested in ParamInfoFactory; obtained via ParamInfoFactory.createParamInfo(name, valueClass)
public static class ParamInfoBuilder<V> {
    public ParamInfoBuilder<V> setDescription(String description);
    public ParamInfoBuilder<V> setOptional();
    public ParamInfoBuilder<V> setHasDefaultValue(V defaultValue);
    public ParamInfoBuilder<V> setValidator(ParamValidator<V> validator);
    public ParamInfo<V> build();
}
```

### Linear Algebra Types

```java { .api }
public interface VectorIterator extends Serializable {
    boolean hasNext();
    void next();
    int getIndex();
    double getValue();
}

public class VectorTypes {
    public static final TypeInformation<DenseVector> DENSE_VECTOR;
    public static final TypeInformation<SparseVector> SPARSE_VECTOR;
    public static final TypeInformation<Vector> VECTOR;
}
```
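Unlike `java.util.Iterator`, the `VectorIterator` shape above has a `next()` that returns nothing, with the element exposed through `getIndex()` and `getValue()`. One plausible reading, assumed here and not confirmed by this document, is that the getters read the current entry and `next()` only advances the cursor. The sketch below demonstrates that idiom with `SketchSparseIterator`, a hypothetical stand-in over parallel index and value arrays rather than Flink's own iterator.

```java
import java.util.Arrays;

// Hypothetical stand-in with the same four-method shape; NOT Flink's VectorIterator.
final class SketchSparseIterator {
    private final int[] indices;
    private final double[] values;
    private int cursor = 0;

    SketchSparseIterator(int[] indices, double[] values) {
        if (indices.length != values.length) {
            throw new IllegalArgumentException("indices and values must have equal length");
        }
        this.indices = indices;
        this.values = values;
    }

    // True while the cursor points at an entry that can be read.
    boolean hasNext() {
        return cursor < indices.length;
    }

    // Advances the cursor; it does not return the element.
    void next() {
        cursor++;
    }

    int getIndex() {
        return indices[cursor];
    }

    double getValue() {
        return values[cursor];
    }
}

class IteratorSketchDemo {
    // Expands sparse (index, value) entries into a dense array of the given size.
    static double[] toDense(int size, int[] indices, double[] values) {
        double[] dense = new double[size];
        SketchSparseIterator it = new SketchSparseIterator(indices, values);
        while (it.hasNext()) {
            dense[it.getIndex()] = it.getValue(); // read the current entry first
            it.next();                            // then advance the cursor
        }
        return dense;
    }

    public static void main(String[] args) {
        double[] dense = toDense(4, new int[] {0, 2}, new double[] {1.0, 3.0});
        System.out.println(Arrays.toString(dense)); // prints [1.0, 0.0, 3.0, 0.0]
    }
}
```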