or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · filesystem.md · index.md · registry.md · rowdata.md · schemas.md · utilities.md

docs/schemas.md

0

# Schema-based Serialization and Deserialization

1

2

## Capabilities

3

4

### Generic Record Support

5

6

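Static `forGeneric` factory methods, available on both `AvroDeserializationSchema` and `AvroSerializationSchema`, for reading and writing Avro `GenericRecord` objects whose schema is supplied at runtime.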

7

8

```java { .api }
// ---- Static factory methods on AvroDeserializationSchema ----

/**
 * Creates a deserialization schema for Avro GenericRecord using binary encoding.
 *
 * @param schema The Avro schema to use for deserialization
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);

/**
 * Creates a deserialization schema for Avro GenericRecord using the given encoding.
 *
 * @param schema The Avro schema to use for deserialization
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

// ---- Static factory methods on AvroSerializationSchema ----

/**
 * Creates a serialization schema for Avro GenericRecord using binary encoding.
 *
 * @param schema The Avro schema to use for serialization
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);

/**
 * Creates a serialization schema for Avro GenericRecord using the given encoding.
 *
 * @param schema The Avro schema to use for serialization
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);
```

39

40

### Specific Record Support

41

42

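Static `forSpecific` factory methods, available on both `AvroDeserializationSchema` and `AvroSerializationSchema`, for reading and writing strongly-typed Avro `SpecificRecord` classes generated from a schema.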

43

44

```java { .api }
// ---- Static factory methods on AvroDeserializationSchema ----

/**
 * Creates a deserialization schema for an Avro SpecificRecord type using binary encoding.
 *
 * @param recordClazz The SpecificRecord class to deserialize to
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz);

/**
 * Creates a deserialization schema for an Avro SpecificRecord type using the given encoding.
 *
 * @param recordClazz The SpecificRecord class to deserialize to
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);

// ---- Static factory methods on AvroSerializationSchema ----

/**
 * Creates a serialization schema for an Avro SpecificRecord type using binary encoding.
 *
 * @param recordClazz The SpecificRecord class to serialize from
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz);

/**
 * Creates a serialization schema for an Avro SpecificRecord type using the given encoding.
 *
 * @param recordClazz The SpecificRecord class to serialize from
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);
```

75

76

### Core Schema Operations

77

78

```java { .api }
/**
 * A {@link DeserializationSchema} that decodes Avro-encoded bytes into records.
 *
 * <p>Signatures only; method bodies are omitted from this listing.
 *
 * @param <T> the record type this schema produces
 */
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    /**
     * Decodes a single serialized message into a record.
     *
     * @param message the serialized bytes; may be {@code null}
     * @return the decoded record, or {@code null} when {@code message} is {@code null}
     * @throws IOException if the bytes cannot be decoded
     */
    public T deserialize(byte[] message) throws IOException;

    /**
     * Reports whether {@code nextElement} marks the end of the stream.
     *
     * @param nextElement the element that was just produced
     * @return always {@code false}; Avro records never signal end of stream
     */
    public boolean isEndOfStream(T nextElement);

    /**
     * Describes the type of the records this schema produces.
     *
     * @return the {@link TypeInformation} for {@code T}
     */
    public TypeInformation<T> getProducedType();
}

/**
 * A {@link SerializationSchema} that encodes records into Avro bytes.
 *
 * <p>Signatures only; method bodies are omitted from this listing.
 *
 * @param <T> the record type this schema consumes
 */
public class AvroSerializationSchema<T> implements SerializationSchema<T> {

    /**
     * Encodes {@code object} into a byte array.
     *
     * @param object the record to encode
     * @return the encoded bytes
     */
    public byte[] serialize(T object);

    /**
     * Returns the Avro schema this serializer writes with.
     *
     * @return the Avro {@link Schema}
     */
    public Schema getSchema();

    /**
     * Initializes the serializer before first use.
     *
     * @param context the initialization context supplied by the runtime
     * @throws Exception if initialization fails
     */
    public void open(InitializationContext context) throws Exception;
}
```

134

135

## Usage Examples

136

137

### Generic Record Usage

138

139

```java
// Requires: org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

// Define the Avro schema
Schema schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .name("created_at").type().longType().noDefault()
    .endRecord();

// Create a deserializer (binary encoding)
AvroDeserializationSchema<GenericRecord> deserializer =
    AvroDeserializationSchema.forGeneric(schema);

// Create a serializer with JSON encoding
AvroSerializationSchema<GenericRecord> serializer =
    AvroSerializationSchema.forGeneric(schema, AvroEncoding.JSON);

// Use in a Flink DataStream
DataStream<byte[]> inputStream = ...; // your byte stream, e.g. from env.fromSource(...)
DataStream<GenericRecord> records = inputStream
    .map(new MapFunction<byte[], GenericRecord>() {
        @Override
        public GenericRecord map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    })
    // Flink cannot extract the schema of a GenericRecord from the function signature.
    // Without explicit type information it falls back to generic (Kryo) serialization,
    // which does not handle GenericRecord well, so declare the Avro type info here.
    .returns(new GenericRecordAvroTypeInfo(schema));

// Serialize records back to bytes
DataStream<byte[]> outputStream = records
    .map(new MapFunction<GenericRecord, byte[]>() {
        @Override
        public byte[] map(GenericRecord record) throws Exception {
            return serializer.serialize(record);
        }
    });
```

176

177

### Specific Record Usage

178

179

```java
// Assuming you have a SpecificRecord class generated from an Avro schema
// (for example by the avro-maven-plugin or avro-tools). Generated classes extend
// SpecificRecordBase, which implements the SpecificRecord interface.
public class User extends SpecificRecordBase implements SpecificRecord {
    // Generated fields and accessors, e.g. getId(), getUsername()/setUsername(...),
    // getEmail(), getCreatedAt(), plus newBuilder(...), getSchema(), ...
}

// Create a type-safe deserializer
AvroDeserializationSchema<User> deserializer =
    AvroDeserializationSchema.forSpecific(User.class);

// Create a type-safe serializer
AvroSerializationSchema<User> serializer =
    AvroSerializationSchema.forSpecific(User.class, AvroEncoding.BINARY);

// Use in a Flink DataStream with strong typing
DataStream<byte[]> inputStream = ...; // your byte stream, e.g. from env.fromSource(...)
DataStream<User> users = inputStream
    .map(new MapFunction<byte[], User>() {
        @Override
        public User map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Process with type safety
DataStream<User> processedUsers = users
    // Use the generated accessors; generated fields may be private.
    .filter(user -> user.getEmail() != null)
    .map(user ->
        // Build a modified copy instead of mutating the input record in place, which is
        // unsafe when object reuse is enabled. toString() handles both CharSequence and
        // String accessors; Locale.ROOT keeps lower-casing locale-independent.
        User.newBuilder(user)
            .setUsername(user.getUsername().toString().toLowerCase(Locale.ROOT))
            .build());
```

216

217

### Schema Evolution

218

219

```java
// Reader schema (what your job expects to read)
Schema readerSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .name("full_name").type().unionOf().nullType().and().stringType().endUnion().nullDefault() // New field
    .endRecord();

// Writer schema (what the producer used when the data was written)
Schema writerSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .endRecord();

// NOTE: AvroDeserializationSchema.forGeneric(readerSchema) decodes every message using
// readerSchema as the writer schema as well. It performs no schema resolution, so data
// written with writerSchema above would not decode correctly with it.
//
// Schema resolution needs the writer schema at read time. When producers register their
// schemas in a Confluent Schema Registry, the registry-aware deserializer (module
// flink-avro-confluent-registry) looks up each message's writer schema and resolves it
// against the reader schema:
String schemaRegistryUrl = "http://localhost:8081";
AvroDeserializationSchema<GenericRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegistryUrl);

// With resolution in place, Avro schema resolution rules apply:
// - fields only in the reader schema (full_name) take their declared default (null here);
// - fields only in the writer schema are skipped.
```

244

245

### Error Handling

246

247

```java
// Requires: org.apache.flink.util.Collector,
//           org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
// LOG is assumed to be an SLF4J logger of the enclosing job class, e.g.
// private static final Logger LOG = LoggerFactory.getLogger(MyJob.class);

AvroDeserializationSchema<GenericRecord> deserializer =
    AvroDeserializationSchema.forGeneric(schema);

// Flink does not support null records in a DataStream, so returning null from map() and
// filtering afterwards is not safe. Use flatMap() and emit only successfully decoded records.
DataStream<GenericRecord> records = inputStream
    .flatMap(new FlatMapFunction<byte[], GenericRecord>() {
        @Override
        public void flatMap(byte[] value, Collector<GenericRecord> out) {
            try {
                GenericRecord record = deserializer.deserialize(value);
                // deserialize() returns null for a null message; never emit null.
                if (record != null) {
                    out.collect(record);
                }
            } catch (IOException | RuntimeException e) {
                // Corrupt bytes may also surface as unchecked Avro exceptions. Skip the record
                // and keep the job running; route it to a side output instead if bad records
                // must be retained. Passing `e` last keeps the stack trace in the log.
                LOG.warn("Failed to deserialize Avro record ({} bytes), skipping", value.length, e);
            }
        }
    })
    // GenericRecord needs explicit Avro type information (see Generic Record Usage).
    .returns(new GenericRecordAvroTypeInfo(schema));
```