or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

clustering.md distributed-copy.md graph-processing.md index.md misc-examples.md relational-processing.md word-count.md

docs/graph-processing.md

0

# Graph Processing

1

2

Graph algorithms including PageRank, Connected Components, Triangle Enumeration, and Transitive Closure. Features specialized data types, iterative processing patterns, and graph-specific operations.

3

4

## Capabilities

5

6

### PageRank Algorithm

7

8

Bulk iteration-based PageRank algorithm for computing page importance rankings in graphs.

9

10

```java { .api }

11

/**

12

* PageRank algorithm using bulk iterations.

13

* Usage: PageRank --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>

14

*/

15

@SuppressWarnings("serial")

16

public class PageRank {

17

private static final double DAMPENING_FACTOR = 0.85;

18

private static final double EPSILON = 0.0001;

19

20

public static void main(String[] args) throws Exception;

21

22

/**

23

* Assigns initial rank to all pages

24

*/

25

public static final class RankAssigner

26

implements MapFunction<Long, Tuple2<Long, Double>> {

27

/**

28

* Creates RankAssigner with specified initial rank

29

* @param rank Initial rank value for all pages

30

*/

31

public RankAssigner(double rank);

32

33

/**

34

* Maps page ID to page-rank tuple

35

* @param page Page ID

36

* @return Tuple (page_id, rank)

37

*/

38

public Tuple2<Long, Double> map(Long page);

39

}

40

41

/**

42

* Builds adjacency list for outgoing edges from vertices

43

*/

44

@ForwardedFields("0")

45

public static final class BuildOutgoingEdgeList

46

implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {

47

/**

48

* Reduces edges to build adjacency list

49

* @param values Iterator of edges from same source vertex

50

* @param out Collector for adjacency list entries

51

*/

52

public void reduce(

53

Iterable<Tuple2<Long, Long>> values,

54

Collector<Tuple2<Long, Long[]>> out);

55

}

56

57

/**

58

* Distributes vertex rank to all neighbors

59

*/

60

public static final class JoinVertexWithEdgesMatch

61

implements FlatMapFunction<

62

Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>,

63

Tuple2<Long, Double>> {

64

/**

65

* Distributes rank evenly among neighboring vertices

66

* @param value Joined vertex-rank and adjacency list

67

* @param out Collector for distributed rank values

68

*/

69

public void flatMap(

70

Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value,

71

Collector<Tuple2<Long, Double>> out);

72

}

73

74

/**

75

* Applies PageRank dampening formula

76

*/

77

@ForwardedFields("0")

78

public static final class Dampener

79

implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

80

/**

81

* Creates dampener with specified parameters

82

* @param dampening Dampening factor (typically 0.85)

83

* @param numVertices Total number of vertices for random jump calculation

84

*/

85

public Dampener(double dampening, double numVertices);

86

87

/**

88

* Applies dampening formula to rank value

89

* @param value Tuple (vertex_id, accumulated_rank)

90

* @return Tuple (vertex_id, dampened_rank)

91

*/

92

public Tuple2<Long, Double> map(Tuple2<Long, Double> value);

93

}

94

95

/**

96

* Keeps only vertices whose rank changed by more than the convergence threshold (EPSILON)

97

*/

98

public static final class EpsilonFilter

99

implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

100

/**

101

* Checks if rank difference exceeds convergence threshold

102

* @param value Joined pair of rank tuples (new_rank, previous_rank) for the same vertex

103

* @return true if difference exceeds epsilon, false otherwise

104

*/

105

public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value);

106

}

107

}

108

```

109

110

**Usage Examples:**

111

112

```java

113

// Run PageRank with custom data

114

String[] args = {

115

"--pages", "/path/to/pages.txt",

116

"--links", "/path/to/links.txt",

117

"--output", "/path/to/output",

118

"--numPages", "1000",

119

"--iterations", "20"

120

};

121

PageRank.main(args);

122

123

// Use PageRank functions in custom graph algorithm

124

DataSet<Long> pages = getPagesDataSet(env, params);

125

DataSet<Tuple2<Long, Long>> links = getLinksDataSet(env, params);

126

127

// Build PageRank pipeline

128

DataSet<Tuple2<Long, Double>> initialRanks = pages

129

.map(new PageRank.RankAssigner(1.0 / numPages));

130

131

DataSet<Tuple2<Long, Long[]>> adjacencyList = links

132

.groupBy(0)

133

.reduceGroup(new PageRank.BuildOutgoingEdgeList());

134

```

135

136

### Connected Components

137

138

Algorithm for finding connected components in undirected graphs using delta iterations.

139

140

