CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-ml-uber-2-11

Comprehensive machine learning library for Apache Flink that enables scalable ML pipelines on a distributed stream processing platform.

Pending
Overview
Eval results
Files

docs/algorithm-operators.md

Algorithm Operators

Batch and stream processing operators for building custom ML algorithms. Provides linking capabilities and integration with Flink's Table API for scalable data processing workflows.

Capabilities

AlgoOperator Base Class

Abstract base class for all algorithm operators providing common functionality.

/**
 * Base class for algorithm operators with parameter support.
 *
 * This is a signature listing: method bodies are omitted, so only the
 * declared contract is documented here.
 *
 * @param <T> The concrete operator type for method chaining
 */
public abstract class AlgoOperator<T extends AlgoOperator<T>>
                      implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
    
    /** Create operator with empty parameters */
    public AlgoOperator();
    
    /**
     * Create operator with initial parameters
     * @param params the initial parameters of the operator
     */
    public AlgoOperator(Params params);
    
    /**
     * Get all operator parameters
     * @return the parameters held by this operator
     */
    public Params getParams();
    
    /**
     * Get primary output table
     * @return the primary output table (the one stored via {@code setOutput})
     */
    public Table getOutput();
    
    /**
     * Get all side output tables
     * @return the side output tables (the ones stored via {@code setSideOutputs})
     */
    public Table[] getSideOutputs();
    
    /**
     * Get output column names
     * @return the column names of the output
     */
    public String[] getColNames();
    
    /**
     * Get output column types
     * @return the column types of the output
     */
    public TypeInformation<?>[] getColTypes();
    
    /**
     * Get side output column names by index
     * @param index position of the side output to inspect
     *              (presumably an index into {@code getSideOutputs()} - confirm)
     * @return the column names of that side output
     */
    public String[] getSideOutputColNames(int index);
    
    /**
     * Get side output column types by index
     * @param index position of the side output to inspect
     *              (presumably an index into {@code getSideOutputs()} - confirm)
     * @return the column types of that side output
     */
    public TypeInformation<?>[] getSideOutputColTypes(int index);
    
    /**
     * Get output table schema
     * @return the schema of the output table
     */
    public TableSchema getSchema();
    
    /**
     * Set side output tables (protected)
     * @param sideOutputs the side output tables to store on this operator
     */
    protected void setSideOutputs(Table[] sideOutputs);
    
    /**
     * Set primary output table (protected)
     * @param output the table to store as this operator's primary output
     */
    protected void setOutput(Table output);
    
    /**
     * Validate minimum number of input operators
     * NOTE(review): how a failed check is reported (exception type) is not
     * shown in this listing - confirm against the implementation.
     * @param size   the minimum number of operators required
     * @param inputs the operators to check
     */
    public static void checkMinOpSize(int size, AlgoOperator<?>... inputs);
    
    /**
     * Validate exact number of input operators
     * NOTE(review): how a failed check is reported (exception type) is not
     * shown in this listing - confirm against the implementation.
     * @param size   the exact number of operators required
     * @param inputs the operators to check
     */
    public static void checkOpSize(int size, AlgoOperator<?>... inputs);
}

BatchOperator Class

Base class for batch algorithm operators with linking and chaining capabilities.

/**
 * Base class for batch algorithm operators
 * Provides operator linking and batch-specific functionality
 *
 * This is a signature listing: method bodies are omitted, so only the
 * declared contract is documented here.
 *
 * @param <T> The concrete batch operator type
 */
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
    
    /** Create batch operator with empty parameters */
    public BatchOperator();
    
    /**
     * Create batch operator with initial parameters
     * @param params the initial parameters of the operator
     */
    public BatchOperator(Params params);
    
    /**
     * Link this operator to the next operator in chain
     * @param next the operator that should follow this one
     * @param <B>  the concrete type of {@code next}
     * @return an operator of the same type as {@code next}, so a further
     *         {@code link(...)} call continues the chain from it
     *         (presumably {@code next} itself - confirm)
     */
    public <B extends BatchOperator<?>> B link(B next);
    
    /**
     * Link this operator from input operators (abstract)
     * Subclasses implement their processing here.
     * @param inputs the upstream operators feeding this one
     * @return an operator of the concrete type {@code T}
     */
    public abstract T linkFrom(BatchOperator<?>... inputs);
    
    /**
     * Create batch operator from existing table
     * @param table the table to wrap
     * @return a batch operator created from {@code table}
     */
    public static BatchOperator<?> fromTable(Table table);
    
    /**
     * Get first input operator, validating inputs exist
     * NOTE(review): how missing inputs are reported is not shown in this
     * listing - confirm against the implementation.
     * @param inputs the candidate input operators
     * @return the first operator of {@code inputs}
     */
    public static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);
}

