or run

npx @tessl/cli init
Documentation files in `docs/`:

- `example-implementations.md` (this page)
- `execution-framework.md`
- `graph-algorithms.md`
- `index.md`
- `input-sources.md`
- `output-handlers.md`
- `parameter-system.md`
- `transformations.md`

# Example Implementations

Standalone example implementations that demonstrate a range of graph processing algorithms and programming patterns. They show different approaches to graph analysis with Apache Flink's Gelly API and serve as reference implementations for common use cases.

## Capabilities

### Java Examples

A collection of standalone Java implementations that demonstrate different algorithmic approaches and Flink programming patterns.

#### PageRank Implementation

A generic PageRank implementation built on the scatter-gather pattern, with a configurable damping factor and iteration limit.

```java { .api }

15

/**

16

* PageRank algorithm implementation with generic key type support

17

* Uses scatter-gather pattern for distributed computation

18

* @param <K> Vertex key type

19

*/

20

public class PageRank<K> {

21

/**

22

* Create PageRank instance with specified parameters

23

* @param beta Damping factor (typically 0.85)

24

* @param maxIterations Maximum number of iterations

25

*/

26

public PageRank(double beta, int maxIterations);

27

28

/**

29

* Execute PageRank algorithm on input graph

30

* @param network Input graph with double-valued vertices and edges

31

* @return DataSet of vertices with PageRank scores

32

*/

33

public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network);

34

}

35

```

**Usage Examples:**

```java

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.PageRank;
import org.apache.flink.types.NullValue;

// Create a PageRank instance: damping factor 0.85, at most 10 iterations
PageRank<Long> pageRank = new PageRank<>(0.85, 10);

// Give every vertex an initial rank. `inputGraph` is assumed to be an existing
// Graph<Long, NullValue, Double>. An anonymous MapFunction (rather than a lambda)
// lets Flink determine the new vertex value type despite Java type erasure.
Graph<Long, Double, Double> graph = inputGraph
    .mapVertices(new MapFunction<Vertex<Long, NullValue>, Double>() {
        @Override
        public Double map(Vertex<Long, NullValue> vertex) {
            return 1.0; // Initial PageRank value
        }
    });

// Execute the algorithm. print() triggers job execution and throws a checked
// Exception, so run this from a method declared `throws Exception`.
DataSet<Vertex<Long, Double>> results = pageRank.run(graph);
results.print();

```

#### Single Source Shortest Paths

A standard single source shortest paths implementation based on iterative, vertex-centric computation.

```java { .api }

63

/**

64

* Single Source Shortest Paths algorithm

65

* Computes shortest distances from a source vertex to all reachable vertices

66

*/

67

public class SingleSourceShortestPaths {

68

/**

69

* Execute SSSP from specified source vertex

70

* @param graph Input graph with numeric edge weights

71

* @param srcVertexId Source vertex identifier

72

* @param maxIterations Maximum number of iterations

73

* @return DataSet of vertices with shortest distances

74

*/

75

public static DataSet<Vertex<Long, Double>> run(

76

Graph<Long, NullValue, Double> graph,

77

Long srcVertexId,

78

Integer maxIterations

79

) throws Exception;

80

}

81

```

**Usage Examples:**

```java

86

// Execute SSSP from vertex 1

87

DataSet<Vertex<Long, Double>> distances =

88

SingleSourceShortestPaths.run(weightedGraph, 1L, 10);

89

90

// Filter reachable vertices

91

DataSet<Vertex<Long, Double>> reachable = distances

92

.filter(vertex -> vertex.getValue() < Double.POSITIVE_INFINITY);

93

```

#### Gather-Sum-Apply Examples

Examples demonstrating the Gather-Sum-Apply (GSA) programming model.

##### GSA PageRank

```java { .api }

/**
 * PageRank implementation using the Gather-Sum-Apply pattern.
 *
 * <p>GatherFunction, SumFunction and ApplyFunction are abstract classes in Gelly's GSA API,
 * so user functions extend them rather than implement them.
 */
public class GSAPageRank {

    public static void main(String[] args) throws Exception;

    /** Custom gather function for PageRank computation */
    public static class GatherRanks extends GatherFunction<Double, Double, Double>;

    /** Custom sum function for aggregating ranks */
    public static class SumRanks extends SumFunction<Double, Double, Double>;

    /** Custom apply function for updating vertex values */
    public static class UpdateRanks extends ApplyFunction<Long, Double, Double>;
}

```

