Graph algorithms including PageRank, Connected Components, Triangle Enumeration, and Transitive Closure. Features specialized data types, iterative processing patterns, and graph-specific operations.
Bulk iteration-based PageRank algorithm for computing page importance rankings in graphs.
/**
* PageRank algorithm using bulk iterations.
* Usage: PageRank --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
*/
@SuppressWarnings("serial")
public class PageRank {
private static final double DAMPENING_FACTOR = 0.85;
private static final double EPSILON = 0.0001;
public static void main(String[] args) throws Exception;
/**
* Assigns initial rank to all pages
*/
public static final class RankAssigner
implements MapFunction<Long, Tuple2<Long, Double>> {
/**
* Creates RankAssigner with specified initial rank
* @param rank Initial rank value for all pages
*/
public RankAssigner(double rank);
/**
* Maps page ID to page-rank tuple
* @param page Page ID
* @return Tuple (page_id, rank)
*/
public Tuple2<Long, Double> map(Long page);
}
/**
* Builds adjacency list for outgoing edges from vertices
*/
@ForwardedFields("0")
public static final class BuildOutgoingEdgeList
implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
/**
* Reduces edges to build adjacency list
* @param values Iterator of edges from same source vertex
* @param out Collector for adjacency list entries
*/
public void reduce(
Iterable<Tuple2<Long, Long>> values,
Collector<Tuple2<Long, Long[]>> out);
}
/**
* Distributes vertex rank to all neighbors
*/
public static final class JoinVertexWithEdgesMatch
implements FlatMapFunction<
Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>,
Tuple2<Long, Double>> {
/**
* Distributes rank evenly among neighboring vertices
* @param value Joined vertex-rank and adjacency list
* @param out Collector for distributed rank values
*/
public void flatMap(
Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value,
Collector<Tuple2<Long, Double>> out);
}
/**
* Applies PageRank dampening formula
*/
@ForwardedFields("0")
public static final class Dampener
implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
/**
* Creates dampener with specified parameters
* @param dampening Dampening factor (typically 0.85)
* @param numVertices Total number of vertices for random jump calculation
*/
public Dampener(double dampening, double numVertices);
/**
* Applies dampening formula to rank value
* @param value Tuple (vertex_id, accumulated_rank)
* @return Tuple (vertex_id, dampened_rank)
*/
public Tuple2<Long, Double> map(Tuple2<Long, Double> value);
}
/**
* Filters out vertices whose rank difference is below the threshold, keeping only those whose difference still exceeds it
*/
public static final class EpsilonFilter
implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
/**
* Checks if rank difference exceeds convergence threshold
* @param value Pair of (vertex_id, rank) tuples for the same vertex: the newly computed rank first, the previous iteration's rank second
* @return true if difference exceeds epsilon, false otherwise
*/
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value);
}
}

Usage Examples:
// Run PageRank with custom data
String[] args = {
"--pages", "/path/to/pages.txt",
"--links", "/path/to/links.txt",
"--output", "/path/to/output",
"--numPages", "1000",
"--iterations", "20"
};
PageRank.main(args);
// Use PageRank functions in custom graph algorithm
DataSet<Long> pages = getPagesDataSet(env, params);
DataSet<Tuple2<Long, Long>> links = getLinksDataSet(env, params);
// Build PageRank pipeline
DataSet<Tuple2<Long, Double>> initialRanks = pages
.map(new PageRank.RankAssigner(1.0 / numPages));
DataSet<Tuple2<Long, Long[]>> adjacencyList = links
.groupBy(0)
.reduceGroup(new PageRank.BuildOutgoingEdgeList());

Algorithm for finding connected components in undirected graphs using bulk iterations.
/**
* Connected Components algorithm using bulk iterations.
* Usage: ConnectedComponents --vertices <path> --edges <path> --output <path>
*/
public class ConnectedComponents {
public static void main(String[] args) throws Exception;
/**
* Duplicates input value into a tuple pair
*/
public static final class DuplicateValue<T>
implements MapFunction<T, Tuple2<T, T>> {
/**
* Creates tuple (value, value) from input
* @param vertex Input value to duplicate
* @return Tuple containing value twice
*/
public Tuple2<T, T> map(T vertex);
}
/**
* Creates undirected edges from directed edges
*/
public static final class UndirectEdge
implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* Emits both (a,b) and (b,a) for input edge (a,b)
* @param edge Directed edge tuple
* @param out Collector for undirected edge pairs
*/
public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out);
}
/**
* Joins neighbor vertices with component IDs
*/
public static final class NeighborWithComponentIDJoin
implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* Joins edge with component assignment
* @param vertexWithComponent Vertex with its component ID
* @param edge Edge tuple
* @return Neighbor vertex with component ID
*/
public Tuple2<Long, Long> join(
Tuple2<Long, Long> vertexWithComponent,
Tuple2<Long, Long> edge);
}
/**
* Filters and propagates minimum component IDs
*/
public static final class ComponentIdFilter
implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* Emits vertex with minimum component ID if propagation is needed
* @param candidateComponentForVertex Candidate component for vertex
* @param currentComponentForVertex Current component for vertex
* @param out Collector for component updates
*/
public void join(
Tuple2<Long, Long> candidateComponentForVertex,
Tuple2<Long, Long> currentComponentForVertex,
Collector<Tuple2<Long, Long>> out);
}
}

Algorithm for enumerating triangles in graphs with specialized edge data types.
/**
* Triangle enumeration algorithm.
* Usage: EnumTriangles --edges <path> --output <path>
*/
public class EnumTriangles {
public static void main(String[] args) throws Exception;
/**
* Converts tuple edges to Edge objects
*/
public static class TupleEdgeConverter
implements MapFunction<Tuple2<Integer, Integer>, Edge> {
/**
* Converts tuple to Edge object
* @param tuple Input edge as tuple
* @return Edge object
*/
public Edge map(Tuple2<Integer, Integer> tuple);
}
}

Naive transitive closure algorithm for computing graph reachability.
/**
* Naive transitive closure algorithm using bulk iterations.
* Usage: TransitiveClosureNaive --edges <path> --output <path>
*/
public class TransitiveClosureNaive {
public static void main(String[] args) throws Exception;
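}

Specialized data types for graph processing operations. Other parts of this document reference them as nested types, e.g. `EnumTrianglesDataTypes.Edge` and `EnumTrianglesDataTypes.Triad`.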
/**
* Graph edge representation extending Tuple2
*/
public static class Edge extends Tuple2<Integer, Integer> {
public static final int V1 = 0; // First vertex field index
public static final int V2 = 1; // Second vertex field index
public Edge();
public Edge(Integer v1, Integer v2);
/**
* Get first vertex ID
* @return First vertex ID
*/
public Integer getFirstVertex();
/**
* Get second vertex ID
* @return Second vertex ID
*/
public Integer getSecondVertex();
/**
* Set first vertex ID
* @param vertex1 First vertex ID
*/
public void setFirstVertex(Integer vertex1);
/**
* Set second vertex ID
* @param vertex2 Second vertex ID
*/
public void setSecondVertex(Integer vertex2);
/**
* Copy vertices from another edge
* @param edge Source edge
*/
public void copyVerticesFromEdge(Edge edge);
/**
* Swap vertex positions in edge
*/
public void flipVertices();
}
/**
* Three-vertex structure for triangle representation
*/
public static class Triad extends Tuple3<Integer, Integer, Integer> {
public static final int V1 = 0; // First vertex field index
public static final int V2 = 1; // Second vertex field index
public static final int V3 = 2; // Third vertex field index
public Triad();
public Triad(Integer v1, Integer v2, Integer v3);
public void setFirstVertex(Integer vertex1);
public void setSecondVertex(Integer vertex2);
public void setThirdVertex(Integer vertex3);
}
/**
* Edge with vertex degree information
*/
public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
public static final int V1 = 0; // First vertex field index
public static final int V2 = 1; // Second vertex field index
public static final int D1 = 2; // First vertex degree field index
public static final int D2 = 3; // Second vertex degree field index
public EdgeWithDegrees();
public EdgeWithDegrees(Integer v1, Integer v2, Integer d1, Integer d2);
public Integer getFirstVertex();
public Integer getSecondVertex();
public Integer getFirstDegree();
public Integer getSecondDegree();
public void setFirstVertex(Integer vertex1);
public void setSecondVertex(Integer vertex2);
public void setFirstDegree(Integer degree1);
public void setSecondDegree(Integer degree2);
}

Utility classes providing default graph datasets for testing.
/**
* Provides default PageRank data sets
*/
public class PageRankData {
/**
* Default edge data as object arrays
*/
public static final Object[][] EDGES;
/**
* Creates DataSet with default edge data
* @param env Execution environment
* @return DataSet containing default edges
*/
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
/**
* Creates DataSet with default page data
* @param env Execution environment
* @return DataSet containing default pages
*/
public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env);
/**
* Get total number of pages in default dataset
* @return Number of pages
*/
public static int getNumberOfPages();
}
/**
* Provides default Connected Components data sets
*/
public class ConnectedComponentsData {
/**
* Default vertex IDs
*/
public static final long[] VERTICES;
/**
* Default edge data as object arrays
*/
public static final Object[][] EDGES;
/**
* Creates DataSet with default vertex data
* @param env Execution environment
* @return DataSet containing default vertices
*/
public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env);
/**
* Creates DataSet with default edge data
* @param env Execution environment
* @return DataSet containing default edges
*/
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}
/**
* Provides default Triangle Enumeration data sets
*/
public class EnumTrianglesData {
/**
* Default edge data as object arrays
*/
public static final Object[][] EDGES;
/**
* Creates DataSet with default edge data
* @param env Execution environment
* @return DataSet containing default edges as Edge objects
*/
public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env);
}

Usage Examples:
// Use default datasets in custom graph algorithms
import org.apache.flink.examples.java.graph.util.PageRankData;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// PageRank data
DataSet<Long> pages = PageRankData.getDefaultPagesDataSet(env);
DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);
int numPages = PageRankData.getNumberOfPages();
// Connected Components data
DataSet<Long> vertices = ConnectedComponentsData.getDefaultVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);

Most graph algorithms use Flink's bulk iteration pattern:
// PageRank iteration pattern
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
DataSet<Tuple2<Long, Double>> newRanks = iteration
.join(adjacencyList).where(0).equalTo(0)
.flatMap(new JoinVertexWithEdgesMatch())
.groupBy(0).aggregate(SUM, 1)
.map(new Dampener(DAMPENING_FACTOR, numPages));
DataSet<Tuple2<Long, Double>> finalRanks = iteration.closeWith(
newRanks,
newRanks.join(iteration).where(0).equalTo(0)
.filter(new EpsilonFilter()) // Convergence condition
);

Pages file format (PageRank):
1
2
12
42
63

Links file format (PageRank):
1 2
2 12
1 12
42 63

Edges file format (Connected Components):
1 2
2 3
3 4
5 6

// Vertex/page representation
Long pageId = 1L;
Long vertexId = 1L;
// Edge representations
Tuple2<Long, Long> edge = new Tuple2<>(1L, 2L); // Simple edge
Tuple2<Long, Double> pageRank = new Tuple2<>(1L, 0.5); // Page with rank
Tuple2<Long, Long[]> adjacencyList; // Vertex with neighbors
// Specialized edge types
EnumTrianglesDataTypes.Edge edge = new EnumTrianglesDataTypes.Edge(1, 2);
EnumTrianglesDataTypes.Triad triangle = new EnumTrianglesDataTypes.Triad(1, 2, 3);