Machine learning examples demonstrating a K-Means clustering implementation for 2D data points. The examples feature iterative algorithm patterns, custom data types, and bulk iteration capabilities.
Iterative clustering algorithm that groups 2D data points into K clusters using centroid-based partitioning.
/**
* K-Means clustering algorithm implementation using bulk iterations.
* Usage: KMeans --points <path> --centroids <path> --output <path> --iterations <n>
*/
@SuppressWarnings("serial")
public class KMeans {
public static void main(String[] args) throws Exception;
/**
* Two-dimensional point with basic geometric operations
*/
public static class Point implements Serializable {
public double x, y;
public Point();
public Point(double x, double y);
/**
* Add another point's coordinates to this point
* @param other Point to add
* @return This point with updated coordinates
*/
public Point add(Point other);
/**
* Divide coordinates by a value
* @param val Divisor value
* @return This point with divided coordinates
*/
public Point div(long val);
/**
* Calculate Euclidean distance to another point
* @param other Target point
* @return Distance as double
*/
public double euclideanDistance(Point other);
/**
* Reset coordinates to zero
*/
public void clear();
@Override
public String toString();
}
/**
* Cluster center point with ID
*/
public static class Centroid extends Point {
public int id;
public Centroid();
public Centroid(int id, double x, double y);
public Centroid(int id, Point p);
@Override
public String toString();
}
}

Usage Examples:
// Run with custom data files
String[] args = {
"--points", "/path/to/points.txt",
"--centroids", "/path/to/centroids.txt",
"--output", "/path/to/output",
"--iterations", "20"
};
KMeans.main(args);
// Run with default data
String[] emptyArgs = {};
KMeans.main(emptyArgs);
// Use Point and Centroid classes directly
KMeans.Point p1 = new KMeans.Point(1.0, 2.0);
KMeans.Point p2 = new KMeans.Point(3.0, 4.0);
double distance = p1.euclideanDistance(p2);
KMeans.Centroid c1 = new KMeans.Centroid(1, 0.0, 0.0);
KMeans.Centroid c2 = new KMeans.Centroid(2, p1);

Specialized functions implementing the K-Means algorithm steps.
/**
* Determines the closest cluster center for a data point
*/
@ForwardedFields("*->1")
public static final class SelectNearestCenter
extends RichMapFunction<Point, Tuple2<Integer, Point>> {
/**
* Maps a point to its nearest centroid ID and the point itself
* @param p Input point
* @return Tuple of (centroid_id, point)
*/
public Tuple2<Integer, Point> map(Point p) throws Exception;
/**
* Reads centroid values from broadcast variable
* @param parameters Configuration parameters
*/
@Override
public void open(Configuration parameters) throws Exception;
}
/**
* Appends a count variable to the tuple for aggregation
*/
@ForwardedFields("f0;f1")
public static final class CountAppender
implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
/**
* Adds count of 1 to each point-centroid assignment
* @param t Input tuple (centroid_id, point)
* @return Tuple (centroid_id, point, count)
*/
public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t);
}
/**
* Sums and counts point coordinates for centroid calculation
*/
@ForwardedFields("0")
public static final class CentroidAccumulator
implements ReduceFunction<Tuple3<Integer, Point, Long>> {
/**
* Reduces point coordinates and counts for centroid averaging
* @param val1 First accumulation tuple
* @param val2 Second accumulation tuple
* @return Combined accumulation result
*/
public Tuple3<Integer, Point, Long> reduce(
Tuple3<Integer, Point, Long> val1,
Tuple3<Integer, Point, Long> val2);
}
/**
* Computes new centroid from coordinate sum and count of points
*/
@ForwardedFields("0->id")
public static final class CentroidAverager
implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
/**
* Calculates average position for new centroid
* @param value Tuple (centroid_id, accumulated_point, count)
* @return New centroid at average position
*/
public Centroid map(Tuple3<Integer, Point, Long> value);
}

