Apache Flink batch processing connectors for Avro, JDBC, Hadoop, HBase, and HCatalog data sources
—
Apache Avro serialization support for Flink batch processing, providing efficient binary serialization with schema evolution support.
Reads Apache Avro files into Flink DataSets with full type safety and schema support.
/**
* Input format for reading Avro files in Flink batch jobs
* @param <E> The type of records to read
*/
public class AvroInputFormat<E> extends FileInputFormat<E>
implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
/**
* Creates an AvroInputFormat for reading Avro files
* @param filePath Path to the Avro file or directory
* @param type Class representing the record type to read
*/
public AvroInputFormat(Path filePath, Class<E> type);
/**
* Sets whether to reuse Avro value instances for better performance
* @param reuseAvroValue true to reuse instances, false to create new ones
*/
public void setReuseAvroValue(boolean reuseAvroValue);
/**
* Sets whether files should be read as whole (non-splittable)
* @param unsplittable true to read files as whole, false to allow splitting
*/
public void setUnsplittable(boolean unsplittable);
/**
* Returns the type information for the records produced by this format
* @return TypeInformation describing the output type
*/
public TypeInformation<E> getProducedType();
/**
* Opens the input split for reading
* @param split The file input split to read from
*/
public void open(FileInputSplit split) throws IOException;
/**
* Checks if the end of the input has been reached
* @return true if no more records are available
*/
public boolean reachedEnd() throws IOException;
/**
* Reads the next record from the input
* @param reuseValue Record instance to reuse (may be null)
* @return The next record
*/
public E nextRecord(E reuseValue) throws IOException;
/**
* Returns the number of records read from the current block
* @return Number of records read
*/
public long getRecordsReadFromBlock();
/**
* Gets the current checkpoint state for fault tolerance
* @return Checkpoint state as (position, recordsRead) tuple
*/
public Tuple2<Long, Long> getCurrentState();
/**
* Reopens the format with a previous checkpoint state
* @param split The file input split to read
* @param state The checkpoint state to restore
*/
public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException;
}

