Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs
—
The Output Format Integration capability enables writing Flink DataSets to Hadoop OutputFormats, providing seamless integration with Hadoop ecosystem storage systems and custom output processing pipelines.
Flink's Hadoop compatibility layer wraps Hadoop OutputFormats so they can consume data from Flink DataSets. The integration supports both the legacy mapred API (org.apache.hadoop.mapred) and the newer mapreduce API (org.apache.hadoop.mapreduce), converting Flink Tuple2 objects or Scala tuples to Hadoop key-value pairs.
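Putting these pieces together, the general pattern is: configure a Hadoop OutputFormat, wrap it in Flink's HadoopOutputFormat, map the DataSet to Tuple2 key-value pairs, and call output(). A minimal sketch of that pattern (the output path and sample data are illustrative; the returns() type hint is only needed when the conversion is written as a lambda, because lambdas erase the Tuple2 type parameters):

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 1. Configure the Hadoop output format through a JobConf
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/output"));

// 2. Wrap it for use as a Flink output format
HadoopOutputFormat<NullWritable, Text> format =
        new HadoopOutputFormat<>(new TextOutputFormat<>(), conf);

// 3. Convert records to Tuple2 key-value pairs and write them
DataSet<Tuple2<NullWritable, Text>> pairs = env
        .fromElements("a", "b", "c")
        .map(line -> new Tuple2<>(NullWritable.get(), new Text(line)))
        .returns(new TypeHint<Tuple2<NullWritable, Text>>() {});

pairs.output(format);
env.execute("Write to Hadoop OutputFormat");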
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    // Constructor with JobConf
    public HadoopOutputFormat(
            org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
            JobConf job);

    // Constructor with OutputCommitter and JobConf
    public HadoopOutputFormat(
            org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
            Class<OutputCommitter> outputCommitterClass,
            JobConf job);

    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

    // Constructor with Job
    public HadoopOutputFormat(
            org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat,
            Job job);

    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

@Public
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {

  // Constructor with JobConf
  def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf)

  // Constructor with OutputCommitter and JobConf
  def this(
      mapredOutputFormat: OutputFormat[K, V],
      outputCommitterClass: Class[OutputCommitter],
      job: JobConf)

  // Write a record to the output
  def writeRecord(record: (K, V)): Unit
}

import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.fs.Path;
// Configure output
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/output"));
// Create output format
HadoopOutputFormat<NullWritable, Text> hadoopOutput =
new HadoopOutputFormat<>(new TextOutputFormat<>(), conf);
// Prepare data for output
DataSet<String> textLines = env.fromElements("Line 1", "Line 2", "Line 3");
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
line -> new Tuple2<>(NullWritable.get(), new Text(line))
);
// Write to Hadoop output
outputData.output(hadoopOutput);

import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
// Configure sequence file output
JobConf conf = new JobConf();
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
SequenceFileOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/sequence-output"));
// Create output format
HadoopOutputFormat<IntWritable, Text> seqOutput =
new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), conf);
// Prepare key-value data
DataSet<Tuple2<IntWritable, Text>> keyValueData = env.fromElements(
new Tuple2<>(new IntWritable(1), new Text("First")),
new Tuple2<>(new IntWritable(2), new Text("Second")),
new Tuple2<>(new IntWritable(3), new Text("Third"))
);
// Write sequence file
keyValueData.output(seqOutput);

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
// Configure MapReduce job
Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://namenode:port/mapreduce-output"));
// Create MapReduce output format
HadoopOutputFormat<NullWritable, Text> mapreduceOutput =
new HadoopOutputFormat<>(new TextOutputFormat<>(), job);
// Write data
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
line -> new Tuple2<>(NullWritable.get(), new Text(line))
);
outputData.output(mapreduceOutput);

import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.io.{Text, NullWritable}
import org.apache.hadoop.fs.Path
// Configure output
val conf = new JobConf()
conf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
conf.setOutputKeyClass(classOf[NullWritable])
conf.setOutputValueClass(classOf[Text])
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/scala-output"))
// Create output format
val hadoopOutput = new HadoopOutputFormat(new TextOutputFormat[NullWritable, Text](), conf)
// Prepare and write data
val textLines = env.fromElements("Scala line 1", "Scala line 2", "Scala line 3")
val outputData = textLines.map(line => (NullWritable.get(), new Text(line)))
outputData.output(hadoopOutput)

import com.example.CustomOutputFormat;
import com.example.CustomKey;
import com.example.CustomValue;
// Configure custom output format
JobConf conf = new JobConf();
conf.setOutputFormat(CustomOutputFormat.class);
conf.setOutputKeyClass(CustomKey.class);
conf.setOutputValueClass(CustomValue.class);
conf.set("custom.output.property", "custom-value");
// Use custom output format
HadoopOutputFormat<CustomKey, CustomValue> customOutput =
new HadoopOutputFormat<>(new CustomOutputFormat(), conf);
// Process and write custom data
DataSet<Tuple2<CustomKey, CustomValue>> customData = processedData.map(
data -> new Tuple2<>(new CustomKey(data.getId()), new CustomValue(data.getContent()))
);
customData.output(customOutput);

Hadoop OutputFormats often use OutputCommitters to manage the output lifecycle. The Hadoop compatibility layer properly integrates with these committers.
import org.apache.hadoop.mapred.FileOutputCommitter;
// Specify custom OutputCommitter
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
// OutputCommitter is automatically handled, but can be customized if needed
HadoopOutputFormat<NullWritable, Text> outputWithCommitter =
new HadoopOutputFormat<>(
new TextOutputFormat<>(),
FileOutputCommitter.class, // Custom committer class
conf
);

When writing to partitioned outputs or multiple files:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
// Configure multiple output files
JobConf conf = new JobConf();
conf.setOutputFormat(MultipleTextOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
// Subclasses of MultipleTextOutputFormat can derive the output file name from the key
// (by overriding generateFileNameForKeyValue)
DataSet<Tuple2<Text, Text>> partitionedData = processedData.map(
data -> new Tuple2<>(
new Text("partition-" + data.getPartition()), // File name prefix
new Text(data.getContent()) // Content
)
);
HadoopOutputFormat<Text, Text> multiOutput =
new HadoopOutputFormat<>(new MultipleTextOutputFormat<>(), conf);
partitionedData.output(multiOutput);

Output format operations may encounter various errors:
try {
outputData.output(hadoopOutput);
env.execute("Hadoop Output Job");
} catch (IOException e) {
// Handle I/O errors during writing
logger.error("Failed to write to Hadoop output: " + e.getMessage());
} catch (Exception e) {
// Handle other execution errors
logger.error("Job execution failed: " + e.getMessage());
}

Common exceptions include:

IOException - File system or network errors during writing
IllegalArgumentException - Invalid configuration or parameters
RuntimeException - Various Hadoop-related runtime errors
JobExecutionException - Flink job execution failures

// Always use absolute paths for distributed file systems
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/full/path/to/output"));
// For local file system (testing only)
TextOutputFormat.setOutputPath(conf, new Path("file:///tmp/local/output"));// Enable compression for text output
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec",
org.apache.hadoop.io.compress.GzipCodec.class,
CompressionCodec.class);
// For sequence files
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);

// Set appropriate block size for HDFS
conf.setLong("dfs.block.size", 134217728); // 128MB
// Configure buffer sizes
conf.setInt("io.file.buffer.size", 65536); // 64KB
// Set replication factor
conf.setInt("dfs.replication", 3);Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11