Text processing examples demonstrating tokenization, aggregation, and result output patterns. Includes both tuple-based and POJO-based implementations of the classic WordCount algorithm.
Classic word counting example that computes a word-occurrence histogram over text files.
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text files.
* Usage: WordCount --input <path> --output <path>
*/
public class WordCount {
    public static void main(String[] args) throws Exception;

    /**
     * Tokenizer that splits sentences into words as (word, 1) tuples.
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /**
         * Splits the input string into tokens and emits a (word, 1) pair for each.
         * @param value Input string to tokenize
         * @param out Collector for emitting word-count pairs
         */
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
    }
}

Usage Examples:
// Run with file input/output
String[] args = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
WordCount.main(args);
// Run with default data (prints to stdout)
String[] emptyArgs = {};
WordCount.main(emptyArgs);
// Use tokenizer directly in custom DataSet operations
DataSet<String> text = env.fromElements("hello world", "hello flink");
DataSet<Tuple2<String, Integer>> counts = text
    .flatMap(new WordCount.Tokenizer())
    .groupBy(0)
    .sum(1);

POJO-based variant of WordCount using custom Word objects instead of tuples.
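For Flink to treat Word as a POJO type (so that `groupBy("word")` and `sum("frequency")` can address fields by name), the class must be public with a public no-argument constructor and either public fields or public getters/setters. A minimal plain-Java sketch of such a class follows; the exact toString format is an assumption for illustration, not necessarily what the shipped example prints:

```java
// Minimal sketch of a Flink-compatible POJO. The "word <space> frequency"
// toString format below is an illustrative assumption.
public class Word {
    private String word;      // the token itself
    private int frequency;    // occurrence count

    // Flink's POJO rules require a public no-argument constructor
    public Word() {}

    public Word(String word, int frequency) {
        this.word = word;
        this.frequency = frequency;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getFrequency() { return frequency; }
    public void setFrequency(int frequency) { this.frequency = frequency; }

    @Override
    public String toString() {
        return word + " " + frequency;
    }
}
```

Because the fields are addressed by name through the getters/setters, renaming a field without updating the `groupBy`/`sum` key strings fails at job submission time rather than at compile time.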
/**
* POJO-based WordCount implementation demonstrating custom object usage
*/
public class WordCountPojo {
    public static void main(String[] args) throws Exception;

    /**
     * Word POJO with word and frequency fields.
     */
    public static class Word {
        private String word;
        private int frequency;

        public Word();
        public Word(String word, int frequency);

        public String getWord();
        public void setWord(String word);
        public int getFrequency();
        public void setFrequency(int frequency);

        @Override
        public String toString();
    }

    /**
     * Tokenizer that splits sentences into Word POJOs.
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Word> {
        /**
         * Splits the input string into tokens and emits a Word object for each.
         * @param value Input string to tokenize
         * @param out Collector for emitting Word objects
         */
        public void flatMap(String value, Collector<Word> out);
    }
}

Usage Examples:
// Run POJO-based word count
String[] args = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
WordCountPojo.main(args);
// Use Word POJO in custom operations
DataSet<String> text = env.fromElements("hello world");
DataSet<Word> words = text.flatMap(new WordCountPojo.Tokenizer());
DataSet<Word> counts = words
    .groupBy("word")      // Group by the "word" field
    .sum("frequency");    // Sum the "frequency" field

Utility class providing default text data for testing and examples.
/**
* Provides default data sets for WordCount examples
*/
public class WordCountData {
    /**
     * Array of Shakespeare text lines used as default input.
     */
    public static final String[] WORDS;

    /**
     * Creates a DataSet with the default text lines.
     * @param env Execution environment
     * @return DataSet containing the default text data
     */
    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env);
}

Usage Examples:
import org.apache.flink.examples.java.wordcount.util.WordCountData;
// Use default data in your own applications
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> defaultText = WordCountData.getDefaultTextLineDataSet(env);
// Access raw data directly
String[] textLines = WordCountData.WORDS;
System.out.println("Number of lines: " + textLines.length);

Both WordCount examples support flexible input/output options:
// File input with multiple files
String[] args = {
    "--input", "/path/to/file1.txt",
    "--input", "/path/to/file2.txt",
    "--output", "/path/to/output"
};

// Default data (no input parameter)
String[] args = {"--output", "/path/to/output"};

// Print to stdout (no output parameter)
String[] args = {"--input", "/path/to/input.txt"};

Standard parameter handling pattern used across word count examples:
// Parse parameters
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

// Check for input files
if (params.has("input")) {
    // Read from files
    for (String input : params.getMultiParameterRequired("input")) {
        // Process each input file
    }
} else {
    // Use default data
    text = WordCountData.getDefaultTextLineDataSet(env);
}

// Handle output
if (params.has("output")) {
    counts.writeAsCsv(params.get("output"), "\n", " ");
    env.execute("WordCount Example");
} else {
    counts.print(); // Print to stdout
}

The tokenization logic normalizes and splits text:
// Inside Tokenizer.flatMap()
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
    if (token.length() > 0) {
        out.collect(new Tuple2<>(token, 1));
    }
}

// Flink tuple for word-count pairs
import org.apache.flink.api.java.tuple.Tuple2;
Tuple2<String, Integer> wordCount = new Tuple2<>("word", 1);
// Flink collector for emitting results
import org.apache.flink.util.Collector;
// Parameter handling utilities
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.api.java.utils.ParameterTool;
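The complete pipeline — lowercase, split on non-word characters, group by word, sum counts — can be checked without a Flink cluster, since the tokenization and aggregation semantics are plain Java. The sketch below mirrors Tokenizer.flatMap followed by groupBy/sum using a HashMap; the class and method names here are illustrative, not part of the Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Mirrors Tokenizer.flatMap + groupBy(0).sum(1): normalize to
    // lowercase, split on non-word characters, count non-empty tokens.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {   // skip empty leading tokens
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count("hello world", "hello flink");
        System.out.println("hello -> " + counts.get("hello")); // hello -> 2
    }
}
```

Note that `"\\W+"` leaves an empty first element when a line starts with punctuation, which is why both this sketch and the real Tokenizer guard on `token.length() > 0` before emitting.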