# Serialization and Deserialization

Core serialization and deserialization functionality for converting between Java objects and Avro binary/JSON format in Flink streaming applications. Supports both generic and specific record types with configurable encoding options.

## AvroSerializationSchema

Serializes Java objects to Avro binary or JSON format for use in Flink streaming operations.

```java { .api }
public class AvroSerializationSchema<T> implements SerializationSchema<T> {
    // Static factory methods for specific records
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass);
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);

    // Static factory methods for generic records
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

    // Instance methods
    public byte[] serialize(T object);
    public Schema getSchema();
    public void open(InitializationContext context) throws Exception;
}
```

### Usage Examples

**For Specific Records (generated from Avro schema):**

```java
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;

// Binary encoding (default)
AvroSerializationSchema<User> userSerializer = AvroSerializationSchema.forSpecific(User.class);

// JSON encoding
AvroSerializationSchema<User> jsonSerializer = AvroSerializationSchema.forSpecific(User.class, AvroEncoding.JSON);

// Use in a DataStream
DataStream<User> userStream = ...;
DataStream<byte[]> serializedStream = userStream.map(userSerializer::serialize);
```

**For Generic Records:**

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse schema from string
Schema schema = new Schema.Parser().parse(schemaString);

// Create serializer
AvroSerializationSchema<GenericRecord> genericSerializer = AvroSerializationSchema.forGeneric(schema);

// Use with generic records
DataStream<GenericRecord> recordStream = ...;
DataStream<byte[]> serializedStream = recordStream.map(genericSerializer::serialize);
```

## AvroDeserializationSchema

Deserializes Avro binary or JSON data back into Java objects for use in Flink streaming operations.

```java { .api }
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    // Static factory methods for generic records
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

    // Static factory methods for specific records
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass);
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);

    // Instance methods
    public T deserialize(byte[] message) throws IOException;
    public TypeInformation<T> getProducedType();
    public boolean isEndOfStream(T nextElement);
}
```

### Usage Examples

**For Specific Records:**

```java
// Create deserializer for specific record type
AvroDeserializationSchema<User> userDeserializer = AvroDeserializationSchema.forSpecific(User.class);

// Use in a Kafka source (legacy FlinkKafkaConsumer API)
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
        "user-topic",
        userDeserializer,
        properties
);

DataStream<User> userStream = env.addSource(consumer);
```
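
In recent Flink releases `FlinkKafkaConsumer` is deprecated in favor of `KafkaSource`; the same deserialization schema plugs in via `setValueOnlyDeserializer`. A sketch, assuming the `flink-connector-kafka` dependency and the same generated `User` class; the bootstrap servers, topic, and group id are placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;

// KafkaSource accepts any DeserializationSchema for the record value
KafkaSource<User> source = KafkaSource.<User>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("user-topic")
        .setGroupId("user-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(AvroDeserializationSchema.forSpecific(User.class))
        .build();

DataStream<User> userStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "user-source");
```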

**For Generic Records:**

```java
// Create deserializer for generic records
AvroDeserializationSchema<GenericRecord> genericDeserializer =
        AvroDeserializationSchema.forGeneric(schema);

// Use in a streaming context
DataStream<byte[]> byteStream = ...;
DataStream<GenericRecord> recordStream = byteStream.map(data -> {
    try {
        return genericDeserializer.deserialize(data);
    } catch (IOException e) {
        throw new RuntimeException("Deserialization failed", e);
    }
});
```

## Encoding Options

Both serialization and deserialization schemas support two encoding formats:

```java { .api }
public enum AvroEncoding implements DescribedEnum {
    BINARY("binary", "Use binary encoding for serialization and deserialization."),
    JSON("json", "Use JSON encoding for serialization and deserialization.");
}
```

### Binary Encoding
- **Default encoding**
- More compact and space-efficient
- Better performance for high-throughput scenarios
- Standard Avro binary format

### JSON Encoding
- Human-readable format
- Useful for debugging and development
- Larger message size compared to binary
- Compatible with JSON processing tools
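
The size trade-off can be seen directly with Avro's own encoder APIs, which the Flink schemas build on. A minimal sketch, assuming only the `org.apache.avro` dependency; the schema and the `EncodingComparison` class name are illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class EncodingComparison {

    static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}");

    // Encode a record with Avro's binary encoder (the BINARY path)
    static byte[] encodeBinary(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Encode the same record with Avro's JSON encoder (the JSON path)
    static byte[] encodeJson(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().jsonEncoder(SCHEMA, out);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        GenericRecord user = new GenericData.Record(SCHEMA);
        user.put("name", "alice");
        user.put("age", 30);

        // Binary output is a handful of bytes; JSON output is readable text
        System.out.println("binary size: " + encodeBinary(user).length);
        System.out.println("json: " + new String(encodeJson(user), StandardCharsets.UTF_8));
    }
}
```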

## Error Handling

**Serialization Errors:**
- Returns `null` for `null` input objects
- Throws `WrappingRuntimeException` for schema or encoding failures

**Deserialization Errors:**
- Returns `null` for `null` input bytes
- Throws `IOException` for malformed data or schema mismatches
- Use try-catch blocks when deserializing in user functions
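
The recommended try-catch can be factored into a small reusable helper. A sketch against Avro's reader API directly (the same machinery the Flink schema wraps); `tryDecode` is a hypothetical helper, not part of the Flink API:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import java.io.IOException;
import java.util.Optional;

public class SafeDecode {

    // Returns the decoded record, or empty if the bytes are malformed or truncated
    // (hypothetical helper, not part of the Flink API)
    static Optional<GenericRecord> tryDecode(byte[] bytes, Schema schema) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            return Optional.of(
                    reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)));
        } catch (IOException e) {
            // Malformed input: signal absence instead of failing the task
            return Optional.empty();
        }
    }
}
```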

## Integration with Flink Type System

The deserialization schema automatically provides type information to Flink:

```java
// Type information is automatically inferred
TypeInformation<User> typeInfo = userDeserializer.getProducedType();

// For generic records, includes schema information
TypeInformation<GenericRecord> genericTypeInfo = genericDeserializer.getProducedType();
```
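
When mapping with a lambda rather than a method reference, Java's type erasure can hide the result type from Flink; `getProducedType()` can then be supplied explicitly via `returns(...)`. A sketch, assuming the `byteStream` and `genericDeserializer` from the examples above:

```java
// MapFunction.map may throw checked exceptions, so no try-catch is needed here;
// a deserialization failure will instead fail the task and trigger recovery.
DataStream<GenericRecord> recordStream = byteStream
        .map(value -> genericDeserializer.deserialize(value))
        .returns(genericDeserializer.getProducedType());
```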

## Performance Considerations

- **Reuse Objects**: The schemas handle object reuse internally for better performance
- **Schema Caching**: Schemas are parsed once and cached for reuse
- **Thread Safety**: Schemas are thread-safe and can be shared across operators
- **Memory Management**: Binary encoding is more memory-efficient than JSON for large datasets