Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines
npx @tessl/cli install tessl/maven-org-apache-avro--avro-mapred@1.12.0

A comprehensive Hadoop MapReduce compatible API for using Apache Avro serialization in distributed data processing pipelines. This library provides seamless integration between Avro's schema-based serialization system and Hadoop's MapReduce framework, supporting both the legacy (org.apache.hadoop.mapred) and modern (org.apache.hadoop.mapreduce) APIs with efficient serialization, file I/O, and cross-language capabilities.
pom.xml:
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.12.0</version>
</dependency>

Key imports for MapReduce job development:
// Legacy MapReduce API (org.apache.hadoop.mapred)
import org.apache.avro.mapred.*;
// New MapReduce API (org.apache.hadoop.mapreduce)
import org.apache.avro.mapreduce.*;
// Data wrappers
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;
// Serialization
import org.apache.avro.hadoop.io.AvroSerialization;
// File utilities
import org.apache.avro.hadoop.file.SortedKeyValueFile;

Configuring a legacy MapReduce job for Avro input and output:

import org.apache.avro.Schema;
import org.apache.avro.mapred.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
// Configure job for Avro input/output
JobConf job = new JobConf();
AvroJob.setInputSchema(job, inputSchema);
AvroJob.setOutputSchema(job, outputSchema);
AvroJob.setMapperClass(job, MyAvroMapper.class);
AvroJob.setReducerClass(job, MyAvroReducer.class);
// Set input/output formats
job.setInputFormat(AvroInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);

Wrapping Avro data for use as MapReduce keys and values:

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.generic.GenericRecord;
// Wrap Avro data for MapReduce
AvroKey<GenericRecord> key = new AvroKey<GenericRecord>(record);
AvroValue<GenericRecord> value = new AvroValue<GenericRecord>(data);
// Access wrapped data
GenericRecord keyData = key.datum();
GenericRecord valueData = value.datum();

Setting schemas for each stage of a job:

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
// Set schemas for different stages of the job
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
AvroJob.setInputSchema(job, userSchema);
AvroJob.setMapOutputSchema(job, userSchema);
AvroJob.setOutputSchema(job, userSchema);

Apache Avro MapReduce is organized around several key architectural components:
- Legacy API (org.apache.avro.mapred): Compatible with org.apache.hadoop.mapred
- New API (org.apache.avro.mapreduce): Compatible with org.apache.hadoop.mapreduce

Core utilities for configuring MapReduce jobs with Avro schemas and data models, supporting both legacy and modern Hadoop APIs.
// Legacy API (org.apache.avro.mapred.AvroJob)
public class AvroJob {
// Schema configuration
public static void setInputSchema(JobConf job, Schema schema);
public static void setMapOutputSchema(JobConf job, Schema schema);
public static void setOutputSchema(JobConf job, Schema schema);
public static Schema getInputSchema(Configuration job);
public static Schema getMapOutputSchema(Configuration job);
public static Schema getOutputSchema(Configuration job);
// Job component configuration
public static void setMapperClass(JobConf job, Class<? extends AvroMapper> c);
public static void setCombinerClass(JobConf job, Class<? extends AvroReducer> c);
public static void setReducerClass(JobConf job, Class<? extends AvroReducer> c);
// Output configuration
public static void setOutputCodec(JobConf job, String codec);
public static void setOutputMeta(JobConf job, String key, String value);
public static void setOutputMeta(JobConf job, String key, long value);
public static void setOutputMeta(JobConf job, String key, byte[] value);
// Input format configuration
public static void setInputSequenceFile(JobConf job);
// Data model configuration
public static void setReflect(JobConf job);
public static void setInputReflect(JobConf job);
public static void setMapOutputReflect(JobConf job);
public static void setDataModelClass(JobConf job, Class<? extends GenericData> modelClass);
public static Class<? extends GenericData> getDataModelClass(Configuration conf);
public static GenericData createDataModel(Configuration conf);
public static GenericData createInputDataModel(Configuration conf);
public static GenericData createMapOutputDataModel(Configuration conf);
}
// New API (org.apache.avro.mapreduce.AvroJob)
public class AvroJob {
// Schema setters
public static void setInputKeySchema(Job job, Schema schema);
public static void setInputValueSchema(Job job, Schema schema);
public static void setMapOutputKeySchema(Job job, Schema schema);
public static void setMapOutputValueSchema(Job job, Schema schema);
public static void setOutputKeySchema(Job job, Schema schema);
public static void setOutputValueSchema(Job job, Schema schema);
// Schema getters
public static Schema getInputKeySchema(Configuration conf);
public static Schema getInputValueSchema(Configuration conf);
public static Schema getMapOutputKeySchema(Configuration conf);
public static Schema getMapOutputValueSchema(Configuration conf);
public static Schema getOutputKeySchema(Configuration conf);
public static Schema getOutputValueSchema(Configuration conf);
// Data model configuration
public static void setDataModelClass(Job job, Class<? extends GenericData> modelClass);
}
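
A minimal sketch of the equivalent job setup with the new API. The job name and the primitive key schema are placeholders, and AvroKeyOutputFormat is assumed here as the output-side counterpart of the input formats listed further below:

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Placeholder schema; a real job would parse its own record schema
Schema keySchema = Schema.create(Schema.Type.STRING);

Job job = Job.getInstance(new Configuration(), "avro-new-api-example");
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
AvroJob.setInputKeySchema(job, keySchema);
AvroJob.setOutputKeySchema(job, keySchema);
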
Wrapper classes that integrate Avro data with Hadoop's MapReduce framework, providing schema-aware serialization and comparison.

public class AvroWrapper<T> {
public AvroWrapper();
public AvroWrapper(T datum);
public T datum();
public void datum(T datum);
public boolean equals(Object obj);
public int hashCode();
public String toString();
}
public class AvroKey<T> extends AvroWrapper<T> {
public AvroKey();
public AvroKey(T datum);
}
public class AvroValue<T> extends AvroWrapper<T> {
public AvroValue();
public AvroValue(T datum);
}
public class Pair<K,V> implements IndexedRecord, Comparable<Pair>, SchemaConstructable {
// Primary constructors
public Pair(Schema schema);
public Pair(K key, Schema keySchema, V value, Schema valueSchema);
public Pair(Object key, Object value);
// Multiple convenience constructors for type combinations omitted for brevity
// Schema methods
public Schema getSchema();
public static Schema getPairSchema(Schema key, Schema value);
public static Schema getKeySchema(Schema pair);
public static Schema getValueSchema(Schema pair);
// Data access methods
public K key();
public void key(K key);
public V value();
public void value(V value);
public void set(K key, V value);
// IndexedRecord implementation
public Object get(int i);
public void put(int i, Object o);
// Comparison and equality
public int compareTo(Pair that);
public boolean equals(Object o);
public int hashCode();
public String toString();
}
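
A minimal sketch of constructing a Pair for map output; the string/long word-count shape is purely illustrative:

import org.apache.avro.Schema;
import org.apache.avro.mapred.Pair;

// Build the pair schema from its key and value schemas
Schema pairSchema = Pair.getPairSchema(
    Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.LONG));
Pair<CharSequence, Long> pair = new Pair<>(pairSchema);
pair.set("hello", 1L);
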
Specialized InputFormat and OutputFormat implementations for reading and writing Avro data in various formats within MapReduce jobs.

// Legacy API
public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
public RecordReader<AvroWrapper<T>, NullWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter);
}
public class AvroOutputFormat<T> extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress);
}
// New API
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
public RecordReader<AvroKey<T>, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context);
}
public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(InputSplit split, TaskAttemptContext context);
}
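
A brief sketch of wiring AvroKeyValueInputFormat into a new-API job; the primitive reader schemas are placeholders:

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

Job job = Job.getInstance(new Configuration(), "read-avro-key-value");
job.setInputFormatClass(AvroKeyValueInputFormat.class);
// Reader schemas for the key and value halves of each record
AvroJob.setInputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setInputValueSchema(job, Schema.create(Schema.Type.LONG));
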
Core serialization framework that integrates Avro with Hadoop's serialization system, providing efficient data exchange and schema management.

public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
public boolean accept(Class<?> c);
public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
public static void addToConfiguration(Configuration conf);
public static void setKeyWriterSchema(Configuration conf, Schema schema);
public static void setValueWriterSchema(Configuration conf, Schema schema);
}
public class AvroSerializer<T> {
public AvroSerializer(Schema writerSchema);
public void serialize(AvroWrapper<T> avroWrapper);
}
public abstract class AvroDeserializer<T extends AvroWrapper<D>,D> {
public abstract T deserialize(T avroWrapperToReuse);
public Schema getWriterSchema();
public Schema getReaderSchema();
}
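
A short sketch of registering Avro serialization on a configuration so that AvroKey/AvroValue can cross the shuffle; the writer schemas are illustrative:

import org.apache.avro.Schema;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Register AvroSerialization with Hadoop's serialization framework
AvroSerialization.addToConfiguration(conf);
// Writer schemas used when serializing shuffled keys and values
AvroSerialization.setKeyWriterSchema(conf, Schema.create(Schema.Type.STRING));
AvroSerialization.setValueWriterSchema(conf, Schema.create(Schema.Type.LONG));
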
Advanced file handling utilities for sorted key-value files, sequence files, and compression codec integration.

public class SortedKeyValueFile {
public static class Reader<K,V> {
public Reader(Options options);
public V get(K key);
public Iterator<AvroKeyValue<K,V>> iterator();
public void close();
}
public static class Writer<K,V> {
public Writer(Options options);
public void append(K key, V value);
public void close();
}
}
public class AvroSequenceFile {
public static SequenceFile.Writer createWriter(Writer.Options options);
public static class Reader {
public Reader(Reader.Options options);
}
}
public class HadoopCodecFactory {
public static CodecFactory fromHadoopString(String hadoopCodecClass);
public static String getAvroCodecName(String hadoopCodecClass);
}
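
A sketch of writing a SortedKeyValueFile. The builder methods on Writer.Options (withConfiguration, withKeySchema, withValueSchema, withPath) follow the library's builder pattern, and the path and schemas here are placeholders:

import org.apache.avro.Schema;
import org.apache.avro.hadoop.file.SortedKeyValueFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

SortedKeyValueFile.Writer.Options options = new SortedKeyValueFile.Writer.Options()
    .withConfiguration(new Configuration())
    .withKeySchema(Schema.create(Schema.Type.STRING))
    .withValueSchema(Schema.create(Schema.Type.LONG))
    .withPath(new Path("/tmp/sorted-kv"));
SortedKeyValueFile.Writer<CharSequence, Long> writer =
    new SortedKeyValueFile.Writer<>(options);
writer.append("apple", 1L);   // keys must be appended in sorted order
writer.append("banana", 2L);
writer.close();
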
Base classes and utilities for implementing Avro-aware mappers and reducers in the legacy MapReduce API.

public abstract class AvroMapper<IN,OUT> {
public abstract void map(IN datum, AvroCollector<OUT> collector, Reporter reporter) throws IOException;
public void configure(JobConf jobConf);
public void close();
}
public abstract class AvroReducer<K,V,OUT> {
public abstract void reduce(K key, Iterable<V> values, AvroCollector<OUT> collector, Reporter reporter) throws IOException;
public void configure(JobConf jobConf);
public void close();
}
public abstract class AvroCollector<T> {
public abstract void collect(T datum) throws IOException;
}
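
A word-count sketch in the style of Avro's own examples, pairing an AvroMapper with an AvroReducer (class names are illustrative):

import java.io.IOException;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.Reporter;

// Emits a (word, 1) pair for every whitespace-separated token
public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Long>> {
  @Override
  public void map(Utf8 line, AvroCollector<Pair<Utf8, Long>> collector, Reporter reporter)
      throws IOException {
    for (String word : line.toString().split("\\s+")) {
      collector.collect(new Pair<Utf8, Long>(new Utf8(word), 1L));
    }
  }
}

// Sums the counts collected for each word
public class WordCountReducer extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
  @Override
  public void reduce(Utf8 word, Iterable<Long> counts,
      AvroCollector<Pair<Utf8, Long>> collector, Reporter reporter) throws IOException {
    long sum = 0;
    for (long count : counts) {
      sum += count;
    }
    collector.collect(new Pair<Utf8, Long>(word, sum));
  }
}
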
Tether framework for implementing MapReduce jobs in non-Java languages while maintaining Avro data integration and schema compatibility.

public class TetherJob {
public static void setExecutable(JobConf job, File executable);
public static void setInputSchema(JobConf job, Schema schema);
public static void setMapOutputSchema(JobConf job, Schema schema);
public static void setOutputSchema(JobConf job, Schema schema);
}
public class TetherInputFormat extends FileInputFormat<AvroKey<Object>, AvroValue<Object>> {
public RecordReader<AvroKey<Object>, AvroValue<Object>> createRecordReader(InputSplit split, TaskAttemptContext context);
}
public class TetherOutputFormat extends AvroOutputFormat<Object> {
public RecordWriter<AvroKey<Object>, AvroValue<Object>> getRecordWriter(TaskAttemptContext context);
}
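
A rough sketch of configuring a tethered job. The executable path and the schema variables are placeholders, and TetherJob is assumed to live under org.apache.avro.mapred.tether:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.mapred.tether.TetherJob;
import org.apache.hadoop.mapred.JobConf;

JobConf job = new JobConf();
// Point the job at an external program that speaks the tether protocol
TetherJob.setExecutable(job, new File("/path/to/tethered-program"));
// Schemas for job input, intermediate map output, and final output
TetherJob.setInputSchema(job, inputSchema);
TetherJob.setMapOutputSchema(job, intermediateSchema);
TetherJob.setOutputSchema(job, outputSchema);
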
// Base wrapper type
public class AvroWrapper<T> {
// Implementation details in Data Wrappers section
}
// Key-value pair type
public class AvroKeyValue<K,V> {
public AvroKeyValue(GenericRecord keyValueRecord);
public K getKey();
public V getValue();
public void setKey(K key);
public void setValue(V value);
public static Schema getSchema(Schema keySchema, Schema valueSchema);
}
// Options classes for file operations
public static class SortedKeyValueFile.Reader.Options {
// Builder pattern for reader configuration
}
public static class SortedKeyValueFile.Writer.Options {
// Builder pattern for writer configuration
}
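
A small sketch of AvroKeyValue, which wraps a generic key/value record for typed access; the string/long schema is illustrative:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.hadoop.io.AvroKeyValue;

// Build the key/value record schema, then wrap a record of that shape
Schema kvSchema = AvroKeyValue.getSchema(
    Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.LONG));
GenericRecord record = new GenericData.Record(kvSchema);
AvroKeyValue<CharSequence, Long> kv = new AvroKeyValue<>(record);
kv.setKey("hello");
kv.setValue(42L);
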
public class AvroKeyComparator<T> implements RawComparator<AvroKey<T>>, Configurable {
public int compare(AvroKey<T> x, AvroKey<T> y);
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public class AvroCharSequenceComparator<T> implements Comparator<T> {
public int compare(T o1, T o2);
public static final AvroCharSequenceComparator<CharSequence> INSTANCE;
}

public abstract class AvroDatumConverter<INPUT,OUTPUT> {
public abstract OUTPUT convert(INPUT input);
public abstract Schema getWriterSchema();
}
public class AvroDatumConverterFactory {
public AvroDatumConverterFactory(Configuration conf);
public <IN,OUT> AvroDatumConverter<IN,OUT> create(Class<IN> inputClass);
}