0
# Avro Connectors
1
2
Apache Avro serialization for Flink batch processing, providing efficient binary encoding with support for schema evolution.
3
4
## Capabilities
5
6
### AvroInputFormat
7
8
Reads Apache Avro files into Flink DataSets with full type safety and schema support.
9
10
```java { .api }
11
/**
12
* Input format for reading Avro files in Flink batch jobs
13
* @param <E> The type of records to read
14
*/
15
public class AvroInputFormat<E> extends FileInputFormat<E>
16
implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
17
18
/**
19
* Creates an AvroInputFormat for reading Avro files
20
* @param filePath Path to the Avro file or directory
21
* @param type Class representing the record type to read
22
*/
23
public AvroInputFormat(Path filePath, Class<E> type);
24
25
/**
26
* Sets whether to reuse Avro value instances for better performance
27
* @param reuseAvroValue true to reuse instances, false to create new ones
28
*/
29
public void setReuseAvroValue(boolean reuseAvroValue);
30
31
/**
32
* Sets whether each file should be read as a whole (i.e. treated as non-splittable)
33
* @param unsplittable true to read each file as a whole, false to allow splitting
34
*/
35
public void setUnsplittable(boolean unsplittable);
36
37
/**
38
* Returns the type information for the records produced by this format
39
* @return TypeInformation describing the output type
40
*/
41
public TypeInformation<E> getProducedType();
42
43
/**
44
* Opens the input split for reading
45
* @param split The file input split to read from
46
*/
47
public void open(FileInputSplit split) throws IOException;
48
49
/**
50
* Checks if the end of the input has been reached
51
* @return true if no more records are available
52
*/
53
public boolean reachedEnd() throws IOException;
54
55
/**
56
* Reads the next record from the input
57
* @param reuseValue Record instance to reuse (may be null)
58
* @return The next record
59
*/
60
public E nextRecord(E reuseValue) throws IOException;
61
62
/**
63
* Returns the number of records read from the current block
64
* @return Number of records read
65
*/
66
public long getRecordsReadFromBlock();
67
68
/**
69
* Gets the current checkpoint state for fault tolerance
70
* @return Checkpoint state as (position, recordsRead) tuple
71
*/
72
public Tuple2<Long, Long> getCurrentState();
73
74
/**
75
* Reopens the format with a previous checkpoint state
76
* @param split The file input split to read
77
* @param state The checkpoint state to restore
78
*/
79
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException;
80
}
81
```
82
83
**Usage Example:**
84
85
```java
86
import org.apache.flink.api.java.ExecutionEnvironment;
87
import org.apache.flink.api.java.DataSet;
88
import org.apache.flink.api.java.io.AvroInputFormat;
89
import org.apache.flink.core.fs.Path;
90
91
// Define your Avro record class
92
public class User {
93
public String name;
94
public int age;
95
public String email;
96
}
97
98
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
99
100
// Create AvroInputFormat
101
AvroInputFormat<User> avroInput = new AvroInputFormat<>(
102
new Path("hdfs://path/to/users.avro"),
103
User.class
104
);
105
106
// Configure for better performance
107
avroInput.setReuseAvroValue(true);
108
109
// Read the data
110
DataSet<User> users = env.createInput(avroInput);
111
users.print();
112
```
113
114
### AvroOutputFormat
115
116
Writes Flink DataSets to Apache Avro files, using either an automatically generated schema or a custom schema.
117
118
```java { .api }
119
/**
120
* Output format for writing Avro files in Flink batch jobs
121
* @param <E> The type of records to write
122
*/
123
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
124
125
/**
126
* Creates an AvroOutputFormat with file path and record type
127
* @param filePath Path where the Avro file will be written
128
* @param type Class representing the record type to write
129
*/
130
public AvroOutputFormat(Path filePath, Class<E> type);
131
132
/**
133
* Creates an AvroOutputFormat with only the record type (the output path is set later)
134
* @param type Class representing the record type to write
135
*/
136
public AvroOutputFormat(Class<E> type);
137
138
/**
139
* Sets a custom Avro schema to use for writing
140
* @param schema The Avro schema to use
141
*/
142
public void setSchema(Schema schema);
143
144
/**
145
* Writes a record to the output
146
* @param record The record to write
147
*/
148
public void writeRecord(E record) throws IOException;
149
150
/**
151
* Opens the output format for writing
152
* @param taskNumber The number of the parallel task
153
* @param numTasks The total number of parallel tasks
154
*/
155
public void open(int taskNumber, int numTasks) throws IOException;
156
157
/**
158
* Closes the output format and flushes any remaining data
159
*/
160
public void close() throws IOException;
161
}
162
```
163
164
**Usage Example:**
165
166
```java
167
import org.apache.flink.api.java.io.AvroOutputFormat;
168
import org.apache.avro.Schema;
169
import org.apache.avro.SchemaBuilder;
170
171
// Create output format
172
AvroOutputFormat<User> avroOutput = new AvroOutputFormat<>(
173
new Path("hdfs://path/to/output.avro"),
174
User.class
175
);
176
177
// Optional: Set custom schema
178
Schema customSchema = SchemaBuilder.record("User")
179
.fields()
180
.name("name").type().stringType().noDefault()
181
.name("age").type().intType().noDefault()
182
.name("email").type().stringType().noDefault()
183
.endRecord();
184
avroOutput.setSchema(customSchema);
185
186
// Write data
187
DataSet<User> users = // ... your data
188
users.output(avroOutput);
189
```
190
191
### DataInputDecoder
192
193
Low-level Avro decoder for reading from Java DataInput streams.
194
195
```java { .api }
196
/**
197
* Avro decoder that reads from Java DataInput streams
198
*/
199
public class DataInputDecoder extends org.apache.avro.io.Decoder {
200
201
// Note: the constructor is package-private in the actual implementation
202
203
/**
204
* Sets the input data source
205
* @param in DataInput stream to read from
206
*/
207
public void setIn(DataInput in);
208
209
/**
210
* Reads a null value
211
*/
212
public void readNull() throws IOException;
213
214
/**
215
* Reads a boolean value
216
* @return The boolean value
217
*/
218
public boolean readBoolean() throws IOException;
219
220
/**
221
* Reads an integer value
222
* @return The integer value
223
*/
224
public int readInt() throws IOException;
225
226
/**
227
* Reads a long value
228
* @return The long value
229
*/
230
public long readLong() throws IOException;
231
232
/**
233
* Reads a float value
234
* @return The float value
235
*/
236
public float readFloat() throws IOException;
237
238
/**
239
* Reads a double value
240
* @return The double value
241
*/
242
public double readDouble() throws IOException;
243
244
/**
245
* Reads variable-length bytes
246
* @param old ByteBuffer to reuse (may be null)
247
* @return ByteBuffer containing the bytes
248
*/
249
public ByteBuffer readBytes(ByteBuffer old) throws IOException;
250
251
/**
252
* Reads an enum value
253
* @return The enum ordinal
254
*/
255
public int readEnum() throws IOException;
256
257
/**
258
* Reads fixed-length bytes
259
* @param bytes Destination byte array
260
* @param start Starting offset in the array
261
* @param length Number of bytes to read
262
*/
263
public void readFixed(byte[] bytes, int start, int length) throws IOException;
264
265
/**
266
* Skips fixed-length bytes
267
* @param length Number of bytes to skip
268
*/
269
public void skipFixed(int length) throws IOException;
270
271
/**
272
* Skips variable-length bytes
273
*/
274
public void skipBytes() throws IOException;
275
276
/**
277
* Reads a UTF-8 string
278
* @param old Utf8 object to reuse (may be null)
279
* @return UTF-8 string
280
*/
281
public Utf8 readString(Utf8 old) throws IOException;
282
283
/**
284
* Reads a string value
285
* @return The string value
286
*/
287
public String readString() throws IOException;
288
289
/**
290
* Skips a string value
291
*/
292
public void skipString() throws IOException;
293
294
/**
295
* Starts reading an array
296
* @return Number of elements in the first block of the array (0 if the array is empty)
297
*/
298
public long readArrayStart() throws IOException;
299
300
/**
301
* Advances to the next block of array elements
302
* @return Number of elements in the next block (0 when the array has no more elements)
303
*/
304
public long arrayNext() throws IOException;
305
306
/**
307
* Skips entire array
308
* @return 0 if the array was skipped entirely, otherwise the number of elements the caller must skip
309
*/
310
public long skipArray() throws IOException;
311
312
/**
313
* Starts reading a map
314
* @return Number of entries in the first block of the map (0 if the map is empty)
315
*/
316
public long readMapStart() throws IOException;
317
318
/**
319
* Advances to the next block of map entries
320
* @return Number of entries in the next block (0 when the map has no more entries)
321
*/
322
public long mapNext() throws IOException;
323
324
/**
325
* Skips entire map
326
* @return 0 if the map was skipped entirely, otherwise the number of entries the caller must skip
327
*/
328
public long skipMap() throws IOException;
329
330
/**
331
* Reads a union index
332
* @return Union branch index
333
*/
334
public int readIndex() throws IOException;
335
336
/**
337
* Utility method for reading variable-length long count
338
* @param in DataInput stream to read from
339
* @return Variable-length long value
340
*/
341
public static long readVarLongCount(DataInput in) throws IOException;
342
}
343
```
344
345
### DataOutputEncoder
346
347
Low-level Avro encoder for writing to Java DataOutput streams.
348
349
```java { .api }
350
/**
351
* Avro encoder that writes to Java DataOutput streams
352
*/
353
public class DataOutputEncoder extends org.apache.avro.io.Encoder implements Serializable {
354
355
// Note: the constructor is package-private in the actual implementation
356
357
/**
358
* Sets the output data destination
359
* @param out DataOutput stream to write to
360
*/
361
public void setOut(DataOutput out);
362
363
/**
364
* Writes a boolean value
365
* @param b The boolean value to write
366
*/
367
public void writeBoolean(boolean b) throws IOException;
368
369
/**
370
* Writes an integer value
371
* @param n The integer value to write
372
*/
373
public void writeInt(int n) throws IOException;
374
375
/**
376
* Writes a long value
377
* @param n The long value to write
378
*/
379
public void writeLong(long n) throws IOException;
380
381
/**
382
* Flushes the output (no-op implementation)
383
*/
384
public void flush() throws IOException;
385
386
/**
387
* Writes a null value
388
*/
389
public void writeNull() throws IOException;
390
391
/**
392
* Writes a float value
393
* @param f The float value to write
394
*/
395
public void writeFloat(float f) throws IOException;
396
397
/**
398
* Writes a double value
399
* @param d The double value to write
400
*/
401
public void writeDouble(double d) throws IOException;
402
403
/**
404
* Writes an enum value
405
* @param e The enum ordinal to write
406
*/
407
public void writeEnum(int e) throws IOException;
408
409
/**
410
* Writes fixed-length bytes
411
* @param bytes Byte array containing data
412
* @param start Starting offset in the array
413
* @param len Number of bytes to write
414
*/
415
public void writeFixed(byte[] bytes, int start, int len) throws IOException;
416
417
/**
418
* Writes variable-length bytes from ByteBuffer
419
* @param bytes ByteBuffer containing data
420
*/
421
public void writeBytes(ByteBuffer bytes) throws IOException;
422
423
/**
424
* Writes a UTF-8 string
425
* @param utf8 The UTF-8 string to write
426
*/
427
public void writeString(Utf8 utf8) throws IOException;
428
429
/**
430
* Writes a string value
431
* @param str The string to write
432
*/
433
public void writeString(String str) throws IOException;
434
435
/**
436
* Writes variable-length bytes
437
* @param bytes Byte array containing data
438
* @param start Starting offset in the array
439
* @param len Number of bytes to write
440
*/
441
public void writeBytes(byte[] bytes, int start, int len) throws IOException;
442
443
/**
444
* Starts writing an array
445
*/
446
public void writeArrayStart() throws IOException;
447
448
/**
449
* Sets the item count for arrays and maps
450
* @param itemCount Number of items to write
451
*/
452
public void setItemCount(long itemCount) throws IOException;
453
454
/**
455
* Starts writing an individual item
456
*/
457
public void startItem() throws IOException;
458
459
/**
460
* Ends writing an array
461
*/
462
public void writeArrayEnd() throws IOException;
463
464
/**
465
* Starts writing a map
466
*/
467
public void writeMapStart() throws IOException;
468
469
/**
470
* Ends writing a map
471
*/
472
public void writeMapEnd() throws IOException;
473
474
/**
475
* Writes a union index
476
* @param unionIndex The union branch index
477
*/
478
public void writeIndex(int unionIndex) throws IOException;
479
480
/**
481
* Utility method for writing variable-length long count
482
* @param out DataOutput stream to write to
483
* @param val Variable-length long value to write
484
*/
485
public static void writeVarLongCount(DataOutput out, long val) throws IOException;
486
}
487
```
488
489
### FSDataInputStreamWrapper
490
491
Wrapper to make Flink's FSDataInputStream compatible with Avro's SeekableInput interface.
492
493
```java { .api }
494
/**
495
* Wrapper for Flink FSDataInputStream to make it compatible with Avro SeekableInput
496
*/
497
public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
498
499
/**
500
* Creates a wrapper around a Flink FSDataInputStream
501
* @param stream The FSDataInputStream to wrap
502
* @param len Length of the stream in bytes
503
*/
504
public FSDataInputStreamWrapper(FSDataInputStream stream, long len);
505
506
/**
507
* Returns the length of the stream
508
* @return Stream length in bytes
509
*/
510
public long length();
511
512
/**
513
* Reads data into a byte array
514
* @param b Destination byte array
515
* @param off Offset in the destination array
516
* @param len Maximum number of bytes to read
517
* @return Number of bytes actually read
518
*/
519
public int read(byte[] b, int off, int len) throws IOException;
520
521
/**
522
* Seeks to a specific position in the stream
523
* @param p Position to seek to
524
*/
525
public void seek(long p) throws IOException;
526
527
/**
528
* Returns the current position in the stream
529
* @return Current position
530
*/
531
public long tell() throws IOException;
532
533
/**
534
* Closes the underlying stream
535
*/
536
public void close() throws IOException;
537
}
538
```
539
540
## Common Types
541
542
```java { .api }
543
// Avro types
544
import org.apache.avro.Schema;
545
import org.apache.avro.io.Decoder;
546
import org.apache.avro.io.Encoder;
547
import org.apache.avro.file.SeekableInput;
548
import org.apache.avro.util.Utf8;
549
550
// Flink types
551
import org.apache.flink.api.common.io.FileInputFormat;
552
import org.apache.flink.api.common.io.FileOutputFormat;
553
import org.apache.flink.api.common.io.CheckpointableInputFormat;
554
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
555
import org.apache.flink.api.common.typeinfo.TypeInformation;
556
import org.apache.flink.api.java.tuple.Tuple2;
557
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.io.InputSplit;
558
import org.apache.flink.core.fs.FSDataInputStream;
559
import org.apache.flink.core.fs.Path;
560
561
// Java types
562
import java.io.DataInput;
563
import java.io.DataOutput;
564
import java.io.Closeable;
565
import java.io.Serializable;
566
import java.io.IOException;
567
import java.nio.ByteBuffer;
568
```