or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md, input-formats.md, mapreduce-functions.md, output-formats.md, type-system.md, utilities.md

docs/type-system.md

0

# Type System Integration

1

2

Custom type information and serialization support for Hadoop Writable types, ensuring seamless integration between Hadoop's serialization system and Flink's type system. Provides efficient serialization, comparison, and type safety for Hadoop data types.

3

4

## Capabilities

5

6

### WritableTypeInfo

7

8

Type information class for Hadoop Writable types that integrates with Flink's type system.

9

10

```java { .api }

11

public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {

12

13

/**

14

* Constructor for WritableTypeInfo

15

* @param typeClass The class of the Writable type

16

*/

17

public WritableTypeInfo(Class<T> typeClass);

18

19

/**

20

* Factory method to create WritableTypeInfo instances

21

* @param typeClass The class of the Writable type

22

* @return WritableTypeInfo instance for the given type

23

*/

24

public static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass);

25

26

/**

27

* Create a type comparator for this Writable type

28

* @param sortOrderAscending Whether to sort in ascending order

29

* @param executionConfig Execution configuration

30

* @return TypeComparator for this type

31

*/

32

public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig);

33

34

/**

35

* Check if this is a basic type

36

* @return false (Writable types are not considered basic types)

37

*/

38

public boolean isBasicType();

39

40

/**

41

* Check if this is a tuple type

42

* @return false (Writable types are not tuple types)

43

*/

44

public boolean isTupleType();

45

46

/**

47

* Get the arity of this type

48

* @return 1 (Writable types have arity 1)

49

*/

50

public int getArity();

51

52

/**

53

* Get the total number of fields

54

* @return 1 (Writable types have 1 field)

55

*/

56

public int getTotalFields();

57

58

/**

59

* Get the type class

60

* @return The class of the Writable type

61

*/

62

public Class<T> getTypeClass();

63

64

/**

65

* Check if this type can be used as a key

66

* @return true (Writable types can be used as keys)

67

*/

68

public boolean isKeyType();

69

70

/**

71

* Create a serializer for this type

72

* @param serializerConfig Serializer configuration

73

* @return TypeSerializer for this Writable type

74

*/

75

public TypeSerializer<T> createSerializer(SerializerConfig serializerConfig);

76

77

/**

78

* String representation of this type

79

* @return String description of the type

80

*/

81

public String toString();

82

83

/**

84

* Hash code for this type information

85

* @return Hash code

86

*/

87

public int hashCode();

88

89

/**

90

* Check equality with another object

91

* @param obj Object to compare with

92

* @return true if equal, false otherwise

93

*/

94

public boolean equals(Object obj);

95

96

/**

97

* Check if this type can be equal to another type

98

* @param obj Object to check equality capability with

99

* @return true if can be equal, false otherwise

100

*/

101

public boolean canEqual(Object obj);

102

}

103

```

104

105

**Usage Example:**

106

107

```java

108

import org.apache.flink.api.java.typeutils.WritableTypeInfo;

109

import org.apache.flink.api.common.typeinfo.TypeInformation;

110

import org.apache.hadoop.io.Text;

111

import org.apache.hadoop.io.IntWritable;

112

import org.apache.hadoop.io.LongWritable;

113

114

// Create type information for common Hadoop types

115

TypeInformation<Text> textTypeInfo = WritableTypeInfo.getWritableTypeInfo(Text.class);

116

TypeInformation<IntWritable> intTypeInfo = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);

117

TypeInformation<LongWritable> longTypeInfo = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);

118

119

// Use with Flink DataSet API

120

DataSet<Text> textDataset = env.fromElements(new Text("hello"), new Text("world"))

121

.returns(textTypeInfo);

122

123

// Use with custom Writable types

124

public static class CustomWritable implements Writable {

125

private String data;

126

private int count;

127

128

public void write(DataOutput out) throws IOException {

129

out.writeUTF(data);

130

out.writeInt(count);

131

}

132

133

public void readFields(DataInput in) throws IOException {

134

data = in.readUTF();

135

count = in.readInt();

136

}

137

138

// constructors, getters, setters...

139

}

140

141

TypeInformation<CustomWritable> customTypeInfo =

142

WritableTypeInfo.getWritableTypeInfo(CustomWritable.class);

143

144

DataSet<CustomWritable> customDataset = env.fromElements(new CustomWritable())

145

.returns(customTypeInfo);

146

```

