Function Wrappers

Wrapper classes that adapt Hadoop Mapper and Reducer implementations to work as Flink functions, enabling reuse of existing MapReduce logic within Flink applications. These wrappers bridge the gap between Hadoop's MapReduce programming model and Flink's functional API.

Capabilities

HadoopMapFunction

Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction, allowing existing Hadoop Mapper implementations to be used directly in Flink transformations.

/**
 * Wrapper that maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type  
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
    implements FlatMapFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>, 
               ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {
               
    /**
     * Creates a HadoopMapFunction wrapper with default JobConf
     * @param hadoopMapper The Hadoop Mapper to wrap
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    
    /**
     * Creates a HadoopMapFunction wrapper with custom JobConf
     * @param hadoopMapper The Hadoop Mapper to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
    
    /**
     * Opens and configures the mapper for processing
     * @param openContext Flink's open context for initialization
     * @throws Exception if mapper configuration fails
     */
    public void open(OpenContext openContext) throws Exception;
    
    /**
     * Processes input records using the wrapped Hadoop Mapper
     * @param value Input tuple containing key-value pair
     * @param out Collector for output tuples
     * @throws Exception if mapping operation fails
     */
    public void flatMap(
        Tuple2<KEYIN, VALUEIN> value, 
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;
    
    /**
     * Returns output type information for type safety
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}

Usage Examples:

import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;

// Custom Hadoop Mapper
public class WordTokenizer implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    @Override
    public void map(LongWritable key, Text value, 
                   OutputCollector<Text, IntWritable> output, Reporter reporter) {
        String[] words = value.toString().toLowerCase().split("\\s+");
        for (String w : words) {
            if (!w.isEmpty()) {
                word.set(w);
                output.collect(word, one);
            }
        }
    }
    
    @Override
    public void configure(JobConf job) {}
    
    @Override  
    public void close() {}
}

// Use in Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> lines = // ... input data

// Apply Hadoop Mapper as Flink function
DataSet<Tuple2<Text, IntWritable>> words = lines.flatMap(
    new HadoopMapFunction<>(new WordTokenizer())
);

// Continue with Flink operations
DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .sum(1);

HadoopReduceFunction

Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction, enabling existing Hadoop Reducer logic to work with Flink's grouped operations.

/**
 * Wrapper that maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type  
 * @param <VALUEOUT> Output value type
 */
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
               ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {
               
    /**
     * Creates a HadoopReduceFunction wrapper with default JobConf
     * @param hadoopReducer The Hadoop Reducer to wrap
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    
    /**
     * Creates a HadoopReduceFunction wrapper with custom JobConf  
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
    
    /**
     * Opens and configures the reducer for processing
     * @param openContext Flink's open context for initialization
     * @throws Exception if reducer configuration fails
     */
    public void open(OpenContext openContext) throws Exception;
    
    /**
     * Reduces input records using the wrapped Hadoop Reducer
     * @param values Iterable of input tuples with the same key
     * @param out Collector for output tuples
     * @throws Exception if reduce operation fails
     */
    public void reduce(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;
    
    /**
     * Returns output type information for type safety
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}

Usage Examples:

import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.util.Iterator;

// Custom Hadoop Reducer
public class WordCountReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    
    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                      OutputCollector<Text, IntWritable> output, Reporter reporter) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
    
    @Override
    public void configure(JobConf job) {}
    
    @Override
    public void close() {}
}

// Use in Flink application
DataSet<Tuple2<Text, IntWritable>> words = // ... word data, e.g. the output of HadoopMapFunction above

DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(new HadoopReduceFunction<>(new WordCountReducer()));

HadoopReduceCombineFunction

Wrapper that maps both a Hadoop Reducer and a Hadoop Combiner (mapred API) to a combinable Flink GroupReduceFunction, improving performance through pre-aggregation before the shuffle.

/**
 * Wrapper that maps Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction
 * @param <KEYIN> Input key type
 * @param <VALUEIN> Input value type
 * @param <KEYOUT> Output key type
 * @param <VALUEOUT> Output value type
 */
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    implements GroupReduceFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYOUT, VALUEOUT>>,
               GroupCombineFunction<Tuple2<KEYIN, VALUEIN>, Tuple2<KEYIN, VALUEIN>>,
               ResultTypeQueryable<Tuple2<KEYOUT, VALUEOUT>> {
               
    /**
     * Creates a HadoopReduceCombineFunction wrapper with default JobConf
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param hadoopCombiner The Hadoop Combiner to wrap
     */
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner
    );
    
    /**
     * Creates a HadoopReduceCombineFunction wrapper with custom JobConf
     * @param hadoopReducer The Hadoop Reducer to wrap
     * @param hadoopCombiner The Hadoop Combiner to wrap
     * @param conf JobConf for Hadoop configuration
     */
    public HadoopReduceCombineFunction(
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
        Reducer<KEYIN, VALUEIN, KEYIN, VALUEIN> hadoopCombiner,
        JobConf conf
    );
    
    /**
     * Opens and configures reducer and combiner for processing
     * @param openContext Flink's open context for initialization
     * @throws Exception if configuration fails
     */
    public void open(OpenContext openContext) throws Exception;
    
    /**
     * Reduces input records using the wrapped Hadoop Reducer
     * @param values Iterable of input tuples with the same key
     * @param out Collector for output tuples
     * @throws Exception if reduce operation fails
     */
    public void reduce(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYOUT, VALUEOUT>> out
    ) throws Exception;
    
    /**
     * Combines input records using the wrapped Hadoop Combiner for pre-aggregation
     * @param values Iterable of input tuples to combine
     * @param out Collector for combined tuples
     * @throws Exception if combine operation fails
     */
    public void combine(
        Iterable<Tuple2<KEYIN, VALUEIN>> values,
        Collector<Tuple2<KEYIN, VALUEIN>> out
    ) throws Exception;
    
    /**
     * Returns output type information for type safety
     * @return TypeInformation for output tuples
     */
    public TypeInformation<Tuple2<KEYOUT, VALUEOUT>> getProducedType();
}

Usage Examples:

import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;

// Hadoop Combiner (same as reducer in this case)
public class WordCountCombiner implements Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    
    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                      OutputCollector<Text, IntWritable> output, Reporter reporter) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        result.set(sum);
        output.collect(key, result);
    }
    
    @Override public void configure(JobConf job) {}
    @Override public void close() {}
}

// Use combinable reduce function
DataSet<Tuple2<Text, IntWritable>> words = // ... word data

DataSet<Tuple2<Text, IntWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new WordCountReducer(),    // Final reducer
        new WordCountCombiner()    // Pre-aggregation combiner
    ));

Integration Patterns

Configuration Management

All function wrappers support Hadoop configuration through JobConf:

JobConf conf = new JobConf();
conf.set("mapred.textoutputformat.separator", "\t");
conf.setInt("mapred.max.split.size", 1024 * 1024);

HadoopMapFunction<LongWritable, Text, Text, IntWritable> mapFunc = 
    new HadoopMapFunction<>(new WordTokenizer(), conf);

Error Handling

The function wrappers propagate Hadoop exceptions to the Flink runtime (see the sketch after this list):

  • Configuration errors: Invalid JobConf settings are reported during open()
  • Processing errors: Mapper/Reducer exceptions are propagated to Flink
  • Resource cleanup: Proper cleanup of Hadoop resources on failure or completion
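
A minimal sketch of how processing errors surface, assuming a hypothetical StrictParser mapper (not part of the library): an exception thrown inside the wrapped map() is rethrown by HadoopMapFunction.flatMap() and fails the Flink task.

import java.io.IOException;

// Hypothetical mapper used only to illustrate exception propagation
public class StrictParser implements Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        if (value.toString().trim().isEmpty()) {
            // Propagated through HadoopMapFunction.flatMap() to the Flink runtime
            throw new IOException("Empty record at offset " + key.get());
        }
        output.collect(new Text(value.toString()), new IntWritable(1));
    }

    @Override public void configure(JobConf job) {}
    @Override public void close() {}
}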

Performance Optimization

Best practices for performance (combined in the sketch after this list):

  1. Use combiners: HadoopReduceCombineFunction for pre-aggregation
  2. Configure parallelism: Adjust Flink parallelism based on data size
  3. Memory management: Configure appropriate heap sizes for Hadoop operations
  4. Reuse objects: Hadoop's object reuse patterns are preserved
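
A minimal tuning sketch combining points 1 and 2, assuming the WordTokenizer, WordCountReducer, and WordCountCombiner classes defined above; the parallelism value is illustrative, not a recommendation.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8); // illustrative value; tune to cluster size and input volume

JobConf conf = new JobConf(); // shared Hadoop configuration for all wrappers

DataSet<Tuple2<LongWritable, Text>> lines = // ... input data

DataSet<Tuple2<Text, IntWritable>> wordCounts = lines
    .flatMap(new HadoopMapFunction<>(new WordTokenizer(), conf))
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new WordCountReducer(),   // final reducer
        new WordCountCombiner(),  // pre-aggregation combiner
        conf
    ));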

Migration from MapReduce

Direct Translation

Existing MapReduce jobs can be directly translated to Flink:

// Original MapReduce job structure (mapred API)
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("word count");
conf.setMapperClass(WordTokenizer.class);
conf.setCombinerClass(WordCountReducer.class);
conf.setReducerClass(WordCountReducer.class);

// Equivalent Flink application
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input = // ... input
DataSet<Tuple2<Text, IntWritable>> result = input
    .flatMap(new HadoopMapFunction<>(new WordTokenizer()))
    .groupBy(0)
    .reduceGroup(new HadoopReduceCombineFunction<>(
        new WordCountReducer(),
        new WordCountReducer()  // combiner (same class as the reducer)
    ));

Advantages over MapReduce

  1. Iterative algorithms: Flink's cyclic data flows vs MapReduce's acyclic model
  2. Memory management: Flink's managed memory vs Hadoop's disk-based shuffling
  3. Real-time processing: Stream processing capabilities alongside batch
  4. Lower latency: Reduced job startup and coordination overhead