A collection of example applications demonstrating graph processing algorithms using Apache Flink's Gelly Graph API
—
Result output system supporting multiple formats for algorithm results including console display, file export, and verification utilities. All outputs implement the standardized Output interface.
Base interface that all result output handlers must implement.
/**
* Base interface for all result output handlers
* @param <T> Result data type
*/
public interface Output<T> extends Parameterized {
/** Write algorithm results to specified destination */
void write(String executionName, PrintStream out, DataSet<T> data) throws Exception;
}Base implementation providing common functionality for output handlers.
/**
* Base class for output handlers with common parameter handling
*/
public abstract class OutputBase<T> extends ParameterizedBase implements Output<T> {
// Common functionality for all output implementations
}Displays algorithm results directly to console with optional execution plan visualization.
/**
* Console output handler for displaying results to stdout
*/
public class Print extends OutputBase<T> {
/** Print Flink execution plan before results */
BooleanParameter printExecutionPlan;
}Usage Examples:
# Basic console output
--output Print
# Print with execution plan
--output Print --print_execution_planOutput Format Examples:
PageRank Results:
Vertex(1, 0.234567)
Vertex(2, 0.456789)
Vertex(3, 0.123456)
Analytics:
- Total vertices: 1000
- Total edges: 499500
- Iterations: 12
- Convergence achieved: trueWrites algorithm results to CSV files with configurable formatting.
/**
* CSV file output for writing results to comma-separated files
*/
public class CSV extends OutputBase<T> {
/** Output file path */
StringParameter outputFilename;
/** Field separator character (default: comma) */
StringParameter fieldDelimiter;
/** Include header row in output */
BooleanParameter includeHeader;
/** Number format for floating-point values */
StringParameter numberFormat;
}Usage Examples:
# Basic CSV output
--output CSV --output_filename results.csv
# CSV with custom delimiter and formatting
--output CSV --output_filename results.tsv --field_delimiter "\t" --include_headerCSV Output Format Examples:
# PageRank results
vertex_id,pagerank_score
1,0.234567
2,0.456789
3,0.123456
# Edge list results
source,target,weight
1,2,0.5
2,3,1.0
3,1,0.8
# Connected components results
vertex_id,component_id
1,1
2,1
3,2Computes cryptographic hash of results for verification and testing purposes.
/**
* Hash output for result verification and testing
* Computes deterministic hash of algorithm results
*/
public class Hash extends OutputBase<T> {
/** Hash algorithm to use (default: MD5) */
StringParameter hashAlgorithm;
/** Output hash to file instead of console */
StringParameter hashOutputFile;
}Usage Examples:
# Compute MD5 hash of results
--output Hash
# Use SHA-256 and save to file
--output Hash --hash_algorithm SHA-256 --hash_output_file results.hashHash Output Format:
Result Hash (MD5): a1b2c3d4e5f6789012345678901234567890abcd
Record Count: 1000All output handlers support common configuration options inherited from their base classes.
// Common parameters available across output types
public abstract class OutputBase<T> extends ParameterizedBase implements Output<T> {
// Inherits getName(), getUsage(), configure() from ParameterizedBase
}Output handlers are selected by name through the command-line interface or programmatically.
// Output factory for dynamic selection
ParameterizedFactory<Output> outputFactory = new ParameterizedFactory<Output>()
.addClass(Print.class)
.addClass(CSV.class)
.addClass(Hash.class);// Configure print output
Print printOutput = new Print();
printOutput.configure(ParameterTool.fromArgs(new String[]{
"--print_execution_plan"
}));
// Write results
printOutput.write("PageRank Analysis", System.out, resultDataSet);// Configure CSV output
CSV csvOutput = new CSV();
csvOutput.configure(ParameterTool.fromArgs(new String[]{
"--output_filename", "pagerank_results.csv",
"--field_delimiter", ",",
"--include_header"
}));
// Write results to file
csvOutput.write("Connected Components", System.out, componentResults);// Configure hash output for testing
Hash hashOutput = new Hash();
hashOutput.configure(ParameterTool.fromArgs(new String[]{
"--hash_algorithm", "SHA-256"
}));
// Compute and display hash
hashOutput.write("Algorithm Verification", System.out, testResults);// Get output handler from factory
ParameterizedFactory<Output> factory = createOutputFactory();
Output output = factory.get("CSV");
if (output != null) {
output.configure(parameters);
output.write(executionName, System.out, results);
}// Write results to multiple destinations
List<Output> outputs = Arrays.asList(
factory.get("Print"),
factory.get("CSV"),
factory.get("Hash")
);
for (Output output : outputs) {
output.configure(parameters);
output.write(executionName, System.out, results);
}Different algorithms produce different result types that outputs must handle:
// Common result types handled by outputs
DataSet<Vertex<K, VV>> // Vertex-centric results (PageRank, ConnectedComponents)
DataSet<Edge<K, EV>> // Edge-centric results (EdgeList)
DataSet<Tuple3<K, K, Double>> // Similarity scores (AdamicAdar, JaccardIndex)
DataSet<Tuple2<K, Double>> // Clustering coefficients
DataSet<Tuple3<K, K, K>> // Triangle listings
DataSet<String> // Graph metrics and statisticsOutput handlers provide comprehensive error handling for common failure scenarios:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gelly-examples-2-12