##### GSA Single Source Shortest Paths

```java { .api }

/**
 * SSSP implementation using the Gather-Sum-Apply pattern.
 *
 * <p>GatherFunction, SumFunction and ApplyFunction are abstract classes in Gelly's GSA API,
 * so user functions extend them rather than implement them.
 */
public class GSASingleSourceShortestPaths {

    public static void main(String[] args) throws Exception;

    /** Gather minimum distances from neighbors */
    public static class GatherDistances extends GatherFunction<Double, Double, Double>;

    /** Select minimum distance */
    public static class SelectMinDistance extends SumFunction<Double, Double, Double>;

    /** Update vertex distance if improvement found */
    public static class UpdateDistance extends ApplyFunction<Long, Double, Double>;
}

```

#### Incremental SSSP

An advanced example of incremental graph processing: shortest paths are updated as edges are added to or removed from the graph.

```java { .api }

144

/**

145

* Incremental Single Source Shortest Paths

146

* Efficiently recomputes shortest paths when graph structure changes

147

*/

148

public class IncrementalSSSP {

149

public static void main(String[] args) throws Exception;

150

151

/**

152

* Process incremental updates to shortest path tree

153

* @param graph Base graph with current distances

154

* @param edgeUpdates Stream of edge additions/deletions

155

* @return Updated shortest path tree

156

*/

157

public static DataSet<Vertex<Long, Double>> processUpdates(

158

Graph<Long, Double, Double> graph,

159

DataSet<Edge<Long, Double>> edgeUpdates

160

) throws Exception;

161

}

162

```

#### Music Profiles

A real-world example that builds a graph-based recommendation system using collaborative filtering.

```java { .api }

169

/**

170

* Music recommendation system using graph processing

171

* Builds user-song bipartite graph and computes recommendations

172

*/

173

public class MusicProfiles {

174

public static void main(String[] args) throws Exception;

175

176

/**

177

* Generate music recommendations based on user listening history

178

* @param userSongGraph Bipartite graph of users and songs

179

* @param targetUser User to generate recommendations for

180

* @return Ranked list of song recommendations

181

*/

182

public static DataSet<Tuple2<String, Double>> generateRecommendations(

183

Graph<String, NullValue, Double> userSongGraph,

184

String targetUser

185

) throws Exception;

186

}

187

```

#### Euclidean Graph Weighing

An example of spatial graph processing that weights each edge by the Euclidean distance between the coordinates of its endpoints.

```java { .api }

194

/**

195

* Weight graph edges by Euclidean distance between vertex coordinates

196

* Useful for spatial networks and geographic data processing

197

*/

198

public class EuclideanGraphWeighing {

199

public static void main(String[] args) throws Exception;

200

201

/**

202

* Compute Euclidean distances for graph edges

203

* @param graph Graph with vertex coordinates

204

* @return Graph with distance-weighted edges

205

*/

206

public static Graph<Long, Point, Double> computeDistances(

207

Graph<Long, Point, NullValue> graph

208

) throws Exception;

209

}

210

```

#### Pregel SSSP

An implementation that showcases the Pregel programming model for vertex-centric computation.

```java { .api }

/**
 * Single Source Shortest Paths using the Pregel model.
 *
 * <p>Demonstrates the vertex-centric programming approach. ComputeFunction and
 * MessageCombiner are abstract classes in Gelly's Pregel API, so both are extended.
 */
public class PregelSSSP {

    public static void main(String[] args) throws Exception;

    /** Pregel compute function for SSSP */
    public static class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double>;

    /** Message combiner for efficiency */
    public static class MinMessageCombiner extends MessageCombiner<Long, Double>;
}

```

### Scala Examples

Scala implementations demonstrating functional programming approaches to graph processing.

#### Scala Connected Components

```scala { .api }

/**
 * Connected Components implemented in Scala.
 *
 * Illustrates a functional programming style on top of the Flink Scala API.
 */
object ConnectedComponents {

  def main(args: Array[String]): Unit

  /**
   * Finds connected components by label propagation.
   *
   * @param graph         input graph
   * @param maxIterations upper bound on the number of iterations
   * @return components as (vertex, component) pairs
   */
  def findComponents(graph: Graph[Long, Long, NullValue], maxIterations: Int): DataSet[(Long, Long)]
}

```

#### Scala GSA SSSP

