# Word Count Processing

Text processing examples demonstrating tokenization, aggregation, and result output patterns. Includes both tuple-based and POJO-based implementations of the classic WordCount algorithm.

## Capabilities

### WordCount Example

Classic word count example that computes a word occurrence histogram over text files.
```java { .api }
/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text files.
 * Usage: WordCount --input <path> --output <path>
 */
public class WordCount {
    public static void main(String[] args) throws Exception;

    /**
     * Tokenizer that splits sentences into words as (word, 1) tuples
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /**
         * Splits the input string into tokens and emits (word, 1) pairs
         * @param value Input string to tokenize
         * @param out Collector for emitting word-count pairs
         */
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
    }
}
```
**Usage Examples:**

```java
// Run with file input/output
String[] args = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
WordCount.main(args);

// Run with default data (prints to stdout)
String[] emptyArgs = {};
WordCount.main(emptyArgs);

// Use the tokenizer directly in custom DataSet operations
DataSet<String> text = env.fromElements("hello world", "hello flink");
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new WordCount.Tokenizer())
        .groupBy(0)
        .sum(1);
```
### WordCount POJO Example

POJO-based variant of WordCount that uses custom Word objects instead of tuples.
```java { .api }
/**
 * POJO-based WordCount implementation demonstrating custom object usage
 */
public class WordCountPojo {
    public static void main(String[] args) throws Exception;

    /**
     * Word POJO with word and frequency fields
     */
    public static class Word {
        private String word;
        private int frequency;

        public Word();
        public Word(String word, int frequency);

        public String getWord();
        public void setWord(String word);
        public int getFrequency();
        public void setFrequency(int frequency);

        @Override
        public String toString();
    }

    /**
     * Tokenizer that splits sentences into Word POJOs
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Word> {
        /**
         * Splits the input string into tokens and emits Word objects
         * @param value Input string to tokenize
         * @param out Collector for emitting Word objects
         */
        public void flatMap(String value, Collector<Word> out);
    }
}
```
**Usage Examples:**

```java
// Run the POJO-based word count
String[] args = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
WordCountPojo.main(args);

// Use the Word POJO in custom operations
DataSet<String> text = env.fromElements("hello world");
DataSet<Word> words = text.flatMap(new WordCountPojo.Tokenizer());
DataSet<Word> counts = words
        .groupBy("word")     // Group by the word field
        .sum("frequency");   // Sum the frequency field
```
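Field-name-based `groupBy("word").sum("frequency")` aggregates Word objects that share the same `word` value. As a plain-Java illustration of that aggregation semantics (a hypothetical sketch using a map, not Flink's distributed implementation), the same result can be computed like this:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone sketch: mirrors what groupBy("word").sum("frequency")
// computes, using a plain map instead of Flink's runtime.
public class WordAggregationSketch {
    // Minimal stand-in for the Word POJO (word + frequency fields)
    static final class Word {
        final String word;
        final int frequency;
        Word(String word, int frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // Groups Word objects by the word field and sums their frequencies
    static Map<String, Integer> aggregate(List<Word> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Word w : words) {
            counts.merge(w.word, w.frequency, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Word> words = List.of(
                new Word("hello", 1), new Word("world", 1), new Word("hello", 1));
        System.out.println(aggregate(words)); // {hello=2, world=1}
    }
}
```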
### Default Data Provider

Utility class providing default text data for testing and examples.
```java { .api }
/**
 * Provides default data sets for WordCount examples
 */
public class WordCountData {
    /**
     * Array of Shakespeare text lines used as default input
     */
    public static final String[] WORDS;

    /**
     * Creates a DataSet with the default text lines
     * @param env Execution environment
     * @return DataSet containing the default text data
     */
    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env);
}
```
**Usage Examples:**

```java
import org.apache.flink.examples.java.wordcount.util.WordCountData;

// Use the default data in your own applications
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> defaultText = WordCountData.getDefaultTextLineDataSet(env);

// Access the raw data directly
String[] textLines = WordCountData.WORDS;
System.out.println("Number of lines: " + textLines.length);
```
## Common Usage Patterns

### Input/Output Handling

Both WordCount examples support flexible input/output options:
```java
// File input with multiple files (repeat --input)
String[] argsMultiInput = {
        "--input", "/path/to/file1.txt",
        "--input", "/path/to/file2.txt",
        "--output", "/path/to/output"
};

// Default data (no --input parameter)
String[] argsDefaultData = {"--output", "/path/to/output"};

// Print to stdout (no --output parameter)
String[] argsPrintToStdout = {"--input", "/path/to/input.txt"};
```
### Parameter Processing

Standard parameter handling pattern used across the word count examples:
```java
// Parse parameters
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Check for input files
DataSet<String> text = null;
if (params.has("input")) {
    // Read each input file and union the results
    for (String input : params.getMultiParameterRequired("input")) {
        if (text == null) {
            text = env.readTextFile(input);
        } else {
            text = text.union(env.readTextFile(input));
        }
    }
} else {
    // Use default data
    text = WordCountData.getDefaultTextLineDataSet(env);
}

DataSet<Tuple2<String, Integer>> counts =
        text.flatMap(new WordCount.Tokenizer()).groupBy(0).sum(1);

// Handle output
if (params.has("output")) {
    counts.writeAsCsv(params.get("output"), "\n", " ");
    env.execute("WordCount Example");
} else {
    counts.print(); // Print to stdout
}
```
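The key behavior `MultipleParameterTool` adds over `ParameterTool` is collecting every value of a repeated flag such as `--input`. A plain-Java sketch of that repeated-flag parsing (a hypothetical illustration, not Flink's actual implementation):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of repeated-flag parsing, mimicking how a
// "--key value" argument list with duplicate keys can keep all values.
public class MultiArgSketch {
    // Parses "--key value" pairs, keeping every value for repeated keys
    static Map<String, List<String>> parse(String[] args) {
        Map<String, List<String>> params = new LinkedHashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            String key = args[i].replaceFirst("^--", "");
            params.computeIfAbsent(key, k -> new ArrayList<>()).add(args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        String[] demo = {"--input", "a.txt", "--input", "b.txt", "--output", "out"};
        System.out.println(parse(demo));
        // {input=[a.txt, b.txt], output=[out]}
    }
}
```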
### Tokenization Logic

The tokenization logic normalizes the text to lower case and splits it on runs of non-word characters:
```java
// Inside Tokenizer.flatMap()
String[] tokens = value.toLowerCase().split("\\W+");

for (String token : tokens) {
    if (token.length() > 0) {
        out.collect(new Tuple2<>(token, 1));
    }
}
```
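The length check matters because `split("\\W+")` yields an empty leading token whenever a line starts with a non-word character. A small standalone demonstration of the same normalize-and-split step (plain Java, no Flink dependency, with a `Tuple2`-free return type for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Demonstrates the tokenization step: lower-case, split on non-word
// characters, and drop the empty token produced by leading punctuation.
public class TokenizeDemo {
    // Same normalization and splitting as the Tokenizer, minus the Collector
    static List<String> tokenize(String value) {
        List<String> result = new ArrayList<>();
        for (String token : value.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                result.add(token);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Raw split keeps an empty first element for leading punctuation
        System.out.println(Arrays.toString("--To be, or not to be,--".split("\\W+")));
        // Filtered tokens
        System.out.println(tokenize("--To be, or not to be,--"));
        // [to, be, or, not, to, be]
    }
}
```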
## Types

### Core Types
```java { .api }
// Flink tuple for word-count pairs
import org.apache.flink.api.java.tuple.Tuple2;
Tuple2<String, Integer> wordCount = new Tuple2<>("word", 1);

// Flink collector for emitting results
import org.apache.flink.util.Collector;

// Parameter handling utilities
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.api.java.utils.ParameterTool;
```