Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs
—
The MapReduce Functions capability enables direct integration of Hadoop Mapper and Reducer functions into Flink workflows, allowing reuse of existing MapReduce logic within Flink's DataSet API while maintaining compatibility with Hadoop's programming model.
Flink's Hadoop compatibility layer provides wrapper classes that adapt Hadoop MapReduce functions to work as Flink operators. This enables gradual migration from MapReduce to Flink by allowing existing Mapper and Reducer implementations to run within Flink pipelines without modification.
Wrapper that adapts a Hadoop Mapper to a Flink FlatMapFunction.
/**
 * API outline (signatures only — not compilable Java; bodies omitted).
 * Adapts an org.apache.hadoop.mapred.Mapper so it can run as a Flink
 * FlatMapFunction over Tuple2&lt;key, value&gt; pairs.
 */
@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
// Constructor with Mapper only (uses default JobConf)
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
// Constructor with Mapper and custom JobConf
public HadoopMapFunction(
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
JobConf conf);
// Flink lifecycle method
// NOTE(review): presumably configures the wrapped Mapper from the JobConf — confirm against Flink source
public void open(Configuration parameters) throws Exception;
// Main processing method: one input Tuple2 may emit zero or more output Tuple2s
public void flatMap(
final Tuple2<KEYIN, VALUEIN> value,
final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;
// Type information method
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
}

Wrapper that adapts a Hadoop Reducer to a non-combinable Flink GroupReduceFunction.
/**
 * API outline (signatures only — not compilable Java; bodies omitted).
 * Adapts an org.apache.hadoop.mapred.Reducer to a Flink GroupReduceFunction;
 * no combine phase is applied with this wrapper.
 */
@Public
public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
// Constructor with Reducer only (uses default JobConf)
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
// Constructor with Reducer and custom JobConf
public HadoopReduceFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
JobConf conf);
// Flink lifecycle method
public void open(Configuration parameters) throws Exception;
// Main processing method: receives all values of one group (one key)
public void reduce(
final Iterable<Tuple2<KEYIN, VALUEIN>> values,
final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;
// Type information method
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
}

Wrapper that adapts both Hadoop Reducer and Combiner to a combinable Flink GroupReduceFunction.
/**
 * API outline (signatures only — not compilable Java; bodies omitted).
 * Adapts a Hadoop Reducer plus a Hadoop Combiner to a combinable Flink
 * GroupReduceFunction. Per the generic signature below, the combiner must
 * emit the same (KEYIN, VALUEIN) types it consumes, since its output is fed
 * into the final reducer.
 */
@Public
public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
// Constructor with Reducer and Combiner (uses default JobConf)
public HadoopReduceCombineFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner);
// Constructor with Reducer, Combiner, and custom JobConf
public HadoopReduceCombineFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
JobConf conf);
// Flink lifecycle method
public void open(Configuration parameters) throws Exception;
// Main reduce processing method
public void reduce(
final Iterable<Tuple2<KEYIN, VALUEIN>> values,
final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;
// Combine processing method for optimization (pre-aggregation before the shuffle)
public void combine(
final Iterable<Tuple2<KEYIN, VALUEIN>> values,
final Collector<Tuple2<KEYIN, VALUEIN>> out) throws Exception;
// Type information method
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}
}

import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.StringTokenizer;

import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
// Hadoop Mapper (old "mapred" API) that splits each input line into lowercase words
// and emits (word, 1) pairs.
// NOTE(review): org.apache.hadoop.mapred.Mapper is an interface, so the original
// `extends Mapper` does not compile; the conventional form is
// `extends MapReduceBase implements Mapper`.
public static class TokenizerMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output objects — standard Hadoop pattern to avoid per-record allocation.
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            // Locale.ROOT keeps lowercasing deterministic regardless of the JVM default locale.
            word.set(tokenizer.nextToken().toLowerCase(Locale.ROOT));
            output.collect(word, one);
        }
    }
}
// Hadoop Reducer (old "mapred" API) that sums the IntWritable values for each key.
// NOTE(review): org.apache.hadoop.mapred.Reducer is an interface; use
// `extends MapReduceBase implements Reducer` so the example compiles.
public static class IntSumReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output object (Hadoop object-reuse pattern).
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
}
// Use in Flink pipeline
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read input data as (byte-offset, line) pairs, as a Hadoop TextInputFormat would produce
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
HadoopInputs.readHadoopFile(/* input format configuration */)
);
// Apply Hadoop mapper
DataSet<Tuple2<Text, IntWritable>> mappedData = input
.flatMap(new HadoopMapFunction<>(new TokenizerMapper()));
// Group by key and apply Hadoop reducer.
// groupBy(0) groups on the Tuple2 key field (f0), mirroring the MapReduce shuffle.
DataSet<Tuple2<Text, IntWritable>> result = mappedData
.groupBy(0)
.reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));

