
tessl/maven-org-apache-flink--flink-java

Apache Flink Java API - Core Java APIs for Apache Flink's batch and stream processing framework


Iteration Operations

Support for iterative algorithms including bulk iterations and delta iterations. These operations enable implementation of machine learning algorithms, graph processing, and other iterative computations that require multiple passes over the data.

Capabilities

Bulk Iterations

Simple bulk iterations where the entire dataset is processed in each iteration until a maximum number of iterations is reached or a termination criterion is met.

/**
 * Create a bulk iteration
 * @param maxIterations maximum number of iterations
 * @return IterativeDataSet for iteration configuration
 */
public IterativeDataSet<T> iterate(int maxIterations);

IterativeDataSet Operations

Operations available on iterative datasets for implementing bulk iteration logic.

/**
 * Close the iteration with the iteration result
 * @param iterationResult the result of each iteration step
 * @return DataSet with the final iteration result
 */
public DataSet<T> closeWith(DataSet<T> iterationResult);

/**
 * Close the iteration with result and termination criterion
 * @param iterationResult the result of each iteration step
 * @param terminationCriterion dataset checked after each superstep; the iteration
 *        stops as soon as it is empty (or when maxIterations is reached)
 * @return DataSet with the final iteration result
 */
public DataSet<T> closeWith(DataSet<T> iterationResult, DataSet<?> terminationCriterion);
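The following plain-Java model (not Flink code) makes the superstep semantics of closeWith concrete. It makes two assumptions: lists stand in for DataSets, and the termination criterion is a filter over the new partial solution. `BulkIterationModel`, `iterate` and `keepGoing` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical plain-Java model of IterativeDataSet.closeWith(...); not part of Flink.
public final class BulkIterationModel {

    // Applies `step` to every element of the partial solution once per superstep.
    // `keepGoing` models a filter-based termination criterion: the loop ends after the
    // first superstep in which no element passes it (the criterion DataSet is empty),
    // or after maxIterations supersteps, whichever comes first.
    public static <T> List<T> iterate(List<T> initial, int maxIterations,
                                      Function<T, T> step, Predicate<T> keepGoing) {
        List<T> partialSolution = new ArrayList<>(initial);
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            List<T> next = new ArrayList<>();
            for (T value : partialSolution) {
                next.add(step.apply(value));
            }
            partialSolution = next;
            if (partialSolution.stream().noneMatch(keepGoing)) {
                break; // termination criterion is empty
            }
        }
        return partialSolution;
    }

    public static void main(String[] args) {
        // No criterion: a predicate that always passes never empties it, so all 10 supersteps run
        System.out.println(iterate(List.of(1), 10, v -> v * 2, v -> true)); // [1024]
        // With a criterion: stop once every value is within 0.001 of 1.0
        System.out.println(iterate(List.of(1.0, 2.0, 3.0), 100, Math::sqrt,
                v -> Math.abs(v - 1.0) > 0.001));
    }
}
```

The predicate is evaluated on the new partial solution rather than the old one. This mirrors a criterion derived from the step's output.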

Usage Examples:

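// Simple bulk iteration - compute powers of 2
DataSet<Integer> initial = env.fromElements(1);

IterativeDataSet<Integer> loop = initial.iterate(10); // max 10 iterations

DataSet<Integer> doubled = loop
    .map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) {
            return value * 2;
        }
    });

// closeWith(...) is called on the IterativeDataSet, not on the map result;
// without a termination criterion all 10 supersteps run, so the result is 1024
DataSet<Integer> result = loop.closeWith(doubled);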

// Bulk iteration with termination criterion
DataSet<Double> initialValues = env.fromElements(1.0, 2.0, 3.0);

IterativeDataSet<Double> iteration = initialValues.iterate(100);

DataSet<Double> nextValues = iteration
    .map(new MapFunction<Double, Double>() {
        @Override
        public Double map(Double value) {
            return Math.sqrt(value);
        }
    });

// Termination criterion: keep values still more than 0.001 away from 1.0;
// the iteration stops once this DataSet is empty
DataSet<Double> terminationCriterion = nextValues
    .filter(new FilterFunction<Double>() {
        @Override
        public boolean filter(Double value) {
            return Math.abs(value - 1.0) > 0.001;
        }
    });

DataSet<Double> finalResult = iteration.closeWith(nextValues, terminationCriterion);
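Here is an estimate of how long the square-root example runs. Since 3^(1/2^k) ≈ 1 + ln 3 / 2^k, the largest input stays more than 0.001 away from 1.0 until 2^k > ln 3 / 0.001 ≈ 1099. The criterion should therefore become empty after k = 11 supersteps, well below the limit of 100. The hypothetical helper below (`SqrtSuperstepCount` is not Flink API) checks that count in plain Java by replaying the same map and filter steps on an array.

```java
// Hypothetical helper: counts the supersteps the square-root example runs before its
// termination criterion (values still farther than `tolerance` from 1.0) becomes empty.
public final class SqrtSuperstepCount {

    public static int superstepsUntilEmpty(double[] initial, double tolerance, int maxIterations) {
        double[] values = initial.clone();
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            boolean criterionEmpty = true;
            for (int i = 0; i < values.length; i++) {
                values[i] = Math.sqrt(values[i]);            // the map step
                if (Math.abs(values[i] - 1.0) > tolerance) { // the filter step
                    criterionEmpty = false;
                }
            }
            if (criterionEmpty) {
                return superstep;
            }
        }
        return maxIterations; // stopped by the iteration limit instead
    }

    public static void main(String[] args) {
        System.out.println(superstepsUntilEmpty(new double[] {1.0, 2.0, 3.0}, 0.001, 100)); // 11
    }
}
```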

Delta Iterations

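Delta iterations are more efficient when only a small part of the data changes in each iteration. They maintain a solution set (the complete state, indexed by key) and a workset (the elements to process in the next superstep). The iteration ends when the workset is empty or the maximum number of iterations is reached.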
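The solution-set/workset contract can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's runtime:

- a `HashMap` stands in for the keyed solution set;
- each delta element is inserted or replaces the entry with the same key;
- the whole delta is applied at the end of each superstep, which simplifies how Flink schedules the updates.

`DeltaIterationModel`, `Step` and `countdownDemo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of a delta iteration; not Flink API.
public final class DeltaIterationModel {

    // Output of one superstep: solution-set updates keyed like the solution set,
    // plus the workset for the next superstep.
    public static final class Step<K, V, W> {
        final Map<K, V> delta;
        final List<W> nextWorkset;

        public Step(Map<K, V> delta, List<W> nextWorkset) {
            this.delta = delta;
            this.nextWorkset = nextWorkset;
        }
    }

    public static <K, V, W> Map<K, V> run(Map<K, V> initialSolutionSet, List<W> initialWorkset,
            int maxIterations, BiFunction<Map<K, V>, List<W>, Step<K, V, W>> step) {
        Map<K, V> solutionSet = new HashMap<>(initialSolutionSet);
        List<W> workset = initialWorkset;
        // Stops when the workset is empty or the iteration limit is reached
        for (int superstep = 0; superstep < maxIterations && !workset.isEmpty(); superstep++) {
            Step<K, V, W> out = step.apply(solutionSet, workset);
            solutionSet.putAll(out.delta); // insert-or-replace by key
            workset = out.nextWorkset;
        }
        return solutionSet;
    }

    // Example: each workset element w writes w * 10 under key w, then schedules w - 1.
    public static Map<Integer, Integer> countdownDemo() {
        return run(Map.of(1, 0), List.of(3), 100, (solution, workset) -> {
            Map<Integer, Integer> delta = new HashMap<>();
            List<Integer> next = new ArrayList<>();
            for (int w : workset) {
                delta.put(w, w * 10);
                if (w > 1) {
                    next.add(w - 1);
                }
            }
            return new Step<>(delta, next);
        });
    }

    public static void main(String[] args) {
        System.out.println(countdownDemo()); // {1=10, 2=20, 3=30}
    }
}
```

In the demo, keys 3 and 2 are inserted and key 1 (initially 0) is replaced. After the third superstep the workset is empty, so the loop stops well before the limit.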

/**
 * Create a delta iteration
 * @param workset initial workset (elements to process)
 * @param maxIterations maximum number of iterations
 * @param keyFields key fields that identify solution-set elements; each element of the
 *        solution set delta replaces the solution-set element with the same key
 * @return DeltaIteration for delta iteration configuration
 */
public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyFields);

/**
 * Create a delta iteration with key selector
 * @param workset initial workset (elements to process)
 * @param maxIterations maximum number of iterations
 * @param keyExtractor function to extract keys for joining
 * @return DeltaIteration for delta iteration configuration
 */
public <K> DeltaIteration<T, ?> iterateDelta(DataSet<?> workset, int maxIterations, KeySelector<T, K> keyExtractor);

DeltaIteration Operations

Operations for configuring delta iterations with solution set updates and workset generation.

/**
 * Delta iteration class for solution set and workset management
 * @param <ST> solution set element type
 * @param <WT> workset element type
 */
public class DeltaIteration<ST, WT> {
    
    /**
     * Get the solution set placeholder for join operations
     * @return SolutionSetPlaceHolder representing the solution set
     */
    public SolutionSetPlaceHolder<ST> getSolutionSet();
    
    /**
     * Get the workset placeholder for transformations
     * @return WorksetPlaceHolder representing the workset
     */
    public WorksetPlaceHolder<WT> getWorkset();
    
    /**
     * Close the delta iteration
     * @param solutionSetDelta updates to the solution set
     * @param newWorkset new workset for next iteration
     * @return DataSet with final solution set
     */
    public DataSet<ST> closeWith(DataSet<ST> solutionSetDelta, DataSet<WT> newWorkset);
}

Usage Examples:

// Delta iteration example: Single Source Shortest Paths
DataSet<Tuple2<Long, Double>> vertices = env.fromElements(
    new Tuple2<>(1L, 0.0),    // source vertex with distance 0
    new Tuple2<>(2L, Double.POSITIVE_INFINITY),
    new Tuple2<>(3L, Double.POSITIVE_INFINITY)
);

DataSet<Tuple2<Long, Long>> edges = env.fromElements(
    new Tuple2<>(1L, 2L),     // edge from vertex 1 to vertex 2
    new Tuple2<>(2L, 3L)      // edge from vertex 2 to vertex 3
);

// Initial workset contains only the source vertex
DataSet<Tuple2<Long, Double>> initialWorkset = vertices
    .filter(new FilterFunction<Tuple2<Long, Double>>() {
        @Override
        public boolean filter(Tuple2<Long, Double> vertex) {
            return vertex.f1 == 0.0; // source vertex
        }
    });

// Create delta iteration
DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration = 
    vertices.iterateDelta(initialWorkset, 100, 0); // solution set keyed by vertex ID (field 0)

// Get current solution set and workset placeholders
SolutionSetPlaceHolder<Tuple2<Long, Double>> solutionSet = iteration.getSolutionSet();
WorksetPlaceHolder<Tuple2<Long, Double>> workset = iteration.getWorkset();

// Join workset with edges to find neighbors
DataSet<Tuple3<Long, Long, Double>> candidateUpdates = workset
    .join(edges)
    .where(0).equalTo(0) // join on source vertex ID
    .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long>, Tuple3<Long, Long, Double>>() {
        @Override
        public Tuple3<Long, Long, Double> join(
            Tuple2<Long, Double> vertex, 
            Tuple2<Long, Long> edge) {
            return new Tuple3<>(edge.f1, vertex.f0, vertex.f1 + 1.0); // neighbor, source, new distance
        }
    })
    .groupBy(0).minBy(2); // keep only the best candidate per target vertex

// Generate solution set updates (shorter paths found). Flink DataSets cannot carry
// null records, so a FlatJoinFunction emits nothing instead of returning null.
DataSet<Tuple2<Long, Double>> solutionSetUpdates = candidateUpdates
    .join(solutionSet)
    .where(0).equalTo(0) // join on target vertex ID
    .with(new FlatJoinFunction<Tuple3<Long, Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
        @Override
        public void join(
            Tuple3<Long, Long, Double> candidate,
            Tuple2<Long, Double> current,
            Collector<Tuple2<Long, Double>> out) {
            if (candidate.f2 < current.f1) {
                out.collect(new Tuple2<>(candidate.f0, candidate.f2)); // shorter path found
            }
        }
    });

// New workset contains vertices with updated distances
DataSet<Tuple2<Long, Double>> newWorkset = solutionSetUpdates;

// Close the iteration
DataSet<Tuple2<Long, Double>> shortestPaths = iteration.closeWith(solutionSetUpdates, newWorkset);
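The following plain-Java trace replays the intended shortest-paths logic on the same three vertices and two edges, with unit edge weights, to show what the iteration should produce. It is a hypothetical sketch (`ShortestPathsTrace` is not Flink API). Maps keyed by vertex ID stand in for the solution set and the workset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java trace of the shortest-paths delta iteration (unit edge weights).
public final class ShortestPathsTrace {

    public static Map<Long, Double> run(Map<Long, Double> vertices, long[][] edges,
                                        long source, int maxIterations) {
        Map<Long, Double> solutionSet = new HashMap<>(vertices);
        // Initial workset: only the source vertex with its current distance
        Map<Long, Double> workset = new HashMap<>();
        workset.put(source, solutionSet.get(source));
        for (int superstep = 0; superstep < maxIterations && !workset.isEmpty(); superstep++) {
            // workset JOIN edges on the source ID, keeping the best candidate per target
            Map<Long, Double> best = new HashMap<>();
            for (Map.Entry<Long, Double> w : workset.entrySet()) {
                for (long[] edge : edges) {
                    if (edge[0] == w.getKey()) {
                        best.merge(edge[1], w.getValue() + 1.0, Double::min);
                    }
                }
            }
            // candidates JOIN solution set on the target ID: keep only strictly shorter paths
            Map<Long, Double> nextWorkset = new HashMap<>();
            for (Map.Entry<Long, Double> c : best.entrySet()) {
                Double current = solutionSet.get(c.getKey());
                if (current != null && c.getValue() < current) {
                    solutionSet.put(c.getKey(), c.getValue()); // solution-set delta
                    nextWorkset.put(c.getKey(), c.getValue()); // updated vertex re-enters the workset
                }
            }
            workset = nextWorkset;
        }
        return solutionSet;
    }

    public static Map<Long, Double> demo() {
        Map<Long, Double> vertices = new HashMap<>();
        vertices.put(1L, 0.0);
        vertices.put(2L, Double.POSITIVE_INFINITY);
        vertices.put(3L, Double.POSITIVE_INFINITY);
        long[][] edges = {{1L, 2L}, {2L, 3L}};
        return run(vertices, edges, 1L, 100);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {1=0.0, 2=1.0, 3=2.0}
    }
}
```

Vertex 2 is settled in the first superstep and vertex 3 in the second. The third superstep finds no outgoing edges from vertex 3, so the workset empties and the iteration ends.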

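DeltaIterationResultSet

The DataSet returned by DeltaIteration.closeWith(...) is a DeltaIterationResultSet. It extends DataSet<ST>, so it is itself the final solution set. Its accessors describe how the iteration was defined; they do not report runtime statistics. The class is mostly used internally when the program is translated.

/**
 * Result of a delta iteration; the DataSet itself is the final solution set
 * @param <ST> solution set element type
 * @param <WT> workset element type
 */
public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {
    
    /**
     * Get the iteration this result closes
     * @return the DeltaIteration head
     */
    public DeltaIteration<ST, WT> getIterationHead();
    
    /**
     * Get the workset produced for the next superstep
     * @return DataSet passed as newWorkset to closeWith
     */
    public DataSet<WT> getNextWorkset();
    
    /**
     * Get the iteration limit
     * @return maximum number of iterations
     */
    public int getMaxIterations();
}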

Iteration Configuration

Additional configuration options for iterations.

/**
 * Set custom name for the iteration
 * @param name name for the iteration
 * @return configured iteration
 */
public IterativeDataSet<T> name(String name);

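/**
 * Set parallelism for the iteration
 * (IterativeDataSet inherits setParallelism from Operator; DeltaIteration provides
 * name(String) and parallelism(int), both returning DeltaIteration<ST, WT>)
 * @param parallelism parallelism level
 * @return configured iteration
 */
public IterativeDataSet<T> setParallelism(int parallelism);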

Termination Criteria

Aggregators collect global values during each superstep. A convergence criterion registered together with an aggregator ends the iteration once it reports convergence. Both methods are available on DeltaIteration (shown below) and on IterativeDataSet, where they return IterativeDataSet<T>.

/**
 * Register aggregator for iteration
 * @param name name of the aggregator
 * @param aggregator the aggregator function
 * @return configured iteration
 */
public DeltaIteration<ST, WT> registerAggregator(String name, Aggregator<?> aggregator);

/**
 * Register convergence criterion with aggregator
 * @param name name of the convergence aggregator
 * @param aggregator the aggregator function
 * @param convergenceCheck convergence criterion implementation
 * @return configured iteration
 */
public <X extends Value> DeltaIteration<ST, WT> registerAggregationConvergenceCriterion(
    String name, 
    Aggregator<X> aggregator,
    ConvergenceCriterion<X> convergenceCheck);
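The interaction between an aggregator and a convergence criterion can be modeled in plain Java. This is a hypothetical sketch: `ConvergenceModel` and its `Criterion` interface only mirror the shape of a convergence check and are not Flink types. The aggregator sums |new - old| over one superstep and is reset before the next. The criterion sees each superstep's total.

```java
// Hypothetical plain-Java model of an aggregator plus convergence criterion; not Flink API.
public final class ConvergenceModel {

    // Mirrors the shape of a convergence check: given the superstep number and the value
    // aggregated during that superstep, decide whether to stop.
    public interface Criterion {
        boolean isConverged(int iteration, double aggregate);
    }

    // Runs value -> value * factor on every element; the "aggregator" sums |new - old|
    // and is reset at the start of each superstep. Returns the superstep at which the
    // criterion first reported convergence, or maxIterations if it never did.
    public static int run(double[] initial, double factor, int maxIterations, Criterion criterion) {
        double[] values = initial.clone();
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            double changeSum = 0.0; // aggregator reset per superstep
            for (int i = 0; i < values.length; i++) {
                double next = values[i] * factor;
                changeSum += Math.abs(next - values[i]);
                values[i] = next;
            }
            if (criterion.isConverged(superstep, changeSum)) {
                return superstep;
            }
        }
        return maxIterations;
    }

    public static void main(String[] args) {
        System.out.println(run(new double[] {1.0, 2.0}, 0.1, 50, (iteration, sum) -> sum < 0.001)); // 5
    }
}
```

With a factor of 0.1, the per-superstep totals for inputs 1.0 and 2.0 are 2.7, 0.27, 0.027, 0.0027 and 0.00027. A threshold of 0.001 therefore stops the loop at superstep 5.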

Advanced Usage Examples:

// Iteration with broadcast variable and aggregator
// (initialData is assumed to be an existing DataSet<Double>)
DataSet<Double> parameters = env.fromElements(0.1, 0.2, 0.3);

IterativeDataSet<Double> loop = initialData
    .iterate(50)
    .name("Gradient Descent")
    .registerAggregator("convergence", new DoubleSumAggregator());

DataSet<Double> step = loop
    .map(new RichMapFunction<Double, Double>() {
        private List<Double> params;
        private DoubleSumAggregator convergenceAgg;
        
        @Override
        public void open(Configuration config) {
            params = getRuntimeContext().getBroadcastVariable("parameters");
            convergenceAgg = getIterationRuntimeContext().getIterationAggregator("convergence");
        }
        
        @Override
        public Double map(Double value) {
            double newValue = value * params.get(0); // use broadcast parameter
            convergenceAgg.aggregate(Math.abs(newValue - value)); // track convergence
            return newValue;
        }
    })
    .withBroadcastSet(parameters, "parameters"); // broadcast sets attach to the map operator, not the iteration

DataSet<Double> result = loop.closeWith(step);

Types

import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
import org.apache.flink.api.java.operators.DeltaIteration.WorksetPlaceHolder;
import org.apache.flink.api.java.operators.DeltaIterationResultSet;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-java
