Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
Bulk formats provide batch-oriented reading interfaces optimized for columnar formats like ORC, Parquet, and other high-performance file formats.
Core interface for implementing batch-oriented file reading with optimized performance.
/**
* The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
* are formats like ORC or Parquet.
*
* The outer 'BulkFormat' class acts mainly as a configuration holder and factory for the
* reader. The actual reading is done by the Reader, which is created in the
* createReader method. If a bulk reader is created based on a checkpoint during checkpointed
* streaming execution, then the reader is re-created in the restoreReader method.
*/
@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
extends Serializable, ResultTypeQueryable<T> {
/**
* Creates a new reader that reads from the split's path starting
* at the split's offset and reads length bytes after the offset.
*/
BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;
/**
* Creates a new reader that reads from split.path() starting at offset and
* reads until length bytes after the offset. A number of recordsToSkip records
* should be read and discarded after the offset. This is typically part of restoring a reader
* to a checkpointed position.
*/
BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;
/**
* Checks whether this format is splittable. Splittable formats allow Flink to create multiple
* splits per file, so that Flink can read multiple regions of the file concurrently.
*/
boolean isSplittable();
/**
* Gets the type produced by this format. This type will be the type produced by the file source
* as a whole.
*/
@Override
TypeInformation<T> getProducedType();
}
Nested interface for reading batches of records with efficient iteration.
/**
* The actual reader that reads the batches of records.
*/
interface Reader<T> extends Closeable {
/**
* Reads one batch. The method should return null when reaching the end of the input. The
* returned batch will be handed over to the processing threads as one.
*
* The returned iterator object and any contained objects may be held onto by the file
* source for some time, so it should not be immediately reused by the reader.
*
* To implement reuse and to save object allocation, consider using a Pool and recycling
* objects into the Pool in the RecordIterator.releaseBatch() method.
*/
@Nullable
RecordIterator<T> readBatch() throws IOException;
/**
* Closes the reader and should release all resources.
*/
@Override
void close() throws IOException;
}
Iterator interface for efficiently processing batches of records.
/**
* An iterator over records with their position in the file. The iterator is closeable to
* support clean resource release and recycling.
*
* @param <T> The type of the record.
*/
interface RecordIterator<T> {
/**
* Gets the next record from the file, together with its position.
*
* The position information returned with the record points to the record AFTER the
* returned record, because it defines the point where the reading should resume once the
* current record is emitted. The position information is put in the source's state when the
* record is emitted.
*
* Objects returned by this method may be reused by the iterator. By the time that this
* method is called again, no object returned from the previous call will be referenced any
* more. That makes it possible to have a single MutableRecordAndPosition object and
* return the same instance (with updated record and position) on every call.
*/
@Nullable
RecordAndPosition<T> next();
/**
* Releases the batch that this iterator iterated over. This is not supposed to close the
* reader and its resources, but is simply a signal that this iterator is no longer used.
* This method can be used as a hook to recycle/reuse heavyweight object structures.
*/
void releaseBatch();
}
Usage Examples:
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
// Example usage with a hypothetical Parquet bulk format
BulkFormat<RowData, FileSourceSplit> parquetFormat = ParquetBulkFormat.builder()
.setSchema(schema)
.setProjection(projection)
.build();
FileSource<RowData> parquetSource = FileSource
.forBulkFileFormat(parquetFormat, new Path("/data/parquet"))
.build();
// Use in DataStream API
DataStream<RowData> rows = env.fromSource(parquetSource, WatermarkStrategy.noWatermarks(), "parquet-source");
Built-in implementations for common record iteration patterns.
/**
* Record iterator wrapping an array of records
*/
public class ArrayResultIterator<T> implements BulkFormat.RecordIterator<T> {
/**
* Creates iterator for array of records
* @param records Array of records to iterate over
*/
public ArrayResultIterator(T[] records);
public RecordAndPosition<T> next();
public void releaseBatch();
}
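The iteration contract these classes implement — a null-terminated `next()`, with `releaseBatch()` as a recycling hook as suggested in the `readBatch()` javadoc above — can be sketched without Flink on the classpath. The interface and pool below are simplified stand-ins for illustration, not the actual Flink types.

```java
import java.util.ArrayDeque;

/** Simplified stand-in for BulkFormat.RecordIterator: next() returns null at the end. */
interface SimpleRecordIterator<T> {
    T next(); // null signals the end of the batch

    void releaseBatch();
}

/** Array-backed iterator that hands its batch back to a pool when released. */
public class PooledArrayIterator<T> implements SimpleRecordIterator<T> {
    private final T[] records;
    private final Runnable recycler; // hook that returns the batch to a pool
    private int pos;

    public PooledArrayIterator(T[] records, Runnable recycler) {
        this.records = records;
        this.recycler = recycler;
    }

    @Override
    public T next() {
        return pos < records.length ? records[pos++] : null;
    }

    @Override
    public void releaseBatch() {
        recycler.run(); // recycle the heavyweight batch object instead of discarding it
    }

    public static void main(String[] args) {
        ArrayDeque<String[]> pool = new ArrayDeque<>();
        String[] batch = {"a", "b", "c"};
        PooledArrayIterator<String> it =
                new PooledArrayIterator<>(batch, () -> pool.add(batch));
        String rec;
        while ((rec = it.next()) != null) { // null-terminated iteration, no hasNext()
            System.out.println(rec);
        }
        it.releaseBatch(); // the batch array goes back into the pool for reuse
    }
}
```

A reader can then draw batch arrays from the pool in `readBatch()` and rely on the source calling `releaseBatch()` once downstream processing of the batch is done.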
/**
* Record iterator wrapping another iterator
*/
public class IteratorResultIterator<T> implements BulkFormat.RecordIterator<T> {
/**
* Creates iterator wrapping another iterator
* @param iterator Iterator to wrap
*/
public IteratorResultIterator(Iterator<T> iterator);
public RecordAndPosition<T> next();
public void releaseBatch();
}
/**
* Record iterator for single records
*/
public class SingletonResultIterator<T> implements BulkFormat.RecordIterator<T> {
/**
* Creates iterator for single record
* @param record Single record to return
*/
public SingletonResultIterator(T record);
public RecordAndPosition<T> next();
public void releaseBatch();
}
Example of implementing a custom bulk format for efficient batch processing.
/**
* Example custom bulk format for reading JSON records in batches
*/
public class JsonBulkFormat implements BulkFormat<JsonNode, FileSourceSplit> {
private final int batchSize;
private final ObjectMapper mapper;
public JsonBulkFormat(int batchSize) {
this.batchSize = batchSize;
this.mapper = new ObjectMapper();
}
@Override
public Reader<JsonNode> createReader(Configuration config, FileSourceSplit split)
throws IOException {
FSDataInputStream stream = FileSystem.get(split.path().toUri())
.open(split.path(), 4096);
stream.seek(split.offset());
return new JsonBulkReader(stream, split.length(), batchSize, mapper);
}
@Override
public Reader<JsonNode> restoreReader(Configuration config, FileSourceSplit split)
throws IOException {
// For simplicity, restart from the beginning of the split. A production
// implementation would instead seek to the checkpointed position and skip
// the already-emitted records.
return createReader(config, split);
}
@Override
public boolean isSplittable() {
// This simple reader does not handle records that span split boundaries,
// so each file is read as a single split.
return false;
}
@Override
public TypeInformation<JsonNode> getProducedType() {
return TypeInformation.of(JsonNode.class);
}
private static class JsonBulkReader implements BulkFormat.Reader<JsonNode> {
private final BufferedReader reader;
private final long splitLength;
private final int batchSize;
private final ObjectMapper mapper;
private long bytesRead = 0;
public JsonBulkReader(FSDataInputStream stream, long splitLength,
int batchSize, ObjectMapper mapper) {
this.reader = new BufferedReader(new InputStreamReader(stream));
this.splitLength = splitLength;
this.batchSize = batchSize;
this.mapper = mapper;
}
@Override
public BulkFormat.RecordIterator<JsonNode> readBatch() throws IOException {
if (bytesRead >= splitLength) {
return null;
}
List<JsonNode> batch = new ArrayList<>(batchSize);
String line;
int count = 0;
while (count < batchSize && (line = reader.readLine()) != null) {
if (bytesRead >= splitLength) break;
bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the newline
JsonNode node = mapper.readTree(line);
batch.add(node);
count++;
}
return batch.isEmpty() ? null : new ArrayResultIterator<>(batch.toArray(new JsonNode[0]));
}
@Override
public void close() throws IOException {
reader.close();
}
}
}
Bulk formats can be integrated with Flink's Table API for structured data processing.
/**
* Adapter for using bulk formats with file info extraction
*/
public class FileInfoExtractorBulkFormat<T> implements BulkFormat<RowData, FileSourceSplit> {
/**
* Creates bulk format that extracts file metadata along with records
* @param wrappedFormat The underlying bulk format
* @param metadataColumns File metadata columns to extract
*/
public FileInfoExtractorBulkFormat(
BulkFormat<T, FileSourceSplit> wrappedFormat,
String[] metadataColumns);
@Override
public Reader<RowData> createReader(Configuration config, FileSourceSplit split)
throws IOException;
@Override
public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
throws IOException;
}
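The enrichment that FileInfoExtractorBulkFormat performs — joining each record with metadata of the file it came from — can be illustrated with a simplified, Flink-free sketch; plain string rows stand in for RowData here, and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch: append file metadata columns to every data row. */
public class FileMetadataJoin {

    /**
     * Returns a copy of each row extended with the file path and file size,
     * mimicking "file.path" and "file.size" metadata columns.
     */
    public static List<String[]> enrich(List<String[]> rows, String filePath, long fileSize) {
        List<String[]> out = new ArrayList<>(rows.size());
        for (String[] row : rows) {
            String[] enriched = new String[row.length + 2];
            System.arraycopy(row, 0, enriched, 0, row.length);
            enriched[row.length] = filePath;                    // file.path column
            enriched[row.length + 1] = Long.toString(fileSize); // file.size column
            out.add(enriched);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"alice", "42"});
        for (String[] r : enrich(rows, "/data/f.json", 1024L)) {
            System.out.println(String.join(",", r));
        }
    }
}
```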
/**
* Bulk format with column projection support
*/
public class ProjectingBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
/**
* Creates bulk format with column projection
* @param wrappedFormat The underlying bulk format
* @param projectedFields Fields to include in output
*/
public ProjectingBulkFormat(
BulkFormat<T, FileSourceSplit> wrappedFormat,
int[] projectedFields);
}
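The per-record projection that a ProjectingBulkFormat-style wrapper applies can be sketched in isolation; the array-based row below is a stand-in for the wrapped format's output type, not Flink's RowData.

```java
import java.util.Arrays;

/** Simplified sketch of per-record column projection, as applied by a wrapping format. */
public class RowProjector {
    private final int[] projectedFields; // indices of the columns to keep

    public RowProjector(int[] projectedFields) {
        this.projectedFields = projectedFields;
    }

    /** Copies only the projected columns of the row, in the configured order. */
    public Object[] project(Object[] row) {
        Object[] out = new Object[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            out[i] = row[projectedFields[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        RowProjector projector = new RowProjector(new int[] {0, 2, 4});
        Object[] projected =
                projector.project(new Object[] {"id", "skip", "name", "skip", "age"});
        System.out.println(Arrays.toString(projected)); // keeps columns 0, 2, and 4
    }
}
```

For columnar formats such as Parquet, the real benefit comes from pushing the projection into the file reader so the skipped columns are never read at all; the wrapper pattern above is the fallback for formats that cannot do that.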
/**
* Bulk format with record limit support
*/
public class LimitableBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
/**
* Creates bulk format with record limit
* @param wrappedFormat The underlying bulk format
* @param limit Maximum number of records to read
*/
public LimitableBulkFormat(BulkFormat<T, FileSourceSplit> wrappedFormat, long limit);
}
Advanced Usage Examples:
// Bulk format with projection for columnar formats
int[] projectedColumns = {0, 2, 4}; // Only read columns 0, 2, and 4
BulkFormat<RowData, FileSourceSplit> projectedFormat = new ProjectingBulkFormat<>(
originalFormat, projectedColumns);
// Bulk format with file metadata extraction
String[] metadataColumns = {"file.path", "file.size", "file.modification-time"};
BulkFormat<RowData, FileSourceSplit> metadataFormat = new FileInfoExtractorBulkFormat<>(
originalFormat, metadataColumns);
// Limited bulk format for sampling
BulkFormat<RowData, FileSourceSplit> limitedFormat = new LimitableBulkFormat<>(
originalFormat, 1000); // Only read first 1000 records
FileSource<RowData> advancedSource = FileSource
.forBulkFileFormat(limitedFormat, new Path("/data/samples"))
.build();
Bulk formats handle various error conditions during batch reading:
try (BulkFormat.Reader<JsonNode> reader = format.createReader(config, split)) {
BulkFormat.RecordIterator<JsonNode> batch;
while ((batch = reader.readBatch()) != null) {
try {
RecordAndPosition<JsonNode> recordAndPos;
while ((recordAndPos = batch.next()) != null) {
JsonNode record = recordAndPos.getRecord();
// Process record
}
} finally {
batch.releaseBatch(); // Always release batch resources
}
}
} catch (IOException e) {
// Handle read errors
} catch (OutOfMemoryError e) {
// Handle memory issues - consider reducing the batch size
}
Always call releaseBatch() to prevent memory leaks.
Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-connector-files