Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-compatibility@2.1.0
```

Flink Hadoop Compatibility is an Apache Flink library that integrates Flink with the Apache Hadoop ecosystem. It lets Flink applications reuse existing Hadoop InputFormats, OutputFormats, and MapReduce functions without modification, and it supports both the legacy `mapred` API and the newer `mapreduce` API.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility</artifactId>
    <version>2.1.0</version>
</dependency>
```

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
```

```java
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create Hadoop input format wrapper
HadoopInputFormat<LongWritable, Text> hadoopInput =
        HadoopInputs.readHadoopFile(
                new TextInputFormat(),
                LongWritable.class,
                Text.class,
                "hdfs://input/path"
        );
// Use as Flink DataSet source
DataSet<Tuple2<LongWritable, Text>> dataset = env.createInput(hadoopInput);
// Process data using Flink operations
dataset.map(record -> record.f1.toString().toUpperCase())
        .print();
```

Flink Hadoop Compatibility is built around several key components:
All wrapped formats produce and consume Tuple2<K, V> objects where f0 is the key and f1 is the value, maintaining compatibility with Hadoop's key-value paradigm.
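For example, the key and value of each record can be read off directly; this minimal sketch reuses the `dataset` from the quick-start above:

```java
// Each element is a Tuple2: f0 holds the Hadoop key, f1 the Hadoop value.
DataSet<String> rendered = dataset
        .map(record -> "offset=" + record.f0.get() + " line=" + record.f1.toString());
rendered.print();
```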
HadoopInputs provides the core functionality for reading data from Hadoop InputFormats, supporting both file-based and generic input sources with full type safety.
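For instance, a generic (non-file) `mapred` InputFormat can be wrapped together with an explicit JobConf. The SequenceFile source and HDFS path below are hypothetical, so treat this as a sketch:

```java
// Wrap an arbitrary mapred InputFormat together with its JobConf.
JobConf jobConf = new JobConf();
org.apache.hadoop.mapred.FileInputFormat.addInputPath(
        jobConf, new org.apache.hadoop.fs.Path("hdfs:///data/events"));

HadoopInputFormat<Text, LongWritable> input =
        HadoopInputs.createHadoopInput(
                new org.apache.hadoop.mapred.SequenceFileInputFormat<Text, LongWritable>(),
                Text.class,
                LongWritable.class,
                jobConf);

DataSet<Tuple2<Text, LongWritable>> events = env.createInput(input);
```

The available factory methods: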
```java
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath
);

public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
        org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        String inputPath,
        JobConf job
);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        JobConf job
);
```

HadoopOutputFormat wraps Hadoop OutputFormats for writing data, enabling Flink applications to output data in Hadoop-compatible formats.
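A sketch of the write path, assuming an upstream `result` of type `DataSet<Tuple2<Text, IntWritable>>` and a hypothetical output directory:

```java
// The JobConf carries the Hadoop output path and any format options.
JobConf outputConf = new JobConf();
org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(
        outputConf, new org.apache.hadoop.fs.Path("hdfs:///results/wordcount"));

HadoopOutputFormat<Text, IntWritable> hadoopOutput =
        new HadoopOutputFormat<>(
                new org.apache.hadoop.mapred.TextOutputFormat<Text, IntWritable>(), outputConf);

// result is the upstream DataSet<Tuple2<Text, IntWritable>>.
result.output(hadoopOutput);
```

The wrapper itself: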
```java
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(
            org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
            JobConf job
    );
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
```

HadoopMapFunction and HadoopReduceFunction wrap Hadoop Mappers and Reducers as Flink-compatible functions, enabling reuse of existing MapReduce logic.
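For example, an existing word-count Mapper and Reducer can be dropped into a Flink pipeline unchanged. `Tokenizer` and `Counter` below are hypothetical user classes implementing the `mapred` `Mapper` and `Reducer` interfaces, `dataset` is the quick-start input, and `HadoopReduceFunction` is applied per key group via `reduceGroup`; treat this as a sketch:

```java
// Reuse unmodified Hadoop Mapper/Reducer implementations inside Flink operators.
DataSet<Tuple2<Text, LongWritable>> counts = dataset
        // Tokenizer: hypothetical Mapper<LongWritable, Text, Text, LongWritable>
        .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
        .groupBy(0)
        // Counter: hypothetical Reducer<Text, LongWritable, Text, LongWritable>
        .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(new Counter()));
```

The mapper wrapper's API: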
```java
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {

    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);

    public void flatMap(
            Tuple2<KEYIN, VALUEIN> value,
            Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;
}
```

WritableTypeInfo provides custom type information and serialization support for Hadoop Writable types, ensuring seamless integration with Flink's type system.
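Writable types are normally handled by Flink's type extraction automatically, but the type information can also be supplied explicitly, for example as a return-type hint on a lambda. In this sketch, `MyWritable` and `parseEvent` are hypothetical, and `events` is the DataSet from the input sketch earlier:

```java
// Explicit type information for a custom Hadoop Writable.
TypeInformation<MyWritable> writableType = new WritableTypeInfo<>(MyWritable.class);

DataSet<MyWritable> parsed = events
        .map(record -> parseEvent(record))  // parseEvent: hypothetical Tuple2 -> MyWritable
        .returns(writableType);
```

The type information class: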
```java
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);
    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);
}
```

HadoopUtils provides utility functions for handling Hadoop configuration and command-line argument parsing.
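A sketch of bootstrapping a job from Hadoop-style arguments (for example `-D` properties parsed by Hadoop's GenericOptionsParser):

```java
public static void main(String[] args) throws Exception {
    // Parse Hadoop-style command-line options into a Flink ParameterTool.
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Make the parsed parameters visible to all user functions.
    env.getConfig().setGlobalJobParameters(params);
    // ... build the pipeline ...
}
```

The signature: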
```java
public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
```

```java
// Core Flink tuple type used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;
// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
// Flink type system integration
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
```