or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

algorithm-drivers.md
index.md
input-sources.md
legacy-examples.md
parameter-system.md
runner-framework.md
tile.json

docs/legacy-examples.md

Legacy Examples

The legacy examples package contains standalone implementations of graph algorithms demonstrating different Flink Gelly programming models. These examples serve as educational material and reference implementations, showing scatter-gather, gather-sum-apply (GSA), and vertex-centric (Pregel) approaches to graph processing.

Capabilities

Standalone Example Programs

Single Source Shortest Paths (Scatter-Gather)

Computes shortest paths from a single source vertex using the scatter-gather programming model.

/**
 * Single Source Shortest Paths using scatter-gather iteration
 * Demonstrates message passing between vertices for distance computation
 */
public class SingleSourceShortestPaths implements ProgramDescription {
  /**
   * Main entry point for command-line execution
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   * @throws Exception if execution fails
   */
  public static void main(String[] args) throws Exception;
  
  /**
   * Program description for help and documentation
   * @return Description of the SSSP algorithm and usage
   */
  public String getDescription();
}

Command-line Usage:

flink run -c org.apache.flink.graph.examples.SingleSourceShortestPaths \
  flink-gelly-examples_2.10-1.3.3.jar \
  <source_vertex_id> <input_edges_path> <output_path> <num_iterations>

Usage Example:

flink run -c org.apache.flink.graph.examples.SingleSourceShortestPaths \
  flink-gelly-examples_2.10-1.3.3.jar \
  1 edges.csv shortest_paths.csv 10

Single Source Shortest Paths (GSA)

Computes shortest paths using the gather-sum-apply programming model.

/**
 * Single Source Shortest Paths using gather-sum-apply iteration
 * Demonstrates GSA programming model with gather, sum, and apply phases
 */
public class GSASingleSourceShortestPaths implements ProgramDescription {
  /**
   * Main entry point for command-line execution
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   * @throws Exception if execution fails
   */
  public static void main(String[] args) throws Exception;
  
  /**
   * Program description for help and documentation
   * @return Description of the GSA SSSP algorithm and usage
   */
  public String getDescription();
}

Single Source Shortest Paths (Pregel)

Computes shortest paths using the vertex-centric (Pregel) programming model.

/**
 * Single Source Shortest Paths using vertex-centric (Pregel) computation
 * Demonstrates Pregel programming model with compute functions and message combiners
 */
public class PregelSSSP implements ProgramDescription {
  /**
   * Main entry point for command-line execution
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   * @throws Exception if execution fails
   */
  public static void main(String[] args) throws Exception;
  
  /**
   * Program description for help and documentation
   * @return Description of the Pregel SSSP algorithm and usage  
   */
  public String getDescription();
}

Inner Classes:

/**
 * Vertex compute function for Pregel SSSP implementation
 * Processes incoming messages and updates vertex state
 */
public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
  public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) throws Exception;
}

/**
 * Message combiner for Pregel SSSP to reduce message overhead
 * Combines multiple messages to the same vertex
 */
public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
  public void combineMessages(MessageIterator<Double> messages) throws Exception;
}

Music Profiles Analysis

Complex example demonstrating mixed DataSet and Gelly API usage for user-song bipartite graph analysis.

/**
 * Music Profiles analysis using bipartite graphs and community detection
 * Demonstrates integration of DataSet operations with graph processing
 */
public class MusicProfiles implements ProgramDescription {
  /**
   * Main entry point for command-line execution
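   * @param args Command-line arguments: <input user song triplets path> <input song mismatches path> <output top tracks path> <playcount threshold> <output communities path> <num iterations>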
   * @throws Exception if execution fails
   */
  public static void main(String[] args) throws Exception;
  
  /**
   * Program description for help and documentation
   * @return Description of the music profiles analysis pipeline
   */
  public String getDescription();
}

Command-line Parameters:

<input_user_song_triplets_path> <input_song_mismatches_path> <output_top_tracks_path> 
<playcount_threshold> <output_communities_path> <num_iterations>

Public Transformation Classes:

/**
 * Extract mismatch song IDs from mismatch data
 * Maps mismatch records to song ID strings
 */
public static final class ExtractMismatchSongIds implements MapFunction<String, String> {
  public String map(String value) throws Exception;
}

/**
 * Filter out songs that appear in the mismatch list
 * Removes problematic songs from the analysis dataset
 */
public static final class FilterOutMismatches implements FilterFunction<Tuple3<String, String, Integer>> {
  public boolean filter(Tuple3<String, String, Integer> value) throws Exception;
}

/**
 * Filter song vertices from user-song bipartite graph
 * Identifies vertices representing songs vs users
 */
public static final class FilterSongNodes implements FilterFunction<Vertex<String, Double>> {
  public boolean filter(Vertex<String, Double> vertex) throws Exception;
}

/**
 * Get top song per user based on play counts
 * Finds the most played song for each user
 */
public static final class GetTopSongPerUser implements GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>> {
  public void reduce(Iterable<Tuple3<String, String, Integer>> values, Collector<Tuple2<String, String>> out) throws Exception;
}

/**
 * Create edges between users who share similar music preferences
 * Builds user similarity graph based on common top songs
 */
public static final class CreateSimilarUserEdges implements GroupReduceFunction<Tuple2<String, String>, Edge<String, NullValue>> {
  public void reduce(Iterable<Tuple2<String, String>> values, Collector<Edge<String, NullValue>> out) throws Exception;
}

Euclidean Graph Weighting

Demonstrates graph transformations and geometric computations with triplet operations.

/**
 * Euclidean Graph Weighting example showing graph transformations
 * Demonstrates triplet operations and custom distance computations
 */
public class EuclideanGraphWeighing implements ProgramDescription {
  /**
   * Main entry point for command-line execution
   * @param args Command-line arguments: <input vertices path> <input edges path> <output path>
   * @throws Exception if execution fails
   */
  public static void main(String[] args) throws Exception;
  
  /**
   * Program description for help and documentation
   * @return Description of the Euclidean weighting transformation
   */
  public String getDescription();
}

Public Utility Classes:

/**
 * 2D Point class for geometric computations
 * Represents vertex positions in Euclidean space
 */
public static class Point implements Serializable {
  public double x;
  public double y;
  
  public Point(double x, double y);
  
  /**
   * Calculate Euclidean distance between two points
   * @param other Other point to calculate distance to
   * @return Euclidean distance as double
   */
  public double euclideanDistance(Point other);
}

Incremental SSSP

Advanced example showing incremental shortest path updates when edges are removed from the graph.

/**
 * Incremental Single Source Shortest Paths with edge removal
 * Demonstrates dynamic graph updates and incremental computation
 */
public class IncrementalSSSP implements ProgramDescription {
  /**
   * Main entry point for command-line execution
   * @param args Complex parameter list for incremental SSSP
   * @throws Exception if execution fails
   */
  public static void main(String[] args) throws Exception;
  
  /**
   * Program description for help and documentation
   * @return Description of the incremental SSSP algorithm
   */
  public String getDescription();
  
  /**
   * Check if an edge is part of the shortest path tree
   * @param edgeToBeRemoved Edge to check for SSSP membership
   * @param edgesInSSSP DataSet of edges currently in the SSSP tree
   * @return True if edge is in SSSP, false otherwise
   * @throws Exception if check fails
   */
  public static boolean isInSSSP(Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception;
}

Inner Algorithm Classes:

/**
 * Message passing function for invalidating affected paths
 * Sends invalidation messages when edges are removed
 */
public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
  public void sendMessages(Vertex<Long, Double> vertex) throws Exception;
}

/**
 * Vertex update function for recalculating distances
 * Updates vertex distances based on invalidation messages
 */
public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
  public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}

Reusable Algorithm Classes

PageRank Algorithm Implementation

Generic PageRank algorithm implementation for programmatic usage.

/**
 * Generic PageRank algorithm implementation using scatter-gather iteration
 * Can be used as a reusable component in larger applications
 */
public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
  /**
   * Create PageRank algorithm with parameters
   * @param beta Damping factor for random walk (typically 0.85)
   * @param maxIterations Maximum number of iterations
   */
  public PageRank(double beta, int maxIterations);
  
  /**
   * Execute PageRank on the input graph
   * @param network Input graph with Double vertex and edge values
   * @return DataSet of vertices with PageRank scores
   * @throws Exception if algorithm execution fails
   */
  public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}

Inner Algorithm Classes:

/**
 * Message passing function for PageRank score distribution
 * Sends rank messages along outgoing edges
 */
public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
  public void sendMessages(Vertex<K, Double> vertex) throws Exception;
}

/**
 * Vertex update function for PageRank score computation
 * Updates vertex ranks based on incoming messages
 */
public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
  public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}

/**
 * Initialization function for vertex weights
 * Sets initial PageRank values for all vertices
 */
public static final class InitWeights<K> implements MapFunction<Vertex<K, NullValue>, Double> {
  public Double map(Vertex<K, NullValue> value) throws Exception;
}

GSA PageRank Implementation

PageRank implementation using the gather-sum-apply programming model.

/**
 * PageRank algorithm using gather-sum-apply iteration
 * Alternative implementation demonstrating GSA programming model
 */
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
  /**
   * Create GSA PageRank algorithm with parameters
   * @param beta Damping factor for random walk
   * @param maxIterations Maximum number of iterations
   */
  public GSAPageRank(double beta, int maxIterations);
  
  /**
   * Execute GSA PageRank on the input graph
   * @param network Input graph with Double vertex and edge values
   * @return DataSet of vertices with PageRank scores
   * @throws Exception if algorithm execution fails
   */
  public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}

Inner GSA Classes:

/**
 * Gather function for collecting neighbor ranks
 * Gathers PageRank values from neighboring vertices
 */
public static final class GatherRanks implements GatherFunction<Double, Double, Double> {
  public Double gather(Neighbor<Double, Double> neighbor) throws Exception;
}

/**
 * Sum function for aggregating gathered ranks
 * Sums all gathered rank values for each vertex
 */
public static final class SumRanks implements SumFunction<Double, Double, Double> {
  public Double sum(Double newValue, Double currentValue) throws Exception;
}

/**
 * Apply function for updating vertex ranks
 * Applies PageRank formula with damping factor
 */
public static final class UpdateRanks<K> implements ApplyFunction<K, Double, Double> {
  public void apply(Double summedRanks, Vertex<K, Double> vertex) throws Exception;
}

Data Generators

The examples package includes comprehensive data generators for testing and demonstration:

/**
 * PageRank test data generator
 * Provides sample graphs and expected results for PageRank algorithm
 */
public class PageRankData {
  /**
   * Get default edge dataset for PageRank testing
   * @param env Flink ExecutionEnvironment
   * @return DataSet of sample edges
   */
  public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
  
  // Public constants for test data
  public static final String EDGES = "...";
  public static final String RANKS_AFTER_3_ITERATIONS = "...";
}

/**
 * Single Source Shortest Paths test data generator
 * Provides sample graphs and expected SSSP results
 */
public class SingleSourceShortestPathsData {
  /**
   * Get default edge dataset for SSSP testing
   * @param env Flink ExecutionEnvironment
   * @return DataSet of sample edges with weights
   */
  public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
  
  // Public constants
  public static final Long SRC_VERTEX_ID = 1L;
  public static final String EDGES = "...";
  public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "...";
}

/**
 * Music profiles test data generator
 * Provides sample user-song interaction data
 */
public class MusicProfilesData {
  /**
   * Get sample user-song triplet data
   * @param env Flink ExecutionEnvironment
   * @return DataSet of user-song-playcount triplets
   */
  public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env);
  
  /**
   * Get sample mismatch data for filtering
   * @param env Flink ExecutionEnvironment
   * @return DataSet of problematic song IDs
   */
  public static DataSet<String> getMismatches(ExecutionEnvironment env);
}

/**
 * Euclidean graph test data generator
 * Provides sample geometric graphs with point coordinates
 */
public class EuclideanGraphData {
  /**
   * Get default vertex dataset with 2D coordinates
   * @param env Flink ExecutionEnvironment
   * @return DataSet of vertices with Point values
   */
  public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env);
  
  /**
   * Get corresponding edge dataset
   * @param env Flink ExecutionEnvironment  
   * @return DataSet of edges with distance weights
   */
  public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}

Scala Examples

The package also includes Scala implementations demonstrating the Scala Graph API:

/**
 * Connected Components implementation in Scala
 * Demonstrates Scala Graph API usage with GSA
 */
object ConnectedComponents {
  /**
   * Main entry point for Scala Connected Components
   * @param args Command-line arguments: <edge path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}

/**
 * Single Source Shortest Paths in Scala using scatter-gather
 * Shows Scala functional programming patterns with Gelly
 */
object SingleSourceShortestPaths {
  /**
   * Main entry point for Scala SSSP
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}

/**
 * GSA Single Source Shortest Paths in Scala
 * Demonstrates GSA programming model in Scala
 */
object GSASingleSourceShortestPaths {
  /**
   * Main entry point for Scala GSA SSSP
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}

Types

// Graph algorithm interfaces
interface GraphAlgorithm<K, VV, EV, T> extends Serializable {
  T run(Graph<K, VV, EV> input) throws Exception;
}

interface ProgramDescription {
  String getDescription();
}

// Gelly programming model interfaces
abstract class ComputeFunction<K, VV, EV, M> {
  public abstract void compute(Vertex<K, VV> vertex, MessageIterator<M> messages) throws Exception;
}

abstract class MessagingFunction<K, VV, EV, M> {
  public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
}

abstract class VertexUpdateFunction<K, VV, M> {
  public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;
}

// GSA interfaces
interface GatherFunction<VV, EV, M> {
  M gather(Neighbor<VV, EV> neighbor) throws Exception;
}

interface SumFunction<VV, EV, M> {
  M sum(M newValue, M currentValue) throws Exception;
}

interface ApplyFunction<K, VV, M> {   
  void apply(M newValue, Vertex<K, VV> vertex) throws Exception;
}

// Flink DataSet API types
class DataSet<T> {
  // Distributed dataset operations
}

class Tuple2<T0, T1> {
  public T0 f0;
  public T1 f1;
}

class Tuple3<T0, T1, T2> {
  public T0 f0;
  public T1 f1; 
  public T2 f2;
}