Usage Examples:

import org.apache.flink.ml.operator.batch.BatchOperator;
import org.apache.flink.table.api.Table;

// Create operators from tables
// NOTE: the right-hand side below is a placeholder; supply a real Table
// before this snippet will compile.
Table inputTable = // ... your input table
BatchOperator<?> source = BatchOperator.fromTable(inputTable);

// Chain operators
// Each link(...) call returns an operator of the type passed in, so the
// calls can be chained. MyBatchEstimator and MyBatchSink are illustrative
// names that are not defined in this document.
BatchOperator<?> result = source
    .link(new MyBatchTransformer())
    .link(new MyBatchEstimator())
    .link(new MyBatchSink());

// Custom batch operator implementation
/**
 * Example single-input batch operator: checks that exactly one input was
 * supplied, then stores a processed table as its primary output.
 */
public class MyBatchTransformer extends BatchOperator<MyBatchTransformer> {
    /**
     * @param inputs the upstream operators; exactly one is expected
     * @return this operator, for chaining
     */
    @Override
    public MyBatchTransformer linkFrom(BatchOperator<?>... inputs) {
        // Validate the exact number of inputs before touching inputs[0]
        BatchOperator.checkOpSize(1, inputs);
        // The single upstream operator; its table is available through getOutput()
        BatchOperator<?> input = inputs[0];
        
        // Process input table
        // NOTE: placeholder - replace with an expression that produces a Table
        Table processedTable = // ... transformation logic
        // Store the result so that getOutput() on this operator returns it
        this.setOutput(processedTable);
        
        return this;
    }
}

StreamOperator Class

Base class for stream algorithm operators with real-time processing capabilities.

/**
 * Base class for stream algorithm operators
 * Provides operator linking and stream-specific functionality
 *
 * This is a signature listing: method bodies are omitted, so only the
 * declared contract is documented here.
 *
 * @param <T> The concrete stream operator type
 */
public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
    
    /** Create stream operator with empty parameters */
    public StreamOperator();
    
    /**
     * Create stream operator with initial parameters
     * @param params the initial parameters of the operator
     */
    public StreamOperator(Params params);
    
    /**
     * Link this operator to the next operator in chain
     * @param next the operator that should follow this one
     * @param <S>  the concrete type of {@code next}
     * @return an operator of the same type as {@code next}, so a further
     *         {@code link(...)} call continues the chain from it
     *         (presumably {@code next} itself - confirm)
     */
    public <S extends StreamOperator<?>> S link(S next);
    
    /**
     * Link this operator from input operators (abstract)
     * Subclasses implement their processing here.
     * @param inputs the upstream operators feeding this one
     * @return an operator of the concrete type {@code T}
     */
    public abstract T linkFrom(StreamOperator<?>... inputs);
    
    /**
     * Create stream operator from existing table
     * @param table the table to wrap
     * @return a stream operator created from {@code table}
     */
    public static StreamOperator<?> fromTable(Table table);
    
    /**
     * Get first input operator, validating inputs exist
     * NOTE(review): how missing inputs are reported is not shown in this
     * listing - confirm against the implementation.
     * @param inputs the candidate input operators
     * @return the first operator of {@code inputs}
     */
    public static StreamOperator<?> checkAndGetFirst(StreamOperator<?>... inputs);
}

Usage Examples:

import org.apache.flink.ml.operator.stream.StreamOperator;
import org.apache.flink.table.api.Table;

// Create operators from streaming tables
// NOTE: the right-hand side below is a placeholder; supply a real Table
// before this snippet will compile.
Table streamingTable = // ... your streaming table
StreamOperator<?> source = StreamOperator.fromTable(streamingTable);

