0
# Environment Management
1
2
Manages ML execution contexts for Flink batch and stream environments. It provides centralized access to execution environments and table environments, and supports multiple concurrent ML environments.
3
4
## Capabilities
5
6
### MLEnvironment Class
7
8
Core class that stores Flink execution contexts for both batch and stream processing.
9
10
```java { .api }
11
/**
12
* Stores Flink execution contexts for ML operations
13
* Provides access to both batch and stream environments
14
*/
15
public class MLEnvironment {
16
17
/** Create ML environment with default settings */
18
public MLEnvironment();
19
20
/** Create ML environment with batch-only contexts */
21
public MLEnvironment(ExecutionEnvironment batchEnv,
22
BatchTableEnvironment batchTableEnv);
23
24
/** Create ML environment with stream-only contexts */
25
public MLEnvironment(StreamExecutionEnvironment streamEnv,
26
StreamTableEnvironment streamTableEnv);
27
28
/** Create ML environment with both batch and stream contexts */
29
public MLEnvironment(ExecutionEnvironment batchEnv,
30
BatchTableEnvironment batchTableEnv,
31
StreamExecutionEnvironment streamEnv,
32
StreamTableEnvironment streamTableEnv);
33
34
/** Get batch execution environment */
35
public ExecutionEnvironment getExecutionEnvironment();
36
37
/** Get stream execution environment */
38
public StreamExecutionEnvironment getStreamExecutionEnvironment();
39
40
/** Get batch table environment */
41
public BatchTableEnvironment getBatchTableEnvironment();
42
43
/** Get stream table environment */
44
public StreamTableEnvironment getStreamTableEnvironment();
45
}
46
```
47
48
**Usage Examples:**
49
50
```java
51
import org.apache.flink.ml.common.MLEnvironment;
52
import org.apache.flink.api.java.ExecutionEnvironment;
53
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
54
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
55
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
56
57
// Create custom ML environment
58
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
59
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
60
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
61
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);
62
63
MLEnvironment mlEnv = new MLEnvironment(batchEnv, batchTableEnv, streamEnv, streamTableEnv);
64
65
// Access environments
66
ExecutionEnvironment batchExecEnv = mlEnv.getExecutionEnvironment();
67
StreamExecutionEnvironment streamExecEnv = mlEnv.getStreamExecutionEnvironment();
68
BatchTableEnvironment batchTblEnv = mlEnv.getBatchTableEnvironment();
69
StreamTableEnvironment streamTblEnv = mlEnv.getStreamTableEnvironment();
70
71
// Use for ML operations
72
Table batchData = batchTblEnv.fromDataSet(/* dataset */);
73
Table streamData = streamTblEnv.fromDataStream(/* datastream */);
74
```
75
76
### MLEnvironmentFactory Class
77
78
Factory class for managing multiple ML environments with unique identifiers.
79
80
```java { .api }
81
/**
82
* Factory for managing MLEnvironment instances
83
* Supports multiple concurrent ML environments with unique IDs
84
*/
85
public class MLEnvironmentFactory {
86
87
/** Default ML environment ID */
88
public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
89
90
/** Get ML environment by ID; throws IllegalArgumentException if no environment is registered under that ID */
91
public static MLEnvironment get(Long mlEnvId);
92
93
/** Get default ML environment */
94
public static MLEnvironment getDefault();
95
96
/** Create and register a new ML environment with default settings and return its unique ID */
97
public static Long getNewMLEnvironmentId();
98
99
/** Register ML environment and return its ID */
100
public static Long registerMLEnvironment(MLEnvironment env);
101
102
/** Remove ML environment and return removed instance (the default environment is never removed) */
103
public static MLEnvironment remove(Long mlEnvId);
104
}
105
```
106
107
**Usage Examples:**
108
109
```java
110
import org.apache.flink.ml.common.MLEnvironmentFactory;
111
112
// Use default environment
113
MLEnvironment defaultEnv = MLEnvironmentFactory.getDefault();
114
115
// Create and register custom environment
116
MLEnvironment customEnv = new MLEnvironment(/* custom settings */);
117
Long customEnvId = MLEnvironmentFactory.registerMLEnvironment(customEnv);
118
119
// Retrieve registered environment
120
MLEnvironment retrieved = MLEnvironmentFactory.get(customEnvId);
121
122
// Create and register a fresh default environment, returning its new ID
123
Long newId = MLEnvironmentFactory.getNewMLEnvironmentId();
124
125
// Remove environment when done
126
MLEnvironment removed = MLEnvironmentFactory.remove(customEnvId);
127
```
128
129
### HasMLEnvironmentId Interface
130
131
Parameter interface for ML components that need to specify which environment to use.
132
133
```java { .api }
134
/**
135
* Parameter interface for ML environment ID specification
136
* @param <T> The implementing class type for method chaining
137
*/
138
public interface HasMLEnvironmentId<T> extends WithParams<T> {
139
140
/** ML environment ID parameter */
141
ParamInfo<Long> ML_ENVIRONMENT_ID = ParamInfoFactory
142
.createParamInfo("mlEnvironmentId", Long.class)
143
.setDescription("ML environment ID")
144
.setHasDefaultValue(MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID)
145
.build();
146
147
/** Get ML environment ID */
148
default Long getMLEnvironmentId() {
149
return get(ML_ENVIRONMENT_ID);
150
}
151
152
/** Set ML environment ID */
153
default T setMLEnvironmentId(Long value) {
154
return set(ML_ENVIRONMENT_ID, value);
155
}
156
}
157
```
158
159
**Usage Examples:**
160
161
```java
162
// ML component using environment ID
163
public class MyMLAlgorithm extends EstimatorBase<MyMLAlgorithm, MyMLModel>
164
implements HasMLEnvironmentId<MyMLAlgorithm> {
165
166
@Override
167
protected MyMLModel fit(BatchOperator input) {
168
// Get the specified ML environment
169
Long envId = getMLEnvironmentId();
170
MLEnvironment mlEnv = MLEnvironmentFactory.get(envId);
171
172
// Use environment for operations
173
BatchTableEnvironment tEnv = mlEnv.getBatchTableEnvironment();
174
175
// Training logic using the specified environment
176
// ...
177
178
return new MyMLModel(this.getParams());
179
}
180
}
181
182
// Usage with specific environment
183
MyMLAlgorithm algorithm = new MyMLAlgorithm()
184
.setMLEnvironmentId(customEnvId)
185
.setMaxIter(100);
186
187
MyMLModel model = algorithm.fit(trainingData);
188
```
189
190
## Environment Management Patterns
191
192
### Default Environment Usage
193
194
Most common pattern for simple applications:
195
196
```java
197
// Use default environment (ID = 0)
198
MLEnvironment env = MLEnvironmentFactory.getDefault();
199
200
// Components that do not set an ML environment ID use the default environment
201
Pipeline pipeline = new Pipeline()
202
.appendStage(new FeatureScaler()) // Uses env ID 0
203
.appendStage(new LinearRegression()); // Uses env ID 0
204
205
Pipeline trained = pipeline.fit(env.getBatchTableEnvironment(), data);
206
```
207
208
### Multi-Environment Setup
209
210
For complex applications requiring different execution configurations:
211
212
```java
213
// Create environments for different purposes
214
MLEnvironment trainingEnv = new MLEnvironment(
215
getHighMemoryBatchEnv(), // High memory for training
216
getHighMemoryBatchTableEnv()
217
);
218
219
MLEnvironment streamingEnv = new MLEnvironment(
220
getLowLatencyStreamEnv(), // Low latency for streaming
221
getLowLatencyStreamTableEnv()
222
);
223
224
// Register environments
225
Long trainingEnvId = MLEnvironmentFactory.registerMLEnvironment(trainingEnv);
226
Long streamingEnvId = MLEnvironmentFactory.registerMLEnvironment(streamingEnv);
227
228
// Configure components for different environments
229
Estimator<?> trainer = new MyEstimator()
230
.setMLEnvironmentId(trainingEnvId); // Use high-memory env for training
231
232
Transformer<?> predictor = new MyPredictor()
233
.setMLEnvironmentId(streamingEnvId); // Use low-latency env for prediction
234
```
235
236
### Environment Isolation
237
238
For concurrent ML workflows:
239
240
```java
241
// Workflow 1: Real-time recommendation
242
Long realtimeEnvId = MLEnvironmentFactory.registerMLEnvironment(
243
createRealtimeEnvironment()
244
);
245
246
Pipeline realtimePipeline = new Pipeline()
247
.appendStage(new FeatureExtractor().setMLEnvironmentId(realtimeEnvId))
248
.appendStage(new RecommendationModel().setMLEnvironmentId(realtimeEnvId));
249
250
// Workflow 2: Batch analytics
251
Long batchEnvId = MLEnvironmentFactory.registerMLEnvironment(
252
createBatchAnalyticsEnvironment()
253
);
254
255
Pipeline batchPipeline = new Pipeline()
256
.appendStage(new DataAggregator().setMLEnvironmentId(batchEnvId))
257
.appendStage(new StatisticalAnalyzer().setMLEnvironmentId(batchEnvId));
258
259
// Workflows run independently with different resource configurations
260
```
261
262
### Environment Lifecycle Management
263
264
Proper cleanup and resource management:
265
266
```java
267
public class MLWorkflowManager {
268
private Map<String, Long> environments = new HashMap<>();
269
270
public Long createEnvironment(String name, MLEnvironment env) {
271
Long envId = MLEnvironmentFactory.registerMLEnvironment(env);
272
environments.put(name, envId);
273
return envId;
274
}
275
276
public MLEnvironment getEnvironment(String name) {
277
Long envId = environments.get(name);
278
return envId != null ? MLEnvironmentFactory.get(envId) : null;
279
}
280
281
public void cleanup() {
282
// Remove every environment that was registered through this manager
283
for (Long envId : environments.values()) {
284
MLEnvironmentFactory.remove(envId);
285
}
286
environments.clear();
287
}
288
}
289
290
// Usage
291
MLWorkflowManager manager = new MLWorkflowManager();
292
293
try {
294
// Set up environments
295
Long trainEnvId = manager.createEnvironment("training", trainingEnv);
296
Long serveEnvId = manager.createEnvironment("serving", servingEnv);
297
298
// Run ML workflows
299
// ...
300
301
} finally {
302
// Clean up resources
303
manager.cleanup();
304
}
305
```
306
307
## Integration with Table API
308
309
The environment management system seamlessly integrates with Flink's Table API:
310
311
```java
312
// Get ML environment
313
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();
314
315
// Access table environments
316
BatchTableEnvironment batchTEnv = mlEnv.getBatchTableEnvironment();
317
StreamTableEnvironment streamTEnv = mlEnv.getStreamTableEnvironment();
318
319
// Create tables
320
Table batchTable = batchTEnv.fromDataSet(batchDataSet, "features, label");
321
Table streamTable = streamTEnv.fromDataStream(stream, "features, label");
322
323
// Use with ML components
324
Pipeline pipeline = new Pipeline()
325
.appendStage(new FeatureNormalizer())
326
.appendStage(new LogisticRegression());
327
328
// Training on batch data
329
Pipeline trained = pipeline.fit(batchTEnv, batchTable);
330
331
// Real-time prediction on stream data
332
Table predictions = trained.transform(streamTEnv, streamTable);
333
```
334
335
This tight integration ensures that ML operations have access to the appropriate execution contexts while maintaining consistency across batch and stream processing modes.