CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-files

Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.

Pending
Overview
Eval results
Files

bulk-formats.mddocs/

Bulk Formats

Bulk formats provide batch-oriented reading interfaces optimized for columnar formats like ORC, Parquet, and other high-performance file formats.

Capabilities

BulkFormat Interface

Core interface for implementing batch-oriented file reading with optimized performance.

/**
 * The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats
 * are formats like ORC or Parquet.
 * 
 * The outer 'BulkFormat' class acts mainly as a configuration holder and factory for the
 * reader. The actual reading is done by the Reader, which is created in the
 * createReader method. If a bulk reader is created based on a checkpoint during checkpointed 
 * streaming execution, then the reader is re-created in the restoreReader method.
 */
@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
        extends Serializable, ResultTypeQueryable<T> {
    
    /**
     * Creates a new reader that reads from the split's path starting
     * at the split's offset and reads length bytes after the offset.
     */
    BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;
    
    /**
     * Creates a new reader that reads from split.path() starting at offset and
     * reads until length bytes after the offset. A number of recordsToSkip records
     * should be read and discarded after the offset. This is typically part of restoring a reader
     * to a checkpointed position.
     */
    BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;
    
    /**
     * Checks whether this format is splittable. Splittable formats allow Flink to create multiple
     * splits per file, so that Flink can read multiple regions of the file concurrently.
     */
    boolean isSplittable();
    
    /**
     * Gets the type produced by this format. This type will be the type produced by the file source
     * as a whole.
     */
    @Override
    TypeInformation<T> getProducedType();
}

BulkFormat.Reader Interface

Nested interface for reading batches of records with efficient iteration.

/**
 * The actual reader that reads the batches of records.
 */
interface Reader<T> extends Closeable {

    /**
     * Reads one batch. The method should return null when reaching the end of the input. The
     * returned batch will be handed over to the processing threads as one.
     * 
     * The returned iterator object and any contained objects may be held onto by the file
     * source for some time, so it should not be immediately reused by the reader.
     * 
     * To implement reuse and to save object allocation, consider using a Pool and recycle objects 
     * into the Pool in the the RecordIterator.releaseBatch() method.
     */
    @Nullable
    RecordIterator<T> readBatch() throws IOException;

    /**
     * Closes the reader and should release all resources.
     */
    @Override
    void close() throws IOException;
}

BulkFormat.RecordIterator Interface

Iterator interface for efficiently processing batches of records.

/**
 * An iterator over records with their position in the file. The iterator is closeable to
 * support clean resource release and recycling.
 * 
 * @param <T> The type of the record.
 */
interface RecordIterator<T> {

    /**
     * Gets the next record from the file, together with its position.
     * 
     * The position information returned with the record point to the record AFTER the
     * returned record, because it defines the point where the reading should resume once the
     * current record is emitted. The position information is put in the source's state when the
     * record is emitted.
     * 
     * Objects returned by this method may be reused by the iterator. By the time that this
     * method is called again, no object returned from the previous call will be referenced any
     * more. That makes it possible to have a single MutableRecordAndPosition object and
     * return the same instance (with updated record and position) on every call.
     */
    @Nullable
    RecordAndPosition<T> next();

    /**
     * Releases the batch that this iterator iterated over. This is not supposed to close the
     * reader and its resources, but is simply a signal that this iterator is no used any more.
     * This method can be used as a hook to recycle/reuse heavyweight object structures.
     */
    void releaseBatch();
}

Usage Examples:

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;

// Example usage with a hypothetical Parquet bulk format
BulkFormat<RowData, FileSourceSplit> parquetFormat = ParquetBulkFormat.builder()
    .setSchema(schema)
    .setProjection(projection)
    .build();

FileSource<RowData> parquetSource = FileSource
    .forBulkFileFormat(parquetFormat, new Path("/data/parquet"))
    .build();

// Use in DataStream API  
DataStream<RowData> rows = env.fromSource(parquetSource, WatermarkStrategy.noWatermarks(), "parquet-source");

Utility Record Iterators

Built-in implementations for common record iteration patterns.

/**
 * Record iterator wrapping an array of records
 */
public class ArrayResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates iterator for array of records
     * @param records Array of records to iterate over
     */
    public ArrayResultIterator(T[] records);
    
    public T next();
    public boolean hasNext();
    public void releaseBatch();
}

/**
 * Record iterator wrapping another iterator
 */  
public class IteratorResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates iterator wrapping another iterator
     * @param iterator Iterator to wrap
     */
    public IteratorResultIterator(Iterator<T> iterator);
    
    public T next();
    public boolean hasNext(); 
    public void releaseBatch();
}

/**
 * Record iterator for single records
 */
public class SingletonResultIterator<T> implements BulkFormat.RecordIterator<T> {
    /**
     * Creates iterator for single record
     * @param record Single record to return
     */
    public SingletonResultIterator(T record);
    
    public T next();
    public boolean hasNext();
    public void releaseBatch();
}

Custom BulkFormat Implementation

Example of implementing a custom bulk format for efficient batch processing.

/**
 * Example custom bulk format for reading JSON records in batches
 */
public class JsonBulkFormat implements BulkFormat<JsonNode, FileSourceSplit> {
    private final int batchSize;
    private final ObjectMapper mapper;
    
    public JsonBulkFormat(int batchSize) {
        this.batchSize = batchSize;
        this.mapper = new ObjectMapper();
    }
    