Usage Example:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;
// Define your Avro record class
public class User {
public String name;
public int age;
public String email;
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create AvroInputFormat
AvroInputFormat<User> avroInput = new AvroInputFormat<>(
new Path("hdfs://path/to/users.avro"),
User.class
);
// Configure for better performance
avroInput.setReuseAvroValue(true);
// Read the data
DataSet<User> users = env.createInput(avroInput);
users.print();

Writes Flink DataSets to Apache Avro files with automatic schema generation or custom schema support.
/**
* Output format for writing Avro files in Flink batch jobs
* @param <E> The type of records to write
*/
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {
/**
* Creates an AvroOutputFormat with file path and record type
* @param filePath Path where the Avro file will be written
* @param type Class representing the record type to write
*/
public AvroOutputFormat(Path filePath, Class<E> type);
/**
* Creates an AvroOutputFormat with only record type (path set later)
* @param type Class representing the record type to write
*/
public AvroOutputFormat(Class<E> type);
/**
* Sets a custom Avro schema to use for writing
* @param schema The Avro schema to use
*/
public void setSchema(Schema schema);
/**
* Writes a record to the output
* @param record The record to write
*/
public void writeRecord(E record) throws IOException;
/**
* Opens the output format for writing
* @param taskNumber The number of the parallel task
* @param numTasks The total number of parallel tasks
*/
public void open(int taskNumber, int numTasks) throws IOException;
/**
* Closes the output format and flushes any remaining data
*/
public void close() throws IOException;
}

Usage Example:
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
// Create output format
AvroOutputFormat<User> avroOutput = new AvroOutputFormat<>(
new Path("hdfs://path/to/output.avro"),
User.class
);
// Optional: Set custom schema
Schema customSchema = SchemaBuilder.record("User")
.fields()
.name("name").type().stringType().noDefault()
.name("age").type().intType().noDefault()
.name("email").type().stringType().noDefault()
.endRecord();
avroOutput.setSchema(customSchema);
// Write data
DataSet<User> users = // ... your data
users.output(avroOutput);

Low-level Avro decoder for reading from Java DataInput streams.
/**
* Avro decoder that reads from Java DataInput streams
*/
public class DataInputDecoder extends org.apache.avro.io.Decoder {
// Note: Uses package-level constructor in actual implementation
/**
* Sets the input data source
* @param in DataInput stream to read from
*/
public void setIn(DataInput in);
/**
* Reads null value
*/
public void readNull() throws IOException;
/**
* Reads a boolean value
* @return The boolean value
*/
public boolean readBoolean() throws IOException;
/**
* Reads an integer value
* @return The integer value
*/
public int readInt() throws IOException;
/**
* Reads a long value
* @return The long value
*/
public long readLong() throws IOException;
/**
* Reads a float value
* @return The float value
*/
public float readFloat() throws IOException;
/**
* Reads a double value
* @return The double value
*/
public double readDouble() throws IOException;
/**
* Reads variable-length bytes
* @param old ByteBuffer to reuse (may be null)
* @return ByteBuffer containing the bytes
*/
public ByteBuffer readBytes(ByteBuffer old) throws IOException;
/**
* Reads an enum value
* @return The enum ordinal
*/
public int readEnum() throws IOException;
/**
* Reads fixed-length bytes
* @param bytes Destination byte array
* @param start Starting offset in the array
* @param length Number of bytes to read
*/
public void readFixed(byte[] bytes, int start, int length) throws IOException;
/**
* Skips fixed-length bytes
* @param length Number of bytes to skip
*/
public void skipFixed(int length) throws IOException;
/**
* Skips variable-length bytes
*/
public void skipBytes() throws IOException;
/**
* Reads UTF-8 string
* @param old Utf8 object to reuse (may be null)
* @return UTF-8 string
*/
public Utf8 readString(Utf8 old) throws IOException;
/**
* Reads a string value
* @return The string value
*/
public String readString() throws IOException;
/**
* Skips string value
*/
public void skipString() throws IOException;
/**
* Starts reading an array
* @return Number of elements in the array
*/
public long readArrayStart() throws IOException;
/**
* Reads next array element count
* @return Number of remaining elements
*/
public long arrayNext() throws IOException;
/**
* Skips entire array
* @return Number of elements skipped
*/
public long skipArray() throws IOException;
/**
* Starts reading a map
* @return Number of entries in the map
*/
public long readMapStart() throws IOException;
/**
* Reads next map element count
* @return Number of remaining entries
*/
public long mapNext() throws IOException;
/**
* Skips entire map
* @return Number of entries skipped
*/
public long skipMap() throws IOException;
/**
* Reads union index
* @return Union branch index
*/
public int readIndex() throws IOException;
/**
* Utility method for reading variable-length long count
* @param in DataInput stream to read from
* @return Variable-length long value
*/
public static long readVarLongCount(DataInput in) throws IOException;
}

Low-level Avro encoder for writing to Java DataOutput streams.
/**
* Avro encoder that writes to Java DataOutput streams
*/
public class DataOutputEncoder extends org.apache.avro.io.Encoder implements Serializable {
// Note: Uses package-level constructor in actual implementation
/**
* Sets the output data destination
* @param out DataOutput stream to write to
*/
public void setOut(DataOutput out);
/**
* Writes a boolean value
* @param b The boolean value to write
*/
public void writeBoolean(boolean b) throws IOException;
/**
* Writes an integer value
* @param n The integer value to write
*/
public void writeInt(int n) throws IOException;
/**
* Writes a long value
* @param n The long value to write
*/
public void writeLong(long n) throws IOException;
/**
* Flushes the output (no-op implementation)
*/
public void flush() throws IOException;
/**
* Writes null value
*/
public void writeNull() throws IOException;
/**
* Writes a float value
* @param f The float value to write
*/
public void writeFloat(float f) throws IOException;
/**
* Writes a double value
* @param d The double value to write
*/
public void writeDouble(double d) throws IOException;
/**
* Writes an enum value
* @param e The enum ordinal to write
*/
public void writeEnum(int e) throws IOException;
/**
* Writes fixed-length bytes
* @param bytes Byte array containing data
* @param start Starting offset in the array
* @param len Number of bytes to write
*/
public void writeFixed(byte[] bytes, int start, int len) throws IOException;
/**
* Writes variable-length bytes from ByteBuffer
* @param bytes ByteBuffer containing data
*/
public void writeBytes(ByteBuffer bytes) throws IOException;
/**
* Writes UTF-8 string
* @param utf8 The UTF-8 string to write
*/
public void writeString(Utf8 utf8) throws IOException;
/**
* Writes a string value
* @param str The string to write
*/
public void writeString(String str) throws IOException;
/**
* Writes variable-length bytes
* @param bytes Byte array containing data
* @param start Starting offset in the array
* @param len Number of bytes to write
*/
public void writeBytes(byte[] bytes, int start, int len) throws IOException;
/**
* Starts writing an array
*/
public void writeArrayStart() throws IOException;
/**
* Sets the item count for arrays and maps
* @param itemCount Number of items to write
*/
public void setItemCount(long itemCount) throws IOException;
/**
* Starts writing an individual item
*/
public void startItem() throws IOException;
/**
* Ends writing an array
*/
public void writeArrayEnd() throws IOException;
/**
* Starts writing a map
*/
public void writeMapStart() throws IOException;
/**
* Ends writing a map
*/
public void writeMapEnd() throws IOException;
/**
* Writes union index
* @param unionIndex The union branch index
*/
public void writeIndex(int unionIndex) throws IOException;
/**
* Utility method for writing variable-length long count
* @param out DataOutput stream to write to
* @param val Variable-length long value to write
*/
public static void writeVarLongCount(DataOutput out, long val) throws IOException;
}

Wrapper to make Flink's FSDataInputStream compatible with Avro's SeekableInput interface.
/**
 * Wrapper for Flink FSDataInputStream to make it compatible with Avro SeekableInput.
 * Avro's file-based readers require a {@link SeekableInput} with random access and a
 * known total length; this adapter exposes a Flink {@link FSDataInputStream} (plus an
 * externally supplied length) through that interface.
 */
public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
/**
 * Creates a wrapper around a Flink FSDataInputStream
 * @param stream The FSDataInputStream to wrap
 * @param len Length of the stream in bytes (must be supplied by the caller; the
 *            wrapper does not query the file system for it)
 */
public FSDataInputStreamWrapper(FSDataInputStream stream, long len);
/**
 * Returns the length of the stream
 * @return Stream length in bytes, as passed to the constructor
 */
public long length();
/**
 * Reads data into a byte array
 * @param b Destination byte array
 * @param off Offset in the destination array
 * @param len Maximum number of bytes to read
 * @return Number of bytes actually read, or -1 at end of stream (per the
 *         Avro SeekableInput contract)
 * @throws IOException if the underlying stream cannot be read
 */
public int read(byte[] b, int off, int len) throws IOException;
/**
 * Seeks to a specific position in the stream
 * @param p Position to seek to, as a byte offset from the start of the stream
 * @throws IOException if the seek fails on the underlying stream
 */
public void seek(long p) throws IOException;
/**
 * Returns the current position in the stream
 * @return Current byte position
 * @throws IOException if the position cannot be determined
 */
public long tell() throws IOException;
/**
 * Closes the underlying stream
 * @throws IOException if closing the underlying stream fails
 */
public void close() throws IOException;
}

// Avro types
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.util.Utf8;
// Flink types
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
// Java types
import java.io.DataInput;
import java.io.DataOutput;
import java.io.Closeable;
import java.io.Serializable;
import java.io.IOException;
import java.nio.ByteBuffer;