Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources
—
Hadoop MapReduce API compatibility layer for Flink, enabling seamless reuse of existing Hadoop Mapper and Reducer implementations within Flink batch programs.
Wraps Hadoop Mapper (mapred API) as a Flink FlatMapFunction for seamless integration.
/**
* Wraps Hadoop Mapper (mapred API) to Flink FlatMapFunction
* @param <KEYIN> Input key type for the Hadoop mapper
* @param <VALUEIN> Input value type for the Hadoop mapper
* @param <KEYOUT> Output key type from the Hadoop mapper
* @param <VALUEOUT> Output value type from the Hadoop mapper
*/
@Public
public class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
/**
* Creates a HadoopMapFunction with a Hadoop mapper
* @param hadoopMapper The Hadoop Mapper instance to wrap
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper);
/**
* Creates a HadoopMapFunction with a Hadoop mapper and job configuration
* @param hadoopMapper The Hadoop Mapper instance to wrap
* @param conf Hadoop JobConf with configuration parameters
*/
public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf);
/**
* Returns the type information for the output tuples
* @return TypeInformation for Tuple2<KEYOUT,VALUEOUT>
*/
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType();
}

Usage Example:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
// Your existing Hadoop Mapper
public class WordCountMapper implements Mapper<LongWritable, Text, Text, LongWritable> {
private final static LongWritable one = new LongWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output,
Reporter reporter) throws IOException {
String[] words = value.toString().split("\\s+");
for (String w : words) {
word.set(w);
output.collect(word, one);
}
}
public void configure(JobConf job) {}
public void close() throws IOException {}
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Wrap Hadoop mapper in Flink function
HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
new HadoopMapFunction<>(new WordCountMapper());
// Use in Flink program
DataSet<Tuple2<LongWritable, Text>> input = ...; // your input data
DataSet<Tuple2<Text, LongWritable>> words = input.flatMap(mapFunction);

Wraps Hadoop Reducer (mapred API) as a non-combinable Flink GroupReduceFunction.
/**
* Wraps Hadoop Reducer (mapred API) to non-combinable Flink GroupReduceFunction
* @param <KEYIN> Input key type for the Hadoop reducer
* @param <VALUEIN> Input value type for the Hadoop reducer
* @param <KEYOUT> Output key type from the Hadoop reducer
* @param <VALUEOUT> Output value type from the Hadoop reducer
*/
@Public
public class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
/**
* Creates a HadoopReduceFunction with a Hadoop reducer
* @param hadoopReducer The Hadoop Reducer instance to wrap
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer);
/**
* Creates a HadoopReduceFunction with a Hadoop reducer and job configuration
* @param hadoopReducer The Hadoop Reducer instance to wrap
* @param conf Hadoop JobConf with configuration parameters
*/
public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf);
/**
* Returns the type information for the output tuples
* @return TypeInformation for Tuple2<KEYOUT,VALUEOUT>
*/
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType();
}

Usage Example:
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
// Your existing Hadoop Reducer
public class WordCountReducer implements Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
long count = 0;
while (values.hasNext()) {
count += values.next().get();
}
output.collect(key, new LongWritable(count));
}
public void configure(JobConf job) {}
public void close() throws IOException {}
}
// Wrap Hadoop reducer in Flink function
HadoopReduceFunction<Text, LongWritable, Text, LongWritable> reduceFunction =
new HadoopReduceFunction<>(new WordCountReducer());
// Use in Flink program (after grouping by key)
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
.groupBy(0) // Group by key (position 0 in tuple)
    .reduceGroup(reduceFunction);

Wraps both Hadoop Reducer and Combiner (mapred API) as a combinable Flink GroupReduceFunction for optimized processing.
/**
* Wraps Hadoop Reducer and Combiner (mapred API) to combinable Flink GroupReduceFunction
* @param <KEYIN> Input key type for the Hadoop reducer
* @param <VALUEIN> Input value type for the Hadoop reducer
* @param <KEYOUT> Output key type from the Hadoop reducer
* @param <VALUEOUT> Output value type from the Hadoop reducer
*/
@Public
public class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
/**
* Creates a HadoopReduceCombineFunction with separate reducer and combiner
* @param hadoopReducer The Hadoop Reducer instance to wrap
* @param hadoopCombiner The Hadoop Combiner instance to wrap
*/
public HadoopReduceCombineFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner);
/**
* Creates a HadoopReduceCombineFunction with reducer, combiner, and job configuration
* @param hadoopReducer The Hadoop Reducer instance to wrap
* @param hadoopCombiner The Hadoop Combiner instance to wrap
* @param conf Hadoop JobConf with configuration parameters
*/
public HadoopReduceCombineFunction(
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner,
JobConf conf);
/**
* Returns the type information for the output tuples
* @return TypeInformation for Tuple2<KEYOUT,VALUEOUT>
*/
public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType();
}