```java { .api }

141

/**

142

* Connected Components algorithm using delta iterations.

143

* Usage: ConnectedComponents --vertices <path> --edges <path> --output <path>

144

*/

145

public class ConnectedComponents {

146

public static void main(String[] args) throws Exception;

147

148

/**

149

* Duplicates input value into a tuple pair

150

*/

151

public static final class DuplicateValue<T>

152

implements MapFunction<T, Tuple2<T, T>> {

153

/**

154

* Creates tuple (value, value) from input

155

* @param vertex Input value to duplicate

156

* @return Tuple containing value twice

157

*/

158

public Tuple2<T, T> map(T vertex);

159

}

160

161

/**

162

* Creates undirected edges from directed edges

163

*/

164

public static final class UndirectEdge

165

implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

166

/**

167

* Emits both (a,b) and (b,a) for input edge (a,b)

168

* @param edge Directed edge tuple

169

* @param out Collector for undirected edge pairs

170

*/

171

public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out);

172

}

173

174

/**

175

* Joins neighbor vertices with component IDs

176

*/

177

public static final class NeighborWithComponentIDJoin

178

implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

179

/**

180

* Joins edge with component assignment

181

* @param vertexWithComponent Vertex with its component ID

182

* @param edge Edge tuple

183

* @return Neighbor vertex with component ID

184

*/

185

public Tuple2<Long, Long> join(

186

Tuple2<Long, Long> vertexWithComponent,

187

Tuple2<Long, Long> edge);

188

}

189

190

/**

191

* Filters and propagates minimum component IDs

192

*/

193

public static final class ComponentIdFilter

194

implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

195

/**

196

* Emits vertex with minimum component ID if propagation is needed

197

* @param candidateComponentForVertex Candidate component for vertex

198

* @param currentComponentForVertex Current component for vertex

199

* @param out Collector for component updates

200

*/

201

public void join(

202

Tuple2<Long, Long> candidateComponentForVertex,

203

Tuple2<Long, Long> currentComponentForVertex,

204

Collector<Tuple2<Long, Long>> out);

205

}

206

}

207

```

208

209

### Triangle Enumeration

210

211

Algorithm for enumerating triangles in graphs with specialized edge data types.

212

213

```java { .api }

214

/**

215

* Triangle enumeration algorithm.

216

* Usage: EnumTriangles --edges <path> --output <path>

217

*/

218

public class EnumTriangles {

219

public static void main(String[] args) throws Exception;

220

221

/**

222

* Converts tuple edges to Edge objects

223

*/

224

public static class TupleEdgeConverter

225

implements MapFunction<Tuple2<Integer, Integer>, Edge> {

226

/**

227

* Converts tuple to Edge object

228

* @param tuple Input edge as tuple

229

* @return Edge object

230

*/

231

public Edge map(Tuple2<Integer, Integer> tuple);

232

}

233

}

234

```

235

236

### Transitive Closure

237

238

Naive transitive closure algorithm for computing graph reachability.

239

240

```java { .api }

241

/**

242

* Naive transitive closure algorithm using bulk iterations.

243

* Usage: TransitiveClosureNaive --edges <path> --output <path>

244

*/

245

public class TransitiveClosureNaive {

246

public static void main(String[] args) throws Exception;

247

}

248

```

249

250

### Graph Data Types

251

252

Specialized data types for graph processing operations.

253

254

```java { .api }

255

/**

256

* Graph edge representation extending Tuple2

257

*/

258

public static class Edge extends Tuple2<Integer, Integer> {

259

public static final int V1 = 0; // First vertex field index

260

public static final int V2 = 1; // Second vertex field index

261

262

public Edge();

263

public Edge(Integer v1, Integer v2);

264

265

/**

266

* Get first vertex ID

267

* @return First vertex ID

268

*/

269

public Integer getFirstVertex();

270

271

/**

272

* Get second vertex ID

273

* @return Second vertex ID

274

*/

275

public Integer getSecondVertex();

276

277

/**

278

* Set first vertex ID

279

* @param vertex1 First vertex ID

280

*/

281

public void setFirstVertex(Integer vertex1);

282

283

/**

284

* Set second vertex ID

285

* @param vertex2 Second vertex ID

286

*/

287

public void setSecondVertex(Integer vertex2);

288

289

/**

290

* Copy vertices from another edge

291

* @param edge Source edge

292

*/

293

public void copyVerticesFromEdge(Edge edge);

294

295

/**

296

* Swap vertex positions in edge

297

*/

298

public void flipVertices();

299

}

300

301

/**

302

* Three-vertex structure for triangle representation

303

*/

304

public static class Triad extends Tuple3<Integer, Integer, Integer> {

305

public static final int V1 = 0; // First vertex field index

306

public static final int V2 = 1; // Second vertex field index

307

public static final int V3 = 2; // Third vertex field index

308

309

public Triad();

310

public Triad(Integer v1, Integer v2, Integer v3);

311

312

public void setFirstVertex(Integer vertex1);

313

public void setSecondVertex(Integer vertex2);

314

public void setThirdVertex(Integer vertex3);

315

}

316

317

/**

318

* Edge with vertex degree information

319

*/

320

public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {

321

public static final int V1 = 0; // First vertex field index

322

public static final int V2 = 1; // Second vertex field index

323

public static final int D1 = 2; // First vertex degree field index

324

public static final int D2 = 3; // Second vertex degree field index

325

326

public EdgeWithDegrees();

327

public EdgeWithDegrees(Integer v1, Integer v2, Integer d1, Integer d2);

328

329

public Integer getFirstVertex();

330

public Integer getSecondVertex();

331

public Integer getFirstDegree();

332

public Integer getSecondDegree();

333

334

public void setFirstVertex(Integer vertex1);

335

public void setSecondVertex(Integer vertex2);

336

public void setFirstDegree(Integer degree1);

337

public void setSecondDegree(Integer degree2);

338

}

339

```

340

341

### Graph Data Providers

342

343

Utility classes providing default graph datasets for testing.

344

345

```java { .api }

346

/**

347

* Provides default PageRank data sets

348

*/

349

public class PageRankData {

350

/**

351

* Default edge data as object arrays

352

*/

353

public static final Object[][] EDGES;

354

355

/**

356

* Creates DataSet with default edge data

357

* @param env Execution environment

358

* @return DataSet containing default edges

359

*/

360

public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);

361

362

/**

363

* Creates DataSet with default page data

364

* @param env Execution environment

365

* @return DataSet containing default pages

366

*/

367

public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env);

368

369

/**

370

* Get total number of pages in default dataset

371

* @return Number of pages

372

*/

373

public static int getNumberOfPages();

374

}

375

376

/**

377

* Provides default Connected Components data sets

378

*/

379

public class ConnectedComponentsData {

380

/**

381

* Default vertex IDs

382

*/

383

public static final long[] VERTICES;

384

385

/**

386

* Default edge data as object arrays

387

*/

388

public static final Object[][] EDGES;

389

390

/**

391

* Creates DataSet with default vertex data

392

* @param env Execution environment

393

* @return DataSet containing default vertices

394

*/

395

public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env);

396

397

/**

398

* Creates DataSet with default edge data

399

* @param env Execution environment

400

* @return DataSet containing default edges

401

*/

402

public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);

403

}

404

405

/**

406

* Provides default Triangle Enumeration data sets

407

*/

408

public class EnumTrianglesData {

409

/**

410

* Default edge data as object arrays

411

*/

412

public static final Object[][] EDGES;

413

414

/**

415

* Creates DataSet with default edge data

416

* @param env Execution environment

417

* @return DataSet containing default edges as Edge objects

418

*/

419

public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env);

420

}

421

```

422

423

**Usage Examples:**

424

425

```java

426

// Use default datasets in custom graph algorithms

427

import org.apache.flink.examples.java.graph.util.PageRankData;

428

import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;

429

430

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

431

432

// PageRank data

433

DataSet<Long> pages = PageRankData.getDefaultPagesDataSet(env);

434

DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);

435

int numPages = PageRankData.getNumberOfPages();

436

437

// Connected Components data

438

DataSet<Long> vertices = ConnectedComponentsData.getDefaultVertexDataSet(env);

439

DataSet<Tuple2<Long, Long>> edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);

440

```

441

442

## Common Graph Processing Patterns

443

444

### Bulk Iteration for Graph Algorithms

445

446

Most graph algorithms use Flink's bulk iteration pattern:

447

448

```java

449

// PageRank iteration pattern

450

IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

451

452

DataSet<Tuple2<Long, Double>> newRanks = iteration

453

.join(adjacencyList).where(0).equalTo(0)

454

.flatMap(new JoinVertexWithEdgesMatch())

455

.groupBy(0).aggregate(SUM, 1)

456

.map(new Dampener(DAMPENING_FACTOR, numPages));

457

458

DataSet<Tuple2<Long, Double>> finalRanks = iteration.closeWith(

459

newRanks,

460

newRanks.join(iteration).where(0).equalTo(0)

461

.filter(new EpsilonFilter()) // Convergence condition

462

);

463

```

464

465

### Graph Data Format Requirements

466

467

**Pages file format (PageRank):**

468

```

469

1

470

2

471

12

472

42

473

63

474

```

475

476

**Links file format (PageRank):**

477

```

478

1 2

479

2 12

480

1 12

481

42 63

482

```

483

484

**Edges file format (Connected Components):**

485

```

486

1 2

487

2 3

488

3 4

489

5 6

490

```

491

492

## Types

493

494

### Core Graph Types

495

496

```java { .api }

497

// Vertex/page representation

498

Long pageId = 1L;

499

Long vertexId = 1L;

500

501

// Edge representations

502

Tuple2<Long, Long> edge = new Tuple2<>(1L, 2L); // Simple edge

503

Tuple2<Long, Double> pageRank = new Tuple2<>(1L, 0.5); // Page with rank

504

Tuple2<Long, Long[]> adjacencyList; // Vertex with neighbors

505

506

// Specialized edge types

507

EnumTrianglesDataTypes.Edge triangleEdge = new EnumTrianglesDataTypes.Edge(1, 2);

508

EnumTrianglesDataTypes.Triad triangle = new EnumTrianglesDataTypes.Triad(1, 2, 3);

509

```