# Flink Avro

Apache Flink Avro format support library, providing serialization and deserialization of Apache Avro data in Flink streaming and batch applications. It works with both generic and specific (code-generated) records and supports schema evolution and Avro logical types. It also offers a pluggable `SchemaCoder` abstraction for schema-registry integration; Confluent Schema Registry support is built on it in the separate `flink-avro-confluent-registry` module.
## Package Information

- **Package Name**: flink-avro
- **Package Type**: maven
- **Language**: Java
- **Installation**: `org.apache.flink:flink-avro:2.1.0`
## Core Imports

```java
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
```
## Basic Usage

```java
import org.apache.flink.formats.avro.AvroSerializationSchema;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// For generic records: parse the Avro schema from its JSON definition
String schemaString =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);
AvroSerializationSchema<GenericRecord> serializer = AvroSerializationSchema.forGeneric(schema);
AvroDeserializationSchema<GenericRecord> deserializer = AvroDeserializationSchema.forGeneric(schema);

// For specific records (User is a class generated from the schema by the Avro compiler)
AvroSerializationSchema<User> userSerializer = AvroSerializationSchema.forSpecific(User.class);
AvroDeserializationSchema<User> userDeserializer = AvroDeserializationSchema.forSpecific(User.class);

// Hand the schemas to a source or sink connector, which calls their lifecycle
// methods (such as open) before serializing. For example, with the Kafka connector:
//   KafkaSource.<GenericRecord>builder().setValueOnlyDeserializer(deserializer) ...
//   KafkaRecordSerializationSchema.builder().setValueSerializationSchema(serializer) ...
```
## Architecture

Flink Avro is organized around several key components:

- **Serialization Schemas**: Convert between Java objects and Avro-encoded bytes (binary or JSON) for streaming sources and sinks
- **File I/O Formats**: Read and write Avro object container files in batch processing scenarios
- **Type System Integration**: Type information for generic and specific records, so Flink serializes them with Avro between operators
- **Table API Support**: `RowData`-based serialization for the SQL and Table API
- **Bulk Writers**: Write Avro container files through Flink's file sink, with compression configurable via a custom `AvroBuilder`
- **Schema Conversion**: Bidirectional conversion between Flink and Avro type systems
## Capabilities

### Serialization and Deserialization

Core serialization and deserialization schemas for converting between Java objects and Avro binary/JSON format in streaming applications.

```java { .api }
public class AvroSerializationSchema<T> implements SerializationSchema<T> {
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass);
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema);
    public static <T extends SpecificRecord> AvroSerializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);
    public static AvroSerializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);
    public byte[] serialize(T object);
    public Schema getSchema();
    public void open(InitializationContext context) throws Exception;
}

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema);
    public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, AvroEncoding encoding);
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass);
    public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass, AvroEncoding encoding);
    public T deserialize(byte[] message) throws IOException;
    public TypeInformation<T> getProducedType();
    public boolean isEndOfStream(T nextElement);
}
```
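The two `AvroEncoding` modes differ in what goes on the wire. As a concrete picture of `BINARY`, the sketch below hand-encodes a record following the Avro specification: `int` and `long` values use zig-zag variable-length coding, a string is its byte length followed by UTF-8 bytes, and a record is simply its fields in schema order. It does not call Flink or the Avro library, and the two-field `User` record (`name: string`, `age: int`) is hypothetical. Because the output carries no field names, tags, or types, a deserializer can only decode it with the writer's schema in hand.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hand-rolled sketch of Avro binary encoding; not Flink or Avro library code. */
public final class AvroBinarySketch {

    /** Avro int/long: zig-zag map to unsigned, then 7 bits per byte, low bits first. */
    public static void writeLong(long n, ByteArrayOutputStream out) {
        long zz = (n << 1) ^ (n >> 63); // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
        while ((zz & ~0x7FL) != 0) {
            out.write((int) ((zz & 0x7F) | 0x80)); // high bit set: more bytes follow
            zz >>>= 7;
        }
        out.write((int) zz);
    }

    /** Avro string: byte length (encoded as a long), then the UTF-8 bytes. */
    public static void writeString(String s, ByteArrayOutputStream out) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(utf8.length, out);
        out.write(utf8, 0, utf8.length);
    }

    /** Hypothetical record {name: string, age: int}: fields in schema order, nothing else. */
    public static byte[] encodeUser(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(name, out);
        writeLong(age, out); // an int is encoded exactly like a long of the same value
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeUser("foo", 64)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "06 66 6f 6f 80 01"
    }
}
```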
[Serialization and Deserialization](./serialization-deserialization.md)

### Table API Integration

Row-based serialization and deserialization for integration with Flink's Table API and SQL layer.

```java { .api }
public class AvroRowDataSerializationSchema implements SerializationSchema<RowData> {
    public AvroRowDataSerializationSchema(RowType rowType);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding);
    public AvroRowDataSerializationSchema(RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping);
    public AvroRowDataSerializationSchema(RowType rowType, SerializationSchema<GenericRecord> nestedSchema, RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter);
    public byte[] serialize(RowData row);
}

public class AvroRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding);
    public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> producedTypeInfo, AvroEncoding encoding, boolean legacyTimestampMapping);
    public AvroRowDataDeserializationSchema(DeserializationSchema<GenericRecord> nestedSchema, AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter, TypeInformation<RowData> typeInfo);
    public void open(InitializationContext context) throws Exception;
    public RowData deserialize(byte[] message) throws IOException;
    public TypeInformation<RowData> getProducedType();
}
```
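In SQL and Table API programs these row-based schemas are usually not constructed by hand: declaring a table with `'format' = 'avro'` lets Flink's Avro format factory create them from the table's row type. The sketch below only assembles such a `CREATE TABLE` statement as a string. The `AvroTableDdl` helper, table name, columns, topic, and broker address are hypothetical placeholders, and actually executing the statement (for example via `TableEnvironment#executeSql`) assumes Flink and the separate Kafka SQL connector are on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds Flink SQL DDL for a table using the 'avro' format. */
public final class AvroTableDdl {

    public static String createTable(String table, String columns, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + table + " (\n" + columns + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps option order stable
        options.put("connector", "kafka");
        options.put("topic", "users");                                 // hypothetical topic
        options.put("properties.bootstrap.servers", "localhost:9092"); // hypothetical broker
        options.put("format", "avro"); // the format factory derives the Avro schema from the row type
        String ddl = createTable("users", "  name STRING,\n  age INT", options);
        System.out.println(ddl);
        // With Flink on the classpath:
        // TableEnvironment.create(EnvironmentSettings.inStreamingMode()).executeSql(ddl);
    }
}
```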
[Table API Integration](./table-api-integration.md)

### File I/O Operations

Input and output formats for reading and writing Avro files in batch processing scenarios.

```java { .api }
public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
    public AvroInputFormat(Path filePath, Class<E> type);
    public void setReuseAvroValue(boolean reuseAvroValue);
    public void setUnsplittable(boolean unsplittable);
    public TypeInformation<E> getProducedType();
}

public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
    public AvroOutputFormat(Path filePath, Class<E> type);
    public AvroOutputFormat(Class<E> type);
    public void setSchema(Schema schema);
    public void setCodec(Codec codec);
}
```
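`AvroInputFormat` reads Avro object container files, which store the writer schema in their header and separate data blocks with sync markers; those markers are what allow a file to be split, and `setUnsplittable(true)` turns splitting off. Per the Avro specification, such a file begins with the four bytes `O`, `b`, `j` and the byte value 1. The stdlib-only sketch below uses that magic to check a file before pointing the input format at it; the `AvroMagic` class and its methods are hypothetical, not part of flink-avro.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical check for the Avro object container file magic: 'O', 'b', 'j', 1. */
public final class AvroMagic {

    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    public static boolean hasAvroMagic(byte[] header) {
        if (header.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (header[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    public static boolean isAvroContainerFile(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            return hasAvroMagic(in.readNBytes(MAGIC.length)); // reads at most 4 bytes
        }
    }

    public static void main(String[] args) throws IOException {
        for (String arg : args) {
            System.out.println(arg + ": " + isAvroContainerFile(Paths.get(arg)));
        }
    }
}
```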
[File I/O Operations](./file-io-operations.md)

### Bulk Writers

Factory and writer classes that create `BulkWriter`s for writing Avro object container files (for example with `FileSink.forBulkFormat`) from specific, generic, or reflect-based records. Compression can be configured by passing a custom `AvroBuilder` to `AvroWriterFactory`.

```java { .api }
public class AvroWriters {
    public static <T extends SpecificRecordBase> AvroWriterFactory<T> forSpecificRecord(Class<T> type);
    public static AvroWriterFactory<GenericRecord> forGenericRecord(Schema schema);
    public static <T> AvroWriterFactory<T> forReflectRecord(Class<T> type);
}

public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {
    public AvroWriterFactory(AvroBuilder<T> builder);
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

public interface AvroBuilder<T> extends Serializable {
    DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;
}

public class AvroBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}
```
[Bulk Writers](./bulk-writers.md)

### Type System Integration

Type information classes that make Flink serialize Avro records with an Avro-based serializer, plus `AvroSchemaConverter` for converting between Flink logical types and Avro schemas. Streams of `GenericRecord` need an explicit `GenericRecordAvroTypeInfo`, because the schema cannot be derived from the class.

```java { .api }
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
    public AvroTypeInfo(Class<T> typeClass);
    public TypeSerializer<T> createSerializer(SerializerConfig config);
}

public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
    public GenericRecordAvroTypeInfo(Schema schema);
}

public class AvroSchemaConverter {
    public static Schema convertToSchema(RowType rowType);
    public static Schema convertToSchema(RowType rowType, boolean legacyTimestampMapping);
    public static LogicalType convertToLogicalType(Schema schema);
}
```
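As a rough guide to the kind of schema `AvroSchemaConverter.convertToSchema` produces, the sketch below maps a few primitive Flink SQL type names to Avro type JSON and wraps nullable types in a union whose first branch is `null`. `SimpleAvroTypeMapping` is a hypothetical, simplified helper, not the converter itself: the real converter also covers logical types such as dates, timestamps, and decimals, plus nested rows, arrays, and maps, and its exact output (including record naming) may differ.

```java
import java.util.Map;

/** Simplified sketch of a Flink SQL -> Avro primitive type mapping; not AvroSchemaConverter. */
public final class SimpleAvroTypeMapping {

    private static final Map<String, String> PRIMITIVES = Map.of(
            "BOOLEAN", "\"boolean\"",
            "TINYINT", "\"int\"",
            "SMALLINT", "\"int\"",
            "INT", "\"int\"",
            "BIGINT", "\"long\"",
            "FLOAT", "\"float\"",
            "DOUBLE", "\"double\"",
            "STRING", "\"string\"",
            "BYTES", "\"bytes\"");

    /** Returns the Avro type JSON for a primitive Flink type name. */
    public static String toAvroType(String flinkType, boolean nullable) {
        String avro = PRIMITIVES.get(flinkType);
        if (avro == null) {
            throw new IllegalArgumentException("Not covered by this sketch: " + flinkType);
        }
        // Nullable columns become a union whose first branch is null.
        return nullable ? "[\"null\"," + avro + "]" : avro;
    }

    public static void main(String[] args) {
        System.out.println(toAvroType("BIGINT", false)); // "long"
        System.out.println(toAvroType("STRING", true));  // ["null","string"]
    }
}
```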
[Type System Integration](./type-system-integration.md)

### Schema Registry Integration

Serialization and deserialization schemas that work with a schema registry through a pluggable `SchemaCoder`: the serializer writes a schema reference ahead of each record, and the deserializer reads it back to resolve the writer schema before decoding with the given reader schema.

```java { .api }
public class RegistryAvroSerializationSchema<T> extends AvroSerializationSchema<T> {
    public RegistryAvroSerializationSchema(Class<T> recordClazz, Schema schema, SchemaCoder.SchemaCoderProvider schemaCoderProvider);
    public RegistryAvroSerializationSchema(Class<T> recordClazz, Schema schema, SchemaCoder.SchemaCoderProvider schemaCoderProvider, AvroEncoding encoding);
}

public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    public RegistryAvroDeserializationSchema(Class<T> recordClazz, Schema reader, SchemaCoder.SchemaCoderProvider schemaCoderProvider);
    public RegistryAvroDeserializationSchema(Class<T> recordClazz, Schema reader, SchemaCoder.SchemaCoderProvider schemaCoderProvider, AvroEncoding encoding);
}
```
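A `SchemaCoder` decides how the schema reference is written ahead of each record (`writeSchema`) and read back before decoding (`readSchema`). Confluent Schema Registry, for example, prefixes every message with a zero magic byte followed by a 4-byte big-endian schema ID. The stdlib-only sketch below shows just that framing step. `IdPrefixFraming` is a hypothetical helper that handles integer IDs; a real `SchemaCoder` would additionally register or look up the `Schema` belonging to that ID in the registry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Sketch of Confluent-style framing: magic byte 0, then a 4-byte big-endian schema ID. */
public final class IdPrefixFraming {

    private static final byte MAGIC_BYTE = 0;

    /** Counterpart of SchemaCoder#writeSchema, but for an already-resolved schema ID. */
    public static void writeSchemaId(int schemaId, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out); // not closed: caller owns the stream
        data.writeByte(MAGIC_BYTE);
        data.writeInt(schemaId); // DataOutputStream writes big-endian
        data.flush();
    }

    /** Counterpart of SchemaCoder#readSchema: returns the ID; the caller resolves the Schema. */
    public static int readSchemaId(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte magic = data.readByte();
        if (magic != MAGIC_BYTE) {
            throw new IOException("Unknown magic byte: " + magic);
        }
        return data.readInt();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeSchemaId(42, out);
        out.write(new byte[] {0x06, 0x66, 0x6f, 0x6f}); // the Avro payload follows the header
        InputStream in = new ByteArrayInputStream(out.toByteArray());
        System.out.println("schema id = " + readSchemaId(in) + ", payload bytes left = " + in.available());
    }
}
```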
[Schema Registry Integration](./schema-registry-integration.md)

## Configuration Options

```java { .api }
public class AvroFormatOptions {
    public static final ConfigOption<String> AVRO_OUTPUT_CODEC;
    public static final ConfigOption<AvroEncoding> AVRO_ENCODING;
    public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING;

    public enum AvroEncoding implements DescribedEnum {
        BINARY("binary", "Use binary encoding for serialization and deserialization."),
        JSON("json", "Use JSON encoding for serialization and deserialization.");

        String toString();
        InlineElement getDescription();
    }
}
```
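In SQL DDL these options are set with the format prefix, as `avro.codec`, `avro.encoding`, and `avro.timestamp_mapping.legacy`. `avro.codec` applies to the filesystem connector and takes codec names matching the `Codec` values listed under Types (`null`, `deflate`, `snappy`, `bzip2`, `xz`), while `avro.encoding` takes `binary` or `json`. The sketch below is a hypothetical helper (`AvroFormatOptionsSketch`) that checks those values before putting them into a table's `WITH` options; Flink performs its own validation when the table is created.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that validates Avro format options for a table's WITH clause. */
public final class AvroFormatOptionsSketch {

    private static final Set<String> CODECS = Set.of("null", "deflate", "snappy", "bzip2", "xz");
    private static final Set<String> ENCODINGS = Set.of("binary", "json");

    public static Map<String, String> avroOptions(String codec, String encoding, boolean legacyTimestamps) {
        String c = codec.toLowerCase(Locale.ROOT);
        String e = encoding.toLowerCase(Locale.ROOT);
        if (!CODECS.contains(c)) {
            throw new IllegalArgumentException("Unsupported avro.codec: " + codec);
        }
        if (!ENCODINGS.contains(e)) {
            throw new IllegalArgumentException("Unsupported avro.encoding: " + encoding);
        }
        Map<String, String> options = new LinkedHashMap<>(); // keeps DDL option order stable
        options.put("format", "avro");
        options.put("avro.codec", c); // only meaningful for the filesystem connector
        options.put("avro.encoding", e);
        options.put("avro.timestamp_mapping.legacy", Boolean.toString(legacyTimestamps));
        return options;
    }

    public static void main(String[] args) {
        avroOptions("SNAPPY", "binary", false)
                .forEach((k, v) -> System.out.println("'" + k + "' = '" + v + "'"));
    }
}
```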
## Types

### Core Interfaces

```java { .api }
public interface SchemaCoder {
    Schema readSchema(InputStream in) throws IOException;
    void writeSchema(Schema schema, OutputStream out) throws IOException;

    interface SchemaCoderProvider extends Serializable {
        SchemaCoder get();
    }
}
```
### Conversion Interfaces

```java { .api }
// Nested in AvroToRowDataConverters
public interface AvroToRowDataConverter extends Serializable {
    Object convert(Object object);
}

// Nested in RowDataToAvroConverters
public interface RowDataToAvroConverter extends Serializable {
    Object convert(Schema schema, Object object);
}
```
### Codec Options

```java { .api }
// Nested in AvroOutputFormat; passed to AvroOutputFormat#setCodec
public enum Codec {
    NULL, SNAPPY, BZIP2, DEFLATE, XZ
}
```