Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines
—
Core serialization framework that integrates Avro with Hadoop's serialization system, providing efficient data exchange, schema management, and seamless conversion between Java objects and Avro data formats within MapReduce pipelines.
Main serialization class that registers Avro data types with Hadoop's serialization framework.
public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
// Core serialization interface
public boolean accept(Class<?> c);
public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
// Configuration management
public static void addToConfiguration(Configuration conf);
// Schema configuration
public static void setKeyWriterSchema(Configuration conf, Schema schema);
public static void setKeyReaderSchema(Configuration conf, Schema schema);
public static void setValueWriterSchema(Configuration conf, Schema schema);
public static void setValueReaderSchema(Configuration conf, Schema schema);
// Schema retrieval
public static Schema getKeyWriterSchema(Configuration conf);
public static Schema getKeyReaderSchema(Configuration conf);
public static Schema getValueWriterSchema(Configuration conf);
public static Schema getValueReaderSchema(Configuration conf);
// Data model support
public static GenericData createDataModel(Configuration conf);
}

import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.hadoop.conf.Configuration;
// Enable Avro serialization
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);
// Configure schemas
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
AvroSerialization.setKeyWriterSchema(conf, userSchema);
AvroSerialization.setValueWriterSchema(conf, userSchema);
// Retrieve configured schemas
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);
Schema valueSchema = AvroSerialization.getValueWriterSchema(conf);

Serializer implementation for converting AvroWrapper objects to binary format.
public class AvroSerializer<T> implements Serializer<AvroWrapper<T>> {
// Constructors
public AvroSerializer(Schema writerSchema);
public AvroSerializer(Schema writerSchema, DatumWriter<T> datumWriter);
// Configuration
public Schema getWriterSchema();
// Serialization lifecycle
public void open(OutputStream outputStream) throws IOException;
public void serialize(AvroWrapper<T> avroWrapper) throws IOException;
public void close() throws IOException;
}

import org.apache.avro.hadoop.io.AvroSerializer;
import org.apache.avro.mapred.AvroWrapper;
import java.io.ByteArrayOutputStream;
// Create serializer
Schema schema = new Schema.Parser().parse("...");
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);
// Serialize data
ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.open(out);
AvroWrapper<GenericRecord> wrapper = new AvroWrapper<>(record);
serializer.serialize(wrapper);
serializer.close();
byte[] serializedData = out.toByteArray();

Base deserializer class for converting binary data back to AvroWrapper objects.
public abstract class AvroDeserializer<T extends AvroWrapper<D>,D> implements Deserializer<T> {
// Schema access
public Schema getWriterSchema();
public Schema getReaderSchema();
// Deserialization lifecycle
public void open(InputStream inputStream) throws IOException;
public abstract T deserialize(T avroWrapperToReuse) throws IOException;
public void close() throws IOException;
}
public class AvroKeyDeserializer<D> extends AvroDeserializer<AvroKey<D>, D> {
public AvroKey<D> deserialize(AvroKey<D> avroWrapperToReuse) throws IOException;
}
public class AvroValueDeserializer<D> extends AvroDeserializer<AvroValue<D>, D> {
public AvroValue<D> deserialize(AvroValue<D> avroWrapperToReuse) throws IOException;
}

import org.apache.avro.hadoop.io.AvroKeyDeserializer;
import org.apache.avro.mapred.AvroKey;
import java.io.ByteArrayInputStream;
// Create deserializer (AvroKeyDeserializer has no no-arg constructor; it
// requires the writer schema, the reader schema, and a class loader)
AvroKeyDeserializer<GenericRecord> deserializer =
    new AvroKeyDeserializer<>(writerSchema, readerSchema, conf.getClassLoader());
// Deserialize data
ByteArrayInputStream in = new ByteArrayInputStream(serializedData);
deserializer.open(in);
AvroKey<GenericRecord> key = new AvroKey<>();
AvroKey<GenericRecord> result = deserializer.deserialize(key);
deserializer.close();
GenericRecord record = result.datum();

Framework for converting between different data formats and Avro.
public abstract class AvroDatumConverter<INPUT,OUTPUT> {
// Core conversion method
public abstract OUTPUT convert(INPUT input);
// Schema information
public abstract Schema getWriterSchema();
}
public class AvroDatumConverterFactory {
// Constructor
public AvroDatumConverterFactory(Configuration conf);
// Factory method
public <IN,OUT> AvroDatumConverter<IN,OUT> create(Class<IN> inputClass);
}

The factory includes built-in converters for common Hadoop types:

