# Function Wrappers

Wrapper classes that adapt Hadoop Mapper and Reducer implementations to work as Flink functions, enabling reuse of existing MapReduce logic within Flink applications. These wrappers bridge the gap between Hadoop's MapReduce programming model and Flink's functional API.

## Capabilities

### HadoopMapFunction

Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction, allowing existing Hadoop Mapper implementations to be used directly in Flink transformations.
```java { .api }
/**
 * Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
 *
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements FlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {

    /**
     * Creates a HadoopMapFunction wrapper with a default JobConf.
     *
     * @param hadoopMapper The Hadoop Mapper to wrap
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    /**
     * Creates a HadoopMapFunction wrapper with a custom JobConf.
     *
     * @param hadoopMapper The Hadoop Mapper to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);

    /**
     * Opens and configures the mapper for processing.
     *
     * @param openContext Flink's open context for initialization
     * @throws Exception if mapper configuration fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Processes input records using the wrapped Hadoop Mapper.
     *
     * @param value Input tuple containing a key-value pair
     * @param out Collector for output tuples
     * @throws Exception if the mapping operation fails
     */
    public void flatMap(
        Tuple2<KEYIN, VALUEIN> value,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;

    /**
     * Returns output type information for type safety.
     *
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```
**Usage Examples:**

```java
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;

// Custom Hadoop Mapper
public class WordTokenizer implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String[] words = value.toString().toLowerCase().split("\\s+");
        for (String w : words) {
            if (!w.isEmpty()) {
                word.set(w);
                output.collect(word, one);
            }
        }
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() {}
}

// Use in a Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> lines = // ... input data

// Apply the Hadoop Mapper as a Flink function
DataSet<Tuple2<Text, IntWritable>> words = lines.flatMap(
    new HadoopMapFunction<>(new WordTokenizer())
);

// Continue with Flink operations
DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .sum(1);
```

### HadoopReduceFunction

Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction, enabling existing Hadoop Reducer logic to work with Flink's grouped operations.
```java { .api }
/**
 * Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
 *
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {

    /**
     * Creates a HadoopReduceFunction wrapper with a default JobConf.
     *
     * @param hadoopReducer The Hadoop Reducer to wrap
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

    /**
     * Creates a HadoopReduceFunction wrapper with a custom JobConf.
     *
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);

    /**
     * Opens and configures the reducer for processing.
     *
     * @param openContext Flink's open context for initialization
     * @throws Exception if reducer configuration fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Reduces input records using the wrapped Hadoop Reducer.
     *
     * @param values Iterable of input tuples with the same key
     * @param out Collector for output tuples
     * @throws Exception if the reduce operation fails
     */
    public void reduce(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;

    /**
     * Returns output type information for type safety.
     *
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Examples:**

```java
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Iterator;

// Custom Hadoop Reducer
public class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() {}
}

// Use in a Flink application
DataSet<Tuple2<Text, IntWritable>> words = // ... word data

DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<>(new WordCountReducer()));
```

### HadoopReduceCombineFunction

Wrapper that maps both a Hadoop Reducer and a Hadoop Combiner (mapred API) to a combinable Flink GroupReduceFunction, improving performance by pre-aggregating records before the shuffle.
```java { .api }
/**
 * Wrapper that maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
 *
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
                   GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {

    /**
     * Creates a HadoopReduceCombineFunction wrapper with a default JobConf.
     *
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param hadoopCombiner The Hadoop Combiner to wrap
     */
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner
    );

    /**
     * Creates a HadoopReduceCombineFunction wrapper with a custom JobConf.
     *
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param hadoopCombiner The Hadoop Combiner to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
        JobConf conf
    );

    /**
     * Opens and configures the reducer and combiner for processing.
     *
     * @param openContext Flink's open context for initialization
     * @throws Exception if configuration fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Reduces input records using the wrapped Hadoop Reducer.
     *
     * @param values Iterable of input tuples with the same key
     * @param out Collector for output tuples
     * @throws Exception if the reduce operation fails
     */
    public void reduce(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;

    /**
     * Combines input records using the wrapped Hadoop Combiner for pre-aggregation.
     *
     * @param values Iterable of input tuples to combine
     * @param out Collector for combined tuples
     * @throws Exception if the combine operation fails
     */
    public void combine(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYIN, VALUEIN>> out
    ) throws Exception;

    /**
     * Returns output type information for type safety.
     *
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Examples:**

```java
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;

// Hadoop Combiner (same logic as the reducer in this case)
public class WordCountCombiner implements Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }

    @Override public void configure(JobConf job) {}
    @Override public void close() {}
}

// Use the combinable reduce function
DataSet<Tuple2<Text, IntWritable>> words = // ... word data

DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new WordCountReducer(),  // Final reducer
        new WordCountCombiner()  // Pre-aggregation combiner
    ));
```

## Integration Patterns

### Configuration Management

All function wrappers accept Hadoop configuration through a JobConf:

```java
JobConf conf = new JobConf();
conf.set("mapred.textoutputformat.separator", "\t");
conf.setInt("mapred.max.split.size", 1024 * 1024);

HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapFunc =
    new HadoopMapFunction<>(new WordTokenizer(), conf);
```

### Error Handling

Function wrappers propagate Hadoop exceptions to Flink:

- **Configuration errors**: Invalid JobConf settings are reported during `open()`
- **Processing errors**: Mapper/Reducer exceptions are propagated to Flink
- **Resource cleanup**: Hadoop resources are cleaned up on failure or completion
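
The propagation behavior can be sketched with simplified stand-in interfaces. Note these are hypothetical types for illustration only, not the actual Flink or wrapper classes: the point is that the wrapper does not swallow exceptions, so a failure inside the wrapped user function surfaces to the surrounding runtime, which fails the task and triggers cleanup.

```java
public class PropagationDemo {
    // Hypothetical stand-ins for Flink's Collector and FlatMapFunction.
    interface SimpleCollector<T> { void collect(T record); }
    interface SimpleFlatMap<IN, OUT> {
        void flatMap(IN value, SimpleCollector<OUT> out) throws Exception;
    }

    // A wrapper in the style of HadoopMapFunction: it invokes the wrapped
    // function without catching its exceptions, so they reach the caller.
    static String runWrapped(SimpleFlatMap<String, String> wrapped, String input) {
        try {
            wrapped.flatMap(input, record -> { /* discard output */ });
            return "ok";
        } catch (Exception e) {
            // In Flink, this is the point where the task would fail
            // and resources would be released.
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A "mapper" that throws, as a wrapped Hadoop Mapper might.
        SimpleFlatMap<String, String> failing = (value, out) -> {
            throw new IllegalStateException("bad record: " + value);
        };
        System.out.println(runWrapped(failing, "x"));  // failed: bad record: x
        System.out.println(runWrapped((value, out) -> out.collect(value), "x"));  // ok
    }
}
```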

### Performance Optimization

Best practices for good performance:

1. **Use combiners**: Prefer HadoopReduceCombineFunction so records are pre-aggregated before the shuffle
2. **Configure parallelism**: Adjust Flink parallelism to the data size
3. **Memory management**: Configure appropriate heap sizes for Hadoop operations
4. **Reuse objects**: Hadoop's object-reuse patterns are preserved, so copy any record contents you hold beyond a single call
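
The object-reuse point deserves care: like Hadoop's `Text` and `IntWritable` (the `word` and `result` fields in the examples above), record objects may be mutated in place between calls. A minimal sketch using a hypothetical mutable holder class (standing in for a Writable) shows why values held across calls must be copied:

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseDemo {
    // Hypothetical mutable holder, standing in for Hadoop's Text/IntWritable.
    static final class Holder {
        String value;
    }

    /** Returns {aliased result, copied result} for the same input sequence. */
    static String[] demo() {
        Holder reused = new Holder();
        List<Holder> unsafe = new ArrayList<>();
        StringBuilder safe = new StringBuilder();

        for (String w : new String[] {"a", "b", "c"}) {
            reused.value = w;           // the same object is mutated on every call
            unsafe.add(reused);         // WRONG: every list element aliases one object
            safe.append(reused.value);  // RIGHT: copy the contents you keep
        }

        StringBuilder aliased = new StringBuilder();
        for (Holder h : unsafe) {
            aliased.append(h.value);    // all entries now show the last value written
        }
        return new String[] {aliased.toString(), safe.toString()};
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println("aliased: " + r[0] + ", copied: " + r[1]);  // aliased: ccc, copied: abc
    }
}
```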

## Migration from MapReduce

### Direct Translation

Existing MapReduce jobs can be translated to Flink directly:

```java
// Original MapReduce job structure
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordTokenizer.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);

// Equivalent Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input = // ... input
DataSet<Tuple2<Text, IntWritable>> result = input
    .flatMap(new HadoopMapFunction<>(new WordTokenizer()))
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new WordCountReducer(),
        new WordCountReducer()  // Same class used as combiner
    ));
```

### Advantages over MapReduce

1. **Iterative algorithms**: Flink's cyclic data flows vs. MapReduce's acyclic model
2. **Memory management**: Flink's managed memory vs. Hadoop's disk-based shuffling
3. **Real-time processing**: Stream processing capabilities alongside batch
4. **Lower latency**: Reduced job startup and coordination overhead