# MapReduce Functions

The MapReduce Functions capability enables direct integration of Hadoop Mapper and Reducer functions into Flink workflows, allowing reuse of existing MapReduce logic within Flink's DataSet API while maintaining compatibility with Hadoop's programming model.

## Overview

Flink's Hadoop compatibility layer provides wrapper classes that adapt Hadoop MapReduce functions to work as Flink operators. This enables gradual migration from MapReduce to Flink by allowing existing Mapper and Reducer implementations to run within Flink pipelines without modification.
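The adaptation essentially works by handing the wrapped function a collector that bridges the two APIs: the Hadoop function emits pairs through its `OutputCollector`, and the wrapper forwards each pair to Flink's `Collector`. The following plain-Java sketch illustrates that principle with simplified stand-ins; `MiniMapper`, `applyAsFlatMap`, and `tokenize` are illustrative names, not part of either API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class AdapterSketch {

    // Hadoop-style mapper: emits zero or more (key, value) pairs through a
    // callback, mirroring Mapper.map(key, value, OutputCollector, Reporter)
    interface MiniMapper<KIN, VIN, KOUT, VOUT> {
        void map(KIN key, VIN value, BiConsumer<KOUT, VOUT> output);
    }

    // The wrapper plays the role of Flink's flatMap: it hands the mapper a
    // collector that appends every emitted pair to the downstream output
    static <KIN, VIN, KOUT, VOUT> List<String> applyAsFlatMap(
            MiniMapper<KIN, VIN, KOUT, VOUT> mapper, KIN key, VIN value) {
        List<String> out = new ArrayList<>();
        mapper.map(key, value, (k, v) -> out.add(k + "=" + v));
        return out;
    }

    // A tokenizing mapper, analogous to the WordCount mapper shown below
    static List<String> tokenize(String line) {
        MiniMapper<Long, String, String, Integer> tokenizer = (k, text, out) -> {
            for (String token : text.toLowerCase().split("\\s+")) {
                out.accept(token, 1);
            }
        };
        return applyAsFlatMap(tokenizer, 0L, line);
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello Hadoop")); // [hello=1, hadoop=1]
    }
}
```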
## HadoopMapFunction

Wrapper that adapts a Hadoop Mapper (from the `org.apache.hadoop.mapred` API) to a Flink FlatMapFunction.

```java { .api }
@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    // Constructor with Mapper only (uses default JobConf)
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    // Constructor with Mapper and custom JobConf
    public HadoopMapFunction(
            Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
            JobConf conf);

    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;

    // Main processing method
    public void flatMap(
            final Tuple2<KEYIN, VALUEIN> value,
            final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;

    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```
## HadoopReduceFunction

Wrapper that adapts a Hadoop Reducer to a non-combinable Flink GroupReduceFunction.

```java { .api }
@Public
public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    // Constructor with Reducer only (uses default JobConf)
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

    // Constructor with Reducer and custom JobConf
    public HadoopReduceFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            JobConf conf);

    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;

    // Main processing method
    public void reduce(
            final Iterable<Tuple2<KEYIN, VALUEIN>> values,
            final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;

    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```
## HadoopReduceCombineFunction

Wrapper that adapts both a Hadoop Reducer and a Hadoop Combiner to a combinable Flink GroupReduceFunction. Note that the combiner must produce the same key and value types it consumes (`Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN>`), because its output is fed back into the final reducer.

```java { .api }
@Public
public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
                ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    // Constructor with Reducer and Combiner (uses default JobConf)
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner);

    // Constructor with Reducer, Combiner, and custom JobConf
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
            JobConf conf);

    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;

    // Main reduce processing method
    public void reduce(
            final Iterable<Tuple2<KEYIN, VALUEIN>> values,
            final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;

    // Combine processing method for optimization
    public void combine(
            final Iterable<Tuple2<KEYIN, VALUEIN>> values,
            final Collector<Tuple2<KEYIN, VALUEIN>> out) throws Exception;

    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```
109
110
## Usage Examples
111
112
### Basic WordCount with Hadoop MapReduce Functions
113
114
```java
115
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
116
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
117
import org.apache.hadoop.mapred.Mapper;
118
import org.apache.hadoop.mapred.Reducer;
119
import org.apache.hadoop.io.Text;
120
import org.apache.hadoop.io.IntWritable;
121
import org.apache.hadoop.io.LongWritable;
122
123
// Hadoop Mapper implementation
124
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
125
private final static IntWritable one = new IntWritable(1);
126
private Text word = new Text();
127
128
@Override
129
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
130
Reporter reporter) throws IOException {
131
StringTokenizer tokenizer = new StringTokenizer(value.toString());
132
while (tokenizer.hasMoreTokens()) {
133
word.set(tokenizer.nextToken().toLowerCase());
134
output.collect(word, one);
135
}
136
}
137
}
138
139
// Hadoop Reducer implementation
140
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
141
private IntWritable result = new IntWritable();
142
143
@Override
144
public void reduce(Text key, Iterator<IntWritable> values,
145
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
146
int sum = 0;
147
while (values.hasNext()) {
148
sum += values.next().get();
149
}
150
result.set(sum);
151
output.collect(key, result);
152
}
153
}
154
155
// Use in Flink pipeline
156
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
157
158
// Read input data
159
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
160
HadoopInputs.readHadoopFile(/* input format configuration */)
161
);
162
163
// Apply Hadoop mapper
164
DataSet<Tuple2<Text, IntWritable>> mappedData = input
165
.flatMap(new HadoopMapFunction<>(new TokenizerMapper()));
166
167
// Group by key and apply Hadoop reducer
168
DataSet<Tuple2<Text, IntWritable>> result = mappedData
169
.groupBy(0)
170
.reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));
171
```
### Using Combiner for Optimization

```java
// Combiner that performs partial aggregation (same interface-based pattern as above)
public static class IntSumCombiner extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
}

// Use HadoopReduceCombineFunction for better performance
DataSet<Tuple2<Text, IntWritable>> optimizedResult = mappedData
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new IntSumReducer(),  // Final reducer
        new IntSumCombiner()  // Combiner for pre-aggregation
    ));
```
### Custom Configuration

```java
import org.apache.hadoop.mapred.JobConf;

// Configure Hadoop MapReduce job settings
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.job.name", "Flink-Hadoop Integration");
jobConf.set("custom.parameter", "custom-value");
jobConf.setInt("custom.int.parameter", 42);

// Use custom configuration with MapReduce functions
HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapperWithConfig =
    new HadoopMapFunction<>(new TokenizerMapper(), jobConf);

HadoopReduceFunction<Text, IntWritable, Text, IntWritable> reducerWithConfig =
    new HadoopReduceFunction<>(new IntSumReducer(), jobConf);

// Apply in pipeline
DataSet<Tuple2<Text, IntWritable>> result = input
    .flatMap(mapperWithConfig)
    .groupBy(0)
    .reduceGroup(reducerWithConfig);
```
### Complex Data Processing

```java
// Example with custom Writable types
public static class DataRecord implements Writable {
    private String category;
    private double value;
    private long timestamp;

    // Writable implementation...
}

// Key types used with groupBy must be comparable, so the key implements
// WritableComparable rather than plain Writable
public static class CategoryKey implements WritableComparable<CategoryKey> {
    private String category;
    private int hour;

    // WritableComparable implementation...
}

// Mapper that processes complex records
public static class DataProcessor extends MapReduceBase
        implements Mapper<LongWritable, DataRecord, CategoryKey, DataRecord> {
    private CategoryKey outputKey = new CategoryKey();

    @Override
    public void map(LongWritable key, DataRecord value,
            OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        // Extract hour of day from a millisecond timestamp
        int hour = (int) (value.getTimestamp() / 3600000) % 24;

        outputKey.setCategory(value.getCategory());
        outputKey.setHour(hour);

        output.collect(outputKey, value);
    }
}

// Reducer that aggregates by category and hour
public static class CategoryAggregator extends MapReduceBase
        implements Reducer<CategoryKey, DataRecord, CategoryKey, DataRecord> {
    private DataRecord result = new DataRecord();

    @Override
    public void reduce(CategoryKey key, Iterator<DataRecord> values,
            OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        double sum = 0.0;
        int count = 0;

        while (values.hasNext()) {
            sum += values.next().getValue();
            count++;
        }

        result.setCategory(key.getCategory());
        result.setValue(sum / count);  // Average
        result.setTimestamp(System.currentTimeMillis());

        output.collect(key, result);
    }
}

// Use in Flink pipeline
DataSet<Tuple2<CategoryKey, DataRecord>> aggregatedData = rawData
    .flatMap(new HadoopMapFunction<>(new DataProcessor()))
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<>(new CategoryAggregator()));
```
## Performance Considerations

### Object Reuse

```java
// Enable object reuse for better performance with Hadoop functions
env.getConfig().enableObjectReuse();

// This is particularly beneficial when using Hadoop functions, as they
// typically create many temporary objects
```
### Combiner Usage

Use combiners whenever the operation is commutative and associative; pre-aggregating before the shuffle reduces network traffic and improves performance.

Good candidates for combiners:

- Sum operations
- Count operations
- Min/Max operations
- Set union operations

Bad candidates for combiners:

- Operations that need to see all values for a key at once
- Non-associative operations
- Operations with side effects
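The associativity requirement can be checked concretely: pre-aggregating per-partition partial results must give the same answer as a single pass over all values. A small plain-Java demonstration (class and method names are illustrative, not from any API):

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSafety {

    static int sum(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }

    static double avg(List<Integer> values) {
        return (double) sum(values) / values.size();
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(1, 2, 3, 4, 5, 6);

        // Sum is associative: combining per-partition partial sums and then
        // reducing gives the same result as one pass over everything
        int direct = sum(all);                 // 21
        int partial1 = sum(all.subList(0, 3)); // 6
        int partial2 = sum(all.subList(3, 6)); // 15
        System.out.println(direct == sum(Arrays.asList(partial1, partial2))); // true

        // Average is not: averaging per-partition averages loses the counts,
        // so it cannot be pushed into a combiner as-is
        double avgOfAvgs = (avg(all.subList(0, 2)) + avg(all.subList(2, 6))) / 2.0; // 3.0
        double trueAvg = (double) direct / all.size();                              // 3.5
        System.out.println(avgOfAvgs == trueAvg); // false
    }
}
```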
### Configuration Tuning

Settings in the `JobConf` are handed to the wrapped functions through their `configure()` method. Be aware that Flink does not act on Hadoop's resource settings: task memory and JVM options are governed by Flink's own TaskManager configuration, so the values below only matter if the wrapped functions read them.

```java
JobConf conf = new JobConf();

// Memory and JVM settings (visible to the wrapped functions via configure(),
// but not enforced by Flink)
conf.setInt("mapreduce.map.memory.mb", 1024);
conf.setInt("mapreduce.reduce.memory.mb", 2048);
conf.set("mapreduce.map.java.opts", "-Xmx800m");
conf.set("mapreduce.reduce.java.opts", "-Xmx1600m");

// Sort buffer settings (current key names; the legacy io.sort.* names are deprecated)
conf.setInt("mapreduce.task.io.sort.mb", 256);
conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);
```
## Error Handling

```java
try {
    DataSet<Tuple2<Text, IntWritable>> result = input
        .flatMap(new HadoopMapFunction<>(new TokenizerMapper()))
        .groupBy(0)
        .reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));

    // print() is a sink that triggers execution itself, so no separate
    // env.execute() call is needed (and adding one would fail with
    // "no new data sinks")
    result.print();

} catch (Exception e) {
    // Handle various exceptions
    if (e.getCause() instanceof IOException) {
        logger.error("I/O error in Hadoop function: " + e.getMessage());
    } else if (e.getCause() instanceof InterruptedException) {
        logger.error("Hadoop function was interrupted: " + e.getMessage());
    } else {
        logger.error("Unexpected error: " + e.getMessage());
    }
}
```
363
364
## Migration Best Practices
365
366
### Gradual Migration Strategy
367
368
1. **Start with Input/Output**: Use Hadoop InputFormats and OutputFormats with native Flink operations
369
2. **Migrate Logic Gradually**: Replace Hadoop functions one by one with native Flink operations
370
3. **Optimize Performance**: Use Flink-native operations for better performance where possible
371
4. **Maintain Compatibility**: Keep Hadoop functions for complex logic that's hard to rewrite
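Step 2 in miniature: the same per-key sum written the Hadoop way (an imperative loop over an `Iterator`) and as the kind of binary reduce function you would hand to Flink's `groupBy(...).reduce(...)`. This is a plain-Java sketch; the class and method names are illustrative:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MigrationSketch {

    // Hadoop-style: drain an Iterator and accumulate imperatively,
    // as in a classic Reducer.reduce() body
    static int hadoopStyleSum(Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    // Flink-style: express the same logic as a binary reduce function
    static int flinkStyleSum(List<Integer> values) {
        return values.stream().reduce(0, (a, b) -> a + b);
    }

    public static void main(String[] args) {
        List<Integer> counts = Arrays.asList(1, 1, 2, 3);
        System.out.println(hadoopStyleSum(counts.iterator())); // 7
        System.out.println(flinkStyleSum(counts));             // 7
    }
}
```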
### Testing Hadoop Functions in Flink

```java
// Create test data
List<Tuple2<LongWritable, Text>> testInput = Arrays.asList(
    new Tuple2<>(new LongWritable(1), new Text("hello world")),
    new Tuple2<>(new LongWritable(2), new Text("hello flink"))
);

DataSet<Tuple2<LongWritable, Text>> input = env.fromCollection(testInput);

// Test mapper output
DataSet<Tuple2<Text, IntWritable>> mapperOutput = input
    .flatMap(new HadoopMapFunction<>(new TokenizerMapper()));

// Collect and verify results in tests
List<Tuple2<Text, IntWritable>> results = mapperOutput.collect();
assertEquals(4, results.size()); // "hello", "world", "hello", "flink"
```
### Common Migration Patterns

```java
// Replace Hadoop identity operations with a Flink map
// Before: Hadoop IdentityMapper
DataSet<Tuple2<K, V>> output = input.map(tuple -> tuple);

// Replace simple aggregations with a Flink reduce
// Before: Hadoop sum reducer
DataSet<Tuple2<Text, IntWritable>> sums = input
    .groupBy(0)
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));

// Replace filtering with a Flink filter
// Before: Hadoop filtering mapper
DataSet<Tuple2<K, V>> filtered = input.filter(tuple -> someCondition(tuple.f1));
```