or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

clustering.md, distributed-copy.md, graph-processing.md, index.md, misc-examples.md, relational-processing.md, word-count.md

docs/clustering.md

0

# Clustering Algorithms

1

2

Machine learning example demonstrating a K-Means clustering implementation for 2D data points. It showcases iterative algorithm patterns, custom data types, and Flink's bulk iteration capabilities.

3

4

## Capabilities

5

6

### K-Means Clustering

7

8

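Iterative clustering algorithm that partitions 2D data points into K clusters. In each iteration it assigns every point to its nearest centroid, then recomputes each centroid as the average of the points assigned to it.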

9

10

```java { .api }

11

/**

12

* K-Means clustering algorithm implementation using bulk iterations.

13

* Usage: KMeans --points <path> --centroids <path> --output <path> --iterations <n>

14

*/

15

@SuppressWarnings("serial")

16

public class KMeans {

17

public static void main(String[] args) throws Exception;

18

19

/**

20

* Two-dimensional point with basic geometric operations

21

*/

22

public static class Point implements Serializable {

23

public double x, y;

24

25

public Point();

26

public Point(double x, double y);

27

28

/**

29

* Add another point's coordinates to this point

30

* @param other Point to add

31

* @return This point with updated coordinates

32

*/

33

public Point add(Point other);

34

35

/**

36

* Divide coordinates by a value

37

* @param val Divisor value

38

* @return This point with divided coordinates

39

*/

40

public Point div(long val);

41

42

/**

43

* Calculate Euclidean distance to another point

44

* @param other Target point

45

* @return Distance as double

46

*/

47

public double euclideanDistance(Point other);

48

49

/**

50

* Reset coordinates to zero

51

*/

52

public void clear();

53

54

@Override

55

public String toString();

56

}

57

58

/**

59

* Cluster center point with ID

60

*/

61

public static class Centroid extends Point {

62

public int id;

63

64

public Centroid();

65

public Centroid(int id, double x, double y);

66

public Centroid(int id, Point p);

67

68

@Override

69

public String toString();

70

}

71

}

72

```

73

74

**Usage Examples:**

75

76

```java

77

// Run with custom data files

78

String[] args = {

79

"--points", "/path/to/points.txt",

80

"--centroids", "/path/to/centroids.txt",

81

"--output", "/path/to/output",

82

"--iterations", "20"

83

};

84

KMeans.main(args);

85

86

// Run with default data

87

String[] emptyArgs = {};

88

KMeans.main(emptyArgs);

89

90

// Use Point and Centroid classes directly

91

KMeans.Point p1 = new KMeans.Point(1.0, 2.0);

92

KMeans.Point p2 = new KMeans.Point(3.0, 4.0);

93

double distance = p1.euclideanDistance(p2);

94

95

KMeans.Centroid c1 = new KMeans.Centroid(1, 0.0, 0.0);

96

KMeans.Centroid c2 = new KMeans.Centroid(2, p1);

97

```

98

99

### K-Means User Functions

100

101

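User-defined functions implementing the individual steps of a K-Means iteration: nearest-centroid assignment, count appending, per-cluster accumulation, and averaging into new centroids.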

102

103

```java { .api }

104

/**

105

* Determines the closest cluster center for a data point

106

*/

107

@ForwardedFields("*->1")

108

public static final class SelectNearestCenter

109

extends RichMapFunction<Point, Tuple2<Integer, Point>> {

110

/**

111

* Maps a point to its nearest centroid ID and the point itself

112

* @param p Input point

113

* @return Tuple of (centroid_id, point)

114

*/

115

public Tuple2<Integer, Point> map(Point p) throws Exception;

116

117

/**

118

* Reads centroid values from broadcast variable

119

* @param parameters Configuration parameters

120

*/

121

@Override

122

public void open(Configuration parameters) throws Exception;

123

}

124

125

/**

126

* Appends a count variable to the tuple for aggregation

127

*/

128

@ForwardedFields("f0;f1")

129

public static final class CountAppender

130

implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {

131

/**

132

* Adds count of 1 to each point-centroid assignment

133

* @param t Input tuple (centroid_id, point)

134

* @return Tuple (centroid_id, point, count)

135

*/

136

public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t);

137

}

138

139

/**

140

* Sums and counts point coordinates for centroid calculation

141

*/

142

@ForwardedFields("0")

143

public static final class CentroidAccumulator

144

implements ReduceFunction<Tuple3<Integer, Point, Long>> {

145

/**

146

* Reduces point coordinates and counts for centroid averaging

147

* @param val1 First accumulation tuple

148

* @param val2 Second accumulation tuple

149

* @return Combined accumulation result

150

*/

151

public Tuple3<Integer, Point, Long> reduce(

152

Tuple3<Integer, Point, Long> val1,

153

Tuple3<Integer, Point, Long> val2);

154

}

155

156

/**

157

* Computes new centroid from coordinate sum and count of points

158

*/

159

@ForwardedFields("0->id")

160

public static final class CentroidAverager

161

implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {

162

/**

163

* Calculates average position for new centroid

164

* @param value Tuple (centroid_id, accumulated_point, count)

165

* @return New centroid at average position

166

*/

167

public Centroid map(Tuple3<Integer, Point, Long> value);

168

}

169

```

170

171

**Usage Examples:**

172

173

```java

174

// Use K-Means functions in custom algorithm

175

DataSet<KMeans.Point> points = getPointDataSet(params, env);

176

DataSet<KMeans.Centroid> centroids = getCentroidDataSet(params, env);

177

178

// Iterative computation

179

IterativeDataSet<KMeans.Centroid> loop = centroids.iterate(10);

180

181

DataSet<KMeans.Centroid> newCentroids = points

182

.map(new KMeans.SelectNearestCenter())

183

.withBroadcastSet(loop, "centroids")

184

.map(new KMeans.CountAppender())

185

.groupBy(0)

186

.reduce(new KMeans.CentroidAccumulator())

187

.map(new KMeans.CentroidAverager());

188

189

DataSet<KMeans.Centroid> finalCentroids = loop.closeWith(newCentroids);

190

```

191

192

### Data Provider and Generator

193

194

Utilities that provide built-in default data sets and generate random K-Means input files for testing.

195

196

```java { .api }

197

/**

198

* Provides default data sets for K-Means examples

199

*/

200

public class KMeansData {

201

/**

202

* Default centroid data as object arrays

203

*/

204

public static final Object[][] CENTROIDS;

205

206

/**

207

* Default point data as object arrays

208

*/

209

public static final Object[][] POINTS;

210

211

/**

212

* Creates DataSet with default centroid data

213

* @param env Execution environment

214

* @return DataSet containing default centroids

215

*/

216

public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env);

217

218

/**

219

* Creates DataSet with default point data

220

* @param env Execution environment

221

* @return DataSet containing default points

222

*/

223

public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env);

224

}

225

226

/**

227

* Generates random K-Means data files for testing

228

*/

229

public class KMeansDataGenerator {

230

public static void main(String[] args) throws Exception;

231

232

public static final String CENTERS_FILE = "centers";

233

public static final String POINTS_FILE = "points";

234

public static final long DEFAULT_SEED = 4650285087650871364L;

235

public static final double DEFAULT_VALUE_RANGE = 100.0;

236

public static final double DEFAULT_DATA_FRACTION = 1.0;

237

public static final int DEFAULT_NUM_POINTS = 500;

238

public static final int DEFAULT_NUM_CENTERS = 20;

239

}

240

```

241

242

**Usage Examples:**

243

244

```java

245

// Use default data in custom applications

246

import org.apache.flink.examples.java.clustering.util.KMeansData;

247

248

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

249

DataSet<KMeans.Point> points = KMeansData.getDefaultPointDataSet(env);

250

DataSet<KMeans.Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);

251

252

// Generate custom test data

253

String[] generatorArgs = {

254

"--numPoints", "1000",

255

"--numCenters", "10",

256

"--output", "/path/to/data",

257

"--range", "50.0"

258

};

259

KMeansDataGenerator.main(generatorArgs);

260

```

261

262

## Algorithm Pattern

263

264

### Bulk Iteration Structure

265

266

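K-Means uses Flink's bulk iteration pattern. The centroid data set is refined for a fixed number of iterations (`maxIterations`); the loop is closed without a separate convergence (termination) criterion: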

267

268

```java

269

// Set up iterative computation

270

IterativeDataSet<Centroid> loop = centroids.iterate(maxIterations);

271

272

// Compute new centroids in each iteration

273

DataSet<Centroid> newCentroids = points

274

.map(new SelectNearestCenter()) // Assign points to nearest centroids

275

.withBroadcastSet(loop, "centroids") // Broadcast current centroids

276

.map(new CountAppender()) // Add count for averaging

277

.groupBy(0) // Group by centroid ID

278

.reduce(new CentroidAccumulator()) // Sum coordinates and counts

279

.map(new CentroidAverager()); // Calculate new centroid positions

280

281

// Close iteration loop

282

DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

283

```

284

285

### Data Format Requirements

286

287

Input files are plain text with one record per line, with fields separated by a single space:

288

289

**Points file format** (`x y` per line):

290

```

291

1.2 2.3

292

5.3 7.2

293

-1.0 3.4

294

```

295

296

**Centroids file format** (`id x y` per line):

297

```

298

1 6.2 3.2

299

2 2.9 5.7

300

3 -1.5 4.8

301

```

302

303

### Parameter Handling

304

305

K-Means reads its configuration from command-line arguments via `ParameterTool`:

306

307

```java

308

ParameterTool params = ParameterTool.fromArgs(args);

309

310

// Data input parameters

311

String pointsPath = params.get("points"); // Points data file

312

String centroidsPath = params.get("centroids"); // Centroids data file

313

String outputPath = params.get("output"); // Output directory

314

315

// Algorithm parameters

316

int iterations = params.getInt("iterations", 10); // Number of iterations

317

```

318

319

## Types

320

321

### Core Geometric Types

322

323

```java { .api }

324

// 2D point with coordinates

325

KMeans.Point point = new KMeans.Point(x, y);

326

327

// Cluster center with ID

328

KMeans.Centroid centroid = new KMeans.Centroid(id, x, y);

329

330

// Flink tuples for algorithm steps

331

Tuple2<Integer, Point> assignment; // Point assignment to centroid

332

Tuple3<Integer, Point, Long> accumulation; // Accumulated point data

333

```