
tessl/maven-org-apache-avro--avro-mapred

Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines


docs/serialization.md

Serialization and I/O Infrastructure

Core serialization framework that integrates Avro with Hadoop's serialization system, providing efficient data exchange, schema management, and seamless conversion between Java objects and Avro data formats within MapReduce pipelines.
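At its core, the framework wraps Avro's binary encoding: each key or value is encoded with a `DatumWriter` and decoded with a `DatumReader`. A minimal core-Avro round trip (no Hadoop required; the schema and field names are illustrative) shows what happens to each datum:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class RoundTrip {
    public static void main(String[] args) throws Exception {
        // Illustrative record schema with a single string field
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}");

        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Ada");

        // Encode to Avro binary (what the serializer side does per datum)
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(user, encoder);
        encoder.flush();

        // Decode back (what the deserializer side does per datum)
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema)
            .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));

        System.out.println(decoded.get("name"));
    }
}
```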

Capabilities

Hadoop Serialization Integration

Main serialization class that registers Avro data types with Hadoop's serialization framework.

public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
    // Core serialization interface
    public boolean accept(Class<?> c);
    public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
    public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
    
    // Configuration management
    public static void addToConfiguration(Configuration conf);
    
    // Schema configuration
    public static void setKeyWriterSchema(Configuration conf, Schema schema);
    public static void setKeyReaderSchema(Configuration conf, Schema schema);
    public static void setValueWriterSchema(Configuration conf, Schema schema);
    public static void setValueReaderSchema(Configuration conf, Schema schema);
    
    // Schema retrieval
    public static Schema getKeyWriterSchema(Configuration conf);
    public static Schema getKeyReaderSchema(Configuration conf);
    public static Schema getValueWriterSchema(Configuration conf);
    public static Schema getValueReaderSchema(Configuration conf);
    
    // Data model support
    public static GenericData createDataModel(Configuration conf);
}

Usage Example

import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.hadoop.conf.Configuration;

// Enable Avro serialization
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);

// Configure schemas
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
AvroSerialization.setKeyWriterSchema(conf, userSchema);
AvroSerialization.setValueWriterSchema(conf, userSchema);

// Retrieve configured schemas
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);
Schema valueSchema = AvroSerialization.getValueWriterSchema(conf);

Avro Serializer

Serializer implementation for converting AvroWrapper objects to binary format.

public class AvroSerializer<T> implements Serializer<AvroWrapper<T>> {
    // Constructors
    public AvroSerializer(Schema writerSchema);
    public AvroSerializer(Schema writerSchema, DatumWriter<T> datumWriter);
    
    // Configuration
    public Schema getWriterSchema();
    
    // Serialization lifecycle
    public void open(OutputStream outputStream) throws IOException;
    public void serialize(AvroWrapper<T> avroWrapper) throws IOException;
    public void close() throws IOException;
}

Usage Example

import org.apache.avro.hadoop.io.AvroSerializer;
import org.apache.avro.mapred.AvroWrapper;
import java.io.ByteArrayOutputStream;

// Create serializer
Schema schema = new Schema.Parser().parse("...");
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);

// Serialize data
ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.open(out);

AvroWrapper<GenericRecord> wrapper = new AvroWrapper<>(record);
serializer.serialize(wrapper);
serializer.close();

byte[] serializedData = out.toByteArray();

Avro Deserializer

Base deserializer class for converting binary data back to AvroWrapper objects.

public abstract class AvroDeserializer<T extends AvroWrapper<D>,D> implements Deserializer<T> {
    // Schema access
    public Schema getWriterSchema();
    public Schema getReaderSchema();
    
    // Deserialization lifecycle
    public void open(InputStream inputStream) throws IOException;
    public abstract T deserialize(T avroWrapperToReuse) throws IOException;
    public void close() throws IOException;
}

public class AvroKeyDeserializer<D> extends AvroDeserializer<AvroKey<D>, D> {
    public AvroKey<D> deserialize(AvroKey<D> avroWrapperToReuse) throws IOException;
}

public class AvroValueDeserializer<D> extends AvroDeserializer<AvroValue<D>, D> {
    public AvroValue<D> deserialize(AvroValue<D> avroWrapperToReuse) throws IOException;
}

Usage Example

import org.apache.avro.hadoop.io.AvroKeyDeserializer;
import org.apache.avro.mapred.AvroKey;
import java.io.ByteArrayInputStream;

// Create deserializer (writer schema, reader schema, and a ClassLoader for specific records)
AvroKeyDeserializer<GenericRecord> deserializer = new AvroKeyDeserializer<>(
    schema, schema, Thread.currentThread().getContextClassLoader());

// Deserialize data
ByteArrayInputStream in = new ByteArrayInputStream(serializedData);
deserializer.open(in);

AvroKey<GenericRecord> key = new AvroKey<>();
AvroKey<GenericRecord> result = deserializer.deserialize(key);
deserializer.close();

GenericRecord record = result.datum();

Data Conversion Framework

Framework for converting between different data formats and Avro.

public abstract class AvroDatumConverter<INPUT,OUTPUT> {
    // Core conversion method
    public abstract OUTPUT convert(INPUT input);
    
    // Schema information
    public abstract Schema getWriterSchema();
}

public class AvroDatumConverterFactory {
    // Constructor
    public AvroDatumConverterFactory(Configuration conf);
    
    // Factory method
    public <IN,OUT> AvroDatumConverter<IN,OUT> create(Class<IN> inputClass);
}

The factory includes built-in converters for common Hadoop types:

  • WritableConverter: Converts Hadoop Writable objects to Avro
  • TextConverter: Converts Text objects to Avro strings
  • LongWritableConverter: Converts LongWritable to Avro long
  • IntWritableConverter: Converts IntWritable to Avro int
  • DoubleWritableConverter: Converts DoubleWritable to Avro double
  • FloatWritableConverter: Converts FloatWritable to Avro float
  • BooleanWritableConverter: Converts BooleanWritable to Avro boolean
  • BytesWritableConverter: Converts BytesWritable to Avro bytes

Usage Example

import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.hadoop.io.Text;

// Create converter factory
Configuration conf = new Configuration();
AvroDatumConverterFactory factory = new AvroDatumConverterFactory(conf);

// Get converter for Text to Avro string
AvroDatumConverter<Text, Utf8> converter = factory.create(Text.class);

// Convert data
Text input = new Text("Hello, World!");
Utf8 avroString = converter.convert(input);
Schema schema = converter.getWriterSchema();

Key Comparators

Specialized comparators for Avro data that support both object and raw byte comparison.

public class AvroKeyComparator<T> implements RawComparator<AvroKey<T>>, Configurable {
    // Object comparison
    public int compare(AvroKey<T> x, AvroKey<T> y);
    
    // Raw byte comparison (for efficiency)
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
    
    // Configuration
    public void setConf(Configuration conf);
    public Configuration getConf();
}

Usage Example

import org.apache.avro.hadoop.io.AvroKeyComparator;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.conf.Configuration;

// Create and configure comparator
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
Configuration conf = new Configuration();
AvroSerialization.setKeyWriterSchema(conf, schema);
comparator.setConf(conf);

// Compare objects
AvroKey<GenericRecord> key1 = new AvroKey<>(record1);
AvroKey<GenericRecord> key2 = new AvroKey<>(record2);
int result = comparator.compare(key1, key2);

Legacy API Serialization

Serialization support specifically for the legacy MapReduce API.

// In package org.apache.avro.mapred, for org.apache.hadoop.mapred compatibility
public class AvroSerialization<T> extends Configured implements Serialization<AvroWrapper<T>> {
    public boolean accept(Class<?> c);
    public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
    public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
}

Schema Evolution Support

The serialization framework supports Avro's schema evolution capabilities:

Writer and Reader Schemas

// Set different schemas for writing and reading
AvroSerialization.setKeyWriterSchema(conf, writerSchema);
AvroSerialization.setKeyReaderSchema(conf, readerSchema);

// Data written with the writer schema is automatically resolved to the reader schema during deserialization

Evolution Rules

The serialization framework follows Avro's schema evolution rules:

  • Backward Compatibility: the new (reader) schema can read data written with the old schema
  • Forward Compatibility: the old (reader) schema can read data written with the new schema
  • Full Compatibility: both backward and forward compatible

Example

// Original schema (version 1)
Schema v1Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

// Evolved schema (version 2) with a new optional field
Schema v2Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

// Configure for evolution
AvroSerialization.setKeyWriterSchema(conf, v1Schema);  // Data was written with v1
AvroSerialization.setKeyReaderSchema(conf, v2Schema);  // Read with v2 (email will be null)
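The schema resolution itself is performed by Avro's datum reader. A self-contained core-Avro sketch (no Hadoop required) demonstrates the scenario above: a record written with v1 is read with v2, and the missing `email` field is filled with its default:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class EvolutionDemo {
    public static void main(String[] args) throws Exception {
        // Separate Parser instances: a single parser cannot redefine the same record name
        Schema v1 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}");
        Schema v2 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Write a record using the v1 schema
        GenericRecord user = new GenericData.Record(v1);
        user.put("name", "Ada");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(v1).write(user, encoder);
        encoder.flush();

        // Read it back with v1 as the writer schema and v2 as the reader schema
        GenericRecord evolved = new GenericDatumReader<GenericRecord>(v1, v2)
            .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));

        System.out.println(evolved.get("name"));   // present in both versions
        System.out.println(evolved.get("email"));  // absent in v1 data, filled with the default
    }
}
```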

Data Model Integration

Support for different Avro data models:

Generic Data Model

import org.apache.avro.generic.GenericData;

// Use generic data model (default)
AvroSerialization.setDataModelClass(conf, GenericData.class);
GenericData dataModel = AvroSerialization.createDataModel(conf);

Specific Data Model

import org.apache.avro.specific.SpecificData;

// Use specific data model for generated classes
AvroSerialization.setDataModelClass(conf, SpecificData.class);

Reflect Data Model

import org.apache.avro.reflect.ReflectData;

// Use reflection data model for POJOs
AvroSerialization.setDataModelClass(conf, ReflectData.class);
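With the reflect data model, no generated classes or hand-written schemas are needed: `ReflectData` induces a record schema from a POJO's fields. A small sketch (the `Point` class is illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class ReflectSchemaDemo {
    // Plain POJO; ReflectData derives an Avro record schema from its fields
    static class Point {
        int x;
        int y;
    }

    public static void main(String[] args) {
        Schema schema = ReflectData.get().getSchema(Point.class);
        System.out.println(schema.getName());
        System.out.println(schema.getField("x").schema().getType());
    }
}
```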

Performance Optimization

Object Reuse

Serializers and deserializers support object reuse for better performance:

// Reuse wrapper objects
AvroKey<GenericRecord> reusableKey = new AvroKey<>();
AvroValue<GenericRecord> reusableValue = new AvroValue<>();

// Deserializers will reuse these objects
AvroKey<GenericRecord> result = deserializer.deserialize(reusableKey);

Raw Byte Comparison

Use raw byte comparison for sorting without deserialization:

// Raw comparator avoids object deserialization for sorting
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
// Hadoop will use raw byte comparison when possible
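Under the hood, the raw path compares the encoded bytes directly according to the schema's sort order, using Avro's BinaryData utility. A core-Avro sketch of byte-level comparison, here on two encoded longs:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryData;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class RawCompareDemo {
    // Encode a single long value to Avro binary
    static byte[] encode(long value) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        enc.writeLong(value);
        enc.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        Schema schema = Schema.create(Schema.Type.LONG);
        byte[] a = encode(3L);
        byte[] b = encode(7L);
        // Compare the encoded values without deserializing them into objects
        int cmp = BinaryData.compare(a, 0, b, 0, schema);
        System.out.println(cmp < 0);
    }
}
```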

Memory Management

// Proper resource management: Hadoop's Serializer interface does not extend
// AutoCloseable, so close explicitly in a finally block
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);
try {
    serializer.open(outputStream);
    // Use serializer
} finally {
    serializer.close();
}

Configuration Best Practices

Schema Management

// Store schemas in configuration for access across tasks
AvroSerialization.setKeyWriterSchema(conf, keySchema);
AvroSerialization.setValueWriterSchema(conf, valueSchema);

// Retrieve schemas where needed
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);

Serialization Registration

// Always add Avro serialization to configuration
AvroSerialization.addToConfiguration(conf);

// Verify serialization is properly configured
String[] serializations = conf.getStrings("io.serializations");
// Should include "org.apache.avro.hadoop.io.AvroSerialization"

Error Handling

Common serialization issues and solutions:

  • Schema Not Found: Ensure schemas are configured before creating serializers/deserializers
  • Schema Evolution Errors: Verify schema compatibility using Avro's SchemaCompatibility class
  • Memory Issues: Use object reuse patterns for high-throughput scenarios
  • Configuration Errors: Verify AvroSerialization is added to configuration
  • Codec Issues: Ensure compression codecs are available on all cluster nodes
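For the schema-evolution case, compatibility can be verified programmatically before a job runs. A sketch using Avro's SchemaCompatibility utility (the schemas are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CompatCheck {
    public static void main(String[] args) {
        Schema writer = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}");
        Schema reader = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Check whether data written with `writer` can be read with `reader`
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
        System.out.println(result.getType());
    }
}
```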

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-mapred
