Comprehensive wrapper classes that enable Hadoop InputFormats and OutputFormats to work seamlessly with Flink DataSets. The library provides complete support for both the legacy MapRed API (org.apache.hadoop.mapred) and the modern MapReduce API (org.apache.hadoop.mapreduce).
Central factory class providing convenient methods to create Flink InputFormat wrappers for various Hadoop InputFormat types.
/**
* Utility class to use Apache Hadoop InputFormats with Apache Flink
*/
public class HadoopInputs {
/**
* Creates a Flink InputFormat wrapper for Hadoop mapred FileInputFormat with JobConf
* @param mapredInputFormat The Hadoop FileInputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param inputPath Path to input files
* @param job JobConf for Hadoop configuration
* @return HadoopInputFormat wrapper for use with Flink
*/
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
Class<K> key, Class<V> value, String inputPath, JobConf job
);
/**
* Creates a Flink InputFormat wrapper for Hadoop mapred FileInputFormat with default JobConf
* @param mapredInputFormat The Hadoop FileInputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param inputPath Path to input files
* @return HadoopInputFormat wrapper for use with Flink
*/
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
Class<K> key, Class<V> value, String inputPath
);
/**
* Creates a Flink InputFormat for reading Hadoop sequence files
* @param key The key class type
* @param value The value class type
* @param inputPath Path to sequence files
* @return HadoopInputFormat wrapper for reading sequence files
* @throws IOException if sequence file access fails
*/
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
Class<K> key, Class<V> value, String inputPath
) throws IOException;
/**
* Creates a Flink InputFormat wrapper for any Hadoop mapred InputFormat
* @param mapredInputFormat The Hadoop InputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param job JobConf for Hadoop configuration
* @return HadoopInputFormat wrapper for use with Flink
*/
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key, Class<V> value, JobConf job
);
/**
* Creates a Flink InputFormat wrapper for Hadoop mapreduce FileInputFormat with Job
* @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param inputPath Path to input files
* @param job Job for Hadoop configuration
* @return HadoopInputFormat wrapper for use with Flink
* @throws IOException if file system access fails
*/
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
Class<K> key, Class<V> value, String inputPath, Job job
) throws IOException;
/**
* Creates a Flink InputFormat wrapper for Hadoop mapreduce FileInputFormat with default Job
* @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param inputPath Path to input files
* @return HadoopInputFormat wrapper for use with Flink
* @throws IOException if file system access fails
*/
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
Class<K> key, Class<V> value, String inputPath
) throws IOException;
/**
* Creates a Flink InputFormat wrapper for any Hadoop mapreduce InputFormat
* @param mapreduceInputFormat The Hadoop InputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param job Job for Hadoop configuration
* @return HadoopInputFormat wrapper for use with Flink
*/
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key, Class<V> value, Job job
);
}

Usage Examples:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Read text files using the MapRed API
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input/*.txt"
    )
);
// Read sequence files
DataSet<Tuple2<Text, IntWritable>> sequenceData = env.createInput(
    HadoopInputs.readSequenceFile(
        Text.class,
        IntWritable.class,
        "hdfs://path/to/sequence/files"
    )
);
// Custom InputFormat with configuration (MyCustomInputFormat and MyWritable stand in for user-defined classes)
JobConf conf = new JobConf();
conf.set("custom.property", "value");
DataSet<Tuple2<Text, MyWritable>> customData = env.createInput(
    HadoopInputs.createHadoopInput(
        new MyCustomInputFormat(),
        Text.class,
        MyWritable.class,
        conf
    )
);
);
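The same factory methods cover the modern MapReduce API; a minimal sketch, assuming the input path exists (the fully qualified name avoids a clash with the mapred TextInputFormat imported above):
import org.apache.hadoop.mapreduce.Job;
// Read text files using the modern MapReduce API
Job inputJob = Job.getInstance();
DataSet<Tuple2<LongWritable, Text>> mapreduceData = env.createInput(
    HadoopInputs.readHadoopFile(
        new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input",
        inputJob
    )
);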

InputFormat implementation for the modern Hadoop MapReduce API (org.apache.hadoop.mapreduce).
/**
* InputFormat implementation that allows using Hadoop (mapreduce) InputFormats with Flink
* @param <K> The key type
* @param <V> The value type
*/
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, org.apache.hadoop.mapreduce.InputFormat<K, V>> {
/**
* Creates a HadoopInputFormat wrapper with Job configuration
* @param mapreduceInputFormat The Hadoop InputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param job Job for Hadoop configuration
*/
public HadoopInputFormat(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key, Class<V> value, Job job
);
/**
* Creates a HadoopInputFormat wrapper with default configuration
* @param mapreduceInputFormat The Hadoop InputFormat to wrap
* @param key The key class type
* @param value The value class type
* @throws IOException if default Job creation fails
*/
public HadoopInputFormat(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key, Class<V> value
) throws IOException;
}
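For completeness, the wrapper can also be constructed directly instead of going through the HadoopInputs factory; a hedged sketch, reusing env and the imports from the usage examples above (the input path is a placeholder):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
// Wrap a mapreduce TextInputFormat directly; the input path is set on the Job
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path("hdfs://path/to/input"));
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job)
);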

OutputFormat implementation for the modern Hadoop MapReduce API (org.apache.hadoop.mapreduce).
/**
* OutputFormat implementation that allows using Hadoop (mapreduce) OutputFormats with Flink
* @param <K> The key type
* @param <V> The value type
*/
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, org.apache.hadoop.mapreduce.OutputFormat<K, V>> {
/**
* Creates a HadoopOutputFormat wrapper with Job configuration
* @param mapreduceOutputFormat The Hadoop OutputFormat to wrap
* @param job Job for Hadoop configuration
*/
public HadoopOutputFormat(
org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
Job job
);
}

InputFormat implementation for the legacy Hadoop MapRed API (org.apache.hadoop.mapred).
/**
* Wrapper for using HadoopInputFormats (mapred-variant) with Flink
* @param <K> The key type
* @param <V> The value type
*/
@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, org.apache.hadoop.mapred.InputFormat<K, V>> {
/**
* Creates a HadoopInputFormat wrapper with JobConf configuration
* @param mapredInputFormat The Hadoop InputFormat to wrap
* @param key The key class type
* @param value The value class type
* @param job JobConf for Hadoop configuration
*/
public HadoopInputFormat(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key, Class<V> value, JobConf job
);
/**
* Creates a HadoopInputFormat wrapper with default JobConf
* @param mapredInputFormat The Hadoop InputFormat to wrap
* @param key The key class type
* @param value The value class type
*/
public HadoopInputFormat(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key, Class<V> value
);
}
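The legacy class is constructed the same way, with a JobConf instead of a Job; a minimal sketch under the same assumptions:
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
// Wrap a mapred TextInputFormat directly; the input path is set on the JobConf
JobConf inputConf = new JobConf();
FileInputFormat.addInputPath(inputConf, new Path("hdfs://path/to/input"));
DataSet<Tuple2<LongWritable, Text>> legacyInput = env.createInput(
    new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, inputConf)
);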

OutputFormat implementation for the legacy Hadoop MapRed API (org.apache.hadoop.mapred).
/**
* Wrapper for using HadoopOutputFormats (mapred-variant) with Flink
* @param <K> The key type
* @param <V> The value type
*/
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, org.apache.hadoop.mapred.OutputFormat<K, V>> {
/**
* Creates a HadoopOutputFormat wrapper with JobConf configuration
* @param mapredOutputFormat The Hadoop OutputFormat to wrap
* @param job JobConf for Hadoop configuration
*/
public HadoopOutputFormat(
org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
JobConf job
);
/**
* Creates a HadoopOutputFormat wrapper with custom OutputCommitter and JobConf
* @param mapredOutputFormat The Hadoop OutputFormat to wrap
* @param outputCommitterClass Custom OutputCommitter class
* @param job JobConf for Hadoop configuration
*/
public HadoopOutputFormat(
org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
Class<OutputCommitter> outputCommitterClass, JobConf job
);
}

Usage Examples:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
// Write to text files using the MapReduce API
Job outputJob = Job.getInstance();
outputJob.getConfiguration().set("mapreduce.output.textoutputformat.separator", "\t");
TextOutputFormat.setOutputPath(outputJob, new Path("hdfs://path/to/output")); // set the output directory (placeholder path)
DataSet<Tuple2<NullWritable, Text>> results = ...; // your data set
results.output(new HadoopOutputFormat<>(
    new TextOutputFormat<NullWritable, Text>(),
    outputJob
));
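The legacy MapRed API wrapper shown above is used the same way with a JobConf; a minimal sketch, reusing the Path, Text, DataSet, and Tuple2 imports from the previous snippet (output path and data set are placeholders):
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
// Write (Text, IntWritable) pairs using the legacy MapRed API
JobConf outputConf = new JobConf();
FileOutputFormat.setOutputPath(outputConf, new Path("hdfs://path/to/output")); // placeholder path
DataSet<Tuple2<Text, IntWritable>> counts = ...; // your data set
counts.output(new HadoopOutputFormat<Text, IntWritable>(
    new TextOutputFormat<Text, IntWritable>(),
    outputConf
));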
All data is exposed as Flink Tuple2<K, V> objects, where f0 is the key and f1 is the value, providing seamless integration with Flink's DataSet API.
All classes are parameterized with key (K) and value (V) types, ensuring compile-time type safety and preventing runtime ClassCastExceptions.
Full support for both Hadoop JobConf (MapRed API) and Job (MapReduce API) configurations, allowing fine-grained control over Hadoop behavior.
Proper IOException propagation for file system and configuration errors, with detailed error messages for troubleshooting integration issues.
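Because every record is a Tuple2, downstream operators read the key from f0 and the value from f1. A small sketch, reusing textData from the first input example:
import org.apache.flink.api.common.functions.MapFunction;
// Drop the byte-offset key (f0) and keep the line text (f1)
DataSet<String> lines = textData.map(
    new MapFunction<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String map(Tuple2<LongWritable, Text> record) {
            return record.f1.toString();
        }
    });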