
# Type System and Utilities

## Capabilities

### Schema Conversion Utilities

Utilities for converting between Flink types and Avro schemas.

```java { .api }
/**
 * Utility class for converting between Flink RowType and Avro Schema
 * Located in org.apache.flink.formats.avro.typeutils package
 */
public class AvroSchemaConverter {

    /**
     * Converts a Flink LogicalType to an Avro Schema
     * @param logicalType The Flink LogicalType to convert
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType);

    /**
     * Converts a Flink LogicalType to an Avro Schema with timestamp mapping control
     * @param logicalType The Flink LogicalType to convert
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType, boolean legacyTimestampMapping);

    /**
     * Converts a Flink LogicalType to an Avro Schema with custom row name
     * @param logicalType The Flink LogicalType to convert
     * @param rowName Name for the root record schema
     * @return Corresponding Avro Schema
     */
    public static Schema convertToSchema(LogicalType logicalType, String rowName);

    /**
     * Converts an Avro schema string to a Flink DataType
     * @param avroSchemaString The Avro schema definition string
     * @return Corresponding Flink DataType
     */
    public static DataType convertToDataType(String avroSchemaString);

    /**
     * Converts an Avro schema string to a Flink DataType with timestamp mapping control
     * @param avroSchemaString The Avro schema definition string
     * @param legacyTimestampMapping Whether to use legacy timestamp mapping
     * @return Corresponding Flink DataType
     */
    public static DataType convertToDataType(String avroSchemaString, boolean legacyTimestampMapping);

    /**
     * Converts an Avro class into Flink TypeInformation
     * @param avroClass Avro specific record class
     * @return TypeInformation matching the schema
     */
    public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(Class<T> avroClass);

    /**
     * Converts an Avro schema string into Flink TypeInformation
     * @param avroSchemaString Avro schema definition string
     * @return TypeInformation matching the schema
     */
    public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString);
}
```

### Type Information Classes

Type information classes for Avro record types in Flink's type system.

```java { .api }
/**
 * Type information for Avro records
 * @param <T> The Avro record type
 */
public class AvroTypeInfo<T> extends TypeInformation<T> {

    /**
     * Creates AvroTypeInfo for a specific record class
     * @param recordClazz The Avro record class
     */
    public AvroTypeInfo(Class<T> recordClazz);

    /**
     * Creates AvroTypeInfo with explicit schema
     * @param recordClazz The Avro record class
     * @param schema The Avro schema
     */
    public AvroTypeInfo(Class<T> recordClazz, Schema schema);

    /**
     * Gets the Avro schema for this type
     * @return The Avro schema
     */
    public Schema getAvroSchema();

    /**
     * Gets the record class
     * @return The Java class for the Avro record
     */
    public Class<T> getRecordClazz();

    /**
     * Creates a serializer for this type
     * @param config Execution configuration
     * @return TypeSerializer for this Avro type
     */
    public TypeSerializer<T> createSerializer(ExecutionConfig config);
}

/**
 * Specialized type information for Avro GenericRecord
 */
public class GenericRecordAvroTypeInfo extends AvroTypeInfo<GenericRecord> {

    /**
     * Creates type information for GenericRecord with schema
     * @param schema The Avro schema for the GenericRecord
     */
    public GenericRecordAvroTypeInfo(Schema schema);
}
```

### Schema Utilities

```java { .api }
/**
 * Utility for encoding and decoding Avro schemas
 */
public class SchemaCoder {

    /**
     * Encodes an Avro schema to a string representation
     * @param schema The schema to encode
     * @return Encoded schema string
     */
    public static String encode(Schema schema);

    /**
     * Decodes a schema from string representation
     * @param encodedSchema The encoded schema string
     * @return Decoded Avro schema
     */
    public static Schema decode(String encodedSchema);

    /**
     * Validates schema compatibility between reader and writer schemas
     * @param writerSchema The schema used to write data
     * @param readerSchema The schema used to read data
     * @return true if schemas are compatible
     */
    public static boolean isCompatible(Schema writerSchema, Schema readerSchema);
}

/**
 * Serializable wrapper for Avro Schema objects
 */
public class SerializableAvroSchema implements Serializable {

    /**
     * Creates a serializable schema wrapper
     * @param schema The Avro schema to wrap
     */
    public SerializableAvroSchema(Schema schema);

    /**
     * Gets the wrapped Avro schema
     * @return The Avro schema
     */
    public Schema getAvroSchema();

    /**
     * Creates from schema string
     * @param schemaString JSON representation of the schema
     * @return SerializableAvroSchema instance
     */
    public static SerializableAvroSchema fromString(String schemaString);
}
```

