Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing
—
Comprehensive integration for reading data from Hadoop InputFormats in Flink applications. Supports both legacy mapred API and newer mapreduce API with automatic key-value pair conversion to Flink Tuple2 objects.
Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the legacy mapred API.
/**
* Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API)
* @param mapredInputFormat The Hadoop FileInputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param inputPath The path to read input data from
* @param job JobConf configuration for the Hadoop job
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat
*/
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
String inputPath,
JobConf job
);
/**
* Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API) with default JobConf
* @param mapredInputFormat The Hadoop FileInputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param inputPath The path to read input data from
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat
*/
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
String inputPath
);

Usage Example:
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
// Reading text files using TextInputFormat
HadoopInputFormat<LongWritable, Text> textInput =
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
"hdfs://data/input.txt"
);
// With custom JobConf
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.fileinputformat.split.minsize", "1048576");
HadoopInputFormat<LongWritable, Text> configuredInput =
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
"hdfs://data/input.txt",
jobConf
);

Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the newer mapreduce API.
/**
* Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API)
* @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param inputPath The path to read input data from
* @param job Job configuration for the Hadoop job
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat
* @throws IOException if the Job configuration cannot be processed
*/
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
String inputPath,
Job job
) throws IOException;
/**
* Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API) with default Job
* @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param inputPath The path to read input data from
* @return A Flink InputFormat that wraps the Hadoop FileInputFormat
* @throws IOException if the Job configuration cannot be created
*/
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
String inputPath
) throws IOException;

Usage Example:
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
// Reading text files using mapreduce API
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> mapreduceInput =
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
"hdfs://data/input.txt"
);
// With custom Job configuration
Job job = Job.getInstance();
job.getConfiguration().set("mapreduce.input.textinputformat.record.delimiter", "\n");
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> configuredMapreduceInput =
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
"hdfs://data/input.txt",
job
);

Specialized method for reading Hadoop sequence files with automatic format configuration.
/**
* Creates a Flink InputFormat to read a Hadoop sequence file for the given key and value classes
* @param key The class of the key type
* @param value The class of the value type
* @param inputPath The path to the sequence file
* @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat
* @throws IOException if the sequence file cannot be accessed
*/
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
Class<K> key,
Class<V> value,
String inputPath
) throws IOException;

Usage Example:
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
// Reading sequence files
HadoopInputFormat<IntWritable, Text> sequenceInput =
HadoopInputs.readSequenceFile(
IntWritable.class,
Text.class,
"hdfs://data/sequence.seq"
);

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapred API.
/**
* Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapred API)
* @param mapredInputFormat The Hadoop InputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param job JobConf configuration for the Hadoop job
* @return A Flink InputFormat that wraps the Hadoop InputFormat
*/
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
JobConf job
);

Usage Example:
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
// Using KeyValueTextInputFormat
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
HadoopInputFormat<Text, Text> keyValueInput =
HadoopInputs.createHadoopInput(
new KeyValueTextInputFormat(),
Text.class,
Text.class,
jobConf
);

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapreduce API.
/**
* Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapreduce API)
* @param mapreduceInputFormat The Hadoop InputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param job Job configuration for the Hadoop job
* @return A Flink InputFormat that wraps the Hadoop InputFormat
*/
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
Job job
);

Usage Example:
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
// Using mapreduce KeyValueTextInputFormat
Job job = Job.getInstance();
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<Text, Text> mapreduceKeyValueInput =
HadoopInputs.createHadoopInput(
new KeyValueTextInputFormat(),
Text.class,
Text.class,
job
);

Wrapper class for Hadoop InputFormats using the mapred API.
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
implements ResultTypeQueryable<Tuple2<K, V>> {
/**
* Constructor with full configuration
* @param mapredInputFormat The Hadoop InputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param job JobConf configuration
*/
public HadoopInputFormat(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
JobConf job
);
/**
* Constructor with default JobConf
* @param mapredInputFormat The Hadoop InputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
*/
public HadoopInputFormat(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value
);
/**
* Read the next record from the Hadoop InputFormat
* @param record Reusable record object
* @return The next record as a Tuple2
* @throws IOException if reading fails
*/
public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;
/**
* Get type information for the produced Tuple2 type
* @return TypeInformation for Tuple2<K, V>
*/
public TypeInformation<Tuple2<K, V>> getProducedType();
}

Wrapper class for Hadoop InputFormats using the mapreduce API.
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
implements ResultTypeQueryable<Tuple2<K, V>> {
/**
* Constructor with full configuration
* @param mapreduceInputFormat The Hadoop InputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @param job Job configuration
*/
public HadoopInputFormat(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
Job job
);
/**
* Constructor with default Job configuration
* @param mapreduceInputFormat The Hadoop InputFormat to wrap
* @param key The class of the key type
* @param value The class of the value type
* @throws IOException if Job configuration cannot be created
*/
public HadoopInputFormat(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value
) throws IOException;
/**
* Read the next record from the Hadoop InputFormat
* @param record Reusable record object
* @return The next record as a Tuple2
* @throws IOException if reading fails
*/
public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;
/**
* Get type information for the produced Tuple2 type
* @return TypeInformation for Tuple2<K, V>
*/
public TypeInformation<Tuple2<K, V>> getProducedType();
}

All InputFormat wrappers are generically typed with <K, V> parameters, ensuring compile-time type safety and proper type inference in Flink applications.
All input formats produce Tuple2<K, V> objects where:
- f0 contains the key of type K
- f1 contains the value of type V

Both JobConf (mapred) and Job (mapreduce) configuration objects are supported, with convenient overloads providing default configurations when not specified.
IOException is thrown for I/O operations, maintaining consistency with Hadoop's exception handling patterns.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility