# Pipeline Framework

Core ML pipeline abstractions for building, training, and deploying machine learning workflows. The pipeline framework provides type-safe composition of estimators and transformers with support for serialization and complex workflow management.

## Capabilities

### Pipeline Class

The main pipeline orchestration class that combines estimators and transformers into executable workflows.

```java { .api }
/**
 * Linear workflow for combining estimators and transformers
 * Supports both training (fit) and transformation (transform) operations
 */
public final class Pipeline implements Estimator<Pipeline, Pipeline>,
                                       Transformer<Pipeline>,
                                       Model<Pipeline> {

    /** Create empty pipeline */
    public Pipeline();

    /** Create pipeline from JSON representation */
    public Pipeline(String pipelineJson);

    /** Create pipeline from list of stages */
    public Pipeline(List<PipelineStage> stages);

    /** Add a stage to the end of the pipeline */
    public Pipeline appendStage(PipelineStage stage);

    /** Get immutable list of all pipeline stages */
    public List<PipelineStage> getStages();

    /** Check if pipeline contains any estimators that need training */
    public boolean needFit();

    /** Get pipeline parameters configuration */
    public Params getParams();

    /** Train pipeline on input data, producing trained pipeline */
    public Pipeline fit(TableEnvironment tEnv, Table input);

    /** Apply pipeline transformation to input data */
    public Table transform(TableEnvironment tEnv, Table input);

    /** Serialize pipeline to JSON string */
    public String toJson();

    /** Load pipeline from JSON string */
    public void loadJson(String json);
}
```

**Usage Examples:**

```java
import org.apache.flink.ml.api.core.Pipeline;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Assumes tableEnv (TableEnvironment), trainingData and testData (Table) already exist,
// and that FeatureNormalizer, LinearRegression, and PredictionTransformer are
// user-defined stages.

// Create and build pipeline
Pipeline pipeline = new Pipeline()
    .appendStage(new FeatureNormalizer())
    .appendStage(new LinearRegression())
    .appendStage(new PredictionTransformer());

// Check if training is needed
if (pipeline.needFit()) {
    // Train the pipeline
    Pipeline trainedPipeline = pipeline.fit(tableEnv, trainingData);

    // Apply to new data
    Table predictions = trainedPipeline.transform(tableEnv, testData);
}

// Serialize pipeline
String pipelineJson = pipeline.toJson();

// Load pipeline from JSON
Pipeline loadedPipeline = new Pipeline(pipelineJson);
```

### PipelineStage Interface

Base interface for all components that can be included in a pipeline.

```java { .api }
/**
 * Base interface for pipeline components
 * All estimators, transformers, and models implement this interface
 */
public interface PipelineStage {
    /** Serialize stage to JSON representation */
    String toJson();

    /** Load stage configuration from JSON */
    void loadJson(String json);
}
```
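The serialization contract above can be illustrated with a minimal, self-contained sketch. `ScaleStageSketch` and its `factor` parameter are hypothetical and do not implement the real `PipelineStage` interface, and the JSON layout is an assumption, since the format the framework actually produces is not specified here. The sketch only shows the intended round trip: `toJson()` captures a stage's configuration, and `loadJson()` restores it into an existing instance.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stage with one parameter; mirrors the toJson()/loadJson() pair only. */
final class ScaleStageSketch {
    // Matches exactly the shape emitted by toJson(), e.g. {"factor":2.5}
    private static final Pattern SHAPE = Pattern.compile("\\{\"factor\":([^}]+)\\}");

    private double factor = 1.0;

    ScaleStageSketch setFactor(double factor) {
        this.factor = factor;
        return this;
    }

    double getFactor() {
        return factor;
    }

    /** Serialize the stage configuration (assumed layout, not the framework's). */
    String toJson() {
        return "{\"factor\":" + factor + "}";
    }

    /** Restore the configuration from JSON previously produced by toJson(). */
    void loadJson(String json) {
        Matcher m = SHAPE.matcher(json.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected stage JSON: " + json);
        }
        factor = Double.parseDouble(m.group(1));
    }

    public static void main(String[] args) {
        String json = new ScaleStageSketch().setFactor(2.5).toJson();
        ScaleStageSketch restored = new ScaleStageSketch();
        restored.loadJson(json);
        System.out.println(json + " -> factor " + restored.getFactor());
    }
}
```

Loading into an existing instance, rather than returning a new one, matches the `void loadJson(String json)` signature shown above.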

### Estimator Interface

Interface for ML components that can be trained on data to produce models.

```java { .api }
/**
 * Machine learning estimators that train models from data
 * @param <E> The concrete estimator type
 * @param <M> The model type produced by this estimator
 */
public interface Estimator<E extends Estimator<E, M>, M extends Model<M>>
        extends PipelineStage {

    /** Train estimator on input data and produce a trained model */
    M fit(TableEnvironment tEnv, Table input);
}
```

**Usage Example:**

```java
import org.apache.flink.ml.api.core.Estimator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class MyLinearRegression implements Estimator<MyLinearRegression, MyLinearModel> {
    @Override
    public MyLinearModel fit(TableEnvironment tEnv, Table input) {
        // Training logic here
        return new MyLinearModel(/* trained parameters */);
    }
    // Other PipelineStage methods omitted for brevity
}
```

### Transformer Interface

Interface for components that transform data without requiring training.

```java { .api }
/**
 * Data transformation components that modify input data
 * @param <T> The concrete transformer type
 */
public interface Transformer<T extends Transformer<T>> extends PipelineStage {

    /** Apply transformation to input data */
    Table transform(TableEnvironment tEnv, Table input);
}
```

**Usage Example:**

```java
public class FeatureNormalizer implements Transformer<FeatureNormalizer> {
    @Override
    public Table transform(TableEnvironment tEnv, Table input) {
        // Normalization logic goes here; this placeholder passes the input through unchanged
        Table normalizedTable = input;
        return normalizedTable;
    }
    // Other PipelineStage methods omitted for brevity
}
```

### Model Interface

Interface for trained machine learning models that can transform data.

```java { .api }
/**
 * Trained machine learning models
 * Models are transformers that have been produced by training an estimator
 * @param <M> The concrete model type
 */
public interface Model<M extends Model<M>> extends Transformer<M> {
    // Inherits transform() method from Transformer
    // Additional model-specific functionality can be added here
}
```

**Usage Example:**

```java
public class MyLinearModel implements Model<MyLinearModel> {
    private final DenseVector weights;
    private final double bias;

    public MyLinearModel(DenseVector weights, double bias) {
        this.weights = weights;
        this.bias = bias;
    }

    @Override
    public Table transform(TableEnvironment tEnv, Table input) {
        // Prediction logic goes here (apply weights and bias to each row);
        // this placeholder passes the input through unchanged
        Table predictionsTable = input;
        return predictionsTable;
    }
    // Other PipelineStage methods omitted for brevity
}
```

## Pipeline Execution Flow

The pipeline framework follows a specific execution pattern:

1. **Construction**: Build pipeline by appending stages
2. **Training Check**: Use `needFit()` to determine if training is required
3. **Training**: If needed, call `fit()` to train estimators and produce models
4. **Transformation**: Call `transform()` to apply the pipeline to data

```java
// Example execution flow
Pipeline pipeline = new Pipeline()
    .appendStage(preprocessor)      // Transformer - no training needed
    .appendStage(featureSelector)   // Estimator - needs training
    .appendStage(classifier);       // Estimator - needs training

// Check if training needed (true, because of estimators)
boolean needsTraining = pipeline.needFit();

if (needsTraining) {
    // This will:
    // 1. Apply preprocessor transform
    // 2. Train featureSelector on preprocessed data
    // 3. Apply featureSelector transform
    // 4. Train classifier on selected features
    Pipeline trainedPipeline = pipeline.fit(tEnv, trainingData);

    // Now apply full trained pipeline
    Table results = trainedPipeline.transform(tEnv, newData);
}
```
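The four steps can also be modelled without any Flink dependency. The following is a minimal sketch under stated assumptions, not the framework's implementation: `SketchStage`, `SketchTransformer`, `SketchEstimator`, and `FitFlowSketch` are hypothetical stand-ins, a "table" is reduced to a `List<Double>`, and the rule that data stops flowing after the last estimator is inferred from the step comments in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.DoubleUnaryOperator;

// Hypothetical stand-ins: a "table" is a List<Double>, and no Flink types are used.
interface SketchStage {}

interface SketchTransformer extends SketchStage {
    List<Double> transform(List<Double> input);
}

interface SketchEstimator extends SketchStage {
    SketchTransformer fit(List<Double> input); // returns the trained model
}

final class FitFlowSketch {

    /** True if at least one stage still has to be trained. */
    static boolean needFit(List<SketchStage> stages) {
        for (SketchStage stage : stages) {
            if (stage instanceof SketchEstimator) {
                return true;
            }
        }
        return false;
    }

    /** Trains estimators in order, feeding each one the output of the stages before it. */
    static List<SketchTransformer> fit(List<SketchStage> stages, List<Double> input) {
        int lastEstimator = -1;
        for (int i = 0; i < stages.size(); i++) {
            if (stages.get(i) instanceof SketchEstimator) {
                lastEstimator = i;
            }
        }
        List<SketchTransformer> trained = new ArrayList<>();
        List<Double> data = input;
        for (int i = 0; i < stages.size(); i++) {
            SketchStage stage = stages.get(i);
            // Every stage in this sketch is either an estimator or a transformer.
            SketchTransformer t = stage instanceof SketchEstimator
                    ? ((SketchEstimator) stage).fit(data)
                    : (SketchTransformer) stage;
            trained.add(t);
            // Data only flows forward while a later estimator still needs it.
            if (i < lastEstimator) {
                data = t.transform(data);
            }
        }
        return trained;
    }

    /** Applies the trained stages in order. */
    static List<Double> transform(List<SketchTransformer> trained, List<Double> input) {
        List<Double> data = input;
        for (SketchTransformer t : trained) {
            data = t.transform(data);
        }
        return data;
    }

    /** Transformer that multiplies every value by a fixed factor. */
    static SketchTransformer scale(double factor) {
        return in -> map(in, x -> x * factor);
    }

    /** Estimator that learns the mean and yields a model subtracting it. */
    static SketchEstimator meanCentering() {
        return training -> {
            double sum = 0.0;
            for (double x : training) {
                sum += x;
            }
            double mean = training.isEmpty() ? 0.0 : sum / training.size();
            return data -> map(data, x -> x - mean);
        };
    }

    private static List<Double> map(List<Double> in, DoubleUnaryOperator f) {
        List<Double> out = new ArrayList<>();
        for (double x : in) {
            out.add(f.applyAsDouble(x));
        }
        return out;
    }

    public static void main(String[] args) {
        List<SketchStage> stages = Arrays.asList(scale(2.0), meanCentering());
        if (needFit(stages)) {
            List<SketchTransformer> trained = fit(stages, Arrays.asList(1.0, 2.0, 3.0));
            System.out.println(transform(trained, Arrays.asList(5.0)));
        }
    }
}
```

In this sketch the estimator is trained on the already scaled data, so the model it produces is only meaningful when applied after the same scaling stage. That is why the trained stages are kept together and applied in order.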

## Type Safety

The pipeline framework uses generics to maintain type safety:

- `Estimator<E, M>`: Ensures estimators produce the correct model type
- `Transformer<T>`: Enables method chaining with correct return types
- `Model<M>`: Models maintain their specific type information

Because these bounds are checked at compile time, many stage and model type mismatches are reported by the compiler instead of surfacing as runtime errors, and IDEs can offer more accurate completion and refactoring.
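The effect of the self-referencing type parameters can be seen in a small, self-contained sketch. The `Ts*` interfaces are hypothetical stand-ins that copy only the generic bounds of the interfaces documented above, a "table" is reduced to a `double[]`, and `OffsetModel` and `OffsetEstimator` are illustrative names rather than framework classes.

```java
// Hypothetical stand-ins that copy only the generic bounds of the interfaces above;
// a "table" is a double[] and no Flink types are used.
interface TsStage {}

interface TsTransformer<T extends TsTransformer<T>> extends TsStage {
    double[] transform(double[] input);
}

interface TsModel<M extends TsModel<M>> extends TsTransformer<M> {}

interface TsEstimator<E extends TsEstimator<E, M>, M extends TsModel<M>> extends TsStage {
    M fit(double[] input);
}

/** Model that subtracts a learned offset from every value. */
final class OffsetModel implements TsModel<OffsetModel> {
    final double offset;

    OffsetModel(double offset) {
        this.offset = offset;
    }

    @Override
    public double[] transform(double[] input) {
        double[] out = new double[input.length];
        for (int i = 0; i < input.length; i++) {
            out[i] = input[i] - offset;
        }
        return out;
    }
}

/** Estimator whose type parameters pin its output to OffsetModel. */
final class OffsetEstimator implements TsEstimator<OffsetEstimator, OffsetModel> {
    @Override
    public OffsetModel fit(double[] input) {
        double sum = 0.0;
        for (double x : input) {
            sum += x;
        }
        return new OffsetModel(input.length == 0 ? 0.0 : sum / input.length);
    }
}

final class TypeSafetySketch {
    public static void main(String[] args) {
        // No cast needed: fit() is declared to return OffsetModel, so model-specific
        // members such as 'offset' are available directly.
        OffsetModel model = new OffsetEstimator().fit(new double[] {1.0, 2.0, 3.0});
        System.out.println(model.offset);

        // Would not compile, because String does not satisfy M extends TsModel<M>:
        // class Broken implements TsEstimator<Broken, String> { ... }
    }
}
```

The bound `M extends TsModel<M>` is what rejects an estimator declared with a non-model result type, and the concrete `M` in the `fit()` return type is what removes the need for a downcast at the call site.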