CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-batch-connectors

Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources

Pending
Overview
Eval results
Files

docs/hadoop.md

Hadoop Compatibility

Hadoop MapReduce API compatibility layer for Flink, enabling seamless reuse of existing Hadoop Mapper and Reducer implementations within Flink batch programs.

Capabilities

HadoopMapFunction

Wraps Hadoop Mapper (mapred API) as a Flink FlatMapFunction for seamless integration.

/**
 * Wraps a Hadoop Mapper (mapred API) so it can run as a Flink FlatMapFunction.
 *
 * <p>Records are exchanged as Flink {@code Tuple2} (key, value) pairs, mirroring
 * the key/value records of the Hadoop mapred API.
 * @param <KEYIN> Input key type for the Hadoop mapper
 * @param <VALUEIN> Input value type for the Hadoop mapper
 * @param <KEYOUT> Output key type from the Hadoop mapper
 * @param <VALUEOUT> Output value type from the Hadoop mapper
 */
@Public
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
    
    /**
     * Creates a HadoopMapFunction with a Hadoop mapper.
     *
     * <p>NOTE(review): presumably the mapper is configured with an empty JobConf —
     * confirm against the implementation.
     * @param hadoopMapper The Hadoop Mapper instance to wrap
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
    
    /**
     * Creates a HadoopMapFunction with a Hadoop mapper and job configuration.
     * @param hadoopMapper The Hadoop Mapper instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
    
    /**
     * Returns the type information for the output tuples.
     * @return TypeInformation for Tuple2&lt;KEYOUT,VALUEOUT&gt;
     */
    public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType();
}

Usage Example:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;  // used by Mapper.map(...)
import org.apache.hadoop.mapred.Reporter;         // used by Mapper.map(...)
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;                       // thrown by map() and close()

// Your existing Hadoop Mapper: splits each line on whitespace and emits (word, 1)
public class WordCountMapper implements Mapper<LongWritable, Text, Text, LongWritable> {
    private final static LongWritable one = new LongWritable(1);
    private Text word = new Text();
    
    @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, 
                   Reporter reporter) throws IOException {
        String[] words = value.toString().split("\\s+");
        for (String w : words) {
            word.set(w);
            output.collect(word, one);
        }
    }
    
    @Override
    public void configure(JobConf job) {}
    @Override
    public void close() throws IOException {}
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Wrap the Hadoop mapper in a Flink function
HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction = 
    new HadoopMapFunction<>(new WordCountMapper());

// Use in a Flink program
DataSet<Tuple2<LongWritable, Text>> input = // ... your input data
DataSet<Tuple2<Text, LongWritable>> words = input.flatMap(mapFunction);

HadoopReduceFunction

Wraps Hadoop Reducer (mapred API) as a non-combinable Flink GroupReduceFunction.

/**
 * Wraps a Hadoop Reducer (mapred API) as a non-combinable Flink GroupReduceFunction.
 *
 * <p>Records are exchanged as Flink {@code Tuple2} (key, value) pairs. Since this
 * wrapper is non-combinable, all values of a group reach the reducer unreduced;
 * see HadoopReduceCombineFunction for the combinable variant.
 * @param <KEYIN> Input key type for the Hadoop reducer
 * @param <VALUEIN> Input value type for the Hadoop reducer
 * @param <KEYOUT> Output key type from the Hadoop reducer
 * @param <VALUEOUT> Output value type from the Hadoop reducer
 */
@Public
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
        implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
    
    /**
     * Creates a HadoopReduceFunction with a Hadoop reducer.
     *
     * <p>NOTE(review): presumably the reducer is configured with an empty JobConf —
     * confirm against the implementation.
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
    
    /**
     * Creates a HadoopReduceFunction with a Hadoop reducer and job configuration.
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
    
    /**
     * Returns the type information for the output tuples.
     * @return TypeInformation for Tuple2&lt;KEYOUT,VALUEOUT&gt;
     */
    public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType();
}

Usage Example:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;          // used by configure(...)
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;                       // thrown by reduce() and close()
import java.util.Iterator;                        // value iterator passed to reduce()

// Your existing Hadoop Reducer: sums the counts of each word
public class WordCountReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterator<LongWritable> values,
                      OutputCollector<Text, LongWritable> output, Reporter reporter) 
                      throws IOException {
        long count = 0;
        while (values.hasNext()) {
            count += values.next().get();
        }
        output.collect(key, new LongWritable(count));
    }
    
    @Override
    public void configure(JobConf job) {}
    @Override
    public void close() throws IOException {}
}

// Wrap the Hadoop reducer in a Flink function
HadoopReduceFunction<Text, LongWritable, Text, LongWritable> reduceFunction = 
    new HadoopReduceFunction<>(new WordCountReducer());

// Use in a Flink program (after grouping by key)
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
    .groupBy(0)  // Group by key (position 0 in tuple)
    .reduceGroup(reduceFunction);

HadoopReduceCombineFunction

Wraps both Hadoop Reducer and Combiner (mapred API) as a combinable Flink GroupReduceFunction for optimized processing.

/**
 * Wraps a Hadoop Reducer and Combiner (mapred API) as a combinable Flink
 * GroupReduceFunction, enabling pre-aggregation before the shuffle.
 *
 * <p>Note the combiner's signature: it must consume and produce the same
 * key/value types ({@code KEYIN}, {@code VALUEIN}), since combined records are
 * fed back into the final reducer.
 * @param <KEYIN> Input key type for the Hadoop reducer
 * @param <VALUEIN> Input value type for the Hadoop reducer
 * @param <KEYOUT> Output key type from the Hadoop reducer
 * @param <VALUEOUT> Output value type from the Hadoop reducer
 */
@Public
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
        implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
                   ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
    
    /**
     * Creates a HadoopReduceCombineFunction with separate reducer and combiner.
     *
     * <p>NOTE(review): presumably both are configured with an empty JobConf —
     * confirm against the implementation.
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param hadoopCombiner The Hadoop Combiner instance to wrap
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner);
    
    /**
     * Creates a HadoopReduceCombineFunction with reducer, combiner, and job configuration.
     * @param hadoopReducer The Hadoop Reducer instance to wrap
     * @param hadoopCombiner The Hadoop Combiner instance to wrap
     * @param conf Hadoop JobConf with configuration parameters
     */
    public HadoopReduceCombineFunction(
            Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
            Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner,
            JobConf conf);
    
    /**
     * Returns the type information for the output tuples.
     * @return TypeInformation for Tuple2&lt;KEYOUT,VALUEOUT&gt;
     */
    public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType();
}

Usage Example:

import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;

// WordCountReducer's summing logic is associative, so the same class can serve
// as both combiner and reducer (two separate instances are created here)
WordCountReducer reducer = new WordCountReducer();
WordCountReducer combiner = new WordCountReducer();

// Wrap with combine functionality
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceCombineFunction = 
    new HadoopReduceCombineFunction<>(reducer, combiner);

// Use in Flink program with automatic combining (pre-aggregation before the shuffle)
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
    .groupBy(0)
    .reduceGroup(reduceCombineFunction);

HadoopOutputCollector

Wraps Flink Collector as Hadoop OutputCollector for compatibility with Hadoop mapper/reducer implementations.

/**
 * Wraps a Flink Collector as a Hadoop OutputCollector so Hadoop mapper/reducer
 * implementations can emit records into a Flink pipeline.
 * @param <KEY> Key type for collected output
 * @param <VALUE> Value type for collected output
 */
public class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
    
    /**
     * Creates a new HadoopOutputCollector.
     *
     * <p>NOTE(review): presumably {@link #setFlinkCollector} must be called before
     * {@link #collect} — confirm against the implementation.
     */
    public HadoopOutputCollector();
    
    /**
     * Sets the wrapped Flink collector that receives the emitted pairs.
     * @param flinkCollector The Flink collector to wrap
     */
    public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector);
    
    /**
     * Collects a key-value pair (implementation of the Hadoop OutputCollector interface).
     * @param key The key to collect
     * @param val The value to collect
     */
    public void collect(final KEY key, final VALUE val) throws IOException;
}

HadoopTupleUnwrappingIterator

Wraps Flink Tuple2 iterator to provide an iterator over values only, compatible with Hadoop reducer input format.

/**
 * Wraps a Flink Tuple2 iterator into an iterator over the values only, matching
 * the {@code Iterator<VALUE>} shape a Hadoop reducer expects; the key of the
 * last returned value is available via {@link #getCurrentKey()}.
 * @param <KEY> Key type of the tuples
 * @param <VALUE> Value type of the tuples
 */
public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
        extends TupleUnwrappingIterator<VALUE, KEY>
        implements java.io.Serializable {
    
    /**
     * Creates a new HadoopTupleUnwrappingIterator.
     * @param keySerializer Serializer for the key type
     */
    public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer);
    
    /**
     * Sets the Flink iterator to wrap; resets the iteration state.
     * @param iterator The Flink Tuple2 iterator
     */
    public void set(final Iterator<Tuple2<KEY,VALUE>> iterator);
    
    /**
     * Checks if more elements are available.
     * @return true if more elements exist, false otherwise
     */
    public boolean hasNext();
    
    /**
     * Returns the next value in the iteration.
     * @return The next value
     */
    public VALUE next();
    
    /**
     * Returns the key associated with the last value returned by {@link #next()}.
     * @return The current key
     */
    public KEY getCurrentKey();
    
    /**
     * Remove operation is not supported.
     * @throws UnsupportedOperationException Always thrown
     */
    public void remove();
}

Complete Word Count Example

Here's a complete example showing how to use Hadoop MapReduce components in a Flink program:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;

public class FlinkHadoopWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // Read input text
        DataSet<String> text = env.fromElements(
            "Hello World",
            "Hello Flink",
            "Hello Hadoop"
        );
        
        // Convert to Hadoop input format (line number, text)
        DataSet<Tuple2<LongWritable, Text>> hadoopInput = text
            .map((String line) -> new Tuple2<>(
                new LongWritable(0), 
                new Text(line)
            ));
        
        // Use Hadoop Mapper
        HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction = 
            new HadoopMapFunction<>(new WordCountMapper());
        
        DataSet<Tuple2<Text, LongWritable>> words = hadoopInput.flatMap(mapFunction);
        
        // Use Hadoop Reducer with Combiner
        HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceFunction = 
            new HadoopReduceCombineFunction<>(
                new WordCountReducer(),  // reducer
                new WordCountReducer()   // combiner (same logic)
            );
        
        DataSet<Tuple2<Text, LongWritable>> wordCounts = words
            .groupBy(0)
            .reduceGroup(reduceFunction);
        
        wordCounts.print();
        env.execute("Hadoop-Flink Word Count");
    }
}

Common Types

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import java.io.Serializable;
import java.util.Iterator;

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-batch-connectors

docs

avro.md

hadoop.md

hbase.md

hcatalog.md

index.md

jdbc.md

tile.json