Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs
npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11@1.14.0

Apache Flink Hadoop Compatibility provides a comprehensive integration layer between Apache Flink and the Hadoop MapReduce ecosystem. It enables seamless use of existing Hadoop InputFormats and OutputFormats within Flink applications, supporting both the legacy mapred API and the newer mapreduce API, with full Java and Scala language bindings.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.14.6</version>
</dependency>

Java:
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;      // mapred API wrapper
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;   // mapreduce API wrapper (same simple name; import only the one you need)
import org.apache.flink.api.java.typeutils.WritableTypeInfo;

Scala:
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat     // mapred API wrapper
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat  // mapreduce API wrapper (same simple name; import only the one you need)

Reading Hadoop Files with Java:
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create Hadoop InputFormat for reading text files
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input"
    )
);

// Process the data
DataSet<String> lines = input.map(tuple -> tuple.f1.toString());

Reading Hadoop Files with Scala:
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.api.scala._
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
val env = ExecutionEnvironment.getExecutionEnvironment
// Create Hadoop InputFormat for reading text files
val input: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://path/to/input"
  )
)

// Process the data
val lines = input.map(_._2.toString)

The Hadoop Compatibility library is structured around several key architectural components:
- HadoopInputs classes provide convenient factory methods for creating Flink wrappers
- WritableTypeInfo and related classes ensure proper serialization of Hadoop types within Flink

Primary utilities for reading data from Hadoop InputFormats into Flink DataSets, supporting both file-based and custom InputFormats with automatic type conversion.
// Java MapRed API
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, String inputPath, JobConf job);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, JobConf job);

// Java MapReduce API
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key, Class<V> value, String inputPath, Job job) throws IOException;
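For non-file InputFormats, or when a pre-configured JobConf is needed, createHadoopInput can be used instead of readHadoopFile. The following is a minimal sketch (not taken from the upstream documentation; the SequenceFile path is a placeholder) that reads (Text, IntWritable) pairs from a SequenceFile:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Configure the Hadoop InputFormat through a JobConf, then wrap it for Flink
JobConf jobConf = new JobConf();
FileInputFormat.addInputPath(jobConf, new Path("hdfs://path/to/sequencefile"));

DataSet<Tuple2<Text, IntWritable>> pairs = env.createInput(
    HadoopInputs.createHadoopInput(
        new SequenceFileInputFormat<Text, IntWritable>(),
        Text.class,
        IntWritable.class,
        jobConf
    )
);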
Complete support for writing Flink DataSets to Hadoop OutputFormats, enabling integration with existing Hadoop data storage systems and custom output processing.

// Java MapRed OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

// Java MapReduce OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
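As a hedged sketch of the write path (the output path and sample data are placeholders, not from the original documentation), a DataSet of Tuple2 pairs can be written through the mapred TextOutputFormat by wrapping it in HadoopOutputFormat and passing it to DataSet.output:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Placeholder data; in a real job this would come from earlier transformations
DataSet<Tuple2<Text, IntWritable>> result = env.fromElements(
    Tuple2.of(new Text("hello"), new IntWritable(1)),
    Tuple2.of(new Text("world"), new IntWritable(2)));

// Configure the Hadoop OutputFormat via a JobConf and wrap it for Flink
JobConf jobConf = new JobConf();
FileOutputFormat.setOutputPath(jobConf, new Path("hdfs://path/to/output"));

HadoopOutputFormat<Text, IntWritable> hadoopOF =
    new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), jobConf);
result.output(hadoopOF);

env.execute("Write to Hadoop OutputFormat");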
Advanced type system integration allowing Hadoop Writable types to work seamlessly within Flink's type system, with optimized serialization and comparison.

public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);
    public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig);
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
}
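Writable types are normally picked up automatically by Flink's type extraction once this module is on the classpath. The sketch below (illustrative only) shows how a WritableTypeInfo can also be supplied explicitly, for example when a lambda's return type cannot be inferred:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Explicit type information for a Hadoop Writable type
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);

// Tell Flink to serialize the lambda's output with the Writable serializer
DataSet<Text> texts = env.fromElements("a", "b", "c")
    .map(s -> new Text(s))
    .returns(textType);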
Direct integration of Hadoop Mapper and Reducer functions into Flink workflows, supporting both simple and complex MapReduce patterns with combine functionality.

public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}

public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
}
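As an illustrative sketch (the input path is a placeholder), a classic WordCount can reuse Hadoop's TokenCountMapper and LongSumReducer unchanged: the Mapper runs inside a Flink flatMap, and the Reducer is applied to each key group after groupBy:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(new TextInputFormat(),
        LongWritable.class, Text.class, "hdfs://path/to/input"));

// Run the unmodified Hadoop Mapper as a Flink FlatMapFunction,
// then group by the key field and apply the Hadoop Reducer per group
DataSet<Tuple2<Text, LongWritable>> counts = input
    .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
        new TokenCountMapper<LongWritable>()))
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
        new LongSumReducer<Text>()));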
Native Scala API providing idiomatic interfaces with implicit type information, tuple syntax, and functional programming patterns for Hadoop integration.

object HadoopInputs {
  def readHadoopFile[K, V](
      mapredInputFormat: MapredFileInputFormat[K, V],
      key: Class[K], value: Class[V], inputPath: String, job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
      mapredInputFormat: MapredInputFormat[K, V],
      key: Class[K], value: Class[V], job: JobConf)
      (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]
}
Utility functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.

public class HadoopUtils {
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
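A brief sketch of how this is typically wired into a job's main method (the class name MyJob and the parameter key "input" are examples, not part of the library):

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public class MyJob {
    public static void main(String[] args) throws Exception {
        // Forward Hadoop-style generic options (e.g. -D key=value) into a Flink ParameterTool
        ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
        String inputPath = params.get("input");  // example key, supplied as -D input=hdfs://...
    }
}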
// Core Flink types used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
// Common Hadoop Writable types
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;