# Type System Integration

Custom type information and serialization support for Hadoop Writable types, ensuring seamless integration between Hadoop's serialization system and Flink's type system. Provides efficient serialization, comparison, and type safety for Hadoop data types.

## Capabilities

### WritableTypeInfo

Type information class for Hadoop Writable types that integrates with Flink's type system.
```java { .api }
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {

    /**
     * Constructor for WritableTypeInfo.
     * @param typeClass The class of the Writable type
     */
    public WritableTypeInfo(Class<T> typeClass);

    /**
     * Factory method to create WritableTypeInfo instances.
     * @param typeClass The class of the Writable type
     * @return WritableTypeInfo instance for the given type
     */
    public static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);

    /**
     * Creates a type comparator for this Writable type.
     * @param sortOrderAscending Whether to sort in ascending order
     * @param executionConfig Execution configuration
     * @return TypeComparator for this type
     */
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);

    /**
     * Checks whether this is a basic type.
     * @return false (Writable types are not considered basic types)
     */
    public boolean isBasicType();

    /**
     * Checks whether this is a tuple type.
     * @return false (Writable types are not tuple types)
     */
    public boolean isTupleType();

    /**
     * Gets the arity of this type.
     * @return 1 (Writable types have arity 1)
     */
    public int getArity();

    /**
     * Gets the total number of fields.
     * @return 1 (Writable types have 1 field)
     */
    public int getTotalFields();

    /**
     * Gets the type class.
     * @return The class of the Writable type
     */
    public Class<T> getTypeClass();

    /**
     * Checks whether this type can be used as a key.
     * @return true (Writable types can be used as keys)
     */
    public boolean isKeyType();

    /**
     * Creates a serializer for this type.
     * @param serializerConfig Serializer configuration
     * @return TypeSerializer for this Writable type
     */
    public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);

    /**
     * String representation of this type.
     * @return String description of the type
     */
    public String toString();

    /**
     * Hash code for this type information.
     * @return Hash code
     */
    public int hashCode();

    /**
     * Checks equality with another object.
     * @param obj Object to compare with
     * @return true if equal, false otherwise
     */
    public boolean equals(Object obj);

    /**
     * Checks whether this type can be equal to another type.
     * @param obj Object to check equality capability with
     * @return true if can be equal, false otherwise
     */
    public boolean canEqual(Object obj);
}
```
**Usage Example:**

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Create type information for common Hadoop types
TypeInformation<Text> textTypeInfo = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<IntWritable> intTypeInfo = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);
TypeInformation<LongWritable> longTypeInfo = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);

// Use with the Flink DataSet API; fromCollection accepts explicit type information
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Text> textDataset = env.fromCollection(
    Arrays.asList(new Text("hello"), new Text("world")), textTypeInfo);

// Use with custom Writable types
public static class CustomWritable implements Writable {
    private String data;
    private int count;

    public CustomWritable() {} // no-argument constructor required by Hadoop

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(data);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        data = in.readUTF();
        count = in.readInt();
    }

    // constructors, getters, setters...
}

TypeInformation<CustomWritable> customTypeInfo =
    WritableTypeInfo.getWritableTypeInfo(CustomWritable.class);

DataSet<CustomWritable> customDataset = env.fromCollection(
    Arrays.asList(new CustomWritable()), customTypeInfo);
```
### WritableComparator

Type comparator for Hadoop Writable types that enables sorting and grouping operations.

```java { .api }
public class WritableComparator<T extends Writable> extends TypeComparator<T> {
    // Implementation details for comparing Writable types
    // Supports hash-based operations, sorting, and grouping
    // Uses Hadoop's WritableComparable interface when available
    // Falls back to serialization-based comparison for non-comparable Writables
}
```
**Usage Example:**

```java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Sorting with Writable types - the comparator is created automatically
DataSet<Text> textDataset = env.fromElements(
    new Text("zebra"), new Text("apple"), new Text("banana")
);

DataSet<Text> sortedDataset = textDataset
    .sortPartition(text -> text, Order.ASCENDING);

// Grouping operations; note that sum() only supports basic numeric types,
// so IntWritable values are summed with an explicit reduce
DataSet<Tuple2<Text, IntWritable>> keyValuePairs = // ... your dataset
DataSet<Tuple2<Text, IntWritable>> grouped = keyValuePairs
    .groupBy(0) // Group by Text key (uses WritableComparator internally)
    .reduce((a, b) -> new Tuple2<>(a.f0, new IntWritable(a.f1.get() + b.f1.get())));
```
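The serialization-based fallback for Writables that do not implement `WritableComparable` can be illustrated with the JDK alone. The `SerializedCompare` class below is a hypothetical sketch, not Flink's internal implementation: both values are serialized to bytes, and the byte arrays are compared lexicographically as unsigned values.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of serialization-based comparison: serialize both
// values, then compare the resulting bytes lexicographically (unsigned).
public class SerializedCompare {

    // Stand-in for a Writable's write(): the same wire format
    // (writeUTF + writeInt) a typical Writable would produce.
    public static byte[] toBytes(String data, int count) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(data);
            out.writeInt(count);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Lexicographic, unsigned byte-wise comparison of the serialized forms;
    // a shared prefix means the shorter serialization sorts first.
    public static int compareSerialized(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

Note that this ordering is determined by the serialized byte layout, not by any domain-specific notion of ordering, which is why implementing `WritableComparable` is preferred for keys that will be sorted.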
182
183
### WritableSerializer
184
185
Serializer for Hadoop Writable types that handles efficient serialization and deserialization.
186
187
```java { .api }
188
public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
189
// Implementation details for serializing Writable types
190
// Uses Hadoop's Writable serialization mechanism
191
// Handles object reuse and efficient byte stream operations
192
// Supports both mutable and immutable serialization patterns
193
}
194
```
**Usage Example:**

```java
// Serialization is handled automatically by Flink when using WritableTypeInfo;
// manual serializer creation is typically not needed.

import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.hadoop.io.Text;

// Example of manual serialization (rarely needed)
WritableSerializer<Text> textSerializer = new WritableSerializer<>(Text.class);

// Serialize into one of Flink's DataOutputView implementations, then read it back
Text original = new Text("example");
DataOutputSerializer out = new DataOutputSerializer(64);
textSerializer.serialize(original, out);

DataInputDeserializer in = new DataInputDeserializer(out.getSharedBuffer(), 0, out.length());
Text deserialized = textSerializer.deserialize(in);
```
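Under the hood, the serializer delegates to the Writable contract itself: `write(DataOutput)` followed by `readFields(DataInput)` on a fresh instance must reproduce the original object. That round trip can be exercised with the JDK alone; `WordCount` below is a hypothetical stand-in that mirrors Hadoop's `Writable` method signatures (Hadoop's interface uses the same `java.io.DataInput`/`DataOutput` types).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in mirroring Hadoop's Writable contract.
public class WordCount {
    public String word = "";
    public int count;

    // Same signature as org.apache.hadoop.io.Writable#write
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(count);
    }

    // Same signature as org.apache.hadoop.io.Writable#readFields
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        count = in.readInt();
    }

    // Serialize with write(), then rebuild a fresh instance with readFields() -
    // the same round trip a Writable-based serializer performs.
    public static WordCount roundTrip(WordCount original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        WordCount copy = new WordCount();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }
}
```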
## Common Writable Types Support

### Standard Hadoop Types

All standard Hadoop Writable types are supported out of the box.

```java { .api }
// Numeric types
TypeInformation<IntWritable> intType = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);
TypeInformation<LongWritable> longType = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);
TypeInformation<FloatWritable> floatType = WritableTypeInfo.getWritableTypeInfo(FloatWritable.class);
TypeInformation<DoubleWritable> doubleType = WritableTypeInfo.getWritableTypeInfo(DoubleWritable.class);

// Text and boolean types
TypeInformation<Text> textType = WritableTypeInfo.getWritableTypeInfo(Text.class);
TypeInformation<BooleanWritable> boolType = WritableTypeInfo.getWritableTypeInfo(BooleanWritable.class);

// Null type
TypeInformation<NullWritable> nullType = WritableTypeInfo.getWritableTypeInfo(NullWritable.class);

// Array types
TypeInformation<ArrayWritable> arrayType = WritableTypeInfo.getWritableTypeInfo(ArrayWritable.class);
TypeInformation<TwoDArrayWritable> array2DType = WritableTypeInfo.getWritableTypeInfo(TwoDArrayWritable.class);

// Map and other container types
TypeInformation<MapWritable> mapType = WritableTypeInfo.getWritableTypeInfo(MapWritable.class);
TypeInformation<SortedMapWritable> sortedMapType = WritableTypeInfo.getWritableTypeInfo(SortedMapWritable.class);
```
244
245
### Custom Writable Types
246
247
Support for user-defined Writable implementations.
248
249
```java
250
// Example custom Writable with complex data
251
public static class PersonWritable implements Writable, WritableComparable<PersonWritable> {
252
private String name;
253
private int age;
254
private String email;
255
256
public PersonWritable() {} // Default constructor required
257
258
public PersonWritable(String name, int age, String email) {
259
this.name = name;
260
this.age = age;
261
this.email = email;
262
}
263
264
@Override
265
public void write(DataOutput out) throws IOException {
266
out.writeUTF(name != null ? name : "");
267
out.writeInt(age);
268
out.writeUTF(email != null ? email : "");
269
}
270
271
@Override
272
public void readFields(DataInput in) throws IOException {
273
name = in.readUTF();
274
age = in.readInt();
275
email = in.readUTF();
276
}
277
278
@Override
279
public int compareTo(PersonWritable other) {
280
int result = name.compareTo(other.name);
281
if (result == 0) {
282
result = Integer.compare(age, other.age);
283
}
284
return result;
285
}
286
287
// getters, setters, equals, hashCode...
288
}
289
290
// Use custom Writable type in Flink
291
TypeInformation<PersonWritable> personType =
292
WritableTypeInfo.getWritableTypeInfo(PersonWritable.class);
293
294
DataSet<PersonWritable> people = env.fromElements(
295
new PersonWritable("Alice", 25, "alice@example.com"),
296
new PersonWritable("Bob", 30, "bob@example.com")
297
).returns(personType);
298
299
// Sorting works automatically due to WritableComparable implementation
300
DataSet<PersonWritable> sortedPeople = people.sortPartition(person -> person, Order.ASCENDING);
301
```
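The name-then-age ordering implemented by `compareTo` above can be checked in isolation with plain Java. The `Person` class below is a hypothetical mirror of `PersonWritable`'s comparison logic, sorted with the JDK instead of Flink:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java mirror of PersonWritable's compareTo logic:
// order by name first, then by age.
public class Person implements Comparable<Person> {
    public final String name;
    public final int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public int compareTo(Person other) {
        int result = name.compareTo(other.name);
        if (result == 0) {
            result = Integer.compare(age, other.age);
        }
        return result;
    }

    // Sort a copy using compareTo (as a comparator-driven sort would)
    // and render the result as "name:age" strings for easy inspection.
    public static List<String> sortedLabels(List<Person> people) {
        List<Person> copy = new ArrayList<>(people);
        Collections.sort(copy);
        List<String> labels = new ArrayList<>();
        for (Person p : copy) {
            labels.add(p.name + ":" + p.age);
        }
        return labels;
    }
}
```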
## Integration Patterns

### Automatic Type Extraction

Flink can often automatically extract Writable type information.

```java
// Automatic type extraction for Hadoop InputFormats
HadoopInputFormat<LongWritable, Text> inputFormat =
    HadoopInputs.readHadoopFile(
        new TextInputFormat(),
        LongWritable.class, // Type information extracted automatically
        Text.class,
        "input/path"
    );

DataSet<Tuple2<LongWritable, Text>> dataset = env.createInput(inputFormat);
// TypeInformation for Tuple2<LongWritable, Text> is created automatically
```
### Explicit Type Information

Providing explicit type information when automatic extraction fails.

```java
// Explicit type information for complex scenarios
TypeInformation<Tuple2<Text, CustomWritable>> tupleType =
    new TupleTypeInfo<>(
        WritableTypeInfo.getWritableTypeInfo(Text.class),
        WritableTypeInfo.getWritableTypeInfo(CustomWritable.class)
    );

DataSet<Tuple2<Text, CustomWritable>> dataset = env.fromCollection(
    Arrays.asList(new Tuple2<>(new Text("key"), new CustomWritable())), tupleType);
```
### Performance Considerations

Optimizing performance with Writable types.

```java
// Reuse objects to reduce garbage collection
public static class EfficientMapper
        extends RichMapFunction<Tuple2<LongWritable, Text>, Tuple2<Text, IntWritable>> {

    private transient Text outputKey;
    private transient IntWritable outputValue;

    @Override
    public void open(Configuration parameters) {
        outputKey = new Text();
        outputValue = new IntWritable();
    }

    @Override
    public Tuple2<Text, IntWritable> map(Tuple2<LongWritable, Text> input) {
        String text = input.f1.toString();
        outputKey.set(text.toLowerCase());
        outputValue.set(text.length());
        return new Tuple2<>(outputKey, outputValue);
    }
}
```
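The reuse pattern can be isolated from Flink entirely. The hypothetical `ReusingCounter` below mirrors the idea: one mutable holder is allocated up front (as in `open()` above) and refilled per record through a `set()` method, the way `IntWritable.set()` refills a single instance instead of allocating a new one per call.

```java
// Hypothetical sketch of the object-reuse pattern: a mutable holder is
// allocated once and refilled per record instead of allocated per call.
public class ReusingCounter {

    // Mutable holder, mirroring how IntWritable.set() refills one instance.
    public static final class MutableInt {
        private int value;
        public void set(int value) { this.value = value; }
        public int get() { return value; }
    }

    // Allocated once (analogous to allocation in open()), reused afterwards.
    private final MutableInt reused = new MutableInt();

    // Per record: refill the reused holder with the new length; no allocation.
    public MutableInt lengthOf(String input) {
        reused.set(input.length());
        return reused;
    }
}
```

The trade-off is the same as in the mapper above: callers must consume the returned value before the next call, because the instance is overwritten.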
## Key Design Patterns

### Type Safety
WritableTypeInfo ensures compile-time type safety while maintaining runtime efficiency through Hadoop's proven serialization mechanisms.

### Performance Optimization
- Uses Hadoop's native serialization for maximum compatibility
- Supports object reuse patterns to minimize garbage collection
- Provides efficient comparators for sorting and grouping operations

### Compatibility
- Full compatibility with existing Hadoop Writable types
- Support for both WritableComparable and plain Writable interfaces
- Seamless integration with Flink's type system and operations

### Configuration Integration
Type information integrates with Flink's configuration system and can be serialized along with job graphs for distributed execution.