Apache Flink batch processing examples demonstrating various algorithms and use cases, including WordCount, PageRank, KMeans clustering, Connected Components, and graph processing.
Text processing examples demonstrating tokenization, aggregation, and result output patterns. Includes both tuple-based and POJO-based implementations of the classic WordCount algorithm.
Classic word counting example that computes word occurrence histogram over text files.
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files.
* Usage: WordCount --input <path> --output <path>
*/
public class WordCount {
    public static void main(String[] args) throws Exception;

    /**
     * Tokenizer that splits sentences into words as (word, 1) tuples
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /**
         * Splits input string into tokens and emits (word, 1) pairs
         * @param value Input string to tokenize
         * @param out Collector for emitting word-count pairs
         */
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
    }
}

Usage Examples:
// Run with file input/output
String[] args = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
WordCount.main(args);
// Run with default data (prints to stdout)
String[] emptyArgs = {};
WordCount.main(emptyArgs);
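What WordCount computes — normalize, split, group, and sum — can be sketched in plain Java without a Flink runtime. This stand-alone sketch (class name `WordCountSketch` is just for illustration) uses the same normalization the Tokenizer applies:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    public static void main(String[] args) {
        String[] lines = {"hello world", "hello flink"};

        // Same normalization the Tokenizer uses: lowercase, split on non-word characters
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }

        System.out.println(counts); // {hello=2, world=1, flink=1}
    }
}
```

In the Flink program the grouping and summing are distributed across the cluster, but the result per word is the same histogram.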
// Use tokenizer directly in custom DataSet operations
DataSet<String> text = env.fromElements("hello world", "hello flink");
DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new WordCount.Tokenizer())
    .groupBy(0)
    .sum(1);

POJO-based variant of WordCount using custom Word objects instead of tuples.
/**
* POJO-based WordCount implementation demonstrating custom object usage
*/
public class WordCountPojo {
    public static void main(String[] args) throws Exception;

    /**
     * Word POJO with word and frequency fields
     */
    public static class Word {
        private String word;
        private int frequency;

        public Word();
        public Word(String word, int frequency);

        public String getWord();
        public void setWord(String word);
        public int getFrequency();
        public void setFrequency(int frequency);

        @Override
        public String toString();
    }

    /**
     * Tokenizer that splits sentences into Word POJOs
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Word> {
        /**
         * Splits input string into tokens and emits Word objects
         * @param value Input string to tokenize
         * @param out Collector for emitting Word objects
         */
        public void flatMap(String value, Collector<Word> out);
    }
}

Usage Examples:
// Run POJO-based word count
String[] args = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
WordCountPojo.main(args);
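The Word signatures above can be filled in as a plain Java bean. The field names `word` and `frequency` are what `groupBy("word")` and `sum("frequency")` reference, and the public no-argument constructor plus getters/setters are what make the class a valid Flink POJO type. A minimal stand-alone sketch (the toString format is an assumption, and the enclosing demo class is just for illustration):

```java
public class WordPojoDemo {
    /** Stand-alone version of the Word POJO described above. */
    public static class Word {
        private String word;
        private int frequency;

        // Public no-arg constructor: required for Flink to treat this as a POJO type
        public Word() {}

        public Word(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public int getFrequency() { return frequency; }
        public void setFrequency(int frequency) { this.frequency = frequency; }

        @Override
        public String toString() {
            // Assumed format; the actual example may format its output differently
            return word + " " + frequency;
        }
    }

    public static void main(String[] args) {
        Word w = new Word("hello", 2);
        System.out.println(w.getWord() + " appears " + w.getFrequency() + " time(s)");
    }
}
```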
// Use Word POJO in custom operations
DataSet<String> text = env.fromElements("hello world");
DataSet<Word> words = text.flatMap(new WordCountPojo.Tokenizer());
DataSet<Word> counts = words
    .groupBy("word")      // Group by word field
    .sum("frequency");    // Sum frequency field

Utility class providing default text data for testing and examples.
/**
* Provides default data sets for WordCount examples
*/
public class WordCountData {
    /**
     * Array of Shakespeare text lines used as default input
     */
    public static final String[] WORDS;

    /**
     * Creates DataSet with default text lines
     * @param env Execution environment
     * @return DataSet containing default text data
     */
    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env);
}

Usage Examples:
import org.apache.flink.examples.java.wordcount.util.WordCountData;
// Use default data in your own applications
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> defaultText = WordCountData.getDefaultTextLineDataSet(env);
// Access raw data directly
String[] textLines = WordCountData.WORDS;
System.out.println("Number of lines: " + textLines.length);

Both WordCount examples support flexible input/output options:
// File input with multiple files
String[] args = {
    "--input", "/path/to/file1.txt",
    "--input", "/path/to/file2.txt",
    "--output", "/path/to/output"
};
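The effect of repeating the --input flag can be illustrated with a simplified plain-Java stand-in for Flink's MultipleParameterTool. The parse helper below is hypothetical, for illustration only — it is not the Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArgsSketch {
    // Hypothetical helper: collects repeated "--key value" pairs into a multimap,
    // mimicking how MultipleParameterTool keeps every occurrence of a key.
    static Map<String, List<String>> parse(String[] args) {
        Map<String, List<String>> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            String key = args[i].substring(2); // strip the leading "--"
            params.computeIfAbsent(key, k -> new ArrayList<>()).add(args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        String[] a = {"--input", "a.txt", "--input", "b.txt", "--output", "out"};
        Map<String, List<String>> p = parse(a);
        System.out.println(p.get("input"));  // [a.txt, b.txt]
        System.out.println(p.get("output")); // [out]
    }
}
```

Both input files end up under the same key, which is why the real program can iterate over every --input value.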
// Default data (no input parameter)
String[] args = {"--output", "/path/to/output"};
// Print to stdout (no output parameter)
String[] args = {"--input", "/path/to/input.txt"};

Standard parameter handling pattern used across word count examples:
// Parse parameters
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

// Check for input files
if (params.has("input")) {
    // Read from files
    for (String input : params.getMultiParameterRequired("input")) {
        // Process each input file
    }
} else {
    // Use default data
    text = WordCountData.getDefaultTextLineDataSet(env);
}

// Handle output
if (params.has("output")) {
    counts.writeAsCsv(params.get("output"), "\n", " ");
    env.execute("WordCount Example");
} else {
    counts.print(); // Print to stdout
}

The tokenization logic normalizes and splits text:
// Inside Tokenizer.flatMap()
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
    if (token.length() > 0) {
        out.collect(new Tuple2<>(token, 1));
    }
}

// Flink tuple for word-count pairs
import org.apache.flink.api.java.tuple.Tuple2;
Tuple2<String, Integer> wordCount = new Tuple2<>("word", 1);
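Tuple2 exposes its elements as public positional fields f0 and f1; these positions are what groupBy(0) and sum(1) address. A minimal stand-in class shows the layout (Pair here is illustrative only, not the Flink type):

```java
public class PairDemo {
    // Minimal stand-in mirroring the shape of Flink's Tuple2:
    // two public positional fields, f0 and f1
    static class Pair<A, B> {
        public A f0;
        public B f1;

        Pair(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    public static void main(String[] args) {
        Pair<String, Integer> wordCount = new Pair<>("word", 1);
        System.out.println(wordCount.f0 + " -> " + wordCount.f1); // word -> 1
    }
}
```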
// Flink collector for emitting results
import org.apache.flink.util.Collector;
// Parameter handling utilities
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.api.java.utils.ParameterTool;

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-batch-2-11