# Clustering Algorithms

Machine learning examples that implement K-Means clustering for 2D data points. They illustrate the iterative-algorithm pattern, custom data types (`Point`, `Centroid`), and Flink's bulk iterations.

## Capabilities

### K-Means Clustering

Iterative clustering algorithm that partitions 2D data points into K clusters. Each point is assigned to its nearest centroid, and each centroid is then moved to the mean of the points assigned to it.

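
Each iteration performs the standard assignment and update steps of Lloyd's K-Means algorithm. The notation below is introduced here for illustration: $p_i$ are the data points and $\mu_j^{(t)}$ is the position of centroid $j$ after $t$ iterations.

```latex
c_i^{(t)} = \operatorname*{arg\,min}_{j} \bigl\lVert p_i - \mu_j^{(t)} \bigr\rVert_2,
\qquad
S_j^{(t)} = \{\, i \mid c_i^{(t)} = j \,\},
\qquad
\mu_j^{(t+1)} = \frac{1}{\lvert S_j^{(t)} \rvert} \sum_{i \in S_j^{(t)}} p_i
```

The assignment $c_i^{(t)}$ corresponds to `SelectNearestCenter`, the sum and the count $\lvert S_j^{(t)} \rvert$ to `CountAppender` and `CentroidAccumulator`, and the division to `CentroidAverager`. A centroid whose set $S_j^{(t)}$ is empty produces no group in the dataflow, so it does not appear in the next iteration.
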
```java { .api }
/**
 * K-Means clustering algorithm implementation using bulk iterations.
 * Usage: KMeans --points <path> --centroids <path> --output <path> --iterations <n>
 */
@SuppressWarnings("serial")
public class KMeans {
    public static void main(String[] args) throws Exception;

    /**
     * Two-dimensional point with basic geometric operations
     */
    public static class Point implements Serializable {
        public double x, y;

        public Point();
        public Point(double x, double y);

        /**
         * Adds another point's coordinates to this point in place
         * @param other Point to add
         * @return This point with updated coordinates
         */
        public Point add(Point other);

        /**
         * Divides both coordinates by a value in place
         * @param val Divisor value
         * @return This point with divided coordinates
         */
        public Point div(long val);

        /**
         * Calculates the Euclidean distance to another point
         * @param other Target point
         * @return Distance as double
         */
        public double euclideanDistance(Point other);

        /**
         * Resets both coordinates to zero
         */
        public void clear();

        @Override
        public String toString();
    }

    /**
     * Cluster center point with an integer ID
     */
    public static class Centroid extends Point {
        public int id;

        public Centroid();
        public Centroid(int id, double x, double y);
        public Centroid(int id, Point p);

        @Override
        public String toString();
    }
}
```

**Usage Examples:**

```java
// Run with custom data files
String[] args = {
    "--points", "/path/to/points.txt",
    "--centroids", "/path/to/centroids.txt",
    "--output", "/path/to/output",
    "--iterations", "20"
};
KMeans.main(args);

// Run with the built-in default data
String[] emptyArgs = {};
KMeans.main(emptyArgs);

// Use the Point and Centroid classes directly
KMeans.Point p1 = new KMeans.Point(1.0, 2.0);
KMeans.Point p2 = new KMeans.Point(3.0, 4.0);
double distance = p1.euclideanDistance(p2); // sqrt(8), about 2.83

KMeans.Centroid c1 = new KMeans.Centroid(1, 0.0, 0.0);
KMeans.Centroid c2 = new KMeans.Centroid(2, p1); // centroid 2 at p1's coordinates
```

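
`add` and `div` return *this* point, so they modify it in place rather than producing a new one, which makes them cheap to chain when summing and averaging coordinates. The class below is a hypothetical stand-in named `PointSketch` (not the Flink class, and independent of Flink) that mirrors the documented `Point` methods so the in-place semantics can be tried directly.

```java
/** Hypothetical stand-in for KMeans.Point, mirroring its documented methods. */
class PointSketch {
    double x, y;

    PointSketch(double x, double y) {
        this.x = x;
        this.y = y;
    }

    /** Adds other's coordinates to this point in place and returns this. */
    PointSketch add(PointSketch other) {
        x += other.x;
        y += other.y;
        return this;
    }

    /** Divides both coordinates by val in place and returns this. */
    PointSketch div(long val) {
        x /= val;
        y /= val;
        return this;
    }

    /** Straight-line (L2) distance between the two points. */
    double euclideanDistance(PointSketch other) {
        double dx = x - other.x;
        double dy = y - other.y;
        return Math.sqrt(dx * dx + dy * dy);
    }

    /** Resets both coordinates to zero. */
    void clear() {
        x = 0.0;
        y = 0.0;
    }

    /** Space-separated coordinates, like a line of the points file. */
    @Override
    public String toString() {
        return x + " " + y;
    }

    public static void main(String[] args) {
        PointSketch p1 = new PointSketch(1.0, 2.0);
        PointSketch p2 = new PointSketch(3.0, 4.0);
        System.out.println(p1.euclideanDistance(p2)); // sqrt(8)
        // Sum the two points and halve the sum: p1 itself is mutated.
        PointSketch mean = p1.add(p2).div(2);
        System.out.println(mean == p1); // true: the same object
        System.out.println(mean);       // 2.0 3.0
    }
}
```

Because `p1` itself is changed by `add`, copy a point first if its original coordinates are still needed.
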
### K-Means User Functions

Flink user functions that implement the individual steps of each K-Means iteration: nearest-centroid assignment, count tagging, per-centroid accumulation, and averaging.

```java { .api }
/**
 * Determines the closest cluster center for a data point
 */
@ForwardedFields("*->1")
public static final class SelectNearestCenter
        extends RichMapFunction<Point, Tuple2<Integer, Point>> {
    /**
     * Maps a point to its nearest centroid ID and the point itself
     * @param p Input point
     * @return Tuple of (centroid_id, point)
     */
    public Tuple2<Integer, Point> map(Point p) throws Exception;

    /**
     * Reads the current centroids from the broadcast variable "centroids"
     * @param parameters Configuration parameters
     */
    @Override
    public void open(Configuration parameters) throws Exception;
}

/**
 * Appends a count of 1 to each tuple so points can be counted during aggregation
 */
@ForwardedFields("f0;f1")
public static final class CountAppender
        implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
    /**
     * Adds a count of 1 to each point-centroid assignment
     * @param t Input tuple (centroid_id, point)
     * @return Tuple (centroid_id, point, count)
     */
    public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t);
}

/**
 * Sums point coordinates and counts per centroid
 */
@ForwardedFields("0")
public static final class CentroidAccumulator
        implements ReduceFunction<Tuple3<Integer, Point, Long>> {
    /**
     * Combines two partial results: adds the coordinates and the counts
     * @param val1 First accumulation tuple
     * @param val2 Second accumulation tuple
     * @return Combined accumulation result
     */
    public Tuple3<Integer, Point, Long> reduce(
            Tuple3<Integer, Point, Long> val1,
            Tuple3<Integer, Point, Long> val2);
}

/**
 * Computes the new centroid from the coordinate sum and the point count
 */
@ForwardedFields("0->id")
public static final class CentroidAverager
        implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
    /**
     * Calculates the average position for the new centroid
     * @param value Tuple (centroid_id, accumulated_point, count)
     * @return New centroid at the average position
     */
    public Centroid map(Tuple3<Integer, Point, Long> value);
}
```

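
To make the data flow through these four functions concrete, the following hypothetical single-machine sketch runs one iteration over in-memory data. The class `KMeansStepSketch` and its methods are illustrative and do not use Flink; points are `{x, y}` arrays and centroids are an id-to-coordinates map. Comments note which Flink function each stage imitates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical local sketch of one K-Means iteration; not Flink code. */
final class KMeansStepSketch {

    /** Returns the id of the centroid closest to (x, y), like SelectNearestCenter. */
    static int nearestCenter(double x, double y, Map<Integer, double[]> centroids) {
        double minDistance = Double.MAX_VALUE;
        int closestId = -1;
        for (Map.Entry<Integer, double[]> c : centroids.entrySet()) {
            double dx = x - c.getValue()[0];
            double dy = y - c.getValue()[1];
            double distance = Math.sqrt(dx * dx + dy * dy);
            if (distance < minDistance) {
                minDistance = distance;
                closestId = c.getKey();
            }
        }
        return closestId;
    }

    /** Computes the next centroid map from the points and the current centroids. */
    static Map<Integer, double[]> step(List<double[]> points, Map<Integer, double[]> centroids) {
        // Per centroid id: {sumX, sumY, count}, i.e. CountAppender + groupBy + CentroidAccumulator.
        Map<Integer, double[]> acc = new LinkedHashMap<>();
        for (double[] p : points) {
            int id = nearestCenter(p[0], p[1], centroids);
            double[] a = acc.computeIfAbsent(id, k -> new double[3]);
            a[0] += p[0];
            a[1] += p[1];
            a[2] += 1; // the appended count of 1
        }
        // CentroidAverager: divide the coordinate sums by the count.
        // Centroids that received no points get no entry, as with groupBy/reduce.
        Map<Integer, double[]> next = new LinkedHashMap<>();
        for (Map.Entry<Integer, double[]> e : acc.entrySet()) {
            double[] a = e.getValue();
            next.put(e.getKey(), new double[] {a[0] / a[2], a[1] / a[2]});
        }
        return next;
    }

    public static void main(String[] args) {
        List<double[]> points = new ArrayList<>();
        points.add(new double[] {1.0, 1.0});
        points.add(new double[] {1.0, 3.0});
        points.add(new double[] {9.0, 9.0});
        Map<Integer, double[]> centroids = new LinkedHashMap<>();
        centroids.put(1, new double[] {0.0, 0.0});
        centroids.put(2, new double[] {10.0, 10.0});
        Map<Integer, double[]> next = step(points, centroids);
        next.forEach((id, c) -> System.out.println(id + " " + c[0] + " " + c[1]));
        // 1 1.0 2.0
        // 2 9.0 9.0
    }
}
```

In the sample run, points (1, 1) and (1, 3) move centroid 1 to their mean (1.0, 2.0), and (9, 9) alone becomes centroid 2.
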
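**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;

// Use the K-Means functions in a custom program (here with the built-in default data)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<KMeans.Point> points = KMeansData.getDefaultPointDataSet(env);
DataSet<KMeans.Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);

// Bulk iteration with a fixed number of rounds
IterativeDataSet<KMeans.Centroid> loop = centroids.iterate(10);

DataSet<KMeans.Centroid> newCentroids = points
    .map(new KMeans.SelectNearestCenter())
    .withBroadcastSet(loop, "centroids")
    .map(new KMeans.CountAppender())
    .groupBy(0)
    .reduce(new KMeans.CentroidAccumulator())
    .map(new KMeans.CentroidAverager());

DataSet<KMeans.Centroid> finalCentroids = loop.closeWith(newCentroids);
finalCentroids.print(); // print() triggers execution of the job
```
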
### Data Provider and Generator

Utilities that supply built-in default data sets and generate random K-Means input files for testing.

```java { .api }
/**
 * Provides default data sets for K-Means examples
 */
public class KMeansData {
    /**
     * Default centroid data as object arrays
     */
    public static final Object[][] CENTROIDS;

    /**
     * Default point data as object arrays
     */
    public static final Object[][] POINTS;

    /**
     * Creates DataSet with default centroid data
     * @param env Execution environment
     * @return DataSet containing default centroids
     */
    public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env);

    /**
     * Creates DataSet with default point data
     * @param env Execution environment
     * @return DataSet containing default points
     */
    public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env);
}

/**
 * Generates random K-Means data files for testing
 */
public class KMeansDataGenerator {
    public static void main(String[] args) throws Exception;

    public static final String CENTERS_FILE = "centers";
    public static final String POINTS_FILE = "points";
    public static final long DEFAULT_SEED = 4650285087650871364L;
    public static final double DEFAULT_VALUE_RANGE = 100.0;
    public static final double DEFAULT_DATA_FRACTION = 1.0;
    public static final int DEFAULT_NUM_POINTS = 500;
    public static final int DEFAULT_NUM_CENTERS = 20;
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.util.KMeansData;
import org.apache.flink.examples.java.clustering.util.KMeansDataGenerator;

// Use the default data in custom applications
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<KMeans.Point> points = KMeansData.getDefaultPointDataSet(env);
DataSet<KMeans.Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);

// Generate custom test data
String[] generatorArgs = {
    "--numPoints", "1000",
    "--numCenters", "10",
    "--output", "/path/to/data",
    "--range", "50.0"
};
KMeansDataGenerator.main(generatorArgs);
```

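
Test files in the documented formats can also be produced without Flink. The sketch below is hypothetical: the class `KMeansDataSketch`, the uniform placement of centers in `[0, range)`, and the Gaussian scatter of points around randomly chosen centers are assumptions for illustration, not a description of how `KMeansDataGenerator` works. It returns the file lines as strings; writing them to the `centers` and `points` files is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Random;

/** Hypothetical test-data generator for the points and centroids text formats. */
final class KMeansDataSketch {

    /** Draws k centers uniformly from [0, range) in both coordinates (assumed scheme). */
    static double[][] centers(int k, double range, Random rnd) {
        double[][] result = new double[k][];
        for (int j = 0; j < k; j++) {
            result[j] = new double[] {rnd.nextDouble() * range, rnd.nextDouble() * range};
        }
        return result;
    }

    /** Scatters n points with Gaussian noise (standard deviation stddev) around random centers. */
    static double[][] points(int n, double[][] centers, double stddev, Random rnd) {
        double[][] result = new double[n][];
        for (int i = 0; i < n; i++) {
            double[] c = centers[rnd.nextInt(centers.length)];
            result[i] = new double[] {
                c[0] + rnd.nextGaussian() * stddev, c[1] + rnd.nextGaussian() * stddev};
        }
        return result;
    }

    /** Formats centers as "id x y" lines, numbering ids from 1. */
    static List<String> centerLines(double[][] centers) {
        List<String> lines = new ArrayList<>();
        for (int j = 0; j < centers.length; j++) {
            lines.add(String.format(Locale.US, "%d %.2f %.2f", j + 1, centers[j][0], centers[j][1]));
        }
        return lines;
    }

    /** Formats points as "x y" lines. */
    static List<String> pointLines(double[][] points) {
        List<String> lines = new ArrayList<>();
        for (double[] p : points) {
            lines.add(String.format(Locale.US, "%.2f %.2f", p[0], p[1]));
        }
        return lines;
    }

    public static void main(String[] args) {
        Random rnd = new Random(4650285087650871364L); // the documented DEFAULT_SEED value
        double[][] centerCoords = centers(3, 100.0, rnd);
        double[][] pointCoords = points(5, centerCoords, 8.0, rnd);
        centerLines(centerCoords).forEach(System.out::println);
        pointLines(pointCoords).forEach(System.out::println);
    }
}
```

Seeding `java.util.Random` makes the output reproducible from run to run.
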
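## Algorithm Pattern

### Bulk Iteration Structure

K-Means uses Flink's bulk iteration pattern: `iterate(n)` opens a loop over the centroid data set, the loop body computes a new set of centroids from the current one, and `closeWith(...)` feeds that result back as the input of the next round until `n` rounds have run.
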
```java
// Set up the iterative computation
IterativeDataSet<Centroid> loop = centroids.iterate(maxIterations);

// Compute new centroids in each iteration
DataSet<Centroid> newCentroids = points
    .map(new SelectNearestCenter())        // Assign points to nearest centroids
    .withBroadcastSet(loop, "centroids")   // Broadcast current centroids
    .map(new CountAppender())              // Add count for averaging
    .groupBy(0)                            // Group by centroid ID
    .reduce(new CentroidAccumulator())     // Sum coordinates and counts
    .map(new CentroidAverager());          // Calculate new centroid positions

// Close the iteration loop: newCentroids become the next round's input
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
```

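
Stripped of distribution, `iterate(maxIterations)` followed by `closeWith(newCentroids)` amounts to a loop that feeds each round's output back in as the next round's input. The hypothetical sketch below (`BulkIterationSketch` is illustrative and does not use Flink) shows that feedback with a generic `iterate` helper and a compact 2D K-Means step that, like the dataflow, drops centroids that attract no points.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical local analogue of a bulk iteration: a fixed number of feedback rounds. */
final class BulkIterationSketch {

    /** Applies step to the state maxIterations times, feeding each result back in. */
    static <T> T iterate(T initial, int maxIterations, UnaryOperator<T> step) {
        T state = initial;             // plays the role of the IterativeDataSet "loop"
        for (int i = 0; i < maxIterations; i++) {
            state = step.apply(state); // newCentroids, fed back as with closeWith
        }
        return state;                  // finalCentroids
    }

    /** Compact 2D K-Means round: assign, sum and count, then average. */
    static Map<Integer, double[]> kMeansStep(List<double[]> points, Map<Integer, double[]> centroids) {
        Map<Integer, double[]> acc = new LinkedHashMap<>(); // id -> {sumX, sumY, count}
        for (double[] p : points) {
            int bestId = -1;
            double best = Double.MAX_VALUE;
            for (Map.Entry<Integer, double[]> c : centroids.entrySet()) {
                double d = Math.hypot(p[0] - c.getValue()[0], p[1] - c.getValue()[1]);
                if (d < best) {
                    best = d;
                    bestId = c.getKey();
                }
            }
            double[] a = acc.computeIfAbsent(bestId, k -> new double[3]);
            a[0] += p[0];
            a[1] += p[1];
            a[2] += 1;
        }
        // Only centroids that received points appear in the result.
        Map<Integer, double[]> next = new LinkedHashMap<>();
        acc.forEach((id, a) -> next.put(id, new double[] {a[0] / a[2], a[1] / a[2]}));
        return next;
    }

    public static void main(String[] args) {
        List<double[]> points = Arrays.asList(
                new double[] {1, 1}, new double[] {2, 1}, new double[] {8, 9}, new double[] {9, 9});
        Map<Integer, double[]> start = new LinkedHashMap<>();
        start.put(1, new double[] {0, 0});
        start.put(2, new double[] {5, 5});
        Map<Integer, double[]> result = iterate(start, 10, c -> kMeansStep(points, c));
        result.forEach((id, c) -> System.out.println(id + " " + c[0] + " " + c[1]));
        // 1 1.5 1.0
        // 2 8.5 9.0
    }
}
```

With these four points the centroids reach (1.5, 1.0) and (8.5, 9.0) after the first round; the remaining rounds reproduce the same result.
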
### Data Format Requirements

Input files are plain text with one record per line and fields separated by a single space:

**Points file format** (`x y`):
```
1.2 2.3
5.3 7.2
-1.0 3.4
```

**Centroids file format** (`id x y`, where `id` is an integer):
```
1 6.2 3.2
2 2.9 5.7
3 -1.5 4.8
```

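
Both formats can be read line by line by splitting on whitespace. The sketch below is a hypothetical plain-Java reader (`KMeansFormatSketch` is illustrative and not how the Flink example loads its input); it is slightly more lenient than a single-space delimiter, accepting any run of whitespace, and it rejects lines with the wrong number of fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser for the points ("x y") and centroids ("id x y") text formats. */
final class KMeansFormatSketch {

    /** Parses "x y" lines into {x, y} pairs; blank lines are skipped. */
    static List<double[]> parsePoints(List<String> lines) {
        List<double[]> points = new ArrayList<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) continue;
            String[] f = trimmed.split("\\s+");
            if (f.length != 2) throw new IllegalArgumentException("expected 'x y': " + line);
            points.add(new double[] {Double.parseDouble(f[0]), Double.parseDouble(f[1])});
        }
        return points;
    }

    /** Parses "id x y" lines into an id -> {x, y} map, keeping file order. */
    static Map<Integer, double[]> parseCentroids(List<String> lines) {
        Map<Integer, double[]> centroids = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) continue;
            String[] f = trimmed.split("\\s+");
            if (f.length != 3) throw new IllegalArgumentException("expected 'id x y': " + line);
            centroids.put(Integer.parseInt(f[0]),
                    new double[] {Double.parseDouble(f[1]), Double.parseDouble(f[2])});
        }
        return centroids;
    }

    public static void main(String[] args) {
        List<double[]> points = parsePoints(Arrays.asList("1.2 2.3", "5.3 7.2", "-1.0 3.4"));
        Map<Integer, double[]> centroids =
                parseCentroids(Arrays.asList("1 6.2 3.2", "2 2.9 5.7", "3 -1.5 4.8"));
        System.out.println(points.size() + " points, " + centroids.size() + " centroids");
        // 3 points, 3 centroids
    }
}
```
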
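### Parameter Handling

K-Means reads its configuration from command-line arguments via `ParameterTool`. All parameters are optional: without `--points` or `--centroids` the built-in default data is used, without `--output` the result is printed to standard output, and `--iterations` defaults to 10.
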
```java
import org.apache.flink.api.java.utils.ParameterTool;

ParameterTool params = ParameterTool.fromArgs(args);

// Data input parameters (get() returns null when a parameter is absent)
String pointsPath = params.get("points");       // Points data file
String centroidsPath = params.get("centroids"); // Centroids data file
String outputPath = params.get("output");       // Output path

// Algorithm parameters
int iterations = params.getInt("iterations", 10); // Number of iterations, default 10
```

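
The defaulting rules can be mirrored without Flink. `KMeansArgsSketch` below is a hypothetical stand-in for `ParameterTool` that only understands `--key value` pairs; it applies the documented default of 10 iterations and, assuming the behaviour of the Flink example, reports built-in data for missing input paths and standard output for a missing `--output`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ParameterTool-style "--key value" parsing; not the Flink class. */
final class KMeansArgsSketch {

    /** Collects "--key value" pairs; a key with no following value maps to "". */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("unexpected token: " + args[i]);
            }
            String key = args[i].substring(2);
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            params.put(key, hasValue ? args[++i] : "");
        }
        return params;
    }

    /** Mirrors params.getInt("iterations", 10). */
    static int iterations(Map<String, String> params) {
        String v = params.get("iterations");
        return v == null ? 10 : Integer.parseInt(v);
    }

    /** Summarizes which inputs come from files and which fall back to defaults. */
    static String describe(Map<String, String> params) {
        String points = params.getOrDefault("points", "<default points>");
        String centroids = params.getOrDefault("centroids", "<default centroids>");
        String output = params.getOrDefault("output", "<stdout>");
        return "points=" + points + " centroids=" + centroids
                + " output=" + output + " iterations=" + iterations(params);
    }

    public static void main(String[] args) {
        String[] sample = {"--points", "/tmp/p.txt", "--iterations", "20"};
        System.out.println(describe(parse(sample)));
        // points=/tmp/p.txt centroids=<default centroids> output=<stdout> iterations=20
    }
}
```
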
## Types

### Core Geometric Types

```java { .api }
// 2D point with coordinates
KMeans.Point point = new KMeans.Point(x, y);

// Cluster center with ID
KMeans.Centroid centroid = new KMeans.Centroid(id, x, y);

// Flink tuples passed between the algorithm steps
Tuple2<Integer, KMeans.Point> assignment;          // (centroid_id, point): point assigned to a centroid
Tuple3<Integer, KMeans.Point, Long> accumulation;  // (centroid_id, coordinate sum, point count)
```