// Combiner that performs partial aggregation
// Combiner (old "mapred" API) that pre-aggregates partial sums per key. Because summing
// is associative and commutative, its output types equal its input types, as required
// by HadoopReduceCombineFunction.
// NOTE(review): org.apache.hadoop.mapred.Reducer is an interface; use
// `extends MapReduceBase implements Reducer` so the example compiles.
public static class IntSumCombiner extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused output object (Hadoop object-reuse pattern).
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
}
// Use HadoopReduceCombineFunction for better performance:
// the combiner pre-aggregates locally before data is shuffled to the reducers.
DataSet<Tuple2<Text, IntWritable>> optimizedResult = mappedData
.groupBy(0)
.reduceGroup(new HadoopReduceCombineFunction<>(
new IntSumReducer(), // Final reducer
new IntSumCombiner() // Combiner for pre-aggregation
));

import org.apache.hadoop.mapred.JobConf;
// Configure Hadoop MapReduce job settings.
// NOTE(review): the JobConf is handed to the wrapped Mapper/Reducer; cluster-level
// MapReduce settings presumably have no effect when running inside Flink — confirm.
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.job.name", "Flink-Hadoop Integration");
jobConf.set("custom.parameter", "custom-value");
jobConf.setInt("custom.int.parameter", 42);
// Use custom configuration with MapReduce functions
HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapperWithConfig =
new HadoopMapFunction<>(new TokenizerMapper(), jobConf);
HadoopReduceFunction<Text, IntWritable, Text, IntWritable> reducerWithConfig =
new HadoopReduceFunction<>(new IntSumReducer(), jobConf);
// Apply in pipeline
DataSet<Tuple2<Text, IntWritable>> result = input
.flatMap(mapperWithConfig)
.groupBy(0)
.reduceGroup(reducerWithConfig);

// Example with custom Writable types
// Value type: a measurement with category, numeric value, and epoch-millis timestamp.
// NOTE(review): a full Writable must implement write(DataOutput) and
// readFields(DataInput) and have a no-arg constructor — omitted here.
public static class DataRecord implements Writable {
private String category;
private double value;
private long timestamp;
// Writable implementation...
}
// Key type used with groupBy(0) below.
// NOTE(review): types used as grouping keys should provide consistent
// hashCode()/equals() (and ideally be comparable) — confirm before use.
public static class CategoryKey implements Writable {
private String category;
private int hour;
// Writable implementation...
}
// Mapper (old "mapred" API) that keys each DataRecord by (category, hour-of-day).
// NOTE(review): org.apache.hadoop.mapred.Mapper is an interface; use
// `extends MapReduceBase implements Mapper` so the example compiles.
public static class DataProcessor extends MapReduceBase
        implements Mapper<LongWritable, DataRecord, CategoryKey, DataRecord> {
    // Reused key object (Hadoop object-reuse pattern).
    private CategoryKey outputKey = new CategoryKey();

    @Override
    public void map(LongWritable key, DataRecord value,
            OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        // 3,600,000 ms per hour; modulo 24 yields the hour of day.
        // Assumes non-negative epoch-millis timestamps — TODO confirm upstream.
        int hour = (int) (value.getTimestamp() / 3600000) % 24;
        outputKey.setCategory(value.getCategory());
        outputKey.setHour(hour);
        output.collect(outputKey, value);
    }
}
// Reducer (old "mapred" API) that averages record values per (category, hour) key.
// NOTE(review): org.apache.hadoop.mapred.Reducer is an interface; use
// `extends MapReduceBase implements Reducer` so the example compiles.
public static class CategoryAggregator extends MapReduceBase
        implements Reducer<CategoryKey, DataRecord, CategoryKey, DataRecord> {
    // Reused output object (Hadoop object-reuse pattern).
    private DataRecord result = new DataRecord();

    @Override
    public void reduce(CategoryKey key, Iterator<DataRecord> values,
            OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        double sum = 0.0;
        int count = 0;
        while (values.hasNext()) {
            sum += values.next().getValue();
            count++;
        }
        result.setCategory(key.getCategory());
        // reduce() is invoked with at least one value per key, so count > 0 here.
        result.setValue(sum / count); // Average
        result.setTimestamp(System.currentTimeMillis());
        output.collect(key, result);
    }
}
// Use in Flink pipeline
DataSet<Tuple2<CategoryKey, DataRecord>> aggregatedData = rawData
.flatMap(new HadoopMapFunction<>(new DataProcessor()))
.groupBy(0)
.reduceGroup(new HadoopReduceFunction<>(new CategoryAggregator()));

