
# Type System Integration

The Type System Integration capability provides comprehensive support for Hadoop Writable types within Flink's type system, enabling efficient serialization, deserialization, and comparison of Hadoop data types in Flink applications.

## Overview

Flink's type system requires explicit type information for serialization and comparison operations. The Hadoop compatibility layer provides specialized type information classes that bridge Hadoop's Writable interface with Flink's TypeInformation system, ensuring optimal performance and correctness.

## WritableTypeInfo

The primary class for integrating Hadoop Writable types with Flink's type system.

```java { .api }
@Public
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {

    // Constructor
    @PublicEvolving
    public WritableTypeInfo(Class<T> typeClass);

    // Create a type-specific serializer
    @PublicEvolving
    public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig);

    // Create a type-specific comparator for sorting
    @PublicEvolving
    public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);

    // Type introspection methods
    @PublicEvolving
    public boolean isBasicType();
    @PublicEvolving
    public boolean isTupleType();
    @PublicEvolving
    public int getArity();
    @PublicEvolving
    public int getTotalFields();
    @PublicEvolving
    public Class<T> getTypeClass();
    @PublicEvolving
    public boolean isKeyType();

    // Package-private factory method (internal use)
    static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);
}
```

## WritableSerializer

High-performance serializer for Hadoop Writable types.

```java { .api }
@Internal
public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {

    // Constructor
    public WritableSerializer(Class<T> typeClass);

    // Instance creation and copying
    public T createInstance();
    public T copy(T from);
    public T copy(T from, T reuse);

    // Serialization metadata
    public int getLength();
    public boolean isImmutableType();

    // Serialization operations
    public void serialize(T record, DataOutputView target) throws IOException;
    public T deserialize(DataInputView source) throws IOException;
    public T deserialize(T reuse, DataInputView source) throws IOException;
    public void copy(DataInputView source, DataOutputView target) throws IOException;

    // Serializer management
    public WritableSerializer<T> duplicate();
}
```

## WritableComparator

Efficient comparator for Hadoop Writable types, supporting both in-memory and serialized comparison.

```java { .api }
@Internal
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {

    // Constructor
    public WritableComparator(boolean ascending, Class<T> type);

    // Hash computation
    public int hash(T record);

    // Reference-based comparison
    public void setReference(T toCompare);
    public boolean equalToReference(T candidate);
    public int compareToReference(TypeComparator<T> referencedComparator);

    // Direct comparison
    public int compare(T first, T second);

    // Serialized data comparison
    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException;

    // Normalized key support for sorting optimization
    public boolean supportsNormalizedKey();
    public int getNormalizeKeyLen();
    public boolean isNormalizedKeyPrefixOnly(int keyBytes);
    public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes);
    public boolean invertNormalizedKey();

    // Comparator management
    public TypeComparator<T> duplicate();
}
```
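The reference-based methods (`setReference`, `equalToReference`) let sorting and joining code fix one record as the current reference and test many candidates against it without re-passing both arguments. A minimal pure-JDK sketch of that pattern (a stand-in class for illustration, not Flink's `TypeComparator` API):

```java
import java.util.Objects;

// Stand-in for a reference-holding comparator (illustration only).
class ReferenceComparator<T extends Comparable<T>> {
    private T reference;

    // Remember one record as the comparison reference.
    void setReference(T toCompare) {
        this.reference = Objects.requireNonNull(toCompare);
    }

    // Check a candidate against the stored reference.
    boolean equalToReference(T candidate) {
        return reference.compareTo(candidate) == 0;
    }

    // Direct two-argument comparison.
    int compare(T first, T second) {
        return first.compareTo(second);
    }
}
```

The real comparator adds serialized-data comparison and normalized-key support on top of this basic contract.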

## Usage Examples

### Basic Type Information Usage

```java
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create type information for Hadoop Writable types
TypeInformation<Text> textTypeInfo = new WritableTypeInfo<>(Text.class);
TypeInformation<IntWritable> intTypeInfo = new WritableTypeInfo<>(IntWritable.class);

// Supply the type information explicitly when creating the data sets
DataSet<Text> textData = env.fromCollection(
    Arrays.asList(new Text("Hello"), new Text("World")),
    textTypeInfo);

DataSet<IntWritable> intData = env.fromCollection(
    Arrays.asList(new IntWritable(1), new IntWritable(2), new IntWritable(3)),
    intTypeInfo);
```

### Custom Writable Types

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Define a custom Writable class
public class CustomWritable implements Writable, Comparable<CustomWritable> {
    private int id;
    private String name;

    // Writable types need a public no-argument constructor
    public CustomWritable() {}

    public CustomWritable(int id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
    }

    @Override
    public int compareTo(CustomWritable other) {
        int result = Integer.compare(this.id, other.id);
        if (result == 0) {
            result = this.name.compareTo(other.name);
        }
        return result;
    }

    // hashCode, equals, toString methods...
}
```

```java
import java.util.Arrays;

import org.apache.flink.api.common.operators.Order;

// Use the custom Writable with explicit type information
TypeInformation<CustomWritable> customTypeInfo =
    new WritableTypeInfo<>(CustomWritable.class);

DataSet<CustomWritable> customData = env.fromCollection(
    Arrays.asList(
        new CustomWritable(1, "Alice"),
        new CustomWritable(2, "Bob"),
        new CustomWritable(3, "Charlie")),
    customTypeInfo);

// Sort the full element using the Comparable implementation
// ("*" selects the whole value, since CustomWritable is an atomic type)
DataSet<CustomWritable> sortedData = customData.sortPartition("*", Order.ASCENDING);
```
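Because `write()` must mirror `readFields()` exactly, it is worth round-tripping an instance through a byte buffer in a standalone test before using the type in a pipeline. A pure-JDK sketch of that check, using a stand-in record with the same write/readFields shape as `CustomWritable` (no Hadoop dependency, so the class name here is illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in with the same write/readFields shape as a custom Writable.
class RoundTripRecord {
    int id;
    String name;

    void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name);
    }

    void readFields(DataInput in) throws IOException {
        id = in.readInt();     // must read in the same order as write()
        name = in.readUTF();
    }

    // Serialize to bytes and read back into a fresh instance.
    static RoundTripRecord roundTrip(RoundTripRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            RoundTripRecord copy = new RoundTripRecord();
            copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the copy does not equal the original field by field, the write/read order or formats are out of sync.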

### Tuple Types with Writables

```java
import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// Create type information for tuples containing Writables
TypeInformation<Tuple2<IntWritable, Text>> tupleTypeInfo =
    new TupleTypeInfo<>(
        new WritableTypeInfo<>(IntWritable.class),
        new WritableTypeInfo<>(Text.class)
    );

DataSet<Tuple2<IntWritable, Text>> tupleData = env.fromCollection(
    Arrays.asList(
        new Tuple2<>(new IntWritable(1), new Text("First")),
        new Tuple2<>(new IntWritable(2), new Text("Second"))),
    tupleTypeInfo);
```

### Serialization Configuration

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// Get a serializer for the current execution configuration
ExecutionConfig config = env.getConfig();
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);
TypeSerializer<Text> textSerializer = textType.createSerializer(config);

// The serializer can be used for manual serialization if needed
// (typically this is handled automatically by Flink)
```

### Performance Optimization

```java
// Enable object reuse for better performance with Writable types
env.getConfig().enableObjectReuse();

// This is particularly beneficial for Writable types because they support
// in-place deserialization, reducing garbage-collection pressure
```
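Object reuse means the runtime hands the same instance back to `readFields()` for every record instead of allocating a new one. The pattern can be sketched in plain Java: a single mutable holder is refilled per record (a stand-in illustration of the idea, not Flink's actual deserialization path):

```java
// A mutable holder that can be refilled in place, like a Writable.
class ReusableHolder {
    int value;

    void set(int value) {
        this.value = value;
    }
}

class ReuseDemo {
    // Sum values by refilling one holder instead of allocating per record.
    static int sumWithReuse(int[] records) {
        ReusableHolder reuse = new ReusableHolder(); // single allocation
        int sum = 0;
        for (int record : records) {
            reuse.set(record); // "deserialize" into the existing object
            sum += reuse.value;
        }
        return sum;
    }
}
```

With millions of records, avoiding the per-record allocation is where the GC savings come from; the trade-off is that user code must not hold on to a reused object across records.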

## Common Writable Types

The following Hadoop Writable types are commonly used and fully supported:

```java { .api }
// Primitive wrappers
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.DoubleWritable;

// Text and binary data
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;

// Null values
import org.apache.hadoop.io.NullWritable;

// Variable-length integers
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;

// Collections
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SortedMapWritable;
```

### Type Information for Common Writables

```java
// Get type information for common Hadoop types
TypeInformation<Text> textType = new WritableTypeInfo<>(Text.class);
TypeInformation<IntWritable> intType = new WritableTypeInfo<>(IntWritable.class);
TypeInformation<LongWritable> longType = new WritableTypeInfo<>(LongWritable.class);
TypeInformation<BooleanWritable> boolType = new WritableTypeInfo<>(BooleanWritable.class);
TypeInformation<DoubleWritable> doubleType = new WritableTypeInfo<>(DoubleWritable.class);
TypeInformation<NullWritable> nullType = new WritableTypeInfo<>(NullWritable.class);
```

## Advanced Usage

### Custom Serialization Logic

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.Writable;

// For complex Writable types that need special handling
public class ComplexWritable implements Writable {
    private Map<String, List<Integer>> data;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(data.size());
        for (Map.Entry<String, List<Integer>> entry : data.entrySet()) {
            out.writeUTF(entry.getKey());
            List<Integer> values = entry.getValue();
            out.writeInt(values.size());
            for (Integer value : values) {
                out.writeInt(value);
            }
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        int mapSize = in.readInt();
        data = new HashMap<>(mapSize);
        for (int i = 0; i < mapSize; i++) {
            String key = in.readUTF();
            int listSize = in.readInt();
            List<Integer> values = new ArrayList<>(listSize);
            for (int j = 0; j < listSize; j++) {
                values.add(in.readInt());
            }
            data.put(key, values);
        }
    }
}
```

### Normalized Key Optimization

For high-performance sorting operations, Writable types that implement proper `compareTo` methods can benefit from normalized key optimization:

```java
public class OptimizedWritable implements Writable, Comparable<OptimizedWritable> {
    private long primaryKey;
    private String secondaryKey;

    @Override
    public int compareTo(OptimizedWritable other) {
        // Primary comparison on the long value (efficient for normalized keys)
        int result = Long.compare(this.primaryKey, other.primaryKey);
        if (result == 0) {
            result = this.secondaryKey.compareTo(other.secondaryKey);
        }
        return result;
    }

    // Writable implementation...
}
```
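The idea behind `putNormalizedKey` can be illustrated in plain Java: encode the value as a fixed-length byte prefix whose unsigned byte-wise order matches `compareTo`, so a sorter can compare raw bytes without deserializing records. For a signed long this means flipping the sign bit and writing big-endian bytes (a sketch of the technique, not Flink's exact implementation):

```java
class NormalizedKeys {
    // Encode a signed long so that unsigned lexicographic byte order
    // equals numeric order: flip the sign bit, then write big-endian.
    static byte[] normalizedKey(long value) {
        long biased = value ^ Long.MIN_VALUE; // sign-bit flip
        byte[] key = new byte[8];
        for (int i = 0; i < 8; i++) {
            key[i] = (byte) (biased >>> (56 - 8 * i));
        }
        return key;
    }

    // Unsigned lexicographic comparison, as a sorter does on raw bytes.
    static int compareKeys(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }
}
```

Because byte comparison on these prefixes agrees with numeric comparison, the expensive object-level `compareTo` is only needed when two prefixes tie (or when the key is a prefix of a longer value).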

## Error Handling

Common issues and their solutions:

```java
try {
    TypeInformation<MyWritable> typeInfo =
        new WritableTypeInfo<>(MyWritable.class);
} catch (IllegalArgumentException e) {
    // Class doesn't satisfy the requirements for a Writable type
    logger.error("Type must implement the Writable interface: " + e.getMessage());
} catch (RuntimeException e) {
    // Issues with reflection or class instantiation
    logger.error("Failed to create type information: " + e.getMessage());
}
```

### Common Problems

1. **Missing default constructor**: Writable classes must have a public no-argument constructor.
2. **Incomplete Writable implementation**: both `write()` and `readFields()` must be properly implemented.
3. **Inconsistent serialization**: the order and format of writes must match reads exactly.
4. **Missing Comparable implementation**: for sorting operations, implement `Comparable<T>`.

### Best Practices

1. Always test Writable serialization/deserialization in isolation.
2. Implement `hashCode()` and `equals()` consistently with `compareTo()`.
3. Use appropriate buffer sizes for large data structures.
4. Consider implementing `toString()` for debugging.
5. Use object reuse when processing large datasets with Writable types.
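For practice 2 above, a quick standalone check: two objects that compare equal via `compareTo` must also be `equals()` and share a hash code, or grouping, hashing, and sorting will disagree with each other. A pure-JDK sketch (illustrative class, not tied to Hadoop's interfaces):

```java
import java.util.Objects;

// Minimal key class with compareTo, equals, and hashCode kept consistent:
// all three are derived from the same two fields.
class ConsistentKey implements Comparable<ConsistentKey> {
    final int id;
    final String name;

    ConsistentKey(int id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public int compareTo(ConsistentKey other) {
        int result = Integer.compare(this.id, other.id);
        return result != 0 ? result : this.name.compareTo(other.name);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConsistentKey)) {
            return false;
        }
        ConsistentKey other = (ConsistentKey) o;
        return id == other.id && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name); // same fields as equals/compareTo
    }
}
```

If `compareTo` compared more (or fewer) fields than `equals`/`hashCode`, hash-based and sort-based operators could partition the same keys differently.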