or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

algorithm-operators.md · index.md · linear-algebra.md · ml-environment.md · ml-pipeline.md · statistical-operations.md · table-utilities.md
tile.json

docs/ml-pipeline.md

ML Pipeline Framework

Estimator/Transformer pattern for building machine learning workflows with type-safe parameter management and support for both batch and stream processing. The pipeline framework provides the foundation for creating reusable ML components.

Capabilities

PipelineStageBase Abstract Class

Base class for all pipeline stages providing common functionality for parameter management, environment handling, and cloning operations.

/**
 * Base class for pipeline stages (estimators and transformers)
 */
public abstract class PipelineStageBase<S extends PipelineStageBase<S>> 
    implements WithParams<S>, HasMLEnvironmentId<S>, Cloneable {
    
    /** Parameter storage */
    protected Params params;
    
    /** Default constructor */
    public PipelineStageBase();
    
    /** Constructor with parameters */
    public PipelineStageBase(Params params);
    
    /** Get parameters */
    public Params getParams();
    
    /** Clone pipeline stage */
    public S clone();
    
    /** Get table environment from table */
    protected static TableEnvironment tableEnvOf(Table table);
}

EstimatorBase Abstract Class

Base class for estimator implementations that train models from data. Estimators implement the fit() operation to produce trained models.

/**
 * Base class for estimator implementations that train models
 */
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>> 
    extends PipelineStageBase<E> implements Estimator<E, M> {
    
    /** Default constructor */
    public EstimatorBase();
    
    /** Constructor with parameters */
    public EstimatorBase(Params params);
    
    /**
     * Fit model with explicit table environment
     * @param tEnv Table environment to use
     * @param input Input data table
     * @return Trained model
     */
    public M fit(TableEnvironment tEnv, Table input);
    
    /**
     * Fit model from table (uses table's environment)
     * @param input Input data table
     * @return Trained model
     */
    public M fit(Table input);
    
    /**
     * Fit model from batch operator (abstract - implemented by subclasses)
     * @param input Input batch operator
     * @return Trained model
     */
    protected abstract M fit(BatchOperator input);
    
    /**
     * Fit model from stream operator (default throws UnsupportedOperationException)
     * @param input Input stream operator
     * @return Trained model
     */
    protected M fit(StreamOperator input);
}

Usage Examples:

import org.apache.flink.ml.pipeline.EstimatorBase;
import org.apache.flink.table.api.Table;

// Example custom estimator implementation
public class MyEstimator extends EstimatorBase<MyEstimator, MyModel> {
    
    public MyEstimator() {
        super();
    }
    
    @Override
    protected MyModel fit(BatchOperator input) {
        // Training logic here
        // Access parameters: getParams().get(PARAM_NAME)
        
        // Create and return trained model
        MyModel model = new MyModel();
        // Set model parameters and data
        return model;
    }
}

// Usage
MyEstimator estimator = new MyEstimator()
    .setMLEnvironmentId(envId)
    .setParam("learningRate", 0.01);

Table trainingData = getTrainingData();
MyModel trainedModel = estimator.fit(trainingData);

TransformerBase Abstract Class

Base class for transformer implementations that transform data. Transformers implement the transform() operation to process input data.

/**
 * Base class for transformer implementations that transform data
 */
public abstract class TransformerBase<T extends TransformerBase<T>> 
    extends PipelineStageBase<T> implements Transformer<T> {
    
    /** Default constructor */
    public TransformerBase();
    
    /** Constructor with parameters */
    public TransformerBase(Params params);
    
    /**
     * Transform data with explicit table environment
     * @param tEnv Table environment to use
     * @param input Input data table
     * @return Transformed data table
     */
    public Table transform(TableEnvironment tEnv, Table input);
    
    /**
     * Transform table (uses table's environment)
     * @param input Input data table
     * @return Transformed data table
     */
    public Table transform(Table input);
    
    /**
     * Transform batch data (abstract - implemented by subclasses)
     * @param input Input batch operator
     * @return Transformed batch operator
     */
    protected abstract BatchOperator transform(BatchOperator input);
    
    /**
     * Transform stream data (abstract - implemented by subclasses)
     * @param input Input stream operator
     * @return Transformed stream operator
     */
    protected abstract StreamOperator transform(StreamOperator input);
}

Usage Examples:

import org.apache.flink.ml.pipeline.TransformerBase;
import org.apache.flink.table.api.Table;

// Example custom transformer implementation
public class MyTransformer extends TransformerBase<MyTransformer> {
    
    public MyTransformer() {
        super();
    }
    
    @Override
    protected BatchOperator transform(BatchOperator input) {
        // Batch transformation logic
        return input.select("*"); // Example
    }
    
    @Override
    protected StreamOperator transform(StreamOperator input) {
        // Stream transformation logic
        return input.select("*"); // Example
    }
}

// Usage
MyTransformer transformer = new MyTransformer()
    .setOutputCol("transformed_col");

Table inputData = getInputData();
Table transformedData = transformer.transform(inputData);

ModelBase Abstract Class

