Type System Integration

Type information and serialization support for Hadoop Writable types, enabling seamless integration with Flink's type system and runtime. This capability allows Hadoop Writable objects to be used natively within Flink applications with full type safety and efficient serialization.

Capabilities

WritableTypeInfo

Type information class that integrates Hadoop Writable types with Flink's type system, providing serialization, deserialization, and comparison operations.

/**
 * Type information for data types that extend Hadoop's Writable interface
 * @param <T> The Writable type
 */
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> {
    
    /**
     * Creates type information for a Writable class
     * @param typeClass The class extending Writable
     */
    public WritableTypeInfo(Class<T> typeClass);
    
    /**
     * Creates comparator for sorting operations
     * @param sortOrderAscending Whether to sort in ascending order
     * @param executionConfig Flink execution configuration
     * @return TypeComparator for the Writable type
     */
    public TypeComparator<T> createComparator(
        boolean sortOrderAscending, 
        ExecutionConfig executionConfig
    );
    
    /**
     * Returns false as Writable types are not basic types
     * @return false
     */
    public boolean isBasicType();
    
    /**
     * Returns false as Writable types are not tuple types  
     * @return false
     */
    public boolean isTupleType();
    
    /**
     * Returns 1 as Writable types represent single fields
     * @return 1
     */
    public int getArity();
    
    /**
     * Returns 1 as Writable types represent single fields
     * @return 1
     */
    public int getTotalFields();
    
    /**
     * Returns the wrapped type class
     * @return The Writable class
     */
    public Class<T> getTypeClass();
    
    /**
     * Returns true if the type implements Comparable
     * @return true if type implements Comparable, false otherwise
     */
    public boolean isKeyType();
    
    /**
     * Creates serializer for the Writable type
     * @param serializerConfig Serializer configuration
     * @return TypeSerializer for the Writable type
     */
    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);
    
    /**
     * Creates serializer for the Writable type (deprecated)
     * @param config Execution configuration
     * @return TypeSerializer for the Writable type
     * @deprecated Use createSerializer(SerializerConfig) instead
     */
    @Deprecated
    public TypeSerializer<T> createSerializer(ExecutionConfig config);
    
    /**
     * Factory method for creating WritableTypeInfo
     * @param typeClass The Writable class
     * @param <T> The Writable type
     * @return WritableTypeInfo instance
     */
    public static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
}

Usage Examples:

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.util.Arrays;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create type information for Hadoop Writables
TypeInformation<Text> textType = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<IntWritable> intType = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);

// Create DataSets with explicit Writable type information
DataSet<Text> textData = env.fromCollection(
    Arrays.asList(new Text("hello"), new Text("world"), new Text("flink")),
    textType
);

DataSet<IntWritable> numbers = env.fromCollection(
    Arrays.asList(new IntWritable(1), new IntWritable(2), new IntWritable(3)),
    intType
);

// Sorting with Writable types (works because Text implements Comparable);
// "*" selects the full element as the sort key for non-composite types
DataSet<Text> sortedText = textData.sortPartition("*", Order.ASCENDING);

// Grouping and aggregation; sum() supports only basic numeric types,
// so IntWritable counts are combined with an explicit reduce
DataSet<Tuple2<Text, IntWritable>> wordCounts = textData
    .map(text -> new Tuple2<>(text, new IntWritable(1)))
    .returns(Types.TUPLE(textType, intType))
    .groupBy(0)
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));

Common Writable Types

The type system supports all standard Hadoop Writable types:

// Standard Hadoop Writable types supported
TypeInformation<Text> textInfo = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<IntWritable> intInfo = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);
TypeInformation<LongWritable> longInfo = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);
TypeInformation<DoubleWritable> doubleInfo = WritableTypeInfo.getWritableTypeInfo(DoubleWritable.class);
TypeInformation<FloatWritable> floatInfo = WritableTypeInfo.getWritableTypeInfo(FloatWritable.class);
TypeInformation<BooleanWritable> boolInfo = WritableTypeInfo.getWritableTypeInfo(BooleanWritable.class);
TypeInformation<BytesWritable> bytesInfo = WritableTypeInfo.getWritableTypeInfo(BytesWritable.class);
TypeInformation<NullWritable> nullInfo = WritableTypeInfo.getWritableTypeInfo(NullWritable.class);

Custom Writable Types

Support for custom Writable implementations:

// Custom Writable class
public class MyCustomWritable implements Writable, Comparable<MyCustomWritable> {
    private String data;
    private int value;
    
    // Constructors, getters, setters...
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(data);
        out.writeInt(value);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        data = in.readUTF();
        value = in.readInt();
    }
    
    @Override
    public int compareTo(MyCustomWritable other) {
        int result = data.compareTo(other.data);
        return result != 0 ? result : Integer.compare(value, other.value);
    }
}

// Usage with Flink
TypeInformation<MyCustomWritable> customType = 
    WritableTypeInfo.getWritableTypeInfo(MyCustomWritable.class);

DataSet<MyCustomWritable> customData = env.fromCollection(
    Arrays.asList(new MyCustomWritable("test", 1), new MyCustomWritable("data", 2)),
    customType
);

Integration with Flink Operations

Serialization Performance

WritableTypeInfo provides efficient serialization by leveraging Hadoop's existing Writable serialization mechanism:

  • Zero-copy operations: Direct byte buffer operations where possible
  • Compact binary format: Leverages Hadoop's optimized Writable serialization
  • Type-specific optimizations: Custom serializers for common Writable types
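The contract that makes this efficient is the Writable pattern itself: write and readFields emit and consume the raw field bytes with no per-record headers. A minimal sketch using only the JDK (PointWritable is a hypothetical stand-in that follows the Writable write/readFields contract, so the example runs without a Hadoop dependency):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value type following the Writable contract
// (write/readFields) without depending on Hadoop.
class PointWritable {
    int x;
    int y;

    void write(DataOutput out) throws IOException {    // mirrors Writable.write
        out.writeInt(x);
        out.writeInt(y);
    }

    void readFields(DataInput in) throws IOException { // mirrors Writable.readFields
        x = in.readInt();
        y = in.readInt();
    }
}

public class WritableRoundTrip {

    // Serialize: the binary form is exactly the written fields, no headers.
    static byte[] toBytes(PointWritable p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            p.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize into a fresh instance.
    static PointWritable fromBytes(byte[] bytes) {
        try {
            PointWritable p = new PointWritable();
            p.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PointWritable original = new PointWritable();
        original.x = 3;
        original.y = 7;

        byte[] bytes = toBytes(original);
        PointWritable copy = fromBytes(bytes);

        System.out.println(bytes.length);          // 8: two 4-byte ints, nothing else
        System.out.println(copy.x + "," + copy.y); // 3,7
    }
}
```

Flink's Writable serializer delegates to exactly these two methods, which is why a tight readFields/write implementation translates directly into serialization throughput.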

Sorting and Grouping

Writable types that implement Comparable can be used for sorting and grouping operations:

// Sorting operations ("*" selects the full element as the key for atomic types)
DataSet<Text> sortedWords = words.sortPartition("*", Order.ASCENDING);

// Grouping operations on tuple fields; sum() supports only basic numeric
// types, so IntWritable values are combined with an explicit reduce
DataSet<Tuple2<Text, IntWritable>> groupedData = pairs
    .groupBy(0)
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));

// Deduplication (the full element is the key)
DataSet<MyWritable> uniqueItems = items.distinct();

Memory Management

The type system provides efficient memory management for Writable types:

  • Object reuse: Automatic object reuse for Writable instances during processing
  • Memory-efficient storage: Compact binary representation in Flink's memory segments
  • Garbage collection optimization: Reduced object allocation through reuse patterns
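The reuse pattern is possible because readFields overwrites fields in place, so one mutable instance can absorb every record in a stream. A JDK-only sketch (CountWritable is a hypothetical record following the Writable contract):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical mutable record following the Writable contract.
class CountWritable {
    long value;
    void write(DataOutput out) throws IOException { out.writeLong(value); }
    void readFields(DataInput in) throws IOException { value = in.readLong(); }
}

public class ReuseDemo {

    // Sum `count` serialized records while allocating a single CountWritable,
    // the reuse pattern Flink applies to Writable types during processing.
    static long sum(byte[] serialized, int count) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
            CountWritable reuse = new CountWritable(); // one allocation for the whole stream
            long total = 0;
            for (int i = 0; i < count; i++) {
                reuse.readFields(in); // overwrites fields in place, no new object
                total += reuse.value;
            }
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] encode(long... values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            CountWritable w = new CountWritable(); // reused on the write side too
            for (long v : values) { w.value = v; w.write(out); }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(encode(1, 2, 3, 4, 5), 5)); // 15
    }
}
```

This is also why custom Writables should be fully reset by readFields: any field not overwritten would leak state from the previous record into the reused instance.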

Error Handling

The type system handles various error scenarios:

  • Serialization Errors: Clear error messages when Writable serialization fails
  • Type Mismatch: Runtime type checking to prevent ClassCastExceptions
  • Null Handling: Proper null value handling for nullable Writable types
  • Configuration Issues: Detailed error reporting for type configuration problems
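Serialization failures in readFields-style code surface as IOException (typically EOFException for truncated or corrupt records), which callers can catch and report. A minimal JDK-only sketch:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class TruncatedReadDemo {

    // A corrupt or truncated record surfaces as an IOException from the
    // read path; reporting the exception type makes the failure diagnosable.
    static String readStatus(byte[] bytes) {
        try {
            new DataInputStream(new ByteArrayInputStream(bytes)).readLong();
            return "ok";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(readStatus(new byte[8])); // ok: a full 8-byte long is present
        System.out.println(readStatus(new byte[3])); // EOFException: record is truncated
    }
}
```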

Performance Considerations

Best Practices

  1. Reuse TypeInformation instances: Create WritableTypeInfo once and reuse across operations
  2. Implement Comparable: For Writable types used in sorting/grouping operations
  3. Optimize readFields/write: Efficient serialization implementation in custom Writables
  4. Use appropriate Writable types: Choose the most specific Writable type for your data

Memory Usage

WritableTypeInfo provides memory-efficient operations:

  • Serialized size estimation for memory planning
  • Efficient comparator implementations for sorting
  • Optimized hash code computation for grouping operations
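Efficient comparators often rest on the fact that a Writable's serialized form can be ordered directly. A JDK-only sketch of the idea, restricted to non-negative ints since DataOutput.writeInt emits big-endian bytes and the sign bit would break raw byte ordering:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BinarySortDemo {
    public static void main(String[] args) {
        // Big-endian encodings of non-negative ints sort correctly as raw
        // bytes, so records can be ordered without deserializing them.
        byte[] two = {0, 0, 0, 2};  // int 2 as written by DataOutput.writeInt
        byte[] ten = {0, 0, 0, 10}; // int 10
        byte[] one = {0, 0, 0, 1};  // int 1

        List<byte[]> records = new ArrayList<>(Arrays.asList(ten, two, one));
        records.sort(Arrays::compare); // lexicographic comparison of raw bytes

        for (byte[] r : records) {
            System.out.println(r[3]); // 1, 2, 10 — sorted without deserializing
        }
    }
}
```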