# MapReduce Function Integration

Wrappers that convert Hadoop Mappers and Reducers into Flink-compatible functions, so existing MapReduce logic can be reused inside Flink applications. They support the legacy `mapred` API, with automatic type conversion between Hadoop and Flink data types.

## Capabilities

### Hadoop Mapper Integration

`HadoopMapFunction` wraps a Hadoop `Mapper` so that it can be used as a Flink `FlatMapFunction`.

```java { .api }
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Constructor with Hadoop Mapper
     * @param hadoopMapper The Hadoop Mapper to wrap
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);

    /**
     * Constructor with Hadoop Mapper and configuration
     * @param hadoopMapper The Hadoop Mapper to wrap
     * @param conf JobConf configuration for the mapper
     */
    public HadoopMapFunction(
        Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
        JobConf conf
    );

    /**
     * Open method called before processing starts
     * @param openContext Runtime context for the function
     * @throws Exception if initialization fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Process a single input record through the Hadoop Mapper
     * @param value Input record as Tuple2<KEYIN, VALUEIN>
     * @param out Collector for output records
     * @throws Exception if processing fails
     */
    public void flatMap(
        final Tuple2<KEYIN, VALUEIN> value,
        final Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;

    /**
     * Get type information for the produced output type
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
```

**Usage Example:**

```java
import java.io.IOException;

import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

// Example Hadoop Mapper that emits (word, word length) for every word in a line
public static class WordLengthMapper implements Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String[] words = value.toString().split("\\s+");
        for (String word : words) {
            output.collect(new Text(word), new IntWritable(word.length()));
        }
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() throws IOException {}
}

// Wrap the Hadoop Mapper for use in Flink
HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapFunction =
    new HadoopMapFunction<>(new WordLengthMapper());

// Use in Flink DataSet API
DataSet<Tuple2<LongWritable, Text>> input = // ... your input dataset
DataSet<Tuple2<Text, IntWritable>> mapped = input.flatMap(mapFunction);
```

### Hadoop Reducer Integration

`HadoopReducerWrappedFunction` wraps a Hadoop `Reducer` as a Flink window function for both keyed and non-keyed streams.

```java { .api }
public final class HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, KEYIN, GlobalWindow>
        implements AllWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, GlobalWindow>,
                   ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {

    /**
     * Constructor with Hadoop Reducer
     * @param hadoopReducer The Hadoop Reducer to wrap
     */
    public HadoopReducerWrappedFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);

    /**
     * Constructor with Hadoop Reducer and configuration
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param conf JobConf configuration for the reducer
     */
    public HadoopReducerWrappedFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        JobConf conf
    );

    /**
     * Open method called before processing starts
     * @param openContext Runtime context for the function
     * @throws Exception if initialization fails
     */
    public void open(OpenContext openContext) throws Exception;

    /**
     * Get type information for the produced output type
     * @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();

    /**
     * Apply function for keyed windows
     * @param key The key for this window
     * @param globalWindow The window (always GlobalWindow)
     * @param iterable Input records for this key
     * @param collector Collector for output records
     * @throws Exception if processing fails
     */
    public void apply(
        KEYIN key,
        GlobalWindow globalWindow,
        Iterable<Tuple2<KEYIN, VALUEIN>> iterable,
        Collector<Tuple2<KEYOUT, VALUEOUT>> collector
    ) throws Exception;

    /**
     * Apply function for non-keyed windows (all data in single partition)
     * @param globalWindow The window (always GlobalWindow)
     * @param iterable All input records
     * @param collector Collector for output records
     * @throws Exception if processing fails
     */
    public void apply(
        GlobalWindow globalWindow,
        Iterable<Tuple2<KEYIN, VALUEIN>> iterable,
        Collector<Tuple2<KEYOUT, VALUEOUT>> collector
    ) throws Exception;
}
```

**Usage Example:**

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

// Example Hadoop Reducer that sums values by key
public static class SumReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() throws IOException {}
}

// Wrap the Hadoop Reducer for use in Flink
HadoopReducerWrappedFunction<Text, IntWritable, Text, IntWritable> reduceFunction =
    new HadoopReducerWrappedFunction<>(new SumReducer());

// Use in Flink streaming API with keyed windows
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Text, IntWritable>> stream = // ... your input stream

stream
    .keyBy(tuple -> tuple.f0)          // Key by the Text field
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(100))     // Fire after every 100 elements per key
    .apply(reduceFunction)
    .print();

// Use in batch API (DataSet)
DataSet<Tuple2<Text, IntWritable>> dataset = // ... your input dataset
DataSet<Tuple2<Text, IntWritable>> reduced = dataset
    .groupBy(0)  // Group by key (f0)
    .reduceGroup(new GroupReduceFunction<Tuple2<Text, IntWritable>, Tuple2<Text, IntWritable>>() {
        @Override
        public void reduce(Iterable<Tuple2<Text, IntWritable>> values,
                           Collector<Tuple2<Text, IntWritable>> out) throws Exception {
            // Delegate the group to the non-keyed apply() overload.
            // Caution: Flink does not call open() on a wrapper nested like this. If apply()
            // depends on state created in open(), this call fails; verify on your Flink version.
            reduceFunction.apply(GlobalWindow.get(), values, out);
        }
    });
```
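
The keyed `apply` signature above receives the group as `Iterable<Tuple2<KEYIN, VALUEIN>>`, while a `mapred` reducer expects a key plus an `Iterator` over bare values. The following JDK-only sketch shows one way such a view can be built. It is a simplified model, not the wrapper's actual implementation: `ReducerAdapterSketch`, `KeyValueReducer`, `valuesOf`, and `sumExample` are hypothetical names, and `Map.Entry` stands in for `Tuple2`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, JDK-only model; none of these types belong to Flink or Hadoop.
final class ReducerAdapterSketch {

    /** Stand-in for a mapred-style reducer that emits through a collector callback. */
    interface KeyValueReducer<KI, VI, KO, VO> {
        void reduce(KI key, Iterator<VI> values, BiConsumer<KO, VO> output);
    }

    /** Lazily strips the key from each (key, value) record of a group. */
    static <K, V> Iterator<V> valuesOf(Iterable<Map.Entry<K, V>> records) {
        Iterator<Map.Entry<K, V>> it = records.iterator();
        return new Iterator<V>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public V next() { return it.next().getValue(); }
        };
    }

    /** Calls the reducer once for the group and repacks each emitted key/value as a pair. */
    static <KI, VI, KO, VO> List<Map.Entry<KO, VO>> apply(
            KeyValueReducer<KI, VI, KO, VO> reducer,
            KI key,
            Iterable<Map.Entry<KI, VI>> records) {
        List<Map.Entry<KO, VO>> out = new ArrayList<>();
        reducer.reduce(key, valuesOf(records), (k, v) -> out.add(Map.entry(k, v)));
        return out;
    }

    /** Sums the values of a single group with key "a". */
    static List<Map.Entry<String, Integer>> sumExample() {
        KeyValueReducer<String, Integer, String, Integer> sum = (key, values, output) -> {
            int total = 0;
            while (values.hasNext()) {
                total += values.next();
            }
            output.accept(key, total);
        };
        List<Map.Entry<String, Integer>> group =
            List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("a", 3));
        return apply(sum, "a", group);
    }
}
```

The iterator view avoids copying the group, which matters when a window holds many records per key.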

## Advanced Usage Patterns

### Configuration Integration

Pass a `JobConf` to the wrapper constructor to make configuration values available to the wrapped Mapper or Reducer.

```java
import org.apache.hadoop.mapred.JobConf;

// Configure Hadoop job parameters
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.map.memory.mb", "2048");
jobConf.set("custom.parameter", "value");

// KEYIN, VALUEIN, KEYOUT, VALUEOUT stand for your concrete types;
// MyHadoopMapper and MyHadoopReducer are placeholders for your own classes.

// Pass configuration to mapper
HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapFunction =
    new HadoopMapFunction<>(new MyHadoopMapper(), jobConf);

// Pass configuration to reducer
HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceFunction =
    new HadoopReducerWrappedFunction<>(new MyHadoopReducer(), jobConf);
```

### Chain Map-Reduce Operations

Several wrapped Hadoop functions can be combined in a single Flink pipeline.

```java
// Chain mapper and reducer operations.
// mapFunction1, mapFunction2 and reduceFunction are wrappers created as shown earlier;
// the output types of mapFunction1 must match the input types of mapFunction2.
DataSet<Tuple2<LongWritable, Text>> input = // ... input dataset

DataSet<Tuple2<Text, IntWritable>> result = input
    .flatMap(mapFunction1)  // First Hadoop mapper
    .flatMap(mapFunction2)  // Second Hadoop mapper
    .groupBy(0)             // Group by key
    .reduceGroup(new GroupReduceFunction<Tuple2<Text, IntWritable>, Tuple2<Text, IntWritable>>() {
        @Override
        public void reduce(Iterable<Tuple2<Text, IntWritable>> values,
                           Collector<Tuple2<Text, IntWritable>> out) throws Exception {
            // Direct call on a wrapper that Flink has not opened; verify on your Flink version.
            reduceFunction.apply(GlobalWindow.get(), values, out);
        }
    });
```

### Type Safety and Conversion

Custom `Writable` types can be used as keys or values of a wrapped function.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Example with custom Writable types
public static class CustomWritable implements Writable {
    private String data = "";  // non-null default so write() never fails on an unset value

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(data);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        data = in.readUTF();
    }

    public String getData() { return data; }

    public void setData(String data) { this.data = data; }
}

// Mapper using custom types
public static class CustomMapper implements Mapper<LongWritable, Text, Text, CustomWritable> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, CustomWritable> output, Reporter reporter) throws IOException {
        CustomWritable custom = new CustomWritable();
        custom.setData(value.toString().toUpperCase());
        output.collect(new Text("processed"), custom);
    }

    @Override
    public void configure(JobConf job) {}

    @Override
    public void close() throws IOException {}
}

// Use with proper type information
HadoopMapFunction<LongWritable, Text, Text, CustomWritable> customMapFunction =
    new HadoopMapFunction<>(new CustomMapper());

// Flink will automatically handle Writable serialization.
// `input` is a DataSet<Tuple2<LongWritable, Text>> as in the earlier examples.
DataSet<Tuple2<Text, CustomWritable>> output = input.flatMap(customMapFunction);
```

### Error Handling and Monitoring

A wrapped function can report progress and counters through the Hadoop `Reporter` and signal failures by throwing an exception.

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public static class MonitoredMapper implements Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void configure(JobConf job) {
        // Read job parameters here if needed; counters are reached through the Reporter in map()
    }

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        try {
            // Processing logic
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                output.collect(new Text(word), new IntWritable(1));
                reporter.progress();  // Report progress
            }
            reporter.incrCounter("Records", "Processed", 1);  // One increment per input record
        } catch (Exception e) {
            reporter.incrCounter("Records", "Errors", 1);
            throw new IOException("Processing failed", e);
        }
    }

    @Override
    public void close() throws IOException {}
}
```

## Key Design Patterns

### Tuple2 Convention
- Input to mappers: `Tuple2<KEYIN, VALUEIN>` where f0=key, f1=value
- Output from mappers: `Tuple2<KEYOUT, VALUEOUT>` where f0=key, f1=value
- Input to reducers: `Iterable<Tuple2<KEYIN, VALUEIN>>` grouped by key
- Output from reducers: `Tuple2<KEYOUT, VALUEOUT>` where f0=key, f1=value
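
The convention in the list above can be illustrated with a small JDK-only model. This is a sketch under stated assumptions, not Flink or Hadoop code: `TupleConventionSketch`, `Pair`, `KeyValueMapper`, and `wordLengths` are hypothetical names, with `Pair` standing in for `Tuple2` and a `BiConsumer` standing in for Hadoop's `OutputCollector`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical, JDK-only model of the f0=key / f1=value convention.
final class TupleConventionSketch {

    /** Stand-in for Flink's Tuple2: f0 carries the key, f1 the value. */
    static final class Pair<K, V> {
        final K f0;
        final V f1;

        Pair(K f0, V f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Stand-in for a mapred-style mapper that emits through a collector callback. */
    interface KeyValueMapper<KI, VI, KO, VO> {
        void map(KI key, VI value, BiConsumer<KO, VO> output);
    }

    /** Unpacks the input pair, calls the mapper, and repacks every emitted key/value. */
    static <KI, VI, KO, VO> void flatMap(
            KeyValueMapper<KI, VI, KO, VO> mapper,
            Pair<KI, VI> input,
            Consumer<Pair<KO, VO>> out) {
        mapper.map(input.f0, input.f1, (k, v) -> out.accept(new Pair<>(k, v)));
    }

    /** Runs a word-length mapper over one line and returns the emitted pairs. */
    static List<Pair<String, Integer>> wordLengths(long offset, String line) {
        KeyValueMapper<Long, String, String, Integer> mapper = (key, value, output) -> {
            for (String word : value.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    output.accept(word, word.length());
                }
            }
        };
        List<Pair<String, Integer>> result = new ArrayList<>();
        flatMap(mapper, new Pair<>(Long.valueOf(offset), line), result::add);
        return result;
    }

    public static void main(String[] args) {
        for (Pair<String, Integer> p : wordLengths(0L, "hello flink")) {
            System.out.println(p.f0 + " -> " + p.f1);
        }
    }
}
```

One input pair may yield zero, one, or many output pairs, which is why the mapper wrapper is a flat-map rather than a one-to-one map.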

### Configuration Inheritance
The `JobConf` given to a wrapper is passed through to the wrapped Hadoop function, maintaining compatibility with existing MapReduce code.
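
The pass-through can be pictured as a wrapper that stores the configuration and hands it to the wrapped function once, before any record is processed. The sketch below is a JDK-only model built on that assumption; the source does not spell out when the hand-off happens. `ConfigPassThroughSketch`, `ConfigurableMapper`, `Wrapper`, and `PrefixMapper` are hypothetical names, and a `Map<String, String>` stands in for `JobConf`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only model; a Map<String, String> stands in for JobConf.
final class ConfigPassThroughSketch {

    /** Stand-in for a Hadoop function that is configured before it processes records. */
    interface ConfigurableMapper {
        void configure(Map<String, String> conf);

        String map(String value);
    }

    /** Keeps the configuration and forwards it to the wrapped function in open(). */
    static final class Wrapper {
        private final ConfigurableMapper mapper;
        private final Map<String, String> conf;

        Wrapper(ConfigurableMapper mapper, Map<String, String> conf) {
            this.mapper = mapper;
            this.conf = new HashMap<>(conf);  // defensive copy
        }

        /** Assumed lifecycle step: called once before the first record. */
        void open() {
            mapper.configure(conf);
        }

        String flatMap(String value) {
            return mapper.map(value);
        }
    }

    /** Example function that reads "custom.parameter" and prefixes each record with it. */
    static final class PrefixMapper implements ConfigurableMapper {
        private String prefix = "";

        @Override
        public void configure(Map<String, String> conf) {
            prefix = conf.getOrDefault("custom.parameter", "");
        }

        @Override
        public String map(String value) {
            return prefix + ":" + value;
        }
    }

    static String demo() {
        Map<String, String> conf = new HashMap<>();
        conf.put("custom.parameter", "value");
        Wrapper wrapper = new Wrapper(new PrefixMapper(), conf);
        wrapper.open();
        return wrapper.flatMap("record");
    }
}
```

Existing MapReduce code that reads its parameters in `configure` therefore needs no changes, provided the same keys are set on the `JobConf` given to the wrapper.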

### Type Information
Both wrappers implement `ResultTypeQueryable`, so Flink obtains the output `TypeInformation` from `getProducedType()`. This automatic extraction ensures proper serialization and type safety within Flink's type system.
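
### Progress Reporting
The Hadoop `Reporter` interface is supported, so existing calls for progress reporting and counter updates run unchanged. Some functionality may be limited in the Flink execution environment: progress and counter updates are not necessarily surfaced in Flink's own metrics, so check this before relying on them for monitoring.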