# Hadoop Compatibility

Hadoop MapReduce API compatibility layer for Flink, enabling seamless reuse of existing Hadoop Mapper and Reducer implementations within Flink batch programs.

## Capabilities

### HadoopMapFunction

Wraps a Hadoop Mapper (mapred API) as a Flink FlatMapFunction for seamless integration.

```java { .api }
/**
 * Wraps Hadoop Mapper (mapred API) to Flink FlatMapFunction
 * @param <KEYIN> Input key type for the Hadoop mapper
 * @param <VALUEIN> Input value type for the Hadoop mapper
 * @param <KEYOUT> Output key type from the Hadoop mapper
 * @param <VALUEOUT> Output value type from the Hadoop mapper
 */
@Public
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Creates a HadoopMapFunction with a Hadoop mapper
     * @param hadoopMapper The Hadoop Mapper instance to wrap
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    /**
     * Creates a HadoopMapFunction with a Hadoop mapper and job configuration
     * @param hadoopMapper The Hadoop Mapper instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);

    /**
     * Returns the type information for the output tuples
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Example:**

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;

// Your existing Hadoop Mapper
public class WordCountMapper implements Mapper<LongWritable, Text, Text, LongWritable> {
    private final static LongWritable one = new LongWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            word.set(w);
            output.collect(word, one);
        }
    }

    public void configure(JobConf job) {}
    public void close() throws IOException {}
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Wrap Hadoop mapper in Flink function
HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
    new HadoopMapFunction<>(new WordCountMapper());

// Use in Flink program
DataSet<Tuple2<LongWritable, Text>> input = // ... your input data
DataSet<Tuple2<Text, LongWritable>> words = input.flatMap(mapFunction);
```

### HadoopReduceFunction

Wraps a Hadoop Reducer (mapred API) as a non-combinable Flink GroupReduceFunction.

```java { .api }
/**
 * Wraps Hadoop Reducer (mapred API) to non-combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type for the Hadoop reducer
 * @param <VALUEIN> Input value type for the Hadoop reducer
 * @param <KEYOUT> Output key type from the Hadoop reducer
 * @param <VALUEOUT> Output value type from the Hadoop reducer
 */
@Public
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Creates a HadoopReduceFunction with a Hadoop reducer
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

    /**
     * Creates a HadoopReduceFunction with a Hadoop reducer and job configuration
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);

    /**
     * Returns the type information for the output tuples
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
import java.util.Iterator;

// Your existing Hadoop Reducer
public class WordCountReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
    public void reduce(Text key, Iterator<LongWritable> values,
                       OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        long count = 0;
        while (values.hasNext()) {
            count += values.next().get();
        }
        output.collect(key, new LongWritable(count));
    }

    public void configure(JobConf job) {}
    public void close() throws IOException {}
}

// Wrap Hadoop reducer in Flink function
HadoopReduceFunction<Text, LongWritable, Text, LongWritable> reduceFunction =
    new HadoopReduceFunction<>(new WordCountReducer());

// Use in Flink program (after grouping by key)
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
    .groupBy(0) // Group by key (position 0 in tuple)
    .reduceGroup(reduceFunction);
```

### HadoopReduceCombineFunction

Wraps both a Hadoop Reducer and a Hadoop Combiner (mapred API) as a combinable Flink GroupReduceFunction for optimized processing.

```java { .api }
/**
 * Wraps Hadoop Reducer and Combiner (mapred API) to combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type for the Hadoop reducer
 * @param <VALUEIN> Input value type for the Hadoop reducer
 * @param <KEYOUT> Output key type from the Hadoop reducer
 * @param <VALUEOUT> Output value type from the Hadoop reducer
 */
@Public
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Creates a HadoopReduceCombineFunction with separate reducer and combiner
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param hadoopCombiner The Hadoop Combiner instance to wrap
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner);

    /**
     * Creates a HadoopReduceCombineFunction with reducer, combiner, and job configuration
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param hadoopCombiner The Hadoop Combiner instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
            JobConf conf);

    /**
     * Returns the type information for the output tuples
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Example:**

```java
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;

