or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

algorithm-operators.md, index.md, linear-algebra.md, ml-environment.md, ml-pipeline.md, statistical-operations.md, table-utilities.md
tile.json

docs/algorithm-operators.md

Algorithm Operators

Base classes for algorithm operators that support both batch and stream processing, with output-table management and parameter configuration. The operator system provides the foundation for implementing ML algorithms in Flink.

Capabilities

AlgoOperator Abstract Class

Base class for all algorithm operators providing common functionality for output table management, parameter handling, and schema operations.

/**
 * Base class for algorithm operators with output table management.
 *
 * <p>An operator exposes one primary output {@link Table} and optional side-output tables,
 * which subclasses publish through {@link #setOutput(Table)} and {@link #setSideOutputs(Table[])}.
 * Parameters are accessed through {@link WithParams}; the self-referential type parameter lets
 * inherited fluent setters return the concrete subclass type.
 *
 * <p>NOTE(review): this listing shows signatures only; implementations are omitted.
 *
 * @param <T> the concrete operator type (self type)
 */
public abstract class AlgoOperator<T extends AlgoOperator<T>> 
    implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
    
    /** Creates an operator without explicitly supplied parameters. */
    public AlgoOperator();
    
    /**
     * Creates an operator initialized with the given parameters.
     * @param params Initial parameter set
     */
    public AlgoOperator(Params params);
    
    /**
     * Get parameters
     * @return This operator's parameter set
     */
    public Params getParams();
    
    /**
     * Get primary output table, i.e. the table most recently passed to setOutput
     * (subclasses do this from their linkFrom implementation).
     * NOTE(review): result before the operator has been linked is not shown here — confirm.
     * @return Primary output table
     */
    public Table getOutput();
    
    /**
     * Get side output tables, as passed to setSideOutputs.
     * NOTE(review): whether this returns null or an empty array when no side outputs
     * were set is not shown here — confirm before dereferencing.
     * @return Array of side output tables
     */
    public Table[] getSideOutputs();
    
    /**
     * Get output column names (presumably of the primary output table — confirm)
     * @return Array of column names
     */
    public String[] getColNames();
    
    /**
     * Get output column types (presumably of the primary output table — confirm)
     * @return Array of column type information
     */
    public TypeInformation<?>[] getColTypes();
    
    /**
     * Get side output column names by index
     * @param index Side output index; presumably a position in getSideOutputs() — confirm
     * @return Array of column names for specified side output
     */
    public String[] getSideOutputColNames(int index);
    
    /**
     * Get side output column types by index
     * @param index Side output index; presumably a position in getSideOutputs() — confirm
     * @return Array of column types for specified side output
     */
    public TypeInformation<?>[] getSideOutputColTypes(int index);
    
    /**
     * Get output table schema
     * @return TableSchema of primary output
     */
    public TableSchema getSchema();
    
    /**
     * String representation of operator
     * @return String description (exact format not specified here)
     */
    public String toString();
    
    /**
     * Set the primary output table. Subclasses call this from linkFrom to publish
     * their result.
     * @param output Table to expose through getOutput()
     */
    protected void setOutput(Table output);
    
    /**
     * Set side output tables. Subclasses call this from linkFrom, alongside setOutput,
     * when they produce more than one result.
     * @param sideOutputs Tables to expose through getSideOutputs()
     */
    protected void setSideOutputs(Table[] sideOutputs);
}

BatchOperator Abstract Class

Base class for batch algorithm operators providing linking capabilities and batch-specific functionality.

/**
 * Base class for batch algorithm operators.
 *
 * <p>Subclasses implement {@link #linkFrom(BatchOperator[])}: read the upstream operators'
 * output tables, transform them, publish the result with setOutput, and return {@code this}.
 *
 * @param <T> the concrete operator type (self type)
 */