Base class for machine learning models that can transform data and manage model state. Models extend transformers with model data management capabilities.

/**
 * Base class for machine learning models
 */
public abstract class ModelBase<M extends ModelBase<M>> 
    extends TransformerBase<M> implements Model<M> {
    
    /** Model data table */
    protected Table modelData;
    
    /** Default constructor */
    public ModelBase();
    
    /** Constructor with parameters */
    public ModelBase(Params params);
    
    /**
     * Get model data table
     * @return Model data as Table
     */
    public Table getModelData();
    
    /**
     * Set model data table
     * @param modelData Model data table
     * @return This model instance for method chaining
     */
    public M setModelData(Table modelData);
    
    /**
     * Clone model with model data
     * @return Cloned model instance
     */
    public M clone();
}

Usage Examples:

import org.apache.flink.ml.pipeline.ModelBase;
import org.apache.flink.table.api.Table;

// Example custom model implementation
public class MyModel extends ModelBase<MyModel> {
    
    public MyModel() {
        super();
    }
    
    @Override
    protected BatchOperator transform(BatchOperator input) {
        // Use model data for transformation
        Table modelData = getModelData();
        // Apply model transformation logic
        return input; // Example
    }
    
    @Override
    protected StreamOperator transform(StreamOperator input) {
        // Stream prediction logic using model data
        return input; // Example
    }
}

// Usage
MyModel model = new MyModel();

// Set model data (typically from training)
Table modelData = getModelData();
model.setModelData(modelData);

// Use model for prediction
Table inputData = getInputData();
Table predictions = model.transform(inputData);

// Clone model for different use cases
MyModel clonedModel = model.clone();

Pipeline Construction Patterns

Common patterns for building ML pipelines using the estimator/transformer framework.

Usage Examples:

import org.apache.flink.ml.pipeline.EstimatorBase;
import org.apache.flink.ml.pipeline.TransformerBase;
import org.apache.flink.table.api.Table;

// Sequential pipeline pattern
public class MLPipeline {
    
    public static Table buildPipeline(Table data) {
        // 1. Data preprocessing
        MyPreprocessor preprocessor = new MyPreprocessor()
            .setInputCol("raw_features")
            .setOutputCol("processed_features");
        
        Table preprocessedData = preprocessor.transform(data);
        
        // 2. Feature extraction
        MyFeatureExtractor extractor = new MyFeatureExtractor()
            .setInputCol("processed_features")
            .setOutputCol("features");
        
        Table featuresData = extractor.transform(preprocessedData);
        
        // 3. Model training
        MyEstimator estimator = new MyEstimator()
            .setFeaturesCol("features")
            .setLabelCol("label");
        
        MyModel model = estimator.fit(featuresData);
        
        // 4. Prediction
        Table predictions = model.transform(featuresData);
        
        return predictions;
    }
}

// Reusable pipeline components
public class PipelineBuilder {
    private List<TransformerBase> transformers = new ArrayList<>();
    
    public PipelineBuilder addTransformer(TransformerBase transformer) {
        transformers.add(transformer);
        return this;
    }
    
    public Table transform(Table input) {
        Table result = input;
        for (TransformerBase transformer : transformers) {
            result = transformer.transform(result);
        }
        return result;
    }
}

// Usage
Table result = new PipelineBuilder()
    .addTransformer(preprocessor)
    .addTransformer(featureExtractor)
    .addTransformer(trainedModel)
    .transform(inputData);

Parameter Management Integration

Pipeline stages integrate with Flink ML's parameter system for type-safe configuration.

Usage Examples:

import org.apache.flink.ml.params.shared.colname.HasOutputCol;
import org.apache.flink.ml.params.shared.colname.HasSelectedCols;

// Example estimator with parameter interfaces
public class ConfigurableEstimator 
    extends EstimatorBase<ConfigurableEstimator, ConfigurableModel>
    implements HasSelectedCols<ConfigurableEstimator>, HasOutputCol<ConfigurableEstimator> {
    
    // Custom parameters
    public static final ParamInfo<Double> LEARNING_RATE = ParamInfoFactory
        .createParamInfo("learningRate", Double.class)
        .setDescription("Learning rate for training")
        .setHasDefaultValue(0.01)
        .build();
    
    public Double getLearningRate() {
        return get(LEARNING_RATE);
    }
    
    public ConfigurableEstimator setLearningRate(Double value) {
        return set(LEARNING_RATE, value);
    }
    
    @Override
    protected ConfigurableModel fit(BatchOperator input) {
        // Access parameters
        String[] selectedCols = getSelectedCols();
        String outputCol = getOutputCol();
        Double learningRate = getLearningRate();
        
        // Training logic using parameters
        ConfigurableModel model = new ConfigurableModel()
            .setOutputCol(outputCol);
        
        return model;
    }
}

// Usage with parameters
ConfigurableEstimator estimator = new ConfigurableEstimator()
    .setSelectedCols(new String[]{"feature1", "feature2"})
    .setOutputCol("prediction")
    .setLearningRate(0.05)
    .setMLEnvironmentId(envId);

ConfigurableModel model = estimator.fit(trainingData);