Comprehensive machine learning library for Apache Flink that enables scalable ML pipelines on a distributed stream processing platform.

ML execution context management for Flink batch and stream environments. Provides centralized access to execution contexts and table environments with support for multiple concurrent ML environments.
Core class that stores Flink execution contexts for both batch and stream processing.

/**
 * Stores Flink execution contexts for ML operations
 * Provides access to both batch and stream environments
 */
public class MLEnvironment {
    /** Create ML environment with default settings */
    public MLEnvironment();

    /** Create ML environment with batch-only contexts */
    public MLEnvironment(ExecutionEnvironment batchEnv,
                         BatchTableEnvironment batchTableEnv);

    /** Create ML environment with stream-only contexts */
    public MLEnvironment(StreamExecutionEnvironment streamEnv,
                         StreamTableEnvironment streamTableEnv);

    /** Create ML environment with both batch and stream contexts */
    public MLEnvironment(ExecutionEnvironment batchEnv,
                         BatchTableEnvironment batchTableEnv,
                         StreamExecutionEnvironment streamEnv,
                         StreamTableEnvironment streamTableEnv);

    /** Get batch execution environment */
    public ExecutionEnvironment getExecutionEnvironment();

    /** Get stream execution environment */
    public StreamExecutionEnvironment getStreamExecutionEnvironment();

    /** Get batch table environment */
    public BatchTableEnvironment getBatchTableEnvironment();

    /** Get stream table environment */
    public StreamTableEnvironment getStreamTableEnvironment();
}

Usage Examples:

import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Create custom ML environment
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(streamEnv);
MLEnvironment mlEnv = new MLEnvironment(batchEnv, batchTableEnv, streamEnv, streamTableEnv);

// Access environments
ExecutionEnvironment batchExecEnv = mlEnv.getExecutionEnvironment();
StreamExecutionEnvironment streamExecEnv = mlEnv.getStreamExecutionEnvironment();
BatchTableEnvironment batchTblEnv = mlEnv.getBatchTableEnvironment();
StreamTableEnvironment streamTblEnv = mlEnv.getStreamTableEnvironment();

// Use for ML operations (pass an existing DataSet / DataStream)
Table batchData = batchTblEnv.fromDataSet(/* dataset */);
Table streamData = streamTblEnv.fromDataStream(/* datastream */);

Factory class for managing multiple ML environments with unique identifiers.

/**
 * Factory for managing MLEnvironment instances
 * Supports multiple concurrent ML environments with unique IDs
 */
public class MLEnvironmentFactory {
    /** Default ML environment ID */
    public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;

    /** Get ML environment by ID */
    public static MLEnvironment get(Long mlEnvId);

    /** Get default ML environment */
    public static MLEnvironment getDefault();

    /** Generate new unique ML environment ID */
    public static Long getNewMLEnvironmentId();

    /** Register ML environment and return its ID */
    public static Long registerMLEnvironment(MLEnvironment env);

    /** Remove ML environment and return removed instance */
    public static MLEnvironment remove(Long mlEnvId);
}

Usage Examples:

import org.apache.flink.ml.common.MLEnvironment;
import org.apache.flink.ml.common.MLEnvironmentFactory;

// Use default environment
MLEnvironment defaultEnv = MLEnvironmentFactory.getDefault();

// Create and register custom environment
MLEnvironment customEnv = new MLEnvironment(/* custom settings */);
Long customEnvId = MLEnvironmentFactory.registerMLEnvironment(customEnv);

// Retrieve registered environment
MLEnvironment retrieved = MLEnvironmentFactory.get(customEnvId);

// Generate new environment ID for manual management
Long newId = MLEnvironmentFactory.getNewMLEnvironmentId();

// Remove environment when done
MLEnvironment removed = MLEnvironmentFactory.remove(customEnvId);

Parameter interface for ML components that need to specify which environment to use.

/**
 * Parameter interface for ML environment ID specification
 * @param <T> The implementing class type for method chaining
 */
public interface HasMLEnvironmentId<T> extends WithParams<T> {
    /** ML environment ID parameter */
    ParamInfo<Long> ML_ENVIRONMENT_ID = ParamInfoFactory
        .createParamInfo("mlEnvironmentId", Long.class)
        .setDescription("ML environment ID")
        .setHasDefaultValue(MLEnvironmentFactory.DEFAULT_ML_ENVIRONMENT_ID)
        .build();

    /** Get ML environment ID */
    default Long getMLEnvironmentId() {
        return get(ML_ENVIRONMENT_ID);
    }

    /** Set ML environment ID */
    default T setMLEnvironmentId(Long value) {
        return set(ML_ENVIRONMENT_ID, value);
    }
}

Usage Examples:

// ML component using environment ID
public class MyMLAlgorithm extends EstimatorBase<MyMLAlgorithm, MyMLModel>
        implements HasMLEnvironmentId<MyMLAlgorithm> {

    @Override
    protected MyMLModel fit(BatchOperator input) {
        // Get the specified ML environment
        Long envId = getMLEnvironmentId();
        MLEnvironment mlEnv = MLEnvironmentFactory.get(envId);

        // Use environment for operations
        BatchTableEnvironment tEnv = mlEnv.getBatchTableEnvironment();

        // Training logic using the specified environment
        // ...
        return new MyMLModel(this.getParams());
    }
}
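
The fit method above resolves its environment through getMLEnvironmentId(), which falls back to the default ID when nothing was set. As a rough, Flink-free illustration of that default-and-override behaviour, the sketch below models the parameter with a plain map. HasEnvId, DemoStage, params() and self() are hypothetical names invented for this example and are not part of the library; the real interface builds on WithParams and ParamInfo as shown earlier.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a parameter interface with a default environment ID. */
interface HasEnvId<T> {
    Long DEFAULT_ENV_ID = 0L;               // mirrors the documented default ID of 0
    String ENV_ID_KEY = "mlEnvironmentId";

    /** Backing parameter storage supplied by the implementing class. */
    Map<String, Object> params();

    /** Returns the implementing instance so setters can be chained. */
    T self();

    /** Returns the stored ID, or the default when none was set. */
    default Long getEnvId() {
        Object value = params().get(ENV_ID_KEY);
        return value == null ? DEFAULT_ENV_ID : (Long) value;
    }

    /** Stores the ID and returns the instance for method chaining. */
    default T setEnvId(Long value) {
        params().put(ENV_ID_KEY, value);
        return self();
    }
}

/** Hypothetical component that opts in to the parameter. */
final class DemoStage implements HasEnvId<DemoStage> {
    private final Map<String, Object> params = new HashMap<>();

    @Override
    public Map<String, Object> params() {
        return params;
    }

    @Override
    public DemoStage self() {
        return this;
    }
}

final class HasEnvIdDemo {
    public static void main(String[] args) {
        DemoStage stage = new DemoStage();
        System.out.println(stage.getEnvId());               // ID before any setter call
        System.out.println(stage.setEnvId(7L).getEnvId());  // ID after an explicit override
    }
}
```

Returning the instance from the setter is what allows calls to be chained on one component, as in the usage that follows.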

// Usage with specific environment (customEnvId comes from registerMLEnvironment)
MyMLAlgorithm algorithm = new MyMLAlgorithm()
    .setMLEnvironmentId(customEnvId)
    .setMaxIter(100); // assumes MyMLAlgorithm also declares a maxIter parameter

MyMLModel model = algorithm.fit(trainingData);

Most common pattern for simple applications:

// Use default environment (ID = 0)
MLEnvironment env = MLEnvironmentFactory.getDefault();

// All ML components will use default environment
Pipeline pipeline = new Pipeline()
    .appendStage(new FeatureScaler())      // Uses env ID 0
    .appendStage(new LinearRegression());  // Uses env ID 0

Pipeline trained = pipeline.fit(env.getBatchTableEnvironment(), data);

For complex applications requiring different execution configurations:

// Create environments for different purposes
MLEnvironment trainingEnv = new MLEnvironment(
    getHighMemoryBatchEnv(),       // High memory for training
    getHighMemoryBatchTableEnv()
);
MLEnvironment streamingEnv = new MLEnvironment(
    getLowLatencyStreamEnv(),      // Low latency for streaming
    getLowLatencyStreamTableEnv()
);

// Register environments
Long trainingEnvId = MLEnvironmentFactory.registerMLEnvironment(trainingEnv);
Long streamingEnvId = MLEnvironmentFactory.registerMLEnvironment(streamingEnv);

// Configure components for different environments
// (Estimator takes two type parameters: the estimator type and its model type)
Estimator<?, ?> trainer = new MyEstimator()
    .setMLEnvironmentId(trainingEnvId);   // Use high-memory env for training
Transformer<?> predictor = new MyPredictor()
    .setMLEnvironmentId(streamingEnvId);  // Use low-latency env for prediction

For concurrent ML workflows:

// Workflow 1: Real-time recommendation
Long realtimeEnvId = MLEnvironmentFactory.registerMLEnvironment(
    createRealtimeEnvironment()
);
Pipeline realtimePipeline = new Pipeline()
    .appendStage(new FeatureExtractor().setMLEnvironmentId(realtimeEnvId))
    .appendStage(new RecommendationModel().setMLEnvironmentId(realtimeEnvId));

// Workflow 2: Batch analytics
Long batchEnvId = MLEnvironmentFactory.registerMLEnvironment(
    createBatchAnalyticsEnvironment()
);
Pipeline batchPipeline = new Pipeline()
    .appendStage(new DataAggregator().setMLEnvironmentId(batchEnvId))
    .appendStage(new StatisticalAnalyzer().setMLEnvironmentId(batchEnvId));

// Workflows run independently with different resource configurations

Proper cleanup and resource management:

import java.util.HashMap;
import java.util.Map;

public class MLWorkflowManager {
    // Maps a workflow name to the ID of its registered ML environment
    private final Map<String, Long> environments = new HashMap<>();

    public Long createEnvironment(String name, MLEnvironment env) {
        Long envId = MLEnvironmentFactory.registerMLEnvironment(env);
        environments.put(name, envId);
        return envId;
    }

    public MLEnvironment getEnvironment(String name) {
        Long envId = environments.get(name);
        return envId != null ? MLEnvironmentFactory.get(envId) : null;
    }

    public void cleanup() {
        // Remove all registered environments
        for (Long envId : environments.values()) {
            MLEnvironmentFactory.remove(envId);
        }
        environments.clear();
    }
}
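
Since cleanup in the manager above boils down to removing every ID it registered, the same idea can also be written as an AutoCloseable so that try-with-resources triggers the removal. The following is only a sketch under assumptions: ScopedEnvironments and its remover callback are hypothetical and kept independent of Flink so the snippet runs on its own; in a real application the callback would presumably delegate to MLEnvironmentFactory.remove.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical helper: remembers registered IDs and removes them on close. */
final class ScopedEnvironments implements AutoCloseable {
    private final List<Long> trackedIds = new ArrayList<>();
    private final LongConsumer remover;

    /** @param remover callback invoked once per tracked ID when the scope closes */
    ScopedEnvironments(LongConsumer remover) {
        this.remover = remover;
    }

    /** Records an ID for later removal and returns it unchanged. */
    long track(long envId) {
        trackedIds.add(envId);
        return envId;
    }

    /** Removes every tracked ID in registration order, then forgets them. */
    @Override
    public void close() {
        for (long envId : trackedIds) {
            remover.accept(envId);
        }
        trackedIds.clear();
    }
}

final class ScopedEnvironmentsDemo {
    public static void main(String[] args) {
        // The list stands in for the factory; it only records which IDs were removed.
        List<Long> removed = new ArrayList<>();
        try (ScopedEnvironments scope = new ScopedEnvironments(id -> removed.add(id))) {
            scope.track(3L);
            scope.track(5L);
            // ... run workflows that use the tracked environments ...
        }
        System.out.println(removed);
    }
}
```

Because close() declares no checked exception, the try block needs no catch clause. The explicit try/finally form works equally well.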

// Usage (trainingEnv and servingEnv are MLEnvironment instances created elsewhere)
MLWorkflowManager manager = new MLWorkflowManager();
try {
    // Set up environments
    Long trainEnvId = manager.createEnvironment("training", trainingEnv);
    Long serveEnvId = manager.createEnvironment("serving", servingEnv);

    // Run ML workflows
    // ...
} finally {
    // Clean up resources
    manager.cleanup();
}

The environment management system works directly with Flink's Table API:

// Get ML environment
MLEnvironment mlEnv = MLEnvironmentFactory.getDefault();

// Access table environments
BatchTableEnvironment batchTEnv = mlEnv.getBatchTableEnvironment();
StreamTableEnvironment streamTEnv = mlEnv.getStreamTableEnvironment();

// Create tables (batchDataSet and stream are an existing DataSet and DataStream)
Table batchTable = batchTEnv.fromDataSet(batchDataSet, "features, label");
Table streamTable = streamTEnv.fromDataStream(stream, "features, label");

// Use with ML components
Pipeline pipeline = new Pipeline()
    .appendStage(new FeatureNormalizer())
    .appendStage(new LogisticRegression());

// Training on batch data
Pipeline trained = pipeline.fit(batchTEnv, batchTable);

// Real-time prediction on stream data
Table predictions = trained.transform(streamTEnv, streamTable);

Obtaining both table environments from the same MLEnvironment gives ML operations the appropriate execution context for each mode and keeps batch and stream processing consistent.
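
To make the factory methods documented earlier (getDefault, registerMLEnvironment, get, remove) easier to reason about, here is a small stand-alone model of an ID-keyed registry. It is not the library's implementation: EnvRegistry and FakeEnv are hypothetical, the registry is instance-based rather than static, and two behaviours are assumptions made for the sketch, namely that looking up an unknown ID throws and that the default entry (ID 0) is never removed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an ML environment; it only carries a label. */
final class FakeEnv {
    final String label;

    FakeEnv(String label) {
        this.label = label;
    }
}

/** Illustrative ID-keyed registry with a pre-registered default entry. */
final class EnvRegistry {
    static final Long DEFAULT_ID = 0L;

    private final Map<Long, FakeEnv> envs = new HashMap<>();
    private long nextId = 1L; // 0 is reserved for the default entry

    EnvRegistry() {
        envs.put(DEFAULT_ID, new FakeEnv("default"));
    }

    /** Assumption: an unknown ID is treated as a caller error. */
    synchronized FakeEnv get(Long id) {
        FakeEnv env = envs.get(id);
        if (env == null) {
            throw new IllegalArgumentException("No environment registered for ID " + id);
        }
        return env;
    }

    synchronized FakeEnv getDefault() {
        return get(DEFAULT_ID);
    }

    /** Stores the environment under a fresh ID and returns that ID. */
    synchronized Long register(FakeEnv env) {
        Long id = nextId++;
        envs.put(id, env);
        return id;
    }

    /** Assumption: the default entry stays registered; other IDs are removed. */
    synchronized FakeEnv remove(Long id) {
        if (DEFAULT_ID.equals(id)) {
            return getDefault();
        }
        return envs.remove(id);
    }
}

final class EnvRegistryDemo {
    public static void main(String[] args) {
        EnvRegistry registry = new EnvRegistry();
        Long id = registry.register(new FakeEnv("training"));
        System.out.println(registry.get(id).label);
        registry.remove(id); // after this call, get(id) throws in this sketch
    }
}
```

In this model an ID stays valid only between register and remove, so components that hold an ID should finish before the environment is removed.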