tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs


docs/mapreduce-functions.md

MapReduce Functions

The MapReduce Functions capability enables direct integration of Hadoop Mapper and Reducer functions into Flink workflows, allowing reuse of existing MapReduce logic within Flink's DataSet API while maintaining compatibility with Hadoop's programming model.

Overview

Flink's Hadoop compatibility layer provides wrapper classes that adapt Hadoop MapReduce functions to work as Flink operators. This enables gradual migration from MapReduce to Flink by allowing existing Mapper and Reducer implementations to run within Flink pipelines without modification.

HadoopMapFunction

Wrapper that adapts a Hadoop Mapper to a Flink FlatMapFunction.

@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
    extends RichFlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
    implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
    
    // Constructor with Mapper only (uses default JobConf)
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    
    // Constructor with Mapper and custom JobConf
    public HadoopMapFunction(
        Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, 
        JobConf conf);
    
    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;
    
    // Main processing method
    public void flatMap(
        final Tuple2<KEYIN, VALUEIN> value, 
        final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;
    
    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}

HadoopReduceFunction

Wrapper that adapts a Hadoop Reducer to a non-combinable Flink GroupReduceFunction.

@Public
public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
    implements ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
    
    // Constructor with Reducer only (uses default JobConf)
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    
    // Constructor with Reducer and custom JobConf
    public HadoopReduceFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, 
        JobConf conf);
    
    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;
    
    // Main processing method
    public void reduce(
        final Iterable<Tuple2<KEYIN, VALUEIN>> values, 
        final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;
    
    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}

HadoopReduceCombineFunction

Wrapper that adapts both Hadoop Reducer and Combiner to a combinable Flink GroupReduceFunction.

@Public
public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends RichGroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>
    implements GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>, 
               ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>>, Serializable {
    
    // Constructor with Reducer and Combiner (uses default JobConf)
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner);
    
    // Constructor with Reducer, Combiner, and custom JobConf
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
        JobConf conf);
    
    // Flink lifecycle method
    public void open(Configuration parameters) throws Exception;
    
    // Main reduce processing method
    public void reduce(
        final Iterable<Tuple2<KEYIN, VALUEIN>> values, 
        final Collector<Tuple2<KEYOUT, VALUEOUT>> out) throws Exception;
    
    // Combine processing method for optimization
    public void combine(
        final Iterable<Tuple2<KEYIN, VALUEIN>> values, 
        final Collector<Tuple2<KEYIN, VALUEIN>> out) throws Exception;
    
    // Type information method
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}

Usage Examples

Basic WordCount with Hadoop MapReduce Functions

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hadoop Mapper implementation. In the mapred API, Mapper is an interface;
// MapReduceBase supplies default configure() and close() implementations.
public static class TokenizerMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().toLowerCase());
            output.collect(word, one);
        }
    }
}

// Hadoop Reducer implementation (Reducer is also an interface in the mapred API)
public static class IntSumReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    
    @Override
    public void reduce(Text key, Iterator<IntWritable> values, 
                      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
}

// Use in Flink pipeline
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read input data
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    HadoopInputs.readHadoopFile(/* input format configuration */)
);

// Apply Hadoop mapper
DataSet<Tuple2<Text, IntWritable>> mappedData = input
    .flatMap(new HadoopMapFunction<>(new TokenizerMapper()));

// Group by key and apply Hadoop reducer
DataSet<Tuple2<Text, IntWritable>> result = mappedData
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));

Using Combiner for Optimization

// Combiner that performs partial aggregation
public static class IntSumCombiner extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    
    @Override
    public void reduce(Text key, Iterator<IntWritable> values, 
                      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
}

// Use HadoopReduceCombineFunction for better performance
DataSet<Tuple2<Text, IntWritable>> optimizedResult = mappedData
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new IntSumReducer(),     // Final reducer
        new IntSumCombiner()     // Combiner for pre-aggregation
    ));

Custom Configuration

import org.apache.hadoop.mapred.JobConf;

// Configure Hadoop MapReduce job settings
JobConf jobConf = new JobConf();
jobConf.set("mapreduce.job.name", "Flink-Hadoop Integration");
jobConf.set("custom.parameter", "custom-value");
jobConf.setInt("custom.int.parameter", 42);

// Use custom configuration with MapReduce functions
HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapperWithConfig = 
    new HadoopMapFunction<>(new TokenizerMapper(), jobConf);

HadoopReduceFunction<Text, IntWritable, Text, IntWritable> reducerWithConfig = 
    new HadoopReduceFunction<>(new IntSumReducer(), jobConf);

// Apply in pipeline
DataSet<Tuple2<Text, IntWritable>> result = input
    .flatMap(mapperWithConfig)
    .groupBy(0)
    .reduceGroup(reducerWithConfig);

Complex Data Processing

// Example with custom Writable types
public static class DataRecord implements Writable {
    private String category;
    private double value;
    private long timestamp;
    
    // Writable implementation...
}

// Keys used with groupBy must be comparable, so implement WritableComparable
public static class CategoryKey implements WritableComparable<CategoryKey> {
    private String category;
    private int hour;
    
    // Writable implementation...
}

// Mapper that processes complex records
public static class DataProcessor extends MapReduceBase
        implements Mapper<LongWritable, DataRecord, CategoryKey, DataRecord> {
    private CategoryKey outputKey = new CategoryKey();
    
    @Override
    public void map(LongWritable key, DataRecord value, 
                   OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        // Extract hour from timestamp
        int hour = (int) (value.getTimestamp() / 3600000) % 24;
        
        outputKey.setCategory(value.getCategory());
        outputKey.setHour(hour);
        
        output.collect(outputKey, value);
    }
}

// Reducer that aggregates by category and hour
public static class CategoryAggregator extends MapReduceBase
        implements Reducer<CategoryKey, DataRecord, CategoryKey, DataRecord> {
    private DataRecord result = new DataRecord();
    
    @Override
    public void reduce(CategoryKey key, Iterator<DataRecord> values, 
                      OutputCollector<CategoryKey, DataRecord> output, Reporter reporter) throws IOException {
        double sum = 0.0;
        int count = 0;
        
        while (values.hasNext()) {
            sum += values.next().getValue();
            count++;
        }
        
        result.setCategory(key.getCategory());
        result.setValue(sum / count); // Average
        result.setTimestamp(System.currentTimeMillis());
        
        output.collect(key, result);
    }
}

// Use in Flink pipeline
DataSet<Tuple2<CategoryKey, DataRecord>> aggregatedData = rawData
    .flatMap(new HadoopMapFunction<>(new DataProcessor()))
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<>(new CategoryAggregator()));

Performance Considerations

Object Reuse

// Enable object reuse to reduce allocation and serialization overhead
env.getConfig().enableObjectReuse();

// Caution: with object reuse enabled, Flink may hand the same object
// instances to the wrapped functions repeatedly. This is only safe if the
// Hadoop Mapper/Reducer does not cache input objects across calls.

Combiner Usage

// Always use combiners when possible for commutative and associative operations
// This reduces network traffic and improves performance

// Good candidates for combiners:
// - Sum operations
// - Count operations  
// - Min/Max operations
// - Set union operations

// Bad candidates for combiners:
// - Operations that need to see all values
// - Non-associative operations
// - Operations with side effects
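The associativity requirement can be demonstrated with plain Java, independent of Hadoop or Flink (CombinerSafety and its values are illustrative only): partial sums combine correctly, while partial averages lose the per-partition counts and produce a wrong result.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSafety {

    // Associative and commutative: partial sums can safely be summed again.
    public static int sum(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).sum();
    }

    // Not combiner-safe on its own: a partial average loses the count.
    public static double avg(List<Integer> xs) {
        return xs.stream().mapToInt(Integer::intValue).average().orElse(0);
    }

    public static void main(String[] args) {
        List<Integer> p1 = Arrays.asList(1, 2, 3); // values seen by one combiner
        List<Integer> p2 = Arrays.asList(10);      // values seen by another

        // Combining partial sums matches the global sum: 6 + 10 = 16
        System.out.println(sum(p1) + sum(p2));

        // Averaging partial averages gives (2.0 + 10.0) / 2 = 6.0,
        // but the true average of all four values is 16 / 4 = 4.0
        System.out.println((avg(p1) + avg(p2)) / 2.0);
    }
}
```

This is why a correct average combiner must emit (sum, count) pairs rather than averages, whereas a sum combiner like IntSumCombiner above can simply re-emit sums.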

Configuration Tuning

Flink does not launch MapReduce containers, so the JobConf passed to these wrappers does not control Flink's task memory or JVM; it is only handed to the wrapped functions via configure(). Settings like the ones below therefore take effect only if your Mapper or Reducer code reads them; tune Flink's own memory through Flink configuration instead.

JobConf conf = new JobConf();

// MapReduce memory/JVM settings (not interpreted by Flink's runtime;
// visible only to wrapped code that inspects them)
conf.setInt("mapreduce.map.memory.mb", 1024);
conf.setInt("mapreduce.reduce.memory.mb", 2048);
conf.set("mapreduce.map.java.opts", "-Xmx800m");
conf.set("mapreduce.reduce.java.opts", "-Xmx1600m");

// Sort buffer settings (likewise only meaningful to code that reads them)
conf.setInt("io.sort.mb", 256);
conf.setFloat("io.sort.spill.percent", 0.8f);

Error Handling

try {
    DataSet<Tuple2<Text, IntWritable>> result = input
        .flatMap(new HadoopMapFunction<>(new TokenizerMapper()))
        .groupBy(0)
        .reduceGroup(new HadoopReduceFunction<>(new IntSumReducer()));
    
    // print() is a sink and triggers execution in the DataSet API,
    // so no separate env.execute() call is needed here
    result.print();
    
} catch (Exception e) {
    // Handle various exceptions
    if (e.getCause() instanceof IOException) {
        logger.error("I/O error in Hadoop function: " + e.getMessage());
    } else if (e.getCause() instanceof InterruptedException) {
        logger.error("Hadoop function was interrupted: " + e.getMessage());
    } else {
        logger.error("Unexpected error: " + e.getMessage());
    }
}

Migration Best Practices

Gradual Migration Strategy

  1. Start with Input/Output: Use Hadoop InputFormats and OutputFormats with native Flink operations
  2. Migrate Logic Gradually: Replace Hadoop functions one by one with native Flink operations
  3. Optimize Performance: Use Flink-native operations for better performance where possible
  4. Maintain Compatibility: Keep Hadoop functions for complex logic that's hard to rewrite
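Step 1 of the strategy above can be sketched as follows: data is read with a Hadoop input format, but processed with a native Flink operator instead of a wrapped Mapper. This is a sketch, not a drop-in sample; the input path is a placeholder, and it assumes the mapred-based HadoopInputs.readHadoopFile variant that takes a FileInputFormat plus key/value classes.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Step 1: keep reading input through a Hadoop InputFormat...
DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
    HadoopInputs.readHadoopFile(
        new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input/path"));

// ...but process it with native Flink operators instead of a wrapped Mapper
DataSet<Tuple2<String, Integer>> counts = lines
    .flatMap((Tuple2<LongWritable, Text> line, Collector<Tuple2<String, Integer>> out) -> {
        for (String token : line.f1.toString().split("\\s+")) {
            out.collect(new Tuple2<>(token.toLowerCase(), 1));
        }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas need an explicit return type
    .groupBy(0)
    .sum(1);
```

Once the I/O boundary is proven, the remaining Hadoop functions can be replaced one at a time (step 2) while the pipeline keeps running.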

Testing Hadoop Functions in Flink

// Create test data
List<Tuple2<LongWritable, Text>> testInput = Arrays.asList(
    new Tuple2<>(new LongWritable(1), new Text("hello world")),
    new Tuple2<>(new LongWritable(2), new Text("hello flink"))
);

DataSet<Tuple2<LongWritable, Text>> input = env.fromCollection(testInput);

// Test mapper output
DataSet<Tuple2<Text, IntWritable>> mapperOutput = input
    .flatMap(new HadoopMapFunction<>(new TokenizerMapper()));

// Collect and verify results in tests
List<Tuple2<Text, IntWritable>> results = mapperOutput.collect();
assertEquals(4, results.size()); // "hello", "world", "hello", "flink"

Common Migration Patterns

// Replace Hadoop Identity operations with Flink map
// Before: Hadoop IdentityMapper
DataSet<Tuple2<K, V>> output = input.map(tuple -> tuple);

// Replace simple aggregations with Flink reduce
// Before: Hadoop sum reducer
DataSet<Tuple2<Text, IntWritable>> sums = input
    .groupBy(0)
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));

// Replace filtering with Flink filter
// Before: Hadoop filtering mapper
DataSet<Tuple2<K, V>> filtered = input.filter(tuple -> someCondition(tuple.f1));

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11
