# Apache Flink ML Library

The Apache Flink Machine Learning Library (`flink-ml-lib`) provides machine learning building blocks for Flink's batch and stream processing APIs: linear algebra types, statistical utilities, pipeline abstractions (estimators, transformers, and models), and base classes for ML algorithm operators that run on Flink's distributed runtime.

## Package Information

- **Package Name**: flink-ml-lib_2.11
- **Package Type**: Maven
- **Language**: Java
- **Version**: 1.12.7
- **Group ID**: org.apache.flink
- **Installation**: Add the following dependency to `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-ml-lib_2.11</artifactId>
    <version>1.12.7</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.ml.pipeline.EstimatorBase;
import org.apache.flink.ml.pipeline.TransformerBase;
```

## Basic Usage

```java
import java.util.Arrays;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.MLEnvironmentFactory;
import org.apache.flink.ml.common.linalg.DenseMatrix;
import org.apache.flink.ml.common.linalg.DenseVector;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

public class BasicUsage {
    public static void main(String[] args) {
        // Set up a batch ML environment and register it under a new ID
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
        MLEnvironment mlEnv = new MLEnvironment(env, tableEnv);
        Long mlEnvId = MLEnvironmentFactory.registerMLEnvironment(mlEnv);

        // Create and combine vectors (element-wise sum: [5.0, 7.0, 9.0])
        DenseVector vector1 = new DenseVector(new double[]{1.0, 2.0, 3.0});
        DenseVector vector2 = new DenseVector(new double[]{4.0, 5.0, 6.0});
        DenseVector result = vector1.plus(vector2);

        // Multiply the 3x3 identity matrix by a vector (yields [1.0, 2.0, 3.0])
        DenseMatrix matrix = DenseMatrix.eye(3);
        DenseVector matrixResult = matrix.multiplies(vector1);

        System.out.println(Arrays.toString(result.getData()));
        System.out.println(Arrays.toString(matrixResult.getData()));

        // Pipeline stages (EstimatorBase/TransformerBase subclasses) can be bound
        // to this environment with setMLEnvironmentId(mlEnvId)
    }
}
```

## Architecture

The Flink ML library is built around the following components:

- **ML Environment**: `MLEnvironment` and `MLEnvironmentFactory` manage the execution and table environments that ML operators use, for both batch and stream processing
- **Linear Algebra**: Vector and matrix types with BLAS routines for efficient local numerical computation
- **Pipeline Framework**: The Estimator/Transformer pattern for building ML workflows from `fit()` and `transform()` operations
- **Operator System**: Algorithm operators for batch and stream processing with a consistent API
- **Parameter Management**: Type-safe parameter handling for configurable ML components
- **Mapper System**: Row-to-row transformation abstractions for data preprocessing

## Capabilities

### ML Environment Management

`MLEnvironment` bundles the Flink execution environment and table environment that ML operators run on: an `ExecutionEnvironment` with a `BatchTableEnvironment` for batch jobs, or a `StreamExecutionEnvironment` with a `StreamTableEnvironment` for streaming jobs. `MLEnvironmentFactory` keeps a registry of environments keyed by `Long` IDs and provides a default environment, so pipeline stages and operators can refer to an environment through their `MLEnvironmentId` parameter.

```java { .api }
public class MLEnvironment {
    public MLEnvironment();
    public MLEnvironment(ExecutionEnvironment batchEnv, BatchTableEnvironment batchTableEnv);
    public MLEnvironment(StreamExecutionEnvironment streamEnv, StreamTableEnvironment streamTableEnv);
    public ExecutionEnvironment getExecutionEnvironment();
    public StreamExecutionEnvironment getStreamExecutionEnvironment();
    public BatchTableEnvironment getBatchTableEnvironment();
    public StreamTableEnvironment getStreamTableEnvironment();
}

public class MLEnvironmentFactory {
    public static MLEnvironment get(Long mlEnvId);
    public static MLEnvironment getDefault();
    public static Long getNewMLEnvironmentId();
    public static Long registerMLEnvironment(MLEnvironment env);
}
```

[ML Environment Management](./ml-environment.md)
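
Stages that implement `HasMLEnvironmentId` store a `Long` ID as a parameter instead of holding an environment object, and `MLEnvironmentFactory` resolves that ID. The plain-Java sketch below illustrates this ID-to-environment registry pattern without any Flink dependency. It is not the library's implementation: the classes `EnvRegistrySketch` and `FakeEnv` are hypothetical, and using `0L` as the default ID and `IllegalArgumentException` for unknown IDs are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MLEnvironment; it only carries a label here.
final class FakeEnv {
    final String label;

    FakeEnv(String label) {
        this.label = label;
    }
}

// Hypothetical sketch of an ID-based environment registry (not Flink's code).
final class EnvRegistrySketch {
    // Assumption: the default environment lives under ID 0.
    static final Long DEFAULT_ID = 0L;

    private static final Map<Long, FakeEnv> ENVS = new HashMap<>();
    private static long nextId = 1L;

    static {
        ENVS.put(DEFAULT_ID, new FakeEnv("default"));
    }

    // Look up a registered environment; unknown IDs are rejected.
    static synchronized FakeEnv get(Long id) {
        FakeEnv env = ENVS.get(id);
        if (env == null) {
            throw new IllegalArgumentException("No environment registered for id " + id);
        }
        return env;
    }

    static synchronized FakeEnv getDefault() {
        return get(DEFAULT_ID);
    }

    // Store the environment under a fresh ID and return that ID.
    static synchronized Long register(FakeEnv env) {
        Long id = nextId++;
        ENVS.put(id, env);
        return id;
    }
}
```

In the library, the corresponding calls are `MLEnvironmentFactory.registerMLEnvironment(env)` to obtain an ID and `MLEnvironmentFactory.get(id)` to resolve it.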

### Linear Algebra Operations

Vector and matrix types for local numerical computation: dense and sparse vectors, dense matrices, and BLAS routines. The abstract `Vector` class defines element access, norms, scaling, addition, subtraction, and dot products; `DenseVector` and `DenseMatrix` add factory methods, and `DenseMatrix` adds matrix-vector and matrix-matrix multiplication.

```java { .api }
public abstract class Vector implements Serializable {
    public abstract int size();
    public abstract double get(int i);
    public abstract void set(int i, double val);
    public abstract double normL1();
    public abstract double normL2();
    public abstract Vector scale(double v);
    public abstract Vector plus(Vector vec);
    public abstract Vector minus(Vector vec);
    public abstract double dot(Vector vec);
}

public class DenseVector extends Vector {
    public DenseVector(double[] data);
    public static DenseVector ones(int n);
    public static DenseVector zeros(int n);
    public static DenseVector rand(int n);
    public double[] getData();
}

public class DenseMatrix implements Serializable {
    public DenseMatrix(int m, int n);
    public DenseMatrix(double[][] data);
    public static DenseMatrix eye(int n);
    public static DenseMatrix zeros(int m, int n);
    public DenseVector multiplies(DenseVector x);
    public DenseMatrix multiplies(DenseMatrix mat);
}
```

[Linear Algebra](./linear-algebra.md)
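
To make the semantics of the `Vector` and `DenseMatrix` methods concrete, the sketch below computes the same quantities on plain `double[]` arrays: element-wise `plus`, `dot`, the L1 and L2 norms, `eye`, and a matrix-vector product like `DenseMatrix.multiplies(DenseVector)`. The class `DenseOpsSketch` is hypothetical and uses no Flink classes. It stores matrices as arrays of rows, whereas the library's storage layout may differ, and the library may delegate these operations to BLAS routines, which this sketch does not reproduce.

```java
// Hypothetical plain-array versions of the dense vector/matrix operations.
final class DenseOpsSketch {
    // Element-wise sum, like Vector.plus for two vectors of equal size.
    static double[] plus(double[] a, double[] b) {
        requireSameSize(a, b);
        double[] r = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            r[i] = a[i] + b[i];
        }
        return r;
    }

    // Inner product, like Vector.dot.
    static double dot(double[] a, double[] b) {
        requireSameSize(a, b);
        double s = 0.0;
        for (int i = 0; i < a.length; i++) {
            s += a[i] * b[i];
        }
        return s;
    }

    // Sum of absolute values, like Vector.normL1.
    static double normL1(double[] a) {
        double s = 0.0;
        for (double v : a) {
            s += Math.abs(v);
        }
        return s;
    }

    // Euclidean length, like Vector.normL2.
    static double normL2(double[] a) {
        return Math.sqrt(dot(a, a));
    }

    // Identity matrix, like DenseMatrix.eye(n), stored row by row.
    static double[][] eye(int n) {
        double[][] m = new double[n][n];
        for (int i = 0; i < n; i++) {
            m[i][i] = 1.0;
        }
        return m;
    }

    // Matrix-vector product y = M x, like DenseMatrix.multiplies(DenseVector).
    static double[] multiplies(double[][] m, double[] x) {
        double[] y = new double[m.length];
        for (int i = 0; i < m.length; i++) {
            if (m[i].length != x.length) {
                throw new IllegalArgumentException("Dimension mismatch");
            }
            for (int j = 0; j < x.length; j++) {
                y[i] += m[i][j] * x[j];
            }
        }
        return y;
    }

    private static void requireSameSize(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("Vector sizes differ: " + a.length + " vs " + b.length);
        }
    }
}
```

With the vectors from Basic Usage, `{1, 2, 3}` and `{4, 5, 6}`, `plus` gives `{5, 7, 9}`, `dot` gives 32, `normL1` of the first vector is 6, and multiplying `eye(3)` by it returns the same values.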

### ML Pipeline Framework

The Estimator/Transformer pattern for building machine learning workflows, with type-safe parameter management and support for both batch and stream processing. An estimator's `fit()` trains on an input `Table` and returns a model; a transformer's `transform()` maps an input `Table` to an output `Table`. Because `ModelBase` extends `TransformerBase`, a fitted model is itself a transformer, and its learned state is available through `getModelData()`.

```java { .api }
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
        extends PipelineStageBase<E> implements Estimator<E, M> {
    public M fit(Table input);
    public M fit(TableEnvironment tEnv, Table input);
}

public abstract class TransformerBase<T extends TransformerBase<T>>
        extends PipelineStageBase<T> implements Transformer<T> {
    public Table transform(Table input);
    public Table transform(TableEnvironment tEnv, Table input);
}

public abstract class ModelBase<M extends ModelBase<M>>
        extends TransformerBase<M> implements Model<M> {
    public Table getModelData();
    public M setModelData(Table modelData);
}
```

[ML Pipeline](./ml-pipeline.md)
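
The division of labour between estimators and models can be seen without Flink: `fit` learns model data from the input, and the resulting model is itself a transformer. The sketch below uses a hypothetical mean-centering estimator over `List<double[]>` rows as a stand-in for `Table`. The interfaces `EstimatorSketch` and `TransformerSketch` and the classes `MeanCenterer` and `MeanCentererModel` are illustrative names, not part of flink-ml-lib.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical counterparts of Transformer and Estimator, with a List of rows
// standing in for a Flink Table.
interface TransformerSketch {
    List<double[]> transform(List<double[]> input);
}

interface EstimatorSketch<M extends TransformerSketch> {
    M fit(List<double[]> input);
}

// The model holds its learned "model data" (the column means) and is a transformer.
final class MeanCentererModel implements TransformerSketch {
    private final double[] means;

    MeanCentererModel(double[] means) {
        this.means = means.clone();
    }

    double[] getModelData() {
        return means.clone();
    }

    @Override
    public List<double[]> transform(List<double[]> input) {
        List<double[]> out = new ArrayList<>();
        for (double[] row : input) {
            double[] centered = new double[row.length];
            for (int j = 0; j < row.length; j++) {
                centered[j] = row[j] - means[j];
            }
            out.add(centered);
        }
        return out;
    }
}

// The estimator computes column means in fit() and returns a model.
final class MeanCenterer implements EstimatorSketch<MeanCentererModel> {
    @Override
    public MeanCentererModel fit(List<double[]> input) {
        if (input.isEmpty()) {
            throw new IllegalArgumentException("Cannot fit on empty input");
        }
        double[] sums = new double[input.get(0).length];
        for (double[] row : input) {
            for (int j = 0; j < sums.length; j++) {
                sums[j] += row[j];
            }
        }
        for (int j = 0; j < sums.length; j++) {
            sums[j] /= input.size();
        }
        return new MeanCentererModel(sums);
    }
}
```

With the library's classes, the same flow is `M model = estimator.fit(trainTable)` followed by `Table output = model.transform(testTable)`, with the learned state exposed through `getModelData()`.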

### Algorithm Operators

Base classes for algorithm operators in batch and stream processing. `AlgoOperator` holds an operator's output `Table` (plus optional side outputs), exposes the output's column names, types, and schema, and carries the operator's parameters. `BatchOperator` adds `link`/`linkFrom` for connecting operators into a chain and `fromTable` for wrapping an existing `Table`.

```java { .api }
public abstract class AlgoOperator<T extends AlgoOperator<T>>
        implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
    public Table getOutput();
    public Table[] getSideOutputs();
    public String[] getColNames();
    public TypeInformation<?>[] getColTypes();
    public TableSchema getSchema();
}

public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
    public <B extends BatchOperator<?>> B link(B next);
    public abstract T linkFrom(BatchOperator<?>... inputs);
    public static BatchOperator<?> fromTable(Table table);
}
```

[Algorithm Operators](./algorithm-operators.md)
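
`link` and `linkFrom` describe the same connection from two sides: `a.link(b)` wires `a` in as the input of `b` and returns `b`, so calls chain left to right. As far as we know, `BatchOperator.link` is implemented by calling `next.linkFrom(this)`; the sketch below reproduces that idea with hypothetical `OpSketch`, `SourceOp`, and `ScaleOp` classes that pass a `List<Double>` instead of a `Table`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator base: holds an output, as AlgoOperator holds an output Table.
abstract class OpSketch<T extends OpSketch<T>> {
    protected List<Double> output = new ArrayList<>();

    List<Double> getOutput() {
        return output;
    }

    // Connect this operator as the input of next, then return next for chaining.
    <B extends OpSketch<?>> B link(B next) {
        next.linkFrom(this);
        return next;
    }

    // Compute this operator's output from its inputs; returns this operator.
    abstract T linkFrom(OpSketch<?>... inputs);
}

// Source operator: wraps existing data, similar in spirit to BatchOperator.fromTable.
final class SourceOp extends OpSketch<SourceOp> {
    SourceOp(List<Double> data) {
        this.output = new ArrayList<>(data);
    }

    @Override
    SourceOp linkFrom(OpSketch<?>... inputs) {
        throw new UnsupportedOperationException("A source has no inputs");
    }
}

// Transformation operator: multiplies every value of its single input by a factor.
final class ScaleOp extends OpSketch<ScaleOp> {
    private final double factor;

    ScaleOp(double factor) {
        this.factor = factor;
    }

    @Override
    ScaleOp linkFrom(OpSketch<?>... inputs) {
        List<Double> result = new ArrayList<>();
        for (double v : inputs[0].getOutput()) {
            result.add(v * factor);
        }
        this.output = result;
        return this;
    }
}
```

The generic return type of `link` is what lets `source.link(op1).link(op2)` keep the concrete type of each operator in the chain.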

### Table Utilities

`TableUtil` provides helpers for locating columns and their types in a `TableSchema`, checking that selected columns exist or are numeric, and listing a schema's string or numeric columns. `VectorTypes` provides `TypeInformation` constants for declaring vector-typed columns. Further utilities handle data format conversion between Flink table types.

```java { .api }
public class TableUtil {
    public static int findColIndex(String[] tableCols, String targetCol);
    public static int[] findColIndices(TableSchema tableSchema, String[] targetCols);
    public static TypeInformation<?> findColType(TableSchema tableSchema, String targetCol);
    public static void assertSelectedColExist(String[] tableCols, String... selectedCols);
    public static void assertNumericalCols(TableSchema tableSchema, String... selectedCols);
    public static String[] getStringCols(TableSchema tableSchema);
    public static String[] getNumericCols(TableSchema tableSchema);
}

public class VectorTypes {
    public static final TypeInformation<DenseVector> DENSE_VECTOR;
    public static final TypeInformation<SparseVector> SPARSE_VECTOR;
    public static final TypeInformation<Vector> VECTOR;
}
```

[Table Utilities](./table-utilities.md)
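
The column helpers in `TableUtil` reduce to searches over a schema's column names. The sketch below shows that logic on a plain `String[]` (where the library's `findColIndices` takes a `TableSchema`). The class `ColumnLookupSketch` is hypothetical; returning `-1` for a missing column and throwing `IllegalArgumentException` from the assertion helper are assumptions, and the library's exact matching rules (for example, case sensitivity) should be checked against its source.

```java
// Hypothetical sketch of column lookups on a schema's column names.
final class ColumnLookupSketch {
    // Position of targetCol in tableCols, or -1 if it is not present.
    static int findColIndex(String[] tableCols, String targetCol) {
        if (targetCol == null) {
            throw new NullPointerException("targetCol is null");
        }
        for (int i = 0; i < tableCols.length; i++) {
            if (targetCol.equals(tableCols[i])) {
                return i;
            }
        }
        return -1;
    }

    // Positions of several columns, in the order they were requested.
    static int[] findColIndices(String[] tableCols, String[] targetCols) {
        int[] indices = new int[targetCols.length];
        for (int i = 0; i < targetCols.length; i++) {
            indices[i] = findColIndex(tableCols, targetCols[i]);
        }
        return indices;
    }

    // Fail fast when a selected column does not exist.
    static void assertSelectedColExist(String[] tableCols, String... selectedCols) {
        for (String col : selectedCols) {
            if (findColIndex(tableCols, col) < 0) {
                throw new IllegalArgumentException("Column not found: " + col);
            }
        }
    }
}
```

Checking selected columns once, before any data is processed, turns a misspelled column name into an immediate error instead of a failure deep inside a running job.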

### Statistical Operations

Statistical utilities for probabilistic machine learning. `MultivariateGaussian` represents a multivariate normal distribution with a given mean vector and covariance matrix, and evaluates its density (`pdf`) and log-density (`logpdf`) at a point.

```java { .api }
public class MultivariateGaussian {
    public MultivariateGaussian(DenseVector mean, DenseMatrix cov);
    public double pdf(Vector x);
    public double logpdf(Vector x);
}
```

[Statistical Operations](./statistical-operations.md)
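
`logpdf` evaluates the log of the multivariate normal density, `log p(x) = -0.5 * (k * log(2π) + log|Σ| + (x - μ)ᵀ Σ⁻¹ (x - μ))`, where `k` is the dimension, `μ` the mean, and `Σ` the covariance; `pdf` is its exponential. The plain-Java sketch below evaluates that formula through a Cholesky factorization `Σ = L Lᵀ`. The class `GaussianSketch` is hypothetical and only accepts a symmetric positive-definite covariance; the library's `MultivariateGaussian` may factor the covariance differently and may handle singular covariances, which this sketch rejects.

```java
// Hypothetical plain-Java evaluation of a multivariate normal density.
final class GaussianSketch {
    private final double[] mean;
    private final double[][] chol; // lower-triangular L with cov = L * L^T
    private final double logDet;   // log|cov| = 2 * sum(log L[i][i])

    GaussianSketch(double[] mean, double[][] cov) {
        int k = mean.length;
        if (cov.length != k) {
            throw new IllegalArgumentException("Covariance must be k x k");
        }
        this.mean = mean.clone();
        this.chol = new double[k][k];
        double ld = 0.0;
        // Cholesky factorization; reads only the lower triangle, so cov is
        // assumed symmetric, and fails unless cov is positive definite.
        for (int i = 0; i < k; i++) {
            if (cov[i].length != k) {
                throw new IllegalArgumentException("Covariance must be k x k");
            }
            for (int j = 0; j <= i; j++) {
                double s = cov[i][j];
                for (int p = 0; p < j; p++) {
                    s -= chol[i][p] * chol[j][p];
                }
                if (i == j) {
                    if (s <= 0.0) {
                        throw new IllegalArgumentException("Covariance is not positive definite");
                    }
                    chol[i][i] = Math.sqrt(s);
                    ld += 2.0 * Math.log(chol[i][i]);
                } else {
                    chol[i][j] = s / chol[j][j];
                }
            }
        }
        this.logDet = ld;
    }

    // log p(x) = -0.5 * (k * log(2*pi) + log|cov| + (x - mean)^T cov^{-1} (x - mean))
    double logpdf(double[] x) {
        int k = mean.length;
        if (x.length != k) {
            throw new IllegalArgumentException("Dimension mismatch");
        }
        // Forward substitution solves L z = x - mean; the quadratic form is then |z|^2.
        double[] z = new double[k];
        double quad = 0.0;
        for (int i = 0; i < k; i++) {
            double s = x[i] - mean[i];
            for (int p = 0; p < i; p++) {
                s -= chol[i][p] * z[p];
            }
            z[i] = s / chol[i][i];
            quad += z[i] * z[i];
        }
        return -0.5 * (k * Math.log(2.0 * Math.PI) + logDet + quad);
    }

    double pdf(double[] x) {
        return Math.exp(logpdf(x));
    }
}
```

Working in log space avoids the underflow that computing the determinant and the density directly can cause in high dimensions, which is why `logpdf` is usually preferred over `pdf` when comparing likelihoods.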

## Types

```java { .api }
// Shared parameter interfaces: ML environment ID and column selection
public interface HasMLEnvironmentId<T> extends WithParams<T> {
    default Long getMLEnvironmentId();
    default T setMLEnvironmentId(Long value);
}

public interface HasOutputCol<T> extends WithParams<T> {
    default String getOutputCol();
    default T setOutputCol(String value);
}

public interface HasSelectedCols<T> extends WithParams<T> {
    default String[] getSelectedCols();
    default T setSelectedCols(String[] value);
}

// Vector iterator interface: steps through (index, value) entries of a vector
public interface VectorIterator extends Serializable {
    boolean hasNext();
    void next();
    int getIndex();
    double getValue();
}

// Mapper base class: row-to-row transformation with a declared output schema
public abstract class Mapper implements Serializable {
    public Mapper(TableSchema dataSchema, Params params);
    public abstract Row map(Row row);
    public abstract TableSchema getOutputSchema();
}

// Model source interface: supplies model rows at runtime
public interface ModelSource extends Serializable {
    List<Row> getModelRows(RuntimeContext runtimeContext);
}
```
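
The shared parameter interfaces such as `HasOutputCol<T>` take a self type `T`, so a default setter declared once in an interface can return the concrete stage and keep chained calls typed. The sketch below shows the pattern with a plain `Map` in place of Flink's `Params`. The names `WithParamsSketch`, `HasOutputColSketch`, `HasSelectedColsSketch`, and `MyStage` are hypothetical, and the string keys `outputCol` and `selectedCols` are assumptions rather than the library's parameter definitions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for WithParams<T>: exposes a parameter store.
interface WithParamsSketch<T> {
    Map<String, Object> getParams();

    // Store a value and return the concrete stage type for chaining.
    @SuppressWarnings("unchecked")
    default T set(String key, Object value) {
        getParams().put(key, value);
        return (T) this;
    }
}

// Mixin with a typed getter/setter pair, in the style of HasOutputCol<T>.
interface HasOutputColSketch<T> extends WithParamsSketch<T> {
    default String getOutputCol() {
        return (String) getParams().get("outputCol");
    }

    default T setOutputCol(String value) {
        return set("outputCol", value);
    }
}

// Mixin in the style of HasSelectedCols<T>.
interface HasSelectedColsSketch<T> extends WithParamsSketch<T> {
    default String[] getSelectedCols() {
        return (String[]) getParams().get("selectedCols");
    }

    default T setSelectedCols(String[] value) {
        return set("selectedCols", value);
    }
}

// A stage mixes in both interfaces, passing itself as the self type.
final class MyStage implements HasOutputColSketch<MyStage>, HasSelectedColsSketch<MyStage> {
    private final Map<String, Object> params = new HashMap<>();

    @Override
    public Map<String, Object> getParams() {
        return params;
    }
}
```

Without the self type, `setSelectedCols` could only return the interface type, and a chained `setOutputCol` call would not compile; the same reasoning explains bounds such as `E extends EstimatorBase<E, M>` in the pipeline classes. In flink-ml-api, `WithParams` stores values in a `Params` object keyed by `ParamInfo` descriptors rather than raw strings.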