// Reuse the same reducer logic for both the combiner and the reducer
// (two separate instances, identical behavior)
WordCountReducer reducer = new WordCountReducer();
WordCountReducer combiner = new WordCountReducer();

// Wrap with combine functionality
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceCombineFunction =
    new HadoopReduceCombineFunction<>(reducer, combiner);

// Use in Flink program with automatic combining
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(reduceCombineFunction);
```

### HadoopOutputCollector

Wraps a Flink Collector as a Hadoop OutputCollector for compatibility with Hadoop mapper/reducer implementations.

```java { .api }
/**
 * Wraps a Flink Collector as a Hadoop OutputCollector
 * @param <KEY> Key type for collected output
 * @param <VALUE> Value type for collected output
 */
public class HadoopOutputCollector<KEY, VALUE> implements OutputCollector<KEY, VALUE> {

    /**
     * Creates a new HadoopOutputCollector
     */
    public HadoopOutputCollector();

    /**
     * Sets the wrapped Flink collector
     * @param flinkCollector The Flink collector to wrap
     */
    public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector);

    /**
     * Collects a key-value pair (implementation of Hadoop OutputCollector interface)
     * @param key The key to collect
     * @param val The value to collect
     */
    public void collect(final KEY key, final VALUE val) throws IOException;
}
```

### HadoopTupleUnwrappingIterator

Wraps a Flink Tuple2 iterator to provide an iterator over values only, compatible with the Hadoop reducer input format.

```java { .api }
/**
 * Wraps Flink Tuple2 iterator into iterator over values only
 * @param <KEY> Key type of the tuples
 * @param <VALUE> Value type of the tuples
 */
public class HadoopTupleUnwrappingIterator<KEY, VALUE>
        extends TupleUnwrappingIterator<VALUE, KEY>
        implements java.io.Serializable {

    /**
     * Creates a new HadoopTupleUnwrappingIterator
     * @param keySerializer Serializer for the key type
     */
    public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer);

    /**
     * Sets the Flink iterator to wrap
     * @param iterator The Flink Tuple2 iterator
     */
    public void set(final Iterator<Tuple2<KEY, VALUE>> iterator);

    /**
     * Checks if more elements are available
     * @return true if more elements exist, false otherwise
     */
    public boolean hasNext();

    /**
     * Returns the next value in the iteration
     * @return The next value
     */
    public VALUE next();

    /**
     * Returns the current key associated with the last returned value
     * @return The current key
     */
    public KEY getCurrentKey();

    /**
     * Remove operation is not supported
     * @throws UnsupportedOperationException Always thrown
     */
    public void remove();
}
```

## Complete Word Count Example

Here's a complete example showing how to use Hadoop MapReduce components in a Flink program:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;

public class FlinkHadoopWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read input text
        DataSet<String> text = env.fromElements(
            "Hello World",
            "Hello Flink",
            "Hello Hadoop"
        );

        // Convert to Hadoop input format (offset, text).
        // A lambda with a generic return type needs an explicit type hint,
        // because Flink cannot extract Tuple2's type parameters from it.
        DataSet<Tuple2<LongWritable, Text>> hadoopInput = text
            .map((String line) -> new Tuple2<>(new LongWritable(0), new Text(line)))
            .returns(new TypeHint<Tuple2<LongWritable, Text>>() {});

        // Use Hadoop Mapper
        HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
            new HadoopMapFunction<>(new WordCountMapper());

        DataSet<Tuple2<Text, LongWritable>> words = hadoopInput.flatMap(mapFunction);

        // Use Hadoop Reducer with Combiner
        HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceFunction =
            new HadoopReduceCombineFunction<>(
                new WordCountReducer(), // reducer
                new WordCountReducer()  // combiner (same logic)
            );

        DataSet<Tuple2<Text, LongWritable>> wordCounts = words
            .groupBy(0)
            .reduceGroup(reduceFunction);

        // print() eagerly triggers execution of the program; a separate
        // env.execute() call afterwards would fail because no new sinks
        // have been defined.
        wordCounts.print();
    }
}
```

## Common Types

```java { .api }
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import java.io.Serializable;
import java.util.Iterator;
```