### Kryo Serialization Support

```java { .api }
/**
 * Utilities for Kryo serialization of Avro objects
 */
public class AvroKryoSerializerUtils {

    /**
     * Registers Avro types with Kryo for efficient serialization
     * @param kryo The Kryo instance to register with
     */
    public static void registerAvroTypes(Kryo kryo);

    /**
     * Creates optimized Kryo serializer for specific Avro record type
     * @param recordClass The Avro record class
     * @return Configured Kryo serializer
     */
    public static <T> Serializer<T> createAvroSerializer(Class<T> recordClass);
}
```

## Usage Examples

### Schema Conversion

```java
// Convert Flink RowType to Avro Schema
RowType flinkRowType = RowType.of(
    new DataType[] {
        DataTypes.BIGINT(),
        DataTypes.STRING(),
        DataTypes.ARRAY(DataTypes.STRING()),
        DataTypes.ROW(
            DataTypes.FIELD("street", DataTypes.STRING()),
            DataTypes.FIELD("city", DataTypes.STRING()),
            DataTypes.FIELD("zipcode", DataTypes.STRING())
        ),
        DataTypes.TIMESTAMP(3)
    },
    new String[] {"user_id", "username", "tags", "address", "created_at"}
);

// Convert to Avro schema
Schema avroSchema = AvroSchemaConverter.convertToSchema(flinkRowType);

System.out.println("Generated Avro Schema:");
System.out.println(avroSchema.toString(true));

// Convert back to Flink RowType
RowType convertedRowType = AvroSchemaConverter.convertToRowType(avroSchema);

// Convert to DataType for table processing
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema);
```

### Type Information Usage

```java
// Create type information for SpecificRecord
AvroTypeInfo<User> userTypeInfo = new AvroTypeInfo<>(User.class);

// Use in DataStream
DataStream<User> userStream = env
    .fromCollection(userList, userTypeInfo)
    .map(new MapFunction<User, User>() {
        @Override
        public User map(User user) throws Exception {
            // Process user
            return user;
        }
    });

// Create type information for GenericRecord
Schema schema = new Schema.Parser().parse(schemaString);
GenericRecordAvroTypeInfo genericTypeInfo = new GenericRecordAvroTypeInfo(schema);

DataStream<GenericRecord> genericStream = env
    .fromCollection(genericRecordList, genericTypeInfo);

// Get schema from type info
Schema retrievedSchema = userTypeInfo.getAvroSchema();
Class<User> recordClass = userTypeInfo.getRecordClazz();
```

### Schema Validation and Compatibility

```java
// Schema evolution example
Schema v1Schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    .endRecord();

Schema v2Schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("name").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .endRecord();

// Check compatibility
boolean compatible = SchemaCoder.isCompatible(v1Schema, v2Schema);
System.out.println("Schemas compatible: " + compatible);

// Encode schema for storage/transmission
String encodedSchema = SchemaCoder.encode(v2Schema);

// Decode schema
Schema decodedSchema = SchemaCoder.decode(encodedSchema);

// Create serializable schema for distributed processing
SerializableAvroSchema serializableSchema = new SerializableAvroSchema(v2Schema);
```

### Complex Type Mapping

```java
// Complex Flink type with nested structures
RowType complexType = RowType.of(
    new DataType[] {
        DataTypes.ROW(
            DataTypes.FIELD("personal", DataTypes.ROW(
                DataTypes.FIELD("firstName", DataTypes.STRING()),
                DataTypes.FIELD("lastName", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT())
            )),
            DataTypes.FIELD("contact", DataTypes.ROW(
                DataTypes.FIELD("email", DataTypes.STRING()),
                DataTypes.FIELD("phones", DataTypes.ARRAY(DataTypes.STRING()))
            ))
        ),
        DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
        DataTypes.ARRAY(DataTypes.ROW(
            DataTypes.FIELD("type", DataTypes.STRING()),
            DataTypes.FIELD("value", DataTypes.STRING())
        ))
    },
    new String[] {"user_profile", "metadata", "preferences"}
);

// Convert complex type to Avro schema
Schema complexAvroSchema = AvroSchemaConverter.convertToSchema(complexType);

// The generated schema will have proper Avro types:
// - ROW becomes RECORD
// - ARRAY becomes ARRAY
// - MAP becomes MAP
// - Nested structures are properly represented

System.out.println("Complex Avro Schema:");
System.out.println(complexAvroSchema.toString(true));
```

### Custom Type Registration

```java
// Register custom Avro types with Flink's type system
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register Avro types for better performance
env.getConfig().registerTypeWithKryoSerializer(
    GenericRecord.class,
    AvroKryoSerializerUtils.createAvroSerializer(GenericRecord.class)
);

// Register specific record types
env.getConfig().registerTypeWithKryoSerializer(
    User.class,
    AvroKryoSerializerUtils.createAvroSerializer(User.class)
);

// Enable Kryo for all Avro types
AvroKryoSerializerUtils.registerAvroTypes(
    env.getConfig().getSerializerConfig().getKryo()
);
```

### Schema Registry Integration with Utilities

```java
// Utility for managing schemas with registry
public class AvroSchemaManager {

    private final SchemaRegistryClient registryClient;

    public AvroSchemaManager(String registryUrl) {
        this.registryClient = new CachedSchemaRegistryClient(registryUrl, 100);
    }

    /**
     * Convert Flink table to Avro schema and register
     */
    public int registerTableSchema(String subject, RowType rowType) throws IOException {
        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        return registryClient.register(subject, avroSchema);
    }

    /**
     * Get Flink RowType from registered schema
     */
    public RowType getRowTypeFromRegistry(String subject) throws IOException {
        Schema schema = registryClient.getLatestSchemaMetadata(subject).getSchema();
        return AvroSchemaConverter.convertToRowType(schema);
    }

    /**
     * Validate schema evolution
     */
    public boolean validateEvolution(String subject, RowType newRowType) throws IOException {
        Schema newSchema = AvroSchemaConverter.convertToSchema(newRowType);
        return registryClient.testCompatibility(subject, newSchema);
    }
}

// Usage
AvroSchemaManager schemaManager = new AvroSchemaManager("http://localhost:8081");

// Register table schema
RowType tableSchema = RowType.of(
    new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
    new String[] {"id", "name"}
);

int schemaId = schemaManager.registerTableSchema("users-value", tableSchema);

// Later, retrieve schema for processing
RowType retrievedSchema = schemaManager.getRowTypeFromRegistry("users-value");
```

### Performance Optimization with Type Information

```java
// Optimized processing with proper type information
public class OptimizedAvroProcessor {

    public static <T> DataStream<T> processAvroStream(
            DataStream<byte[]> rawStream,
            AvroTypeInfo<T> typeInfo) {

        // Create efficient deserializer
        AvroDeserializationSchema<T> deserializer =
            AvroDeserializationSchema.forGeneric(typeInfo.getAvroSchema());

        return rawStream
            .map(new MapFunction<byte[], T>() {
                @Override
                public T map(byte[] value) throws Exception {
                    return deserializer.deserialize(value);
                }
            })
            .returns(typeInfo); // Explicit type information for optimization
    }

    public static DataStream<GenericRecord> optimizeGenericRecordProcessing(
            DataStream<GenericRecord> stream,
            Schema schema) {

        // Use optimized type info
        GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);

        return stream
            .rebalance() // Distribute load evenly
            .map(new RichMapFunction<GenericRecord, GenericRecord>() {
                private transient GenericRecord reusableRecord;

                @Override
                public void open(Configuration parameters) {
                    // Create reusable record for better performance
                    reusableRecord = new GenericData.Record(schema);
                }

                @Override
                public GenericRecord map(GenericRecord record) throws Exception {
                    // Reuse record object to reduce GC pressure
                    for (Schema.Field field : schema.getFields()) {
                        reusableRecord.put(field.name(), record.get(field.name()));
                    }
                    return reusableRecord;
                }
            })
            .returns(typeInfo);
    }
}
```