or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

clustering.md distributed-copy.md graph-processing.md index.md misc-examples.md relational-processing.md word-count.md
tile.json

docs/clustering.md

Clustering Algorithms

Machine learning examples demonstrating a K-Means clustering implementation for 2D data points. The examples showcase iterative algorithm patterns, custom data types, and Flink's bulk iterations.

Capabilities

K-Means Clustering

Iterative clustering algorithm that groups 2D data points into K clusters using centroid-based partitioning.

/**
 * K-Means clustering algorithm implementation using bulk iterations.
 * Usage: KMeans --points <path> --centroids <path> --output <path> --iterations <n>
 */
@SuppressWarnings("serial")
public class KMeans {
    public static void main(String[] args) throws Exception;
    
    /**
     * Two-dimensional point with basic geometric operations
     */
    public static class Point implements Serializable {
        public double x, y;
        
        public Point();
        public Point(double x, double y);
        
        /**
         * Add another point's coordinates to this point
         * @param other Point to add
         * @return This point with updated coordinates
         */
        public Point add(Point other);
        
        /**
         * Divide coordinates by a value
         * @param val Divisor value
         * @return This point with divided coordinates
         */
        public Point div(long val);
        
        /**
         * Calculate Euclidean distance to another point
         * @param other Target point
         * @return Distance as double
         */
        public double euclideanDistance(Point other);
        
        /**
         * Reset coordinates to zero
         */
        public void clear();
        
        @Override
        public String toString();
    }
    
    /**
     * Cluster center point with ID
     */
    public static class Centroid extends Point {
        public int id;
        
        public Centroid();
        public Centroid(int id, double x, double y);
        public Centroid(int id, Point p);
        
        @Override
        public String toString();
    }
}

Usage Examples:

// Run with custom data files
String[] args = {
    "--points", "/path/to/points.txt",
    "--centroids", "/path/to/centroids.txt", 
    "--output", "/path/to/output",
    "--iterations", "20"
};
KMeans.main(args);

// Run with default data
String[] emptyArgs = {};
KMeans.main(emptyArgs);

// Use Point and Centroid classes directly
KMeans.Point p1 = new KMeans.Point(1.0, 2.0);
KMeans.Point p2 = new KMeans.Point(3.0, 4.0);
double distance = p1.euclideanDistance(p2);

KMeans.Centroid c1 = new KMeans.Centroid(1, 0.0, 0.0);
KMeans.Centroid c2 = new KMeans.Centroid(2, p1);

K-Means User Functions

Specialized functions implementing the K-Means algorithm steps.

/**
 * Determines the closest cluster center for a data point
 */
@ForwardedFields("*->1")
public static final class SelectNearestCenter 
        extends RichMapFunction<Point, Tuple2<Integer, Point>> {
    /**
     * Maps a point to its nearest centroid ID and the point itself
     * @param p Input point
     * @return Tuple of (centroid_id, point)
     */
    public Tuple2<Integer, Point> map(Point p) throws Exception;
    
    /**
     * Reads centroid values from broadcast variable
     * @param parameters Configuration parameters
     */
    @Override
    public void open(Configuration parameters) throws Exception;
}

/**
 * Appends a count variable to the tuple for aggregation
 */
@ForwardedFields("f0;f1")
public static final class CountAppender 
        implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
    /**
     * Adds count of 1 to each point-centroid assignment
     * @param t Input tuple (centroid_id, point)
     * @return Tuple (centroid_id, point, count)
     */
    public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t);
}

/**
 * Sums and counts point coordinates for centroid calculation
 */
@ForwardedFields("0")
public static final class CentroidAccumulator 
        implements ReduceFunction<Tuple3<Integer, Point, Long>> {
    /**
     * Reduces point coordinates and counts for centroid averaging
     * @param val1 First accumulation tuple
     * @param val2 Second accumulation tuple  
     * @return Combined accumulation result
     */
    public Tuple3<Integer, Point, Long> reduce(
            Tuple3<Integer, Point, Long> val1, 
            Tuple3<Integer, Point, Long> val2);
}

/**
 * Computes new centroid from coordinate sum and count of points
 */
@ForwardedFields("0->id")
public static final class CentroidAverager 
        implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
    /**
     * Calculates average position for new centroid
     * @param value Tuple (centroid_id, accumulated_point, count)
     * @return New centroid at average position
     */
    public Centroid map(Tuple3<Integer, Point, Long> value);
}

Usage Examples:

// Use K-Means functions in custom algorithm
DataSet<KMeans.Point> points = getPointDataSet(params, env);
DataSet<KMeans.Centroid> centroids = getCentroidDataSet(params, env);

// Iterative computation
IterativeDataSet<KMeans.Centroid> loop = centroids.iterate(10);

DataSet<KMeans.Centroid> newCentroids = points
    .map(new KMeans.SelectNearestCenter())
    .withBroadcastSet(loop, "centroids")
    .map(new KMeans.CountAppender())
    .groupBy(0)
    .reduce(new KMeans.CentroidAccumulator())
    .map(new KMeans.CentroidAverager());

DataSet<KMeans.Centroid> finalCentroids = loop.closeWith(newCentroids);

Data Provider and Generator

Utilities for generating and providing K-Means test data.

/**
 * Provides default data sets for K-Means examples
 */
public class KMeansData {
    /**
     * Default centroid data as object arrays
     */
    public static final Object[][] CENTROIDS;
    
    /**
     * Default point data as object arrays  
     */
    public static final Object[][] POINTS;
    
    /**
     * Creates DataSet with default centroid data
     * @param env Execution environment
     * @return DataSet containing default centroids
     */
    public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env);
    
    /**
     * Creates DataSet with default point data
     * @param env Execution environment
     * @return DataSet containing default points
     */
    public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env);
}

/**
 * Generates random K-Means data files for testing
 */
public class KMeansDataGenerator {
    public static void main(String[] args) throws Exception;
    
    public static final String CENTERS_FILE = "centers";
    public static final String POINTS_FILE = "points";
    public static final long DEFAULT_SEED = 4650285087650871364L;
    public static final double DEFAULT_VALUE_RANGE = 100.0;
    public static final double DEFAULT_DATA_FRACTION = 1.0;
    public static final int DEFAULT_NUM_POINTS = 500;
    public static final int DEFAULT_NUM_CENTERS = 20;
}

Usage Examples:

// Use default data in custom applications
import org.apache.flink.examples.java.clustering.util.KMeansData;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<KMeans.Point> points = KMeansData.getDefaultPointDataSet(env);
DataSet<KMeans.Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);

// Generate custom test data
String[] generatorArgs = {
    "--numPoints", "1000",
    "--numCenters", "10", 
    "--output", "/path/to/data",
    "--range", "50.0"
};
KMeansDataGenerator.main(generatorArgs);

Algorithm Pattern

Bulk Iteration Structure

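K-Means uses Flink's bulk iteration pattern. In each pass the centroids are recomputed from the point assignments and fed back into the loop, for at most maxIterations passes: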

// Set up iterative computation
IterativeDataSet<Centroid> loop = centroids.iterate(maxIterations);

// Compute new centroids in each iteration
DataSet<Centroid> newCentroids = points
    .map(new SelectNearestCenter())           // Assign points to nearest centroids
    .withBroadcastSet(loop, "centroids")      // Broadcast current centroids
    .map(new CountAppender())                 // Add count for averaging  
    .groupBy(0)                               // Group by centroid ID
    .reduce(new CentroidAccumulator())        // Sum coordinates and counts
    .map(new CentroidAverager());             // Calculate new centroid positions

// Close iteration loop
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

Data Format Requirements

Input files are plain text with one record per line and space-separated fields:

Points file format (one point per line: x y):

1.2 2.3
5.3 7.2
-1.0 3.4

Centroids file format (one centroid per line: integer id, then x y):

1 6.2 3.2
2 2.9 5.7
3 -1.5 4.8

Parameter Handling

K-Means reads its settings from the command-line arguments via ParameterTool; the number of iterations defaults to 10 when not specified:

ParameterTool params = ParameterTool.fromArgs(args);

// Data input parameters
String pointsPath = params.get("points");        // Points data file
String centroidsPath = params.get("centroids");  // Centroids data file
String outputPath = params.get("output");        // Output directory

// Algorithm parameters  
int iterations = params.getInt("iterations", 10); // Number of iterations

Types

Core Geometric Types

// 2D point with coordinates
KMeans.Point point = new KMeans.Point(x, y);

// Cluster center with ID
KMeans.Centroid centroid = new KMeans.Centroid(id, x, y);

// Flink tuples for algorithm steps
Tuple2<Integer, Point> assignment;           // Point assignment to centroid
Tuple3<Integer, Point, Long> accumulation;   // Accumulated point data