Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.
—
Unified connector framework for integrating with external systems, supporting both source and sink operations with exactly-once processing guarantees.
New unified source interface for reading data from external systems.
/**
 * Unified source interface combining split enumeration and split reading.
 * An implementation describes how to discover work units (splits) and how
 * readers consume records from them.
 *
 * @param <T> type of the records produced by this source
 * @param <SplitT> type of the splits handed from the enumerator to readers
 * @param <EnumChkT> type of the enumerator's checkpointed state
 */
interface Source<T, SplitT extends SourceSplit, EnumChkT> {
/**
 * Creates a new reader that consumes records from the splits assigned to it.
 *
 * @param readerContext runtime context for the reader
 * @return a new source reader
 */
SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);
/**
 * Creates a new split enumerator for a fresh (non-restored) source.
 *
 * @param enumContext runtime context for the enumerator
 * @return a new split enumerator
 */
SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);
/**
 * Restores a split enumerator from a previously taken checkpoint.
 *
 * @param enumContext runtime context for the enumerator
 * @param checkpoint enumerator state captured by a previous checkpoint
 * @return the restored enumerator
 */
SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint);
/**
 * Returns the serializer used to persist and transport splits.
 *
 * @return the split serializer
 */
SimpleVersionedSerializer<SplitT> getSplitSerializer();
/**
 * Returns the serializer for the enumerator's checkpointed state.
 *
 * @return the enumerator checkpoint serializer
 */
SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
/**
 * Returns whether this source is bounded (finite) or continuous (unbounded).
 *
 * @return the boundedness of this source
 */
Boundedness getBoundedness();
}
/**
 * Reads records from the splits assigned to it and emits them to the
 * downstream output.
 *
 * @param <T> type of the records produced by this reader
 * @param <SplitT> type of the splits this reader consumes
 */
interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable {
/**
 * Starts the reader; called once before any other method.
 */
void start();
/**
 * Polls for the next batch of records, emitting them to the given output.
 *
 * @param output output to which records are emitted
 * @return the input status after this poll (more data, nothing, or end of input)
 * @throws Exception if reading fails
 */
InputStatus pollNext(ReaderOutput<T> output) throws Exception;
/**
 * Snapshots the current state of this reader: the splits that are still
 * assigned and not fully consumed. (Note: the previous Javadoc said
 * "List completed checkpoints", which does not match the signature.)
 *
 * @param checkpointId ID of the checkpoint being taken
 * @return the splits that make up this reader's state
 */
List<SplitT> snapshotState(long checkpointId);
/**
 * Adds new splits for this reader to consume.
 *
 * @param splits splits to add
 */
void addSplits(List<SplitT> splits);
/**
 * Signals that no further splits will be assigned to this reader.
 */
void notifyNoMoreSplits();
@Override
void close() throws Exception;
}
/**
 * Discovers splits and assigns them to readers.
 *
 * @param <SplitT> type of the splits handed out to readers
 * @param <CheckpointT> type of this enumerator's checkpointed state
 */
interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {
/**
 * Starts the enumerator; called once before any other method.
 */
void start();
/**
 * Handles a split request from a reader subtask.
 *
 * @param subtaskId ID of the subtask requesting a split
 * @param requesterHostname hostname of the requester, or {@code null} if unknown
 */
void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
/**
 * Returns splits to the enumerator, e.g. after a reader failure, so they
 * can be reassigned.
 *
 * @param splits splits to add back to the pool
 * @param subtaskId ID of the subtask that returned the splits
 */
void addSplitsBack(List<SplitT> splits, int subtaskId);
/**
 * Registers a newly started reader subtask with this enumerator.
 *
 * @param subtaskId ID of the new reader subtask
 */
void addReader(int subtaskId);
/**
 * Snapshots the enumerator state for the given checkpoint.
 *
 * @param checkpointId ID of the checkpoint being taken
 * @return the enumerator state to persist
 * @throws Exception if snapshotting fails
 */
CheckpointT snapshotState(long checkpointId) throws Exception;
@Override
void close() throws IOException;
}
New unified sink interface for writing data to external systems.
/**
 * Unified sink interface for writing data to an external system, optionally
 * with a two-phase commit protocol (writer produces committables, committer
 * finalizes them) for exactly-once guarantees.
 *
 * @param <InputT> type of the elements accepted by this sink
 */
