Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing
—
Wrappers that convert Hadoop Mappers and Reducers into Flink-compatible functions, enabling reuse of existing MapReduce logic within Flink applications. Supports the legacy mapred API with automatic type conversion between Hadoop and Flink data types.
Wrapper that converts a Hadoop Mapper into a Flink FlatMapFunction.
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
/**
* Constructor with Hadoop Mapper
* @param hadoopMapper The Hadoop Mapper to wrap
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
/**
* Constructor with Hadoop Mapper and configuration
* @param hadoopMapper The Hadoop Mapper to wrap
* @param conf JobConf configuration for the mapper
*/
public HadoopMapFunction(
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper,
JobConf conf
);
/**
* Open method called before processing starts
* @param openContext Runtime context for the function
* @throws Exception if initialization fails
*/
public void open(OpenContext openContext) throws Exception;
/**
* Process a single input record through the Hadoop Mapper
* @param value Input record as Tuple2<KEYIN, VALUEIN>
* @param out Collector for output records
* @throws Exception if processing fails
*/
public void flatMap(
final Tuple2<KEYIN, VALUEIN> value,
final Collector<Tuple2<KEYOUT, VALUEOUT>> out
) throws Exception;
/**
* Get type information for the produced output type
* @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
*/
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}

Usage Example:
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
// Example Hadoop Mapper that extracts word lengths
public static class WordLengthMapper implements Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String[] words = value.toString().split("\\s+");
for (String word : words) {
output.collect(new Text(word), new IntWritable(word.length()));
}
}
public void configure(JobConf job) {}
public void close() throws IOException {}
}
// Wrap the Hadoop Mapper for use in Flink
HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapFunction =
new HadoopMapFunction<>(new WordLengthMapper());
// Use in Flink DataSet API
DataSet<Tuple2<LongWritable, Text>> input = // ... your input dataset
DataSet<Tuple2<Text, IntWritable>> mapped = input.flatMap(mapFunction);

Wrapper that converts a Hadoop Reducer into a Flink window function for both keyed and non-keyed streams.
public final class HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, KEYIN, GlobalWindow>
implements AllWindowFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>, GlobalWindow>,
ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
/**
* Constructor with Hadoop Reducer
* @param hadoopReducer The Hadoop Reducer to wrap
*/
public HadoopReducerWrappedFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
/**
* Constructor with Hadoop Reducer and configuration
* @param hadoopReducer The Hadoop Reducer to wrap
* @param conf JobConf configuration for the reducer
*/
public HadoopReducerWrappedFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
JobConf conf
);
/**
* Open method called before processing starts
* @param openContext Runtime context for the function
* @throws Exception if initialization fails
*/
public void open(OpenContext openContext) throws Exception;
/**
* Get type information for the produced output type
* @return TypeInformation for Tuple2<KEYOUT, VALUEOUT>
*/
public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
/**
* Apply function for keyed windows
* @param key The key for this window
* @param globalWindow The window (always GlobalWindow)
* @param iterable Input records for this key
* @param collector Collector for output records
* @throws Exception if processing fails
*/
public void apply(
KEYIN key,
GlobalWindow globalWindow,
Iterable<Tuple2<KEYIN, VALUEIN>> iterable,
Collector<Tuple2<KEYOUT, VALUEOUT>> collector
) throws Exception;
/**
* Apply function for non-keyed windows (all data in single partition)
* @param globalWindow The window (always GlobalWindow)
* @param iterable All input records
* @param collector Collector for output records
* @throws Exception if processing fails
*/
public void apply(
GlobalWindow globalWindow,
Iterable<Tuple2<KEYIN, VALUEIN>> iterable,
Collector<Tuple2<KEYOUT, VALUEOUT>> collector
) throws Exception;
}

Usage Example:
import org.apache.flink.hadoopcompatibility.mapred.HadoopReducerWrappedFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
// Example Hadoop Reducer that sums values by key
public static class SumReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
public void configure(JobConf job) {}
public void close() throws IOException {}
}
// Wrap the Hadoop Reducer for use in Flink
HadoopReducerWrappedFunction<Text, IntWritable, Text, IntWritable> reduceFunction =
new HadoopReducerWrappedFunction<>(new SumReducer());
// Use in Flink streaming API with keyed windows
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Text, IntWritable>> stream = // ... your input stream
stream
.keyBy(tuple -> tuple.f0) // Key by the Text field
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100)) // Trigger every 100 elements
.apply(reduceFunction)
.print();
// Use in batch API (DataSet)
DataSet<Tuple2<Text, IntWritable>> dataset = // ... your input dataset
DataSet<Tuple2<Text, IntWritable>> reduced = dataset
.groupBy(0) // Group by key (f0)
.reduceGroup(new GroupReduceFunction<Tuple2<Text, IntWritable>, Tuple2<Text, IntWritable>>() {
public void reduce(Iterable<Tuple2<Text, IntWritable>> values,
Collector<Tuple2<Text, IntWritable>> out) throws Exception {
// Convert to format expected by HadoopReducerWrappedFunction
reduceFunction.apply(GlobalWindow.get(), values, out);
}
});