147

148

### WritableComparator

149

150

Type comparator for Hadoop Writable types that enables sorting and grouping operations.

151

152

```java { .api }

153

public class WritableComparator<T extends Writable> extends TypeComparator<T> {

154

// Implementation details for comparing Writable types

155

// Supports hash-based operations, sorting, and grouping

156

// Uses Hadoop's WritableComparable interface when available

157

// Falls back to serialization-based comparison for non-comparable Writables

158

}

159

```

160

161

**Usage Example:**

162

163

```java

164

import org.apache.flink.api.java.DataSet;

165

import org.apache.hadoop.io.Text;

166

167

// Sorting with Writable types - comparator is automatically created

168

DataSet<Text> textDataset = env.fromElements(

169

new Text("zebra"), new Text("apple"), new Text("banana")

170

);

171

172

DataSet<Text> sortedDataset = textDataset

173

.sortPartition(text -> text, Order.ASCENDING)

174

.returns(WritableTypeInfo.getWritableTypeInfo(Text.class));

175

176

// Grouping operations

177

DataSet<Tuple2<Text, IntWritable>> keyValuePairs = // ... your dataset

178

DataSet<Tuple2<Text, IntWritable>> grouped = keyValuePairs

179

.groupBy(0) // Group by Text key (uses WritableComparator internally)

180

.sum(1); // Sum IntWritable values

181

```

182

183

### WritableSerializer

184

185

Serializer for Hadoop Writable types that handles efficient serialization and deserialization.

186

187

```java { .api }

188

public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {

189

// Implementation details for serializing Writable types

190

// Uses Hadoop's Writable serialization mechanism

191

// Handles object reuse and efficient byte stream operations

192

// Supports both mutable and immutable serialization patterns

193

}

194

```

195

196

**Usage Example:**

197

198

```java

199

// Serialization is handled automatically by Flink when using WritableTypeInfo

200

// Manual serializer creation is typically not needed

201

202

import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;

203

import org.apache.flink.core.memory.DataInputView;

204

import org.apache.flink.core.memory.DataOutputView;

205

import org.apache.hadoop.io.Text;

206

207

// Example of custom serialization logic (rarely needed)

208

WritableSerializer<Text> textSerializer = new WritableSerializer<>(Text.class);

209

210

// Serialization

211

Text original = new Text("example");

212

byte[] serialized = // ... serialize using Flink's serialization framework

213

Text deserialized = textSerializer.deserialize(/* ... */);

214

```

215

216

## Common Writable Types Support

217

218

### Standard Hadoop Types

219

220

All standard Hadoop Writable types are supported out of the box.

221

222

```java { .api }

223

// Numeric types

224

TypeInformation<IntWritable> intType = WritableTypeInfo.getWritableTypeInfo(IntWritable.class);

225

TypeInformation<LongWritable> longType = WritableTypeInfo.getWritableTypeInfo(LongWritable.class);

226

TypeInformation<FloatWritable> floatType = WritableTypeInfo.getWritableTypeInfo(FloatWritable.class);

227

TypeInformation<DoubleWritable> doubleType = WritableTypeInfo.getWritableTypeInfo(DoubleWritable.class);

228

229

// Text and boolean types

230

TypeInformation<Text> textType = WritableTypeInfo.getWritableTypeInfo(Text.class);

231

TypeInformation<BooleanWritable> boolType = WritableTypeInfo.getWritableTypeInfo(BooleanWritable.class);

232

233

// Null type

234

TypeInformation<NullWritable> nullType = WritableTypeInfo.getWritableTypeInfo(NullWritable.class);

235

236

// Array types

237

TypeInformation<ArrayWritable> arrayType = WritableTypeInfo.getWritableTypeInfo(ArrayWritable.class);

238

TypeInformation<TwoDArrayWritable> array2DType = WritableTypeInfo.getWritableTypeInfo(TwoDArrayWritable.class);

239

240

// Map and other container types

241

TypeInformation<MapWritable> mapType = WritableTypeInfo.getWritableTypeInfo(MapWritable.class);

242

TypeInformation<SortedMapWritable> sortedMapType = WritableTypeInfo.getWritableTypeInfo(SortedMapWritable.class);

243

```

244

245

### Custom Writable Types

246

247

Support for user-defined Writable implementations.

248

249

```java

250

// Example custom Writable with complex data

251

public static class PersonWritable implements Writable, WritableComparable<PersonWritable> {

252

private String name;

253

private int age;

254

private String email;

255

256

public PersonWritable() {} // Default constructor required

257

258

public PersonWritable(String name, int age, String email) {

259

this.name = name;

260

this.age = age;

261

this.email = email;

262

}

263

264

@Override

265

public void write(DataOutput out) throws IOException {

266

out.writeUTF(name != null ? name : "");

267

out.writeInt(age);

268

out.writeUTF(email != null ? email : "");

269

}

270

271

@Override

272

public void readFields(DataInput in) throws IOException {

273

name = in.readUTF();

274

age = in.readInt();

275

email = in.readUTF();

276

}

277

278

@Override

279

public int compareTo(PersonWritable other) {

280

int result = name.compareTo(other.name);

281

if (result == 0) {

282

result = Integer.compare(age, other.age);

283

}

284

return result;

285

}

286

287

// getters, setters, equals, hashCode...

288

}

289

290

// Use custom Writable type in Flink

291

TypeInformation<PersonWritable> personType =

292

WritableTypeInfo.getWritableTypeInfo(PersonWritable.class);

293

294

DataSet<PersonWritable> people = env.fromElements(

295

new PersonWritable("Alice", 25, "alice@example.com"),

296

new PersonWritable("Bob", 30, "bob@example.com")

297

).returns(personType);

298

299

// Sorting works automatically due to WritableComparable implementation

300

DataSet<PersonWritable> sortedPeople = people.sortPartition(person -> person, Order.ASCENDING);

301

```

302

303

## Integration Patterns

304

305

### Automatic Type Extraction

306

307

Flink can often automatically extract Writable type information.

308

309

```java

310

// Automatic type extraction for Hadoop InputFormats

311

HadoopInputFormat<LongWritable, Text> inputFormat =

312

HadoopInputs.readHadoopFile(

313

new TextInputFormat(),

314

LongWritable.class, // Type information extracted automatically

315

Text.class,

316

"input/path"

317

);

318

319

DataSet<Tuple2<LongWritable, Text>> dataset = env.createInput(inputFormat);

320

// TypeInformation for Tuple2<LongWritable, Text> is created automatically

321

```

322

323

### Explicit Type Information

324

325

Providing explicit type information when automatic extraction fails.

326

327

```java

328

// Explicit type information for complex scenarios

329

TypeInformation<Tuple2<Text, CustomWritable>> tupleType =

330

new TupleTypeInfo<>(

331

WritableTypeInfo.getWritableTypeInfo(Text.class),

332

WritableTypeInfo.getWritableTypeInfo(CustomWritable.class)

333

);

334

335

DataSet<Tuple2<Text, CustomWritable>> dataset = env.fromElements(

336

new Tuple2<>(new Text("key"), new CustomWritable())

337

).returns(tupleType);

338

```

339

340

### Performance Considerations

341

342

Optimizing performance with Writable types.

343

344

```java

345

// Reuse objects to reduce garbage collection

346

public static class EfficientMapper

347

extends RichMapFunction<Tuple2<LongWritable, Text>, Tuple2<Text, IntWritable>> {

348

349

private transient Text outputKey;

350

private transient IntWritable outputValue;

351

352

@Override

353

public void open(Configuration parameters) {

354

outputKey = new Text();

355

outputValue = new IntWritable();

356

}

357

358

@Override

359

public Tuple2<Text, IntWritable> map(Tuple2<LongWritable, Text> input) {

360

String text = input.f1.toString();

361

outputKey.set(text.toLowerCase());

362

outputValue.set(text.length());

363

return new Tuple2<>(outputKey, outputValue);

364

}

365

}

366

```

367

368

## Key Design Patterns

369

370

### Type Safety

371

WritableTypeInfo ensures compile-time type safety while maintaining runtime efficiency through Hadoop's proven serialization mechanisms.

372

373

### Performance Optimization

374

- Uses Hadoop's native serialization for maximum compatibility

375

- Supports object reuse patterns to minimize garbage collection

376

- Provides efficient comparators for sorting and grouping operations

377

378

### Compatibility

379

- Full compatibility with existing Hadoop Writable types

380

- Support for both WritableComparable and plain Writable interfaces

381

- Seamless integration with Flink's type system and operations

382

383

### Configuration Integration

384

Type information integrates with Flink's configuration system and can be serialized along with job graphs for distributed execution.