// Chain streaming operators
// Each link(...) call returns an operator of the type passed in, so the
// calls can be chained. MyStreamProcessor and MyStreamSink are illustrative
// names that are not defined in this document.
StreamOperator<?> result = source
    .link(new MyStreamTransformer())
    .link(new MyStreamProcessor())
    .link(new MyStreamSink());

// Custom stream operator implementation
/**
 * Example single-input stream operator: checks that exactly one input was
 * supplied, then stores a processed table as its primary output.
 */
public class MyStreamTransformer extends StreamOperator<MyStreamTransformer> {
    /**
     * @param inputs the upstream operators; exactly one is expected
     * @return this operator, for chaining
     */
    @Override
    public MyStreamTransformer linkFrom(StreamOperator<?>... inputs) {
        // Validate the exact number of inputs before touching inputs[0]
        StreamOperator.checkOpSize(1, inputs);
        // The single upstream operator; its table is available through getOutput()
        StreamOperator<?> input = inputs[0];
        
        // Process streaming table
        // NOTE: placeholder - replace with an expression that produces a Table
        Table processedStream = // ... streaming transformation logic
        // Store the result so that getOutput() on this operator returns it
        this.setOutput(processedStream);
        
        return this;
    }
}

Source Operators

Specialized operators for converting Tables to operator chains.

TableSourceBatchOp

/**
 * Transform Table to batch source operator
 * Entry point for batch operator chains
 *
 * This is a signature listing: method bodies are omitted.
 */
public final class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {
    
    /**
     * Create batch source from table
     * @param table the table this source operator wraps
     */
    public TableSourceBatchOp(Table table);
    
    /**
     * Not supported - throws UnsupportedOperationException
     * As the entry point of a chain, this operator is not linked from
     * upstream operators.
     * @param inputs ignored upstream operators
     * @throws UnsupportedOperationException whenever it is called
     */
    public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);
}

TableSourceStreamOp

/**
 * Transform Table to stream source operator  
 * Entry point for stream operator chains
 *
 * This is a signature listing: method bodies are omitted.
 */
public final class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {
    
    /**
     * Create stream source from table
     * @param table the table this source operator wraps
     */
    public TableSourceStreamOp(Table table);
    
    /**
     * Not supported - throws UnsupportedOperationException
     * As the entry point of a chain, this operator is not linked from
     * upstream operators.
     * @param inputs ignored upstream operators
     * @throws UnsupportedOperationException whenever it is called
     */
    public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);
}

Usage Examples:

import org.apache.flink.ml.operator.batch.BatchOperator;
import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
import org.apache.flink.ml.operator.stream.StreamOperator;
import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
import org.apache.flink.table.api.Table;

// Batch processing chain
// NOTE: the right-hand side below is a placeholder; supply a real Table.
Table batchData = // ... your batch table
// TableSourceBatchOp is the entry point of the chain; every link(...) call
// returns an operator of the type passed in, so the calls can be chained.
// FeatureScaler, LinearRegression and ModelEvaluator are illustrative
// BatchOperator subclasses that are not defined in this document.
BatchOperator<?> batchChain = new TableSourceBatchOp(batchData)
    .link(new FeatureScaler())
    .link(new LinearRegression())
    .link(new ModelEvaluator());

// Stream processing chain
// NOTE: the right-hand side below is a placeholder; supply a real Table.
Table streamData = // ... your streaming table
// Same pattern with the streaming entry point. StreamingFeatureTransformer,
// OnlinePredictor and AlertSystem are illustrative StreamOperator subclasses
// that are not defined in this document.
StreamOperator<?> streamChain = new TableSourceStreamOp(streamData)
    .link(new StreamingFeatureTransformer())
    .link(new OnlinePredictor())
    .link(new AlertSystem());

Operator Chaining Patterns

Linear Chaining

Most common pattern where operators are linked in sequence:

// Simple linear chain
// source, preprocessor, featureExtractor and classifier stand for
// BatchOperator instances created beforehand (not declared in this snippet).
// Each link(...) returns an operator of the type passed in, so the calls
// can be chained and the final one yields the value assigned to result.
BatchOperator<?> result = source
    .link(preprocessor)
    .link(featureExtractor)
    .link(classifier);

