or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bulk-writers.md, file-io-operations.md, index.md, schema-registry-integration.md, serialization-deserialization.md, table-api-integration.md, type-system-integration.md

docs/type-system-integration.md

0

# Type System Integration

1

2

Type information classes and utilities for seamless integration with Flink's type system. Provides efficient serialization, type safety, and schema conversion between Flink and Avro type systems.

3

4

## AvroTypeInfo

5

6

Type information class for Avro specific record types that extend `SpecificRecordBase`.

7

8

```java { .api }

9

public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {

10

// Constructor

11

public AvroTypeInfo(Class<T> typeClass);

12

13

// Serializer creation

14

public TypeSerializer<T> createSerializer(SerializerConfig config);

15

}

16

```

17

18

### Usage Examples

19

20

**Creating Type Information:**

21

22

```java

23

import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

24

25

// Create type information for specific record

26

AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);

27

28

// Use in DataStream operations

29

DataStream<User> userStream = env.fromCollection(users, userTypeInfo);

30

31

// Explicit type hints

32

DataStream<User> transformedStream = rawStream

33

.map(data -> parseUser(data))

34

.returns(userTypeInfo);

35

```

36

37

**With DataSet API:**

38

39

```java

40

// Create DataSet with explicit type information

41

DataSet<User> userDataSet = env.fromCollection(userList, userTypeInfo);

42

43

// Type-safe operations

44

DataSet<String> names = userDataSet

45

.map(user -> user.getName().toString())

46

.returns(Types.STRING);

47

```

48

49

## GenericRecordAvroTypeInfo

50

51

Type information class for Avro generic records with schema information.

52

53

```java { .api }

54

public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {

55

// Constructor

56

public GenericRecordAvroTypeInfo(Schema schema);

57

58

// Type system integration

59

public TypeSerializer<GenericRecord> createSerializer(SerializerConfig config);

60

public boolean isBasicType();

61

public boolean isTupleType();

62

public int getArity();

63

public int getTotalFields();

64

}

65

```

66

67

### Usage Examples

68

69

**Generic Record Processing:**

70

71

```java

72

import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

73

import org.apache.avro.Schema;

74

import org.apache.avro.generic.GenericRecord;

75

76

// Parse schema

77

Schema schema = new Schema.Parser().parse(schemaString);

78

79

// Create type information

80

GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);

81

82

// Use in streaming

83

DataStream<GenericRecord> recordStream = env

84

.fromCollection(genericRecords, typeInfo);

85

86

// Type-safe transformations

87

DataStream<String> nameStream = recordStream

88

.map(record -> record.get("name").toString())

89

.name("extract-names");

90

```

91

92

**Schema Evolution Support:**

93

94

```java

95

// Handle multiple schema versions

96

Schema readerSchema = getReaderSchema();

97

Schema writerSchema = getWriterSchema();

98

99

GenericRecordAvroTypeInfo readerTypeInfo = new GenericRecordAvroTypeInfo(readerSchema);

100

101

// Process with schema evolution

102

DataStream<GenericRecord> evolvedStream = rawStream

103

.map(new SchemaEvolutionMapper(writerSchema, readerSchema))

104

.returns(readerTypeInfo);

105

```

106

107

## AvroSerializer

108

109

High-performance serializer for Avro types with schema evolution support.

110

111

```java { .api }

112

public class AvroSerializer<T> extends TypeSerializer<T> {

113

// Constructor

114

public AvroSerializer(Class<T> type);

115

116

// Serialization operations

117

public boolean isImmutableType();

118

public TypeSerializer<T> duplicate();

119

public T createInstance();

120

public T copy(T from);

121

public T copy(T from, T reuse);

122

public int getLength();

123

public void serialize(T record, DataOutputView target) throws IOException;

124

public T deserialize(DataInputView source) throws IOException;

125

public T deserialize(T reuse, DataInputView source) throws IOException;

126

public void copy(DataInputView source, DataOutputView target) throws IOException;

127

}

128

```

129

130

### Performance Characteristics

131

132

**Object Reuse:**

133

```java

134

// Serializer supports object reuse for better performance

135

AvroSerializer<User> serializer = new AvroSerializer<>(User.class);

136

137

// Reuse instance across deserialization calls

138

User reusableUser = serializer.createInstance();

139

User deserializedUser = serializer.deserialize(reusableUser, dataInput);

140

```

141

142

**Memory Efficiency:**

143

```java

144

// Efficient copying without full deserialization

145

serializer.copy(inputView, outputView); // Direct byte copying when possible

146

```

147

148

## AvroSchemaConverter

149

150

Utility class for converting between Flink and Avro type systems.

151

152

```java { .api }

153

public class AvroSchemaConverter {

154

// Schema conversion methods

155

public static Schema convertToSchema(RowType rowType);

156

public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);

157

public static LogicalType convertToLogicalType(Schema schema);

158

public static RowType convertToRowType(Schema schema);

159

public static RowType convertToRowType(Schema schema, boolean legacyTimestampMapping);

160

}

161

```

162

163

### Type Conversion Examples

164

165

**Flink RowType to Avro Schema:**

166

167

```java

168

import org.apache.flink.table.types.logical.*;

169

import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;

170

171

// Define Flink row type

172

RowType rowType = RowType.of(

173

new LogicalType[] {

174

new VarCharType(50), // name

175

new IntType(), // age

176

new BooleanType(), // active

177

new TimestampType(3), // created_at

178

new DecimalType(10, 2), // salary

179

new ArrayType(new VarCharType(20)) // tags

180

},

181

new String[] {"name", "age", "active", "created_at", "salary", "tags"}

182

);

183

184

// Convert to Avro schema

185

Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);

186

187

// With legacy timestamp mapping

188

Schema legacySchema = AvroSchemaConverter.convertToSchema(rowType, true);

189

```

190

191

**Avro Schema to Flink RowType:**

192

193

```java

194

// Parse Avro schema

195

String schemaJson = "{ \"type\": \"record\", \"name\": \"User\", ... }";

196

Schema schema = new Schema.Parser().parse(schemaJson);

197

198

// Convert to Flink row type

199

RowType flinkRowType = AvroSchemaConverter.convertToRowType(schema);

200

201

// Use in Table API

202

Table table = tableEnv.fromDataStream(stream, Schema.of(flinkRowType));

203

```

204

205

### Type Mapping Reference

206

207

**Primitive Types:**

208

209

| Flink Type | Avro Type | Notes |

210

|------------|-----------|-------|

211

| BooleanType | boolean | Direct mapping |

212

| TinyIntType | int | Promoted to int |

213

| SmallIntType | int | Promoted to int |

214

| IntType | int | Direct mapping |

215

| BigIntType | long | Direct mapping |

216

| FloatType | float | Direct mapping |

217

| DoubleType | double | Direct mapping |

218

| VarCharType/CharType | string | UTF-8 encoding |

219

| VarBinaryType/BinaryType | bytes | Direct mapping |

220

221

**Complex Types:**

222

223

| Flink Type | Avro Type | Notes |

224

|------------|-----------|-------|

225

| ArrayType | array | Element type converted recursively |

226

| MapType | map | Key must be string, value converted |

227

| RowType | record | Fields converted recursively |

228

| MultisetType | map | Map from element to its multiplicity (int count) |

229

230

**Temporal Types:**

231

232

| Flink Type | Avro Type | Logical Type |

233

|------------|-----------|--------------|

234

| DateType | int | date |

235

| TimeType | int | time-millis |

236

| TimestampType | long | timestamp-millis (legacy timestamp mapping) |

| TimestampType | long | local-timestamp-millis (standard mapping) |

| LocalZonedTimestampType | long | timestamp-millis (standard mapping) |

239

240

**Decimal Types:**

241

242

| Flink Type | Avro Type | Logical Type |

243

|------------|-----------|--------------|

244

| DecimalType | bytes | decimal with precision/scale |

245

| DecimalType | fixed | decimal with precision/scale |

246

247

## SerializableAvroSchema

248

249

Wrapper class for making Avro Schema objects serializable in Flink operations.

250

251

```java { .api }

252

public class SerializableAvroSchema implements Serializable {

253

// Constructor

254

public SerializableAvroSchema(Schema schema);

255

256

// Schema access

257

public Schema getAvroSchema();

258

259

// Serialization support

260

private void writeObject(ObjectOutputStream out) throws IOException;

261

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException;

262

}

263

```

264

265

### Usage in Distributed Operations

266

267

```java

268

// Wrap schema for serialization

269

Schema schema = new Schema.Parser().parse(schemaString);

270

SerializableAvroSchema serializableSchema = new SerializableAvroSchema(schema);

271

272

// Use in map function

273

DataStream<GenericRecord> processed = recordStream.map(new RichMapFunction<GenericRecord, GenericRecord>() {

274

private transient Schema schema;

275

276

@Override

277

public void open(Configuration parameters) {

278

this.schema = serializableSchema.getAvroSchema();

279

}

280

281

@Override

282

public GenericRecord map(GenericRecord record) throws Exception {

283

// Use schema in processing

284

return processRecord(record, schema);

285

}

286

});

287

```

288

289

## AvroFactory

290

291

Factory utilities for creating Avro-specific data structures and serializers.

292

293

```java { .api }

294

public class AvroFactory {

295

// Schema extraction

296

public static Schema extractAvroSpecificSchema(Class<?> specificRecordClass, SpecificData specificData);

297

298

// SpecificData creation

299

public static SpecificData getSpecificDataForClass(Class<? extends SpecificData> specificDataClass, ClassLoader classLoader);

300

}

301

```

302

303

### Advanced Usage

304

305

```java

306

// Extract schema from specific record class

307

Class<User> userClass = User.class;

308

SpecificData specificData = SpecificData.get();

309

Schema userSchema = AvroFactory.extractAvroSpecificSchema(userClass, specificData);

310

311

// Custom SpecificData with class loader

312

ClassLoader customClassLoader = Thread.currentThread().getContextClassLoader();

313

SpecificData customSpecificData = AvroFactory.getSpecificDataForClass(

314

SpecificData.class,

315

customClassLoader

316

);

317

```

318

319

## Performance Optimization

320

321

### Serializer Configuration

322

323

**Enable Object Reuse:**

324

```java

325

// Configure execution environment for object reuse

326

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

327

env.getConfig().enableObjectReuse(); // Better performance with Avro serializers

328

```

329

330

**Type Hint Usage:**

331

```java

332

// Provide explicit type information to avoid reflection

333

DataStream<User> typedStream = rawStream

334

.map(data -> parseUser(data))

335

.returns(new AvroTypeInfo<>(User.class)); // Avoid type erasure

336

```

337

338

### Schema Caching

339

340

```java

341

// Cache converted schemas to avoid repeated conversion

342

private static final ConcurrentHashMap<String, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();

343

344

public Schema getCachedSchema(RowType rowType) {

345

String key = rowType.toString();

346

return SCHEMA_CACHE.computeIfAbsent(key,

347

k -> AvroSchemaConverter.convertToSchema(rowType));

348

}

349

```

350

351

## Error Handling

352

353

**Type Mismatch Errors:**

354

```java

355

try {

356

Schema convertedSchema = AvroSchemaConverter.convertToSchema(rowType);

357

} catch (IllegalArgumentException e) {

358

logger.error("Unsupported type conversion: " + e.getMessage());

359

// Handle unsupported type

360

}

361

```

362

363

**Serialization Errors:**

364

```java

365

try {

366

serializer.serialize(record, output);

367

} catch (IOException e) {

368

logger.error("Serialization failed", e);

369

// Implement error recovery

370

}

371

```

372

373

**Schema Evolution Errors:**

374

```java

375

// Handle schema evolution gracefully

376

try {

377

GenericRecord evolved = SchemaEvolution.evolve(record, oldSchema, newSchema);

378

} catch (AvroRuntimeException e) {

379

logger.warn("Schema evolution failed, using default values", e);

380

// Apply default values or skip record

381

}

382

```