# Legacy Examples

The legacy examples package contains standalone implementations of graph algorithms demonstrating different Flink Gelly programming models. These examples serve as educational material and reference implementations, showing scatter-gather, gather-sum-apply (GSA), and vertex-centric (Pregel) approaches to graph processing.

## Capabilities

### Standalone Example Programs

#### Single Source Shortest Paths (Scatter-Gather)

Computes shortest paths from a single source vertex using the scatter-gather programming model.

```java { .api }
/**
 * Single Source Shortest Paths using scatter-gather iteration
 * Demonstrates message passing between vertices for distance computation
 */
public class SingleSourceShortestPaths implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the SSSP algorithm and usage
     */
    public String getDescription();
}
```

**Command-line Usage:**

```bash
flink run flink-gelly-examples_2.10-1.3.3.jar \
  org.apache.flink.graph.examples.SingleSourceShortestPaths \
  <source_vertex_id> <input_edges_path> <output_path> <num_iterations>
```

**Usage Example:**

```bash
flink run flink-gelly-examples_2.10-1.3.3.jar \
  org.apache.flink.graph.examples.SingleSourceShortestPaths \
  1 edges.csv shortest_paths.csv 10
```
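
To make the scatter and gather phases concrete, the following is a minimal plain-Java sketch of the idea. It does not use the Flink or Gelly APIs and is not the example's actual source: the `ScatterGatherSsspSketch` class, its `WeightedEdge` type, and the sample graph are all hypothetical names and data introduced for illustration. As a simplification, every reached vertex sends messages in every superstep.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of scatter-gather SSSP. It does not use the Flink or Gelly APIs. */
class ScatterGatherSsspSketch {

    /** Hypothetical directed, weighted edge used only by this sketch. */
    static final class WeightedEdge {
        final long source;
        final long target;
        final double weight;

        WeightedEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Made-up sample graph: 1->2 (4.0), 1->3 (1.0), 3->2 (2.0), 2->4 (5.0). */
    static List<WeightedEdge> sampleEdges() {
        return Arrays.asList(
                new WeightedEdge(1, 2, 4.0),
                new WeightedEdge(1, 3, 1.0),
                new WeightedEdge(3, 2, 2.0),
                new WeightedEdge(2, 4, 5.0));
    }

    static Map<Long, Double> shortestPaths(List<WeightedEdge> edges, long sourceId, int maxIterations) {
        // Every vertex starts at infinity except the source, which starts at 0.
        Map<Long, Double> distances = new TreeMap<>();
        for (WeightedEdge e : edges) {
            distances.put(e.source, Double.POSITIVE_INFINITY);
            distances.put(e.target, Double.POSITIVE_INFINITY);
        }
        distances.put(sourceId, 0.0);

        for (int superstep = 0; superstep < maxIterations; superstep++) {
            // Scatter: each reached vertex sends (own distance + edge weight) along its out-edges.
            Map<Long, List<Double>> inbox = new HashMap<>();
            for (WeightedEdge e : edges) {
                double distance = distances.get(e.source);
                if (distance < Double.POSITIVE_INFINITY) {
                    inbox.computeIfAbsent(e.target, k -> new ArrayList<>()).add(distance + e.weight);
                }
            }
            // Gather: each vertex keeps the smallest incoming candidate if it improves on its value.
            boolean changed = false;
            for (Map.Entry<Long, List<Double>> entry : inbox.entrySet()) {
                double best = Collections.min(entry.getValue());
                if (best < distances.get(entry.getKey())) {
                    distances.put(entry.getKey(), best);
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged before using all iterations
            }
        }
        return distances;
    }

    public static void main(String[] args) {
        System.out.println(shortestPaths(sampleEdges(), 1L, 10));
        // prints {1=0.0, 2=3.0, 3=1.0, 4=8.0}
    }
}
```

The `maxIterations` argument plays the same role as the `<num iterations>` command-line argument: it caps the number of supersteps even if distances are still changing.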

#### Single Source Shortest Paths (GSA)

Computes shortest paths using the gather-sum-apply programming model.

```java { .api }
/**
 * Single Source Shortest Paths using gather-sum-apply iteration
 * Demonstrates GSA programming model with gather, sum, and apply phases
 */
public class GSASingleSourceShortestPaths implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the GSA SSSP algorithm and usage
     */
    public String getDescription();
}
```
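
The three GSA phases can be sketched as three small functions. This is a plain-Java illustration under stated assumptions, not the Gelly implementation: `GsaSsspSketch`, `WeightedEdge`, and the sample graph are hypothetical, and the sketch assumes that "sum" for shortest paths means taking the minimum of the gathered candidates.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of the gather, sum, and apply phases for SSSP. No Flink or Gelly APIs are used. */
class GsaSsspSketch {

    /** Hypothetical directed, weighted edge used only by this sketch. */
    static final class WeightedEdge {
        final long source;
        final long target;
        final double weight;

        WeightedEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Made-up sample graph: 1->2 (4.0), 1->3 (1.0), 3->2 (2.0), 2->4 (5.0). */
    static List<WeightedEdge> sampleEdges() {
        return Arrays.asList(
                new WeightedEdge(1, 2, 4.0),
                new WeightedEdge(1, 3, 1.0),
                new WeightedEdge(3, 2, 2.0),
                new WeightedEdge(2, 4, 5.0));
    }

    /** Gather: the candidate distance one in-neighbor offers through one edge. */
    static double gather(double neighborDistance, double edgeWeight) {
        return neighborDistance + edgeWeight;
    }

    /** Sum: combines two candidates. For shortest paths the "sum" is a minimum. */
    static double sum(double candidateA, double candidateB) {
        return Math.min(candidateA, candidateB);
    }

    /** Apply: the vertex adopts the combined candidate only if it is an improvement. */
    static double apply(double combined, double currentDistance) {
        return Math.min(combined, currentDistance);
    }

    static Map<Long, Double> shortestPaths(List<WeightedEdge> edges, long sourceId, int maxIterations) {
        Map<Long, Double> distances = new TreeMap<>();
        for (WeightedEdge e : edges) {
            distances.put(e.source, Double.POSITIVE_INFINITY);
            distances.put(e.target, Double.POSITIVE_INFINITY);
        }
        distances.put(sourceId, 0.0);

        for (int iteration = 0; iteration < maxIterations; iteration++) {
            // Gather + sum: fold every in-edge candidate into one value per target vertex.
            Map<Long, Double> combined = new HashMap<>();
            for (WeightedEdge e : edges) {
                double candidate = gather(distances.get(e.source), e.weight);
                combined.merge(e.target, candidate, GsaSsspSketch::sum);
            }
            // Apply: update each vertex from its combined candidate.
            boolean changed = false;
            for (Map.Entry<Long, Double> entry : combined.entrySet()) {
                double current = distances.get(entry.getKey());
                double updated = apply(entry.getValue(), current);
                if (updated < current) {
                    distances.put(entry.getKey(), updated);
                    changed = true;
                }
            }
            if (!changed) {
                break; // no vertex improved, so further iterations would do nothing
            }
        }
        return distances;
    }

    public static void main(String[] args) {
        System.out.println(shortestPaths(sampleEdges(), 1L, 10));
        // prints {1=0.0, 2=3.0, 3=1.0, 4=8.0}
    }
}
```

Compared with scatter-gather, the vertex here pulls values from its in-neighbors, and the pairwise `sum` step means no per-vertex message list has to be materialized.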

#### Single Source Shortest Paths (Pregel)

Computes shortest paths using the vertex-centric (Pregel) programming model.

```java { .api }
/**
 * Single Source Shortest Paths using vertex-centric (Pregel) computation
 * Demonstrates Pregel programming model with compute functions and message combiners
 */
public class PregelSSSP implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the Pregel SSSP algorithm and usage
     */
    public String getDescription();
}
```

**Inner Classes:**

```java { .api }
/**
 * Vertex compute function for Pregel SSSP implementation
 * Processes incoming messages and updates vertex state
 */
public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
    public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) throws Exception;
}

/**
 * Message combiner for Pregel SSSP to reduce message overhead
 * Combines multiple messages to the same vertex
 */
public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
    public void combineMessages(MessageIterator<Double> messages) throws Exception;
}
```
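
The division of labor between a compute function and a combiner can be sketched in plain Java as follows. This is an illustration only, without the Flink or Gelly APIs: `PregelSsspSketch`, `WeightedEdge`, and the sample graph are hypothetical, and seeding the source vertex with a `0.0` message is an assumption of this sketch rather than something taken from the example's source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of vertex-centric SSSP with a message combiner. No Flink or Gelly APIs are used. */
class PregelSsspSketch {

    /** Hypothetical directed, weighted edge used only by this sketch. */
    static final class WeightedEdge {
        final long source;
        final long target;
        final double weight;

        WeightedEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Made-up sample graph: 1->2 (4.0), 1->3 (1.0), 3->2 (2.0), 2->4 (5.0). */
    static List<WeightedEdge> sampleEdges() {
        return Arrays.asList(
                new WeightedEdge(1, 2, 4.0),
                new WeightedEdge(1, 3, 1.0),
                new WeightedEdge(3, 2, 2.0),
                new WeightedEdge(2, 4, 5.0));
    }

    /** Combiner: collapses all messages addressed to one vertex into their minimum. */
    static double combine(List<Double> messages) {
        return Collections.min(messages);
    }

    static Map<Long, Double> shortestPaths(List<WeightedEdge> edges, long sourceId, int maxIterations) {
        Map<Long, Double> distances = new TreeMap<>();
        Map<Long, List<WeightedEdge>> outEdges = new HashMap<>();
        for (WeightedEdge e : edges) {
            distances.put(e.source, Double.POSITIVE_INFINITY);
            distances.put(e.target, Double.POSITIVE_INFINITY);
            outEdges.computeIfAbsent(e.source, k -> new ArrayList<>()).add(e);
        }
        distances.put(sourceId, Double.POSITIVE_INFINITY);

        // Assumption of this sketch: a seed message of 0.0 activates the source vertex.
        Map<Long, List<Double>> inbox = new HashMap<>();
        inbox.put(sourceId, new ArrayList<>(Collections.singletonList(0.0)));

        for (int superstep = 0; superstep < maxIterations && !inbox.isEmpty(); superstep++) {
            Map<Long, List<Double>> nextInbox = new HashMap<>();
            for (Map.Entry<Long, List<Double>> entry : inbox.entrySet()) {
                long vertexId = entry.getKey();
                double candidate = combine(entry.getValue());
                // Compute: a vertex that improves its distance updates itself and messages its neighbors.
                if (candidate < distances.get(vertexId)) {
                    distances.put(vertexId, candidate);
                    for (WeightedEdge e : outEdges.getOrDefault(vertexId, Collections.emptyList())) {
                        nextInbox.computeIfAbsent(e.target, k -> new ArrayList<>()).add(candidate + e.weight);
                    }
                }
            }
            inbox = nextInbox; // only vertices with pending messages run in the next superstep
        }
        return distances;
    }

    public static void main(String[] args) {
        System.out.println(shortestPaths(sampleEdges(), 1L, 10));
        // prints {1=0.0, 2=3.0, 3=1.0, 4=8.0}
    }
}
```

Because only the smallest candidate matters for shortest paths, combining messages with a minimum does not change the result; it only reduces how many values each vertex has to inspect.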

#### Music Profiles Analysis

Complex example demonstrating mixed DataSet and Gelly API usage for user-song bipartite graph analysis.

```java { .api }
/**
 * Music Profiles analysis using bipartite graphs and community detection
 * Demonstrates integration of DataSet operations with graph processing
 */
public class MusicProfiles implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Complex parameter list for music analysis pipeline
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the music profiles analysis pipeline
     */
    public String getDescription();
}
```

**Command-line Parameters:**

```bash
<input_user_song_triplets_path> <input_song_mismatches_path> <output_top_tracks_path>
<playcount_threshold> <output_communities_path> <num_iterations>
```

**Public Transformation Classes:**

```java { .api }
/**
 * Extract mismatch song IDs from mismatch data
 * Maps mismatch records to song ID strings
 */
public static final class ExtractMismatchSongIds implements MapFunction<String, String> {
    public String map(String value) throws Exception;
}

/**
 * Filter out songs that appear in the mismatch list
 * Removes problematic songs from the analysis dataset
 */
public static final class FilterOutMismatches implements FilterFunction<Tuple3<String, String, Integer>> {
    public boolean filter(Tuple3<String, String, Integer> value) throws Exception;
}

/**
 * Filter song vertices from user-song bipartite graph
 * Identifies vertices representing songs vs users
 */
public static final class FilterSongNodes implements FilterFunction<Vertex<String, Double>> {
    public boolean filter(Vertex<String, Double> vertex) throws Exception;
}

/**
 * Get top song per user based on play counts
 * Finds the most played song for each user
 */
public static final class GetTopSongPerUser implements GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>> {
    public void reduce(Iterable<Tuple3<String, String, Integer>> values, Collector<Tuple2<String, String>> out) throws Exception;
}

/**
 * Create edges between users who share similar music preferences
 * Builds user similarity graph based on common top songs
 */
public static final class CreateSimilarUserEdges implements GroupReduceFunction<Tuple2<String, String>, Edge<String, NullValue>> {
    public void reduce(Iterable<Tuple2<String, String>> values, Collector<Edge<String, NullValue>> out) throws Exception;
}
```
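
The roles described for the mismatch filter, the top-song reduction, and the similar-user edges can be sketched with plain Java collections. This is a hedged illustration that follows the descriptions above, not the example's source: `MusicProfilesSketch`, `Play`, and the sample data are hypothetical, and the pairing rule (two users are linked when they share the same top song) is an assumption based on the comment on `CreateSimilarUserEdges`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java sketch of the filter, top-song, and similar-user steps. No Flink APIs are used. */
class MusicProfilesSketch {

    /** Hypothetical (user, song, play count) record standing in for Tuple3<String, String, Integer>. */
    static final class Play {
        final String user;
        final String song;
        final int count;

        Play(String user, String song, int count) {
            this.user = user;
            this.song = song;
            this.count = count;
        }
    }

    /** Made-up listening data for three users. */
    static List<Play> samplePlays() {
        return Arrays.asList(
                new Play("u1", "s1", 10),
                new Play("u1", "s2", 3),
                new Play("u2", "s1", 7),
                new Play("u2", "s3", 9),
                new Play("u3", "s2", 5));
    }

    /** Drops plays of mismatched songs, then keeps each user's most played song. */
    static Map<String, String> topSongPerUser(List<Play> plays, Set<String> mismatchedSongs) {
        Map<String, Play> best = new TreeMap<>();
        for (Play play : plays) {
            if (mismatchedSongs.contains(play.song)) {
                continue; // same role as the mismatch filter described above
            }
            Play current = best.get(play.user);
            if (current == null || play.count > current.count) {
                best.put(play.user, play);
            }
        }
        Map<String, String> topSongs = new TreeMap<>();
        for (Play play : best.values()) {
            topSongs.put(play.user, play.song);
        }
        return topSongs;
    }

    /** Emits one {userA, userB} pair for every two users who share the same top song. */
    static List<String[]> similarUserEdges(Map<String, String> topSongs) {
        Map<String, List<String>> usersBySong = new TreeMap<>();
        for (Map.Entry<String, String> entry : topSongs.entrySet()) {
            usersBySong.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
        }
        List<String[]> edges = new ArrayList<>();
        for (List<String> users : usersBySong.values()) {
            for (int i = 0; i < users.size(); i++) {
                for (int j = i + 1; j < users.size(); j++) {
                    edges.add(new String[] {users.get(i), users.get(j)});
                }
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        Map<String, String> topSongs = topSongPerUser(samplePlays(), Collections.singleton("s3"));
        System.out.println(topSongs); // prints {u1=s1, u2=s1, u3=s2}
        for (String[] edge : similarUserEdges(topSongs)) {
            System.out.println(edge[0] + " -- " + edge[1]); // prints u1 -- u2
        }
    }
}
```

In the sample data, user `u2` played `s3` most often, but `s3` is on the mismatch list, so `s1` becomes that user's top song and links `u2` to `u1`.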

#### Euclidean Graph Weighting

Demonstrates graph transformations and geometric computations with triplet operations.

```java { .api }
/**
 * Euclidean Graph Weighting example showing graph transformations
 * Demonstrates triplet operations and custom distance computations
 */
public class EuclideanGraphWeighing implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <input vertices path> <input edges path> <output path>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the Euclidean weighting transformation
     */
    public String getDescription();
}
```

**Public Utility Classes:**

```java { .api }
/**
 * 2D Point class for geometric computations
 * Represents vertex positions in Euclidean space
 */
public static class Point implements Serializable {
    public double x;
    public double y;

    public Point(double x, double y);

    /**
     * Calculate Euclidean distance between two points
     * @param other Other point to calculate distance to
     * @return Euclidean distance as double
     */
    public double euclideanDistance(Point other);
}
```
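
As a rough illustration of the weighting step, the sketch below assigns each edge the Euclidean distance between the positions of its two endpoints. It is plain Java with no Flink dependency: `EuclideanWeighingSketch`, its `weigh` helper, the `long[]` edge representation, and the coordinates are hypothetical, and this `Point` is a simplified stand-in for the class listed above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of weighting edges by the distance between their endpoints. */
class EuclideanWeighingSketch {

    /** Simplified stand-in for the Point class listed above. */
    static final class Point {
        final double x;
        final double y;

        Point(double x, double y) {
            this.x = x;
            this.y = y;
        }

        /** Straight-line distance: sqrt((x1 - x2)^2 + (y1 - y2)^2). */
        double euclideanDistance(Point other) {
            double dx = x - other.x;
            double dy = y - other.y;
            return Math.sqrt(dx * dx + dy * dy);
        }
    }

    /** Computes one weight per {source, target} edge from the positions of its endpoints. */
    static double[] weigh(Map<Long, Point> positions, List<long[]> edges) {
        double[] weights = new double[edges.size()];
        for (int i = 0; i < edges.size(); i++) {
            long[] edge = edges.get(i);
            Point source = positions.get(edge[0]);
            Point target = positions.get(edge[1]);
            weights[i] = source.euclideanDistance(target);
        }
        return weights;
    }

    public static void main(String[] args) {
        // Made-up vertex positions forming a 3-4-5 right triangle.
        Map<Long, Point> positions = new HashMap<>();
        positions.put(1L, new Point(0, 0));
        positions.put(2L, new Point(3, 4));
        positions.put(3L, new Point(3, 0));

        List<long[]> edges = Arrays.asList(new long[] {1, 2}, new long[] {2, 3}, new long[] {1, 3});
        System.out.println(Arrays.toString(weigh(positions, edges)));
        // prints [5.0, 4.0, 3.0]
    }
}
```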

#### Incremental SSSP

Advanced example showing incremental shortest path updates when edges are removed from the graph.

```java { .api }
/**
 * Incremental Single Source Shortest Paths with edge removal
 * Demonstrates dynamic graph updates and incremental computation
 */
public class IncrementalSSSP implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Complex parameter list for incremental SSSP
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the incremental SSSP algorithm
     */
    public String getDescription();

    /**
     * Check if an edge is part of the shortest path tree
     * @param edgeToBeRemoved Edge to check for SSSP membership
     * @param edgesInSSSP DataSet of edges currently in the SSSP tree
     * @return True if edge is in SSSP, false otherwise
     * @throws Exception if check fails
     */
    public static boolean isInSSSP(Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception;
}
```

**Inner Algorithm Classes:**

```java { .api }
/**
 * Message passing function for invalidating affected paths
 * Sends invalidation messages when edges are removed
 */
public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
    public void sendMessages(Vertex<Long, Double> vertex) throws Exception;
}

/**
 * Vertex update function for recalculating distances
 * Updates vertex distances based on invalidation messages
 */
public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}
```
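
One way to picture the membership check and the invalidation pass is the plain-Java sketch below. It is an interpretation of the descriptions above, not the example's source: `IncrementalSsspSketch`, its unweighted `Edge` type, and the sample tree are hypothetical, and the sketch assumes that removing a tree edge invalidates the edge's target and every vertex reached through it in the shortest path tree.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Plain-Java sketch of the edge-removal check and invalidation. No Flink or Gelly APIs are used. */
class IncrementalSsspSketch {

    /** Hypothetical directed edge; weights are omitted because this sketch only tracks reachability. */
    static final class Edge {
        final long source;
        final long target;

        Edge(long source, long target) {
            this.source = source;
            this.target = target;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Edge)) {
                return false;
            }
            Edge other = (Edge) o;
            return source == other.source && target == other.target;
        }

        @Override
        public int hashCode() {
            return Objects.hash(source, target);
        }
    }

    /** Made-up shortest path tree rooted at vertex 1: 1->2, 2->3, 1->4. */
    static Set<Edge> sampleTree() {
        return new HashSet<>(Arrays.asList(new Edge(1, 2), new Edge(2, 3), new Edge(1, 4)));
    }

    /** The removal only matters if the edge is part of the current shortest path tree. */
    static boolean isInSssp(Edge removed, Set<Edge> ssspEdges) {
        return ssspEdges.contains(removed);
    }

    /** Returns the vertices whose distances can no longer be trusted after the removal. */
    static Set<Long> invalidatedVertices(Edge removed, Set<Edge> ssspEdges) {
        Set<Long> invalidated = new TreeSet<>();
        if (!isInSssp(removed, ssspEdges)) {
            return invalidated; // no shortest path used this edge, so nothing changes
        }
        // Propagate invalidation from the removed edge's target down the tree.
        Deque<Long> frontier = new ArrayDeque<>();
        frontier.add(removed.target);
        while (!frontier.isEmpty()) {
            long vertex = frontier.poll();
            if (!invalidated.add(vertex)) {
                continue; // already visited
            }
            for (Edge e : ssspEdges) {
                if (e.source == vertex) {
                    frontier.add(e.target);
                }
            }
        }
        return invalidated;
    }

    public static void main(String[] args) {
        System.out.println(invalidatedVertices(new Edge(1, 2), sampleTree()));
        // prints [2, 3]
    }
}
```

Only the invalidated vertices would need their distances recomputed; vertex 4 in the sample tree keeps its value because its path does not use the removed edge.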

### Reusable Algorithm Classes

#### PageRank Algorithm Implementation

Generic PageRank algorithm implementation for programmatic usage.

```java { .api }
/**
 * Generic PageRank algorithm implementation using scatter-gather iteration
 * Can be used as a reusable component in larger applications
 */
public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    /**
     * Create PageRank algorithm with parameters
     * @param beta Damping factor for random walk (typically 0.85)
     * @param maxIterations Maximum number of iterations
     */
    public PageRank(double beta, int maxIterations);

    /**
     * Execute PageRank on the input graph
     * @param network Input graph with Double vertex and edge values
     * @return DataSet of vertices with PageRank scores
     * @throws Exception if algorithm execution fails
     */
    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}
```

**Inner Algorithm Classes:**

```java { .api }
/**
 * Message passing function for PageRank score distribution
 * Sends rank messages along outgoing edges
 */
public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
    public void sendMessages(Vertex<K, Double> vertex) throws Exception;
}

/**
 * Vertex update function for PageRank score computation
 * Updates vertex ranks based on incoming messages
 */
public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
    public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}

/**
 * Initialization function for vertex weights
 * Sets initial PageRank values for all vertices
 */
public static final class InitWeights implements MapFunction<Vertex<K, NullValue>, Double> {
    public Double map(Vertex<K, NullValue> value) throws Exception;
}
```
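
The interplay of rank messages, rank updates, and the `beta` damping factor can be sketched in plain Java. This is not the class's implementation: `PageRankSketch`, the `long[]` edge representation, and the sample graph are hypothetical, and the sketch assumes the common formulation in which each vertex splits its rank evenly across its out-edges and the new rank is `beta * receivedSum + (1 - beta) / numberOfVertices`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of iterative PageRank with a damping factor. No Flink or Gelly APIs are used. */
class PageRankSketch {

    /** Runs a fixed number of PageRank iterations over {source, target} edges. */
    static Map<Long, Double> pageRank(List<long[]> edges, double beta, int maxIterations) {
        Map<Long, Integer> outDegree = new HashMap<>();
        Map<Long, Double> ranks = new TreeMap<>();
        for (long[] edge : edges) {
            outDegree.merge(edge[0], 1, Integer::sum);
            ranks.put(edge[0], 0.0);
            ranks.put(edge[1], 0.0);
        }
        final int vertexCount = ranks.size();
        // Initialization: every vertex starts with an equal share of the total rank.
        ranks.replaceAll((id, rank) -> 1.0 / vertexCount);

        for (int iteration = 0; iteration < maxIterations; iteration++) {
            // Messaging: each vertex sends rank / outDegree along every outgoing edge.
            Map<Long, Double> received = new HashMap<>();
            for (long[] edge : edges) {
                double share = ranks.get(edge[0]) / outDegree.get(edge[0]);
                received.merge(edge[1], share, Double::sum);
            }
            // Update: damp the received sum and add the uniform teleport term.
            Map<Long, Double> next = new TreeMap<>();
            for (Long id : ranks.keySet()) {
                next.put(id, beta * received.getOrDefault(id, 0.0) + (1 - beta) / vertexCount);
            }
            ranks = next;
        }
        return ranks;
    }

    public static void main(String[] args) {
        // Made-up graph: 1->2, 1->3, 2->3, 3->1.
        List<long[]> edges = Arrays.asList(
                new long[] {1, 2}, new long[] {1, 3}, new long[] {2, 3}, new long[] {3, 1});
        Map<Long, Double> ranks = pageRank(edges, 0.85, 20);
        for (Map.Entry<Long, Double> entry : ranks.entrySet()) {
            System.out.printf("vertex %d: %.4f%n", entry.getKey(), entry.getValue());
        }
    }
}
```

In a graph where every vertex has at least one outgoing edge, as in the sample, the ranks keep summing to 1 after each iteration. Vertices without outgoing edges would leak rank in this simplified version.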

#### GSA PageRank Implementation

PageRank implementation using the gather-sum-apply programming model.

```java { .api }
/**
 * PageRank algorithm using gather-sum-apply iteration
 * Alternative implementation demonstrating GSA programming model
 */
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    /**
     * Create GSA PageRank algorithm with parameters
     * @param beta Damping factor for random walk
     * @param maxIterations Maximum number of iterations
     */
    public GSAPageRank(double beta, int maxIterations);

    /**
     * Execute GSA PageRank on the input graph
     * @param network Input graph with Double vertex and edge values
     * @return DataSet of vertices with PageRank scores
     * @throws Exception if algorithm execution fails
     */
    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}
```

**Inner GSA Classes:**

```java { .api }
/**
 * Gather function for collecting neighbor ranks
 * Gathers PageRank values from neighboring vertices
 */
public static final class GatherRanks implements GatherFunction<Double, Double, Double> {
    public Double gather(Neighbor<Double, Double> neighbor) throws Exception;
}

/**
 * Sum function for aggregating gathered ranks
 * Sums all gathered rank values for each vertex
 */
public static final class SumRanks implements SumFunction<Double, Double, Double> {
    public Double sum(Double newValue, Double currentValue) throws Exception;
}

/**
 * Apply function for updating vertex ranks
 * Applies PageRank formula with damping factor
 */
public static final class UpdateRanks<K> implements ApplyFunction<K, Double, Double> {
    public void apply(Double summedRanks, Vertex<K, Double> vertex) throws Exception;
}
```

### Data Generators

The examples package also includes data classes that provide sample input and, for some algorithms, expected results for testing and demonstration:

```java { .api }
/**
 * PageRank test data generator
 * Provides sample graphs and expected results for PageRank algorithm
 */
public class PageRankData {
    /**
     * Get default edge dataset for PageRank testing
     * @param env Flink ExecutionEnvironment
     * @return DataSet of sample edges
     */
    public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);

    // Public constants for test data
    public static final String EDGES = "...";
    public static final String RANKS_AFTER_3_ITERATIONS = "...";
}

/**
 * Single Source Shortest Paths test data generator
 * Provides sample graphs and expected SSSP results
 */
public class SingleSourceShortestPathsData {
    /**
     * Get default edge dataset for SSSP testing
     * @param env Flink ExecutionEnvironment
     * @return DataSet of sample edges with weights
     */
    public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);

    // Public constants
    public static final Long SRC_VERTEX_ID = 1L;
    public static final String EDGES = "...";
    public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "...";
}

/**
 * Music profiles test data generator
 * Provides sample user-song interaction data
 */
public class MusicProfilesData {
    /**
     * Get sample user-song triplet data
     * @param env Flink ExecutionEnvironment
     * @return DataSet of user-song-playcount triplets
     */
    public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env);

    /**
     * Get sample mismatch data for filtering
     * @param env Flink ExecutionEnvironment
     * @return DataSet of problematic song IDs
     */
    public static DataSet<String> getMismatches(ExecutionEnvironment env);
}

/**
 * Euclidean graph test data generator
 * Provides sample geometric graphs with point coordinates
 */
public class EuclideanGraphData {
    /**
     * Get default vertex dataset with 2D coordinates
     * @param env Flink ExecutionEnvironment
     * @return DataSet of vertices with Point values
     */
    public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env);

    /**
     * Get corresponding edge dataset
     * @param env Flink ExecutionEnvironment
     * @return DataSet of edges with distance weights
     */
    public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}
```

### Scala Examples

The package also includes Scala implementations demonstrating the Scala Graph API:

```scala { .api }
/**
 * Connected Components implementation in Scala
 * Demonstrates Scala Graph API usage with GSA
 */
object ConnectedComponents {
  /**
   * Main entry point for Scala Connected Components
   * @param args Command-line arguments: <edge path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}

/**
 * Single Source Shortest Paths in Scala using scatter-gather
 * Shows Scala functional programming patterns with Gelly
 */
object SingleSourceShortestPaths {
  /**
   * Main entry point for Scala SSSP
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}

/**
 * GSA Single Source Shortest Paths in Scala
 * Demonstrates GSA programming model in Scala
 */
object GSASingleSourceShortestPaths {
  /**
   * Main entry point for Scala GSA SSSP
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}
```

## Types

```java { .api }
// Graph algorithm interfaces
interface GraphAlgorithm<K, VV, EV, T> extends Serializable {
    // T is the full result type, e.g. DataSet<Vertex<K, Double>> for PageRank
    T run(Graph<K, VV, EV> input) throws Exception;
}

interface ProgramDescription {
    String getDescription();
}

// Gelly programming model interfaces
abstract class ComputeFunction<K, VV, EV, M> {
    public abstract void compute(Vertex<K, VV> vertex, MessageIterator<M> messages) throws Exception;
}

abstract class MessagingFunction<K, VV, EV, M> {
    public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
}

abstract class VertexUpdateFunction<K, VV, M> {
    public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;
}

// GSA interfaces
interface GatherFunction<VV, EV, M> {
    M gather(Neighbor<VV, EV> neighbor) throws Exception;
}

interface SumFunction<VV, EV, M> {
    M sum(M newValue, M currentValue) throws Exception;
}

interface ApplyFunction<K, VV, M> {
    void apply(M newValue, Vertex<K, VV> vertex) throws Exception;
}

// Flink DataSet API types
class DataSet<T> {
    // Distributed dataset operations
}

class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;
}

class Tuple3<T0, T1, T2> {
    public T0 f0;
    public T1 f1;
    public T2 f2;
}
```