Hadoop compatibility layer for Apache Flink providing input/output format wrappers and utilities to integrate Hadoop MapReduce with Flink's DataSet and DataStream APIs
—
The Type System Integration capability provides comprehensive support for Hadoop Writable types within Flink's type system, enabling efficient serialization, deserialization, and comparison of Hadoop data types in Flink applications.
Flink's type system requires explicit type information for serialization and comparison operations. The Hadoop compatibility layer provides specialized type information classes that bridge Hadoop's Writable interface with Flink's TypeInformation system, ensuring optimal performance and correctness.
The primary class for integrating Hadoop Writable types with Flink's type system.
@Public
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
// Constructor
@PublicEvolving
public WritableTypeInfo(Class<T> typeClass);
// Create type-specific serializer
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig);
// Create type-specific comparator for sorting
@PublicEvolving
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);
// Type introspection methods
@PublicEvolving
public boolean isBasicType();
@PublicEvolving
public boolean isTupleType();
@PublicEvolving
public int getArity();
@PublicEvolving
public int getTotalFields();
@PublicEvolving
public Class<T> getTypeClass();
@PublicEvolving
public boolean isKeyType();
// Package-private factory method (internal use)
@PublicEvolving
static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
}
High-performance serializer for Hadoop Writable types.
@Internal
public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
// Constructor
public WritableSerializer(Class<T> typeClass);
// Instance creation and copying
public T createInstance();
public T copy(T from);
public T copy(T from, T reuse);
// Serialization metadata
public int getLength();
public boolean isImmutableType();
// Serialization operations
public void serialize(T record, DataOutputView target) throws IOException;
public T deserialize(DataInputView source) throws IOException;
public T deserialize(T reuse, DataInputView source) throws IOException;
public void copy(DataInputView source, DataOutputView target) throws IOException;
// Serializer management
public WritableSerializer<T> duplicate();
}
Efficient comparator for Hadoop Writable types supporting both in-memory and serialized comparison.
@Internal
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
// Constructor
public WritableComparator(boolean ascending, Class<T> type);
// Hash computation
public int hash(T record);
// Reference-based comparison
public void setReference(T toCompare);
public boolean equalToReference(T candidate);
public int compareToReference(TypeComparator<T> referencedComparator);
// Direct comparison
public int compare(T first, T second);
// Serialized data comparison
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException;
// Normalized key support for sorting optimization
public boolean supportsNormalizedKey();
public int getNormalizeKeyLen();
public boolean isNormalizedKeyPrefixOnly(int keyBytes);
public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes);
public boolean invertNormalizedKey();
// Comparator management
public TypeComparator<T> duplicate();
}
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
// Create type information for Hadoop Writable types
TypeInformation<Text> textTypeInfo = new WritableTypeInfo<>(Text.class);
TypeInformation<IntWritable> intTypeInfo = new WritableTypeInfo<>(IntWritable.class);
// Use with DataSet operations
DataSet<Text> textData = env.fromElements(
new Text("Hello"),
new Text("World")
).returns(textTypeInfo);
DataSet<IntWritable> intData = env.fromElements(
new IntWritable(1),
new IntWritable(2),
new IntWritable(3)
).returns(intTypeInfo);
// Define custom Writable class
/**
 * Example custom Hadoop Writable that can be used as a Flink key type.
 *
 * <p>Implements {@code Comparable} so Flink can build a {@code WritableComparator}
 * for it, and overrides {@code equals()}/{@code hashCode()} consistently with
 * {@code compareTo()} — a requirement for correct grouping and joining on keys.
 */
public class CustomWritable implements Writable, Comparable<CustomWritable> {
    private int id;
    private String name;

    /** No-arg constructor required by the Writable contract (instantiated reflectively). */
    public CustomWritable() {}

    public CustomWritable(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** Serializes this record: the int id followed by the UTF-encoded name. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
    }

    /** Deserializes in place, mirroring the field order of {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
    }

    /** Orders by id first, then by name; consistent with {@link #equals}. */
    @Override
    public int compareTo(CustomWritable other) {
        int result = Integer.compare(this.id, other.id);
        if (result == 0) {
            result = this.name.compareTo(other.name);
        }
        return result;
    }

