Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing
—
Integration for writing data to Hadoop OutputFormats from Flink applications. Supports both the legacy mapred API and the newer mapreduce API, with automatic conversion from Flink Tuple2 objects to Hadoop key-value pairs.
Wrapper class for Hadoop OutputFormats using the legacy mapred API.
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
/**
* Constructor with basic configuration
* @param mapredOutputFormat The Hadoop OutputFormat to wrap
* @param job JobConf configuration for the Hadoop job
*/
public HadoopOutputFormat(
org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
JobConf job
);
/**
* Constructor with custom OutputCommitter
* @param mapredOutputFormat The Hadoop OutputFormat to wrap
* @param outputCommitterClass Class of the OutputCommitter to use
* @param job JobConf configuration for the Hadoop job
*/
public HadoopOutputFormat(
org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
Class<OutputCommitter> outputCommitterClass,
JobConf job
);
/**
* Write a record to the Hadoop OutputFormat
* @param record The record to write as a Tuple2<K, V>
* @throws IOException if writing fails
*/
public void writeRecord(Tuple2<K, V> record) throws IOException;
}

Usage Example:
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
// Configure JobConf for output
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/path");
jobConf.setOutputFormat(TextOutputFormat.class);
jobConf.setOutputKeyClass(LongWritable.class);
jobConf.setOutputValueClass(Text.class);
// Create Hadoop output format wrapper
HadoopOutputFormat<LongWritable, Text> hadoopOutput =
new HadoopOutputFormat<>(
new TextOutputFormat<LongWritable, Text>(),
jobConf
);
// Use with Flink DataSet
DataSet<Tuple2<LongWritable, Text>> dataset = // ... your dataset
dataset.output(hadoopOutput);
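Sinks in the DataSet API are lazy, so nothing is written until the program is executed. A minimal end-to-end sketch (the environment setup and sample elements are illustrative additions, not part of the original example):

import org.apache.flink.api.java.ExecutionEnvironment;
// Obtain an execution environment and build a small in-memory dataset
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> dataset = env.fromElements(
    new Tuple2<>(new LongWritable(1L), new Text("hello")),
    new Tuple2<>(new LongWritable(2L), new Text("world"))
);
dataset.output(hadoopOutput);
// The write happens when the job is executed
env.execute("write via Hadoop TextOutputFormat");

Wrapper class for Hadoop OutputFormats using the newer mapreduce API.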
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
/**
* Constructor for mapreduce OutputFormat
* @param mapreduceOutputFormat The Hadoop OutputFormat to wrap
* @param job Job configuration for the Hadoop job
*/
public HadoopOutputFormat(
org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
Job job
);
/**
* Write a record to the Hadoop OutputFormat
* @param record The record to write as a Tuple2<K, V>
* @throws IOException if writing fails
*/
public void writeRecord(Tuple2<K, V> record) throws IOException;
}

Usage Example:
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
// Configure Job for output
Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://output/path"));
// Create Hadoop output format wrapper
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<LongWritable, Text> mapreduceOutput =
new org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<>(
new TextOutputFormat<LongWritable, Text>(),
job
);
// Use with Flink DataSet
DataSet<Tuple2<LongWritable, Text>> dataset = // ... your dataset
dataset.output(mapreduceOutput);
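Note that the fully qualified names above are only needed when both wrappers appear in the same file, since the mapred and mapreduce wrappers share the simple class name HadoopOutputFormat.

Writing plain text files using TextOutputFormat.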
mapred API:
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/text");
jobConf.setOutputFormat(TextOutputFormat.class);
HadoopOutputFormat<NullWritable, Text> textOutput =
new HadoopOutputFormat<>(
new TextOutputFormat<NullWritable, Text>(),
jobConf
);
// Convert strings to Tuple2<NullWritable, Text>
DataSet<String> stringDataset = // ... your string dataset
// Java erases the lambda's Tuple2 type parameters, so give Flink an explicit type hint
DataSet<Tuple2<NullWritable, Text>> tuples = stringDataset
    .map(s -> new Tuple2<>(NullWritable.get(), new Text(s)))
    .returns(new TypeHint<Tuple2<NullWritable, Text>>() {});
tuples.output(textOutput);

mapreduce API:
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://output/text"));
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<NullWritable, Text> textOutput =
new org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<>(
new TextOutputFormat<NullWritable, Text>(),
job
);
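The wrapper is then used exactly as in the mapred example; the DataSet<String> conversion shown above carries over unchanged:

DataSet<Tuple2<NullWritable, Text>> tuples = // ... converted as in the mapred example
tuples.output(textOutput);

Writing Hadoop sequence files with key-value pairs.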
mapred API:
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/sequence");
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
jobConf.setOutputKeyClass(IntWritable.class);
jobConf.setOutputValueClass(Text.class);
HadoopOutputFormat<IntWritable, Text> sequenceOutput =
new HadoopOutputFormat<>(
new SequenceFileOutputFormat<IntWritable, Text>(),
jobConf
);
DataSet<Tuple2<IntWritable, Text>> keyValuePairs = // ... your dataset
keyValuePairs.output(sequenceOutput);
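mapreduce API (a parallel sketch using the mapreduce SequenceFileOutputFormat; this variant is an illustrative addition, not part of the original examples):

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
// Configure the Job the same way as the other mapreduce examples
Job job = Job.getInstance();
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
SequenceFileOutputFormat.setOutputPath(job, new Path("hdfs://output/sequence"));
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<IntWritable, Text> mapreduceSequenceOutput =
    new org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat<>(
        new SequenceFileOutputFormat<IntWritable, Text>(),
        job
    );

Using MultipleTextOutputFormat to write to multiple files based on keys.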
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
// Custom MultipleTextOutputFormat that partitions by key prefix
public static class KeyPartitionedOutput extends MultipleTextOutputFormat<Text, Text> {
@Override
protected String generateFileNameForKeyValue(Text key, Text value, String name) {
return key.toString().substring(0, 1) + "/" + name;
}
}
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", "hdfs://output/partitioned");
jobConf.setOutputFormat(KeyPartitionedOutput.class);
HadoopOutputFormat<Text, Text> partitionedOutput =
new HadoopOutputFormat<>(
new KeyPartitionedOutput(),
jobConf
);
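As in the earlier examples, the wrapper is then handed to DataSet.output:

DataSet<Tuple2<Text, Text>> records = // ... your dataset
records.output(partitionedOutput);

Standard configuration for common output scenarios.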
// mapred API configuration
JobConf jobConf = new JobConf();
jobConf.set("mapred.output.dir", outputPath);
jobConf.setOutputFormat(outputFormatClass);
jobConf.setOutputKeyClass(keyClass);
jobConf.setOutputValueClass(valueClass);
// mapreduce API configuration
Job job = Job.getInstance();
job.setOutputFormatClass(outputFormatClass);
job.setOutputKeyClass(keyClass);
job.setOutputValueClass(valueClass);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
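The raw "mapred.output.dir" key used in the mapred snippets also has a typed equivalent; a sketch using the mapred API's FileOutputFormat (an illustrative addition, equivalent to the string-key version):

import org.apache.hadoop.mapred.FileOutputFormat;
FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));

Enabling compression for output files.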
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
// Enable compression in JobConf
JobConf jobConf = new JobConf();
jobConf.setBoolean("mapred.output.compress", true);
jobConf.setClass("mapred.output.compression.codec",
GzipCodec.class, CompressionCodec.class);
jobConf.set("mapred.output.compression.type", "BLOCK");
// Enable compression in Job
Job job = Job.getInstance();
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);
job.getConfiguration().setClass("mapreduce.output.fileoutputformat.compress.codec",
GzipCodec.class, CompressionCodec.class);
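FileOutputFormat also exposes typed helpers for the same settings; a sketch using the mapreduce API (equivalent to the string-key configuration above):

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Same effect as setting the compress/codec keys directly
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

Using custom OutputCommitter for advanced output coordination.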
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import java.io.IOException;
// Custom committer that moves files to final location
public class CustomOutputCommitter extends FileOutputCommitter {
@Override
public void commitJob(JobContext context) throws IOException {
super.commitJob(context);
// Custom logic after job completion
}
}
// The documented constructor takes Class<OutputCommitter>, so a subclass
// literal needs an unchecked cast to match that signature
@SuppressWarnings("unchecked")
HadoopOutputFormat<K, V> outputFormat =
    new HadoopOutputFormat<>(
        hadoopOutputFormat,
        (Class<OutputCommitter>) (Class<?>) CustomOutputCommitter.class,
        jobConf
    );

All output formats consume Tuple2<K, V> objects where:
- f0 contains the key to be written
- f1 contains the value to be written

Both JobConf (mapred) and Job (mapreduce) configuration objects are supported, allowing use of existing Hadoop configuration patterns.
Custom OutputCommitter classes can be specified for advanced output coordination and cleanup operations.
IOException is thrown for write operations, maintaining consistency with Hadoop's exception handling patterns.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility