
tessl/maven-org-apache-flink--flink-hadoop-compatibility

Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing


docs/input-formats.md

Hadoop Input Formats

Comprehensive integration for reading data from Hadoop InputFormats in Flink applications. Supports both the legacy mapred API and the newer mapreduce API, automatically converting Hadoop key-value pairs to Flink Tuple2 objects.

Capabilities

File Input Format Reading (mapred API)

Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the legacy mapred API.

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API)
 * @param mapredInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type  
 * @param inputPath The path to read input data from
 * @param job JobConf configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 */
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job
);

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapred API) with default JobConf
 * @param mapredInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 */
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
);

Usage Example:

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Reading text files using TextInputFormat
HadoopInputFormat<LongWritable, Text> textInput = 
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt"
    );

// With custom JobConf
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.fileinputformat.split.minsize", "1048576");

HadoopInputFormat<LongWritable, Text> configuredInput = 
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt",
        jobConf
    );

File Input Format Reading (mapreduce API)

Creates Flink InputFormat wrappers for Hadoop FileInputFormats using the newer mapreduce API.

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API)
 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @param job Job configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 * @throws IOException if the Job configuration cannot be processed
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    Job job
) throws IOException;

/**
 * Creates a Flink InputFormat that wraps the given Hadoop FileInputFormat (mapreduce API) with default Job
 * @param mapreduceInputFormat The Hadoop FileInputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to read input data from
 * @return A Flink InputFormat that wraps the Hadoop FileInputFormat
 * @throws IOException if the Job configuration cannot be created
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
) throws IOException;

Usage Example:

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Reading text files using mapreduce API
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> mapreduceInput = 
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt"
    );

// With custom Job configuration
Job job = Job.getInstance();
job.getConfiguration().set("mapreduce.input.textinputformat.record.delimiter", "\n");

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<LongWritable, Text> configuredMapreduceInput = 
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://data/input.txt",
        job
    );

Sequence File Reading

Specialized method for reading Hadoop sequence files with automatic format configuration.

/**
 * Creates a Flink InputFormat to read a Hadoop sequence file for the given key and value classes
 * @param key The class of the key type
 * @param value The class of the value type
 * @param inputPath The path to the sequence file
 * @return A Flink InputFormat that wraps a Hadoop SequenceFileInputFormat
 * @throws IOException if the sequence file cannot be accessed
 */
public static <K, V> HadoopInputFormat<K, V> readSequenceFile(
    Class<K> key,
    Class<V> value,
    String inputPath
) throws IOException;

Usage Example:

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Reading sequence files
HadoopInputFormat<IntWritable, Text> sequenceInput = 
    HadoopInputs.readSequenceFile(
        IntWritable.class,
        Text.class,
        "hdfs://data/sequence.seq"
    );

Generic Input Format Creation (mapred API)

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapred API.

/**
 * Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapred API)
 * @param mapredInputFormat The Hadoop InputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param job JobConf configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop InputFormat
 */
public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job
);

Usage Example:

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;

// Using KeyValueTextInputFormat
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");

HadoopInputFormat<Text, Text> keyValueInput = 
    HadoopInputs.createHadoopInput(
        new KeyValueTextInputFormat(),
        Text.class,
        Text.class,
        jobConf
    );

Generic Input Format Creation (mapreduce API)

Creates Flink InputFormat wrappers for any Hadoop InputFormat using the mapreduce API.

/**
 * Creates a Flink InputFormat that wraps the given Hadoop InputFormat (mapreduce API)
 * @param mapreduceInputFormat The Hadoop InputFormat to wrap
 * @param key The class of the key type
 * @param value The class of the value type
 * @param job Job configuration for the Hadoop job
 * @return A Flink InputFormat that wraps the Hadoop InputFormat
 */
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
    Class<K> key,
    Class<V> value,
    Job job
);

Usage Example:

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;

// Using mapreduce KeyValueTextInputFormat
Job job = Job.getInstance();
job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<Text, Text> mapreduceKeyValueInput = 
    HadoopInputs.createHadoopInput(
        new KeyValueTextInputFormat(),
        Text.class,
        Text.class,
        job
    );

Input Format Classes

HadoopInputFormat (mapred API)

Wrapper class for Hadoop InputFormats using the mapred API.

public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    /**
     * Constructor with full configuration
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @param job JobConf configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value,
        JobConf job
    );

    /**
     * Constructor with default JobConf
     * @param mapredInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
        Class<K> key,
        Class<V> value
    );

    /**
     * Read the next record from the Hadoop InputFormat
     * @param record Reusable record object
     * @return The next record as a Tuple2
     * @throws IOException if reading fails
     */
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    /**
     * Get type information for the produced Tuple2 type
     * @return TypeInformation for Tuple2<K, V>
     */
    public TypeInformation<Tuple2<K, V>> getProducedType();
}

HadoopInputFormat (mapreduce API)

Wrapper class for Hadoop InputFormats using the mapreduce API.

public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>>
        implements ResultTypeQueryable<Tuple2<K, V>> {

    /**
     * Constructor with full configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @param job Job configuration
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value,
        Job job
    );

    /**
     * Constructor with default Job configuration
     * @param mapreduceInputFormat The Hadoop InputFormat to wrap
     * @param key The class of the key type
     * @param value The class of the value type
     * @throws IOException if Job configuration cannot be created
     */
    public HadoopInputFormat(
        org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat,
        Class<K> key,
        Class<V> value
    ) throws IOException;

    /**
     * Read the next record from the Hadoop InputFormat
     * @param record Reusable record object
     * @return The next record as a Tuple2
     * @throws IOException if reading fails
     */
    public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException;

    /**
     * Get type information for the produced Tuple2 type
     * @return TypeInformation for Tuple2<K, V>
     */
    public TypeInformation<Tuple2<K, V>> getProducedType();
}

Key Design Patterns

Type Safety

All InputFormat wrappers are generically typed with <K, V> parameters, ensuring compile-time type safety and proper type inference in Flink applications.

Tuple2 Convention

All input formats produce Tuple2<K, V> objects where:

  • f0 contains the key of type K
  • f1 contains the value of type V

Configuration Flexibility

Both JobConf (mapred) and Job (mapreduce) configuration objects are supported, with convenience overloads that supply a default configuration when none is given.

Exception Handling

Methods that access the file system or build a mapreduce Job configuration declare IOException, consistent with Hadoop's own exception-handling conventions.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility
