CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-avro--avro-mapred

Hadoop MapReduce compatible API for using Avro serialization in distributed data processing pipelines

Pending
Overview
Eval results
Files

docs/file-utilities.md

File Utilities and Storage

Advanced file handling utilities for sorted key-value files, sequence files, and compression codec integration. These utilities provide efficient storage and retrieval mechanisms for Avro data with specialized support for indexed access, sorted operations, and seamless integration with Hadoop's file system.

Capabilities

Sorted Key-Value Files

Indexed Avro container files that support efficient key-based lookups, similar to Hadoop's MapFile but designed specifically for Avro data.

public class SortedKeyValueFile {
    // Nested reader class
    public static class Reader<K,V> implements Closeable {
        // Constructor
        public Reader(Options options) throws IOException;
        
        // Data access
        public V get(K key) throws IOException;
        public Iterator<AvroKeyValue<K,V>> iterator() throws IOException;
        
        // Resource management
        public void close() throws IOException;
        
        // Options configuration
        public static class Options {
            // Builder pattern methods for configuration
            public Options withKeySchema(Schema keySchema);
            public Options withValueSchema(Schema valueSchema);
            public Options withPath(Path path);
            public Options withConfiguration(Configuration conf);
            public Options withDataModel(GenericData dataModel);
        }
    }
    
    // Nested writer class
    public static class Writer<K,V> implements Closeable {
        // Constructor  
        public Writer(Options options) throws IOException;
        
        // Data writing (keys must be in sorted order)
        public void append(K key, V value) throws IOException;
        
        // Resource management
        public void close() throws IOException;
        
        // Options configuration
        public static class Options {
            // Builder pattern methods for configuration
            public Options withKeySchema(Schema keySchema);
            public Options withValueSchema(Schema valueSchema);
            public Options withPath(Path path);
            public Options withConfiguration(Configuration conf);
            public Options withDataModel(GenericData dataModel);
            public Options withCodec(CodecFactory codec);
        }
    }
}

Usage Example

import org.apache.avro.hadoop.file.SortedKeyValueFile;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;

// Define schemas
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}"); // Schema.parse(String) is deprecated

// Write sorted key-value file
SortedKeyValueFile.Writer.Options writerOpts = new SortedKeyValueFile.Writer.Options()
    .withKeySchema(keySchema)
    .withValueSchema(valueSchema)
    .withPath(new Path("/data/users.skv"))
    .withConfiguration(conf)
    .withCodec(CodecFactory.snappyCodec());

try (SortedKeyValueFile.Writer<String, GenericRecord> writer = 
     new SortedKeyValueFile.Writer<>(writerOpts)) {
    
    // Append data in sorted key order
    writer.append("alice", aliceRecord);
    writer.append("bob", bobRecord);
    writer.append("charlie", charlieRecord);
}

// Read from sorted key-value file
SortedKeyValueFile.Reader.Options readerOpts = new SortedKeyValueFile.Reader.Options()
    .withKeySchema(keySchema)
    .withValueSchema(valueSchema)
    .withPath(new Path("/data/users.skv"))
    .withConfiguration(conf);

try (SortedKeyValueFile.Reader<String, GenericRecord> reader = 
     new SortedKeyValueFile.Reader<>(readerOpts)) {
    
    // Efficient key lookup
    GenericRecord user = reader.get("bob");
    
    // Iterator over all records
    Iterator<AvroKeyValue<String, GenericRecord>> iter = reader.iterator();
    while (iter.hasNext()) {
        AvroKeyValue<String, GenericRecord> kv = iter.next();
        String key = kv.getKey();
        GenericRecord value = kv.getValue();
        // Process key-value pair
    }
}

Avro Sequence Files

Enhanced Hadoop SequenceFile support with Avro serialization, providing metadata storage and schema information.

public class AvroSequenceFile {
    // Metadata field constants
    public static final Text METADATA_FIELD_KEY_SCHEMA = new Text("key.schema");
    public static final Text METADATA_FIELD_VALUE_SCHEMA = new Text("value.schema");
    
    // Writer creation
    public static SequenceFile.Writer createWriter(Writer.Options options) throws IOException;
    
    // Nested writer class
    public static class Writer implements Closeable {
        // Data writing
        public void append(AvroWrapper key, AvroWrapper value) throws IOException;
        public void close() throws IOException;
        
        // Options configuration
        public static class Options {
            public Options withPath(Path path);
            public Options withKeySchema(Schema keySchema);
            public Options withValueSchema(Schema valueSchema);
            public Options withConfiguration(Configuration conf);
            public Options withCompressionType(SequenceFile.CompressionType compressionType);
            public Options withCompressionCodec(CompressionCodec codec);
            public Options withDataModel(GenericData dataModel);
        }
    }
    
    // Nested reader class
    public static class Reader implements Closeable {
        // Data reading
        public boolean next(AvroWrapper key, AvroWrapper value) throws IOException;
        public void close() throws IOException;
        
        // Schema access
        public Schema getKeySchema();
        public Schema getValueSchema();
        
        // Options configuration
        public static class Options {
            public Options withPath(Path path);
            public Options withConfiguration(Configuration conf);
            public Options withDataModel(GenericData dataModel);
        }
    }
}

Usage Example

import org.apache.avro.hadoop.io.AvroSequenceFile;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.io.SequenceFile;

// Write Avro sequence file
AvroSequenceFile.Writer.Options writerOpts = new AvroSequenceFile.Writer.Options()
    .withPath(new Path("/data/sequence.seq"))
    .withKeySchema(keySchema)
    .withValueSchema(valueSchema)
    .withConfiguration(conf)
    .withCompressionType(SequenceFile.CompressionType.BLOCK);

try (AvroSequenceFile.Writer writer = new AvroSequenceFile.Writer(writerOpts)) {
    AvroWrapper<String> key = new AvroWrapper<>();
    AvroWrapper<GenericRecord> value = new AvroWrapper<>();
    
    // Write key-value pairs
    key.datum("key1");
    value.datum(record1);
    writer.append(key, value);
    
    key.datum("key2");
    value.datum(record2);
    writer.append(key, value);
}

// Read Avro sequence file
AvroSequenceFile.Reader.Options readerOpts = new AvroSequenceFile.Reader.Options()
    .withPath(new Path("/data/sequence.seq"))
    .withConfiguration(conf);

try (AvroSequenceFile.Reader reader = new AvroSequenceFile.Reader(readerOpts)) {
    Schema keySchema = reader.getKeySchema();
    Schema valueSchema = reader.getValueSchema();
    
    AvroWrapper<String> key = new AvroWrapper<>();
    AvroWrapper<GenericRecord> value = new AvroWrapper<>();
    
    while (reader.next(key, value)) {
        String keyData = key.datum();
        GenericRecord valueData = value.datum();
        // Process data
    }
}

Hadoop Codec Integration

Utilities for mapping between Hadoop compression codecs and Avro compression codecs.

public class HadoopCodecFactory {
    // Codec mapping methods
    public static CodecFactory fromHadoopString(String hadoopCodecClass) throws AvroRuntimeException;
    public static String getAvroCodecName(String hadoopCodecClass);
}

Supported codec mappings:

  • org.apache.hadoop.io.compress.DefaultCodec → deflate
  • org.apache.hadoop.io.compress.GzipCodec → deflate
  • org.apache.hadoop.io.compress.BZip2Codec → bzip2
  • org.apache.hadoop.io.compress.SnappyCodec → snappy
  • org.apache.hadoop.io.compress.Lz4Codec → xz (NOTE: this pairing looks incorrect — in Avro's HadoopCodecFactory the "xz" codec corresponds to org.apache.hadoop.io.compress.XZCodec; verify against the library source)
  • com.github.luben.zstd.ZstdCodec → zstandard

Usage Example

import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.file.CodecFactory;

// Convert Hadoop codec class to Avro codec
String hadoopCodecClass = "org.apache.hadoop.io.compress.SnappyCodec";
CodecFactory avroCodec = HadoopCodecFactory.fromHadoopString(hadoopCodecClass);

// Get Avro codec name
String avroCodecName = HadoopCodecFactory.getAvroCodecName(hadoopCodecClass);
// Returns "snappy"

// Use in file writing
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
    .withCodec(avroCodec);

Key-Value Helper Utilities

Helper classes for working with key-value data structures in Avro format.

public class AvroKeyValue<K,V> {
    // Constructor
    public AvroKeyValue(GenericRecord keyValueRecord);
    
    // Data access
    public GenericRecord get();
    public K getKey();
    public V getValue();
    public void setKey(K key);
    public void setValue(V value);
    
    // Schema utilities
    public static Schema getSchema(Schema keySchema, Schema valueSchema);
    
    // Field name constants
    public static final String KEY_VALUE_PAIR_RECORD_NAME = "org.apache.avro.mapred.Pair";
    public static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = null;
    public static final String KEY_FIELD = "key";
    public static final String VALUE_FIELD = "value";
    
    // Iterator support
    public static class Iterator<K,V> implements java.util.Iterator<AvroKeyValue<K,V>> {
        public Iterator(java.util.Iterator<GenericRecord> records);
        public boolean hasNext();
        public AvroKeyValue<K,V> next();
        public void remove();
    }
}

Usage Example

import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

// Create key-value schema and record
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = Schema.create(Schema.Type.INT);
Schema kvSchema = AvroKeyValue.getSchema(keySchema, valueSchema);

GenericRecord kvRecord = new GenericRecordBuilder(kvSchema)
    .set(AvroKeyValue.KEY_FIELD, "count")
    .set(AvroKeyValue.VALUE_FIELD, 42)
    .build();

// Use helper
AvroKeyValue<String, Integer> kv = new AvroKeyValue<>(kvRecord);
String key = kv.getKey();     // "count"
Integer value = kv.getValue(); // 42

// Modify values
kv.setKey("total");
kv.setValue(100);

Integration with MapReduce

Input Format Integration

File utilities integrate with MapReduce input formats:

// Use sorted key-value files as MapReduce input
public class SortedKeyValueInputFormat<K,V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
    public RecordReader<AvroKey<K>, AvroValue<V>> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new SortedKeyValueRecordReader<>();
    }
}

