# Serialization and Deserialization

Core serialization and deserialization functionality for converting between Java objects and Avro binary/JSON format in Flink streaming applications. Supports both generic and specific record types with configurable encoding options.

## AvroSerializationSchema

Serializes Java objects to Avro binary or JSON format for use in Flink streaming operations.

```java { .api }
public class AvroSerializationSchema<T> implements SerializationSchema<T> {

    // Static factory methods for specific records
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass);
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);

    // Static factory methods for generic records
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

    // Instance methods
    public byte[] serialize(T object);
    public Schema getSchema();
    public void open(InitializationContext context) throws Exception;
}
```

### Usage Examples

**For Specific Records (generated from Avro schema):**

```java
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;

// Binary encoding (default)
AvroSerializationSchema<User> userSerializer = AvroSerializationSchema.forSpecific(User.class);

// JSON encoding
AvroSerializationSchema<User> jsonSerializer = AvroSerializationSchema.forSpecific(User.class, AvroEncoding.JSON);

// Use in a DataStream
DataStream<User> userStream = ...;
DataStream<byte[]> serializedStream = userStream.map(userSerializer::serialize);
```

**For Generic Records:**

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Parse schema from string
Schema schema = new Schema.Parser().parse(schemaString);

// Create serializer
AvroSerializationSchema<GenericRecord> genericSerializer = AvroSerializationSchema.forGeneric(schema);

// Use with generic records
DataStream<GenericRecord> recordStream = ...;
DataStream<byte[]> serializedStream = recordStream.map(genericSerializer::serialize);
```

## AvroDeserializationSchema

Deserializes Avro binary or JSON format back to Java objects for use in Flink streaming operations.

```java { .api }
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    // Static factory methods for generic records
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);

    // Static factory methods for specific records
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass);
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);

    // Instance methods
    public T deserialize(byte[] message) throws IOException;
    public TypeInformation<T> getProducedType();
    public boolean isEndOfStream(T nextElement);
}
```

### Usage Examples

**For Specific Records:**

```java
// Create deserializer for specific record type
AvroDeserializationSchema<User> userDeserializer = AvroDeserializationSchema.forSpecific(User.class);

// Use in Kafka source
FlinkKafkaConsumer<User> consumer = new FlinkKafkaConsumer<>(
        "user-topic",
        userDeserializer,
        properties
);

DataStream<User> userStream = env.addSource(consumer);
```

**For Generic Records:**

```java
// Create deserializer for generic records
AvroDeserializationSchema<GenericRecord> genericDeserializer =
        AvroDeserializationSchema.forGeneric(schema);

// Use in streaming context
DataStream<byte[]> byteStream = ...;
DataStream<GenericRecord> recordStream = byteStream.map(data -> {
    try {
        return genericDeserializer.deserialize(data);
    } catch (IOException e) {
        throw new RuntimeException("Deserialization failed", e);
    }
});
```

## Encoding Options

Both serialization and deserialization schemas support two encoding formats:

```java { .api }
public enum AvroEncoding implements DescribedEnum {
    BINARY("binary", "Use binary encoding for serialization and deserialization."),
    JSON("json", "Use JSON encoding for serialization and deserialization.");
}
```

### Binary Encoding

- **Default encoding**
- More compact and space-efficient
- Better performance for high-throughput scenarios
- Standard Avro binary format

### JSON Encoding

- Human-readable format
- Useful for debugging and development
- Larger message size compared to binary
- Compatible with JSON processing tools
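
The size difference between the two encodings can be seen with the plain Avro encoder API, which the Flink schemas use under the hood. This sketch is illustrative only: the `User` schema and `EncodingComparison` class are hypothetical, not part of this library.

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class EncodingComparison {

    // Hypothetical schema, used only for illustration
    static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}");

    // Encode one record with either the binary or the JSON Avro encoder
    static byte[] encode(boolean binary) throws Exception {
        GenericRecord user = new GenericData.Record(SCHEMA);
        user.put("name", "alice");
        user.put("age", 42);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = binary
                ? EncoderFactory.get().binaryEncoder(out, null)
                : EncoderFactory.get().jsonEncoder(SCHEMA, out);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(user, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // Binary writes only field values; JSON repeats every field name
        System.out.println("binary: " + encode(true).length + " bytes");
        System.out.println("json:   " + encode(false).length + " bytes");
    }
}
```

Binary encoding carries no field names on the wire, which is why it wins on both size and throughput for high-volume topics.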

## Error Handling

**Serialization Errors:**

- Returns `null` for `null` input objects
- Throws `WrappingRuntimeException` for schema or encoding failures

**Deserialization Errors:**

- Returns `null` for `null` input bytes
- Throws `IOException` for malformed data or schema mismatches
- Use try-catch blocks when deserializing in user functions
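
The try-catch advice can be packaged as a small reusable helper so that malformed records are dropped instead of failing the job. This is an illustrative pattern, not part of the Flink API; `ThrowingDeserializer` is a hypothetical interface standing in for any deserializer that may throw.

```java
import java.util.Optional;

public class SafeDeserialize {

    // Hypothetical stand-in for any deserializer that may throw (e.g. IOException)
    @FunctionalInterface
    interface ThrowingDeserializer<T> {
        T deserialize(byte[] data) throws Exception;
    }

    // Wrap a throwing deserializer: bad records become Optional.empty()
    // instead of propagating an exception out of the user function
    static <T> Optional<T> tryDeserialize(ThrowingDeserializer<T> deserializer, byte[] data) {
        try {
            return Optional.ofNullable(deserializer.deserialize(data));
        } catch (Exception e) {
            // In a real job, log the failure or route it to a dead-letter output here
            return Optional.empty();
        }
    }
}
```

Whether to drop, log, or fail on bad records is an application decision; dropping silently is rarely the right default in production.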

## Integration with Flink Type System

The deserialization schema automatically provides type information to Flink:

```java
// Type information is automatically inferred
TypeInformation<User> typeInfo = userDeserializer.getProducedType();

// For generic records, includes schema information
TypeInformation<GenericRecord> genericTypeInfo = genericDeserializer.getProducedType();
```

## Performance Considerations

- **Reuse Objects**: The schemas handle object reuse internally for better performance
- **Schema Caching**: Schemas are parsed once and cached for reuse
- **Thread Safety**: Schemas are thread-safe and can be shared across operators
- **Memory Management**: Binary encoding is more memory-efficient than JSON for large datasets
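
The schema-caching point applies to user code as well: parsing a schema string is comparatively expensive, so a job that builds serializers from strings can memoize the parsed `Schema`. A minimal sketch of an application-level cache (the `SchemaCache` class is hypothetical, separate from any caching the schemas do internally):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;

public class SchemaCache {

    private static final Map<String, Schema> CACHE = new ConcurrentHashMap<>();

    // Parse each distinct schema string once; later calls reuse the same instance
    public static Schema get(String schemaString) {
        return CACHE.computeIfAbsent(schemaString, s -> new Schema.Parser().parse(s));
    }
}
```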