Apache Flink batch processing examples demonstrating various algorithms and use cases including WordCount, PageRank, KMeans clustering, Connected Components, and graph processing
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-examples-batch_2-11@1.14.0
```

Apache Flink Batch Examples is a comprehensive collection of batch processing examples demonstrating various algorithms and use cases. It provides executable JAR files and reusable components for WordCount, PageRank, KMeans clustering, Connected Components, graph processing, relational operations, and distributed file operations.

Or add it as a Maven dependency:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-examples-batch_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

```java
// Main example classes
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.graph.PageRank;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.relational.WebLogAnalysis;
import org.apache.flink.examples.java.distcp.DistCp;
// Flink API imports
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.utils.ParameterTool;
```

```java
// Run WordCount example programmatically
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.wordcount.WordCount;
public class ExampleUsage {
    public static void main(String[] args) throws Exception {
        // Basic execution pattern for any example
        String[] exampleArgs = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
        WordCount.main(exampleArgs);
    }
}
```

The Apache Flink Batch Examples library is organized around several key architectural patterns:
Text processing examples include the classic WordCount and a POJO-based variant, covering tokenization, aggregation, and result output; a runnable sketch follows the signatures below.

```java
public class WordCount {
    public static void main(String[] args) throws Exception;

    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
    }
}
```
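The Tokenizer can be reused directly in a custom pipeline. This is a minimal sketch that counts words in the bundled sample text; it assumes only the classes listed above plus WordCountData (covered under data utilities below).

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.examples.java.wordcount.util.WordCountData;

public class WordCountSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tokenize the bundled sample text and sum the counts per word.
        DataSet<Tuple2<String, Integer>> counts =
                WordCountData.getDefaultTextLineDataSet(env)
                        .flatMap(new WordCount.Tokenizer())
                        .groupBy(0) // group by the word (tuple field 0)
                        .sum(1);    // sum the per-word counts (tuple field 1)

        counts.print(); // print() triggers execution for batch jobs
    }
}
```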
The machine learning examples center on a K-Means clustering implementation for 2D data points, demonstrating iterative algorithm patterns and custom data types:

```java
public class KMeans {
    public static void main(String[] args) throws Exception;

    public static class Point implements Serializable {
        public double x, y;
        public Point(double x, double y);
        public double euclideanDistance(Point other);
        public Point add(Point other);
        public Point div(long val);
    }

    public static class Centroid extends Point {
        public int id;
        public Centroid(int id, double x, double y);
    }
}
```
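The bulk-iteration pattern behind KMeans can be sketched as follows. It assumes the example's public inner operators (SelectNearestCenter, CountAppender, CentroidAccumulator, CentroidAverager), which are not part of the signatures above, so treat it as an illustration of the iterate/closeWith idiom rather than a guaranteed API.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.clustering.KMeans.Centroid;
import org.apache.flink.examples.java.clustering.KMeans.Point;
import org.apache.flink.examples.java.clustering.util.KMeansData;

public class KMeansSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Point> points = KMeansData.getDefaultPointDataSet(env);
        DataSet<Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);

        // Bulk iteration: refine the centroids for a fixed number of rounds.
        IterativeDataSet<Centroid> loop = centroids.iterate(10);
        DataSet<Centroid> newCentroids = points
                // assign each point to its nearest centroid; the current
                // centroids are shipped to all workers as a broadcast set
                .map(new KMeans.SelectNearestCenter()).withBroadcastSet(loop, "centroids")
                // append a count, then average the points per cluster id
                .map(new KMeans.CountAppender())
                .groupBy(0).reduce(new KMeans.CentroidAccumulator())
                .map(new KMeans.CentroidAverager());

        DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
        finalCentroids.print();
    }
}
```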
Graph algorithms include PageRank, Connected Components, Triangle Enumeration, and Transitive Closure, with specialized data types and iterative processing patterns:

```java
public class PageRank {
    public static void main(String[] args) throws Exception;

    public static final class RankAssigner
            implements MapFunction<Long, Tuple2<Long, Double>>;

    public static final class Dampener
            implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>;
}

public class ConnectedComponents {
    public static void main(String[] args) throws Exception;
}
```
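Both graph drivers fall back to bundled sample data when no input paths are supplied, which makes a quick programmatic smoke test easy. The flag names below follow the examples' usage strings; the iteration counts are arbitrary.

```java
// Run PageRank on the bundled sample graph (no --pages/--links given);
// results go to stdout when --output is absent.
PageRank.main(new String[] {"--iterations", "10"});

// ConnectedComponents accepts --vertices, --edges, --output, and --iterations;
// with no paths it likewise uses default data.
ConnectedComponents.main(new String[] {"--iterations", "10"});
```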
SQL-like operations and analytics include web log analysis, TPC-H benchmark queries, and accumulator examples for custom metrics collection:

```java
public class WebLogAnalysis {
    public static void main(String[] args) throws Exception;

    public static class FilterDocByKeyWords
            implements FilterFunction<Tuple2<String, String>>;

    public static class FilterByRank
            implements FilterFunction<Tuple3<Integer, String, Integer>>;
}

public class TPCHQuery3 {
    public static void main(String[] args) throws Exception;

    public static class Lineitem extends Tuple4<Long, Double, Double, String>;
    public static class Customer extends Tuple2<Long, String>;
    public static class Order extends Tuple4<Long, Long, String, Long>;
}
```
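A hedged sketch of driving the relational examples: WebLogAnalysis falls back to bundled sample data when no paths are given, while TPCHQuery3 expects generated TPC-H `.tbl` files on disk. The flag names follow the examples' parameter handling; all paths below are placeholders.

```java
// Uses the bundled sample documents/ranks/visits data.
WebLogAnalysis.main(new String[0]);

// TPC-H Query 3 over generated .tbl files (placeholder paths).
TPCHQuery3.main(new String[] {
        "--lineitem", "/data/tpch/lineitem.tbl",
        "--customer", "/data/tpch/customer.tbl",
        "--orders", "/data/tpch/orders.tbl",
        "--output", "/data/tpch/q3-result"
});
```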
DistCp is a distributed file copying utility similar to Hadoop DistCp, built on custom input formats and parallel file processing:

```java
public class DistCp {
    public static void main(String[] args) throws Exception;

    public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
    public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
}

public class FileCopyTask {
    public FileCopyTask(Path path, String relativePath);
    public Path getPath();
    public String getRelativePath();
}
```
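A sketch of invoking DistCp: the --input/--output/--parallelism flags match the example's parameter handling, and the paths are placeholders. After the job finishes, the FILES_COPIED and BYTES_COPIED accumulators named above report what was transferred.

```java
// Recursively copy a directory tree with 8 parallel copy tasks.
DistCp.main(new String[] {
        "--input", "hdfs:///data/source",
        "--output", "hdfs:///data/target",
        "--parallelism", "8"
});
```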
Additional examples include Pi estimation via the Monte Carlo method, collection-based execution patterns, and POJO usage demonstrations:

```java
public class PiEstimation {
    public static void main(String[] args) throws Exception;

    public static class Sampler implements MapFunction<Long, Long>;
}

public class CollectionExecutionExample {
    public static void main(String[] args) throws Exception;

    public static class User {
        public int userIdentifier;
        public String name;
    }
}
```
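These drivers can likewise be invoked programmatically. Note that PiEstimation takes the sample count as a single positional argument rather than a named flag (it falls back to a built-in default when omitted).

```java
// Estimate Pi from ten million random samples.
PiEstimation.main(new String[] {"10000000"});

// Runs entirely on local Java collections; takes no arguments.
CollectionExecutionExample.main(new String[0]);
```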
All examples follow a consistent pattern for setting up the Flink execution environment:

```java
// Standard execution environment setup
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Parameter handling
final ParameterTool params = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);

// Data source creation
DataSet<String> text = env.readTextFile(params.get("input"));

// Execute job
env.execute("Job Name");
```
Parameter handling is consistent across all examples:

```java
import java.util.Collection;

import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.api.java.utils.ParameterTool;

// Single-value parameter tool
ParameterTool params = ParameterTool.fromArgs(args);
String inputPath = params.get("input");
int iterations = params.getInt("iterations", 10);

// Multiple parameter tool (for parameters that may occur more than once,
// e.g. several --input flags); note it returns a Collection, not an array
MultipleParameterTool multiParams = MultipleParameterTool.fromArgs(args);
Collection<String> inputs = multiParams.getMultiParameterRequired("input");
```
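A common idiom built on ParameterTool, used throughout the examples, is to fall back to default data when no --input is supplied. A minimal sketch (the inline fallback data here is illustrative):

```java
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final ParameterTool params = ParameterTool.fromArgs(args);

DataSet<String> text;
if (params.has("input")) {
    // read user-supplied input
    text = env.readTextFile(params.get("input"));
} else {
    System.out.println("Executing example with default input data set.");
    text = env.fromElements("to be", "or not to be"); // illustrative fallback
}
```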
All examples include corresponding data utility classes for testing with default datasets:

```java
// Word count data
import org.apache.flink.examples.java.wordcount.util.WordCountData;
DataSet<String> defaultText = WordCountData.getDefaultTextLineDataSet(env);

// K-means data
import org.apache.flink.examples.java.clustering.util.KMeansData;
DataSet<Point> points = KMeansData.getDefaultPointDataSet(env);
DataSet<Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);

// Page rank data
import org.apache.flink.examples.java.graph.util.PageRankData;
DataSet<Long> pages = PageRankData.getDefaultPagesDataSet(env);
DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);
```
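These default datasets make it easy to smoke-test a pipeline without any files on disk, for example (a hypothetical snippet, not part of the library, relying on the imports above):

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Inspect a few sample edges from the default PageRank graph.
DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);
links.first(5).print(); // print() triggers execution
```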