public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
    
    /** Creates an operator without explicitly supplied parameters. */
    public BatchOperator();
    
    /**
     * Creates an operator initialized with the given parameters.
     * @param params Initial parameter set
     */
    public BatchOperator(Params params);
    
    /**
     * Link to next batch operator in chain.
     * NOTE(review): presumably implemented as {@code next.linkFrom(this)} — confirm. If so,
     * pass the operator itself ({@code a.link(b)}); passing {@code b.linkFrom(a)} instead
     * links {@code b} twice. Only this operator is supplied as input, so multi-input
     * operators must be wired with linkFrom directly.
     * @param next Next operator to link to
     * @return The next operator for method chaining
     */
    public <B extends BatchOperator<?>> B link(B next);
    
    /**
     * Link from input batch operators (abstract - implemented by subclasses)
     * @param inputs Input batch operators
     * @return This operator instance configured with inputs
     */
    public abstract T linkFrom(BatchOperator<?>... inputs);
    
    /**
     * Create batch operator from table
     * (presumably a TableSourceBatchOp wrapping the table — confirm).
     * @param table Input table
     * @return BatchOperator wrapping the table
     */
    public static BatchOperator<?> fromTable(Table table);
    
    /**
     * Validate and get first input operator (protected utility).
     * NOTE(review): the exact checks (non-empty? exactly one?) are not shown here — confirm.
     * @param inputs Input operators
     * @return First input operator
     */
    protected static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);
}

Usage Examples:

import org.apache.flink.ml.operator.batch.BatchOperator;
import org.apache.flink.table.api.Table;

// Example custom batch operator implementation
/**
 * Example single-input batch operator. It currently passes its input through
 * unchanged; replace the {@code select("*")} projection with real processing logic.
 */
public class MyBatchOperator extends BatchOperator<MyBatchOperator> {

    /**
     * Creates the operator from an existing parameter set.
     * @param params Initial parameters
     */
    public MyBatchOperator(Params params) {
        super(params);
    }

    /** Creates the operator without explicit parameters. */
    public MyBatchOperator() {
        super();
    }

    /**
     * Wires this operator to its upstream operator.
     * @param inputs Upstream operators; validated by checkAndGetFirst, which returns the first
     * @return this operator, with its output table set
     */
    @Override
    public MyBatchOperator linkFrom(BatchOperator<?>... inputs) {
        final Table upstream = checkAndGetFirst(inputs).getOutput();

        // Placeholder transformation - plug the real processing in here.
        final Table transformed = upstream.select("*");

        setOutput(transformed);
        return this;
    }
}

// Usage - operator chaining
BatchOperator<?> source = BatchOperator.fromTable(inputTable);

// PARAMETER_1 / PARAMETER_2 stand for ParamInfo<String> constants declared for
// MyBatchOperator: WithParams#set takes a ParamInfo key, not a raw String name.
MyBatchOperator operator1 = new MyBatchOperator()
    .set(PARAMETER_1, "value1");

MyBatchOperator operator2 = new MyBatchOperator()
    .set(PARAMETER_2, "value2");

// Chain operators: source -> operator1 -> operator2.
// link(next) wires `next` to this operator and returns `next`, so pass the operators
// themselves; also calling linkFrom() would link each operator twice.
Table result = source
    .link(operator1)
    .link(operator2)
    .getOutput();

StreamOperator Abstract Class

Base class for stream algorithm operators providing linking capabilities and stream-specific functionality.

/**
 * Base class for stream algorithm operators.
 *
 * <p>Subclasses implement {@link #linkFrom(StreamOperator[])}: read the upstream operators'
 * output tables, transform them, publish the result with setOutput, and return {@code this}.
 *
 * @param <T> the concrete operator type (self type)
 */
public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
    
    /** Creates an operator without explicitly supplied parameters. */
    public StreamOperator();
    
    /**
     * Creates an operator initialized with the given parameters.
     * @param params Initial parameter set
     */
    public StreamOperator(Params params);
    
    /**
     * Link to next stream operator in chain.
     * NOTE(review): presumably implemented as {@code next.linkFrom(this)} — confirm. If so,
     * pass the operator itself ({@code a.link(b)}); passing {@code b.linkFrom(a)} instead
     * links {@code b} twice.
     * @param next Next operator to link to
     * @return The next operator for method chaining
     */
    public <S extends StreamOperator<?>> S link(S next);
    
    /**
     * Link from input stream operators (abstract - implemented by subclasses)
     * @param inputs Input stream operators
     * @return This operator instance configured with inputs
     */
    public abstract T linkFrom(StreamOperator<?>... inputs);
    
    /**
     * Create stream operator from table
     * (presumably a TableSourceStreamOp wrapping the table — confirm).
     * @param table Input table
     * @return StreamOperator wrapping the table
     */
    public static StreamOperator<?> fromTable(Table table);
}

Usage Examples:

import org.apache.flink.ml.operator.stream.StreamOperator;
import org.apache.flink.table.api.Table;

// Example custom stream operator implementation
/**
 * Example single-input stream operator. It currently passes its input through
 * unchanged; replace the {@code select("*")} projection with real streaming logic.
 */
public class MyStreamOperator extends StreamOperator<MyStreamOperator> {
    
    /** Creates the operator without explicit parameters. */
    public MyStreamOperator() {
        super();
    }
    
    /**
     * Wires this operator to its upstream stream operator.
     * @param inputs Upstream operators; only the first one is used
     * @return this operator, with its output table set
     * @throws IllegalArgumentException if no (non-null) input operator is supplied
     */
    @Override
    public MyStreamOperator linkFrom(StreamOperator<?>... inputs) {
        // Validate up front so a missing input yields a clear message instead of an
        // ArrayIndexOutOfBoundsException / NullPointerException.
        if (inputs == null || inputs.length == 0 || inputs[0] == null) {
            throw new IllegalArgumentException("MyStreamOperator requires one input operator");
        }
        StreamOperator<?> input = inputs[0];
        
        // Get input table
        Table inputTable = input.getOutput();
        
        // Process the streaming table
        Table outputTable = inputTable.select("*"); // Your streaming logic here
        
        // Set the output
        this.setOutput(outputTable);
        
        return this;
    }
}

// Usage - stream processing
StreamOperator<?> streamSource = StreamOperator.fromTable(streamingTable);

MyStreamOperator streamProcessor = new MyStreamOperator();

// link(next) wires `next` to this operator and returns `next`, so pass the operator
// itself; also calling linkFrom() would link it twice.
Table streamResult = streamSource
    .link(streamProcessor)
    .getOutput();

Table Source Operators

Specialized operators that wrap an existing Table so it can serve as the starting point of a batch or stream operator chain.

/**
 * Batch operator for table sources: wraps an existing Table so it can start a
 * batch operator chain (e.g. {@code new TableSourceBatchOp(table).link(next)}).
 */
public class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {
    /**
     * Constructor from table
     * @param table Source table; presumably exposed as this operator's output — confirm
     */
    public TableSourceBatchOp(Table table);
    
    /**
     * A source has no upstream operator.
     * NOTE(review): whether non-empty inputs are rejected or ignored is not shown here — confirm.
     */
    @Override
    public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);
}

/**
 * Stream operator for table sources: wraps an existing (streaming) Table so it can
 * start a stream operator chain (e.g. {@code new TableSourceStreamOp(table).link(next)}).
 */
public class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {
    /**
     * Constructor from table
     * @param table Source table; presumably exposed as this operator's output — confirm
     */
    public TableSourceStreamOp(Table table);
    
    /**
     * A source has no upstream operator.
     * NOTE(review): whether non-empty inputs are rejected or ignored is not shown here — confirm.
     */
    @Override
    public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);
}

Usage Examples:

import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;

// Create operators from tables
Table batchTable = getBatchTable();
Table streamTable = getStreamTable();

// Batch table source
TableSourceBatchOp batchSource = new TableSourceBatchOp(batchTable);

// Stream table source
TableSourceStreamOp streamSource = new TableSourceStreamOp(streamTable);

// Use in operator chains. link(next) wires `next` to this operator and returns
// `next`, so pass the operator itself rather than the result of linkFrom().
MyBatchOperator batchProcessor = new MyBatchOperator();
Table batchResult = batchSource
    .link(batchProcessor)
    .getOutput();

MyStreamOperator streamProcessor = new MyStreamOperator();
Table streamResult = streamSource
    .link(streamProcessor)
    .getOutput();

Operator Pattern Examples

Common patterns for implementing algorithm operators with proper input validation and output management.

Usage Examples:

// Multi-input operator example
public class JoinOperator extends BatchOperator<JoinOperator> {
    
    /**
     * Joins exactly two upstream operators using the JOIN_CONDITION parameter.
     * @param inputs The left and right input operators, in that order
     * @return this operator, with the joined table as its output
     * @throws IllegalArgumentException if there are not exactly 2 inputs, or if
     *     JOIN_CONDITION is not set
     */
    @Override
    public JoinOperator linkFrom(BatchOperator<?>... inputs) {
        // Validate multiple inputs; treat a null varargs array as zero inputs and
        // report the actual count to ease debugging.
        int inputCount = inputs == null ? 0 : inputs.length;
        if (inputCount != 2) {
            throw new IllegalArgumentException(
                "JoinOperator requires exactly 2 inputs, but got " + inputCount);
        }
        
        BatchOperator<?> left = inputs[0];
        BatchOperator<?> right = inputs[1];
        
        // Get input tables
        Table leftTable = left.getOutput();
        Table rightTable = right.getOutput();
        
        // Fail fast on a missing predicate rather than handing null to the Table API.
        String joinCondition = getParams().get(JOIN_CONDITION);
        if (joinCondition == null || joinCondition.trim().isEmpty()) {
            throw new IllegalArgumentException("JoinOperator requires JOIN_CONDITION to be set");
        }
        
        // Perform join operation
        Table joinedTable = leftTable.join(rightTable, joinCondition);
        
        // Set output
        this.setOutput(joinedTable);
        
        return this;
    }
}

// Side output operator example
public class SplitOperator extends BatchOperator<SplitOperator> {
    
    /**
     * Splits the single input table by the SPLIT_CONDITION parameter.
     *
     * <p>Rows for which the condition is TRUE go to the main output. Every other row,
     * including rows where the condition evaluates to NULL (SQL UNKNOWN), goes to side
     * output 0, so the two outputs together contain every input row.
     *
     * @param inputs Upstream operators; validated by checkAndGetFirst, which returns the first
     * @return this operator, with main and side outputs set
     * @throws IllegalArgumentException if SPLIT_CONDITION is not set
     */
    @Override
    public SplitOperator linkFrom(BatchOperator<?>... inputs) {
        BatchOperator<?> input = checkAndGetFirst(inputs);
        Table inputTable = input.getOutput();
        
        // Split condition
        String condition = getParams().get(SPLIT_CONDITION);
        if (condition == null || condition.trim().isEmpty()) {
            throw new IllegalArgumentException("SplitOperator requires SPLIT_CONDITION to be set");
        }
        
        // Create main and side outputs. "!(cond)" is NULL wherever cond is NULL, which
        // would drop those rows from both outputs; isNotTrue is TRUE for both FALSE and
        // UNKNOWN, so the side output is the exact complement of the main output.
        Table mainOutput = inputTable.filter(condition);
        Table sideOutput = inputTable.filter("(" + condition + ").isNotTrue");
        
        // Set outputs
        this.setOutput(mainOutput);
        this.setSideOutputs(new Table[]{sideOutput});
        
        return this;
    }
}

// Parameter-driven operator example
public class SelectOperator extends BatchOperator<SelectOperator>
    implements HasSelectedCols<SelectOperator> {

    /**
     * Projects the input onto the columns named by the selectedCols parameter,
     * falling back to every input column when none are configured.
     * @param inputs Upstream operators; validated by checkAndGetFirst, which returns the first
     * @return this operator, with the projected table as its output
     */
    @Override
    public SelectOperator linkFrom(BatchOperator<?>... inputs) {
        Table source = checkAndGetFirst(inputs).getOutput();

        String[] requested = getSelectedCols();
        boolean noneRequested = requested == null || requested.length == 0;
        String[] columns = noneRequested ? source.getSchema().getFieldNames() : requested;

        setOutput(source.select(String.join(",", columns)));
        return this;
    }
}

// Usage
SelectOperator selector = new SelectOperator()
    .setSelectedCols(new String[]{"col1", "col2", "col3"});

// Table API join predicates reference unqualified field names, and the two joined
// tables must have distinct field names (anotherSource is assumed here to expose
// other_col1 and age).
JoinOperator joiner = new JoinOperator()
    .set(JOIN_CONDITION, "col1 = other_col1");

SplitOperator splitter = new SplitOperator()
    .set(SPLIT_CONDITION, "age > 18");

// Build pipeline. link(next) wires exactly one upstream operator (this) into `next`
// and returns `next`, so single-input steps use link(), while the two-input join is
// wired with linkFrom() directly. Never wrap linkFrom() inside link(): the operator
// would be linked twice, and a second single-input linkFrom on the join would throw.
Table result = joiner
    .linkFrom(source.link(selector), anotherSource)
    .link(splitter)
    .getOutput();

// Access side outputs
Table sideOutput = splitter.getSideOutputs()[0];