or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

algorithm-operators.md · environment-management.md · index.md · linear-algebra.md · parameter-system.md · pipeline-base-classes.md · pipeline-framework.md · utility-libraries.md

docs/environment-management.md

# Environment Management

Manages ML execution contexts for Flink batch and stream environments. It provides centralized access to execution environments and table environments, and supports multiple concurrent ML environments, each registered under its own ID.

## Capabilities

### MLEnvironment Class

Core class that holds the Flink execution environments and table environments for both batch and stream processing.

```java { .api }
/**
 * Stores Flink execution contexts for ML operations.
 * Provides access to both batch and stream environments.
 */
public class MLEnvironment {

    /** Create ML environment with default settings */
    public MLEnvironment();

    /** Create ML environment with batch-only contexts */
    public MLEnvironment(ExecutionEnvironment batchEnv,
                         BatchTableEnvironment batchTableEnv);

    /** Create ML environment with stream-only contexts */
    public MLEnvironment(StreamExecutionEnvironment streamEnv,
                         StreamTableEnvironment streamTableEnv);

    /** Create ML environment with both batch and stream contexts */
    public MLEnvironment(ExecutionEnvironment batchEnv,
                         BatchTableEnvironment batchTableEnv,
                         StreamExecutionEnvironment streamEnv,
                         StreamTableEnvironment streamTableEnv);

    /** Get batch execution environment */
    public ExecutionEnvironment getExecutionEnvironment();

    /** Get stream execution environment */
    public StreamExecutionEnvironment getStreamExecutionEnvironment();

    /** Get batch table environment */
    public BatchTableEnvironment getBatchTableEnvironment();

    /** Get stream table environment */
    public StreamTableEnvironment getStreamTableEnvironment();
}
```

**Usage Examples:**

```java
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Create custom ML environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);

MLEnvironment mlEnv = new MLEnvironment(batchEnv, batchTableEnv, streamEnv, streamTableEnv);

// Access environments
ExecutionEnvironment batchExecEnv = mlEnv.getExecutionEnvironment();
StreamExecutionEnvironment streamExecEnv = mlEnv.getStreamExecutionEnvironment();
BatchTableEnvironment batchTblEnv = mlEnv.getBatchTableEnvironment();
StreamTableEnvironment streamTblEnv = mlEnv.getStreamTableEnvironment();

// Use for ML operations
Table batchData = batchTblEnv.fromDataSet(/* dataset */);
Table streamData = streamTblEnv.fromDataStream(/* datastream */);
```

### MLEnvironmentFactory Class

Factory class that manages multiple ML environments, each registered under a unique `Long` ID.

```java { .api }
/**
 * Factory for managing MLEnvironment instances.
 * Supports multiple concurrent ML environments with unique IDs.
 */
public class MLEnvironmentFactory {

    /** Default ML environment ID */
    public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;

    /** Get ML environment by ID */
    public static MLEnvironment get(Long mlEnvId);

    /** Get default ML environment */
    public static MLEnvironment getDefault();

    /** Generate new unique ML environment ID */
    public static Long getNewMLEnvironmentId();

    /** Register ML environment and return its ID */
    public static Long registerMLEnvironment(MLEnvironment env);

    /** Remove ML environment and return removed instance */
    public static MLEnvironment remove(Long mlEnvId);
}
```

**Usage Examples:**

```java
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.MLEnvironmentFactory;

// Use default environment
MLEnvironment defaultEnv = MLEnvironmentFactory.getDefault();

// Create and register custom environment
MLEnvironment customEnv = new MLEnvironment(/* custom settings */);
Long customEnvId = MLEnvironmentFactory.registerMLEnvironment(customEnv);

// Retrieve registered environment
MLEnvironment retrieved = MLEnvironmentFactory.get(customEnvId);

// Generate new environment ID for manual management
Long newId = MLEnvironmentFactory.getNewMLEnvironmentId();

// Remove environment when done
MLEnvironment removed = MLEnvironmentFactory.remove(customEnvId);
```

### HasMLEnvironmentId Interface

Parameter interface for ML components that need to specify which registered ML environment to use. If the parameter is not set, it defaults to `MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID`.

```java { .api }
/**
 * Parameter interface for ML environment ID specification.
 * @param <T> The implementing class type for method chaining
 */
public interface HasMLEnvironmentId<T> extends WithParams<T> {

    /** ML environment ID parameter */
    ParamInfo<Long> ML_ENVIRONMENT_ID = ParamInfoFactory
        .createParamInfo("mlEnvironmentId", Long.class)
        .setDescription("ML environment ID")
        .setHasDefaultValue(MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID)
        .build();

    /** Get ML environment ID */
    default Long getMLEnvironmentId() {
        return get(ML_ENVIRONMENT_ID);
    }

    /** Set ML environment ID */
    default T setMLEnvironmentId(Long value) {
        return set(ML_ENVIRONMENT_ID, value);
    }
}
```

**Usage Examples:**

