tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs

docs/input-formats.md

Input Format Integration

The Input Format Integration capability provides comprehensive support for using Hadoop InputFormats within Flink applications. This enables reading data from various Hadoop-compatible sources including HDFS files, HBase tables, and custom data sources.

Overview

Flink's Hadoop compatibility layer wraps Hadoop InputFormats so they work seamlessly with Flink's DataSet API. The integration supports both the legacy mapred API (org.apache.hadoop.mapred) and the newer mapreduce API (org.apache.hadoop.mapreduce), automatically converting Hadoop key-value pairs to Flink Tuple2 objects or Scala tuples.
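For example, once a Hadoop source is wrapped, each record arrives as a Tuple2 whose f0 field holds the Hadoop key and f1 the value. A minimal sketch (the HDFS path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Each Hadoop (key, value) pair becomes a Flink Tuple2<K, V>
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(), LongWritable.class, Text.class,
        "hdfs://namenode:port/path/to/textfiles"));  // placeholder path

// f0 is the byte offset (key), f1 the line content (value)
DataSet<String> lines = input.map(t -> t.f1.toString());
```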

HadoopInputs Utility Class (Java)

The org.apache.flink.hadoopcompatibility.HadoopInputs class is the primary entry point for creating Hadoop InputFormat wrappers in Java.

MapRed API Methods

// Read Hadoop FileInputFormat with custom JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job);

// Read Hadoop FileInputFormat with default JobConf
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath);

// Read Hadoop SequenceFile
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
    Class<K> key,
    Class<V> value,
    String inputPath) throws IOException;

// Create wrapper for any Hadoop InputFormat
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job);

MapReduce API Methods

// Read Hadoop FileInputFormat with custom Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    Job job) throws IOException;

// Read Hadoop FileInputFormat with default Job
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath) throws IOException;

// Create wrapper for any MapReduce InputFormat
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    Job job);
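A sketch of reading text files through the mapreduce-API variant. Unlike the mapred path, this uses org.apache.hadoop.mapreduce.lib.input.TextInputFormat and a Job instance, and the factory methods declare IOException. The HDFS path is a placeholder:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Job carries the Hadoop configuration for the new (mapreduce) API
Job job = Job.getInstance();

// Throws IOException if the Job configuration cannot be created
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/textfiles",  // placeholder path
        job));
```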

HadoopInputFormat Classes

MapRed HadoopInputFormat

@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> 
    implements ResultTypeQueryable<Tuple2<K, V>> {
    
    // Constructor with JobConf
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        JobConf job);
    
    // Constructor with default JobConf
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value);
    
    // Read next record from input
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;
    
    // Get type information for produced tuples
    public TypeInformation<Tuple2<K, V>> getProducedType();
}
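The HadoopInputs factory methods are thin conveniences over these constructors; constructing the wrapper directly is equivalent. A sketch, assuming the input path is registered on the JobConf first (the HDFS path is a placeholder):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register the input path on the JobConf before handing it to the wrapper
JobConf conf = new JobConf();
FileInputFormat.addInputPath(conf, new Path("hdfs://namenode:port/path/to/textfiles"));  // placeholder path

HadoopInputFormat<LongWritable, Text> format =
    new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, conf);

DataSet<Tuple2<LongWritable, Text>> data = env.createInput(format);
```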

MapReduce HadoopInputFormat

@Public
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
    implements ResultTypeQueryable<Tuple2<K, V>> {
    
    // Constructor with Job
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        Job job);
    
    // Constructor with default Job
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value) throws IOException;
    
    // Read next record from input
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;
    
    // Get type information for produced tuples
    public TypeInformation<Tuple2<K, V>> getProducedType();
}

Scala Input Formats

Scala HadoopInputs Object

object HadoopInputs {
  // MapRed API methods
  def readHadoopFile[K, V](
    mapredInputFormat: MapredFileInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    inputPath: String,
    job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def readHadoopFile[K, V](
    mapredInputFormat: MapredFileInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def readSequenceFile[K, V](
    key: Class[K],
    value: Class[V],
    inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  def createHadoopInput[K, V](
    mapredInputFormat: MapredInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    job: JobConf)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V];

  // MapReduce API methods  
  def readHadoopFile[K, V](
    mapreduceInputFormat: MapreduceFileInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    inputPath: String,
    job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];

  def createHadoopInput[K, V](
    mapreduceInputFormat: MapreduceInputFormat[K, V],
    key: Class[K],
    value: Class[V],
    job: Job)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V];
}

Scala HadoopInputFormat Classes

// MapRed Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: JobConf);
  def this(mapredInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
  def nextRecord(reuse: (K, V)): (K, V);
}

// MapReduce Scala InputFormat
@Public
class HadoopInputFormat[K, V] extends HadoopInputFormatBase[K, V, (K, V)] {
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V], job: Job);
  def this(mapreduceInputFormat: InputFormat[K, V], keyClass: Class[K], valueClass: Class[V]);
  def nextRecord(reuse: (K, V)): (K, V);
}

Usage Examples

Reading Text Files

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Create input format for text files
DataSet<Tuple2<LongWritable, Text>> textData = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/textfiles"
    )
);

Reading Sequence Files

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Read sequence files with specific key-value types
DataSet<Tuple2<IntWritable, Text>> sequenceData = env.createInput(
    HadoopInputs.readSequenceFile(
        IntWritable.class,
        Text.class,
        "hdfs://namenode:port/path/to/sequence/files"
    )
);

Using Custom InputFormats

import org.apache.hadoop.mapred.JobConf;
import com.example.CustomInputFormat;
import com.example.CustomKey;
import com.example.CustomValue;

// Configure custom input format
JobConf conf = new JobConf();
conf.setInputFormat(CustomInputFormat.class);
conf.set("custom.property", "value");

// Create wrapper for custom InputFormat
DataSet<Tuple2<CustomKey, CustomValue>> customData = env.createInput(
    HadoopInputs.createHadoopInput(
        new CustomInputFormat(),
        CustomKey.class,
        CustomValue.class,
        conf
    )
);

Scala Usage

import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

// Read text files with Scala
val textData: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://namenode:port/path/to/textfiles"
  )
)

// Extract just the text content
val lines = textData.map(_._2.toString)

Input Split Handling

The Hadoop compatibility layer automatically handles input split distribution across Flink's parallel execution environment.

// Input split wrapper classes (used internally).
// Both wrappers share the simple name HadoopInputSplit but live in
// different packages, mirroring the mapred/mapreduce split.

// MapRed input split wrapper
// (org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit)
@PublicEvolving
public class HadoopInputSplit {
    // Used internally by the mapred HadoopInputFormat
}

// MapReduce input split wrapper
// (org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit)
@PublicEvolving
public class HadoopInputSplit {
    // Used internally by the mapreduce HadoopInputFormat
}
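Splits are assigned to the parallel instances of the Flink source, so the source's parallelism can be tuned like any other DataSet operator. A sketch (the parallelism value and path are illustrative):

```java
// Each parallel source instance processes one Hadoop input split at a time
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        HadoopInputs.readHadoopFile(
            new TextInputFormat(), LongWritable.class, Text.class,
            "hdfs://namenode:port/path/to/textfiles"))  // placeholder path
    .setParallelism(4);  // illustrative; defaults to the environment's parallelism
```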

Error Handling

Input format operations may throw the following exceptions:

  • IOException - When reading from input fails or configuration is invalid
  • ClassNotFoundException - When specified key/value classes cannot be found
  • IllegalArgumentException - When invalid parameters are provided
  • RuntimeException - For various Hadoop-related runtime errors

Handle these exceptions appropriately in your Flink programs. Note that the checked IOException is only declared by some factory methods (for example readSequenceFile and the mapreduce-API variants of readHadoopFile); the mapred readHadoopFile overload without a JobConf does not declare it:

try {
    DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
        HadoopInputs.readSequenceFile(
            LongWritable.class,
            Text.class,
            inputPath
        )
    );
} catch (IOException e) {
    // Handle input/output errors
    logger.error("Failed to create Hadoop input", e);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11
