Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-files@2.1.0

Apache Flink File Connector provides a unified data source and sink for reading and writing files in both batch and streaming modes. It supports various file formats through the StreamFormat and BulkFormat interfaces, with features such as continuous monitoring, file splitting, compression support, and distributed processing across different file systems and object stores.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>2.1.0</version>
</dependency>

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.time.Duration;
// Create a file source for reading text files
FileSource<String> source = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input/files"))
.monitorContinuously(Duration.ofSeconds(10))
.build();
// Use in the DataStream API (env is an existing StreamExecutionEnvironment)
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
// Create a file sink for writing text files
FileSink<String> sink = FileSink
.forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withMaxPartSize(MemorySize.ofMebiBytes(128))
.withRolloverInterval(Duration.ofMinutes(15))
.build())
.build();
// Use in DataStream API
stream.sinkTo(sink);

The Flink File Connector is built around several key components:
- FileSource provides unified reading with support for both streaming and batch modes
- FileSink handles writing with exactly-once semantics and file rolling
- StreamFormat for record-by-record reading, BulkFormat for batch-oriented reading
- FileEnumerator implementations discover files and create splits for parallel processing
- FileSplitAssigner manages distribution of file splits to reader nodes with locality awareness

Unified file reading with support for various formats, continuous monitoring, and distributed processing.
public static <T> FileSourceBuilder<T> forRecordStreamFormat(
final StreamFormat<T> streamFormat, final Path... paths);
public static <T> FileSourceBuilder<T> forBulkFileFormat(
final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths);
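
For a bounded batch read, the builder can process a static file set instead of monitoring continuously. A minimal sketch follows; processStaticFileSet() is assumed to be the builder's bounded mode, which is the default when no monitoring interval is configured:

// Sketch: a bounded source that reads the files present at submission time and
// then finishes. processStaticFileSet() is assumed to be the default bounded mode.
FileSource<String> boundedSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input/files"))
        .processStaticFileSet()
        .build();
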
Unified file writing with exactly-once semantics, bucketing, rolling policies, and optional compaction.

public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
final Path basePath, final Encoder<IN> encoder);
public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
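
A bulk-encoded sink follows the same builder pattern. In the sketch below, MyRecord and myBulkWriterFactory are placeholders for an application type and a BulkWriter.Factory obtained from a format module (for example Parquet or ORC); bulk sinks roll their part files on every checkpoint:

// Sketch: a bulk-encoded sink with time-based bucketing. "MyRecord" and
// "myBulkWriterFactory" are placeholders, not part of the connector API.
FileSink<MyRecord> bulkSink = FileSink
        .forBulkFormat(new Path("/path/to/output"), myBulkWriterFactory)
        .withBucketAssigner(new DateTimeBucketAssigner<>())
        .build();
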
Interface for record-by-record file reading with automatic compression support.

@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {
Reader<T> createReader(
Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
throws IOException;
Reader<T> restoreReader(
Configuration config,
FSDataInputStream stream,
long restoredOffset,
long fileLen,
long splitEnd)
throws IOException;
boolean isSplittable();
@Override
TypeInformation<T> getProducedType();
@PublicEvolving
interface Reader<T> extends Closeable {
@Nullable
T read() throws IOException;
@Override
void close() throws IOException;
@Nullable
default CheckpointedPosition getCheckpointedPosition() {
return null;
}
}
}
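
As an illustration of this contract, a minimal, hypothetical line-oriented implementation might look as follows. It declares itself non-splittable, reads UTF-8 lines only, and relies on the framework's default record counting for checkpoints (this is a sketch, not a class shipped with the connector):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.core.fs.FSDataInputStream;

// Hypothetical example: a line-oriented StreamFormat that emits each line as a String.
public class SimpleLineFormat implements StreamFormat<String> {

    @Override
    public StreamFormat.Reader<String> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException {
        return new LineReader(new BufferedReader(
                new InputStreamReader(stream, StandardCharsets.UTF_8)));
    }

    @Override
    public StreamFormat.Reader<String> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException {
        // Seek to the checkpointed byte offset if one was recorded; otherwise start
        // from the beginning and let the framework skip already-emitted records.
        if (restoredOffset != CheckpointedPosition.NO_OFFSET) {
            stream.seek(restoredOffset);
        }
        return createReader(config, stream, fileLen, splitEnd);
    }

    @Override
    public boolean isSplittable() {
        return false; // one reader consumes the whole file
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }

    private static final class LineReader implements StreamFormat.Reader<String> {
        private final BufferedReader lines;

        LineReader(BufferedReader lines) {
            this.lines = lines;
        }

        @Override
        public String read() throws IOException {
            return lines.readLine(); // null signals the end of the input
        }

        @Override
        public void close() throws IOException {
            lines.close();
        }
    }
}
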
Interface for batch-oriented reading optimized for formats like ORC and Parquet.

@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
extends Serializable, ResultTypeQueryable<T> {
BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;
BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;
boolean isSplittable();
@Override
TypeInformation<T> getProducedType();
interface Reader<T> extends Closeable {
@Nullable
RecordIterator<T> readBatch() throws IOException;
@Override
void close() throws IOException;
}
interface RecordIterator<T> {
@Nullable
RecordAndPosition<T> next();
void releaseBatch();
}
}
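
The batch-oriented contract becomes concrete in the consumption loop below, a sketch in which MyRecord, format, config, split, and process are placeholders for an application type, a concrete BulkFormat, a Configuration, a FileSourceSplit, and application logic:

// Sketch: draining a BulkFormat reader batch by batch.
try (BulkFormat.Reader<MyRecord> reader = format.createReader(config, split)) {
    BulkFormat.RecordIterator<MyRecord> batch;
    while ((batch = reader.readBatch()) != null) {       // null means end of input
        RecordAndPosition<MyRecord> record;
        while ((record = batch.next()) != null) {
            process(record.getRecord());                  // record plus its position in the split
        }
        batch.releaseBatch();                             // hand the batch's resources back
    }
}
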
File discovery mechanisms for finding and splitting files across distributed storage systems.

public interface FileEnumerator {
Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException;
interface Provider extends Serializable {
FileEnumerator create();
}
}
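
Custom enumeration strategies are plugged into the source builder. The sketch below assumes the builder exposes a setFileEnumerator hook and uses the provided NonSplittingRecursiveEnumerator, which creates exactly one split per file:

// Sketch: use a file enumerator that never splits individual files
// (setFileEnumerator is assumed to be available on the builder).
FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input"))
        .setFileEnumerator(NonSplittingRecursiveEnumerator::new)
        .build();
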
Split assignment strategies for distributing file processing across nodes with locality awareness.

public interface FileSplitAssigner {
Optional<FileSourceSplit> getNext(String hostname);
void addSplits(Collection<FileSourceSplit> splits);
Collection<FileSourceSplit> remainingSplits();
interface Provider extends Serializable {
FileSplitAssigner create(Collection<FileSourceSplit> splits);
}
}
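
The interface is small enough that a custom strategy is easy to sketch. The hypothetical assigner below hands out splits in FIFO order and ignores locality (the built-in LocalityAwareSplitAssigner is the default):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Optional;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;

// Hypothetical example: a FIFO assigner that ignores the requesting host.
public class FifoSplitAssigner implements FileSplitAssigner {

    private final ArrayDeque<FileSourceSplit> pending;

    public FifoSplitAssigner(Collection<FileSourceSplit> initialSplits) {
        this.pending = new ArrayDeque<>(initialSplits);
    }

    @Override
    public Optional<FileSourceSplit> getNext(String hostname) {
        return Optional.ofNullable(pending.pollFirst()); // empty when no splits remain
    }

    @Override
    public void addSplits(Collection<FileSourceSplit> splits) {
        pending.addAll(splits); // splits from failed readers are added back here
    }

    @Override
    public Collection<FileSourceSplit> remainingSplits() {
        return new ArrayList<>(pending);
    }
}

Because the constructor matches Provider.create(Collection<FileSourceSplit>), the assigner can be registered as FifoSplitAssigner::new wherever a FileSplitAssigner.Provider is expected.
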
File compaction system for merging small files to improve performance and reduce metadata overhead.

public interface FileCompactor {
void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
public interface FileCompactStrategy {
long getSizeThreshold();
int getNumCheckpointsBeforeCompaction();
int getNumCompactThreads();
}
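
How these pieces are wired into a sink can be sketched as follows; enableCompact, FileCompactStrategy.Builder, and ConcatFileCompactor are assumed from the connector's compaction support and may vary slightly between versions:

// Sketch: a row-format sink that merges small part files. The compaction entry
// points used here (enableCompact, FileCompactStrategy.Builder, ConcatFileCompactor)
// are assumptions about the compaction module.
FileSink<String> compactingSink = FileSink
        .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
        .enableCompact(
                FileCompactStrategy.Builder.newBuilder()
                        .setSizeThreshold(16 * 1024 * 1024)   // size threshold for triggering compaction (bytes)
                        .enableCompactionOnCheckpoint(5)      // also trigger every 5 checkpoints
                        .build(),
                new ConcatFileCompactor())                    // concatenates the input files as-is
        .build();
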
public class FileSourceSplit {
public FileSourceSplit(String id, Path path, long offset, long length,
long modificationTime, long fileSize, String... hostnames);
public Path path();
public long offset();
public long length();
public long fileModificationTime();
public String[] hostnames();
public Optional<CheckpointedPosition> getReaderPosition();
public FileSourceSplit updateWithCheckpointedPosition(CheckpointedPosition position);
}
public class CheckpointedPosition {
public static final long NO_OFFSET = -1L;
public CheckpointedPosition(long offset, long recordsAfterOffset);
public long getOffset();
public long getRecordsAfterOffset();
}
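
A short sketch of how the two classes interact during checkpointing; split is a placeholder for an existing FileSourceSplit:

// Sketch: record a reader position against a split. NO_OFFSET together with a
// record count means "re-read the split from the start and skip the first 42
// records on recovery".
CheckpointedPosition position =
        new CheckpointedPosition(CheckpointedPosition.NO_OFFSET, 42L);
FileSourceSplit updated = split.updateWithCheckpointedPosition(position);
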
public class ContinuousEnumerationSettings {
public ContinuousEnumerationSettings(Duration discoveryInterval);
public Duration getDiscoveryInterval();
}