interface Sink<InputT> {
/**
 * Creates a writer for a fresh (non-restored) sink instance.
 *
 * @param context runtime context for writer initialization
 * @return a new sink writer
 */
SinkWriter<InputT> createWriter(WriterInitContext context);
/**
 * Restores a writer from previously checkpointed state. The default
 * implementation ignores the recovered state and creates a fresh writer;
 * stateful sinks should override this.
 *
 * @param context runtime context for writer initialization
 * @param recoveredState writer state captured by a previous checkpoint
 * @return the restored sink writer
 * @throws IOException if the writer cannot be restored
 */
default SinkWriter<InputT> restoreWriter(WriterInitContext context, Collection<WriterState> recoveredState) throws IOException {
return createWriter(context);
}
/**
 * Creates the committer used in the commit phase. Empty by default for
 * sinks that need no commit step.
 *
 * @param <CommittableT> type of the committable data
 * @return the committer, or {@link Optional#empty()} if none is needed
 */
default <CommittableT> Optional<Committer<CommittableT>> createCommitter() {
return Optional.empty();
}
/**
 * Creates the global committer that aggregates committables across all
 * subtasks. Empty by default.
 *
 * @param <CommittableT> type of the per-subtask committable data
 * @param <GlobalCommittableT> type of the aggregated committable data
 * @return the global committer, or {@link Optional#empty()} if none is needed
 */
default <CommittableT, GlobalCommittableT> Optional<GlobalCommitter<CommittableT, GlobalCommittableT>> createGlobalCommitter() {
return Optional.empty();
}
/**
 * Returns the serializer for the writer state. Empty by default for
 * stateless writers.
 *
 * @return the writer state serializer, or {@link Optional#empty()} if stateless
 */
default Optional<SimpleVersionedSerializer<WriterState>> getWriterStateSerializer() {
return Optional.empty();
}
}
/**
 * Receives stream elements, writes or buffers them for the external system,
 * and hands committable data to the commit phase.
 *
 * @param <InputT> type of the elements accepted by this writer
 */
interface SinkWriter<InputT> extends AutoCloseable {
/**
 * Writes one element to the sink.
 *
 * @param element element to write
 * @param context per-record write context (timestamps)
 * @throws IOException if the write fails
 * @throws InterruptedException if the writer is interrupted while blocking
 */
void write(InputT element, Context context) throws IOException, InterruptedException;
/**
 * Flushes all buffered elements and returns the data that is ready to be
 * committed.
 *
 * <p>Fix: the previous declaration returned {@code List<CommittableT>}, but
 * no type parameter named {@code CommittableT} exists on this interface
 * (compile error). The declared {@code Committable} marker type is used
 * instead.
 *
 * @param endOfInput whether this flush happens at the end of the input
 * @return the committables produced by this flush
 * @throws IOException if flushing fails
 * @throws InterruptedException if the writer is interrupted while blocking
 */
List<Committable> flush(boolean endOfInput) throws IOException, InterruptedException;
/**
 * Snapshots the writer state for the given checkpoint. Stateless writers
 * keep the default, which returns an empty list.
 *
 * @param checkpointId ID of the checkpoint being taken
 * @return the writer state to persist
 * @throws IOException if snapshotting fails
 */
default List<WriterState> snapshotState(long checkpointId) throws IOException {
return Collections.emptyList();
}
@Override
void close() throws Exception;
/**
 * Per-record context passed to {@link #write}.
 */
interface Context {
/**
 * Returns the current event time.
 *
 * @return current event time
 */
long currentEventTime();
/**
 * Returns the timestamp of the current element, or {@code null} if the
 * element carries no timestamp.
 *
 * @return element timestamp, possibly {@code null}
 */
Long timestamp();
}
}
Base classes and interfaces for building connectors.
/**
 * Base interface for a unit of work handed from the enumerator to a reader.
 */
interface SourceSplit {
/**
 * Returns the unique identifier of this split, used for checkpointing
 * and assignment tracking.
 *
 * @return the split identifier
 */
String splitId();
}
/**
* Source split with file information
*/
class FileSourceSplit implements SourceSplit {
/**
* Create file source split
* @param id Split ID
* @param path File path
* @param offset Start offset
* @param length Split length
*/
public FileSourceSplit(String id, Path path, long offset, long length);
/**
* Get file path
* @return File path
*/
public Path path();
/**
* Get start offset
* @return Start offset
*/
public long offset();
/**
* Get split length
* @return Split length
*/
public long length();
}
/**
 * Status returned by {@code SourceReader.pollNext}, telling the runtime
 * whether more data can be polled immediately.
 */
enum InputStatus {
/** More records are available and can be polled immediately. */
MORE_AVAILABLE,
/** No records are available right now; more may arrive later. */
NOTHING_AVAILABLE,
/** The input has reached its end; no further records will arrive. */
END_OF_INPUT
}
/**
 * Whether a source produces a finite or an unbounded stream of records.
 */
enum Boundedness {
/** The source is finite: it ends after producing all its records. */
BOUNDED,
/** The source is unbounded: it produces records continuously. */
CONTINUOUS_UNBOUNDED
}
/**
 * Output to which a {@code SourceReader} emits records, watermarks, and
 * idleness signals.
 *
 * @param <T> type of the emitted elements
 */
interface ReaderOutput<T> {
/**
 * Emits an element without an explicit timestamp.
 *
 * @param element element to emit
 */
void collect(T element);
/**
 * Emits an element with the given timestamp.
 *
 * @param element element to emit
 * @param timestamp timestamp to attach to the element
 */
void collect(T element, long timestamp);
/**
 * Emits a watermark downstream.
 *
 * @param watermark watermark to emit
 */
void emitWatermark(Watermark watermark);
/**
 * Marks this output as temporarily idle, so downstream watermark
 * progress does not wait on it.
 */
void markIdle();
/**
 * Marks this output as active again after being idle.
 */
void markActive();
}
File-based source and sink connectors.
/**
 * A source that reads records from files, implemented on top of the unified
 * {@code Source} interface with {@code FileSourceSplit} splits.
 *
 * @param <T> type of the records produced by this source
 */
class FileSource<T> implements Source<T, FileSourceSplit, PendingSplitsCheckpoint> {
/**
 * Creates a builder for a file source that decodes records one at a time
 * via a stream format.
 *
 * @param streamFormat format used to read individual records from a stream
 * @param <T> type of the records produced
 * @return a file source builder
 */
public static <T> FileSourceBuilder<T> forRecordStreamFormat(StreamFormat<T> streamFormat);
/**
 * Creates a builder for a file source that decodes records in batches via
 * a bulk format (e.g. columnar formats).
 *
 * @param bulkFormat format used to read batches of records
 * @param <T> type of the records produced
 * @return a file source builder
 */
public static <T> FileSourceBuilder<T> forBulkFileFormat(BulkFormat<T, ?> bulkFormat);
}
/**
 * Fluent builder for {@code FileSource} instances.
 *
 * @param <T> type of the records produced by the built source
 */
class FileSourceBuilder<T> {
/**
 * Sets the file paths to read.
 *
 * @param inputPaths paths to read from
 * @return this builder
 */
public FileSourceBuilder<T> setFilePaths(Path... inputPaths);
/**
 * Sets the file paths to read.
 *
 * @param inputPaths paths to read from
 * @return this builder
 */
public FileSourceBuilder<T> setFilePaths(Collection<Path> inputPaths);
/**
 * Switches the source to continuous monitoring: the given path is
 * re-scanned for new files at the given interval.
 * (Note: the previous Javadoc documented a {@code processingMode}
 * parameter that does not exist in the signature.)
 *
 * @param monitoredPath path to monitor for new files
 * @param discoveryInterval interval between discovery scans
 * @return this builder
 */
public FileSourceBuilder<T> monitorContinuously(Path monitoredPath, Duration discoveryInterval);
/**
 * Sets a filter deciding which file paths are read.
 *
 * @param pathFilter filter applied to discovered paths
 * @return this builder
 */
public FileSourceBuilder<T> setFilePathFilter(PathFilter pathFilter);
/**
 * Builds the configured file source.
 *
 * @return the file source
 */
public FileSource<T> build();
}
/**
 * Format that decodes individual records from a file input stream.
 *
 * @param <T> type of the decoded records
 */
interface StreamFormat<T> {
/**
 * Creates a reader that decodes records from the given stream, reading
 * from the stream's current position up to {@code splitEnd}.
 *
 * @param config configuration for the reader
 * @param inputStream stream positioned at the start of the split
 * @param fileLen total length of the underlying file in bytes
 * @param splitEnd end position (exclusive) of the split within the file
 * @return a reader for this split
 * @throws IOException if the reader cannot be created
 */
Reader<T> createReader(Configuration config, FSDataInputStream inputStream, long fileLen, long splitEnd) throws IOException;
/**
 * Returns whether files in this format can be split into multiple ranges
 * and read in parallel.
 *
 * @return {@code true} if the format is splittable
 */
boolean isSplittable();
/**
 * Reader that produces records one at a time for a single split.
 *
 * @param <T> type of the decoded records
 */
interface Reader<T> extends AutoCloseable {
/**
 * Reads the next record.
 *
 * @return the next record, or {@code null} when the split is exhausted
 * @throws IOException if reading fails
 */
T read() throws IOException;
@Override
void close() throws IOException;
}
}
Types used in the sink framework for two-phase commit scenarios.
/**
 * Marker interface for data produced by a {@code SinkWriter} that is handed
 * to a {@code Committer} in the commit phase of a two-phase commit.
 */
interface Committable {}
/**
 * Marker interface for checkpointed {@code SinkWriter} state, used to
 * restore a writer after failure.
 */
interface WriterState {}
/**
 * Finalizes committables during the commit phase of a two-phase commit.
 *
 * @param <CommittableT> type of the committable data
 */
interface Committer<CommittableT> extends AutoCloseable {
/**
 * Commits the given committables.
 *
 * @param committables committables to commit
 * @return committables that could not be committed and should be retried
 * @throws IOException if committing fails
 * @throws InterruptedException if the committer is interrupted while blocking
 */
List<CommittableT> commit(List<CommittableT> committables) throws IOException, InterruptedException;
@Override
void close() throws Exception;
}
/**
 * Aggregates committables from all subtasks into a single global committable
 * and commits it, for sinks that need a cluster-wide commit step.
 *
 * @param <CommittableT> type of the per-subtask committable data
 * @param <GlobalCommittableT> type of the aggregated committable data
 */
interface GlobalCommitter<CommittableT, GlobalCommittableT> extends AutoCloseable {
/**
 * Combines per-subtask committables into one global committable.
 *
 * @param committables per-subtask committables to combine
 * @return the combined global committable
 * @throws IOException if combining fails
 */
GlobalCommittableT combine(List<CommittableT> committables) throws IOException;
/**
 * Commits the given global committables.
 *
 * @param globalCommittables global committables to commit
 * @return global committables that could not be committed and should be retried
 * @throws IOException if committing fails
 * @throws InterruptedException if the committer is interrupted while blocking
 */
List<GlobalCommittableT> commit(List<GlobalCommittableT> globalCommittables) throws IOException, InterruptedException;
@Override
void close() throws Exception;
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-parent