Type information and serialization support for Hadoop Writable types, enabling seamless integration with Flink's type system and runtime. This capability allows Hadoop Writable objects to be used natively within Flink applications with full type safety and efficient serialization.
WritableTypeInfo is the type information class behind this support, providing serialization, deserialization, and comparison operations for Writable types.
/**
 * Type information for data types that extend Hadoop's Writable interface.
 *
 * @param <T> The Writable type
 */
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> {

    /**
     * Creates type information for a Writable class.
     *
     * @param typeClass The class extending Writable
     */
    public WritableTypeInfo(Class<T> typeClass);

    /**
     * Creates a comparator for sorting operations.
     *
     * @param sortOrderAscending Whether to sort in ascending order
     * @param executionConfig Flink execution configuration
     * @return TypeComparator for the Writable type
     */
    public TypeComparator<T> createComparator(
            boolean sortOrderAscending,
            ExecutionConfig executionConfig
    );

    /**
     * Returns false, as Writable types are not basic types.
     *
     * @return false
     */
    public boolean isBasicType();

    /**
     * Returns false, as Writable types are not tuple types.
     *
     * @return false
     */
    public boolean isTupleType();

    /**
     * Returns 1, as Writable types represent single fields.
     *
     * @return 1
     */
    public int getArity();

    /**
     * Returns 1, as Writable types represent single fields.
     *
     * @return 1
     */
    public int getTotalFields();

    /**
     * Returns the wrapped type class.
     *
     * @return The Writable class
     */
    public Class<T> getTypeClass();

    /**
     * Returns whether the type can be used as a key.
     *
     * @return true if the type implements Comparable, false otherwise
     */
    public boolean isKeyType();

    /**
     * Creates a serializer for the Writable type.
     *
     * @param serializerConfig Serializer configuration
     * @return TypeSerializer for the Writable type
     */
    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);

    /**
     * Creates a serializer for the Writable type (deprecated).
     *
     * @param config Execution configuration
     * @return TypeSerializer for the Writable type
     * @deprecated Use createSerializer(SerializerConfig) instead
     */
    @Deprecated
    public TypeSerializer<T> createSerializer(ExecutionConfig config);

    /**
     * Factory method for creating WritableTypeInfo.
     *
     * @param typeClass The Writable class
     * @param <T> The Writable type
     * @return WritableTypeInfo instance
     */
    public static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
}

Usage Examples:
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create type information for Hadoop Writables
TypeInformation<Text> textType = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<IntWritable> intType = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);

// Use with DataSet transformations
DataSet<Text> textData = env.fromElements(
    new Text("hello"), new Text("world"), new Text("flink")
).returns(textType);

DataSet<IntWritable> numbers = env.fromElements(
    new IntWritable(1), new IntWritable(2), new IntWritable(3)
).returns(intType);

// Sorting with Writable types (works because they implement Comparable);
// "*" selects the full atomic value as the sort key
DataSet<Text> sortedText = textData.sortPartition("*", Order.ASCENDING);

// Grouping and aggregation; sum() only supports basic numeric field types,
// so aggregate IntWritable counts with an explicit reduce
DataSet<Tuple2<Text, IntWritable>> wordCounts = textData
    .map(text -> new Tuple2<>(text, new IntWritable(1)))
    .returns(Types.TUPLE(textType, intType))
    .groupBy(0)
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));

The type system supports all standard Hadoop Writable types:
// Standard Hadoop Writable types supported
TypeInformation<Text> textInfo = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<IntWritable> intInfo = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);
TypeInformation<LongWritable> longInfo = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);
TypeInformation<DoubleWritable> doubleInfo = WritableTypeInfo.getWritableTypeInfo(DoubleWritable.class);
TypeInformation<FloatWritable> floatInfo = WritableTypeInfo.getWritableTypeInfo(FloatWritable.class);
TypeInformation<BooleanWritable> boolInfo = WritableTypeInfo.getWritableTypeInfo(BooleanWritable.class);
TypeInformation<BytesWritable> bytesInfo = WritableTypeInfo.getWritableTypeInfo(BytesWritable.class);
TypeInformation<NullWritable> nullInfo = WritableTypeInfo.getWritableTypeInfo(NullWritable.class);

Support for custom Writable implementations:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Custom Writable class
public class MyCustomWritable implements Writable, Comparable<MyCustomWritable> {
    private String data;
    private int value;

    // A no-argument constructor is required so instances can be created
    // during deserialization
    public MyCustomWritable() {}

    public MyCustomWritable(String data, int value) {
        this.data = data;
        this.value = value;
    }

    // Getters, setters...

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(data);
        out.writeInt(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        data = in.readUTF();
        value = in.readInt();
    }

    @Override
    public int compareTo(MyCustomWritable other) {
        int result = data.compareTo(other.data);
        return result != 0 ? result : Integer.compare(value, other.value);
    }
}

// Usage with Flink
TypeInformation<MyCustomWritable> customType =
    WritableTypeInfo.getWritableTypeInfo(MyCustomWritable.class);

DataSet<MyCustomWritable> customData = env.fromElements(
    new MyCustomWritable("test", 1),
    new MyCustomWritable("data", 2)
).returns(customType);

WritableTypeInfo provides efficient serialization by delegating to Hadoop's existing Writable serialization mechanism, writing values with write(DataOutput) and reading them back with readFields(DataInput).
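To make that mechanism concrete, here is a minimal plain-JDK sketch of the write/readFields round trip. It deliberately avoids the Flink and Hadoop dependencies: SimpleWritable and Pair are illustrative stand-ins for the real org.apache.hadoop.io.Writable contract, not actual library types.

```java
import java.io.*;

// Illustrative stand-in for Hadoop's Writable contract (the real interface
// lives in org.apache.hadoop.io.Writable).
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

public class WritableRoundTrip {
    static class Pair implements SimpleWritable {
        String data = "";
        int value;

        public void write(DataOutput out) throws IOException {
            out.writeUTF(data);
            out.writeInt(value);
        }

        public void readFields(DataInput in) throws IOException {
            data = in.readUTF();
            value = in.readInt();
        }
    }

    public static void main(String[] args) throws IOException {
        Pair original = new Pair();
        original.data = "flink";
        original.value = 42;

        // Serialize: this is all a Writable-based serializer does on write.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh (or reused) instance via readFields().
        Pair copy = new Pair();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.data + ":" + copy.value);  // prints "flink:42"
    }
}
```

Because the type itself defines its wire format, no reflection or schema registry is needed at serialization time.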
Writable types that implement Comparable can be used as keys for sorting, grouping, and deduplication:

// Sorting an atomic Writable type; "*" selects the full value as the key
DataSet<Text> sortedWords = words.sortPartition("*", Order.ASCENDING);

// Grouping by a Writable key inside a tuple; aggregate with reduce, since
// sum() only supports basic numeric field types
DataSet<Tuple2<Text, IntWritable>> groupedData = data
    .groupBy(0)
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));

// Deduplication on the full Writable value
DataSet<MyWritable> uniqueItems = data.distinct();
Deserialization goes through readFields() on an instance the serializer creates and can reuse, which keeps per-record allocation low, and invalid usage is caught early: getWritableTypeInfo rejects classes that do not actually implement Writable with a descriptive exception.
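A plain-JDK sketch (again with no Flink or Hadoop dependency; Record is a hypothetical stand-in for a Writable) of why this pattern is memory-friendly: because readFields() repopulates an existing object in place, a serializer can deserialize an entire stream of records into one reused instance instead of allocating per record.

```java
import java.io.*;

public class ReuseSketch {
    // Hypothetical record following the Writable pattern: mutable fields,
    // overwritten in place by readFields().
    static class Record {
        String word = "";
        int count;

        void write(DataOutput out) throws IOException {
            out.writeUTF(word);
            out.writeInt(count);
        }

        void readFields(DataInput in) throws IOException {
            word = in.readUTF();
            count = in.readInt();
        }
    }

    public static void main(String[] args) throws IOException {
        // Serialize three records back to back.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        String[] words = {"hello", "world", "flink"};
        for (int i = 0; i < words.length; i++) {
            Record r = new Record();
            r.word = words[i];
            r.count = i + 1;
            r.write(out);
        }

        // Deserialize all of them into ONE reused instance: readFields()
        // replaces the previous contents rather than allocating a new object.
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray()));
        Record reused = new Record();
        int total = 0;
        for (int i = 0; i < words.length; i++) {
            reused.readFields(in);
            total += reused.count;
        }
        System.out.println(total);  // prints 6 (1 + 2 + 3)
    }
}
```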