# Apache Flink Batch Examples

Apache Flink Batch Examples is a comprehensive collection of batch processing examples demonstrating various algorithms and use cases. It provides executable JAR files and reusable components for WordCount, PageRank, KMeans clustering, Connected Components, graph processing, relational operations, and distributed file operations.

## Package Information

- **Package Name**: flink-examples-batch_2.11
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Include as Maven dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-examples-batch_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
// Main example classes
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.graph.PageRank;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.relational.WebLogAnalysis;
import org.apache.flink.examples.java.distcp.DistCp;

// Flink API imports
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.utils.ParameterTool;
```

## Basic Usage

```java
// Run the WordCount example programmatically
import org.apache.flink.examples.java.wordcount.WordCount;

public class ExampleUsage {
    public static void main(String[] args) throws Exception {
        // Basic execution pattern for any example
        String[] exampleArgs = {"--input", "/path/to/input.txt", "--output", "/path/to/output"};
        WordCount.main(exampleArgs);
    }
}
```

## Architecture

The Apache Flink Batch Examples library is organized around several key architectural patterns:

- **Executable Examples**: Each example is a standalone main class that can be run as a JAR
- **Data Types**: Custom POJOs and tuple types for specific domains (Point, Centroid, Edge, etc.)
- **User Functions**: Reusable MapFunction, ReduceFunction, and FilterFunction implementations
- **Data Providers**: Utility classes providing default datasets for testing
- **Iterative Algorithms**: Bulk iteration patterns for algorithms like KMeans and PageRank
- **Parameter Handling**: Consistent use of ParameterTool for command-line arguments
61
62
## Capabilities
63
64
### Word Count Processing
65
66
Text processing examples including classic WordCount and POJO-based variants. Features tokenization, aggregation, and result output.
67
68
```java { .api }
69
public class WordCount {
70
public static void main(String[] args) throws Exception;
71
72
public static final class Tokenizer
73
implements FlatMapFunction<String, Tuple2<String, Integer>> {
74
public void flatMap(String value, Collector<Tuple2<String, Integer>> out);
75
}
76
}
77
```
78
79
[Word Count Processing](./word-count.md)
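
The Tokenizer lower-cases each line, splits on non-word characters, and emits one `(word, 1)` pair per token; a `groupBy(0).sum(1)` step then aggregates the counts. A plain-Java sketch of that same logic, with no Flink dependency (the `tokenize` and `countWords` helpers are illustrative, not part of the example classes):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TokenizerSketch {
    // Mirrors Tokenizer.flatMap: lowercase, split on non-word
    // characters, and emit one (word, 1) pair per token.
    static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                out.add(Map.entry(token, 1));
            }
        }
        return out;
    }

    // Mirrors groupBy(0).sum(1): aggregate the counts per word.
    static Map<String, Integer> countWords(Iterable<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : tokenize(line)) {
                counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {to=2, be=2, or=1, not=1}
        System.out.println(countWords(List.of("To be, or not to be")));
    }
}
```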

### Clustering Algorithms

Machine learning examples with K-Means clustering implementation for 2D data points, including iterative algorithm patterns and custom data types.

```java { .api }
public class KMeans {
    public static void main(String[] args) throws Exception;

    public static class Point implements Serializable {
        public double x, y;
        public Point(double x, double y);
        public double euclideanDistance(Point other);
        public Point add(Point other);
        public Point div(long val);
    }

    public static class Centroid extends Point {
        public int id;
        public Centroid(int id, double x, double y);
    }
}
```

[Clustering Algorithms](./clustering.md)
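
The `Point` and `Centroid` types drive the per-iteration assignment step: each point is mapped to its nearest centroid by Euclidean distance. A plain-Java sketch of that assignment, using simplified stand-ins for the example's types rather than the actual Flink operator:

```java
public class KMeansSketch {
    static class Point {
        double x, y;
        Point(double x, double y) { this.x = x; this.y = y; }
        double euclideanDistance(Point other) {
            return Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y));
        }
    }

    static class Centroid extends Point {
        int id;
        Centroid(int id, double x, double y) { super(x, y); this.id = id; }
    }

    // Mirrors the nearest-center selection step of the KMeans example:
    // assign a point to the closest centroid by Euclidean distance.
    static int nearestCentroid(Point p, Centroid[] centroids) {
        int best = -1;
        double bestDist = Double.MAX_VALUE;
        for (Centroid c : centroids) {
            double d = p.euclideanDistance(c);
            if (d < bestDist) { bestDist = d; best = c.id; }
        }
        return best;
    }

    public static void main(String[] args) {
        Centroid[] cs = { new Centroid(0, 0.0, 0.0), new Centroid(1, 10.0, 10.0) };
        System.out.println(nearestCentroid(new Point(1.0, 2.0), cs)); // prints 0
        System.out.println(nearestCentroid(new Point(9.0, 8.0), cs)); // prints 1
    }
}
```

In the real example, this map step runs inside a bulk iteration: after assignment, points are averaged per centroid (via `add` and `div`) to produce the next iteration's centroids.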

### Graph Processing

Graph algorithms including PageRank, Connected Components, Triangle Enumeration, and Transitive Closure, with specialized data types and iterative processing patterns.

```java { .api }
public class PageRank {
    public static void main(String[] args) throws Exception;

    public static final class RankAssigner
            implements MapFunction<Long, Tuple2<Long, Double>>;
    public static final class Dampener
            implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>;
}

public class ConnectedComponents {
    public static void main(String[] args) throws Exception;
}
```

[Graph Processing](./graph-processing.md)
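
The `Dampener` applies the PageRank damping factor to each summed rank contribution, redistributing the remaining `1 - d` uniformly across all pages. A plain-Java sketch of that single update (the `dampen` helper is illustrative, not the example's API):

```java
public class PageRankSketch {
    // Mirrors the Dampener map step: scale the summed rank contribution
    // by the damping factor d, and add the uniformly redistributed
    // (1 - d) / numPages share.
    static double dampen(double summedRank, double dampingFactor, long numPages) {
        return summedRank * dampingFactor + (1 - dampingFactor) / numPages;
    }

    public static void main(String[] args) {
        // With d = 0.85 and 5 pages, a summed contribution of 0.2
        // (= 1/5) maps back to 0.2: the uniform rank is a fixed point
        // of the dampening step, which is why every page can start
        // with the same initial rank.
        System.out.println(dampen(0.2, 0.85, 5));
    }
}
```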

### Relational Processing

SQL-like operations and analytics, including web log analysis, TPC-H benchmark queries, and accumulator examples for custom metrics collection.

```java { .api }
public class WebLogAnalysis {
    public static void main(String[] args) throws Exception;

    public static class FilterDocByKeyWords
            implements FilterFunction<Tuple2<String, String>>;
    public static class FilterByRank
            implements FilterFunction<Tuple3<Integer, String, Integer>>;
}

public class TPCHQuery3 {
    public static void main(String[] args) throws Exception;

    public static class Lineitem extends Tuple4<Long, Double, Double, String>;
    public static class Customer extends Tuple2<Long, String>;
    public static class Order extends Tuple4<Long, Long, String, Long>;
}
```

[Relational Processing](./relational-processing.md)
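
The two WebLogAnalysis filter functions boil down to simple predicates: keep documents containing every search keyword, and keep pages whose rank clears a threshold. A plain-Java sketch of both (the keyword list and helper names here are illustrative, not the example's actual constants):

```java
import java.util.List;
import java.util.stream.Collectors;

public class WebLogFilterSketch {
    // Illustrative keyword list; the real example defines its own.
    static final String[] KEYWORDS = {"editors", "oscillations"};

    // Mirrors FilterDocByKeyWords: keep documents whose text
    // contains every keyword.
    static boolean containsAllKeywords(String docText) {
        for (String kw : KEYWORDS) {
            if (!docText.contains(kw)) return false;
        }
        return true;
    }

    // Mirrors FilterByRank: keep pages whose rank exceeds a threshold.
    static boolean rankAbove(int rank, int threshold) {
        return rank > threshold;
    }

    public static void main(String[] args) {
        List<String> docs = List.of(
                "editors discuss oscillations in markets",
                "unrelated article text");
        List<String> kept = docs.stream()
                .filter(WebLogFilterSketch::containsAllKeywords)
                .collect(Collectors.toList());
        System.out.println(kept.size()); // prints 1
    }
}
```

In the Flink example these predicates run as `FilterFunction`s over the document and rank datasets before the join, so the join only sees rows that can contribute to the result.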

### Distributed File Operations

A distributed file copying utility similar to Hadoop DistCp, with custom input formats and parallel file processing capabilities.

```java { .api }
public class DistCp {
    public static void main(String[] args) throws Exception;
    public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
    public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
}

public class FileCopyTask {
    public FileCopyTask(Path path, String relativePath);
    public Path getPath();
    public String getRelativePath();
}
```

[Distributed File Operations](./distributed-copy.md)
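
A `FileCopyTask` pairs a source path with its path relative to the source root, which is what lets the copy step recreate the directory layout under the target. A sketch of that relative-path bookkeeping using `java.nio` (the real example uses Flink's `Path` type; `targetFor` is an illustrative helper):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class RelativePathSketch {
    // Mirrors how a FileCopyTask's relativePath is used: strip the
    // source root from the file's path, then re-root it under the
    // target directory.
    static String targetFor(String sourceRoot, String sourceFile, String targetRoot) {
        Path relative = Paths.get(sourceRoot).relativize(Paths.get(sourceFile));
        return Paths.get(targetRoot).resolve(relative).toString();
    }

    public static void main(String[] args) {
        // On a POSIX filesystem this prints /data/out/sub/a.txt
        System.out.println(targetFor("/data/in", "/data/in/sub/a.txt", "/data/out"));
    }
}
```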

### Miscellaneous Examples

Additional examples, including Pi estimation using the Monte Carlo method, collection-based execution patterns, and POJO usage demonstrations.

```java { .api }
public class PiEstimation {
    public static void main(String[] args) throws Exception;

    public static class Sampler implements MapFunction<Long, Long>;
}

public class CollectionExecutionExample {
    public static void main(String[] args) throws Exception;

    public static class User {
        public int userIdentifier;
        public String name;
    }
}
```

[Miscellaneous Examples](./misc-examples.md)
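
The `Sampler` in PiEstimation maps each trial to a hit or miss depending on whether a uniform random point in the unit square lands inside the quarter circle; the ratio of hits to samples then approximates π/4. A plain-Java sketch of the same Monte Carlo estimate (`countHits` is an illustrative helper, not the example's API):

```java
import java.util.Random;

public class PiEstimationSketch {
    // Mirrors the Sampler map step: one trial is a hit if a uniform
    // random point (x, y) in the unit square satisfies x^2 + y^2 <= 1.
    static long countHits(long numSamples, long seed) {
        Random rnd = new Random(seed);
        long hits = 0;
        for (long i = 0; i < numSamples; i++) {
            double x = rnd.nextDouble();
            double y = rnd.nextDouble();
            if (x * x + y * y <= 1.0) hits++;
        }
        return hits;
    }

    public static void main(String[] args) {
        long samples = 1_000_000;
        // hits / samples approximates pi / 4
        double pi = 4.0 * countHits(samples, 42L) / samples;
        System.out.println(pi); // roughly 3.14
    }
}
```

The Flink version parallelizes the trials by mapping over a range of sample indices and summing the per-trial hit counts.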

## Common Patterns

### Execution Environment Setup

All examples follow a consistent pattern for setting up the Flink execution environment:

```java { .api }
// Standard execution environment setup
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Parameter handling
final ParameterTool params = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);

// Data source creation
DataSet<String> text = env.readTextFile(params.get("input"));

// ... transformations and a sink go here ...

// Execute the job (needed for sinks like writeAsText;
// print() triggers execution on its own)
env.execute("Job Name");
```
215
216
### Parameter Handling
217
218
Consistent parameter handling across all examples:
219
220
```java { .api }
221
import org.apache.flink.api.java.utils.ParameterTool;
222
import org.apache.flink.api.java.utils.MultipleParameterTool;
223
224
// Single parameter tool
225
ParameterTool params = ParameterTool.fromArgs(args);
226
String inputPath = params.get("input");
227
int iterations = params.getInt("iterations", 10);
228
229
// Multiple parameter tool (for multiple inputs)
230
MultipleParameterTool multiParams = MultipleParameterTool.fromArgs(args);
231
String[] inputs = multiParams.getMultiParameterRequired("input");
232
```
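
For illustration, the `--key value` convention that `ParameterTool.fromArgs` expects can be sketched as a small hand-rolled parser. This is not Flink's implementation (which also handles `-key` prefixes and valueless flags); it only shows the shape of the contract:

```java
import java.util.HashMap;
import java.util.Map;

public class ArgParseSketch {
    // Illustrative take on ParameterTool.fromArgs: collect
    // --key value pairs into a map.
    static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // Treat the next token as the value unless it is
                // itself an option; otherwise store an empty value.
                String value = (i + 1 < args.length && !args[i + 1].startsWith("--"))
                        ? args[++i] : "";
                params.put(key, value);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> p = fromArgs(new String[] {"--input", "in.txt", "--iterations", "10"});
        System.out.println(p.get("input"));      // prints in.txt
        System.out.println(p.get("iterations")); // prints 10
    }
}
```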

## Data Utility Classes

All examples include corresponding data utility classes for testing with default datasets:

```java { .api }
// Word count data
import org.apache.flink.examples.java.wordcount.util.WordCountData;
DataSet<String> defaultText = WordCountData.getDefaultTextLineDataSet(env);

// K-means data
import org.apache.flink.examples.java.clustering.util.KMeansData;
DataSet<Point> points = KMeansData.getDefaultPointDataSet(env);
DataSet<Centroid> centroids = KMeansData.getDefaultCentroidDataSet(env);

// Page rank data
import org.apache.flink.examples.java.graph.util.PageRankData;
DataSet<Long> pages = PageRankData.getDefaultPagesDataSet(env);
DataSet<Tuple2<Long, Long>> links = PageRankData.getDefaultEdgeDataSet(env);
```