A collection of example applications demonstrating graph processing algorithms using Apache Flink's Gelly Graph API
The execution framework provides a command-line interface and programmatic API for running graph algorithms with configurable inputs and outputs. The central Runner class orchestrates the entire execution pipeline.
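To make the command-line side concrete, here is a minimal, self-contained sketch of how `--name value` arguments of the kind used on this page could be collected into a map. It is an illustration only and does not reproduce Flink's own parameter handling. The class `ArgSketch` and its `parse` method are hypothetical names, and treating a flag with no value as `true` is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: collects "--name value" pairs. Not Flink's parser. */
final class ArgSketch {

    /** Parses "--name value" pairs; a flag with no following value maps to "true" (assumption). */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        int i = 0;
        while (i < args.length) {
            String token = args[i];
            if (!token.startsWith("--") || token.length() == 2) {
                throw new IllegalArgumentException("Expected a --flag but found: " + token);
            }
            String key = token.substring(2);
            // A value is any following token that is not itself a flag.
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            params.put(key, hasValue ? args[i + 1] : "true");
            i += hasValue ? 2 : 1;
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(new String[] {
            "--algorithm", "PageRank",
            "--input", "CompleteGraph", "--vertex_count", "1000",
            "--output", "Print"
        });
        System.out.println(params);
    }
}
```

Keeping insertion order (via `LinkedHashMap`) means the parsed parameters can be echoed back in the order the user typed them, which is convenient for diagnostics.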
Main execution coordinator that handles parameter parsing, component configuration, and job execution.
/**
* Central orchestrator for executing Flink graph algorithms
* Coordinates input sources, algorithms, and output handlers
*/
public class Runner extends ParameterizedBase {
/** Create runner from command-line arguments */
public Runner(String[] args);
/** Get the Flink execution environment (available after calling run()) */
public ExecutionEnvironment getExecutionEnvironment();
/** Get the result DataSet (available after calling run()) */
public DataSet getResult();
/** Set up and configure the job with input, algorithm, and output */
public Runner run() throws Exception;
/** Command-line entry point for algorithm execution */
public static void main(String[] args) throws Exception;
}
Usage Examples:
// Command-line execution
public static void main(String[] args) throws Exception {
Runner.main(new String[]{
"--algorithm", "PageRank",
"--input", "CompleteGraph", "--vertex_count", "1000",
"--output", "Print"
});
}
// Programmatic execution
Runner runner = new Runner(new String[]{
"--algorithm", "ConnectedComponents",
"--input", "CSV", "--input_filename", "graph.csv",
"--output", "CSV", "--output_filename", "results.csv"
});
runner.run(); // Set up the job
// Access execution environment and results if needed
ExecutionEnvironment env = runner.getExecutionEnvironment();
DataSet result = runner.getResult();
Global parameters that control the execution environment and job behavior.
// Framework-level parameters available on Runner
public class Runner extends ParameterizedBase {
/** Control Flink object reuse optimization */
BooleanParameter disableObjectReuse;
/** Path to write job execution details as JSON */
StringParameter jobDetailsPath;
/** Custom name for the job execution */
StringParameter jobName;
}
Factory classes that manage available inputs, algorithms, and outputs for dynamic instantiation.
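The idea of name-based lookup can be sketched without any Flink dependency. The example below is a simplified stand-in, not the library's `ParameterizedFactory`: the names `NamedComponent`, `NamedFactory`, `PageRankStub`, and `ConnectedComponentsStub` are hypothetical. Instantiating through a no-argument constructor and returning `null` for an unknown name are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Illustrative stand-in for a named component (hypothetical interface). */
interface NamedComponent {
    String getName();
}

/** Hypothetical components used only to exercise the factory. */
final class PageRankStub implements NamedComponent {
    public String getName() { return "PageRank"; }
}

final class ConnectedComponentsStub implements NamedComponent {
    public String getName() { return "ConnectedComponents"; }
}

/** Minimal name-based factory: stores classes and instantiates them on demand. */
final class NamedFactory<T extends NamedComponent> implements Iterable<T> {
    private final List<Class<? extends T>> classes = new ArrayList<>();

    /** Registers a component class and returns this factory for chaining. */
    NamedFactory<T> addClass(Class<? extends T> cls) {
        classes.add(cls);
        return this;
    }

    /** Returns a fresh instance whose name matches ignoring case, or null if none does. */
    T get(String name) {
        for (T component : this) {
            if (component.getName().equalsIgnoreCase(name)) {
                return component;
            }
        }
        return null;
    }

    /** Creates one new instance per registered class via its no-argument constructor. */
    @Override
    public Iterator<T> iterator() {
        List<T> instances = new ArrayList<>();
        for (Class<? extends T> cls : classes) {
            try {
                instances.add(cls.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
            }
        }
        return instances.iterator();
    }
}

final class NamedFactoryDemo {
    public static void main(String[] args) {
        NamedFactory<NamedComponent> factory = new NamedFactory<NamedComponent>()
            .addClass(PageRankStub.class)
            .addClass(ConnectedComponentsStub.class);
        System.out.println(factory.get("pagerank").getName());
    }
}
```

Storing classes rather than instances means every lookup yields a fresh, unconfigured object, so state set on one component cannot leak into the next run.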
/**
* Factory for parameterized components with name-based lookup
* @param <T> Component type extending Parameterized
*/
public class ParameterizedFactory<T extends Parameterized> implements Iterable<T> {
/** Add a component class to the factory */
public ParameterizedFactory<T> addClass(Class<? extends T> cls);
/** Get component instance by name (case-insensitive) */
public T get(String name);
/** Iterator over all available component instances */
public Iterator<T> iterator();
}
Usage Examples:
// Create custom factory for drivers
ParameterizedFactory<Driver> customDriverFactory =
new ParameterizedFactory<Driver>()
.addClass(PageRank.class)
.addClass(ConnectedComponents.class);
// Get algorithm by name
Driver pageRank = customDriverFactory.get("PageRank");
// Iterate over all available algorithms
for (Driver algorithm : customDriverFactory) {
System.out.println(algorithm.getName() + ": " + algorithm.getShortDescription());
}
The framework can export detailed execution information as JSON for analysis and debugging.
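As a rough picture of what exporting such details involves, the sketch below assembles a small JSON document from a job ID, a runtime, and a parameter map, then writes it to a file. It uses only the JDK and is not the framework's own writer. The field names `job_id`, `runtime_ms`, and `parameters` are taken from the sample output on this page, while the class `JobDetailsSketch`, its methods, and the hand-written string escaping are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: builds and writes a small job-details JSON document. */
final class JobDetailsSketch {

    /** Wraps a string in quotes and escapes characters JSON does not allow raw. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Serializes the job ID, runtime, and parameters as one compact JSON object. */
    static String toJson(String jobId, long runtimeMs, Map<String, String> parameters) {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"job_id\":").append(quote(jobId)).append(',');
        sb.append("\"runtime_ms\":").append(runtimeMs).append(',');
        sb.append("\"parameters\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : parameters.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("}}").toString();
    }

    /** Writes the JSON text to the given path as UTF-8. */
    static void write(Path path, String json) throws IOException {
        Files.write(path, json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("algorithm", "PageRank");
        params.put("vertex_count", "1000");
        Path out = Files.createTempFile("job-details", ".json");
        write(out, toJson("example-job", 15420L, params));
        System.out.println("Wrote " + out);
    }
}
```

In a real project a JSON library would normally replace the manual escaping; it is written out here only to keep the example dependency-free.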
/**
* Write job execution details to JSON file
* Includes runtime environment, job ID, execution time, parameters, and accumulators
*/
private static void writeJobDetails(ExecutionEnvironment env, String jobDetailsPath)
throws IOException;
JSON Output Format:
{
"Apache Flink": {
"version": "1.16.3",
"commit ID": "abc123",
"commit date": "2023-03-15"
},
"job_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"runtime_ms": 15420,
"parameters": {
"algorithm": "PageRank",
"input": "CompleteGraph",
"vertex_count": "1000"
},
"accumulators": {
"vertices": "1000",
"edges": "499500"
}
}
The framework provides comprehensive command-line help and usage information.
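The two help modes, a listing when no algorithm is named and usage text when one is, can be sketched as a small decision on the presence of `--algorithm`. The code below is a simplified illustration and not the framework's output logic. The class `UsageSketch`, the catalogue contents, the placeholder descriptions, and the exact wording of the messages are all assumptions, and the sketch never runs an algorithm.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: chooses between an algorithm listing and per-algorithm usage. */
final class UsageSketch {

    // Hypothetical catalogue; the descriptions are placeholders, not Flink's text.
    static final Map<String, String> ALGORITHMS = new LinkedHashMap<>();
    static {
        ALGORITHMS.put("PageRank", "placeholder description");
        ALGORITHMS.put("ConnectedComponents", "placeholder description");
    }

    /** Returns the value following "--algorithm", or null when the flag is absent. */
    static String requestedAlgorithm(String[] args) {
        for (int i = 0; i + 1 < args.length; i++) {
            if ("--algorithm".equals(args[i])) {
                return args[i + 1];
            }
        }
        return null;
    }

    /** Builds help text: a listing if no known algorithm is named, usage otherwise. */
    static String help(String[] args) {
        String name = requestedAlgorithm(args);
        StringBuilder sb = new StringBuilder();
        if (name == null || !ALGORITHMS.containsKey(name)) {
            if (name != null) {
                sb.append("Unknown algorithm: ").append(name).append('\n');
            }
            sb.append("Available algorithms:\n");
            for (Map.Entry<String, String> e : ALGORITHMS.entrySet()) {
                sb.append("  ").append(e.getKey()).append(": ").append(e.getValue()).append('\n');
            }
        } else {
            sb.append("Usage: --algorithm ").append(name)
              .append(" --input <input> --output <output>\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(help(args));
    }
}
```

Returning the help text as a string rather than printing it directly keeps the decision logic easy to test in isolation.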
Algorithm Listing:
flink run flink-gelly-examples.jar
# Output: Lists all available algorithms with descriptions
Algorithm-Specific Usage:
flink run flink-gelly-examples.jar --algorithm PageRank
# Output: Detailed usage for PageRank including input/output options
Full Execution:
flink run flink-gelly-examples.jar \
--algorithm PageRank \
--damping_factor 0.85 \
--iterations 10 \
--input CompleteGraph --vertex_count 1000 \
--output Print \
--job_name "PageRank Analysis"
The framework provides comprehensive error handling with user-friendly messages.
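One common way to obtain user-friendly messages is to validate each parameter early and raise an unchecked exception whose text names the offending flag. The sketch below shows that pattern using only the JDK. `ParametrizationError` is a hypothetical stand-in for the framework's exception type, and `ValidationSketch`, `parsePositive`, and the rule that the value must be a positive integer are assumptions chosen for illustration.

```java
/** Hypothetical stand-in for a parametrization exception (unchecked). */
final class ParametrizationError extends RuntimeException {
    ParametrizationError(String message) {
        super(message);
    }
}

/** Illustrative only: validates a numeric parameter and reports problems by flag name. */
final class ValidationSketch {

    /** Parses a required positive integer, naming the flag in any error message. */
    static long parsePositive(String name, String raw) {
        if (raw == null) {
            throw new ParametrizationError("Missing required parameter --" + name);
        }
        long value;
        try {
            value = Long.parseLong(raw);
        } catch (NumberFormatException e) {
            throw new ParametrizationError(
                "Parameter --" + name + " must be an integer, got '" + raw + "'");
        }
        if (value <= 0) {
            throw new ParametrizationError(
                "Parameter --" + name + " must be positive, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            parsePositive("vertex_count", "abc");
        } catch (ParametrizationError e) {
            // Print only the message, not a stack trace, to keep the output readable.
            System.err.println(e.getMessage());
        }
    }
}
```

Catching the exception at the entry point and printing only its message is what keeps the failure readable for someone running the job from a shell.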
// Exception types thrown by the framework
public class ProgramParametrizationException extends RuntimeException {
// Thrown for invalid parameters, missing components, or configuration errors
}
Common Error Scenarios: