0
# ML Environment Management
1
2
The ML environment provides a centralized execution context for ML operations in both batch and streaming scenarios. It manages the Flink execution environments and table environments that machine learning operations require.
3
4
## Capabilities
5
6
### MLEnvironment Class
7
8
Central class that holds the Flink execution environments and table environments for batch and stream processing. Environments are identified by a unique ID, assigned through `MLEnvironmentFactory`, so that jobs can share them.
9
10
```java { .api }
11
/**
12
* Stores necessary Flink execution context with unique ID for job sharing
13
*/
14
public class MLEnvironment {
15
/**
16
* Default constructor creating empty environment
17
*/
18
public MLEnvironment();
19
20
/**
21
* Constructor for batch-only environment
22
* @param batchEnv Batch execution environment
23
* @param batchTableEnv Batch table environment
24
*/
25
public MLEnvironment(ExecutionEnvironment batchEnv, BatchTableEnvironment batchTableEnv);
26
27
/**
28
* Constructor for stream-only environment
29
* @param streamEnv Stream execution environment
30
* @param streamTableEnv Stream table environment
31
*/
32
public MLEnvironment(StreamExecutionEnvironment streamEnv, StreamTableEnvironment streamTableEnv);
33
34
/**
35
* Constructor for dual batch/stream environment
36
* @param batchEnv Batch execution environment
37
* @param batchTableEnv Batch table environment
38
* @param streamEnv Stream execution environment
39
* @param streamTableEnv Stream table environment
40
*/
41
public MLEnvironment(ExecutionEnvironment batchEnv, BatchTableEnvironment batchTableEnv,
42
StreamExecutionEnvironment streamEnv, StreamTableEnvironment streamTableEnv);
43
44
/**
45
* Get batch execution environment
46
* @return ExecutionEnvironment for batch processing
47
*/
48
public ExecutionEnvironment getExecutionEnvironment();
49
50
/**
51
* Get stream execution environment
52
* @return StreamExecutionEnvironment for stream processing
53
*/
54
public StreamExecutionEnvironment getStreamExecutionEnvironment();
55
56
/**
57
* Get batch table environment
58
* @return BatchTableEnvironment for batch table operations
59
*/
60
public BatchTableEnvironment getBatchTableEnvironment();
61
62
/**
63
* Get stream table environment
64
* @return StreamTableEnvironment for stream table operations
65
*/
66
public StreamTableEnvironment getStreamTableEnvironment();
67
}
68
```
69
70
**Usage Examples:**
71
72
```java
73
import org.apache.flink.ml.common.MLEnvironment;
74
import org.apache.flink.api.java.ExecutionEnvironment;
75
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
76
77
// Create batch-only ML environment
78
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
79
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
80
MLEnvironment mlEnv = new MLEnvironment(batchEnv, batchTableEnv);
81
82
// Use the environment
83
ExecutionEnvironment env = mlEnv.getExecutionEnvironment();
84
BatchTableEnvironment tableEnv = mlEnv.getBatchTableEnvironment();
85
```
86
87
### MLEnvironmentFactory Class
88
89
Factory class that registers, looks up, and removes `MLEnvironment` instances, assigning each registered environment a unique ID. It also provides a default environment under `DEFAULT_ML_ENVIRONMENT_ID`.
90
91
```java { .api }
92
/**
93
* Factory for creating and managing MLEnvironment instances
94
*/
95
public class MLEnvironmentFactory {
96
/** Default environment ID constant */
97
public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
98
99
/**
100
* Get MLEnvironment by ID
101
* @param mlEnvId Environment ID
102
* @return MLEnvironment instance or null if not found
103
*/
104
public static MLEnvironment get(Long mlEnvId);
105
106
/**
107
* Get default MLEnvironment
108
* @return Default MLEnvironment instance
109
*/
110
public static MLEnvironment getDefault();
111
112
/**
113
* Create new unique MLEnvironment ID
114
* @return New unique environment ID
115
*/
116
public static Long getNewMLEnvironmentId();
117
118
/**
119
* Register MLEnvironment instance
120
* @param env MLEnvironment to register
121
* @return Assigned environment ID
122
*/
123
public static Long registerMLEnvironment(MLEnvironment env);
124
125
/**
126
* Remove MLEnvironment by ID
127
* @param mlEnvId Environment ID to remove
128
* @return Removed MLEnvironment instance or null
129
*/
130
public static MLEnvironment remove(Long mlEnvId);
131
}
132
```
133
134
**Usage Examples:**
135
136
```java
137
import org.apache.flink.ml.common.MLEnvironmentFactory;
138
import org.apache.flink.ml.common.MLEnvironment;
139
140
// Get default environment
141
MLEnvironment defaultEnv = MLEnvironmentFactory.getDefault();
142
143
// Register custom environment
144
MLEnvironment customEnv = new MLEnvironment(batchEnv, batchTableEnv);
145
Long envId = MLEnvironmentFactory.registerMLEnvironment(customEnv);
146
147
// Retrieve registered environment
148
MLEnvironment retrievedEnv = MLEnvironmentFactory.get(envId);
149
150
// Clean up
151
MLEnvironmentFactory.remove(envId);
152
```
153
154
### HasMLEnvironmentId Interface
155
156
Parameter interface that lets a component specify which ML environment to use by storing the environment ID as a parameter. The ID defaults to `MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID`.
157
158
```java { .api }
159
/**
160
* Parameter interface for ML environment ID
161
*/
162
public interface HasMLEnvironmentId<T> extends WithParams<T> {
163
/** Parameter info for ML environment ID */
164
ParamInfo<Long> ML_ENVIRONMENT_ID = ParamInfoFactory
165
.createParamInfo("mlEnvironmentId", Long.class)
166
.setDescription("ML environment ID")
167
.setHasDefaultValue(MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID)
168
.build();
169
170
/**
171
* Get ML environment ID
172
* @return Environment ID
173
*/
174
default Long getMLEnvironmentId() {
175
return get(ML_ENVIRONMENT_ID);
176
}
177
178
/**
179
* Set ML environment ID
180
* @param value Environment ID
181
* @return This instance for method chaining
182
*/
183
default T setMLEnvironmentId(Long value) {
184
return set(ML_ENVIRONMENT_ID, value);
185
}
186
}
187
```
188
189
**Usage Examples:**
190
191
```java
192
// Any class implementing HasMLEnvironmentId can manage environment ID
193
public class MyMLComponent implements HasMLEnvironmentId<MyMLComponent> {
194
private Params params = new Params();
195
196
@Override
197
public Params getParams() { return params; }
198
199
public void process() {
200
Long envId = getMLEnvironmentId();
201
MLEnvironment env = MLEnvironmentFactory.get(envId);
202
// Use environment for processing
203
}
204
}
205
206
// Usage
207
MyMLComponent component = new MyMLComponent()
208
.setMLEnvironmentId(customEnvId);
209
```