or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

clustering.md
distributed-copy.md
graph-processing.md
index.md
misc-examples.md
relational-processing.md
word-count.md
tile.json

docs/graph-processing.md

Graph Processing

Graph algorithms including PageRank, Connected Components, Triangle Enumeration, and Transitive Closure. Features specialized data types, iterative processing patterns, and graph-specific operations.

Capabilities

PageRank Algorithm

Bulk iteration-based PageRank algorithm for computing page importance rankings in graphs.

/**
 * PageRank algorithm using bulk iterations.
 * Usage: PageRank --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
 *
 * This listing is an API summary: only signatures are shown, method bodies are omitted.
 */
@SuppressWarnings("serial")
public class PageRank {
    /** Dampening factor; the Dampener constructor docs below call 0.85 the typical value. */
    private static final double DAMPENING_FACTOR = 0.85;
    /** Convergence threshold ("epsilon") referred to by the EpsilonFilter docs below. */
    private static final double EPSILON = 0.0001;
    
    /**
     * Program entry point.
     * @param args Command-line arguments in the form shown in the class-level usage line
     * @throws Exception Declared by the signature; the concrete failure cases are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Assigns the same initial rank to every page
     */
    public static final class RankAssigner 
            implements MapFunction<Long, Tuple2<Long, Double>> {
        /**
         * Creates RankAssigner with specified initial rank
         * @param rank Initial rank value for all pages
         */
        public RankAssigner(double rank);
        
        /**
         * Maps page ID to page-rank tuple
         * @param page Page ID
         * @return Tuple (page_id, rank)
         */
        public Tuple2<Long, Double> map(Long page);
    }
    
    /**
     * Builds adjacency list for outgoing edges from vertices.
     * Input records are (source, target) edges; output records pair a vertex ID with a Long[] of neighbors.
     */
    @ForwardedFields("0")
    public static final class BuildOutgoingEdgeList 
            implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
        /**
         * Reduces edges to build adjacency list
         * @param values Iterable over the edges that share the same source vertex
         * @param out Collector for adjacency list entries
         */
        public void reduce(
                Iterable<Tuple2<Long, Long>> values, 
                Collector<Tuple2<Long, Long[]>> out);
    }
    
    /**
     * Distributes vertex rank to all neighbors
     */
    public static final class JoinVertexWithEdgesMatch 
            implements FlatMapFunction<
                    Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, 
                    Tuple2<Long, Double>> {
        /**
         * Distributes rank evenly among neighboring vertices
         * @param value Joined pair of (vertex_id, rank) and (vertex_id, neighbors)
         * @param out Collector for the (vertex_id, rank) values distributed to neighbors
         */
        public void flatMap(
                Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value,
                Collector<Tuple2<Long, Double>> out);
    }
    
    /**
     * Applies PageRank dampening formula
     */
    @ForwardedFields("0")
    public static final class Dampener 
            implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
        /**
         * Creates dampener with specified parameters
         * @param dampening Dampening factor (typically 0.85)
         * @param numVertices Total number of vertices for random jump calculation
         */
        public Dampener(double dampening, double numVertices);
        
        /**
         * Applies dampening formula to rank value
         * @param value Tuple (vertex_id, accumulated_rank)
         * @return Tuple (vertex_id, dampened_rank)
         */
        public Tuple2<Long, Double> map(Tuple2<Long, Double> value);
    }
    
    /**
     * Convergence filter: keeps only the vertices whose rank difference exceeds the threshold,
     * i.e. vertices whose rank difference is below the threshold are filtered out.
     */
    public static final class EpsilonFilter 
            implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
        /**
         * Checks if rank difference exceeds convergence threshold
         * @param value Pair of (vertex_id, rank) tuples holding the two ranks to compare
         *              (the ranks before and after an iteration step)
         * @return true if difference exceeds epsilon, false otherwise
         */
        public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value);
    }
}

Usage Examples:

// Run PageRank with custom data through the command-line entry point.
// The flags match the usage line documented on the PageRank class.
String[] args = {
    "--pages", "/path/to/pages.txt",
    "--links", "/path/to/links.txt",
    "--output", "/path/to/output",
    "--numPages", "1000",
    "--iterations", "20"
};
PageRank.main(args);

// Use PageRank functions in custom graph algorithm.
// NOTE: env, params, numPages, getPagesDataSet and getLinksDataSet are not defined in this
// snippet and are not part of the PageRank API listed above; supply your own, e.g. the
// PageRankData default data sets documented under "Graph Data Providers".
DataSet<Long> pages = getPagesDataSet(env, params);
DataSet<Tuple2<Long, Long>> links = getLinksDataSet(env, params);

