tessl/maven-org-apache-flink--flink-hadoop-compatibility

Apache Flink compatibility layer for integrating Hadoop InputFormats, OutputFormats, and MapReduce functions with Flink streaming and batch processing

Type System Integration

Custom type information and serialization support for Hadoop Writable types, bridging Hadoop's serialization mechanism and Flink's type system. The package provides type information, a serializer, and a comparator, so that Writable values can pass through Flink operators and, when the type is comparable, serve as keys for sorting and grouping.

Capabilities

WritableTypeInfo

Type information class for Hadoop Writable types that integrates with Flink's type system.

public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {

    /**
     * Constructor for WritableTypeInfo
     * @param typeClass The class of the Writable type
     */
    public WritableTypeInfo(Class<T> typeClass);

    /**
     * Factory method to create WritableTypeInfo instances
     * @param typeClass The class of the Writable type
     * @return WritableTypeInfo instance for the given type
     */
    public static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);

    /**
     * Create a type comparator for this Writable type
     * @param sortOrderAscending Whether to sort in ascending order
     * @param executionConfig Execution configuration
     * @return TypeComparator for this type
     */
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);

    /**
     * Check if this is a basic type
     * @return false (Writable types are not considered basic types)
     */
    public boolean isBasicType();

    /**
     * Check if this is a tuple type
     * @return false (Writable types are not tuple types)
     */
    public boolean isTupleType();

    /**
     * Get the arity of this type
     * @return 1 (Writable types have arity 1)
     */
    public int getArity();

    /**
     * Get the total number of fields
     * @return 1 (Writable types have 1 field)
     */
    public int getTotalFields();

    /**
     * Get the type class
     * @return The class of the Writable type
     */
    public Class<T> getTypeClass();

    /**
     * Check if this type can be used as a key
     * @return true if the Writable class also implements Comparable
     *         (for example any WritableComparable), false otherwise
     */
    public boolean isKeyType();

    /**
     * Create a serializer for this type
     * @param serializerConfig Serializer configuration
     * @return TypeSerializer for this Writable type
     */
    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);

    /**
     * String representation of this type
     * @return String description of the type
     */
    public String toString();

    /**
     * Hash code for this type information
     * @return Hash code
     */
    public int hashCode();

    /**
     * Check equality with another object
     * @param obj Object to compare with
     * @return true if equal, false otherwise
     */
    public boolean equals(Object obj);

    /**
     * Check if this type can be equal to another type
     * @param obj Object to check equality capability with
     * @return true if can be equal, false otherwise
     */
    public boolean canEqual(Object obj);
}

Usage Example:

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Create type information for common Hadoop types
// (the public constructor is an alternative: new WritableTypeInfo<>(Text.class))
TypeInformation<Text> textTypeInfo = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<IntWritable> intTypeInfo = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);
TypeInformation<LongWritable> longTypeInfo = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);

// Use with Flink DataSet API (env is an existing ExecutionEnvironment).
// A data source has no returns(...) method, so the type information
// is passed to fromCollection directly.
DataSet<Text> textDataset = env.fromCollection(
    Arrays.asList(new Text("hello"), new Text("world")), textTypeInfo);

// Use with custom Writable types
public static class CustomWritable implements Writable {
    private String data = "";  // non-null default: writeUTF(null) would throw
    private int count;

    // Public no-argument constructor, needed so instances can be created reflectively
    public CustomWritable() {}

    public void write(DataOutput out) throws IOException {
        out.writeUTF(data);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException {
        data = in.readUTF();
        count = in.readInt();
    }

    // other constructors, getters, setters...
}

TypeInformation<CustomWritable> customTypeInfo = 
    WritableTypeInfo.getWritableTypeInfo(CustomWritable.class);

DataSet<CustomWritable> customDataset = env.fromCollection(
    java.util.Collections.singletonList(new CustomWritable()), customTypeInfo);
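
Whether a class qualifies for this kind of type information can be pictured with a few reflective checks. The sketch below uses only the JDK: `MiniWritable` is a hypothetical stand-in for `org.apache.hadoop.io.Writable`, and `WritableChecksSketch` with its three helper methods is illustrative, not part of Flink or Hadoop. It assumes, without confirming it against a specific Flink release, that a usable type must be a concrete subtype of the interface, that key usage additionally depends on `Comparable`, and that reflective instantiation needs a public no-argument constructor.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

final class WritableChecksSketch {

    // Hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods).
    interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // A plain value type: serializable through write/readFields, but not comparable.
    static final class Payload implements MiniWritable {
        private String data = "";
        public Payload() {}
        public void write(DataOutput out) throws IOException { out.writeUTF(data); }
        public void readFields(DataInput in) throws IOException { data = in.readUTF(); }
    }

    // A key type: serializable and comparable.
    static final class Key implements MiniWritable, Comparable<Key> {
        private int id;
        public Key() {}
        public void write(DataOutput out) throws IOException { out.writeInt(id); }
        public void readFields(DataInput in) throws IOException { id = in.readInt(); }
        @Override
        public int compareTo(Key other) { return Integer.compare(id, other.id); }
    }

    // True for subtypes of the interface, but not for the interface itself.
    static boolean isWritableSubtype(Class<?> type) {
        return MiniWritable.class.isAssignableFrom(type) && !type.equals(MiniWritable.class);
    }

    // True when the type can also be ordered, which sorting and grouping rely on.
    static boolean isUsableAsKey(Class<?> type) {
        return isWritableSubtype(type) && Comparable.class.isAssignableFrom(type);
    }

    // True when an empty instance could be created reflectively.
    static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

Under these assumptions `Payload` would be accepted as a value type only, while `Key` could also be sorted and grouped on.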

WritableComparator

Type comparator for Hadoop Writable types that enables sorting and grouping operations.

public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
    // Implementation details for comparing Writable types
    // Supports hash-based operations, sorting, and grouping
    // Relies on the type's own compareTo, equals, and hashCode
    // Requires the Writable to implement Comparable (for example WritableComparable);
    // WritableTypeInfo.createComparator rejects types that do not
}

Usage Example:

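import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Sorting with Writable types - comparator is automatically created
DataSet<Text> textDataset = env.fromElements(
    new Text("zebra"), new Text("apple"), new Text("banana")
);

// sortPartition keeps the element type, so no returns(...) call is needed
DataSet<Text> sortedDataset = textDataset
    .sortPartition(text -> text, Order.ASCENDING);

// Grouping operations
DataSet<Tuple2<Text, IntWritable>> keyValuePairs = // ... your dataset
DataSet<Tuple2<Text, IntWritable>> grouped = keyValuePairs
    .groupBy(0)  // Group by Text key (uses WritableComparator internally)
    // The built-in sum(1) aggregation only handles Java numeric field types,
    // so the IntWritable values are added in a reduce function instead
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));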
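
One way to picture how a comparator can order records that are still in serialized form is to read both sides into two scratch instances and delegate to `compareTo`. The following is a JDK-only sketch under that assumption, not Flink's implementation: `Word` is a hypothetical record with the same `write`/`readFields` shape as a Writable, and `SerializedCompareSketch` is an illustrative name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class SerializedCompareSketch {

    // Hypothetical record type shaped like a comparable Writable.
    static final class Word implements Comparable<Word> {
        private String value = "";

        public Word() {}
        public Word(String value) { this.value = value; }

        public void write(DataOutput out) throws IOException { out.writeUTF(value); }
        public void readFields(DataInput in) throws IOException { value = in.readUTF(); }

        @Override
        public int compareTo(Word other) { return value.compareTo(other.value); }
    }

    // Two scratch instances, refilled for every comparison instead of reallocated.
    private final Word left = new Word();
    private final Word right = new Word();
    private final boolean ascending;

    SerializedCompareSketch(boolean ascending) { this.ascending = ascending; }

    // Serializes one record into a fresh byte array.
    static byte[] toBytes(Word word) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            word.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Deserializes both operands, then compares them in the configured order.
    int compareSerialized(byte[] first, byte[] second) {
        try {
            left.readFields(new DataInputStream(new ByteArrayInputStream(first)));
            right.readFields(new DataInputStream(new ByteArrayInputStream(second)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ascending ? left.compareTo(right) : right.compareTo(left);
    }
}
```

Swapping the operands for descending order avoids negating the result, which would overflow if a `compareTo` implementation ever returned `Integer.MIN_VALUE`.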

WritableSerializer

Serializer for Hadoop Writable types that handles efficient serialization and deserialization.

public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
    // Implementation details for serializing Writable types
    // Uses Hadoop's Writable serialization mechanism
    // Handles object reuse: deserialization can read into an existing instance
    // Creates new instances reflectively, so the type needs a public no-argument constructor
}

Usage Example:

// Serialization is handled automatically by Flink when using WritableTypeInfo
// Manual serializer creation is typically not needed

import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.hadoop.io.Text;

// Example of manual serialization (rarely needed).
// serialize and deserialize may throw IOException.
WritableSerializer<Text> textSerializer = new WritableSerializer<>(Text.class);

// Serialization into an in-memory buffer
Text original = new Text("example");
DataOutputSerializer out = new DataOutputSerializer(64);
textSerializer.serialize(original, out);
byte[] serialized = out.getCopyOfBuffer();

// Deserialization from the same bytes
Text deserialized = textSerializer.deserialize(new DataInputDeserializer(serialized));
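
The round trip behind such a serializer can be pictured with JDK streams alone: write the record's fields, create an empty instance reflectively, and let it read its own fields back. This is a minimal sketch under stated assumptions, not the code Flink runs. `MiniWritable` is a hypothetical stand-in for Hadoop's `Writable`, and `Counter`, `serialize`, `deserialize`, and `copy` are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class RoundTripSketch {

    // Hypothetical stand-in for org.apache.hadoop.io.Writable.
    interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    static final class Counter implements MiniWritable {
        private String name = "";
        private int count;

        public Counter() {}  // needed for reflective instantiation below
        public Counter(String name, int count) { this.name = name; this.count = count; }

        public String name() { return name; }
        public int count() { return count; }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(name);
            out.writeInt(count);
        }

        public void readFields(DataInput in) throws IOException {
            name = in.readUTF();
            count = in.readInt();
        }
    }

    // Writes the record's fields into a byte array.
    static byte[] serialize(MiniWritable record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            record.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Creates an empty instance via the no-arg constructor, then fills it from the bytes.
    static <T extends MiniWritable> T deserialize(Class<T> type, byte[] bytes) {
        try {
            T instance = type.getDeclaredConstructor().newInstance();
            instance.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return instance;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }

    // A deep copy expressed as serialize followed by deserialize.
    static <T extends MiniWritable> T copy(Class<T> type, T original) {
        return deserialize(type, serialize(original));
    }
}
```

The sketch also shows why a missing no-argument constructor only surfaces at run time: the failure happens inside `deserialize`, not when the type is declared.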

Common Writable Types Support

Standard Hadoop Types

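Hadoop's standard Writable types can be described with WritableTypeInfo. Types that lack a public no-argument constructor, such as ArrayWritable and TwoDArrayWritable, are typically used through a small subclass that supplies one.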

// Numeric types
TypeInformation<IntWritable> intType = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);
TypeInformation<LongWritable> longType = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);
TypeInformation<FloatWritable> floatType = WritableTypeInfo.getWritableTypeInfo(FloatWritable.class);
TypeInformation<DoubleWritable> doubleType = WritableTypeInfo.getWritableTypeInfo(DoubleWritable.class);

// Text and boolean types
TypeInformation<Text> textType = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<BooleanWritable> boolType = WritableTypeInfo.getWritableTypeInfo(BooleanWritable.class);

// Null type
TypeInformation<NullWritable> nullType = WritableTypeInfo.getWritableTypeInfo(NullWritable.class);

// Array types (Hadoop defines no no-argument constructor for these; see the note above)
TypeInformation<ArrayWritable> arrayType = WritableTypeInfo.getWritableTypeInfo(ArrayWritable.class);
TypeInformation<TwoDArrayWritable> array2DType = WritableTypeInfo.getWritableTypeInfo(TwoDArrayWritable.class);

// Map and other container types
TypeInformation<MapWritable> mapType = WritableTypeInfo.getWritableTypeInfo(MapWritable.class);
TypeInformation<SortedMapWritable> sortedMapType = WritableTypeInfo.getWritableTypeInfo(SortedMapWritable.class);

Custom Writable Types

Support for user-defined Writable implementations.

// Example custom Writable with several fields
// (WritableComparable already extends Writable, so one interface is enough)
public static class PersonWritable implements WritableComparable<PersonWritable> {
    private String name;
    private int age;
    private String email;
    
    public PersonWritable() {} // Default constructor required
    
    public PersonWritable(String name, int age, String email) {
        this.name = name;
        this.age = age;
        this.email = email;
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name != null ? name : "");
        out.writeInt(age);
        out.writeUTF(email != null ? email : "");
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
        email = in.readUTF();
    }
    
    @Override
    public int compareTo(PersonWritable other) {
        int result = name.compareTo(other.name);
        if (result == 0) {
            result = Integer.compare(age, other.age);
        }
        return result;
    }
    
    // getters, setters, equals, hashCode...
}

// Use custom Writable type in Flink
TypeInformation<PersonWritable> personType = 
    WritableTypeInfo.getWritableTypeInfo(PersonWritable.class);

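// A data source has no returns(...) method; pass the type information to fromCollection
DataSet<PersonWritable> people = env.fromCollection(
    java.util.Arrays.asList(
        new PersonWritable("Alice", 25, "alice@example.com"),
        new PersonWritable("Bob", 30, "bob@example.com")),
    personType);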

// Sorting works automatically due to WritableComparable implementation
DataSet<PersonWritable> sortedPeople = people.sortPartition(person -> person, Order.ASCENDING);
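
When a custom type is used as a key, `compareTo`, `equals`, and `hashCode` should agree on which fields identify a record, because sorting relies on the first and hash-based grouping or partitioning typically relies on the other two. The sketch below illustrates that contract with a hypothetical `PersonKey` class that mirrors the name-and-age ordering used above. The Writable methods are omitted for brevity, and the claim about how grouping uses `hashCode` is an assumption about the runtime rather than something this document states.

```java
import java.util.Objects;

final class KeyContractSketch {

    // Hypothetical key type: identity is (name, age) in all three methods.
    static final class PersonKey implements Comparable<PersonKey> {
        private final String name;
        private final int age;

        public PersonKey(String name, int age) {
            this.name = Objects.requireNonNull(name, "name");
            this.age = age;
        }

        @Override
        public int compareTo(PersonKey other) {
            int byName = name.compareTo(other.name);
            return byName != 0 ? byName : Integer.compare(age, other.age);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof PersonKey)) {
                return false;
            }
            PersonKey other = (PersonKey) o;
            return age == other.age && name.equals(other.name);
        }

        // Derived from field values only, so equal keys hash alike in every JVM.
        @Override
        public int hashCode() {
            return Objects.hash(name, age);
        }
    }
}
```

A class that keeps the default identity-based `hashCode` would give two equal records different hashes, which would scatter one logical group across partitions.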

Integration Patterns

Automatic Type Extraction

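When flink-hadoop-compatibility is on the classpath, Flink's type extraction recognizes Writable classes in most cases and creates the matching type information without explicit hints.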

// Automatic type extraction for Hadoop InputFormats
HadoopInputFormat<LongWritable, Text> inputFormat = 
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class,  // Type information extracted automatically
        Text.class,
        "input/path"
    );

DataSet<Tuple2<LongWritable, Text>> dataset = env.createInput(inputFormat);
// TypeInformation for Tuple2<LongWritable, Text> is created automatically

Explicit Type Information

Provide explicit type information when automatic extraction fails, for example when generic type parameters are lost to erasure.

// Explicit type information for complex scenarios
TypeInformation<Tuple2<Text, CustomWritable>> tupleType = 
    new TupleTypeInfo<>(
        WritableTypeInfo.getWritableTypeInfo(Text.class),
        WritableTypeInfo.getWritableTypeInfo(CustomWritable.class)
    );

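// A data source has no returns(...) method; pass the type information to fromCollection
DataSet<Tuple2<Text, CustomWritable>> dataset = env.fromCollection(
    java.util.Collections.singletonList(new Tuple2<>(new Text("key"), new CustomWritable())),
    tupleType);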

Performance Considerations

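Writable objects are mutable, so a function can allocate its output objects once in open() and refill them for every record instead of allocating new ones per call.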

// Reuse objects to reduce garbage collection
public static class EfficientMapper 
    extends RichMapFunction<Tuple2<LongWritable, Text>, Tuple2<Text, IntWritable>> {
    
    private transient Text outputKey;
    private transient IntWritable outputValue;
    
    @Override
    public void open(Configuration parameters) {
        outputKey = new Text();
        outputValue = new IntWritable();
    }
    
    @Override
    public Tuple2<Text, IntWritable> map(Tuple2<LongWritable, Text> input) {
        String text = input.f1.toString();
        outputKey.set(text.toLowerCase());
        outputValue.set(text.length());
        return new Tuple2<>(outputKey, outputValue);
    }
}
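
Reusing one mutable output object is only safe when each emitted record is serialized or copied before the next call overwrites it. The JDK-only sketch below shows what goes wrong when a consumer buffers references instead. `MutableWord` is a hypothetical holder playing the role of a reused `Text`, and the two buffering methods are illustrative, not Flink operators.

```java
import java.util.ArrayList;
import java.util.List;

final class ObjectReuseSketch {

    // Hypothetical mutable holder, standing in for a reused Text or IntWritable.
    static final class MutableWord {
        private String value = "";

        public void set(String newValue) { value = newValue; }
        public String get() { return value; }

        public MutableWord copy() {
            MutableWord duplicate = new MutableWord();
            duplicate.set(value);
            return duplicate;
        }
    }

    // Buffers the reused instance itself: every list entry aliases the same object.
    static List<MutableWord> bufferWithoutCopy(String... inputs) {
        MutableWord reused = new MutableWord();
        List<MutableWord> buffered = new ArrayList<>();
        for (String input : inputs) {
            reused.set(input);
            buffered.add(reused);
        }
        return buffered;
    }

    // Buffers a copy per record: each entry keeps the value it was emitted with.
    static List<MutableWord> bufferWithCopy(String... inputs) {
        MutableWord reused = new MutableWord();
        List<MutableWord> buffered = new ArrayList<>();
        for (String input : inputs) {
            reused.set(input);
            buffered.add(reused.copy());
        }
        return buffered;
    }
}
```

In the aliased variant every buffered entry ends up holding the last input, which is the failure mode to watch for when combining reused outputs with code that keeps records in a collection.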

Key Design Patterns

Type Safety

WritableTypeInfo carries the concrete Writable class as a type parameter, so mismatched element types are caught by the Java compiler, while records are still serialized through the type's own write and readFields methods.

Performance Optimization

  • Uses Hadoop's native serialization for maximum compatibility
  • Supports object reuse patterns to minimize garbage collection
  • Provides efficient comparators for sorting and grouping operations

Compatibility

  • Full compatibility with existing Hadoop Writable types
  • Support for plain Writable values, and for key usage (sorting, grouping) when the type is also Comparable, as WritableComparable types are
  • Seamless integration with Flink's type system and operations

Configuration Integration

Type information integrates with Flink's configuration system and can be serialized along with job graphs for distributed execution.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility
