
tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs


Output Format Integration

The Output Format Integration capability enables writing Flink DataSets to Hadoop OutputFormats, so jobs can target Hadoop ecosystem storage systems and custom output processing pipelines.

Overview

Flink's Hadoop compatibility layer wraps Hadoop OutputFormats so they can accept data from Flink DataSets. The integration supports both the legacy mapred API and the newer mapreduce API; the wrapper splits each Flink Tuple2 (or Scala tuple) into a Hadoop key-value pair before handing it to the wrapped format.
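To illustrate that conversion, here is a minimal stand-alone sketch (not the actual Flink class): the wrapper's writeRecord treats the tuple's first field as the Hadoop key and its second field as the value.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class WriteRecordSketch {

    // Conceptual stand-in for HadoopOutputFormat.writeRecord(Tuple2<K, V>):
    // field 0 of the tuple becomes the Hadoop key, field 1 the value
    static Map.Entry<String, String> toKeyValue(String f0, String f1) {
        return new SimpleEntry<>(f0, f1);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> kv = toKeyValue("word", "count=3");
        System.out.println(kv.getKey());   // word
        System.out.println(kv.getValue()); // count=3
    }
}
```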

HadoopOutputFormat Classes

MapRed HadoopOutputFormat (Java)

// org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    
    // Constructor with JobConf
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, 
        JobConf job);
    
    // Constructor with OutputCommitter and JobConf
    public HadoopOutputFormat(
        org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat,
        Class<OutputCommitter> outputCommitterClass,
        JobConf job);
    
    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

MapReduce HadoopOutputFormat (Java)

// org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
@Public
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
    
    // Constructor with Job
    public HadoopOutputFormat(
        org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, 
        Job job);
    
    // Write a record to the output
    public void writeRecord(Tuple2<K, V> record) throws IOException;
}

MapRed HadoopOutputFormat (Scala)

// org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
@Public
class HadoopOutputFormat[K, V] extends HadoopOutputFormatBase[K, V, (K, V)] {

    // Constructor with JobConf
    def this(mapredOutputFormat: OutputFormat[K, V], job: JobConf)

    // Constructor with OutputCommitter and JobConf
    def this(
        mapredOutputFormat: OutputFormat[K, V],
        outputCommitterClass: Class[OutputCommitter],
        job: JobConf)

    // Write a record to the output
    def writeRecord(record: (K, V)): Unit
}

Usage Examples

Writing Text Files (Java)

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;

// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Configure output
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/output"));

// Create output format
HadoopOutputFormat<NullWritable, Text> hadoopOutput = 
    new HadoopOutputFormat<>(new TextOutputFormat<>(), conf);

// Prepare data for output
DataSet<String> textLines = env.fromElements("Line 1", "Line 2", "Line 3");
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
    line -> new Tuple2<>(NullWritable.get(), new Text(line))
);

// Write to Hadoop output and trigger execution
outputData.output(hadoopOutput);
env.execute("Hadoop Text Output");

Writing Sequence Files (Java)

import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Configure sequence file output
JobConf conf = new JobConf();
conf.setOutputFormat(SequenceFileOutputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Text.class);
SequenceFileOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/sequence-output"));

// Create output format
HadoopOutputFormat<IntWritable, Text> seqOutput = 
    new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), conf);

// Prepare key-value data
DataSet<Tuple2<IntWritable, Text>> keyValueData = env.fromElements(
    new Tuple2<>(new IntWritable(1), new Text("First")),
    new Tuple2<>(new IntWritable(2), new Text("Second")),
    new Tuple2<>(new IntWritable(3), new Text("Third"))
);

// Write sequence file
keyValueData.output(seqOutput);

Writing with MapReduce API (Java)

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;

// Configure MapReduce job
Job job = Job.getInstance();
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
TextOutputFormat.setOutputPath(job, new Path("hdfs://namenode:port/mapreduce-output"));

// Create MapReduce output format
HadoopOutputFormat<NullWritable, Text> mapreduceOutput = 
    new HadoopOutputFormat<>(new TextOutputFormat<>(), job);

// Write data
DataSet<Tuple2<NullWritable, Text>> outputData = textLines.map(
    line -> new Tuple2<>(NullWritable.get(), new Text(line))
);
outputData.output(mapreduceOutput);

Scala Usage

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
import org.apache.hadoop.mapred.{TextOutputFormat, JobConf}
import org.apache.hadoop.io.{Text, NullWritable}
import org.apache.hadoop.fs.Path

// Set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Configure output
val conf = new JobConf()
conf.setOutputFormat(classOf[TextOutputFormat[NullWritable, Text]])
conf.setOutputKeyClass(classOf[NullWritable])
conf.setOutputValueClass(classOf[Text])
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/scala-output"))

// Create output format
val hadoopOutput = new HadoopOutputFormat(new TextOutputFormat[NullWritable, Text](), conf)

// Prepare and write data, then trigger execution
val textLines = env.fromElements("Scala line 1", "Scala line 2", "Scala line 3")
val outputData = textLines.map(line => (NullWritable.get(), new Text(line)))
outputData.output(hadoopOutput)
env.execute("Scala Hadoop Output")

Custom Output Formats

import com.example.CustomOutputFormat;
import com.example.CustomKey;
import com.example.CustomValue;

// Configure custom output format
JobConf conf = new JobConf();
conf.setOutputFormat(CustomOutputFormat.class);
conf.setOutputKeyClass(CustomKey.class);
conf.setOutputValueClass(CustomValue.class);
conf.set("custom.output.property", "custom-value");

// Use custom output format
HadoopOutputFormat<CustomKey, CustomValue> customOutput = 
    new HadoopOutputFormat<>(new CustomOutputFormat(), conf);

// Process and write custom data
DataSet<Tuple2<CustomKey, CustomValue>> customData = processedData.map(
    data -> new Tuple2<>(new CustomKey(data.getId()), new CustomValue(data.getContent()))
);
customData.output(customOutput);

Output Committer Integration

Hadoop OutputFormats often use OutputCommitters to manage the output lifecycle. The Hadoop compatibility layer properly integrates with these committers.

import org.apache.hadoop.mapred.FileOutputCommitter;

// Specify custom OutputCommitter
JobConf conf = new JobConf();
conf.setOutputFormat(TextOutputFormat.class);
// OutputCommitter is automatically handled, but can be customized if needed

HadoopOutputFormat<NullWritable, Text> outputWithCommitter = 
    new HadoopOutputFormat<>(
        new TextOutputFormat<>(), 
        FileOutputCommitter.class,  // Custom committer class
        conf
    );

Partitioning and Multiple Outputs

When writing to partitioned outputs or multiple files:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

// Configure multiple output files
JobConf conf = new JobConf();
conf.setOutputFormat(MultipleTextOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);

// Note: by default MultipleTextOutputFormat writes standard part files;
// subclass it and override generateFileNameForKeyValue to derive output
// file names from the key
DataSet<Tuple2<Text, Text>> partitionedData = processedData.map(
    data -> new Tuple2<>(
        new Text("partition-" + data.getPartition()),  // Key (candidate file name)
        new Text(data.getContent())                    // Value (content)
    )
);

HadoopOutputFormat<Text, Text> multiOutput = 
    new HadoopOutputFormat<>(new MultipleTextOutputFormat<>(), conf);
partitionedData.output(multiOutput);

Error Handling

Output format operations may encounter various errors:

try {
    outputData.output(hadoopOutput);
    env.execute("Hadoop Output Job");
} catch (IOException e) {
    // Handle I/O errors during writing (pass the exception to keep the stack trace)
    logger.error("Failed to write to Hadoop output", e);
} catch (Exception e) {
    // Handle other execution errors
    logger.error("Job execution failed", e);
}

Common exceptions include:

  • IOException - File system or network errors during writing
  • IllegalArgumentException - Invalid configuration or parameters
  • RuntimeException - Various Hadoop-related runtime errors
  • JobExecutionException - Flink job execution failures

Configuration Best Practices

Setting Output Paths

// Always use absolute paths for distributed file systems
TextOutputFormat.setOutputPath(conf, new Path("hdfs://namenode:port/full/path/to/output"));

// For local file system (testing only)
TextOutputFormat.setOutputPath(conf, new Path("file:///tmp/local/output"));

Compression Configuration

// Enable compression for text output via the FileOutputFormat helpers
// (equivalent to setting mapred.output.compress and
// mapred.output.compression.codec directly)
FileOutputFormat.setCompressOutput(conf, true);
FileOutputFormat.setOutputCompressorClass(conf, org.apache.hadoop.io.compress.GzipCodec.class);

// For sequence files
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK);

Performance Tuning

// Set the HDFS block size for newly written files
// ("dfs.blocksize" supersedes the deprecated "dfs.block.size")
conf.setLong("dfs.blocksize", 134217728); // 128 MB

// Configure buffer sizes
conf.setInt("io.file.buffer.size", 65536); // 64KB

// Set replication factor
conf.setInt("dfs.replication", 3);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11