Output Format Integration

// Write MapReduce output as sorted key-value files
public class SortedKeyValueOutputFormat<K,V> extends FileOutputFormat<AvroKey<K>, AvroValue<V>> {
    public RecordWriter<AvroKey<K>, AvroValue<V>> getRecordWriter(TaskAttemptContext context) {
        return new SortedKeyValueRecordWriter<>();
    }
}

Performance Considerations

Sorted File Optimization

// Configure appropriate options for performance
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
    .withKeySchema(keySchema)
    .withValueSchema(valueSchema)
    .withPath(path)
    .withCodec(CodecFactory.snappyCodec())  // Use compression
    .withConfiguration(conf);

// Ensure keys are pre-sorted for optimal performance
// SortedKeyValueFile requires keys to be in sorted order

Memory Management

// Use try-with-resources for proper resource management
try (SortedKeyValueFile.Reader<String, GenericRecord> reader = 
     new SortedKeyValueFile.Reader<>(readerOpts)) {
    // Use reader
} // Automatically closed

// Reuse objects in loops
AvroKeyValue<String, GenericRecord> reusableKV = null;
Iterator<AvroKeyValue<String, GenericRecord>> iter = reader.iterator();
while (iter.hasNext()) {
    reusableKV = iter.next();  // May reuse object
    // Process reusableKV
}

Compression Strategy

// Choose appropriate compression based on use case
CodecFactory snappy = CodecFactory.snappyCodec();    // Fast compression/decompression
CodecFactory deflate = CodecFactory.deflateCodec(6); // Better compression ratio
CodecFactory bzip2 = CodecFactory.bzip2Codec();      // Highest compression ratio

// Configure based on workload
SortedKeyValueFile.Writer.Options opts = new SortedKeyValueFile.Writer.Options()
    .withCodec(snappy);  // Good balance of speed and compression

Error Handling

Common issues and solutions:

Sorted File Issues

  • Unsorted Keys: SortedKeyValueFile requires keys in sorted order during writing
  • Schema Mismatch: Ensure reader and writer use compatible schemas
  • Index Corruption: Verify file integrity if key lookups fail

Sequence File Issues

  • Codec Not Available: Ensure compression codecs are available on all nodes
  • Schema Missing: Verify schemas are stored in sequence file metadata
  • Version Compatibility: Check Hadoop version compatibility

Resource Management

// Always use try-with-resources or explicit cleanup
try (SortedKeyValueFile.Writer<String, GenericRecord> writer = 
     new SortedKeyValueFile.Writer<>(opts)) {
    // Use writer
} catch (IOException e) {
    // Handle errors
    log.error("Failed to write sorted key-value file", e);
    throw e;
}

Exception Handling

try {
    CodecFactory codec = HadoopCodecFactory.fromHadoopString("unknown.codec");
} catch (AvroRuntimeException e) {
    // Handle unsupported codec
    log.warn("Unsupported codec, falling back to default", e);
    CodecFactory codec = CodecFactory.deflateCodec(6); // deflateCodec requires a compression level (1-9)
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-avro--avro-mapred

docs

cross-language-processing.md

data-wrappers.md

file-utilities.md

index.md

input-output-formats.md

job-configuration.md

mapreduce-processing.md

serialization.md

tile.json