A collection of example applications demonstrating graph processing algorithms using Apache Flink's Gelly Graph API
Bidirectional transformation system for preprocessing input graphs and postprocessing algorithm results. Transformations enable type conversions, data scaling, and format adaptations while maintaining result consistency.
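To make the bidirectional idea concrete, here is a minimal, Flink-free sketch of a round trip. The `RoundTrip` interface, the `LongToIntKeys` class, and their method names are hypothetical stand-ins invented for illustration and are not part of the Gelly examples API. The `Transform` interface documented in this section works on graphs and data sets and uses separate type parameters for input and result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a bidirectional transform (not the Gelly interface). */
interface RoundTrip<A, B> {
    /** Preprocessing step, applied before the algorithm runs. */
    B forward(A input);

    /** Postprocessing step, applied to what the algorithm produced. */
    A reverse(B result);
}

/** Narrows long keys to int on the way in and widens them again on the way out. */
class LongToIntKeys implements RoundTrip<List<Long>, List<Integer>> {
    @Override
    public List<Integer> forward(List<Long> keys) {
        List<Integer> out = new ArrayList<>(keys.size());
        for (long key : keys) {
            // Math.toIntExact throws ArithmeticException if the key does not fit in an int.
            out.add(Math.toIntExact(key));
        }
        return out;
    }

    @Override
    public List<Long> reverse(List<Integer> keys) {
        List<Long> out = new ArrayList<>(keys.size());
        for (int key : keys) {
            out.add((long) key);
        }
        return out;
    }
}

class RoundTripDemo {
    public static void main(String[] args) {
        RoundTrip<List<Long>, List<Integer>> transform = new LongToIntKeys();
        List<Long> original = Arrays.asList(0L, 1L, 42L);
        List<Integer> compact = transform.forward(original);
        List<Long> restored = transform.reverse(compact);
        System.out.println(original.equals(restored)); // prints true
    }
}
```

The sketch fails fast on overflow rather than silently truncating, which is the property that makes the reverse step safe to apply.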
Base interface for bidirectional transformations supporting both input preprocessing and result postprocessing.
/**
* Bidirectional transformation interface
* @param <II> Input data type (before transformation)
* @param <IO> Input output type (after input transformation)
* @param <RI> Result input type (before reverse transformation)
* @param <RO> Result output type (after reverse transformation)
*/
public interface Transform<II, IO, RI, RO> extends Parameterized {
/** Human-readable transform identifier */
String getIdentity();
/** Forward transformation applied to input data */
IO transformInput(II input) throws Exception;
/** Reverse transformation applied to algorithm results */
RO transformResult(RI result) throws Exception;
}
Interface for components that support transformations.
/**
* Indicates that a component supports transformations
*/
public interface Transformable {
/** Get list of transforms supported by this component */
List<Transform> getTransformers();
}
Transforms graph vertex and edge key types between different numeric representations with automatic bounds checking and reverse mapping support.
/**
* Transform graph key types for memory optimization and compatibility
* @param <VV> Vertex value type (unchanged)
* @param <EV> Edge value type (unchanged)
*/
public class GraphKeyTypeTransform<VV, EV> extends ParameterizedBase
implements Transform<Graph<LongValue, VV, EV>, Graph<?, VV, EV>, DataSet<?>, DataSet<LongValue>> {
/** Target key type for transformation */
ChoiceParameter type;
/** Disable reverse transformation for results */
BooleanParameter disableTypeReversal;
/** Create transform with vertex count for bounds checking */
public GraphKeyTypeTransform(long vertexCount);
}
Supported Key Types:
// Available transformation target types
public enum KeyType {
BYTE, // 8-bit signed integer (-128 to 127)
SHORT, // 16-bit signed integer (-32,768 to 32,767)
INT, // 32-bit signed integer
LONG, // 64-bit signed integer (default)
FLOAT, // 32-bit floating point
DOUBLE, // 64-bit floating point
STRING // String representation
}
Usage Examples:
// Transform large vertex IDs to smaller integers
GraphKeyTypeTransform<NullValue, NullValue> transform =
new GraphKeyTypeTransform<>(10000); // Max 10K vertices
transform.configure(ParameterTool.fromArgs(new String[]{
"--type", "short" // Use 16-bit integers instead of 64-bit
}));
// Apply transformation
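// transformInput is declared to return Graph<?, VV, EV>, so an unchecked cast is needed here
@SuppressWarnings("unchecked")
Graph<ShortValue, NullValue, NullValue> compactGraph =
(Graph<ShortValue, NullValue, NullValue>) transform.transformInput(originalGraph);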
// Results are automatically transformed back to original type
DataSet<LongValue> originalResults =
transform.transformResult(algorithmResults);
Command-Line Usage:
# Transform to 32-bit integers for memory efficiency
--type int
# Transform to bytes for very small graphs (up to 127 vertices)
--type byte
# Transform to strings for debugging
--type string
# Disable reverse transformation (keep results in transformed type)
--type short --disable_type_reversal
Enhanced LongValue implementation with correct hash code computation for use in transformations.
/**
* LongValue with proper hash code implementation
* Used internally by transformations for consistent hashing
*/
public class LongValueWithProperHashCode extends LongValue {
/** Construct from long value */
public LongValueWithProperHashCode(long value);
/** Proper hash code implementation */
public int hashCode();
/** Enhanced equals implementation */
public boolean equals(Object obj);
}
Transformations are automatically applied to inputs that implement the Transformable interface.
// Example of transformable input
public class TransformableInput extends GeneratedGraph implements Transformable {
private GraphKeyTypeTransform<NullValue, NullValue> keyTransform;
@Override
public List<Transform> getTransformers() {
return Arrays.asList(keyTransform);
}
@Override
public Graph create(ExecutionEnvironment env) throws Exception {
Graph originalGraph = generateGraph(env); // Generate with LongValue keys
return keyTransform.transformInput(originalGraph); // Transform to smaller keys
}
}
Results are automatically transformed back to original format after algorithm execution.
// Transformation pipeline in Runner
List<Transform> transforms = new ArrayList<>();
// Collect transforms from input and algorithm
if (input instanceof Transformable) {
transforms.addAll(((Transformable) input).getTransformers());
}
if (algorithm instanceof Transformable) {
transforms.addAll(((Transformable) algorithm).getTransformers());
}
// Apply forward transforms to input
Graph transformedGraph = input.create(env);
for (Transform transform : transforms) {
transformedGraph = transform.transformInput(transformedGraph);
}
// Run algorithm on transformed graph
DataSet result = algorithm.plan(transformedGraph);
// Apply reverse transforms to results (in reverse order)
Collections.reverse(transforms);
for (Transform transform : transforms) {
result = transform.transformResult(result);
}
// Large graph with 1M vertices - use integer keys to save memory
String[] args = {
"--algorithm", "PageRank",
"--input", "CompleteGraph",
"--vertex_count", "1000000",
"--type", "int", // Use 32-bit integers instead of 64-bit
"--output", "CSV",
"--output_filename", "pagerank_results.csv"
};
Runner runner = new Runner(args);
runner.run(); // Automatically applies transformations
// Transform string vertex IDs to numeric for algorithm compatibility
String[] args = {
"--algorithm", "ConnectedComponents",
"--input", "CSV",
"--input_filename", "string_vertices.csv",
"--type", "long", // Convert strings to longs
"--output", "CSV",
"--output_filename", "components.csv"
// Results will be transformed back to original string format
};
Transformations automatically validate that vertex counts fit within target type ranges.
// Automatic bounds validation
GraphKeyTypeTransform<NullValue, NullValue> transform = new GraphKeyTypeTransform<>(100000);
transform.configure(ParameterTool.fromArgs(new String[]{"--type", "short"}));
// Throws exception: vertex count 100000 exceeds SHORT range (-32768 to 32767)

| Target Type | Range | Memory Savings | Use Case |
|---|---|---|---|
| byte | -128 to 127 | 87.5% | Tiny graphs (< 128 vertices) |
| short | -32,768 to 32,767 | 75% | Small graphs (< 32K vertices) |
| int | -2.1B to 2.1B | 50% | Medium graphs (< 2B vertices) |
| long | Full range | 0% | Default (no transformation) |
| float | 32-bit float | 50% | Approximate numeric keys |
| double | 64-bit float | 0% | High-precision numeric keys |
| string | Variable | Variable | Debugging and compatibility |
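
The integer rows of the table can be expressed as a small lookup. The following is a sketch only: `KeyTypeChooser`, `IntegerKey`, and `narrowestFor` are hypothetical names that do not appear in the Gelly examples. It assumes, as the table does, that a type is usable when the vertex count does not exceed the type's maximum value, and that savings are measured against a 64-bit key.

```java
/** Hypothetical helper mirroring the table above; not part of the Gelly examples API. */
class KeyTypeChooser {

    /** Signed integer key types, ordered narrowest first. */
    enum IntegerKey {
        BYTE(8, Byte.MAX_VALUE),
        SHORT(16, Short.MAX_VALUE),
        INT(32, Integer.MAX_VALUE),
        LONG(64, Long.MAX_VALUE);

        final int bits;
        final long maxValue;

        IntegerKey(int bits, long maxValue) {
            this.bits = bits;
            this.maxValue = maxValue;
        }

        /** Percentage of key storage saved relative to a 64-bit long key. */
        double savingsPercent() {
            return 100.0 * (Long.SIZE - bits) / Long.SIZE;
        }
    }

    /** Returns the narrowest type whose maximum value is at least the vertex count. */
    static IntegerKey narrowestFor(long vertexCount) {
        if (vertexCount < 0) {
            throw new IllegalArgumentException("vertex count must be non-negative: " + vertexCount);
        }
        for (IntegerKey candidate : IntegerKey.values()) {
            if (vertexCount <= candidate.maxValue) {
                return candidate;
            }
        }
        // LONG.maxValue is Long.MAX_VALUE, so the loop always returns before this point.
        throw new IllegalStateException("unreachable");
    }

    public static void main(String[] args) {
        for (long count : new long[] {100L, 10_000L, 1_000_000L, 3_000_000_000L}) {
            IntegerKey key = narrowestFor(count);
            System.out.println(count + " vertices -> " + key
                    + " (" + key.savingsPercent() + "% smaller keys)");
        }
    }
}
```

The percentages cover key storage only; vertex values, edge values, and serialization overhead are not included in this estimate.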
Type conversion problems during a transformation are reported through a checked exception:
// Common transformation errors
public class TransformationException extends Exception {
// Thrown for:
// - Vertex count exceeds target type range
// - Invalid type conversion (e.g., non-numeric strings to numbers)
// - Reverse transformation failures
// - Memory allocation errors during transformation
}
Error Examples:
Error: Vertex count 100000 exceeds range for type 'short' (-32768 to 32767)
Error: Cannot convert vertex key 'invalid_number' to numeric type
Error: Transformation failed: insufficient memory for key mapping table
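
As an illustration of how a conversion failure might be surfaced with the offending key in the message, here is a self-contained sketch. The `KeyConversionSketch` class and `toShortKey` method are hypothetical, and the nested `TransformationException` is a local stand-in for the exception described above rather than the library's own class.

```java
/** Hypothetical sketch of reporting key conversion failures; not the Gelly implementation. */
class KeyConversionSketch {

    /** Local stand-in for the TransformationException described in this section. */
    static class TransformationException extends Exception {
        TransformationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Parses a vertex key as a 16-bit value, wrapping any failure with the offending key. */
    static short toShortKey(String rawKey) throws TransformationException {
        try {
            // Short.parseShort rejects both non-numeric text and values outside -32768..32767.
            return Short.parseShort(rawKey.trim());
        } catch (NumberFormatException e) {
            throw new TransformationException(
                    "Cannot convert vertex key '" + rawKey + "' to type 'short'", e);
        }
    }

    public static void main(String[] args) {
        for (String raw : new String[] {"42", "invalid_number", "100000"}) {
            try {
                System.out.println(raw + " -> " + toShortKey(raw));
            } catch (TransformationException e) {
                System.out.println("Error: " + e.getMessage());
            }
        }
    }
}
```

Keeping the original `NumberFormatException` as the cause preserves the low-level detail for debugging while the message stays readable for command-line users.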