tessl/maven-org-apache-flink--flink-hadoop-compatibility

Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-hadoop-compatibility@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-compatibility@2.1.0


Flink Hadoop Compatibility

Flink Hadoop Compatibility is an Apache Flink library that integrates Flink with the Apache Hadoop ecosystem. It enables Flink applications to reuse existing Hadoop InputFormats, OutputFormats, and MapReduce functions without modification, supporting both the legacy mapred API and the newer mapreduce API.

Package Information

  • Package Name: org.apache.flink:flink-hadoop-compatibility
  • Package Type: maven
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility</artifactId>
      <version>2.1.0</version>
    </dependency>

Core Imports

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;

Basic Usage

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create Hadoop input format wrapper
HadoopInputFormat<LongWritable, Text> hadoopInput = 
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://input/path"
    );

// Use as Flink DataSet source
DataSet<Tuple2<LongWritable, Text>> dataset = env.createInput(hadoopInput);

// Process data using Flink operations
dataset.map(record -> record.f1.toString().toUpperCase())
       .print();

Architecture

Flink Hadoop Compatibility is built around several key components:

  • Input/Output Format Wrappers: Adapt Hadoop InputFormats and OutputFormats to Flink's data processing model
  • API Version Support: Dual support for legacy mapred API and newer mapreduce API variants
  • Type System Integration: Custom TypeInformation and serialization for Hadoop Writable types
  • Function Wrappers: Adapters that convert Hadoop Mappers and Reducers into Flink functions
  • Utility Classes: Helper methods for configuration parsing and format creation

All wrapped formats produce and consume Tuple2<K, V> objects where f0 is the key and f1 is the value, maintaining compatibility with Hadoop's key-value paradigm.
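
As a minimal illustration of that convention, reusing the dataset from Basic Usage above (with TextInputFormat, the LongWritable key is the byte offset of each line):

DataSet<String> lines = dataset.map(kv -> "offset=" + kv.f0.get() + " line=" + kv.f1.toString());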

Capabilities

Hadoop Input Integration

Core functionality for reading data from Hadoop InputFormats, supporting both file-based and generic input sources with full type safety.

public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath
);

public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    String inputPath,
    JobConf job
);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key,
    Class<V> value,
    JobConf job
);
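
A hedged sketch of createHadoopInput, pairing Hadoop's mapred SequenceFileInputFormat with an explicitly configured JobConf; the HDFS path is a placeholder:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

// Input paths come from the JobConf rather than a readHadoopFile argument
JobConf job = new JobConf();
org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new Path("hdfs://input/sequence-files"));

HadoopInputFormat<Text, LongWritable> seqInput =
    HadoopInputs.createHadoopInput(
        new SequenceFileInputFormat<Text, LongWritable>(),
        Text.class,
        LongWritable.class,
        job
    );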

Input Formats

Hadoop Output Integration

Functionality for writing data to Hadoop OutputFormats, enabling Flink applications to output data in Hadoop-compatible formats.

public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        JobConf job
    );
    
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
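
A hedged usage sketch, assuming result is a DataSet<Tuple2<Text, IntWritable>> produced earlier in the job; the output path is a placeholder:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// The output path is carried by the JobConf, as in a plain Hadoop job
JobConf outConf = new JobConf();
org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(outConf, new Path("hdfs://output/path"));

HadoopOutputFormat<Text, IntWritable> hadoopOutput =
    new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), outConf);

// output() registers a lazy sink; execute() triggers the write
result.output(hadoopOutput);
env.execute("Write to Hadoop OutputFormat");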

Output Formats

MapReduce Function Integration

Wrappers that convert Hadoop Mappers and Reducers into Flink-compatible functions, enabling reuse of existing MapReduce logic.

public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
    extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
    
    public void flatMap(
        Tuple2<KEYIN, VALUEIN> value,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;
}
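
A hedged sketch of reusing an unmodified mapred-API Mapper inside a Flink flatMap; the Tokenizer class is hypothetical, and dataset comes from Basic Usage above:

import java.io.IOException;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical Hadoop Mapper that splits each line into (token, 1) pairs
public static final class Tokenizer
        implements Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
        for (String token : value.toString().split("\\s+")) {
            out.collect(new Text(token), new LongWritable(1L));
        }
    }
    @Override public void configure(JobConf job) {}
    @Override public void close() {}
}

// Wrap the Mapper so it runs as a Flink FlatMapFunction
DataSet<Tuple2<Text, LongWritable>> tokens =
    dataset.flatMap(new HadoopMapFunction<>(new Tokenizer()));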

MapReduce Functions

Type System Integration

Custom type information and serialization support for Hadoop Writable types, ensuring seamless integration with Flink's type system.

public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);
    
    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
    
    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);
}
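
A hedged sketch of supplying an explicit Writable type hint in a case where type extraction from a lambda needs help; Flink normally derives WritableTypeInfo automatically, and dataset comes from Basic Usage above:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;

// Explicit TypeInformation for a Hadoop Writable result type
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);

DataSet<Text> values = dataset
    .map(kv -> new Text(kv.f1))
    .returns(textType);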

Type System

Configuration Utilities

Utility functions, exposed on HadoopUtils, for handling Hadoop configuration and command-line argument parsing.

public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
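
A hedged sketch of calling this method on HadoopUtils to turn Hadoop-style generic options (e.g., -D key=value) into a ParameterTool; the "input" parameter name is a placeholder:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public static void main(String[] args) throws Exception {
    // Delegates to Hadoop's GenericOptionsParser under the hood
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);
    String inputPath = params.get("input");  // hypothetical argument name
}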

Utilities

Common Types

// Core Flink tuple type used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;

// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

// Flink type system integration
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;