Usage Example:
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
// Use the same reducer as both combiner and reducer for efficiency
WordCountReducer reducer = new WordCountReducer();
WordCountReducer combiner = new WordCountReducer();
// Wrap with combine functionality
HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceCombineFunction =
new HadoopReduceCombineFunction<>(reducer, combiner);
// Use in Flink program with automatic combining
DataSet<Tuple2<Text, LongWritable>> wordCounts = words
.groupBy(0)
    .reduceGroup(reduceCombineFunction);

Wraps Flink Collector as Hadoop OutputCollector for compatibility with Hadoop mapper/reducer implementations.
/**
* Wraps Flink OutputCollector as Hadoop OutputCollector
* @param <KEY> Key type for collected output
* @param <VALUE> Value type for collected output
*/
public class HadoopOutputCollector<KEY,VALUE> implements OutputCollector<KEY,VALUE> {
/**
* Creates a new HadoopOutputCollector
*/
public HadoopOutputCollector();
/**
* Sets the wrapped Flink collector
* @param flinkCollector The Flink collector to wrap
*/
public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector);
/**
* Collects a key-value pair (implementation of Hadoop OutputCollector interface)
* @param key The key to collect
* @param val The value to collect
*/
public void collect(final KEY key, final VALUE val) throws IOException;
}

Wraps Flink Tuple2 iterator to provide an iterator over values only, compatible with Hadoop reducer input format.
/**
* Wraps Flink Tuple2 iterator into iterator over values only
* @param <KEY> Key type of the tuples
* @param <VALUE> Value type of the tuples
*/
public class HadoopTupleUnwrappingIterator<KEY,VALUE>
extends TupleUnwrappingIterator<VALUE, KEY>
implements java.io.Serializable {
/**
* Creates a new HadoopTupleUnwrappingIterator
* @param keySerializer Serializer for the key type
*/
public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer);
/**
* Sets the Flink iterator to wrap
* @param iterator The Flink Tuple2 iterator
*/
public void set(final Iterator<Tuple2<KEY,VALUE>> iterator);
/**
* Checks if more elements are available
* @return true if more elements exist, false otherwise
*/
public boolean hasNext();
/**
* Returns the next value in the iteration
* @return The next value
*/
public VALUE next();
/**
* Returns the current key associated with the last returned value
* @return The current key
*/
public KEY getCurrentKey();
/**
* Remove operation is not supported
* @throws UnsupportedOperationException Always thrown
*/
public void remove();
}

Here's a complete example showing how to use Hadoop MapReduce components in a Flink program:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
/**
 * End-to-end word count that reuses unmodified Hadoop (mapred API) Mapper and
 * Reducer implementations inside a Flink DataSet program.
 */
public class FlinkHadoopWordCount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read input text
        DataSet<String> text = env.fromElements(
            "Hello World",
            "Hello Flink",
            "Hello Hadoop"
        );

        // Convert to Hadoop input format (offset key, line value).
        // NOTE(review): with a lambda, Java erases the Tuple2 type arguments and
        // Flink may be unable to extract the output type; in production code use
        // a MapFunction class or append .returns(...) with explicit type info.
        DataSet<Tuple2<LongWritable, Text>> hadoopInput = text
            .map((String line) -> new Tuple2<>(
                new LongWritable(0),
                new Text(line)
            ));

        // Run the unmodified Hadoop mapper inside Flink.
        HadoopMapFunction<LongWritable, Text, Text, LongWritable> mapFunction =
            new HadoopMapFunction<>(new WordCountMapper());
        DataSet<Tuple2<Text, LongWritable>> words = hadoopInput.flatMap(mapFunction);

        // Hadoop reducer plus combiner for pre-aggregation before the shuffle.
        HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable> reduceFunction =
            new HadoopReduceCombineFunction<>(
                new WordCountReducer(), // reducer
                new WordCountReducer()  // combiner (same logic)
            );
        DataSet<Tuple2<Text, LongWritable>> wordCounts = words
            .groupBy(0)
            .reduceGroup(reduceFunction);

        // print() is an eager sink in the DataSet API: it triggers execution by
        // itself. Calling env.execute() afterwards would fail with
        // "No new data sinks have been defined since the last execution",
        // so no explicit execute() call must follow it.
        wordCounts.print();
    }
}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import java.io.Serializable;
import java.util.Iterator;