// Build PageRank pipeline: every page starts with the same rank, 1 / numPages
DataSet<Tuple2<Long, Double>> initialRanks = pages
    .map(new PageRank.RankAssigner(1.0 / numPages));

// Group the links by field 0 and collect each group into one adjacency-list record
DataSet<Tuple2<Long, Long[]>> adjacencyList = links
    .groupBy(0)
    .reduceGroup(new PageRank.BuildOutgoingEdgeList());

Connected Components

Algorithm for finding connected components in undirected graphs using bulk iterations.

/**
 * Connected Components algorithm using bulk iterations.
 * Usage: ConnectedComponents --vertices <path> --edges <path> --output <path>
 *
 * This listing is an API summary: only signatures are shown, method bodies are omitted.
 */
public class ConnectedComponents {
    /**
     * Program entry point.
     * @param args Command-line arguments in the form shown in the class-level usage line
     * @throws Exception Declared by the signature; the concrete failure cases are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Duplicates input value into a tuple pair
     * @param <T> Type of the value being duplicated
     */
    public static final class DuplicateValue<T> 
            implements MapFunction<T, Tuple2<T, T>> {
        /**
         * Creates tuple (value, value) from input
         * @param vertex Input value to duplicate
         * @return Tuple containing value twice
         */
        public Tuple2<T, T> map(T vertex);
    }
    
    /**
     * Creates undirected edges from directed edges
     */
    public static final class UndirectEdge 
            implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        /**
         * Emits both (a,b) and (b,a) for input edge (a,b)
         * @param edge Directed edge tuple
         * @param out Collector for undirected edge pairs
         */
        public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out);
    }
    
    /**
     * Joins neighbor vertices with component IDs
     */
    public static final class NeighborWithComponentIDJoin 
            implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        /**
         * Joins edge with component assignment
         * @param vertexWithComponent Vertex with its component ID
         * @param edge Edge tuple
         * @return Neighbor vertex with component ID
         *         (presumably ordered as (neighbor_id, component_id) - confirm against the implementation)
         */
        public Tuple2<Long, Long> join(
                Tuple2<Long, Long> vertexWithComponent, 
                Tuple2<Long, Long> edge);
    }
    
    /**
     * Filters and propagates minimum component IDs
     */
    public static final class ComponentIdFilter 
            implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        /**
         * Emits vertex with minimum component ID if propagation is needed.
         * Being a flat join, it may emit nothing for a given pair.
         * @param candidateComponentForVertex Candidate component for vertex
         * @param currentComponentForVertex Current component for vertex
         * @param out Collector for component updates
         */
        public void join(
                Tuple2<Long, Long> candidateComponentForVertex,
                Tuple2<Long, Long> currentComponentForVertex, 
                Collector<Tuple2<Long, Long>> out);
    }
}

Triangle Enumeration

Algorithm for enumerating triangles in graphs with specialized edge data types.

/**
 * Triangle enumeration algorithm.
 * Usage: EnumTriangles --edges <path> --output <path>
 *
 * This listing is an API summary: only signatures are shown, method bodies are omitted.
 */
public class EnumTriangles {
    /**
     * Program entry point.
     * @param args Command-line arguments in the form shown in the class-level usage line
     * @throws Exception Declared by the signature; the concrete failure cases are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Converts tuple edges to Edge objects (the Edge type listed under "Graph Data Types")
     */
    public static class TupleEdgeConverter 
            implements MapFunction<Tuple2<Integer, Integer>, Edge> {
        /**
         * Converts tuple to Edge object
         * @param tuple Input edge as a tuple of two Integer vertex IDs
         * @return Edge object
         */
        public Edge map(Tuple2<Integer, Integer> tuple);
    }
}

Transitive Closure

Naive transitive closure algorithm for computing graph reachability.

/**
 * Naive transitive closure algorithm using bulk iterations.
 * Usage: TransitiveClosureNaive --edges <path> --output <path>
 *
 * This listing is an API summary: only the entry point is shown, its body is omitted.
 */
public class TransitiveClosureNaive {
    /**
     * Program entry point.
     * @param args Command-line arguments in the form shown in the class-level usage line
     * @throws Exception Declared by the signature; the concrete failure cases are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
}

Graph Data Types

Specialized data types for graph processing operations.

/**
 * Graph edge representation extending Tuple2.
 * Nested type: other parts of this document refer to it as EnumTrianglesDataTypes.Edge.
 * Vertex IDs are Integer values stored in tuple fields V1 and V2.
 */
public static class Edge extends Tuple2<Integer, Integer> {
    public static final int V1 = 0;  // First vertex field index
    public static final int V2 = 1;  // Second vertex field index
    