Passing JobConf configuration to wrapped MapReduce functions.
import org.apache.hadoop.mapred.JobConf;
// Configure Hadoop job parameters
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.map.memory.mb", "2048");
jobConf.set("custom.parameter", "value");
// Pass configuration to mapper
HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapFunction =
new HadoopMapFunction<>(new MyHadoopMapper(), jobConf);
// Pass configuration to reducer
HadoopReducerWrappedFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceFunction =
new HadoopReducerWrappedFunction<>(new MyHadoopReducer(), jobConf);

Combining multiple Hadoop operations in a Flink pipeline.
// Chain mapper and reducer operations
DataSet<Tuple2<LongWritable, Text>> input = // ... input dataset
DataSet<Tuple2<Text, IntWritable>> result = input
.flatMap(mapFunction1) // First Hadoop mapper
.flatMap(mapFunction2) // Second Hadoop mapper
.groupBy(0) // Group by key
.reduceGroup(new GroupReduceFunction<Tuple2<Text, IntWritable>, Tuple2<Text, IntWritable>>() {
public void reduce(Iterable<Tuple2<Text, IntWritable>> values,
Collector<Tuple2<Text, IntWritable>> out) throws Exception {
reduceFunction.apply(GlobalWindow.get(), values, out);
}
});

Handling type conversion between Flink and Hadoop types.
// Example with custom Writable types
public static class CustomWritable implements Writable {
private String data;
public void write(DataOutput out) throws IOException {
out.writeUTF(data);
}
public void readFields(DataInput in) throws IOException {
data = in.readUTF();
}
// getters/setters...
}
// Mapper using custom types
public static class CustomMapper implements Mapper<LongWritable, Text, Text, CustomWritable> {
public void map(LongWritable key, Text value,
OutputCollector<Text, CustomWritable> output, Reporter reporter) throws IOException {
CustomWritable custom = new CustomWritable();
custom.setData(value.toString().toUpperCase());
output.collect(new Text("processed"), custom);
}
public void configure(JobConf job) {}
public void close() throws IOException {}
}
// Use with proper type information
HadoopMapFunction<LongWritable, Text, Text, CustomWritable> customMapFunction =
new HadoopMapFunction<>(new CustomMapper());
// Flink will automatically handle Writable serialization
DataSet<Tuple2<Text, CustomWritable>> output = input.flatMap(customMapFunction);

Handling errors and monitoring MapReduce function execution.
public static class MonitoredMapper implements Mapper<LongWritable, Text, Text, IntWritable> {
private Counter recordCounter;
private Counter errorCounter;
public void configure(JobConf job) {
// Access counters from configuration if needed
}
/**
 * Emits a (word, 1) pair for every non-empty whitespace-separated token of
 * {@code value}, reporting progress after each pair and updating the
 * "Records" counters.
 *
 * @param key input key; not used by this mapper
 * @param value text to tokenize on runs of whitespace
 * @param output collector receiving the (word, 1) pairs
 * @param reporter receives progress calls and the "Processed"/"Errors" counter increments
 * @throws IOException wrapping any failure raised while processing the record
 */
@Override
public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    try {
        for (String word : value.toString().split("\\s+")) {
            // split() produces an empty first token for blank input and for input
            // that starts with whitespace; skip it rather than count "" as a word.
            if (word.isEmpty()) {
                continue;
            }
            output.collect(new Text(word), new IntWritable(1));
            reporter.progress(); // Report progress
        }
        reporter.incrCounter("Records", "Processed", 1);
    } catch (IOException | RuntimeException e) {
        // Only these two kinds of exception can escape the try body; count the
        // failure and keep the original exception as the cause.
        reporter.incrCounter("Records", "Errors", 1);
        throw new IOException("Processing failed", e);
    }
}
public void close() throws IOException {}
}

- Mapper input: Tuple2<KEYIN, VALUEIN> where f0=key, f1=value
- Mapper output: Tuple2<KEYOUT, VALUEOUT> where f0=key, f1=value
- Reducer input: Iterable<Tuple2<KEYIN, VALUEIN>> grouped by key
- Reducer output: Tuple2<KEYOUT, VALUEOUT> where f0=key, f1=value

JobConf configuration is passed through to the wrapped Hadoop functions, maintaining compatibility with existing MapReduce code.
Automatic type information extraction ensures proper serialization and type safety within Flink's type system.
Hadoop Reporter interface is supported for progress reporting and counter updates, though some functionality may be limited in the Flink execution environment.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility