Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines
—
Specialized InputFormat and OutputFormat implementations for reading and writing Avro data in various formats within MapReduce jobs. These formats provide seamless integration between Avro's schema-based serialization and Hadoop's distributed file system, supporting both legacy and modern MapReduce APIs.
Input formats for the legacy org.apache.hadoop.mapred API that read Avro data and present it as wrapped objects.
public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException;
// Configuration constants
public static final String IGNORE_FILES_WITHOUT_EXTENSION_KEY = "avro.mapred.ignore.inputs.without.extension";
public static final boolean IGNORE_INPUTS_WITHOUT_EXTENSION_DEFAULT = true;
}
public class AvroAsTextInputFormat<T> extends AvroInputFormat<T> {
// Reads Avro data but presents as text representation
}
public class AvroUtf8InputFormat extends AvroInputFormat<Utf8> {
// Specialized for reading Avro UTF-8 string data
}

import org.apache.avro.mapred.AvroInputFormat;
import org.apache.avro.mapred.AvroJob;
import org.apache.hadoop.mapred.JobConf;
// Configure job for Avro input
JobConf job = new JobConf();
job.setInputFormat(AvroInputFormat.class);
AvroJob.setInputSchema(job, userSchema);
// Input format will read Avro container files and produce AvroWrapper<T> keys

Output formats for writing Avro data from MapReduce jobs using the legacy API.
public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;
}
public class AvroTextOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
// Writes Avro data as text representation
}

import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.avro.mapred.AvroJob;
// Configure job for Avro output
job.setOutputFormat(AvroOutputFormat.class);
AvroJob.setOutputSchema(job, outputSchema);
AvroJob.setOutputCodec(job, "snappy");
// Output format will write AvroWrapper<T> data as Avro container files

Input formats for the modern org.apache.hadoop.mapreduce API with enhanced key-value separation.
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
public class CombineAvroKeyValueFileInputFormat<K,V> extends CombineFileInputFormat<AvroKey<K>, AvroValue<V>> {
// Optimized for processing many small Avro files
public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException;
}

import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyValueInputFormat;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;
// Single key input format
Job job = Job.getInstance();
job.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
// Key-value input format
job.setInputFormatClass(AvroKeyValueInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);
// For many small files
job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);

Output formats for writing Avro data using the modern MapReduce API.
public class AvroKeyOutputFormat<T> extends FileOutputFormat<AvroKey<T>, NullWritable> {
public RecordWriter<AvroKey<T>, NullWritable> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException;
}
public class AvroKeyValueOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException;
}
public abstract class AvroOutputFormatBase<K,V> extends FileOutputFormat<K,V> {
// Base class providing common functionality for Avro output formats
protected static class AvroRecordWriter<K,V> extends RecordWriter<K,V> {
public void write(K key, V value) throws IOException, InterruptedException;
public void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
}

import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroKeyValueOutputFormat;
import org.apache.avro.mapreduce.AvroJob;
// Single key output format
job.setOutputFormatClass(AvroKeyOutputFormat.class);
AvroJob.setOutputKeySchema(job, outputKeySchema);
// Key-value output format
job.setOutputFormatClass(AvroKeyValueOutputFormat.class);
AvroJob.setOutputKeySchema(job, outputKeySchema);
AvroJob.setOutputValueSchema(job, outputValueSchema);

Formats for reading and writing Avro data in Hadoop SequenceFile format.
public class AvroSequenceFileInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
public class AvroSequenceFileOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException;
}
// Legacy API equivalent
public class SequenceFileInputFormat<K,V> extends FileInputFormat<AvroWrapper<K>, AvroWrapper<V>> {
public RecordReader<AvroWrapper<K>, AvroWrapper<V>> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException;
}

import org.apache.avro.mapreduce.AvroSequenceFileInputFormat;
import org.apache.avro.mapreduce.AvroSequenceFileOutputFormat;
// Read from SequenceFile with Avro serialization
job.setInputFormatClass(AvroSequenceFileInputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setInputValueSchema(job, valueSchema);
// Write to SequenceFile with Avro serialization
job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
AvroJob.setOutputKeySchema(job, keySchema);
AvroJob.setOutputValueSchema(job, valueSchema);

Base classes and implementations for reading and writing Avro records.
public abstract class AvroRecordReaderBase<K,V,T> extends RecordReader<K,V> {
// Base class for Avro record readers
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
public boolean nextKeyValue() throws IOException, InterruptedException;
public float getProgress() throws IOException, InterruptedException;
public void close() throws IOException;
// Abstract methods for specific implementations
public abstract K getCurrentKey() throws IOException, InterruptedException;
public abstract V getCurrentValue() throws IOException, InterruptedException;
}
public class AvroKeyRecordReader<T> extends AvroRecordReaderBase<AvroKey<T>, NullWritable, T> {
public AvroKey<T> getCurrentKey() throws IOException, InterruptedException;
public NullWritable getCurrentValue() throws IOException, InterruptedException;
}
public class AvroKeyValueRecordReader<K,V> extends AvroRecordReaderBase<AvroKey<K>, AvroValue<V>, Pair<K,V>> {
public AvroKey<K> getCurrentKey() throws IOException, InterruptedException;
public AvroValue<V> getCurrentValue() throws IOException, InterruptedException;
}
public class AvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable> {
public void write(AvroKey<T> key, NullWritable value) throws IOException, InterruptedException;
public void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
public class AvroKeyValueRecordWriter<K,V> extends RecordWriter<AvroKey<K>, AvroValue<V>> {
public void write(AvroKey<K> key, AvroValue<V> value) throws IOException, InterruptedException;
public void close(TaskAttemptContext context) throws IOException, InterruptedException;
}

Support for writing to multiple output files from a single job.
// Legacy API
public class AvroMultipleOutputs {
public AvroMultipleOutputs(JobConf job);
public <T> void write(String name, AvroWrapper<T> key, NullWritable value) throws IOException;
public void close() throws IOException;
public static void addNamedOutput(JobConf job, String name, Class<? extends OutputFormat> outputFormat, Schema schema);
public static void setCountersEnabled(JobConf job, boolean enabled);
}
// New API
public class org.apache.avro.mapreduce.AvroMultipleOutputs {
public AvroMultipleOutputs(TaskAttemptContext context);
public <K> void write(K key, NullWritable value, String baseOutputPath) throws IOException, InterruptedException;
public <K,V> void write(K key, V value, String baseOutputPath) throws IOException, InterruptedException;
public void close() throws IOException, InterruptedException;
public static void addNamedOutput(Job job, String name, Class<? extends OutputFormat> outputFormat,
Class<?> keyClass, Class<?> valueClass);
public static void setCountersEnabled(Job job, boolean enabled);
}

import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
// Configure multiple outputs
AvroMultipleOutputs.addNamedOutput(job, "users", AvroKeyOutputFormat.class, AvroKey.class, NullWritable.class);
AvroMultipleOutputs.addNamedOutput(job, "events", AvroKeyOutputFormat.class, AvroKey.class, NullWritable.class);
// In reducer
public class MyReducer extends Reducer<Text, IntWritable, AvroKey<GenericRecord>, NullWritable> {
private AvroMultipleOutputs multipleOutputs;
protected void setup(Context context) {
multipleOutputs = new AvroMultipleOutputs(context);
}
public void reduce(Text key, Iterable<IntWritable> values, Context context) {
// Write to different outputs based on logic
if (key.toString().startsWith("user_")) {
multipleOutputs.write(new AvroKey<>(userRecord), NullWritable.get(), "users");
} else {
multipleOutputs.write(new AvroKey<>(eventRecord), NullWritable.get(), "events");
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close();
}
}

Control which files are processed by input formats:
// Ignore files without .avro extension
job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, true);
// Process all files regardless of extension
job.setBoolean(AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false);

All output formats support Avro's compression codecs:
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapreduce.AvroJob;
// Legacy API
AvroJob.setOutputCodec(job, "snappy");
AvroJob.setOutputCodec(job, "deflate");
AvroJob.setOutputCodec(job, "bzip2");
// New API - via configuration
job.getConfiguration().set("avro.mapreduce.output.codec", "snappy");

Input/output formats automatically use schemas configured via AvroJob:
// Schemas set via AvroJob are automatically picked up by formats
AvroJob.setInputSchema(job, inputSchema); // Used by AvroInputFormat
AvroJob.setOutputSchema(job, outputSchema); // Used by AvroOutputFormat
// New API with separate key/value schemas
AvroJob.setInputKeySchema(job, keySchema); // Used by AvroKeyInputFormat
AvroJob.setInputValueSchema(job, valueSchema); // Used by AvroKeyValueInputFormat

Use CombineAvroKeyValueFileInputFormat for many small files:
job.setInputFormatClass(CombineAvroKeyValueFileInputFormat.class);
// Configure combine parameters
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024);
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 64 * 1024 * 1024);

Input formats handle memory efficiently by:
(list content appears to be truncated in this extraction — see upstream documentation)
Common issues and solutions:
(list content appears to be truncated in this extraction — see upstream documentation)
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-mapred