tessl/maven-org-apache-flink--flink-connector-files

Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.


docs/stream-formats.md

Stream Formats

Stream formats provide record-by-record reading interfaces with automatic compression support for various file formats.

Capabilities

StreamFormat Interface

Core interface for implementing record-wise file reading with compression support.

/**
 * A reader format that reads individual records from a stream.
 * 
 * The outer class StreamFormat acts mainly as a configuration holder and factory for the
 * reader. The actual reading is done by the Reader, which is created based on
 * an input stream in the createReader method and restored (from checkpointed positions) 
 * in the restoreReader method.
 * 
 * Compared to the BulkFormat, the stream format handles a few things out-of-the-box,
 * like deciding how to batch records or dealing with compression.
 */
@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {
    /**
     * Creates a new reader to read in this format. This method is called when a fresh reader is
     * created for a split that was assigned from the enumerator. This method may also be called on
     * recovery from a checkpoint, if the reader never stored an offset in the checkpoint.
     * 
     * If the format is splittable, then the stream is positioned to the beginning of the file split,
     * otherwise it will be at position zero.
     * 
     * The fileLen is the length of the entire file, while splitEnd is the offset
     * of the first byte after the split end boundary (exclusive end boundary). For non-splittable
     * formats, both values are identical.
     */
    Reader<T> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException;
    
    /**
     * Restores a reader from a checkpointed position. This method is called when the reader is
     * recovered from a checkpoint and the reader has previously stored an offset into the
     * checkpoint, by returning from the Reader.getCheckpointedPosition() a value with
     * non-negative offset. That value is supplied as the restoredOffset.
     * 
     * If the format is splittable, then the stream is positioned to the beginning of the file split,
     * otherwise it will be at position zero. The stream is NOT positioned to the checkpointed offset,
     * because the format is free to interpret this offset in a different way than the byte offset in the file.
     */
    Reader<T> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException;
    
    /**
     * Checks whether this format is splittable. Splittable formats allow Flink to create multiple
     * splits per file, so that Flink can read multiple regions of the file concurrently.
     */
    boolean isSplittable();
    
    /**
     * Gets the type produced by this format. This type will be the type produced by the file source
     * as a whole.
     */
    @Override
    TypeInformation<T> getProducedType();
    
    /**
     * The config option to define how many bytes to be read by the I/O thread in one fetch
     * operation.
     */
    ConfigOption<MemorySize> FETCH_IO_SIZE =
            ConfigOptions.key("source.file.stream.io-fetch-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(1L))
                    .withDescription(
                            "The approximate number of bytes per fetch that is passed from the I/O thread to the file reader.");
}

StreamFormat.Reader Interface

Nested interface for reading individual records from a stream.

/**
 * The actual reader that reads the records.
 */
@PublicEvolving
public interface Reader<T> extends Closeable {
    
    /**
     * Reads the next record. Returns null when the input has reached its end.
     */
    @Nullable
    T read() throws IOException;
    
    /**
     * Closes the reader to release all resources.
     */
    @Override
    void close() throws IOException;
    
    /**
     * Optionally returns the current position of the reader. This can be implemented by readers
     * that want to speed up recovery from a checkpoint.
     * 
     * The current position of the reader is the position of the next record that will be
     * returned in a call to read().
     */
    @Nullable
    default CheckpointedPosition getCheckpointedPosition() {
        return null;
    }
}

SimpleStreamFormat

Abstract base class for non-splittable stream formats.

/**
 * Simplified stream format for non-splittable files. Subclasses implement only the
 * two-argument createReader(config, stream); the base class handles the split and
 * restore bookkeeping.
 */
@PublicEvolving
public abstract class SimpleStreamFormat<T> implements StreamFormat<T> {
    /**
     * Creates a reader for the entire file stream (simplified interface)
     * @param config Configuration for the reader
     * @param stream Input stream to read from
     * @return Reader instance for reading records
     * @throws IOException If reader creation fails
     */
    public abstract Reader<T> createReader(Configuration config, FSDataInputStream stream) 
        throws IOException;
    
    /**
     * Always returns false for simple formats
     * @return false (simple formats are not splittable)
     */
    public final boolean isSplittable() {
        return false;
    }
}

TextLineInputFormat

Built-in implementation for reading text files line by line.

/**
 * Stream format for reading text files line by line with charset support
 */  
public class TextLineInputFormat extends SimpleStreamFormat<String> {
    /**
     * Creates TextLineInputFormat with UTF-8 encoding
     */
    public TextLineInputFormat();
    
    /**
     * Creates TextLineInputFormat with specified charset
     * @param charsetName Name of charset to use for decoding
     */
    public TextLineInputFormat(String charsetName);
    
    /**
     * Creates a reader for reading text lines
     * @param config Configuration for the reader
     * @param stream Input stream to read from
     * @return Reader that returns String lines
     * @throws IOException If reader creation fails
     */
    public Reader<String> createReader(Configuration config, FSDataInputStream stream) 
        throws IOException;
    
    /**
     * Returns TypeInformation for String output
     * @return TypeInformation describing String type
     */
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

Usage Examples:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Reading text files with UTF-8 encoding
FileSource<String> textSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/logs"))
    .build();

// Reading text files with custom encoding  
FileSource<String> customEncodingSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat("ISO-8859-1"), new Path("/data/legacy"))
    .build();

// Using the source in a Flink job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lines =
    env.fromSource(textSource, WatermarkStrategy.noWatermarks(), "text-source");

Custom StreamFormat Implementation

Example of implementing a custom stream format.

/**
 * Example custom stream format for reading CSV records
 */
public class CsvStreamFormat implements StreamFormat<String[]> {
    private final String delimiter;
    
    public CsvStreamFormat(String delimiter) {
        this.delimiter = delimiter;
    }
    
    @Override
    public Reader<String[]> createReader(
            Configuration config, 
            FSDataInputStream stream, 
            long fileLen, 
            long splitEnd) throws IOException {
        return new CsvReader(stream, splitEnd, delimiter);
    }
    
    @Override
    public Reader<String[]> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd) throws IOException {
        // This format interprets the checkpointed offset as a plain byte offset.
        stream.seek(restoredOffset);
        return new CsvReader(stream, splitEnd, delimiter);
    }
    
    @Override
    public boolean isSplittable() {
        return true; // CSV can be split at line boundaries
    }
    
    @Override
    public TypeInformation<String[]> getProducedType() {
        return Types.OBJECT_ARRAY(Types.STRING);
    }
    
    private static class CsvReader implements StreamFormat.Reader<String[]> {
        private final BufferedReader reader;
        private final long splitEnd;
        private final String delimiter;
        private long bytesRead = 0;
        
        public CsvReader(FSDataInputStream stream, long splitEnd, String delimiter) {
            this.reader = new BufferedReader(
                    new InputStreamReader(stream, StandardCharsets.UTF_8));
            this.splitEnd = splitEnd;
            this.delimiter = delimiter;
        }
        
        @Override
        public String[] read() throws IOException {
            // Simplified accounting: assumes the split starts at offset zero.
            if (bytesRead >= splitEnd) {
                return null;
            }
            
            String line = reader.readLine();
            if (line == null) {
                return null;
            }
            
            bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
            return line.split(delimiter);
        }
        
        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
}
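The split-boundary accounting used by CsvReader above can be illustrated without any Flink dependencies. The sketch below (the class and method names are invented for this example) reads newline-delimited records until the consumed byte count reaches the split end, mirroring the check-before-read rule in CsvReader.read():

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SplitBoundaryDemo {
    /** Reads newline-delimited records, stopping once splitEnd bytes are consumed. */
    static List<String> readUpTo(byte[] data, long splitEnd) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(data), StandardCharsets.UTF_8));
        List<String> records = new ArrayList<>();
        long bytesRead = 0;
        String line;
        // Check the boundary BEFORE reading: a record that starts inside the
        // split is read in full, even if it extends past splitEnd.
        while (bytesRead < splitEnd && (line = reader.readLine()) != null) {
            bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'
            records.add(line);
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a,1\nb,2\nc,3\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(readUpTo(data, 4)); // [a,1]
        System.out.println(readUpTo(data, 8)); // [a,1, b,2]
    }
}
```

The "read any record that starts inside the split" rule is what makes line-oriented formats splittable: each boundary-straddling record is read by exactly one split.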

Compression Support Integration

Stream formats automatically support compression through the compression detection system.

/**
 * Stream formats automatically detect and handle compressed files
 * Supported extensions: .gz, .gzip, .bz2, .xz, .deflate
 */

// Reading compressed text files - compression is handled automatically
FileSource<String> compressedSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), 
        new Path("/data/logs.gz"),
        new Path("/data/archive.bz2"))
    .build();

// Custom format with compression support
FileSource<String[]> compressedCsvSource = FileSource
    .forRecordStreamFormat(new CsvStreamFormat(","), new Path("/data/data.csv.gz"))
    .build();

Error Handling

Stream formats handle various error conditions during reading:

  • IOException: File system read errors, stream corruption
  • UnsupportedEncodingException: Invalid charset specifications (surfaces when the reader is created)
  • EOFException: Unexpected end of file during reading
  • RuntimeException: Format-specific parsing errors
// Charset problems surface when the reader is created, not when the format
// object is constructed. One option is to validate the charset up front:
String charsetName = "ISO-8859-1";
if (!Charset.isSupported(charsetName)) {
    // Handle invalid charset before building the source
}
StreamFormat<String> format = new TextLineInputFormat(charsetName);

// Reader error handling; try-with-resources ensures the reader is closed
try (StreamFormat.Reader<String> reader =
        format.createReader(config, stream, fileLen, splitEnd)) {
    String record;
    while ((record = reader.read()) != null) {
        // Process record
    }
} catch (IOException e) {
    // Handle read errors
}

Performance Considerations

  • Implement isSplittable() correctly - splittable formats can be processed in parallel
  • Use appropriate buffer sizes in custom readers for optimal I/O performance
  • Consider memory usage when reading large records or implementing custom formats
  • Compression detection adds minimal overhead and improves storage efficiency
  • For high-throughput scenarios, consider BulkFormat instead of StreamFormat
  • Implement proper checkpointing support for exactly-once processing guarantees
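As a concrete example of tuning I/O behavior, the FETCH_IO_SIZE option shown earlier can be set through the Flink configuration. A sketch, assuming cluster-level configuration is visible to the source reader (larger fetches trade memory for fewer handovers between the I/O thread and the reader):

```
# flink-conf.yaml
source.file.stream.io-fetch-size: 4mb
```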

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-files
