Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines
—
Wrapper classes that integrate Avro data with Hadoop's MapReduce framework. These classes provide schema-aware serialization, comparison, and seamless integration with MapReduce's key-value processing model while preserving Avro's type safety and schema evolution capabilities.
The foundational wrapper class for Avro data in MapReduce. Serialization is handled by the AvroSerialization framework rather than by implementing WritableComparable directly.
public class AvroWrapper<T> {
// Constructors
public AvroWrapper();
public AvroWrapper(T datum);
// Data access
public T datum();
public void datum(T datum);
// Object methods
public int hashCode();
public boolean equals(Object obj);
public String toString();
}

import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.generic.GenericRecord;
// Create wrapper with data
GenericRecord user = ...; // Avro record
AvroWrapper<GenericRecord> wrapper = new AvroWrapper<>(user);
// Access wrapped data
GenericRecord data = wrapper.datum();
// Modify wrapped data
GenericRecord newUser = ...;
wrapper.datum(newUser);

Specialized wrapper for MapReduce keys containing Avro data.
public class AvroKey<T> extends AvroWrapper<T> {
// Constructors
public AvroKey();
public AvroKey(T datum);
}

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.util.Utf8;
// Create key wrapper
AvroKey<Utf8> key = new AvroKey<>(new Utf8("user123"));
// Use in mapper/reducer
public class MyMapper extends AvroMapper<GenericRecord, GenericRecord> {
public void map(GenericRecord input, AvroCollector<GenericRecord> collector, Reporter reporter) {
AvroKey<Utf8> key = new AvroKey<>(new Utf8(input.get("id").toString()));
// Process...
}
}

Specialized wrapper for MapReduce values containing Avro data.
public class AvroValue<T> extends AvroWrapper<T> {
// Constructors
public AvroValue();
public AvroValue(T datum);
}

import org.apache.avro.mapred.AvroValue;
import org.apache.avro.generic.GenericRecord;
// Create value wrapper
GenericRecord userData = ...;
AvroValue<GenericRecord> value = new AvroValue<>(userData);
// Use in new MapReduce API
public class MyMapper extends Mapper<AvroKey<String>, AvroValue<GenericRecord>, Text, IntWritable> {
public void map(AvroKey<String> key, AvroValue<GenericRecord> value, Context context) {
GenericRecord record = value.datum();
// Process record...
}
}

Schema-aware key-value pair implementation that integrates with Avro's record system.
public class Pair<K,V> implements IndexedRecord, Comparable<Pair>, SchemaConstructable {
// Primary constructors
public Pair(Schema schema);
public Pair(K key, Schema keySchema, V value, Schema valueSchema);
public Pair(Object key, Object value);
// Multiple type-specific convenience constructors available for various type combinations
// Data access
public K key();
public void key(K key);
public V value();
public void value(V value);
public void set(K key, V value);
// Schema operations
public Schema getSchema();
public static Schema getPairSchema(Schema key, Schema value);
public static Schema getKeySchema(Schema pair);
public static Schema getValueSchema(Schema pair);
// IndexedRecord interface
public Object get(int i);
public void put(int i, Object v);
// Comparable interface
public int compareTo(Pair o);
// Object methods
public boolean equals(Object obj);
public int hashCode();
public String toString();
}

import org.apache.avro.mapred.Pair;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;
// Create pair with explicit schemas
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = Schema.create(Schema.Type.INT);
Schema pairSchema = Pair.getPairSchema(keySchema, valueSchema);
Pair<Utf8, Integer> pair = new Pair<>(pairSchema);
pair.set(new Utf8("count"), 42);
// Or create with data directly
Pair<Utf8, Integer> pair2 = new Pair<>(new Utf8("total"), 100);
// Access data
Utf8 key = pair.key();
Integer value = pair.value();
// Use in MapReduce
public class WordCountMapper extends AvroMapper<Utf8, Pair<Utf8, Integer>> {
public void map(Utf8 word, AvroCollector<Pair<Utf8, Integer>> collector, Reporter reporter) {
collector.collect(new Pair<>(word, 1));
}
}

Helper class for working with key-value pairs stored in Avro generic records.
public class AvroKeyValue<K,V> {
// Constructor
public AvroKeyValue(GenericRecord keyValueRecord);
// Data access
public GenericRecord get();
public K getKey();
public V getValue();
public void setKey(K key);
public void setValue(V value);
// Schema utilities
public static Schema getSchema(Schema keySchema, Schema valueSchema);
// Constants
public static final String KEY_VALUE_PAIR_RECORD_NAME = "org.apache.avro.mapred.Pair";
public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = null;
public static final String KEY_FIELD = "key";
public static final String VALUE_FIELD = "value";
}

import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
// Create key-value schema
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = Schema.create(Schema.Type.LONG);
Schema kvSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
// Create record
GenericRecord record = new GenericRecordBuilder(kvSchema)
.set("key", "user_id")
.set("value", 12345L)
.build();
// Wrap in helper
AvroKeyValue<String, Long> kv = new AvroKeyValue<>(record);
// Access data
String key = kv.getKey(); // "user_id"
Long value = kv.getValue(); // 12345L
// Modify data
kv.setKey("session_id");
kv.setValue(67890L);

Iterator for processing collections of key-value pairs.
public static class AvroKeyValue.Iterator<K,V> implements Iterator<AvroKeyValue<K,V>> {
public Iterator(Iterator<GenericRecord> records);
public boolean hasNext();
public AvroKeyValue<K,V> next();
public void remove();
}

import org.apache.avro.hadoop.io.AvroKeyValue;
import java.util.List;
import java.util.Iterator;
// Process multiple key-value records
List<GenericRecord> records = ...; // List of key-value records
AvroKeyValue.Iterator<String, Integer> iterator =
new AvroKeyValue.Iterator<>(records.iterator());
while (iterator.hasNext()) {
AvroKeyValue<String, Integer> kv = iterator.next();
String key = kv.getKey();
Integer value = kv.getValue();
// Process key-value pair...
}

Data wrappers integrate seamlessly with Avro input and output formats:
// Legacy API - uses AvroWrapper directly
public class AvroInputFormat<T> extends FileInputFormat<AvroWrapper<T>, NullWritable> {
// Returns AvroWrapper<T> as keys
}
// New API - uses specific wrapper types
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
// Returns AvroKey<T> as keys
}
public class AvroKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
// Returns AvroKey<K> and AvroValue<V>
}

Wrappers work with Hadoop's serialization system:
import org.apache.avro.hadoop.io.AvroSerialization;
// Configure serialization
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);
// Wrappers are automatically serialized using Avro serialization
AvroKey<GenericRecord> key = new AvroKey<>(record);
// Hadoop serialization framework handles the rest

Wrappers support Avro schema evolution:
// Writer schema (data was written with this schema)
Schema writerSchema = Schema.parse("...");
// Reader schema (current application schema)
Schema readerSchema = Schema.parse("...");
// Wrappers automatically handle schema evolution during deserialization
AvroKey<GenericRecord> key = new AvroKey<>();
// When deserialized, data is converted from writer to reader schema

Wrappers support object reuse for better performance:
// Reuse wrapper objects in hot code paths
AvroKey<GenericRecord> reusableKey = new AvroKey<>();
AvroValue<GenericRecord> reusableValue = new AvroValue<>();
for (GenericRecord record : records) {
reusableKey.datum(record);
reusableValue.datum(record);
// Process without creating new wrapper objects
}

// Clear references when done to help GC
wrapper.datum(null);

Common issues and solutions:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-mapred