```scala { .api }

/**
 * Gather-Sum-Apply Single Source Shortest Paths in Scala.
 *
 * A functional implementation of the gather-sum-apply pattern.
 */
object GSASingleSourceShortestPaths {

  def main(args: Array[String]): Unit

  /**
   * Computes shortest paths with a functional GSA approach.
   *
   * @param graph     weighted input graph
   * @param srcVertex source vertex
   * @return shortest distances from the source
   */
  def computeShortestPaths(graph: Graph[Long, Double, Double], srcVertex: Long): DataSet[(Long, Double)]
}

```

#### Scala SSSP

```scala { .api }

/**
 * Standard SSSP implementation in Scala.
 *
 * Shows idiomatic Scala programming with Flink.
 */
object SingleSourceShortestPaths {

  def main(args: Array[String]): Unit

  /** Functional vertex update operation. */
  def updateDistance(vertex: Vertex[Long, Double], messages: Iterator[Double]): Double

  /** Builds the distance-initialization function for the given source vertex. */
  def initializeDistances(srcVertex: Long): MapFunction[Vertex[Long, NullValue], Double]
}

```

## Data Classes and Test Utilities

### Test Data Generators

Utility classes that provide default datasets for testing and benchmarking the examples.

```java { .api }

307

/**

308

* Default datasets for PageRank testing

309

*/

310

public class PageRankData {

311

/** Get default vertex dataset */

312

public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env);

313

314

/** Get default edge dataset */

315

public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);

316

317

/** Expected PageRank results for validation */

318

public static String getExpectedResult();

319

}

320

321

/**

322

* Test data for connected components algorithms

323

*/

324

public class ConnectedComponentsDefaultData {

325

public static DataSet<Vertex<Long, Long>> getDefaultVertexDataSet(ExecutionEnvironment env);

326

public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env);

327

public static String getExpectedResult();

328

}

329

330

/**

331

* Spatial graph data for Euclidean distance examples

332

*/

333

public class EuclideanGraphData {

334

/** Vertices with 2D coordinates */

335

public static DataSet<Vertex<Long, Point>> getDefaultVertexDataSet(ExecutionEnvironment env);

336

337

/** Edges for spatial connectivity */

338

public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env);

339

}

340

341

/**

342

* Music recommendation test data

343

*/

344

public class MusicProfilesData {

345

/** User-song interaction data */

346

public static DataSet<Tuple3<String, String, Double>> getUserSongRatings(ExecutionEnvironment env);

347

348

/** Expected recommendation results */

349

public static String getExpectedResult();

350

}

351

```

## Usage Patterns

### Standalone Example Execution

```java

358

// Direct execution of example algorithms

359

public static void main(String[] args) throws Exception {

360

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

361

362

// Create test graph

363

Graph<Long, NullValue, Double> graph = Graph.fromDataSet(

364

vertices, edges, env

365

);

366

367

// Run algorithm

368

DataSet<Vertex<Long, Double>> result =

369

SingleSourceShortestPaths.run(graph, 1L, 10);

370

371

// Print results

372

result.print();

373

}

374

```

### Integration with Framework

```java

379

// Use examples as reference for custom drivers

380

public class CustomPageRankDriver extends DriverBase<Long, NullValue, Double> {

381

private PageRank<Long> pageRankImpl;

382

383

@Override

384

public DataSet plan(Graph<Long, NullValue, Double> graph) throws Exception {

385

// Initialize PageRank with parameters

386

pageRankImpl = new PageRank<>(

387

dampingFactor.getValue(),

388

iterations.getValue()

389

);

390

391

// Convert graph format and execute

392

Graph<Long, Double, Double> initializedGraph =

393

graph.mapVertices(vertex -> 1.0);

394

395

return pageRankImpl.run(initializedGraph);

396

}

397

}

398

```

### Testing and Validation

```java

403

// Use data classes for consistent testing

404

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

405

406

// Get standard test data

407

DataSet<Vertex<Long, Double>> vertices = PageRankData.getDefaultVertexDataSet(env);

408

DataSet<Edge<Long, Double>> edges = PageRankData.getDefaultEdgeDataSet(env);

409

Graph<Long, Double, Double> testGraph = Graph.fromDataSet(vertices, edges, env);

410

411

// Run algorithm

412

PageRank<Long> algorithm = new PageRank<>(0.85, 10);

413

DataSet<Vertex<Long, Double>> results = algorithm.run(testGraph);

414

415

// Validate against expected results

416

String expected = PageRankData.getExpectedResult();

417

String actual = DataSetUtils.collect(results).toString();

418

assertEquals(expected, actual);

419

```