```java
// ML component using environment ID
public class MyMLAlgorithm extends EstimatorBase<MyMLAlgorithm, MyMLModel>
        implements HasMLEnvironmentId<MyMLAlgorithm> {

    @Override
    protected MyMLModel fit(BatchOperator input) {
        // Get the specified ML environment
        Long envId = getMLEnvironmentId();
        MLEnvironment mlEnv = MLEnvironmentFactory.get(envId);

        // Use environment for operations
        BatchTableEnvironment tEnv = mlEnv.getBatchTableEnvironment();

        // Training logic using the specified environment
        // ...

        return new MyMLModel(this.getParams());
    }
}

// Usage with specific environment
MyMLAlgorithm algorithm = new MyMLAlgorithm()
        .setMLEnvironmentId(customEnvId);

MyMLModel model = algorithm.fit(trainingData);
```

## Environment Management Patterns

### Default Environment Usage

The most common pattern for simple applications:

```java
// Use default environment (ID = 0)
MLEnvironment env = MLEnvironmentFactory.getDefault();

// Components that do not set an ML environment ID use the default environment
Pipeline pipeline = new Pipeline()
        .appendStage(new FeatureScaler())       // Uses env ID 0
        .appendStage(new LinearRegression());   // Uses env ID 0

Pipeline trained = pipeline.fit(env.getBatchTableEnvironment(), data);
```

### Multi-Environment Setup

For complex applications that require different execution configurations:

```java
// Create environments for different purposes
// (getHighMemory*/getLowLatency* are application-defined helpers, not part of the API)
MLEnvironment trainingEnv = new MLEnvironment(
        getHighMemoryBatchEnv(),        // High memory for training
        getHighMemoryBatchTableEnv()
);

MLEnvironment streamingEnv = new MLEnvironment(
        getLowLatencyStreamEnv(),       // Low latency for streaming
        getLowLatencyStreamTableEnv()
);

// Register environments
Long trainingEnvId = MLEnvironmentFactory.registerMLEnvironment(trainingEnv);
Long streamingEnvId = MLEnvironmentFactory.registerMLEnvironment(streamingEnv);

// Configure components for different environments
Estimator<?> trainer = new MyEstimator()
        .setMLEnvironmentId(trainingEnvId);     // Use high-memory env for training

Transformer<?> predictor = new MyPredictor()
        .setMLEnvironmentId(streamingEnvId);    // Use low-latency env for prediction
```

### Environment Isolation

To keep concurrent ML workflows isolated from each other:

```java
// Workflow 1: Real-time recommendation
Long realtimeEnvId = MLEnvironmentFactory.registerMLEnvironment(
        createRealtimeEnvironment()
);

Pipeline realtimePipeline = new Pipeline()
        .appendStage(new FeatureExtractor().setMLEnvironmentId(realtimeEnvId))
        .appendStage(new RecommendationModel().setMLEnvironmentId(realtimeEnvId));

// Workflow 2: Batch analytics
Long batchEnvId = MLEnvironmentFactory.registerMLEnvironment(
        createBatchAnalyticsEnvironment()
);

Pipeline batchPipeline = new Pipeline()
        .appendStage(new DataAggregator().setMLEnvironmentId(batchEnvId))
        .appendStage(new StatisticalAnalyzer().setMLEnvironmentId(batchEnvId));

// Each workflow runs against its own registered environment and resource configuration
```

### Environment Lifecycle Management

Register environments under readable names, and unregister them from the factory when the workflow finishes:

```java
public class MLWorkflowManager {
    private final Map<String, Long> environments = new HashMap<>();

    public Long createEnvironment(String name, MLEnvironment env) {
        Long envId = MLEnvironmentFactory.registerMLEnvironment(env);
        environments.put(name, envId);
        return envId;
    }

    public MLEnvironment getEnvironment(String name) {
        Long envId = environments.get(name);
        return envId != null ? MLEnvironmentFactory.get(envId) : null;
    }

    public void cleanup() {
        // Remove all registered environments
        for (Long envId : environments.values()) {
            MLEnvironmentFactory.remove(envId);
        }
        environments.clear();
    }
}

// Usage
MLWorkflowManager manager = new MLWorkflowManager();

try {
    // Set up environments
    Long trainEnvId = manager.createEnvironment("training", trainingEnv);
    Long serveEnvId = manager.createEnvironment("serving", servingEnv);

    // Run ML workflows
    // ...

} finally {
    // Unregister the environments
    manager.cleanup();
}
```

## Integration with Table API

The environment management system integrates directly with Flink's Table API:

```java
// Get ML environment
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();

// Access table environments
BatchTableEnvironment batchTEnv = mlEnv.getBatchTableEnvironment();
StreamTableEnvironment streamTEnv = mlEnv.getStreamTableEnvironment();

// Create tables
Table batchTable = batchTEnv.fromDataSet(batchDataSet, "features, label");
Table streamTable = streamTEnv.fromDataStream(stream, "features, label");

// Use with ML components
Pipeline pipeline = new Pipeline()
        .appendStage(new FeatureNormalizer())
        .appendStage(new LogisticRegression());

// Training on batch data
Pipeline trained = pipeline.fit(batchTEnv, batchTable);

// Real-time prediction on stream data
Table predictions = trained.transform(streamTEnv, streamTable);
```

A single `MLEnvironment` provides both a batch and a stream table environment. As the example above shows, this lets one pipeline be trained on batch data and then applied to streaming data.