Multi-Input Operators

Operators that consume multiple inputs:

/**
 * Example two-input batch operator: checks that exactly two inputs were
 * supplied and stores the joined table as its primary output.
 */
public class JoinOperator extends BatchOperator<JoinOperator> {
    /**
     * @param inputs the upstream operators; exactly two are expected,
     *               the left side first and the right side second
     * @return this operator, for chaining
     */
    @Override
    public JoinOperator linkFrom(BatchOperator<?>... inputs) {
        BatchOperator.checkOpSize(2, inputs); // Require exactly 2 inputs
        
        // Positional convention: inputs[0] is the left side, inputs[1] the right
        BatchOperator<?> left = inputs[0];
        BatchOperator<?> right = inputs[1];
        
        // Join logic here
        // NOTE: placeholder - replace with an expression that produces a Table,
        // e.g. one built from left.getOutput() and right.getOutput()
        Table joined = // ... join left and right tables
        // Store the result so that getOutput() on this operator returns it
        this.setOutput(joined);
        
        return this;
    }
}

// Usage
// Multi-input operators are wired with linkFrom(...), passing the inputs in
// the order the operator expects. leftOperator and rightOperator stand for
// BatchOperator instances created beforehand.
BatchOperator<?> joined = new JoinOperator()
    .linkFrom(leftOperator, rightOperator);

Multi-Output Operators

Operators that produce multiple outputs:

/**
 * Example single-input batch operator that produces a primary output plus
 * an array of side outputs.
 */
public class SplitOperator extends BatchOperator<SplitOperator> {
    /**
     * @param inputs the upstream operators; exactly one is expected
     * @return this operator, for chaining
     */
    @Override
    public SplitOperator linkFrom(BatchOperator<?>... inputs) {
        // Validate the exact number of inputs before touching inputs[0]
        BatchOperator.checkOpSize(1, inputs);
        
        // Primary output table of the single upstream operator
        Table input = inputs[0].getOutput();
        
        // Split logic
        // NOTE: placeholder - replace with an expression that produces a Table
        Table mainOutput = // ... main result
        Table[] sideOutputs = new Table[]{
            // ... additional outputs
        };
        
        // Store both results so callers can read them back through
        // getOutput() and getSideOutputs()
        this.setOutput(mainOutput);
        this.setSideOutputs(sideOutputs);
        
        return this;
    }
}

// Usage
// The variable is declared as SplitOperator (linkFrom returns the concrete
// type); source stands for a BatchOperator created beforehand.
SplitOperator splitter = new SplitOperator().linkFrom(source);
Table main = splitter.getOutput();
Table[] sides = splitter.getSideOutputs();

Parameter Configuration

All operators support parameter configuration through the WithParams interface:

/**
 * Example batch operator configured through two parameters: an iteration
 * count and a learning rate. Both parameters declare a default value and
 * are exposed through typed convenience accessors.
 */
public class ConfigurableOperator extends BatchOperator<ConfigurableOperator> {
    // Parameter definitions

    /** Parameter "numIterations"; declared with default value 10. */
    public static final ParamInfo<Integer> NUM_ITERATIONS = ParamInfoFactory
        .createParamInfo("numIterations", Integer.class)
        .setHasDefaultValue(10)
        .build();

    /** Parameter "learningRate"; declared with default value 0.01. */
    public static final ParamInfo<Double> LEARNING_RATE = ParamInfoFactory
        .createParamInfo("learningRate", Double.class)
        .setHasDefaultValue(0.01)
        .build();

    // Convenience methods

    /**
     * Set the number of iterations.
     * @param numIter the value to store for {@link #NUM_ITERATIONS}
     * @return this operator, for chaining
     */
    public ConfigurableOperator setNumIterations(int numIter) {
        return set(NUM_ITERATIONS, numIter);
    }

    /** @return the value stored for {@link #NUM_ITERATIONS} */
    public int getNumIterations() {
        return get(NUM_ITERATIONS);
    }

    /**
     * Set the learning rate.
     * @param learningRate the value to store for {@link #LEARNING_RATE}
     * @return this operator, for chaining
     */
    public ConfigurableOperator setLearningRate(double learningRate) {
        return set(LEARNING_RATE, learningRate);
    }