    /** Consistent with compareTo(): equal iff both id and name match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CustomWritable)) {
            return false;
        }
        CustomWritable that = (CustomWritable) obj;
        return this.id == that.id
                && (this.name == null ? that.name == null : this.name.equals(that.name));
    }

    @Override
    public int hashCode() {
        return 31 * id + (name == null ? 0 : name.hashCode());
    }

    @Override
    public String toString() {
        return "CustomWritable{id=" + id + ", name='" + name + "'}";
    }
}
// Use custom Writable with type information
TypeInformation<CustomWritable> customTypeInfo =
        new WritableTypeInfo<>(CustomWritable.class);
DataSet<CustomWritable> customData = env.fromElements(
        new CustomWritable(1, "Alice"),
        new CustomWritable(2, "Bob"),
        new CustomWritable(3, "Charlie")
).returns(customTypeInfo);
// Sort on the whole element: atomic (non-tuple) types must use the "*" wildcard
// field expression, which delegates to the Comparable implementation; integer
// field positions such as 0 are only valid for tuple types.
DataSet<CustomWritable> sortedData = customData.sortPartition("*", Order.ASCENDING);
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
// Create type information for tuples containing Writables
TypeInformation<Tuple2<IntWritable, Text>> tupleTypeInfo =
new TupleTypeInfo<>(
new WritableTypeInfo<>(IntWritable.class),
new WritableTypeInfo<>(Text.class)
);
DataSet<Tuple2<IntWritable, Text>> tupleData = env.fromElements(
new Tuple2<>(new IntWritable(1), new Text("First")),
new Tuple2<>(new IntWritable(2), new Text("Second"))
).returns(tupleTypeInfo);
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
// Get serializer for custom configuration
ExecutionConfig config = env.getConfig();
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);
TypeSerializer<Text> textSerializer = textType.createSerializer(config);
// The serializer can be used for manual serialization if needed
// (typically handled automatically by Flink)
// Enable object reuse for better performance with Writable types
env.getConfig().enableObjectReuse();
// This is particularly beneficial for Writable types as they support
// in-place deserialization, reducing garbage collection pressure
The following Hadoop Writable types are commonly used and fully supported:
// Primitive wrappers
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.DoubleWritable;
// Text and binary data
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
// Null values
import org.apache.hadoop.io.NullWritable;
// Variable-length integers
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;
// Collections
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SortedMapWritable;
// Get type information for common Hadoop types
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);
TypeInformation<IntWritable> intType = new WritableTypeInfo<>(IntWritable.class);
TypeInformation<LongWritable> longType = new WritableTypeInfo<>(LongWritable.class);
TypeInformation<BooleanWritable> boolType = new WritableTypeInfo<>(BooleanWritable.class);
TypeInformation<DoubleWritable> doubleType = new WritableTypeInfo<>(DoubleWritable.class);
TypeInformation<NullWritable> nullType = new WritableTypeInfo<>(NullWritable.class);
// For complex Writable types that need special handling
/**
 * Example Writable holding a nested {@code Map<String, List<Integer>>} structure,
 * serialized as: map size, then for each entry the UTF key, the list size, and
 * each int value in list order.
 */
public class ComplexWritable implements Writable {
    // Initialized to an empty map so write() is safe on a freshly constructed
    // (no-arg) instance — the Writable contract requires instances created via
    // the default constructor to be serializable; a null field would NPE here.
    private Map<String, List<Integer>> data = new HashMap<>();

    /** Writes the map size, then each key/value-list pair. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(data.size());
        for (Map.Entry<String, List<Integer>> entry : data.entrySet()) {
            out.writeUTF(entry.getKey());
            List<Integer> values = entry.getValue();
            out.writeInt(values.size());
            for (Integer value : values) {
                out.writeInt(value);
            }
        }
    }

    /** Rebuilds the map in place, mirroring the layout written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        int mapSize = in.readInt();
        // mapSize is used as the initial capacity hint; entries are added below.
        data = new HashMap<>(mapSize);
        for (int i = 0; i < mapSize; i++) {
            String key = in.readUTF();
            int listSize = in.readInt();
            List<Integer> values = new ArrayList<>(listSize);
            for (int j = 0; j < listSize; j++) {
                values.add(in.readInt());
            }
            data.put(key, values);
        }
    }
}
For high-performance sorting operations, Writable types that implement proper compareTo methods can benefit from normalized key optimization:
// Example of a sort-friendly Writable: a fixed-width long primary key lets the
// comparator's normalized-key optimization resolve most comparisons on raw bytes.
public class OptimizedWritable implements Writable, Comparable<OptimizedWritable> {
// Fixed-width key compared first; maps well onto a normalized-key prefix.
private long primaryKey;
// Variable-length tie-breaker; compared on the deserialized object.
private String secondaryKey;
@Override
public int compareTo(OptimizedWritable other) {
// Primary comparison on long value (efficient for normalized keys)
int result = Long.compare(this.primaryKey, other.primaryKey);
if (result == 0) {
// NOTE(review): assumes secondaryKey is non-null on both sides — confirm
// before using this type as a key, or handle nulls explicitly.
result = this.secondaryKey.compareTo(other.secondaryKey);
}
return result;
}
// Writable implementation...
}Common issues and their solutions:
try {
TypeInformation<MyWritable> typeInfo =
new WritableTypeInfo<>(MyWritable.class);
} catch (IllegalArgumentException e) {
// Class doesn't implement Writable interface
logger.error("Type must implement Writable interface: " + e.getMessage());
} catch (RuntimeException e) {
// Issues with reflection or class instantiation
logger.error("Failed to create type information: " + e.getMessage());
}
Requirements for custom Writable types:
- write() and readFields() must be properly implemented
- Implement Comparable<T> for types used as keys
- Implement hashCode() and equals() consistently with compareTo()
- Implement toString() for debugging

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-hadoop-compatibility-2-11