# Execution Framework

The execution framework provides a command-line interface and a programmatic API for running graph algorithms with configurable inputs and outputs. The central `Runner` class orchestrates the entire execution pipeline.

## Capabilities

### Runner Class

Main execution coordinator that handles parameter parsing, component configuration, and job execution.

```java { .api }
/**
 * Central orchestrator for executing Flink graph algorithms.
 * Coordinates input sources, algorithms, and output handlers.
 */
public class Runner extends ParameterizedBase {
    /** Create a runner from command-line arguments */
    public Runner(String[] args);

    /** Get the Flink execution environment (available after calling run()) */
    public ExecutionEnvironment getExecutionEnvironment();

    /** Get the result DataSet (available after calling run()) */
    public DataSet getResult();

    /** Set up and configure the job with input, algorithm, and output */
    public Runner run() throws Exception;

    /** Command-line entry point for algorithm execution */
    public static void main(String[] args) throws Exception;
}
```

**Usage Examples:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Runner;

public class RunnerExamples {

    // Command-line style execution via the static entry point
    public static void main(String[] args) throws Exception {
        Runner.main(new String[]{
            "--algorithm", "PageRank",
            "--input", "CompleteGraph", "--vertex_count", "1000",
            "--output", "Print"
        });
    }

    // Programmatic setup: build the job, then inspect its environment and result
    static void programmatic() throws Exception {
        Runner runner = new Runner(new String[]{
            "--algorithm", "ConnectedComponents",
            "--input", "CSV", "--input_filename", "graph.csv",
            "--output", "CSV", "--output_filename", "results.csv"
        });

        runner.run(); // Set up the job

        // Access the execution environment and result if needed
        ExecutionEnvironment env = runner.getExecutionEnvironment();
        DataSet result = runner.getResult();
    }
}
```
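The constructor receives raw `--key value` pairs. The hypothetical `ArgsSketch.parse` below illustrates only that parsing step. It collects the pairs into an ordered map and stores a key that has no value as the flag `"true"`. That flag convention is an assumption of this sketch. The code is not the `Runner`'s actual implementation and has no Flink dependency.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the framework): collects "--key value" pairs
// into a map that preserves argument order.
final class ArgsSketch {
    private ArgsSketch() {}

    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            String token = args[i];
            if (!token.startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but found: " + token);
            }
            String key = token.substring(2);
            // Assumption: a key followed by another "--key" (or by nothing) is a boolean flag
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
            params.put(key, hasValue ? args[++i] : "true");
        }
        return params;
    }
}
```

For example, parsing the `ConnectedComponents` arguments above maps `algorithm` to `ConnectedComponents`, `input` to `CSV`, and `input_filename` to `graph.csv`.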

### Framework Parameters

Global parameters that control the execution environment and job behavior.

```java { .api }
// Framework-level parameters available on Runner
public class Runner extends ParameterizedBase {
    /** Control Flink object reuse optimization */
    BooleanParameter disableObjectReuse;

    /** Path to write job execution details as JSON */
    StringParameter jobDetailsPath;

    /** Custom name for the job execution */
    StringParameter jobName;
}
```
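The three framework parameters are typed, optional settings. The sketch below assumes the values arrive as a string map, such as a parsed `--key value` list. The hypothetical `ParamSource` class shows three behaviors:

- A boolean flag defaults to `false`.
- A string option falls back to a default such as `null`.
- Every read is recorded, so arguments that nobody used can be reported later.

The class is not the framework's `BooleanParameter`/`StringParameter`. The key names in the test are illustrative only.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for typed framework parameters (not the framework's classes).
// Every read records its key as "requested" so unused arguments can be detected later.
final class ParamSource {
    private final Map<String, String> values;
    private final Set<String> requested = new HashSet<>();

    ParamSource(Map<String, String> values) {
        this.values = values;
    }

    // Boolean flag: false when absent, otherwise true unless the value is "false"
    boolean getBoolean(String key) {
        requested.add(key);
        String value = values.get(key);
        return value != null && !"false".equalsIgnoreCase(value);
    }

    // String option: returns the supplied default (possibly null) when the key is absent
    String getString(String key, String defaultValue) {
        requested.add(key);
        return values.getOrDefault(key, defaultValue);
    }

    // Keys read so far, as a read-only view
    Set<String> requestedKeys() {
        return Collections.unmodifiableSet(requested);
    }
}
```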

### Component Factories

Factory classes that manage available inputs, algorithms, and outputs for dynamic instantiation.

```java { .api }
/**
 * Factory for parameterized components with name-based lookup
 * @param <T> Component type extending Parameterized
 */
public class ParameterizedFactory<T extends Parameterized> implements Iterable<T> {
    /** Add a component class to the factory */
    public ParameterizedFactory<T> addClass(Class<? extends T> cls);

    /** Get component instance by name (case-insensitive) */
    public T get(String name);

    /** Iterator over all available component instances */
    public Iterator<T> iterator();
}
```

**Usage Examples:**

```java
// Create custom factory for drivers
ParameterizedFactory<Driver> customDriverFactory =
    new ParameterizedFactory<Driver>()
        .addClass(PageRank.class)
        .addClass(ConnectedComponents.class);

// Get algorithm by name
Driver pageRank = customDriverFactory.get("PageRank");

// Iterate over all available algorithms
for (Driver algorithm : customDriverFactory) {
    System.out.println(algorithm.getName() + ": " + algorithm.getShortDescription());
}
```
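The factory contract has three parts: chained registration, case-insensitive lookup by name, and iteration over fresh instances. The sketch below mimics that contract with plain JDK types. It is a simplified approximation with two swaps:

- Components are registered as `Supplier` constructors instead of `Class` objects with reflection.
- A hypothetical `Named` interface stands in for `Parameterized`.

`NamedFactory` and `Named` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical minimal interface standing in for Parameterized
interface Named {
    String getName();
}

// Simplified factory: registers constructors and looks components up by name, ignoring case
final class NamedFactory<T extends Named> implements Iterable<T> {
    private final List<Supplier<? extends T>> suppliers = new ArrayList<>();

    NamedFactory<T> add(Supplier<? extends T> supplier) {
        suppliers.add(supplier);
        return this; // allow chaining, like addClass(...)
    }

    // Returns a fresh instance whose name matches, or null when nothing matches
    T get(String name) {
        for (Supplier<? extends T> s : suppliers) {
            T candidate = s.get();
            if (candidate.getName().equalsIgnoreCase(name)) {
                return candidate;
            }
        }
        return null;
    }

    // Iterates over one fresh instance of every registered component
    @Override
    public Iterator<T> iterator() {
        List<T> instances = new ArrayList<>();
        for (Supplier<? extends T> s : suppliers) {
            instances.add(s.get());
        }
        return instances.iterator();
    }
}
```

Creating a new instance per lookup keeps each run's parameter state independent. This is the same reason a factory that hands out instances, rather than shared singletons, suits configurable components.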

### Job Details Output

The framework can export detailed execution information as JSON for analysis and debugging.

```java { .api }
/**
 * Write job execution details to a JSON file.
 * Includes Flink version and commit information, job ID, runtime in milliseconds,
 * job parameters, and accumulator results.
 */
private static void writeJobDetails(ExecutionEnvironment env, String jobDetailsPath)
        throws IOException;
```

**JSON Output Format:**

```json
{
  "Apache Flink": {
    "version": "1.16.3",
    "commit ID": "abc123",
    "commit date": "2023-03-15"
  },
  "job_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "runtime_ms": 15420,
  "parameters": {
    "algorithm": "PageRank",
    "input": "CompleteGraph",
    "vertex_count": "1000"
  },
  "accumulators": {
    "vertices": "1000",
    "edges": "499500"
  }
}
```
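The hypothetical `JobDetailsJson.render` below builds a string with the same top-level keys as the example, using only the JDK. It makes the structure concrete under one assumption: every nested value is a string, as in the example's `parameters` and `accumulators`. It is not the framework's `writeJobDetails`, which also writes the document to `jobDetailsPath`.

```java
import java.util.Map;

// Hypothetical, dependency-free renderer producing the same shape as the example.
// Not the framework's writeJobDetails; it only illustrates the structure.
final class JobDetailsJson {
    private JobDetailsJson() {}

    static String render(Map<String, String> flink, String jobId, long runtimeMs,
                         Map<String, String> parameters, Map<String, String> accumulators) {
        StringBuilder sb = new StringBuilder("{");
        sb.append(quote("Apache Flink")).append(':').append(object(flink)).append(',');
        sb.append(quote("job_id")).append(':').append(quote(jobId)).append(',');
        sb.append(quote("runtime_ms")).append(':').append(runtimeMs).append(',');
        sb.append(quote("parameters")).append(':').append(object(parameters)).append(',');
        sb.append(quote("accumulators")).append(':').append(object(accumulators));
        return sb.append('}').toString();
    }

    // Renders a flat map of string values as a JSON object, in the map's iteration order
    private static String object(Map<String, String> map) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : map.entrySet()) {
            if (!first) sb.append(',');
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append('}').toString();
    }

    // Minimal JSON string escaping: quotes, backslashes, and control characters
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

Passing `LinkedHashMap` instances keeps keys in insertion order, so the output matches the order in which parameters were recorded.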

### Command-Line Interface

Running the examples JAR with no arguments, or with only an algorithm, prints help text. Supplying an input and an output as well runs the job.

**Algorithm Listing:**

```bash
flink run flink-gelly-examples.jar
# Output: Lists all available algorithms with descriptions
```

**Algorithm-Specific Usage:**

```bash
flink run flink-gelly-examples.jar --algorithm PageRank
# Output: Detailed usage for PageRank including input/output options
```

**Full Execution:**

```bash
flink run flink-gelly-examples.jar \
  --algorithm PageRank \
  --damping_factor 0.85 \
  --iterations 10 \
  --input CompleteGraph --vertex_count 1000 \
  --output Print \
  --job_name "PageRank Analysis"
```
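The three invocation styles can be read as a simple decision over the supplied keys. The hypothetical `CliDispatch.choose` below encodes that reading. Its rule is an assumption of the sketch: execution requires both `input` and `output`, while an algorithm alone triggers the usage text. The framework's actual checks may differ.

```java
import java.util.Map;

// Hypothetical decision logic behind the three invocation styles (an assumption, not the framework's code)
final class CliDispatch {
    enum Mode { LIST_ALGORITHMS, ALGORITHM_USAGE, EXECUTE }

    private CliDispatch() {}

    static Mode choose(Map<String, String> params) {
        if (!params.containsKey("algorithm")) {
            return Mode.LIST_ALGORITHMS;   // nothing selected yet: list the choices
        }
        if (!params.containsKey("input") || !params.containsKey("output")) {
            return Mode.ALGORITHM_USAGE;   // algorithm chosen but the job is incomplete
        }
        return Mode.EXECUTE;               // enough information to build the job
    }
}
```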

### Error Handling

The framework reports invalid or incomplete configuration by throwing a `ProgramParametrizationException` whose message is intended for the user.

```java { .api }
// Exception type thrown by the framework
public class ProgramParametrizationException extends RuntimeException {
    // Thrown for invalid parameters, missing components, or configuration errors
}
```
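Because the exception is unchecked, a component lookup can fail fast with a message that lists the valid choices. The sketch below shows that pattern. `ParametrizationError` is a hypothetical stand-in for `ProgramParametrizationException`, and `ComponentLookup.resolve` is an illustrative helper, not a framework method.

```java
import java.util.List;

// Hypothetical stand-in for ProgramParametrizationException: an unchecked exception
// whose message is meant to be shown directly to the user.
class ParametrizationError extends RuntimeException {
    ParametrizationError(String message) {
        super(message);
    }
}

final class ComponentLookup {
    private ComponentLookup() {}

    // Resolves a user-supplied name (ignoring case) or fails with the list of valid choices
    static String resolve(String kind, String requested, List<String> available) {
        if (requested == null) {
            throw new ParametrizationError("No " + kind + " specified; choose one of " + available);
        }
        for (String name : available) {
            if (name.equalsIgnoreCase(requested)) {
                return name;
            }
        }
        throw new ParametrizationError(
            "Unknown " + kind + " '" + requested + "'; available: " + String.join(", ", available));
    }
}
```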

**Common Error Scenarios:**

- Missing required parameters: Clear message indicating which parameter is missing
- Unknown algorithm/input/output names: Lists available options
- Invalid parameter values: Specific validation error with acceptable ranges
- Unrequested parameters: Lists parameters that were provided but not used by any component
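The "unrequested parameters" check amounts to a set difference: provided keys minus the keys that some component actually read. The sketch below computes that difference and reports it. `UnusedParameterCheck` is hypothetical. `IllegalArgumentException` is used as a stand-in for the framework's own exception type, and the misspelled `itertions` key in the test is an invented example of a typo.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical check for the "unrequested parameters" scenario: any argument that no
// component read during configuration is reported back to the user.
final class UnusedParameterCheck {
    private UnusedParameterCheck() {}

    // Returns the provided keys that were never requested, sorted for stable messages
    static Set<String> unrequested(Set<String> provided, Set<String> requested) {
        Set<String> unused = new TreeSet<>(provided);
        unused.removeAll(requested);
        return unused;
    }

    static void verify(Set<String> provided, Set<String> requested) {
        Set<String> unused = unrequested(provided, requested);
        if (!unused.isEmpty()) {
            // IllegalArgumentException stands in for the framework's own exception type here
            throw new IllegalArgumentException("Unrequested parameters: " + unused);
        }
    }
}
```

This check is what turns a silent typo such as `--itertions` into a visible error, instead of a run that quietly uses the default iteration count.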