0
# Graph Processing
1
2
This section covers the graph algorithm examples — PageRank, Connected Components, Triangle Enumeration, and Transitive Closure — together with their specialized data types, iterative processing patterns, and graph-specific operations.
3
4
## Capabilities
5
6
### PageRank Algorithm
7
8
Bulk iteration-based PageRank algorithm for computing page importance rankings in graphs.
9
10
```java { .api }
11
/**
12
* PageRank algorithm using bulk iterations.
13
* Usage: PageRank --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
14
*/
15
@SuppressWarnings("serial")
16
public class PageRank {
17
private static final double DAMPENING_FACTOR = 0.85;
18
private static final double EPSILON = 0.0001;
19
20
public static void main(String[] args) throws Exception;
21
22
/**
23
* Assigns initial rank to all pages
24
*/
25
public static final class RankAssigner
26
implements MapFunction<Long, Tuple2<Long, Double>> {
27
/**
28
* Creates RankAssigner with specified initial rank
29
* @param rank Initial rank value for all pages
30
*/
31
public RankAssigner(double rank);
32
33
/**
34
* Maps page ID to page-rank tuple
35
* @param page Page ID
36
* @return Tuple (page_id, rank)
37
*/
38
public Tuple2<Long, Double> map(Long page);
39
}
40
41
/**
42
* Builds adjacency list for outgoing edges from vertices
43
*/
44
@ForwardedFields("0")
45
public static final class BuildOutgoingEdgeList
46
implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
47
/**
48
* Reduces edges to build adjacency list
49
* @param values Iterable over the edges that share the same source vertex
50
* @param out Collector for adjacency list entries
51
*/
52
public void reduce(
53
Iterable<Tuple2<Long, Long>> values,
54
Collector<Tuple2<Long, Long[]>> out);
55
}
56
57
/**
58
* Distributes vertex rank to all neighbors
59
*/
60
public static final class JoinVertexWithEdgesMatch
61
implements FlatMapFunction<
62
Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>,
63
Tuple2<Long, Double>> {
64
/**
65
* Distributes rank evenly among neighboring vertices
66
* @param value Joined vertex-rank and adjacency list
67
* @param out Collector for distributed rank values
68
*/
69
public void flatMap(
70
Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value,
71
Collector<Tuple2<Long, Double>> out);
72
}
73
74
/**
75
* Applies PageRank dampening formula
76
*/
77
@ForwardedFields("0")
78
public static final class Dampener
79
implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
80
/**
81
* Creates dampener with specified parameters
82
* @param dampening Dampening factor (typically 0.85)
83
* @param numVertices Total number of vertices for random jump calculation
84
*/
85
public Dampener(double dampening, double numVertices);
86
87
/**
88
* Applies dampening formula to rank value
89
* @param value Tuple (vertex_id, accumulated_rank)
90
* @return Tuple (vertex_id, dampened_rank)
91
*/
92
public Tuple2<Long, Double> map(Tuple2<Long, Double> value);
93
}
94
95
/**
96
* Keeps only vertices whose rank changed by more than EPSILON between iterations (convergence check)
97
*/
98
public static final class EpsilonFilter
99
implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
100
/**
101
* Checks if rank difference exceeds convergence threshold
102
* @param value Pair of joined (vertex_id, rank) tuples for the same vertex: the newly computed rank and the previous iteration's rank
103
* @return true if difference exceeds epsilon, false otherwise
104
*/
105
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value);
106
}
107
}
108
```
109
110
**Usage Examples:**
111
112
```java
113
// Run PageRank with custom data
114
String[] args = {
115
"--pages", "/path/to/pages.txt",
116
"--links", "/path/to/links.txt",
117
"--output", "/path/to/output",
118
"--numPages", "1000",
119
"--iterations", "20"
120
};
121
PageRank.main(args);
122
123
// Use PageRank functions in custom graph algorithm
124
DataSet<Long> pages = getPagesDataSet(env, params);
125
DataSet<Tuple2<Long, Long>> links = getLinksDataSet(env, params);
126
127
// Build PageRank pipeline
128
DataSet<Tuple2<Long, Double>> initialRanks = pages
129
.map(new PageRank.RankAssigner(1.0 / numPages));
130
131
DataSet<Tuple2<Long, Long[]>> adjacencyList = links
132
.groupBy(0)
133
.reduceGroup(new PageRank.BuildOutgoingEdgeList());
134
```
135
136
### Connected Components
137
138
Algorithm for finding connected components in undirected graphs using delta iterations.
139
140
```java { .api }
141
/**
142
* Connected Components algorithm using delta iterations.
143
* Usage: ConnectedComponents --vertices <path> --edges <path> --output <path>
144
*/
145
public class ConnectedComponents {
146
public static void main(String[] args) throws Exception;
147
148
/**
149
* Duplicates input value into a tuple pair
150
*/
151
public static final class DuplicateValue<T>
152
implements MapFunction<T, Tuple2<T, T>> {
153
/**
154
* Creates tuple (value, value) from input
155
* @param vertex Input value to duplicate
156
* @return Tuple containing value twice
157
*/
158
public Tuple2<T, T> map(T vertex);
159
}
160
161
/**
162
* Creates undirected edges from directed edges
163
*/
164
public static final class UndirectEdge
165
implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
166
/**
167
* Emits both (a,b) and (b,a) for input edge (a,b)
168
* @param edge Directed edge tuple
169
* @param out Collector for undirected edge pairs
170
*/
171
public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out);
172
}
173
174
/**
175
* Joins neighbor vertices with component IDs
176
*/
177
public static final class NeighborWithComponentIDJoin
178
implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
179
/**
180
* Joins edge with component assignment
181
* @param vertexWithComponent Vertex with its component ID
182
* @param edge Edge tuple
183
* @return Neighbor vertex with component ID
184
*/
185
public Tuple2<Long, Long> join(
186
Tuple2<Long, Long> vertexWithComponent,
187
Tuple2<Long, Long> edge);
188
}
189
190
/**
191
* Filters and propagates minimum component IDs
192
*/
193
public static final class ComponentIdFilter
194
implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
195
/**
196
* Emits vertex with minimum component ID if propagation is needed
197
* @param candidateComponentForVertex Candidate component for vertex
198
* @param currentComponentForVertex Current component for vertex
199
* @param out Collector for component updates
200
*/
201
public void join(
202
Tuple2<Long, Long> candidateComponentForVertex,
203
Tuple2<Long, Long> currentComponentForVertex,
204
Collector<Tuple2<Long, Long>> out);
205
}
206
}
207
```
208
209
### Triangle Enumeration
210
211
Algorithm for enumerating triangles in graphs with specialized edge data types.
212
213
```java { .api }
214
/**
215
* Triangle enumeration algorithm.
216
* Usage: EnumTriangles --edges <path> --output <path>
217
*/
218
public class EnumTriangles {
219
public static void main(String[] args) throws Exception;
220
221
/**
222
* Converts tuple edges to Edge objects
223
*/
224
public static class TupleEdgeConverter
225
implements MapFunction<Tuple2<Integer, Integer>, Edge> {
226
/**
227
* Converts tuple to Edge object
228
* @param tuple Input edge as tuple
229
* @return Edge object
230
*/
231
public Edge map(Tuple2<Integer, Integer> tuple);
232
}
233
}
234
```
235
236
### Transitive Closure
237
238
Naive transitive closure algorithm for computing graph reachability.
239
240
```java { .api }
241
/**
242
* Naive transitive closure algorithm using bulk iterations.
243
* Usage: TransitiveClosureNaive --edges <path> --output <path>
244
*/
245
public class TransitiveClosureNaive {
246
public static void main(String[] args) throws Exception;
247
}
248
```
249
250
### Graph Data Types
251
252
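Specialized data types for graph processing operations. They are static nested classes; `Edge` and `Triad` are referenced elsewhere in this document as `EnumTrianglesDataTypes.Edge` and `EnumTrianglesDataTypes.Triad`.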
253
254
```java { .api }
255
/**
256
* Graph edge representation extending Tuple2
257
*/
258
public static class Edge extends Tuple2<Integer, Integer> {
259
public static final int V1 = 0; // First vertex field index
260
public static final int V2 = 1; // Second vertex field index
261
262
public Edge();
263
public Edge(Integer v1, Integer v2);
264
265
/**
266
* Get first vertex ID
267
* @return First vertex ID
268
*/
269
public Integer getFirstVertex();
270
271
/**
272
* Get second vertex ID
273
* @return Second vertex ID
274
*/
275
public Integer getSecondVertex();
276
277
/**
278
* Set first vertex ID
279
* @param vertex1 First vertex ID
280
*/
281
public void setFirstVertex(Integer vertex1);
282
283
/**
284
* Set second vertex ID
285
* @param vertex2 Second vertex ID
286
*/
287
public void setSecondVertex(Integer vertex2);
288
289
/**
290
* Copy vertices from another edge
291
* @param edge Source edge
292
*/
293
public void copyVerticesFromEdge(Edge edge);
294
295
/**
296
* Swap vertex positions in edge
297
*/
298
public void flipVertices();
299
}
300
301
/**
302
* Three-vertex structure for triangle representation
303
*/
304
public static class Triad extends Tuple3<Integer, Integer, Integer> {
305
public static final int V1 = 0; // First vertex field index
306
public static final int V2 = 1; // Second vertex field index
307
public static final int V3 = 2; // Third vertex field index
308
309
public Triad();
310
public Triad(Integer v1, Integer v2, Integer v3);
311
312
public void setFirstVertex(Integer vertex1);
313
public void setSecondVertex(Integer vertex2);
314
public void setThirdVertex(Integer vertex3);
315
}
316
317
/**
318
* Edge with vertex degree information
319
*/
320
public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
321
public static final int V1 = 0; // First vertex field index
322
public static final int V2 = 1; // Second vertex field index
323
public static final int D1 = 2; // First vertex degree field index
324
public static final int D2 = 3; // Second vertex degree field index
325
326
public EdgeWithDegrees();
327
public EdgeWithDegrees(Integer v1, Integer v2, Integer d1, Integer d2);
328
329
public Integer getFirstVertex();
330
public Integer getSecondVertex();
331
public Integer getFirstDegree();
332
public Integer getSecondDegree();
333
334
public void setFirstVertex(Integer vertex1);
335
public void setSecondVertex(Integer vertex2);
336
public void setFirstDegree(Integer degree1);
337
public void setSecondDegree(Integer degree2);
338
}
339
```
340
341
### Graph Data Providers
342
343
Utility classes providing default graph datasets for testing.
344
345
```java { .api }
346
/**
347
* Provides default PageRank data sets
348
*/
349
public class PageRankData {
350
/**
351
* Default edge data as object arrays
352
*/
353
public static final Object[][] EDGES;
354
355
/**
356
* Creates DataSet with default edge data
357
* @param env Execution environment
358
* @return DataSet containing default edges
359
*/
360
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
361
362
/**
363
* Creates DataSet with default page data
364
* @param env Execution environment
365
* @return DataSet containing default pages
366
*/
367
public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env);
368
369
/**
370
* Get total number of pages in default dataset
371
* @return Number of pages
372
*/
373
public static int getNumberOfPages();
374
}
375
376
/**
377
* Provides default Connected Components data sets
378
*/
379
public class ConnectedComponentsData {
380
/**
381
* Default vertex IDs
382
*/
383
public static final long[] VERTICES;
384
385
/**
386
* Default edge data as object arrays
387
*/
388
public static final Object[][] EDGES;
389
390
/**
391
* Creates DataSet with default vertex data
392
* @param env Execution environment
393
* @return DataSet containing default vertices
394
*/
395
public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env);
396
397
/**
398
* Creates DataSet with default edge data
399
* @param env Execution environment
400
* @return DataSet containing default edges
401
*/
402
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
403
}
404
405
/**
406
* Provides default Triangle Enumeration data sets
407
*/
408
public class EnumTrianglesData {
409
/**
410
* Default edge data as object arrays
411
*/
412
public static final Object[][] EDGES;
413
414
/**
415
* Creates DataSet with default edge data
416
* @param env Execution environment
417
* @return DataSet containing default edges as Edge objects
418
*/
419
public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env);
420
}
421
```
422
423
**Usage Examples:**
424
425
```java
426
// Use default datasets in custom graph algorithms
427
import org.apache.flink.examples.java.graph.util.PageRankData;
428
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
429
430
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
431
432
// PageRank data
433
DataSet<Long> pages = PageRankData.getDefaultPagesDataSet(env);
434
DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);
435
int numPages = PageRankData.getNumberOfPages();
436
437
// Connected Components data
438
DataSet<Long> vertices = ConnectedComponentsData.getDefaultVertexDataSet(env);
439
DataSet<Tuple2<Long, Long>> edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);
440
```
441
442
## Common Graph Processing Patterns
443
444
### Bulk Iteration for Graph Algorithms
445
446
Most graph algorithms use Flink's bulk iteration pattern:
447
448
```java
449
// PageRank iteration pattern
450
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
451
452
DataSet<Tuple2<Long, Double>> newRanks = iteration
453
.join(adjacencyList).where(0).equalTo(0)
454
.flatMap(new JoinVertexWithEdgesMatch())
455
.groupBy(0).aggregate(SUM, 1)
456
.map(new Dampener(DAMPENING_FACTOR, numPages));
457
458
DataSet<Tuple2<Long, Double>> finalRanks = iteration.closeWith(
459
newRanks,
460
newRanks.join(iteration).where(0).equalTo(0)
461
.filter(new EpsilonFilter()) // Convergence condition
462
);
463
```
464
465
### Graph Data Format Requirements
466
467
**Pages file format (PageRank):**
468
```
469
1
470
2
471
12
472
42
473
63
474
```
475
476
**Links file format (PageRank):**
477
```
478
1 2
479
2 12
480
1 12
481
42 63
482
```
483
484
**Edges file format (Connected Components):**
485
```
486
1 2
487
2 3
488
3 4
489
5 6
490
```
491
492
## Types
493
494
### Core Graph Types
495
496
```java { .api }
497
// Vertex/page representation
498
Long pageId = 1L;
499
Long vertexId = 1L;
500
501
// Edge representations
502
Tuple2<Long, Long> edge = new Tuple2<>(1L, 2L); // Simple edge
503
Tuple2<Long, Double> pageRank = new Tuple2<>(1L, 0.5); // Page with rank
504
Tuple2<Long, Long[]> adjacencyList; // Vertex with neighbors
505
506
// Specialized edge types
507
EnumTrianglesDataTypes.Edge triangleEdge = new EnumTrianglesDataTypes.Edge(1, 2);
508
EnumTrianglesDataTypes.Triad triangle = new EnumTrianglesDataTypes.Triad(1, 2, 3);
509
```