// Enable object reuse for better performance with Hadoop functions
env.getConfig().enableObjectReuse();
// This is particularly beneficial when using Hadoop functions as they
// typically create many temporary objects

// Always use combiners when possible for commutative and associative operations
// This reduces network traffic and improves performance
// Good candidates for combiners:
// - Sum operations
// - Count operations
// - Min/Max operations
// - Set union operations
// Bad candidates for combiners:
// - Operations that need to see all values
// - Non-associative operations
// - Operations with side effects

JobConf conf = new JobConf();
// Configure memory settings.
// NOTE(review): these are Hadoop MapReduce runtime knobs; when the functions run
// inside Flink, Flink manages its own memory and task JVMs, so these settings
// presumably only affect code that reads them from the JobConf — confirm.
conf.setInt("mapreduce.map.memory.mb", 1024);
conf.setInt("mapreduce.reduce.memory.mb", 2048);
// Configure JVM options
conf.set("mapreduce.map.java.opts", "-Xmx800m");
conf.set("mapreduce.reduce.java.opts", "-Xmx1600m");
// Configure buffer sizes
conf.setInt("io.sort.mb", 256);
conf.setFloat("io.sort.spill.percent", 0.8f);try {
DataSet<Tuple2<Text, IntWritable>> result = input
.flatMap(new HadoopMapFunction<>(new TokenizerMapper()))
.groupBy(0)
.reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));
result.print();
env.execute();
} catch (Exception e) {
// Handle various exceptions
if (e.getCause() instanceof IOException) {
logger.error("I/O error in Hadoop function: " + e.getMessage());
} else if (e.getCause() instanceof InterruptedException) {
logger.error("Hadoop function was interrupted: " + e.getMessage());
} else {
logger.error("Unexpected error: " + e.getMessage());
}
}// Create test data
// Build a tiny in-memory DataSet to unit-test the wrapped mapper.
List<Tuple2<LongWritable, Text>> testInput = Arrays.asList(
new Tuple2<>(new LongWritable(1), new Text("hello world")),
new Tuple2<>(new LongWritable(2), new Text("hello flink"))
);
DataSet<Tuple2<LongWritable, Text>> input = env.fromCollection(testInput);
// Test mapper output
DataSet<Tuple2<Text, IntWritable>> mapperOutput = input
.flatMap(new HadoopMapFunction<>(new TokenizerMapper()));
// Collect and verify results in tests.
List<Tuple2<Text, IntWritable>> results = mapperOutput.collect();
// The mapper emits one tuple per token and no grouping has happened yet:
// "hello", "world", "hello", "flink" -> 4 tuples (the original asserted 3, which is wrong).
assertEquals(4, results.size());

// Replace Hadoop Identity operations with Flink map
// Before: Hadoop IdentityMapper — a pass-through map needs no Hadoop wrapper at all
DataSet<Tuple2<K, V>> output = input.map(tuple -> tuple);
// Replace simple aggregations with Flink reduce
// Before: Hadoop sum reducer
// (reduce merges two tuples at a time; a fresh Tuple2/IntWritable is created per merge)
DataSet<Tuple2<Text, IntWritable>> sums = input
.groupBy(0)
.reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));
// Replace filtering with Flink filter
// Before: Hadoop filtering mapper
DataSet<Tuple2<K, V>> filtered = input.filter(tuple -> someCondition(tuple.f1));

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11