    /** Creates an edge without specifying its vertices. */
    public Edge();
    /**
     * Creates an edge from two vertex IDs
     * @param v1 First vertex ID
     * @param v2 Second vertex ID
     */
    public Edge(Integer v1, Integer v2);
    
    /**
     * Get first vertex ID
     * @return First vertex ID
     */
    public Integer getFirstVertex();
    
    /**
     * Get second vertex ID
     * @return Second vertex ID
     */
    public Integer getSecondVertex();
    
    /**
     * Set first vertex ID
     * @param vertex1 First vertex ID
     */
    public void setFirstVertex(Integer vertex1);
    
    /**
     * Set second vertex ID
     * @param vertex2 Second vertex ID
     */
    public void setSecondVertex(Integer vertex2);
    
    /**
     * Copy vertices from another edge
     * @param edge Source edge
     */
    public void copyVerticesFromEdge(Edge edge);
    
    /**
     * Swap vertex positions in edge
     */
    public void flipVertices();
}

/**
 * Three-vertex structure for triangle representation.
 * Nested type: other parts of this document refer to it as EnumTrianglesDataTypes.Triad.
 * Only setters are listed here; no vertex getters appear in this listing.
 */
public static class Triad extends Tuple3<Integer, Integer, Integer> {
    public static final int V1 = 0;  // First vertex field index
    public static final int V2 = 1;  // Second vertex field index
    public static final int V3 = 2;  // Third vertex field index
    
    /** Creates a triad without specifying its vertices. */
    public Triad();
    /**
     * Creates a triad from three vertex IDs
     * @param v1 First vertex ID
     * @param v2 Second vertex ID
     * @param v3 Third vertex ID
     */
    public Triad(Integer v1, Integer v2, Integer v3);
    
    /**
     * Set first vertex ID
     * @param vertex1 First vertex ID
     */
    public void setFirstVertex(Integer vertex1);
    /**
     * Set second vertex ID
     * @param vertex2 Second vertex ID
     */
    public void setSecondVertex(Integer vertex2);
    /**
     * Set third vertex ID
     * @param vertex3 Third vertex ID
     */
    public void setThirdVertex(Integer vertex3);
}

/**
 * Edge with vertex degree information: two vertex IDs plus one degree value per vertex.
 * NOTE(review): presumably nested in EnumTrianglesDataTypes like Edge and Triad - this document
 * never shows its qualified name, so confirm against the source.
 */
public static class EdgeWithDegrees extends Tuple4<Integer, Integer, Integer, Integer> {
    public static final int V1 = 0;  // First vertex field index
    public static final int V2 = 1;  // Second vertex field index
    public static final int D1 = 2;  // First vertex degree field index
    public static final int D2 = 3;  // Second vertex degree field index
    
    /** Creates an instance without specifying vertices or degrees. */
    public EdgeWithDegrees();
    /**
     * Creates an instance from two vertex IDs and their degrees
     * @param v1 First vertex ID
     * @param v2 Second vertex ID
     * @param d1 Degree of the first vertex
     * @param d2 Degree of the second vertex
     */
    public EdgeWithDegrees(Integer v1, Integer v2, Integer d1, Integer d2);
    
    /** @return First vertex ID */
    public Integer getFirstVertex();
    /** @return Second vertex ID */
    public Integer getSecondVertex();
    /** @return Degree of the first vertex */
    public Integer getFirstDegree();
    /** @return Degree of the second vertex */
    public Integer getSecondDegree();
    
    /** @param vertex1 First vertex ID */
    public void setFirstVertex(Integer vertex1);
    /** @param vertex2 Second vertex ID */
    public void setSecondVertex(Integer vertex2);
    /** @param degree1 Degree of the first vertex */
    public void setFirstDegree(Integer degree1);
    /** @param degree2 Degree of the second vertex */
    public void setSecondDegree(Integer degree2);
}

Graph Data Providers

Utility classes providing default graph datasets for testing.

/**
 * Provides default PageRank data sets.
 * All members are static; the contents of the data are not shown in this listing.
 */
public class PageRankData {
    /**
     * Default edge data as object arrays (initializer omitted in this listing)
     */
    public static final Object[][] EDGES;
    
    /**
     * Creates DataSet with default edge data
     * @param env Execution environment
     * @return DataSet containing default edges as Tuple2 of two Long page IDs
     */
    public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
    
