Core execution environment management providing centralized context for ML operations in both batch and streaming scenarios. The ML environment manages Flink execution contexts and table environments required for machine learning operations.
Central class for managing Flink execution environments and table environments with unique identification for job sharing and resource management.
/**
* Stores necessary Flink execution context with unique ID for job sharing
*/
public class MLEnvironment {
/**
* Default constructor creating empty environment
*/
public MLEnvironment();
/**
* Constructor for batch-only environment
* @param batchEnv Batch execution environment
* @param batchTableEnv Batch table environment
*/
public MLEnvironment(ExecutionEnvironment batchEnv, BatchTableEnvironment batchTableEnv);
/**
* Constructor for stream-only environment
* @param streamEnv Stream execution environment
* @param streamTableEnv Stream table environment
*/
public MLEnvironment(StreamExecutionEnvironment streamEnv, StreamTableEnvironment streamTableEnv);
/**
* Constructor for dual batch/stream environment
* @param batchEnv Batch execution environment
* @param batchTableEnv Batch table environment
* @param streamEnv Stream execution environment
* @param streamTableEnv Stream table environment
*/
public MLEnvironment(ExecutionEnvironment batchEnv, BatchTableEnvironment batchTableEnv,
StreamExecutionEnvironment streamEnv, StreamTableEnvironment streamTableEnv);
/**
* Get batch execution environment
* @return ExecutionEnvironment for batch processing
*/
public ExecutionEnvironment getExecutionEnvironment();
/**
* Get stream execution environment
* @return StreamExecutionEnvironment for stream processing
*/
public StreamExecutionEnvironment getStreamExecutionEnvironment();
/**
* Get batch table environment
* @return BatchTableEnvironment for batch table operations
*/
public BatchTableEnvironment getBatchTableEnvironment();
/**
* Get stream table environment
* @return StreamTableEnvironment for stream table operations
*/
public StreamTableEnvironment getStreamTableEnvironment();
}Usage Examples:
import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
// Create batch-only ML environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
MLEnvironment mlEnv = new MLEnvironment(batchEnv, batchTableEnv);
// Use the environment
ExecutionEnvironment env = mlEnv.getExecutionEnvironment();
BatchTableEnvironment tableEnv = mlEnv.getBatchTableEnvironment();Factory class for creating, registering, and managing MLEnvironment instances with automatic ID assignment and lifecycle management.
/**
* Factory for creating and managing MLEnvironment instances
*/
public class MLEnvironmentFactory {
/** Default environment ID constant */
public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
/**
* Get MLEnvironment by ID
* @param mlEnvId Environment ID
* @return MLEnvironment instance or null if not found
*/
public static MLEnvironment get(Long mlEnvId);
/**
* Get default MLEnvironment
* @return Default MLEnvironment instance
*/
public static MLEnvironment getDefault();
/**
* Create new unique MLEnvironment ID
* @return New unique environment ID
*/
public static Long getNewMLEnvironmentId();
/**
* Register MLEnvironment instance
* @param env MLEnvironment to register
* @return Assigned environment ID
*/
public static Long registerMLEnvironment(MLEnvironment env);
/**
* Remove MLEnvironment by ID
* @param mlEnvId Environment ID to remove
* @return Removed MLEnvironment instance or null
*/
public static MLEnvironment remove(Long mlEnvId);
}Usage Examples:
import org.apache.flink.ml.common.MLEnvironmentFactory;
import org.apache.flink.ml.common.MLEnvironment;
// Get default environment
MLEnvironment defaultEnv = MLEnvironmentFactory.getDefault();
// Register custom environment
MLEnvironment customEnv = new MLEnvironment(batchEnv, batchTableEnv);
Long envId = MLEnvironmentFactory.registerMLEnvironment(customEnv);
// Retrieve registered environment
MLEnvironment retrievedEnv = MLEnvironmentFactory.get(envId);
// Clean up
MLEnvironmentFactory.remove(envId);Parameter interface for ML environment ID management, allowing components to specify which ML environment to use.
/**
* Parameter interface for ML environment ID
*/
public interface HasMLEnvironmentId<T> extends WithParams<T> {
/** Parameter info for ML environment ID */
ParamInfo<Long> ML_ENVIRONMENT_ID = ParamInfoFactory
.createParamInfo("mlEnvironmentId", Long.class)
.setDescription("ML environment ID")
.setHasDefaultValue(MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID)
.build();
/**
* Get ML environment ID
* @return Environment ID
*/
default Long getMLEnvironmentId() {
return get(ML_ENVIRONMENT_ID);
}
/**
* Set ML environment ID
* @param value Environment ID
* @return This instance for method chaining
*/
default T setMLEnvironmentId(Long value) {
return set(ML_ENVIRONMENT_ID, value);
}
}Usage Examples:
// Any class implementing HasMLEnvironmentId can manage environment ID
public class MyMLComponent implements HasMLEnvironmentId<MyMLComponent> {
private Params params = new Params();
@Override
public Params getParams() { return params; }
public void process() {
Long envId = getMLEnvironmentId();
MLEnvironment env = MLEnvironmentFactory.get(envId);
// Use environment for processing
}
}
// Usage
MyMLComponent component = new MyMLComponent()
.setMLEnvironmentId(customEnvId);