tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11

Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs

Type System Integration

The Type System Integration capability provides comprehensive support for Hadoop Writable types within Flink's type system, enabling efficient serialization, deserialization, and comparison of Hadoop data types in Flink applications.

Overview

Flink's type system requires explicit type information for serialization and comparison operations. The Hadoop compatibility layer provides specialized type information classes that bridge Hadoop's Writable interface with Flink's TypeInformation system, giving Writable values a dedicated serializer and comparator rather than letting them fall back to generic (Kryo-based) serialization.

WritableTypeInfo

The primary class for integrating Hadoop Writable types with Flink's type system.

@Public
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
    
    // Constructor
    @PublicEvolving
    public WritableTypeInfo(Class<T> typeClass);
    
    // Create type-specific serializer
    @PublicEvolving
    public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig);
    
    // Create type-specific comparator for sorting
    @PublicEvolving
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
    
    // Type introspection methods
    @PublicEvolving
    public boolean isBasicType();
    @PublicEvolving
    public boolean isTupleType();
    @PublicEvolving
    public int getArity();
    @PublicEvolving
    public int getTotalFields();
    @PublicEvolving
    public Class<T> getTypeClass();
    @PublicEvolving
    public boolean isKeyType();
    
    // Package-private factory method (internal use)
    @PublicEvolving
    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
}

WritableSerializer

High-performance serializer for Hadoop Writable types.

@Internal
public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
    
    // Constructor
    public WritableSerializer(Class<T> typeClass);
    
    // Instance creation and copying
    public T createInstance();
    public T copy(T from);
    public T copy(T from, T reuse);
    
    // Serialization metadata
    public int getLength();
    public boolean isImmutableType();
    
    // Serialization operations
    public void serialize(T record, DataOutputView target) throws IOException;
    public T deserialize(DataInputView source) throws IOException;
    public T deserialize(T reuse, DataInputView source) throws IOException;
    public void copy(DataInputView source, DataOutputView target) throws IOException;
    
    // Serializer management
    public WritableSerializer<T> duplicate();
}
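
Internally, createInstance() instantiates the type via its public no-argument constructor, serialize() delegates to write(), and deserialize() delegates to readFields(). That contract can be exercised with nothing but the JDK. The sketch below is an illustration, not Flink's code: MiniWritable and IntBox are hypothetical stand-ins for org.apache.hadoop.io.Writable and a concrete Writable, so the example compiles without Hadoop on the classpath.

```java
import java.io.*;

public class WritableRoundTrip {
    // Hypothetical stand-in for org.apache.hadoop.io.Writable (same contract)
    interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical stand-in for a concrete Writable such as IntWritable
    static class IntBox implements MiniWritable {
        int value;
        public IntBox() {}            // public no-arg constructor: required for createInstance()
        IntBox(int v) { value = v; }
        public void write(DataOutput out) throws IOException { out.writeInt(value); }
        public void readFields(DataInput in) throws IOException { value = in.readInt(); }
    }

    // Mirrors the serializer's round trip: write() to bytes, reflective
    // no-arg instantiation, then readFields() from those bytes
    static <T extends MiniWritable> T roundTrip(T record, Class<T> clazz) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        record.write(new DataOutputStream(bytes));
        T fresh = clazz.getDeclaredConstructor().newInstance();   // like createInstance()
        fresh.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return fresh;
    }

    public static void main(String[] args) throws Exception {
        IntBox copy = roundTrip(new IntBox(42), IntBox.class);
        if (copy.value != 42) throw new AssertionError("round trip failed");
        System.out.println("round trip ok");
    }
}
```

The reflective instantiation step is why Writable classes need a public no-argument constructor (see Common Problems below).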

WritableComparator

Efficient comparator for Hadoop Writable types supporting both in-memory and serialized comparison.

@Internal
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
    
    // Constructor
    public WritableComparator(boolean ascending, Class<T> type);
    
    // Hash computation
    public int hash(T record);
    
    // Reference-based comparison
    public void setReference(T toCompare);
    public boolean equalToReference(T candidate);
    public int compareToReference(TypeComparator<T> referencedComparator);
    
    // Direct comparison
    public int compare(T first, T second);
    
    // Serialized data comparison
    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException;
    
    // Normalized key support for sorting optimization
    public boolean supportsNormalizedKey();
    public int getNormalizeKeyLen();
    public boolean isNormalizedKeyPrefixOnly(int keyBytes);
    public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes);
    public boolean invertNormalizedKey();
    
    // Comparator management
    public TypeComparator<T> duplicate();
}

Usage Examples

Basic Type Information Usage

import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

import java.util.Arrays;

// Create type information for Hadoop Writable types
TypeInformation<Text> textTypeInfo = new WritableTypeInfo<>(Text.class);
TypeInformation<IntWritable> intTypeInfo = new WritableTypeInfo<>(IntWritable.class);

// Use with DataSet operations; fromCollection accepts explicit type information
DataSet<Text> textData = env.fromCollection(
    Arrays.asList(new Text("Hello"), new Text("World")),
    textTypeInfo);

DataSet<IntWritable> intData = env.fromCollection(
    Arrays.asList(new IntWritable(1), new IntWritable(2), new IntWritable(3)),
    intTypeInfo);

Custom Writable Types

// Define custom Writable class
public class CustomWritable implements Writable, Comparable<CustomWritable> {
    private int id;
    private String name;
    
    public CustomWritable() {}
    
    public CustomWritable(int id, String name) {
        this.id = id;
        this.name = name;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
    }
    
    @Override
    public int compareTo(CustomWritable other) {
        int result = Integer.compare(this.id, other.id);
        if (result == 0) {
            result = this.name.compareTo(other.name);
        }
        return result;
    }
    
    // hashCode, equals, toString methods...
}
// Use custom Writable with type information
TypeInformation<CustomWritable> customTypeInfo = 
    new WritableTypeInfo<>(CustomWritable.class);

DataSet<CustomWritable> customData = env.fromCollection(
    Arrays.asList(
        new CustomWritable(1, "Alice"),
        new CustomWritable(2, "Bob"),
        new CustomWritable(3, "Charlie")),
    customTypeInfo);

// Sort using the Comparable implementation ("*" selects the whole atomic type as the sort key)
DataSet<CustomWritable> sortedData = customData.sortPartition("*", Order.ASCENDING);

Tuple Types with Writables

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// Create type information for tuples containing Writables
TypeInformation<Tuple2<IntWritable, Text>> tupleTypeInfo = 
    new TupleTypeInfo<>(
        new WritableTypeInfo<>(IntWritable.class),
        new WritableTypeInfo<>(Text.class)
    );

DataSet<Tuple2<IntWritable, Text>> tupleData = env.fromCollection(
    Arrays.asList(
        new Tuple2<>(new IntWritable(1), new Text("First")),
        new Tuple2<>(new IntWritable(2), new Text("Second"))),
    tupleTypeInfo);

Serialization Configuration

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Get serializer for custom configuration
ExecutionConfig config = env.getConfig();
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);
TypeSerializer<Text> textSerializer = textType.createSerializer(config);

// The serializer can be used for manual serialization if needed
// (typically handled automatically by Flink)

Performance Optimization

// Enable object reuse for better performance with Writable types
env.getConfig().enableObjectReuse();

// This is particularly beneficial for Writable types as they support
// in-place deserialization, reducing garbage collection pressure
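
With object reuse enabled, Flink hands the same Writable instance back into readFields() for each record, so a long stream is consumed with a single allocation. A JDK-only sketch of that access pattern (TextBox is a hypothetical stand-in for a mutable Writable, not a Flink or Hadoop class):

```java
import java.io.*;

public class ReuseDemo {
    // Hypothetical stand-in for a mutable Writable
    static class TextBox {
        String value;
        void write(DataOutput out) throws IOException { out.writeUTF(value); }
        void readFields(DataInput in) throws IOException { value = in.readUTF(); }
    }

    // Reads every record of the stream through ONE reused instance,
    // mirroring what Flink does per record when object reuse is enabled
    static String readAllReusing(byte[] stream, int count) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream));
        TextBox reuse = new TextBox();            // the only allocation
        StringBuilder seen = new StringBuilder();
        for (int i = 0; i < count; i++) {
            reuse.readFields(in);                 // state overwritten in place
            seen.append(reuse.value);
        }
        return seen.toString();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        TextBox box = new TextBox();
        for (String s : new String[] {"a", "b", "c"}) { box.value = s; box.write(out); }
        System.out.println(readAllReusing(buf.toByteArray(), 3));  // prints abc
    }
}
```

The trade-off: a reused record is only valid until the next record arrives, so downstream code must not hold references to it.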

Common Writable Types

The following Hadoop Writable types are commonly used and fully supported:

// Primitive wrappers
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.DoubleWritable;

// Text and binary data
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;

// Null values
import org.apache.hadoop.io.NullWritable;

// Variable-length integers
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;

// Collections
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SortedMapWritable;

Type Information for Common Writables

// Get type information for common Hadoop types
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);
TypeInformation<IntWritable> intType = new WritableTypeInfo<>(IntWritable.class);
TypeInformation<LongWritable> longType = new WritableTypeInfo<>(LongWritable.class);
TypeInformation<BooleanWritable> boolType = new WritableTypeInfo<>(BooleanWritable.class);
TypeInformation<DoubleWritable> doubleType = new WritableTypeInfo<>(DoubleWritable.class);
TypeInformation<NullWritable> nullType = new WritableTypeInfo<>(NullWritable.class);

Advanced Usage

Custom Serialization Logic

// For complex Writable types that need special handling
public class ComplexWritable implements Writable {
    private Map<String, List<Integer>> data;
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(data.size());
        for (Map.Entry<String, List<Integer>> entry : data.entrySet()) {
            out.writeUTF(entry.getKey());
            List<Integer> values = entry.getValue();
            out.writeInt(values.size());
            for (Integer value : values) {
                out.writeInt(value);
            }
        }
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        int mapSize = in.readInt();
        data = new HashMap<>(mapSize);
        for (int i = 0; i < mapSize; i++) {
            String key = in.readUTF();
            int listSize = in.readInt();
            List<Integer> values = new ArrayList<>(listSize);
            for (int j = 0; j < listSize; j++) {
                values.add(in.readInt());
            }
            data.put(key, values);
        }
    }
}
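
Whether a write()/readFields() pair like the one above is symmetric is easy to verify off-cluster. This sketch reuses the same nested-map scheme as ComplexWritable, with the logic lifted into plain static methods (and without the Hadoop interface) so it runs with only the JDK:

```java
import java.io.*;
import java.util.*;

public class ComplexRoundTrip {
    // Same serialization scheme as ComplexWritable.write() above
    static void write(Map<String, List<Integer>> data, DataOutput out) throws IOException {
        out.writeInt(data.size());
        for (Map.Entry<String, List<Integer>> e : data.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeInt(e.getValue().size());
            for (int v : e.getValue()) out.writeInt(v);
        }
    }

    // Same scheme as ComplexWritable.readFields() above
    static Map<String, List<Integer>> read(DataInput in) throws IOException {
        int mapSize = in.readInt();
        Map<String, List<Integer>> data = new HashMap<>(mapSize);
        for (int i = 0; i < mapSize; i++) {
            String key = in.readUTF();
            int listSize = in.readInt();
            List<Integer> values = new ArrayList<>(listSize);
            for (int j = 0; j < listSize; j++) values.add(in.readInt());
            data.put(key, values);
        }
        return data;
    }

    public static void main(String[] args) throws IOException {
        Map<String, List<Integer>> original = new HashMap<>();
        original.put("evens", Arrays.asList(2, 4));
        original.put("odds", Arrays.asList(1, 3, 5));

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(original, new DataOutputStream(buf));
        Map<String, List<Integer>> copy =
            read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        if (!original.equals(copy)) throw new AssertionError("write/readFields are not symmetric");
        System.out.println("round trip ok");
    }
}
```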

Normalized Key Optimization

For high-performance sorting operations, Writable types that implement proper compareTo methods can benefit from normalized key optimization:

public class OptimizedWritable implements Writable, Comparable<OptimizedWritable> {
    private long primaryKey;
    private String secondaryKey;
    
    @Override
    public int compareTo(OptimizedWritable other) {
        // Primary comparison on long value (efficient for normalized keys)
        int result = Long.compare(this.primaryKey, other.primaryKey);
        if (result == 0) {
            result = this.secondaryKey.compareTo(other.secondaryKey);
        }
        return result;
    }
    
    // Writable implementation...
}
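
The reason a long primary key is "efficient for normalized keys" is that a fixed-width, sign-adjusted big-endian encoding lets the sorter compare raw bytes instead of deserialized objects. The sketch below shows one such encoding and checks that unsigned byte order matches numeric order; it is an illustration of the idea behind putNormalizedKey(), not Flink's exact byte layout:

```java
import java.nio.ByteBuffer;

public class NormalizedKey {
    // Flip the sign bit, then write big-endian: unsigned lexicographic
    // byte order then equals signed numeric order
    static byte[] normalize(long key) {
        return ByteBuffer.allocate(8).putLong(key ^ Long.MIN_VALUE).array();
    }

    // Byte-wise unsigned comparison, as a sorter would do on raw memory
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < 8; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] samples = {-5L, -1L, 0L, 1L, Long.MIN_VALUE, Long.MAX_VALUE};
        for (long x : samples)
            for (long y : samples) {
                int byteCmp = Integer.signum(compareUnsigned(normalize(x), normalize(y)));
                if (byteCmp != Integer.signum(Long.compare(x, y)))
                    throw new AssertionError(x + " vs " + y);
            }
        System.out.println("byte order matches numeric order");
    }
}
```

Ties on the 8-byte prefix (here, equal primaryKey values) are what isNormalizedKeyPrefixOnly() signals: the sorter then falls back to the full compare() on the secondary key.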

Error Handling

Common issues and their solutions:

try {
    TypeInformation<MyWritable> typeInfo = 
        new WritableTypeInfo<>(MyWritable.class);
} catch (IllegalArgumentException e) {
    // Class doesn't implement Writable interface
    logger.error("Type must implement Writable interface: " + e.getMessage());
} catch (RuntimeException e) {
    // Issues with reflection or class instantiation
    logger.error("Failed to create type information: " + e.getMessage());
}

Common Problems

  1. Missing default constructor: Writable classes must have a public no-argument constructor
  2. Incomplete Writable implementation: Both write() and readFields() must be properly implemented
  3. Inconsistent serialization: The order and format of writes must match reads exactly
  4. Missing Comparable implementation: For sorting operations, implement Comparable<T>
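
Problem 3 is the subtle one: a write/read order mismatch often fails silently with garbled values rather than an exception. A JDK-only sketch (the brokenRoundTrip helper is hypothetical, written for this illustration) of fields written as (int, long) but read back as (long, int):

```java
import java.io.*;

public class OrderMismatch {
    // Writes (int a, long b) but reads them back in the wrong order
    static long[] brokenRoundTrip(int a, long b) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(a);      // write order: int first...
        out.writeLong(b);     // ...then long

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        long readB = in.readLong();  // BUG: consumes a's 4 bytes plus half of b
        int readA = in.readInt();
        return new long[] {readA, readB};
    }

    public static void main(String[] args) throws IOException {
        long[] r = brokenRoundTrip(1, 2L);
        // No exception is thrown; the values are just silently wrong:
        System.out.println("a=" + r[0] + " b=" + r[1]);  // a=2 b=4294967296
    }
}
```

Because no exception surfaces, a unit test asserting equality after a round trip (as in the best practices below) is the reliable way to catch this class of bug.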

Best Practices

  1. Always test Writable serialization/deserialization in isolation
  2. Implement hashCode() and equals() consistently with compareTo()
  3. Use appropriate buffer sizes for large data structures
  4. Consider implementing toString() for debugging
  5. Use object reuse when processing large datasets with Writable types

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11
