Apache Flink Hadoop Compatibility library that enables interoperability between Apache Flink and Apache Hadoop MapReduce
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-12@1.20.0

Apache Flink Hadoop Compatibility provides compatibility layers that let Flink applications integrate with the Apache Hadoop MapReduce ecosystem. It offers adapter classes and utilities for using Hadoop InputFormats and OutputFormats within Flink jobs, supporting both the legacy MapRed API (org.apache.hadoop.mapred) and the newer MapReduce API (org.apache.hadoop.mapreduce).
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.12</artifactId>
    <version>1.20.2</version>
</dependency>

// Factory classes and utilities
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
// MapReduce API (modern) - InputFormat and OutputFormat
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
// MapRed API (legacy) - InputFormat and OutputFormat
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
// Function wrappers for MapRed API
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
// Type system integration
import org.apache.flink.api.java.typeutils.WritableTypeInfo;

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
// Create Flink execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read Hadoop text files using legacy MapRed API
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                "hdfs://path/to/input"
        )
);
// Process data
DataSet<String> words = input
        .flatMap((Tuple2<LongWritable, Text> line, Collector<String> out) -> {
            for (String word : line.f1.toString().split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(word.toLowerCase());
                }
            }
        })
        .returns(String.class);
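The write side mirrors the read side. The following sketch continues the example above and emits the raw (offset, line) tuples through the legacy MapRed HadoopOutputFormat wrapper; the output path is a placeholder assumption.

import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// Wrap a Hadoop TextOutputFormat (MapRed API) so Flink can use it as a sink
JobConf jobConf = new JobConf();
HadoopOutputFormat<LongWritable, Text> hadoopOF =
        new HadoopOutputFormat<>(new TextOutputFormat<LongWritable, Text>(), jobConf);
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output")); // assumed path

// HadoopOutputFormat<K, V> consumes Tuple2<K, V> records, so the input tuples
// can be written out directly
input.output(hadoopOF);
env.execute("Hadoop compatibility example");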
Apache Flink Hadoop Compatibility is built around several key components:

Comprehensive wrapper classes that enable Hadoop InputFormats and OutputFormats to work seamlessly with Flink DataSets, supporting both the legacy MapRed and the modern MapReduce APIs.
// Factory methods for creating input format wrappers
class HadoopInputs {
    static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key, Class<V> value, String inputPath
    );

    static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
        Class<K> key, Class<V> value, String inputPath
    ) throws IOException;
}
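The second overload serves the modern MapReduce API and is used the same way as the MapRed variant; note that it declares throws IOException. A minimal sketch, with a placeholder input path:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Same factory name, but this overload wraps the mapreduce-API TextInputFormat
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                "hdfs://path/to/input" // placeholder path
        )
);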
Type information and serialization support for Hadoop Writable types, enabling seamless integration with Flink's type system and runtime.

class WritableTypeInfo<T extends Writable> extends TypeInformation<T> {
    WritableTypeInfo(Class<T> typeClass);
    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
}
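Flink's type extraction typically creates WritableTypeInfo automatically, but it can also be supplied explicitly when lambda type erasure gets in the way. A small sketch, reusing the words DataSet from the earlier example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

// Map plain strings to Hadoop Text; the lambda's output type is erased at
// runtime, so the Writable type information is declared explicitly
DataSet<Text> texts = words
        .map(Text::new)
        .returns(new WritableTypeInfo<>(Text.class));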
Wrapper classes that adapt Hadoop Mapper and Reducer implementations to work as Flink functions, enabling reuse of existing MapReduce logic.

class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements FlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}

class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
}
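For illustration, here is a sketch of reusing an unmodified MapRed-API Mapper inside a Flink job. The Tokenizer class is hypothetical, standing in for existing MapReduce logic:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;

// A hypothetical pre-existing MapRed-API mapper, reused unchanged inside Flink
public static final class Tokenizer implements Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> out,
                    Reporter reporter) throws IOException {
        for (String token : value.toString().toLowerCase().split("\\s+")) {
            if (!token.isEmpty()) {
                out.collect(new Text(token), new IntWritable(1));
            }
        }
    }
    @Override public void configure(JobConf job) {}
    @Override public void close() throws IOException {}
}

// Apply the wrapped mapper to a DataSet of (key, value) tuples; HadoopMapFunction
// reports its result type itself, so no explicit returns(...) is needed
DataSet<Tuple2<Text, IntWritable>> tokens =
        input.flatMap(new HadoopMapFunction<>(new Tokenizer()));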
Helper methods and utility functions that simplify common Hadoop integration tasks, including parameter parsing and configuration management.

class HadoopUtils {
    static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
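A sketch of wiring Hadoop-style command-line options into a Flink job; the "input" parameter name is hypothetical:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public static void main(String[] args) throws Exception {
    // Properties passed as Hadoop generic options (-D name=value) become
    // ParameterTool entries
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
    String inputPath = params.get("input"); // e.g. supplied as -D input=hdfs://...
    // ... build the job using inputPath ...
}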
The library handles a range of integration scenarios; note the following migration caveat:

Scala API Deprecation: all Scala APIs (org.apache.flink.api.scala.hadoop.*) are deprecated as of Flink 1.20.2 per FLIP-265 and will be removed in a future major version. Users should migrate to the Java APIs, which provide equivalent functionality with better long-term support.