# Schema-based Serialization and Deserialization

## Capabilities

### Generic Record Support

Support for working with Avro GenericRecord objects using runtime schemas.

```java { .api }
/**
 * Deserialization schema for Avro GenericRecord with binary encoding.
 *
 * @param schema The Avro schema to use for deserialization
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);

/**
 * Deserialization schema for Avro GenericRecord with custom encoding.
 *
 * @param schema The Avro schema to use for deserialization
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for GenericRecord
 */
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

/**
 * Serialization schema for Avro GenericRecord with binary encoding.
 *
 * @param schema The Avro schema to use for serialization
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);

/**
 * Serialization schema for Avro GenericRecord with custom encoding.
 *
 * @param schema The Avro schema to use for serialization
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for GenericRecord
 */
public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);
```

### Specific Record Support

Support for working with strongly-typed Avro SpecificRecord classes.

```java { .api }
/**
 * Deserialization schema for Avro SpecificRecord with binary encoding.
 *
 * @param recordClazz The SpecificRecord class to deserialize to
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz);

/**
 * Deserialization schema for Avro SpecificRecord with custom encoding.
 *
 * @param recordClazz The SpecificRecord class to deserialize to
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroDeserializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);

/**
 * Serialization schema for Avro SpecificRecord with binary encoding.
 *
 * @param recordClazz The SpecificRecord class to serialize from
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz);

/**
 * Serialization schema for Avro SpecificRecord with custom encoding.
 *
 * @param recordClazz The SpecificRecord class to serialize from
 * @param encoding The encoding type (BINARY or JSON)
 * @return AvroSerializationSchema instance for the specific type
 */
public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> recordClazz, AvroEncoding encoding);
```

### Core Schema Operations

```java { .api }
/**
 * Deserialization schema that deserializes from Avro format.
 *
 * @param <T> Type of record it produces
 */
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    /**
     * Deserializes the byte message to an object.
     *
     * @param message Serialized message bytes, may be null
     * @return Deserialized object, or null if message was null
     * @throws IOException If deserialization fails
     */
    public T deserialize(byte[] message) throws IOException;

    /**
     * Checks if the given element signals end of the stream.
     *
     * @param nextElement The element to check
     * @return Always false (Avro records do not signal end of stream)
     */
    public boolean isEndOfStream(T nextElement);

    /**
     * Gets the data type produced by this deserializer.
     *
     * @return TypeInformation for the produced type
     */
    public TypeInformation<T> getProducedType();
}

/**
 * Serialization schema that serializes objects to Avro format.
 *
 * @param <T> Type of record it consumes
 */
public class AvroSerializationSchema<T> implements SerializationSchema<T> {

    /**
     * Serializes the given object to a byte array.
     *
     * @param object Object to serialize
     * @return Serialized byte array
     */
    public byte[] serialize(T object);

    /**
     * Gets the Avro schema used by this serializer.
     *
     * @return The Avro schema
     */
    public Schema getSchema();

    /**
     * Opens the serializer for initialization.
     *
     * @param context Initialization context
     * @throws Exception If initialization fails
     */
    public void open(InitializationContext context) throws Exception;
}
```
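
The serialization and deserialization schemas can also be exercised directly, outside a running pipeline, which is convenient for unit tests. A minimal round-trip sketch, assuming `flink-avro` and Avro are on the classpath (the class name and field names here are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroSerializationSchema;

public class RoundTripDemo {
    public static void main(String[] args) throws Exception {
        // Build a small runtime schema
        Schema schema = SchemaBuilder.record("User")
            .fields()
            .name("id").type().longType().noDefault()
            .name("username").type().stringType().noDefault()
            .endRecord();

        AvroSerializationSchema<GenericRecord> serializer =
            AvroSerializationSchema.forGeneric(schema);
        AvroDeserializationSchema<GenericRecord> deserializer =
            AvroDeserializationSchema.forGeneric(schema);

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 1L);
        record.put("username", "alice");

        // Serialize to Avro binary and read it back
        byte[] bytes = serializer.serialize(record);
        GenericRecord back = deserializer.deserialize(bytes);

        System.out.println(back.get("id"));
        // Avro decodes string fields as Utf8, so convert for comparison
        System.out.println(back.get("username").toString());
    }
}
```

Note that both schemas initialize their internal Avro reader/writer lazily on first use, so no explicit `open()` call is needed in a standalone test like this.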

## Usage Examples

### Generic Record Usage

```java
// Define the Avro schema at runtime
Schema schema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .name("created_at").type().longType().noDefault()
    .endRecord();

// Create deserializer
AvroDeserializationSchema<GenericRecord> deserializer =
    AvroDeserializationSchema.forGeneric(schema);

// Create serializer with JSON encoding
AvroSerializationSchema<GenericRecord> serializer =
    AvroSerializationSchema.forGeneric(schema, AvroEncoding.JSON);

// Use in a Flink DataStream
DataStream<byte[]> inputStream = ...; // your byte stream
DataStream<GenericRecord> records = inputStream
    .map(new MapFunction<byte[], GenericRecord>() {
        @Override
        public GenericRecord map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Serialize records back to bytes
DataStream<byte[]> outputStream = records
    .map(new MapFunction<GenericRecord, byte[]>() {
        @Override
        public byte[] map(GenericRecord record) throws Exception {
            return serializer.serialize(record);
        }
    });
```

### Specific Record Usage

```java
// Assuming you have a generated Avro class; generated classes extend
// SpecificRecordBase, which implements the SpecificRecord interface
public class User extends SpecificRecordBase {
    public Long id;
    public String username;
    public String email;
    public Long createdAt;

    // Generated methods...
}

// Create a type-safe deserializer
AvroDeserializationSchema<User> deserializer =
    AvroDeserializationSchema.forSpecific(User.class);

// Create a type-safe serializer
AvroSerializationSchema<User> serializer =
    AvroSerializationSchema.forSpecific(User.class, AvroEncoding.BINARY);

// Use in a Flink DataStream with strong typing
DataStream<byte[]> inputStream = ...; // your byte stream
DataStream<User> users = inputStream
    .map(new MapFunction<byte[], User>() {
        @Override
        public User map(byte[] value) throws Exception {
            return deserializer.deserialize(value);
        }
    });

// Process with type safety
DataStream<User> processedUsers = users
    .filter(user -> user.email != null)
    .map(user -> {
        user.username = user.username.toLowerCase();
        return user;
    });
```

### Schema Evolution

```java
// Reader schema (what you expect to read)
Schema readerSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .name("full_name").type().unionOf().nullType().and().stringType().endUnion().nullDefault() // New field
    .endRecord();

// Writer schema (the schema the data was originally written with)
Schema writerSchema = SchemaBuilder.record("User")
    .fields()
    .name("id").type().longType().noDefault()
    .name("username").type().stringType().noDefault()
    .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
    .endRecord();

// Avro's schema resolution rules handle evolution during deserialization
AvroDeserializationSchema<GenericRecord> deserializer =
    AvroDeserializationSchema.forGeneric(readerSchema);

// Fields present only in the reader schema (full_name here) are filled with
// their declared defaults (null), and fields dropped from the reader schema
// are skipped, per Avro's schema resolution rules.
```
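
The resolution behavior described above can be observed with Avro's own reader/writer APIs, independent of Flink. A small self-contained sketch, assuming only the Avro library is available (schema and class names are illustrative):

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class EvolutionDemo {
    public static void main(String[] args) throws Exception {
        // Writer schema: only an id field
        Schema writer = SchemaBuilder.record("User")
            .fields()
            .name("id").type().longType().noDefault()
            .endRecord();

        // Reader schema: adds an optional email field with a null default
        Schema reader = SchemaBuilder.record("User")
            .fields()
            .name("id").type().longType().noDefault()
            .name("email").type().unionOf().nullType().and().stringType().endUnion().nullDefault()
            .endRecord();

        // Write a record with the writer schema
        GenericRecord rec = new GenericData.Record(writer);
        rec.put("id", 42L);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
        enc.flush();

        // Read it back with both schemas: resolution fills in defaults
        Decoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord resolved =
            new GenericDatumReader<GenericRecord>(writer, reader).read(null, dec);

        System.out.println(resolved.get("id"));     // 42
        System.out.println(resolved.get("email"));  // null (reader-side default)
    }
}
```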

### Error Handling

```java
AvroDeserializationSchema<GenericRecord> deserializer =
    AvroDeserializationSchema.forGeneric(schema);

DataStream<GenericRecord> records = inputStream
    .map(new MapFunction<byte[], GenericRecord>() {
        @Override
        public GenericRecord map(byte[] value) throws Exception {
            try {
                return deserializer.deserialize(value);
            } catch (IOException e) {
                // Handle deserialization errors
                System.err.println("Failed to deserialize record: " + e.getMessage());
                return null; // Or rethrow, depending on your requirements
            }
        }
    })
    .filter(Objects::nonNull); // Filter out failed deserializations
```