    /**
     * Creates DataSet with default page data
     * @param env Execution environment
     * @return DataSet containing default pages as Long page IDs
     */
    public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env);
    
    /**
     * Get total number of pages in default dataset
     * @return Number of pages
     */
    public static int getNumberOfPages();
}

/**
 * Provides default Connected Components data sets.
 * All members are static; the contents of the data are not shown in this listing.
 */
public class ConnectedComponentsData {
    /**
     * Default vertex IDs (initializer omitted in this listing)
     */
    public static final long[] VERTICES;
    
    /**
     * Default edge data as object arrays (initializer omitted in this listing)
     */
    public static final Object[][] EDGES;
    
    /**
     * Creates DataSet with default vertex data
     * @param env Execution environment
     * @return DataSet containing default vertices as Long vertex IDs
     */
    public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env);
    
    /**
     * Creates DataSet with default edge data
     * @param env Execution environment
     * @return DataSet containing default edges as Tuple2 of two Long vertex IDs
     */
    public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}

/**
 * Provides default Triangle Enumeration data sets.
 * All members are static; the contents of the data are not shown in this listing.
 */
public class EnumTrianglesData {
    /**
     * Default edge data as object arrays (initializer omitted in this listing)
     */
    public static final Object[][] EDGES;
    
    /**
     * Creates DataSet with default edge data
     * @param env Execution environment
     * @return DataSet containing default edges as EnumTrianglesDataTypes.Edge objects
     */
    public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env);
}

Usage Examples:

// Use default datasets in custom graph algorithms.
// NOTE: imports for DataSet, Tuple2 and ExecutionEnvironment are omitted from this snippet.
import org.apache.flink.examples.java.graph.util.PageRankData;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// PageRank data: page IDs, links between pages, and the page count
DataSet<Long> pages = PageRankData.getDefaultPagesDataSet(env);
DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);
int numPages = PageRankData.getNumberOfPages();

// Connected Components data: vertex IDs and edges between vertices
DataSet<Long> vertices = ConnectedComponentsData.getDefaultVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);

Common Graph Processing Patterns

Bulk Iteration for Graph Algorithms

Most graph algorithms use Flink's bulk iteration pattern:

// PageRank iteration pattern.
// NOTE: pagesWithRanks, adjacencyList, maxIterations, numPages, DAMPENING_FACTOR and SUM
// are defined outside this snippet.
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

// One iteration step: join the current ranks with the adjacency list on the vertex ID,
// distribute each rank to the neighbors, sum field 1 per vertex (grouped by field 0),
// then apply the dampening formula.
DataSet<Tuple2<Long, Double>> newRanks = iteration
    .join(adjacencyList).where(0).equalTo(0)
    .flatMap(new JoinVertexWithEdgesMatch())
    .groupBy(0).aggregate(SUM, 1)
    .map(new Dampener(DAMPENING_FACTOR, numPages));

// Close the iteration with the step result; the second argument joins the new ranks with
// the ranks the step started from and keeps only the pairs EpsilonFilter accepts.
DataSet<Tuple2<Long, Double>> finalRanks = iteration.closeWith(
    newRanks,
    newRanks.join(iteration).where(0).equalTo(0)
        .filter(new EpsilonFilter())  // Convergence condition
);

Graph Data Format Requirements

Pages file format (PageRank), one page ID per line:

1
2
12
42
63

Links file format (PageRank), one link per line as two space-separated page IDs (source page, then target page):

1 2
2 12
1 12
42 63

Edges file format (Connected Components), one edge per line as two space-separated vertex IDs:

1 2
2 3
3 4
5 6

Types

Core Graph Types

// Vertex/page representation: pages and vertices are identified by Long IDs
Long pageId = 1L;
Long vertexId = 1L;

// Edge representations
Tuple2<Long, Long> edge = new Tuple2<>(1L, 2L);           // Simple edge
Tuple2<Long, Double> pageRank = new Tuple2<>(1L, 0.5);   // Page with rank
Tuple2<Long, Long[]> adjacencyList;                       // Vertex with neighbors

// Specialized edge types (Integer vertex IDs).
// Named triangleEdge because `edge` is already declared above; declaring a second
// local variable with the same name in the same scope does not compile.
EnumTrianglesDataTypes.Edge triangleEdge = new EnumTrianglesDataTypes.Edge(1, 2);
EnumTrianglesDataTypes.Triad triangle = new EnumTrianglesDataTypes.Triad(1, 2, 3);