Usage Examples:
// Use K-Means functions in custom algorithm
DataSet<KMeans.Point> points = getPointDataSet(params, env);
DataSet<KMeans.Centroid> centroids = getCentroidDataSet(params, env);
// Iterative computation
IterativeDataSet<KMeans.Centroid> loop = centroids.iterate(10);
DataSet<KMeans.Centroid> newCentroids = points
.map(new KMeans.SelectNearestCenter())
.withBroadcastSet(loop, "centroids")
.map(new KMeans.CountAppender())
.groupBy(0)
.reduce(new KMeans.CentroidAccumulator())
.map(new KMeans.CentroidAverager());
DataSet<KMeans.Centroid> finalCentroids = loop.closeWith(newCentroids);

Utilities for generating and providing K-Means test data.
/**
* Provides default data sets for K-Means examples
*/
public class KMeansData {
/**
* Default centroid data as object arrays
*/
public static final Object[][] CENTROIDS;
/**
* Default point data as object arrays
*/
public static final Object[][] POINTS;
/**
* Creates DataSet with default centroid data
* @param env Execution environment
* @return DataSet containing default centroids
*/
public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env);
/**
* Creates DataSet with default point data
* @param env Execution environment
* @return DataSet containing default points
*/
public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env);
}
/**
* Generates random K-Means data files for testing
*/
public class KMeansDataGenerator {
public static void main(String[] args) throws Exception;
public static final String CENTERS_FILE = "centers";
public static final String POINTS_FILE = "points";
public static final long DEFAULT_SEED = 4650285087650871364L;
public static final double DEFAULT_VALUE_RANGE = 100.0;
public static final double DEFAULT_DATA_FRACTION = 1.0;
public static final int DEFAULT_NUM_POINTS = 500;
public static final int DEFAULT_NUM_CENTERS = 20;
}

Usage Examples:
// Use default data in custom applications
import org.apache.flink.examples.java.clustering.util.KMeansData;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<KMeans.Point> points = KMeansData.getDefaultPointDataSet(env);
DataSet<KMeans.Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);
// Generate custom test data
String[] generatorArgs = {
"--numPoints", "1000",
"--numCenters", "10",
"--output", "/path/to/data",
"--range", "50.0"
};
KMeansDataGenerator.main(generatorArgs);

K-Means uses Flink's bulk iteration pattern for iterative convergence:
// Set up iterative computation
IterativeDataSet<Centroid> loop = centroids.iterate(maxIterations);
// Compute new centroids in each iteration
DataSet<Centroid> newCentroids = points
.map(new SelectNearestCenter()) // Assign points to nearest centroids
.withBroadcastSet(loop, "centroids") // Broadcast current centroids
.map(new CountAppender()) // Add count for averaging
.groupBy(0) // Group by centroid ID
.reduce(new CentroidAccumulator()) // Sum coordinates and counts
.map(new CentroidAverager()); // Calculate new centroid positions
// Close iteration loop
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

Input files must follow specific formats:
Points file format:
1.2 2.3
5.3 7.2
-1.0 3.4

Centroids file format:
1 6.2 3.2
2 2.9 5.7
3 -1.5 4.8

K-Means supports comprehensive parameter configuration:
ParameterTool params = ParameterTool.fromArgs(args);
// Data input parameters
String pointsPath = params.get("points"); // Points data file
String centroidsPath = params.get("centroids"); // Centroids data file
String outputPath = params.get("output"); // Output directory
// Algorithm parameters
int iterations = params.getInt("iterations", 10); // Number of iterations

// 2D point with coordinates
KMeans.Point point = new KMeans.Point(x, y);
// Cluster center with ID
KMeans.Centroid centroid = new KMeans.Centroid(id, x, y);
// Flink tuples for algorithm steps
Tuple2<Integer, Point> assignment; // Point assignment to centroid
Tuple3<Integer, Point, Long> accumulation; // Accumulated point data