Wrapper classes that adapt Hadoop Mapper and Reducer implementations to work as Flink functions, enabling reuse of existing MapReduce logic within Flink applications. These wrappers bridge the gap between Hadoop's MapReduce programming model and Flink's functional API.
Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction, allowing existing Hadoop Mapper implementations to be used directly in Flink transformations.
/**
* Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction
* @param <KEYIN> Input key type
* @param <VALUEIN> Input value type
* @param <KEYOUT> Output key type
* @param <VALUEOUT> Output value type
*/
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
implements FlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {
/**
* Creates a HadoopMapFunction wrapper with default JobConf
* @param hadoopMapper The Hadoop Mapper to wrap
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
/**
* Creates a HadoopMapFunction wrapper with custom JobConf
* @param hadoopMapper The Hadoop Mapper to wrap
* @param conf JobConf for Hadoop configuration
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
/**
* Opens and configures the mapper for processing
* @param openContext Flink's open context for initialization
* @throws Exception if mapper configuration fails
*/
public void open(OpenContext openContext) throws Exception;
/**
* Processes input records using the wrapped Hadoop Mapper
* @param value Input tuple containing key-value pair
* @param out Collector for output tuples
* @throws Exception if mapping operation fails
*/
public void flatMap(
Tuple2<KEYIN, VALUEIN> value,
Collector<Tuple2<KEYOUT, VALUEOUT>> out
) throws Exception;
/**
* Returns output type information for type safety
* @return TypeInformation for output tuples
*/
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
Usage Examples:
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
// Custom Hadoop Mapper
public class WordTokenizer implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String[] words = value.toString().toLowerCase().split("\\s+");
for (String w : words) {
if (!w.isEmpty()) {
word.set(w);
output.collect(word, one);
}
}
}
@Override
public void configure(JobConf job) {}
@Override
public void close() {}
}
// Use in Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> lines = // ... input data
// Apply Hadoop Mapper as Flink function
DataSet<Tuple2<Text, IntWritable>> words = lines.flatMap(
new HadoopMapFunction<>(new WordTokenizer())
);
// Continue with Flink operations
DataSet<Tuple2<Text, IntWritable>> wordCounts = words
.groupBy(0)
    .sum(1);
Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction, enabling existing Hadoop Reducer logic to work with Flink's grouped operations.
/**
* Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction
* @param <KEYIN> Input key type
* @param <VALUEIN> Input value type
* @param <KEYOUT> Output key type
* @param <VALUEOUT> Output value type
*/
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {
/**
* Creates a HadoopReduceFunction wrapper with default JobConf
* @param hadoopReducer The Hadoop Reducer to wrap
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
/**
* Creates a HadoopReduceFunction wrapper with custom JobConf
* @param hadoopReducer The Hadoop Reducer to wrap
* @param conf JobConf for Hadoop configuration
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
/**
* Opens and configures the reducer for processing
* @param openContext Flink's open context for initialization
* @throws Exception if reducer configuration fails
*/
public void open(OpenContext openContext) throws Exception;
/**
* Reduces input records using the wrapped Hadoop Reducer
* @param values Iterable of input tuples with the same key
* @param out Collector for output tuples
* @throws Exception if reduce operation fails
*/
public void reduce(
Iterable<Tuple2<KEYIN, VALUEIN>> values,
Collector<Tuple2<KEYOUT, VALUEOUT>> out
) throws Exception;
/**
* Returns output type information for type safety
* @return TypeInformation for output tuples
*/
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
Usage Examples:
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Iterator;
// Custom Hadoop Reducer
public class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
result.set(sum);
output.collect(key, result);
}
@Override
public void configure(JobConf job) {}
@Override
public void close() {}
}
// Use in Flink application
DataSet<Tuple2<Text, IntWritable>> words = // ... word data, e.g. the output of the HadoopMapFunction example above
DataSet<Tuple2<Text, IntWritable>> wordCounts = words
.groupBy(0)
    .reduceGroup(new HadoopReduceFunction<>(new WordCountReducer()));
Wrapper that maps both a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction, providing optimal performance through pre-aggregation.
/**
* Wrapper that maps Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction
* @param <KEYIN> Input key type
* @param <VALUEIN> Input value type
* @param <KEYOUT> Output key type
* @param <VALUEOUT> Output value type
*/
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {
/**
* Creates a HadoopReduceCombineFunction wrapper with default JobConf
* @param hadoopReducer The Hadoop Reducer to wrap
* @param hadoopCombiner The Hadoop Combiner to wrap
*/
public HadoopReduceCombineFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner
);
/**
* Creates a HadoopReduceCombineFunction wrapper with custom JobConf
* @param hadoopReducer The Hadoop Reducer to wrap
* @param hadoopCombiner The Hadoop Combiner to wrap
* @param conf JobConf for Hadoop configuration
*/
public HadoopReduceCombineFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
JobConf conf
);
/**
* Opens and configures reducer and combiner for processing
* @param openContext Flink's open context for initialization
* @throws Exception if configuration fails
*/
public void open(OpenContext openContext) throws Exception;
/**
* Reduces input records using the wrapped Hadoop Reducer
* @param values Iterable of input tuples with the same key
* @param out Collector for output tuples
* @throws Exception if reduce operation fails
*/
public void reduce(
Iterable<Tuple2<KEYIN, VALUEIN>> values,
Collector<Tuple2<KEYOUT, VALUEOUT>> out
) throws Exception;
/**
* Combines input records using the wrapped Hadoop Combiner for pre-aggregation
* @param values Iterable of input tuples to combine
* @param out Collector for combined tuples
* @throws Exception if combine operation fails
*/
public void combine(
Iterable<Tuple2<KEYIN, VALUEIN>> values,
Collector<Tuple2<KEYIN, VALUEIN>> out
) throws Exception;
/**
* Returns output type information for type safety
* @return TypeInformation for output tuples
*/
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
Usage Examples:
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
// (remaining imports and the WordCountReducer class are the same as in the previous example)
// Hadoop Combiner (same as reducer in this case)
public class WordCountCombiner implements Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
result.set(sum);
output.collect(key, result);
}
@Override public void configure(JobConf job) {}
@Override public void close() {}
}
// Use combinable reduce function
DataSet<Tuple2<Text, IntWritable>> words = // ... word data
DataSet<Tuple2<Text, IntWritable>> wordCounts = words
.groupBy(0)
.reduceGroup(new HadoopReduceCombineFunction<>(
new WordCountReducer(), // Final reducer
new WordCountCombiner() // Pre-aggregation combiner
    ));
All function wrappers support Hadoop configuration through JobConf:
JobConf conf = new JobConf();
conf.set("mapred.textoutputformat.separator", "\t");
conf.setInt("mapred.max.split.size", 1024 * 1024);
HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapFunc =
    new HadoopMapFunction<>(new WordTokenizer(), conf);
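The supplied JobConf reaches the wrapped Hadoop code through its configure(JobConf) method when the wrapper is opened. The mapper below is a hypothetical variant of WordTokenizer that reads a custom parameter; the class name ConfigurableTokenizer and the key tokenizer.min.length are purely illustrative and not part of the wrapper API (imports are the same as in the first example):
// Hypothetical mapper that reads a setting from the JobConf supplied to the wrapper
public class ConfigurableTokenizer implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private int minLength;
    @Override
    public void configure(JobConf job) {
        // "tokenizer.min.length" is an illustrative key, not a standard Hadoop property
        minLength = job.getInt("tokenizer.min.length", 1);
    }
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        for (String w : value.toString().toLowerCase().split("\\s+")) {
            if (w.length() >= minLength) {
                word.set(w);
                output.collect(word, one);
            }
        }
    }
    @Override
    public void close() {}
}
// The JobConf handed to the wrapper is what configure() receives at runtime
JobConf tokenizerConf = new JobConf();
tokenizerConf.setInt("tokenizer.min.length", 3);
DataSet<Tuple2<Text, IntWritable>> longWords =
    lines.flatMap(new HadoopMapFunction<>(new ConfigurableTokenizer(), tokenizerConf));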
Function wrappers propagate exceptions thrown by the wrapped Hadoop code: flatMap, reduce, and combine all declare throws Exception, so an exception raised inside a wrapped Mapper or Reducer is not swallowed but fails the Flink task.
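For example, a checked IOException thrown inside a wrapped Mapper surfaces through HadoopMapFunction.flatMap. The mapper below is a hypothetical illustration, not part of the API:
// Hypothetical mapper that rejects malformed records; the IOException propagates
// through HadoopMapFunction.flatMap(...) and causes the Flink task to fail.
public class StrictParser implements Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String[] parts = value.toString().split("\t", 2);
        if (parts.length != 2) {
            throw new IOException("Malformed record at offset " + key.get() + ": " + value);
        }
        output.collect(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1].trim())));
    }
    @Override
    public void configure(JobConf job) {}
    @Override
    public void close() {}
}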
For best performance, prefer HadoopReduceCombineFunction over HadoopReduceFunction whenever a Combiner is available, since the combine step pre-aggregates records before they are shuffled between tasks. Existing MapReduce jobs can be directly translated to Flink:
// Original MapReduce job structure (mapred API)
JobConf wordCountJob = new JobConf(WordCount.class);
wordCountJob.setJobName("word count");
wordCountJob.setMapperClass(WordTokenizer.class);
wordCountJob.setCombinerClass(WordCountReducer.class);
wordCountJob.setReducerClass(WordCountReducer.class);
// Equivalent Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input = // ... input
DataSet<Tuple2<Text, IntWritable>> result = input
.flatMap(new HadoopMapFunction<>(new WordTokenizer()))
.groupBy(0)
.reduceGroup(new HadoopReduceCombineFunction<>(
new WordCountReducer(),
new WordCountReducer() // same Reducer reused as the combiner
));
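To round out the migration, the result can be written back through a Hadoop OutputFormat and the program executed. This is a minimal sketch assuming the HadoopOutputFormat wrapper from the same flink-hadoop-compatibility module and a hypothetical output path; adjust both to your setup:
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
// Wrap Hadoop's TextOutputFormat so Flink can write the result in the familiar format
HadoopOutputFormat<Text, IntWritable> hadoopOF =
    new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), new JobConf());
hadoopOF.getJobConf().set("mapred.textoutputformat.separator", "\t");
FileOutputFormat.setOutputPath(hadoopOF.getJobConf(), new Path("hdfs:///output/wordcount")); // hypothetical path
result.output(hadoopOF);
env.execute("Hadoop WordCount on Flink");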