Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs
—
The Input Format Integration capability provides comprehensive support for using Hadoop InputFormats within Flink applications. This enables reading data from various Hadoop-compatible sources including HDFS files, HBase tables, and custom data sources.
Flink's Hadoop compatibility layer wraps Hadoop InputFormats to work seamlessly with Flink's DataSet API. The integration supports both legacy MapRed API and modern MapReduce API, automatically converting Hadoop key-value pairs to Flink Tuple2 objects or Scala tuples.
The primary entry point for creating Hadoop InputFormat wrappers in Java.
// Read Hadoop FileInputFormat with custom JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
String inputPath,
JobConf job);
// Read Hadoop FileInputFormat with default JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
String inputPath);
// Read Hadoop SequenceFile
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
Class<K> key,
Class<V> value,
String inputPath) throws IOException;
// Create wrapper for any Hadoop InputFormat
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
JobConf job);

// Read Hadoop FileInputFormat with custom Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
String inputPath,
Job job) throws IOException;
// Read Hadoop FileInputFormat with default Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
String inputPath) throws IOException;
// Create wrapper for any MapReduce InputFormat
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
Job job);

@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
implements ResultTypeQueryable<Tuple2<K, V>> {
// Constructor with JobConf
public HadoopInputFormat(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value,
JobConf job);
// Constructor with default JobConf
public HadoopInputFormat(
org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
Class<K> key,
Class<V> value);
// Read next record from input
public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;
// Get type information for produced tuples
public TypeInformation<Tuple2<K, V>> getProducedType();
}

@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
implements ResultTypeQueryable<Tuple2<K, V>> {
// Constructor with Job
public HadoopInputFormat(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value,
Job job);
// Constructor with default Job
public HadoopInputFormat(
org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
Class<K> key,
Class<V> value) throws IOException;
// Read next record from input
public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;
// Get type information for produced tuples
public TypeInformation<Tuple2<K, V>> getProducedType();
}

object HadoopInputs {
// MapRed API methods
def readHadoopFile[K, V](
mapredInputFormat: MapredFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String,
job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
def readHadoopFile[K, V](
mapredInputFormat: MapredFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
def readSequenceFile[K, V](
key: Class[K],
value: Class[V],
inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
def createHadoopInput[K, V](
mapredInputFormat: MapredInputFormat[K, V],
key: Class[K],
value: Class[V],
job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];
// MapReduce API methods
def readHadoopFile[K, V](
mapreduceInputFormat: MapreduceFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String,
job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
def createHadoopInput[K, V](
mapreduceInputFormat: MapreduceInputFormat[K, V],
key: Class[K],
value: Class[V],
job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
}

// MapRed Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf);
def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
def nextRecord(reuse: (K, V)): (K, V);
}
// MapReduce Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job);
def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
def nextRecord(reuse: (K, V)): (K, V);
}

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
// Create input format for text files
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
"hdfs://namenode:port/path/to/textfiles"
)
);

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
// Read sequence files with specific key-value types
DataSet<Tuple2<IntWritable, Text>> sequenceData = env.createInput(
HadoopInputs.readSequenceFile(
IntWritable.class,
Text.class,
"hdfs://namenode:port/path/to/sequence/files"
)
);

import org.apache.hadoop.mapred.JobConf;
import com.example.CustomInputFormat;
import com.example.CustomKey;
import com.example.CustomValue;
// Configure custom input format
JobConf conf = new JobConf();
conf.setInputFormat(CustomInputFormat.class);
conf.set("custom.property", "value");
// Create wrapper for custom InputFormat
DataSet<Tuple2<CustomKey, CustomValue>> customData = env.createInput(
HadoopInputs.createHadoopInput(
new CustomInputFormat(),
CustomKey.class,
CustomValue.class,
conf
)
);

import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}
// Read text files with Scala
val textData: DataSet[(LongWritable, Text)] = env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(),
classOf[LongWritable],
classOf[Text],
"hdfs://namenode:port/path/to/textfiles"
)
)
// Extract just the text content
val lines = textData.map(_._2.toString)

The Hadoop compatibility layer automatically handles input split distribution across Flink's parallel execution environment.
// Input split wrapper classes (used internally)
@PublicEvolving
public class HadoopInputSplit {
// MapRed input split wrapper
// Used internally by HadoopInputFormat
}
@PublicEvolving
public class HadoopInputSplit {
// MapReduce input split wrapper
// Used internally by HadoopInputFormat
}

Input format operations may throw the following exceptions:
- IOException — when reading from input fails or the configuration is invalid
- ClassNotFoundException — when the specified key/value classes cannot be found
- IllegalArgumentException — when invalid parameters are provided
- RuntimeException — for various Hadoop-related runtime errors

Always handle these exceptions appropriately in your Flink programs:
try {
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
HadoopInputs.readHadoopFile(
new TextInputFormat(),
LongWritable.class,
Text.class,
inputPath
)
);
} catch (IOException e) {
// Handle input/output errors
logger.error("Failed to create Hadoop input: " + e.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11