
tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-hadoop-compatibility_2.11@1.14.x

To install, run:

npx @tessl/cli install tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11@1.14.0


Apache Flink Hadoop Compatibility

Apache Flink Hadoop Compatibility provides an integration layer between Apache Flink and the Hadoop MapReduce ecosystem. It lets existing Hadoop InputFormats and OutputFormats be used directly in Flink applications, supports both the legacy MapRed API and the modern MapReduce API, and ships Java and Scala bindings.

Package Information

  • Package Name: flink-hadoop-compatibility_2.11
  • Package Type: Maven
  • Language: Java/Scala
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_2.11</artifactId>
      <version>1.14.6</version>
    </dependency>

Core Imports

Java:

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.hadoopcompatibility.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; // same simple name as the mapred wrapper; import only one per file
import org.apache.flink.api.java.typeutils.WritableTypeInfo;

Scala:

import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat
import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat

Basic Usage

Reading Hadoop Files with Java:

import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create Hadoop InputFormat for reading text files
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input"
    )
);

// Process the data
DataSet<String> lines = input.map(tuple -> tuple.f1.toString());
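
If the input format needs Hadoop-specific settings, the overload that also takes a JobConf (documented under Input Format Integration below) can be used. A minimal sketch reusing the env from the example above; the property and path are placeholders:

import org.apache.hadoop.mapred.JobConf;

// Preconfigure a JobConf with whatever Hadoop properties the input format needs
JobConf jobConf = new JobConf();
jobConf.set("io.file.buffer.size", "65536"); // placeholder Hadoop property

// Same call as above, but passing the JobConf explicitly instead of letting the wrapper create one
DataSet<Tuple2<LongWritable, Text>> configuredInput = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input",
        jobConf));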

Reading Hadoop Files with Scala:

import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.api.scala._
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val env = ExecutionEnvironment.getExecutionEnvironment

// Create Hadoop InputFormat for reading text files
val input: DataSet[(LongWritable, Text)] = env.createInput(
  HadoopInputs.readHadoopFile(
    new TextInputFormat(),
    classOf[LongWritable],
    classOf[Text],
    "hdfs://path/to/input"
  )
)

// Process the data
val lines = input.map(_._2.toString)

Architecture

The Hadoop Compatibility library is structured around several key architectural components; the sketch after this list shows how they fit together:

  • Entry Point Utilities: HadoopInputs classes provide convenient factory methods for creating Flink wrappers
  • Input/Output Format Wrappers: Bridge classes that adapt Hadoop formats to Flink's InputFormat and OutputFormat interfaces
  • Type System Integration: WritableTypeInfo and related classes ensure proper serialization of Hadoop types within Flink
  • Dual API Support: Complete coverage of both legacy MapRed API and modern MapReduce API
  • Language Bindings: Native Java and Scala APIs with appropriate language conventions
  • MapReduce Function Compatibility: Wrappers to use Hadoop Mapper and Reducer functions directly in Flink
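
A compact end-to-end sketch tying these pieces together with the MapRed API. The paths are placeholders and the processing step simply upper-cases each line; this is illustrative, not the library's prescribed pattern:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 1. Read with a Hadoop InputFormat via the entry-point utility and input wrapper
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(), LongWritable.class, Text.class, "hdfs://path/to/input"));

// 2. Process with ordinary Flink operators; Writable values are handled by WritableTypeInfo
DataSet<Tuple2<Text, LongWritable>> result = input
    .map(t -> new Tuple2<Text, LongWritable>(new Text(t.f1.toString().toUpperCase()), t.f0))
    .returns(new TypeHint<Tuple2<Text, LongWritable>>() {});

// 3. Write with a Hadoop OutputFormat via the output wrapper
JobConf outputConf = new JobConf();
FileOutputFormat.setOutputPath(outputConf, new Path("hdfs://path/to/output"));
result.output(new HadoopOutputFormat<Text, LongWritable>(
    new TextOutputFormat<Text, LongWritable>(), outputConf));

env.execute("Hadoop compatibility round trip");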

Capabilities

Input Format Integration

Primary utilities for reading data from Hadoop InputFormats into Flink DataSets, supporting both file-based and custom InputFormats with automatic type conversion.

// Java MapRed API
public static <K, V> HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, String inputPath, JobConf job);

public static <K, V> HadoopInputFormat<K, V> createHadoopInput(
    org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat,
    Class<K> key, Class<V> value, JobConf job);

// Java MapReduce API  
public static <K, V> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> readHadoopFile(
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat,
    Class<K> key, Class<V> value, String inputPath, Job job) throws IOException;
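
A brief sketch of the modern MapReduce-API variant using the Job-based overload shown above; the input path is a placeholder:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Job job = Job.getInstance();

// readHadoopFile also accepts new-API (mapreduce) file input formats together with a Job
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,
        Text.class,
        "hdfs://path/to/input",
        job));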

See docs/input-formats.md.

Output Format Integration

Complete support for writing Flink DataSets to Hadoop OutputFormats, enabling integration with existing Hadoop data storage systems and custom output processing.

// Java MapRed OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

// Java MapReduce OutputFormat wrapper
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job);
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}
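
A usage sketch for the MapReduce-API wrapper, assuming a DataSet<Tuple2<Text, IntWritable>> named counts and an ExecutionEnvironment env already exist; the output path is a placeholder:

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, new Path("hdfs://path/to/output"));

// Wrap the Hadoop OutputFormat so Flink can use it as a DataSet sink
HadoopOutputFormat<Text, IntWritable> hadoopOutput =
    new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), job);

counts.output(hadoopOutput);
env.execute("write via Hadoop OutputFormat");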

See docs/output-formats.md.

Type System and Serialization

Advanced type system integration allowing Hadoop Writable types to work seamlessly within Flink's type system, with optimized serialization and comparison.

public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    public WritableTypeInfo(Class<T> typeClass);
    public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig);
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
}
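
As an illustrative sketch (Text is used here, but any Writable works), the type info can be constructed explicitly; in normal use Flink's type extraction creates it automatically for Writable types:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

// Build type information for a Hadoop Writable type explicitly
WritableTypeInfo<Text> textType = new WritableTypeInfo<>(Text.class);

// Flink derives serializers (and comparators for grouping/sorting) from it
TypeSerializer<Text> serializer = textType.createSerializer(new ExecutionConfig());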

See docs/type-system.md.

MapReduce Function Compatibility

Direct integration of Hadoop Mapper and Reducer functions into Flink workflows, supporting both simple and complex MapReduce patterns with combine functionality.

public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
    extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
}

public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>> {
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
}
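
A word-count sketch reusing Hadoop's built-in TokenCountMapper and LongSumReducer (both from org.apache.hadoop.mapred.lib); input is assumed to be the DataSet<Tuple2<LongWritable, Text>> from the Basic Usage example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

DataSet<Tuple2<Text, LongWritable>> counts = input
    // Run an existing Hadoop Mapper as a Flink FlatMapFunction
    .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
        new TokenCountMapper<LongWritable>()))
    .groupBy(0)
    // Run an existing Hadoop Reducer as a Flink GroupReduceFunction
    .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
        new LongSumReducer<Text>()));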

See docs/mapreduce-functions.md.

Scala API Bindings

Native Scala API providing idiomatic interfaces with implicit type information, tuple syntax, and functional programming patterns for Hadoop integration.

object HadoopInputs {
  def readHadoopFile[K, V](
    mapredInputFormat: MapredFileInputFormat[K, V],
    key: Class[K], value: Class[V], inputPath: String, job: JobConf)
    (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]

  def createHadoopInput[K, V](
    mapredInputFormat: MapredInputFormat[K, V],
    key: Class[K], value: Class[V], job: JobConf)
    (implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V]
}

See docs/scala-api.md.

Configuration and Utilities

Utility functions for Hadoop configuration management, command-line argument parsing, and seamless integration with existing Hadoop tooling and workflows.

public class HadoopUtils {
    public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException;
}
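
A short usage sketch, assuming a standard main method; the parsed parameters are registered as global job parameters so user functions and the web UI can see them:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public static void main(String[] args) throws Exception {
    // Parse Hadoop-style generic options (-D key=value, -files, ...) into a ParameterTool
    ParameterTool params = HadoopUtils.paramsFromGenericOptionsParser(args);

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Make the parsed parameters available to user functions and the web UI
    env.getConfig().setGlobalJobParameters(params);
}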

See docs/configuration.md.

Common Type Definitions

// Core Flink types used throughout the API
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;

// Hadoop configuration types
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

// Common Hadoop Writable types
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;