- WritableConverter: Converts Hadoop Writable objects to Avro
- TextConverter: Converts Text objects to Avro strings
- LongWritableConverter: Converts LongWritable to Avro long
- IntWritableConverter: Converts IntWritable to Avro int
- DoubleWritableConverter: Converts DoubleWritable to Avro double
- FloatWritableConverter: Converts FloatWritable to Avro float
- BooleanWritableConverter: Converts BooleanWritable to Avro boolean
- BytesWritableConverter: Converts BytesWritable to Avro bytes

import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.hadoop.io.Text;
// Create converter factory
Configuration conf = new Configuration();
AvroDatumConverterFactory factory = new AvroDatumConverterFactory(conf);
// Get converter for Text to Avro string
AvroDatumConverter<Text, Utf8> converter = factory.create(Text.class);
// Convert data
Text input = new Text("Hello, World!");
Utf8 avroString = converter.convert(input);
Schema schema = converter.getWriterSchema();

Specialized comparators for Avro data that support both object and raw byte comparison.
public class AvroKeyComparator<T> implements RawComparator<AvroKey<T>>, Configurable {
// Object comparison
public int compare(AvroKey<T> x, AvroKey<T> y);
// Raw byte comparison (for efficiency)
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
// Configuration
public void setConf(Configuration conf);
public Configuration getConf();
}

import org.apache.avro.hadoop.io.AvroKeyComparator;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.conf.Configuration;
// Create and configure comparator
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
Configuration conf = new Configuration();
AvroSerialization.setKeyWriterSchema(conf, schema);
comparator.setConf(conf);
// Compare objects
AvroKey<GenericRecord> key1 = new AvroKey<>(record1);
AvroKey<GenericRecord> key2 = new AvroKey<>(record2);
int result = comparator.compare(key1, key2);

Serialization support specifically for the legacy MapReduce API.
public class org.apache.avro.mapred.AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
// Legacy implementation for org.apache.hadoop.mapred compatibility
public boolean accept(Class<?> c);
public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
}

The serialization framework supports Avro's schema evolution capabilities:
// Set different schemas for writing and reading
AvroSerialization.setKeyWriterSchema(conf, writerSchema);
AvroSerialization.setKeyReaderSchema(conf, readerSchema);
// Data written with writer schema will be automatically converted to reader schema

The serialization framework follows Avro's schema evolution rules:
// Original schema (version 1)
Schema v1Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
// Evolved schema (version 2) with new field
Schema v2Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
// Configure for evolution
AvroSerialization.setKeyWriterSchema(conf, v1Schema); // Data was written with v1
AvroSerialization.setKeyReaderSchema(conf, v2Schema); // Read with v2 (email will be null)

Support for different Avro data models:
import org.apache.avro.generic.GenericData;
// Use generic data model (default)
AvroSerialization.setDataModelClass(conf, GenericData.class);
GenericData dataModel = AvroSerialization.createDataModel(conf);

import org.apache.avro.specific.SpecificData;
// Use specific data model for generated classes
AvroSerialization.setDataModelClass(conf, SpecificData.class);

import org.apache.avro.reflect.ReflectData;
// Use reflection data model for POJOs
AvroSerialization.setDataModelClass(conf, ReflectData.class);

Serializers and deserializers support object reuse for better performance:
// Reuse wrapper objects
AvroKey<GenericRecord> reusableKey = new AvroKey<>();
AvroValue<GenericRecord> reusableValue = new AvroValue<>();
// Deserializers will reuse these objects
AvroKey<GenericRecord> result = deserializer.deserialize(reusableKey);

Use raw byte comparison for sorting without deserialization:
// Raw comparator avoids object deserialization for sorting
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
// Hadoop will use raw byte comparison when possible

// Proper resource management: Hadoop's Serializer does not implement
// AutoCloseable, so it cannot be used in try-with-resources — close it
// explicitly in a finally block instead
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);
try {
    serializer.open(outputStream);
    // Use serializer
} finally {
    serializer.close();
}

// Store schemas in configuration for access across tasks
AvroSerialization.setKeyWriterSchema(conf, keySchema);
AvroSerialization.setValueWriterSchema(conf, valueSchema);
// Retrieve schemas where needed
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);

// Always add Avro serialization to configuration
AvroSerialization.addToConfiguration(conf);
// Verify serialization is properly configured
String[] serializations = conf.getStrings("io.serializations");
// Should include "org.apache.avro.hadoop.io.AvroSerialization"

Common serialization issues and solutions:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-avro--avro-mapred