    @Override
    public Reader<JsonNode> createReader(Configuration config, FileSourceSplit split) 
            throws IOException {
        FSDataInputStream stream = FileSystem.get(split.path().toUri())
            .open(split.path(), 4096);
        stream.seek(split.offset());
        return new JsonBulkReader(stream, split.length(), batchSize, mapper);
    }
    
    @Override
    public Reader<JsonNode> restoreReader(Configuration config, FileSourceSplit split) 
            throws IOException {
        // For simplicity, restart from beginning of split
        return createReader(config, split);
    }
    
    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeInformation.of(JsonNode.class);
    }
    
    private static class JsonBulkReader implements BulkFormat.Reader<JsonNode> {
        private final BufferedReader reader;
        private final long splitLength;
        private final int batchSize;
        private final ObjectMapper mapper;
        private long bytesRead = 0;
        
        public JsonBulkReader(FSDataInputStream stream, long splitLength, 
                             int batchSize, ObjectMapper mapper) {
            this.reader = new BufferedReader(new InputStreamReader(stream));
            this.splitLength = splitLength;
            this.batchSize = batchSize;
            this.mapper = mapper;
        }
        
        @Override
        public BulkFormat.RecordIterator<JsonNode> readBatch() throws IOException {
            if (bytesRead >= splitLength) {
                return null;
            }
            
            List<JsonNode> batch = new ArrayList<>(batchSize);
            String line;
            int count = 0;
            
            while (count < batchSize && (line = reader.readLine()) != null) {
                if (bytesRead >= splitLength) break;
                
                bytesRead += line.getBytes().length + 1;
                JsonNode node = mapper.readTree(line);
                batch.add(node);
                count++;
            }
            
            return batch.isEmpty() ? null : new ArrayResultIterator<>(batch.toArray(new JsonNode[0]));
        }
    }
}

Integration with Table API

Bulk formats can be integrated with Flink's Table API for structured data processing.

/**
 * Adapter for using bulk formats with file info extraction
 */
public class FileInfoExtractorBulkFormat<T> implements BulkFormat<RowData, FileSourceSplit> {
    /**
     * Creates bulk format that extracts file metadata along with records
     * @param wrappedFormat The underlying bulk format
     * @param metadataColumns File metadata columns to extract
     */
    public FileInfoExtractorBulkFormat(
        BulkFormat<T, FileSourceSplit> wrappedFormat,
        String[] metadataColumns);
    
    @Override
    public Reader<RowData> createReader(Configuration config, FileSourceSplit split) 
        throws IOException;
    
    @Override 
    public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
        throws IOException;
}

/**
 * Bulk format with column projection support
 */
public class ProjectingBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
    /**
     * Creates bulk format with column projection
     * @param wrappedFormat The underlying bulk format
     * @param projectedFields Fields to include in output
     */
    public ProjectingBulkFormat(
        BulkFormat<T, FileSourceSplit> wrappedFormat,
        int[] projectedFields);
}

/**
 * Bulk format with record limit support
 */
public class LimitableBulkFormat<T> implements BulkFormat<T, FileSourceSplit> {
    /**
     * Creates bulk format with record limit
     * @param wrappedFormat The underlying bulk format
     * @param limit Maximum number of records to read
     */
    public LimitableBulkFormat(BulkFormat<T, FileSourceSplit> wrappedFormat, long limit);
}

Advanced Usage Examples:

// Bulk format with projection for columnar formats
int[] projectedColumns = {0, 2, 4}; // Only read columns 0, 2, and 4
BulkFormat<RowData, FileSourceSplit> projectedFormat = new ProjectingBulkFormat<>(
    originalFormat, projectedColumns);

// Bulk format with file metadata extraction
String[] metadataColumns = {"file.path", "file.size", "file.modification-time"};
BulkFormat<RowData, FileSourceSplit> metadataFormat = new FileInfoExtractorBulkFormat<>(
    originalFormat, metadataColumns);

// Limited bulk format for sampling
BulkFormat<RowData, FileSourceSplit> limitedFormat = new LimitableBulkFormat<>(
    originalFormat, 1000); // Only read first 1000 records

FileSource<RowData> advancedSource = FileSource
    .forBulkFileFormat(limitedFormat, new Path("/data/samples"))
    .build();

Error Handling

Bulk formats handle various error conditions during batch reading:

  • IOException: File system read errors, corrupted file structures
  • RuntimeException: Format-specific parsing errors, schema mismatches
  • OutOfMemoryError: Batch sizes too large for available memory
try {
    BulkFormat.Reader<JsonNode> reader = format.createReader(config, split);
    BulkFormat.RecordIterator<JsonNode> batch;
    
    while ((batch = reader.readBatch()) != null) {
        try {
            while (batch.hasNext()) {
                JsonNode record = batch.next();
                // Process record
            }
        } finally {
            batch.releaseBatch(); // Always release batch resources
        }
    }
} catch (IOException e) {
    // Handle read errors
} catch (OutOfMemoryError e) {
    // Handle memory issues - consider reducing batch size
}

Performance Considerations

  • Choose appropriate batch sizes to balance memory usage and I/O efficiency
  • Always call releaseBatch() to prevent memory leaks
  • Use column projection to reduce data transfer and processing overhead
  • Consider file format characteristics (row-oriented vs. columnar) when choosing batch sizes
  • Bulk formats are typically more efficient than stream formats for high-throughput scenarios
  • Implement proper resource cleanup in custom bulk format implementations
  • Monitor memory usage and adjust batch sizes based on record size and available heap space

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-files

docs

bulk-formats.md

file-compaction.md

file-enumeration.md

file-sinks.md

file-sources.md

index.md

split-assignment.md

stream-formats.md

tile.json