    /** @return the value stored for {@link #LEARNING_RATE} */
    public double getLearningRate() {
        return get(LEARNING_RATE);
    }

    /**
     * @param inputs the upstream operators; exactly one is expected
     * @return this operator, for chaining
     */
    @Override
    public ConfigurableOperator linkFrom(BatchOperator<?>... inputs) {
        // Validate the inputs up front, as the other single-input examples do
        BatchOperator.checkOpSize(1, inputs);

        // Use parameters in processing
        int numIter = getNumIterations();
        double lr = getLearningRate();

        // Processing logic using parameters
        // ...
        // A complete implementation stores its result with setOutput(...)
        // before returning, so that getOutput() has something to return.

        return this;
    }
}

// Usage with parameters
// setNumIterations(...) is the typed convenience setter; set(...) is the
// generic WithParams setter and is equivalent to setLearningRate(0.001).
// source stands for a BatchOperator created beforehand.
BatchOperator<?> configured = new ConfigurableOperator()
    .setNumIterations(20)
    .set(ConfigurableOperator.LEARNING_RATE, 0.001)
    .linkFrom(source);

Integration with Pipeline Framework

Algorithm operators can be integrated with the higher-level pipeline framework:

/**
 * Example estimator that trains through a chain of batch operators and
 * wraps the resulting table in a {@code MyModelFromOperator}.
 */
public class MyEstimatorFromOperator extends EstimatorBase<MyEstimatorFromOperator, MyModelFromOperator> {

    /**
     * Runs the training chain on {@code input} and builds the model from
     * the output table of the last operator in the chain.
     *
     * @param input the batch operator providing the training data
     * @return a model carrying this estimator's parameters and the model data
     */
    @Override
    protected MyModelFromOperator fit(BatchOperator input) {
        // Training chain, one named step per stage
        BatchOperator<?> preprocessed = input.link(new FeaturePreprocessor());
        BatchOperator<?> fitted = preprocessed.link(new TrainingOperator());
        BatchOperator<?> trained = fitted.link(new ModelExtractor());

        // The output table of the final stage is the model data
        Table modelData = trained.getOutput();

        MyModelFromOperator model = new MyModelFromOperator(this.getParams());
        return model.setModelData(modelData);
    }
}

/**
 * Example model that predicts through chains of batch or stream operators,
 * using the model data stored on it.
 */
public class MyModelFromOperator extends ModelBase<MyModelFromOperator> {

    /** Create a model with empty parameters */
    public MyModelFromOperator() {
        super();
    }

    /**
     * Create a model with initial parameters.
     * Constructors are not inherited in Java, so this one has to be declared
     * for {@code new MyModelFromOperator(params)} to compile.
     *
     * @param params the initial parameters of the model
     */
    public MyModelFromOperator(Params params) {
        super(params);
    }

    /**
     * Batch prediction: preprocesses {@code input} and feeds it to a
     * prediction operator that has been given this model's data.
     *
     * @param input the batch operator providing the data to predict on
     * @return the last operator of the prediction chain
     */
    @Override
    protected BatchOperator transform(BatchOperator input) {
        // Use batch operators for prediction
        return input
            .link(new FeaturePreprocessor())
            .link(new PredictionOperator().setModelData(this.getModelData()));
    }

    /**
     * Streaming prediction: same shape as the batch variant, built from the
     * streaming counterparts of the operators.
     *
     * @param input the stream operator providing the data to predict on
     * @return the last operator of the prediction chain
     */
    @Override
    protected StreamOperator transform(StreamOperator input) {
        // Use stream operators for real-time prediction
        return input
            .link(new StreamFeaturePreprocessor())
            .link(new StreamPredictionOperator().setModelData(this.getModelData()));
    }
}

This integration allows you to leverage the low-level operator framework within the high-level pipeline abstractions, providing flexibility for custom algorithm implementations while maintaining compatibility with the broader ML ecosystem.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-ml-uber-2-11

docs

algorithm-operators.md

environment-management.md

index.md

linear-algebra.md

parameter-system.md

pipeline-base-classes.md

pipeline-framework.md

utility-libraries.md

tile.json