Apache Flink batch processing examples demonstrating various algorithms and use cases, including WordCount, PageRank, KMeans clustering, Connected Components, and other graph-processing algorithms.
Graph algorithms, including PageRank, Connected Components, Triangle Enumeration, and Transitive Closure. This section covers specialized data types, iterative processing patterns, and graph-specific operations.
Bulk iteration-based PageRank algorithm for computing page importance rankings in graphs.
/**
* PageRank algorithm using bulk iterations.
*
* <p>Each iteration distributes every page's current rank evenly over its outgoing links
* ({@link JoinVertexWithEdgesMatch}), sums the contributions arriving at each page, and
* applies the dampening formula ({@link Dampener}). {@link EpsilonFilter} provides the
* convergence check that can end the iteration before the iteration limit is reached.
*
* <p>Usage: {@code PageRank --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>}
*/
@SuppressWarnings("serial")
public class PageRank {
/** Dampening factor handed to {@link Dampener} in each iteration. */
private static final double DAMPENING_FACTOR = 0.85;
/**
* Convergence threshold used by {@link EpsilonFilter}: a page whose rank changes by no more
* than this amount between two consecutive iterations counts as converged.
*/
private static final double EPSILON = 0.0001;
/**
* Builds and runs the PageRank job.
*
* @param args command-line options; see the Usage line in the class Javadoc
* @throws Exception if building or executing the job fails
*/
public static void main(String[] args) throws Exception;
/**
* Assigns the same initial rank to every page.
*
* <p>Typically constructed with {@code 1.0 / numPages}, which makes the initial ranks a
* uniform distribution over all pages.
*/
public static final class RankAssigner
implements MapFunction<Long, Tuple2<Long, Double>> {
/**
* Creates a RankAssigner that hands out the given rank.
*
* @param rank initial rank value assigned to every page
*/
public RankAssigner(double rank);
/**
* Pairs a page ID with the configured initial rank.
*
* @param page page ID
* @return tuple {@code (page_id, rank)}
*/
public Tuple2<Long, Double> map(Long page);
}
/**
* Builds the adjacency list of a vertex from its outgoing edges.
*
* <p>Expects its input grouped by source vertex (field 0), e.g. {@code links.groupBy(0)}.
* The {@code @ForwardedFields("0")} annotation tells Flink that the source vertex ID is copied
* unchanged from input field 0 to output field 0.
*/
@ForwardedFields("0")
public static final class BuildOutgoingEdgeList
implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
/**
* Collects the targets of all edges that share one source vertex into a single array.
*
* @param values iterable over edges {@code (source, target)} that share the same source vertex
* @param out collector receiving the {@code (source, targets[])} adjacency-list entry
*/
public void reduce(
Iterable<Tuple2<Long, Long>> values,
Collector<Tuple2<Long, Long[]>> out);
}
/**
* Distributes a vertex's rank to all of its neighbors.
*
* <p>The input is the result of joining {@code (vertex, rank)} with the vertex's adjacency
* list {@code (vertex, neighbors[])} on the vertex ID.
*/
public static final class JoinVertexWithEdgesMatch
implements FlatMapFunction<
Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>,
Tuple2<Long, Double>> {
/**
* Splits the vertex's rank evenly among its neighboring vertices.
*
* @param value joined pair {@code ((vertex, rank), (vertex, neighbors[]))}
* @param out collector receiving one {@code (neighbor, partial_rank)} record per neighbor,
*     where partial_rank is the vertex's even share of its rank
*/
public void flatMap(
Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value,
Collector<Tuple2<Long, Double>> out);
}
/**
* Applies the PageRank dampening formula to a vertex's summed incoming rank.
*
* <p>{@code @ForwardedFields("0")}: the vertex ID in field 0 is passed through unchanged.
*/
@ForwardedFields("0")
public static final class Dampener
implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
/**
* Creates a dampener with the given parameters.
*
* @param dampening dampening factor (typically 0.85)
* @param numVertices total number of vertices, used for the random-jump term
*/
public Dampener(double dampening, double numVertices);
/**
* Applies the dampening formula to an accumulated rank value.
*
* <p>NOTE(review): presumably computes
* {@code accumulated_rank * dampening + (1 - dampening) / numVertices}, the standard
* PageRank formula. Confirm against the implementation.
*
* @param value tuple {@code (vertex_id, accumulated_rank)}
* @return tuple {@code (vertex_id, dampened_rank)}
*/
public Tuple2<Long, Double> map(Tuple2<Long, Double> value);
}
/**
* Keeps only the vertices whose rank changed by more than {@code EPSILON} between two
* consecutive iterations. Vertices at or below the threshold are dropped.
*
* <p>Used as the termination criterion of the bulk iteration: once this filter emits no
* records (every vertex has converged), the iteration stops.
*/
public static final class EpsilonFilter
implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
/**
* Checks whether a vertex's rank difference exceeds the convergence threshold.
*
* @param value pair of {@code (vertex, rank)} tuples for the same vertex from two consecutive
*     iterations. When built with {@code newRanks.join(iteration)}, f0 holds the new rank and
*     f1 the previous rank.
* @return true if the difference exceeds {@code EPSILON} (the vertex has not converged yet),
*     false otherwise
*/
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value);
}
}Usage Examples:
// Run PageRank with custom data
String[] args = {
"--pages", "/path/to/pages.txt",
"--links", "/path/to/links.txt",
"--output", "/path/to/output",
"--numPages", "1000",
"--iterations", "20"
};
PageRank.main(args);
// Use PageRank functions in custom graph algorithm
// NOTE(review): getPagesDataSet / getLinksDataSet are not among the public members listed for
// PageRank, so the caller must provide them (or use PageRankData.getDefaultPagesDataSet(env) /
// PageRankData.getDefaultEdgeDataSet(env)). env, params and numPages are also assumed to be in scope.
DataSet<Long> pages = getPagesDataSet(env, params);
DataSet<Tuple2<Long, Long>> links = getLinksDataSet(env, params);
// Build PageRank pipeline
// Every page starts with rank 1/numPages, i.e. a uniform initial distribution.
DataSet<Tuple2<Long, Double>> initialRanks = pages
.map(new PageRank.RankAssigner(1.0 / numPages));
// Group links by source page (field 0) and collapse each group into (source, targets[]).
DataSet<Tuple2<Long, Long[]>> adjacencyList = links
.groupBy(0)
.reduceGroup(new PageRank.BuildOutgoingEdgeList());Algorithm for finding connected components in undirected graphs using bulk iterations.
/**
* Connected Components algorithm based on iterative propagation of component IDs.
*
* <p>Each vertex presumably starts in its own component (see {@link DuplicateValue}).
* Component IDs then travel along undirected edges (see {@link UndirectEdge}), and a vertex
* adopts the minimum component ID it receives (see {@link ComponentIdFilter}).
*
* <p>NOTE(review): this was described as "using bulk iterations". The upstream Flink example
* uses a delta iteration instead ({@code iterateDelta}, with {@link ComponentIdFilter} joined
* against the solution set). It may also accept {@code --iterations <n>}. Confirm both.
*
* <p>Usage: {@code ConnectedComponents --vertices <path> --edges <path> --output <path>}
*/
public class ConnectedComponents {
/**
* Builds and runs the Connected Components job.
*
* @param args command-line options; see the Usage line in the class Javadoc
* @throws Exception if building or executing the job fails
*/
public static void main(String[] args) throws Exception;
/**
* Duplicates an input value into a {@code (value, value)} pair.
*
* <p>Presumably used to give each vertex an initial component ID equal to its own vertex ID.
*
* @param <T> type of the value being duplicated
*/
public static final class DuplicateValue<T>
implements MapFunction<T, Tuple2<T, T>> {
/**
* Creates the tuple {@code (vertex, vertex)} from the input.
*
* @param vertex input value to duplicate
* @return tuple containing the input value in both fields
*/
public Tuple2<T, T> map(T vertex);
}
/**
* Turns each directed edge into the two directed edges of an undirected edge.
*/
public static final class UndirectEdge
implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* Emits both (a,b) and (b,a) for the input edge (a,b).
*
* @param edge directed edge tuple {@code (a, b)}
* @param out collector receiving the two directed edges {@code (a, b)} and {@code (b, a)}
*/
public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out);
}
/**
* Joins a vertex's component assignment with one of its edges to produce a candidate
* component ID for the neighbor vertex on the other end of that edge.
*/
public static final class NeighborWithComponentIDJoin
implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* Pairs the edge's neighbor vertex with the component ID of the joined vertex.
*
* <p>NOTE(review): presumably returns {@code (edge.f1, vertexWithComponent.f1)}, with the
* join on {@code vertexWithComponent.f0 == edge.f0}. Confirm against the implementation.
*
* @param vertexWithComponent vertex with its current component ID, {@code (vertex, component_id)}
* @param edge edge tuple, presumably {@code (source, target)} with source equal to the vertex
* @return neighbor vertex with the candidate component ID, {@code (neighbor, component_id)}
*/
public Tuple2<Long, Long> join(
Tuple2<Long, Long> vertexWithComponent,
Tuple2<Long, Long> edge);
}
/**
* Propagates minimum component IDs: emits a component update for a vertex only when
* propagation is needed, presumably when the candidate ID is smaller than the current one.
*/
public static final class ComponentIdFilter
implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* Emits the vertex with its minimum component ID if its assignment must change.
*
* @param candidateComponentForVertex candidate {@code (vertex, component_id)} for the vertex
* @param currentComponentForVertex current {@code (vertex, component_id)} assignment of the vertex
* @param out collector for component updates; nothing is emitted when no propagation is needed
*/
public void join(
Tuple2<Long, Long> candidateComponentForVertex,
Tuple2<Long, Long> currentComponentForVertex,
Collector<Tuple2<Long, Long>> out);
}
}Algorithm for enumerating triangles in graphs with specialized edge data types.
/**
* Triangle enumeration algorithm.
*
* <p>Operates on {@code EnumTrianglesDataTypes.Edge} records. Edges given as plain
* {@code Tuple2<Integer, Integer>} are converted with {@link TupleEdgeConverter}.
*
* <p>Usage: {@code EnumTriangles --edges <path> --output <path>}
*/
public class EnumTriangles {
/**
* Builds and runs the triangle enumeration job.
*
* @param args command-line options; see the Usage line in the class Javadoc
* @throws Exception if building or executing the job fails
*/
public static void main(String[] args) throws Exception;
/**
* Converts {@code (v1, v2)} tuples into {@code Edge} objects.
*/
public static class TupleEdgeConverter
implements MapFunction<Tuple2<Integer, Integer>, Edge> {
/**
* Converts one tuple into an Edge with the same two vertex IDs.
*
* @param tuple input edge as a {@code (v1, v2)} tuple
* @return Edge object holding the tuple's vertices
*/
public Edge map(Tuple2<Integer, Integer> tuple);
}
}Naive transitive closure algorithm for computing graph reachability.
/**
* Naive transitive closure algorithm using bulk iterations.
*
* <p>Computes graph reachability: the set of vertex pairs connected by a path over the input
* edges.
*
* <p>NOTE(review): the upstream example may also read an {@code --iterations <n>} limit.
* Confirm before relying on the usage line below.
*
* <p>Usage: {@code TransitiveClosureNaive --edges <path> --output <path>}
*/
public class TransitiveClosureNaive {
/**
* Builds and runs the transitive closure job.
*
* @param args command-line options; see the Usage line in the class Javadoc
* @throws Exception if building or executing the job fails
*/
public static void main(String[] args) throws Exception;
}Specialized data types for graph processing operations.
/**
* Graph edge representation extending {@code Tuple2<Integer, Integer>}.
*
* <p>A static nested class, referenced elsewhere as {@code EnumTrianglesDataTypes.Edge}.
* Tuple field 0 holds the first vertex and field 1 the second vertex.
*/
public static class Edge extends Tuple2<Integer, Integer> {
public static final int V1 = 0; // Tuple field index of the first vertex (usable as a key position)
public static final int V2 = 1; // Tuple field index of the second vertex (usable as a key position)
/**
* Creates an edge with unset vertices.
* NOTE(review): presumably required for Flink's tuple serialization; confirm.
*/
public Edge();
/**
* Creates an edge between two vertices.
*
* @param v1 first vertex ID
* @param v2 second vertex ID
*/
public Edge(Integer v1, Integer v2);
/**
* Get first vertex ID
* @return First vertex ID (tuple field {@link #V1})
*/
public Integer getFirstVertex();
/**
* Get second vertex ID
* @return Second vertex ID (tuple field {@link #V2})
*/
public Integer getSecondVertex();
/**
* Set first vertex ID
* @param vertex1 First vertex ID (stored in tuple field {@link #V1})
*/
public void setFirstVertex(Integer vertex1);
/**
* Set second vertex ID
* @param vertex2 Second vertex ID (stored in tuple field {@link #V2})
*/
public void setSecondVertex(Integer vertex2);
/**
* Copies both vertex IDs from another edge into this one.
* @param edge Source edge to copy the vertices from
*/
public void copyVerticesFromEdge(Edge edge);
/**
* Swaps the two vertices of this edge in place, turning (v1, v2) into (v2, v1).
*/
public void flipVertices();
}
/**
* Three-vertex structure used to represent a triangle, extending
* {@code Tuple3<Integer, Integer, Integer>}.
*/
public static class Triad extends Tuple3<Integer, Integer, Integer> {
public static final int V1 = 0; // Tuple field index of the first vertex
public static final int V2 = 1; // Tuple field index of the second vertex
public static final int V3 = 2; // Tuple field index of the third vertex
/**
* Creates a triad with unset vertices.
* NOTE(review): presumably required for Flink's tuple serialization; confirm.
*/
public Triad();
/**
* Creates a triad from three vertex IDs.
*
* @param v1 first vertex ID
* @param v2 second vertex ID
* @param v3 third vertex ID
*/
public Triad(Integer v1, Integer v2, Integer v3);
/** Sets the first vertex ID (tuple field {@link #V1}). */
public void setFirstVertex(Integer vertex1);
/** Sets the second vertex ID (tuple field {@link #V2}). */
public void setSecondVertex(Integer vertex2);
/** Sets the third vertex ID (tuple field {@link #V3}). */
public void setThirdVertex(Integer vertex3);
}
/**
* Edge annotated with the degrees of its two vertices, extending
* {@code Tuple4<Integer, Integer, Integer, Integer>} as {@code (v1, v2, d1, d2)}.
*/
public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
public static final int V1 = 0; // Tuple field index of the first vertex
public static final int V2 = 1; // Tuple field index of the second vertex
public static final int D1 = 2; // Tuple field index of the first vertex's degree
public static final int D2 = 3; // Tuple field index of the second vertex's degree
/**
* Creates an edge with unset vertices and degrees.
* NOTE(review): presumably required for Flink's tuple serialization; confirm.
*/
public EdgeWithDegrees();
/**
* Creates an edge with vertex degrees.
*
* @param v1 first vertex ID
* @param v2 second vertex ID
* @param d1 degree of the first vertex
* @param d2 degree of the second vertex
*/
public EdgeWithDegrees(Integer v1, Integer v2, Integer d1, Integer d2);
/** @return first vertex ID (tuple field {@link #V1}) */
public Integer getFirstVertex();
/** @return second vertex ID (tuple field {@link #V2}) */
public Integer getSecondVertex();
/** @return degree of the first vertex (tuple field {@link #D1}) */
public Integer getFirstDegree();
/** @return degree of the second vertex (tuple field {@link #D2}) */
public Integer getSecondDegree();
/** Sets the first vertex ID (tuple field {@link #V1}). */
public void setFirstVertex(Integer vertex1);
/** Sets the second vertex ID (tuple field {@link #V2}). */
public void setSecondVertex(Integer vertex2);
/** Sets the degree of the first vertex (tuple field {@link #D1}). */
public void setFirstDegree(Integer degree1);
/** Sets the degree of the second vertex (tuple field {@link #D2}). */
public void setSecondDegree(Integer degree2);
}Utility classes providing default graph datasets for testing.
/**
* Provides the default PageRank data sets (pages and links) as static members.
*/
public class PageRankData {
/**
* Default edge (link) data as object arrays.
* NOTE(review): presumably each inner array is one {@code {source, target}} link; confirm.
*/
public static final Object[][] EDGES;
/**
* Creates a DataSet with the default edge data.
* @param env Execution environment used to create the DataSet
* @return DataSet containing the default links as {@code (source, target)} tuples
*/
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
/**
* Creates a DataSet with the default page data.
* @param env Execution environment used to create the DataSet
* @return DataSet containing the default page IDs
*/
public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env);
/**
* Returns the total number of pages in the default dataset, suitable as the
* {@code numPages} value for the PageRank functions.
* @return Number of pages
*/
public static int getNumberOfPages();
}
/**
* Provides the default Connected Components data sets (vertices and edges) as static members.
*/
public class ConnectedComponentsData {
/**
* Default vertex IDs
*/
public static final long[] VERTICES;
/**
* Default edge data as object arrays.
* NOTE(review): presumably each inner array is one {@code {source, target}} pair; confirm.
*/
public static final Object[][] EDGES;
/**
* Creates a DataSet with the default vertex data.
* @param env Execution environment used to create the DataSet
* @return DataSet containing the default vertex IDs
*/
public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env);
/**
* Creates a DataSet with the default edge data.
* @param env Execution environment used to create the DataSet
* @return DataSet containing the default edges as {@code Tuple2<Long, Long>} pairs
*/
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}
/**
* Provides the default Triangle Enumeration data set (edges) as static members.
*/
public class EnumTrianglesData {
/**
* Default edge data as object arrays.
* NOTE(review): presumably each inner array is one {@code {v1, v2}} pair; confirm.
*/
public static final Object[][] EDGES;
/**
* Creates a DataSet with the default edge data.
* @param env Execution environment used to create the DataSet
* @return DataSet containing the default edges as {@code EnumTrianglesDataTypes.Edge} objects
*/
public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env);
}Usage Examples:
// Use default datasets in custom graph algorithms
// Imports belong at the top of the source file; the statements below belong inside a method
// (e.g. main). DataSet, ExecutionEnvironment and Tuple2 are imported explicitly because the
// snippet uses them and they are not pulled in by the two data-class imports.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.util.PageRankData;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// PageRank data: page IDs, (source, target) links, and the page count used for 1/numPages
DataSet<Long> pages = PageRankData.getDefaultPagesDataSet(env);
DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);
int numPages = PageRankData.getNumberOfPages();
// Connected Components data: vertex IDs and edges
DataSet<Long> vertices = ConnectedComponentsData.getDefaultVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);Most graph algorithms use Flink's bulk iteration pattern:
// PageRank iteration pattern
// Assumes these are in scope: pagesWithRanks (initial (page, rank) tuples, e.g. from RankAssigner),
// adjacencyList (from BuildOutgoingEdgeList), maxIterations, numPages and DAMPENING_FACTOR.
// Also assumes SUM is statically imported (presumably org.apache.flink.api.java.aggregation.Aggregations.SUM).
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
DataSet<Tuple2<Long, Double>> newRanks = iteration
// attach each page's adjacency list to its current rank (join on page ID, field 0)
.join(adjacencyList).where(0).equalTo(0)
// send an equal share of the page's rank to every neighbor
.flatMap(new JoinVertexWithEdgesMatch())
// sum the shares arriving at each page
.groupBy(0).aggregate(SUM, 1)
// apply the dampening formula
.map(new Dampener(DAMPENING_FACTOR, numPages));
// closeWith(result, terminationCriterion): the iteration ends after maxIterations, or earlier as
// soon as the termination criterion is empty, i.e. no page's rank changed by more than EPSILON.
// The join order newRanks.join(iteration) puts the new rank in f0 and the previous rank in f1.
DataSet<Tuple2<Long, Double>> finalRanks = iteration.closeWith(
newRanks,
newRanks.join(iteration).where(0).equalTo(0)
.filter(new EpsilonFilter()) // Convergence condition
);Pages file format (PageRank):
1
2
12
42
63
Links file format (PageRank):
1 2
2 12
1 12
42 63
Edges file format (Connected Components):
1 2
2 3
3 4
5 6
// Vertex/page representation
// Illustrative declarations; every local has a unique name so they can share one method body.
Long pageId = 1L;
Long vertexId = 1L;
// Edge representations
Tuple2<Long, Long> edge = new Tuple2<>(1L, 2L); // Simple edge
Tuple2<Long, Double> pageRank = new Tuple2<>(1L, 0.5); // Page with rank
Tuple2<Long, Long[]> adjacencyList; // Vertex with neighbors (declared only, not initialized)
// Specialized edge types (static nested classes of EnumTrianglesDataTypes)
// Named triangleEdge rather than edge: reusing `edge` would redeclare the local above and not compile.
EnumTrianglesDataTypes.Edge triangleEdge = new EnumTrianglesDataTypes.Edge(1, 2);
EnumTrianglesDataTypes.Triad triangle = new EnumTrianglesDataTypes.Triad(1, 2, 3);Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-batch-2-11