CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-parent

Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities

Pending
Overview
Eval results
Files

docs/connectors.md

Connector Framework

Unified connector framework for integrating with external systems, supporting both source and sink operations with exactly-once processing guarantees.

Capabilities

Source Framework

New unified source interface for reading data from external systems.

/**
 * Entry point of the unified source API.
 *
 * <p>A source is a factory: it creates the reader that emits records, creates or restores the
 * split enumerator, and supplies the serializers used to checkpoint splits and enumerator state.
 *
 * @param <T> type of the records emitted by the source
 * @param <SplitT> type of the splits exchanged between enumerator and readers
 * @param <EnumChkT> type of the enumerator's checkpointed state
 */
interface Source<T, SplitT extends SourceSplit, EnumChkT> {

    /**
     * Tells whether this source is bounded or continuously unbounded.
     *
     * @return this source's boundedness
     */
    Boundedness getBoundedness();

    /**
     * Creates a new reader for this source.
     *
     * @param readerContext context supplied to the reader when it is created
     * @return a newly created source reader
     */
    SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

    /**
     * Creates a split enumerator that starts without any checkpointed state.
     *
     * @param enumContext context supplied to the enumerator
     * @return a newly created split enumerator
     */
    SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);

    /**
     * Re-creates a split enumerator from previously checkpointed state.
     *
     * @param enumContext context supplied to the enumerator
     * @param checkpoint enumerator state taken from a checkpoint
     * @return an enumerator restored from {@code checkpoint}
     */
    SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint);

    /**
     * Supplies the serializer used for splits of type {@code SplitT}.
     *
     * @return the split serializer
     */
    SimpleVersionedSerializer<SplitT> getSplitSerializer();

    /**
     * Supplies the serializer used for the enumerator's checkpoint state.
     *
     * @return the enumerator checkpoint serializer
     */
    SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();
}

/**
 * Reads records from the splits assigned to it and emits them to a {@link ReaderOutput}.
 *
 * @param <T> type of the records emitted
 * @param <SplitT> type of the splits this reader consumes
 */
interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable {
    /**
     * Starts the reader.
     *
     * <p>NOTE(review): presumably called once before the first {@link #pollNext} call; confirm
     * against the runtime.
     */
    void start();

    /**
     * Polls for the next available records and emits them to {@code output}.
     *
     * @param output receiver of the emitted records (and, optionally, timestamps and watermarks)
     * @return the state of the input after this call, one of the {@link InputStatus} values
     * @throws Exception if reading from the underlying splits fails
     */
    InputStatus pollNext(ReaderOutput<T> output) throws Exception;

    /**
     * Snapshots this reader's state for the checkpoint with the given ID.
     *
     * <p>The result is a list of splits of type {@code SplitT}, not a list of checkpoints.
     *
     * @param checkpointId ID of the checkpoint being taken
     * @return the splits to store in the checkpoint. NOTE(review): presumably the splits this
     *     reader currently owns, including their read progress; confirm.
     */
    List<SplitT> snapshotState(long checkpointId);

    /**
     * Assigns additional splits to this reader.
     *
     * @param splits splits to add
     */
    void addSplits(List<SplitT> splits);

    /**
     * Notifies the reader that no further splits will be assigned to it.
     */
    void notifyNoMoreSplits();

    /**
     * Closes the reader.
     *
     * @throws Exception if closing fails
     */
    @Override
    void close() throws Exception;
}

/**
 * Manages splits on behalf of the reader subtasks.
 *
 * <p>It serves split requests, takes back splits returned by subtasks, is told about newly
 * registered readers, and can snapshot its own state for checkpoints.
 *
 * @param <SplitT> type of the splits managed
 * @param <CheckpointT> type of the enumerator's checkpoint state
 */
interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {

    /**
     * Starts the enumerator.
     */
    void start();

    /**
     * Registers a reader running in the given subtask.
     *
     * @param subtaskId index of the subtask whose reader was added
     */
    void addReader(int subtaskId);

    /**
     * Responds to a subtask asking for splits.
     *
     * @param subtaskId index of the requesting subtask
     * @param requesterHostname host of the requesting subtask; may be {@code null}
     */
    void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

    /**
     * Hands splits back to the enumerator.
     *
     * <p>NOTE(review): in Flink this is typically used when a reader fails and the splits it
     * received since the last checkpoint must be reassigned; confirm.
     *
     * @param splits splits being returned
     * @param subtaskId index of the subtask the splits came from
     */
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    /**
     * Captures the enumerator's state for the given checkpoint.
     *
     * @param checkpointId ID of the checkpoint being taken
     * @return the state to store in the checkpoint
     * @throws Exception if the state cannot be captured
     */
    CheckpointT snapshotState(long checkpointId) throws Exception;

    /**
     * Closes the enumerator.
     *
     * @throws IOException if closing fails
     */
    @Override
    void close() throws IOException;
}

Sink Framework

New unified sink interface for writing data to external systems.

/**
 * Entry point of the unified sink API.
 *
 * <p>A sink creates (or restores) its writer. It may optionally provide a committer, a global
 * committer, and a serializer for writer state. All optional parts default to absent.
 *
 * @param <InputT> type of the elements consumed by the sink
 */
interface Sink<InputT> {
    /**
     * Creates a new writer.
     *
     * @param context initialization context for the writer
     * @return a newly created sink writer
     */
    SinkWriter<InputT> createWriter(WriterInitContext context);

    /**
     * Creates a writer during recovery.
     *
     * <p>The default implementation ignores {@code recoveredState} and delegates to
     * {@link #createWriter}. NOTE(review): sinks that snapshot writer state presumably need to
     * override this, otherwise the recovered state is dropped.
     *
     * @param context initialization context for the writer
     * @param recoveredState writer state recovered from a checkpoint
     * @return the writer to use after recovery
     * @throws IOException if the writer cannot be restored
     */
    default SinkWriter<InputT> restoreWriter(WriterInitContext context, Collection<WriterState> recoveredState) throws IOException {
        final SinkWriter<InputT> freshWriter = createWriter(context);
        return freshWriter;
    }

    /**
     * Optionally provides a committer. The default provides none.
     *
     * @param <CommittableT> committable type
     * @return the committer, or an empty {@code Optional} if the sink has none
     */
    default <CommittableT> Optional<Committer<CommittableT>> createCommitter() {
        return Optional.<Committer<CommittableT>>empty();
    }

    /**
     * Optionally provides a global committer. The default provides none.
     *
     * @param <CommittableT> committable type
     * @param <GlobalCommittableT> global committable type
     * @return the global committer, or an empty {@code Optional} if the sink has none
     */
    default <CommittableT, GlobalCommittableT> Optional<GlobalCommitter<CommittableT, GlobalCommittableT>> createGlobalCommitter() {
        return Optional.<GlobalCommitter<CommittableT, GlobalCommittableT>>empty();
    }

    /**
     * Optionally provides the serializer for writer state. The default provides none.
     *
     * @return the writer-state serializer, or an empty {@code Optional}
     */
    default Optional<SimpleVersionedSerializer<WriterState>> getWriterStateSerializer() {
        return Optional.<SimpleVersionedSerializer<WriterState>>empty();
    }
}

/**
 * Sink writer interface
 * @param <InputT> Input element type
 */
interface SinkWriter<InputT> extends AutoCloseable {
    /**
     * Write element
     * @param element Element to write
     * @param context Write context
     * @throws IOException
     * @throws InterruptedException
     */
    void write(InputT element, Context context) throws IOException, InterruptedException;
    
    /**
     * Flush buffered elements
     * @param endOfInput Whether this is end of input
     * @return List of committables
     * @throws IOException
     * @throws InterruptedException
     */
    List<CommittableT> flush(boolean endOfInput) throws IOException, InterruptedException;
    
    /**
     * Snapshot writer state
     * @param checkpointId Checkpoint ID
     * @return Writer state
     * @throws IOException
     */
    default List<WriterState> snapshotState(long checkpointId) throws IOException {
        return Collections.emptyList();
    }
    
    @Override
    void close() throws Exception;
    
    /**
     * Write context
     */
    interface Context {
        /**
         * Get current event time
         * @return Event time
         */
        long currentEventTime();
        
        /**
         * Get element timestamp
         * @return Element timestamp
         */
        Long timestamp();
    }
}

Base Connector Components

Base classes and interfaces for building connectors.

/**
 * Common contract for all source splits: every split is identifiable by a string ID.
 */
interface SourceSplit {
    /**
     * Returns this split's identifier.
     *
     * @return the split ID
     */
    String splitId();
}

/**
 * Source split with file information
 */
class FileSourceSplit implements SourceSplit {
    /**
     * Create file source split
     * @param id Split ID
     * @param path File path
     * @param offset Start offset
     * @param length Split length
     */
    public FileSourceSplit(String id, Path path, long offset, long length);
    
    /**
     * Get file path
     * @return File path
     */
    public Path path();
    
    /**
     * Get start offset
     * @return Start offset
     */
    public long offset();
    
    /**
     * Get split length
     * @return Split length
     */
    public long length();
}

/**
 * Status returned by {@code SourceReader#pollNext} describing the input after a poll.
 */
enum InputStatus {
    /** More data is available right away; the reader can be polled again immediately. */
    MORE_AVAILABLE,
    /**
     * No data is available at the moment. Unlike {@link #END_OF_INPUT}, this does not mean the
     * input is exhausted.
     */
    NOTHING_AVAILABLE,
    /** The input is exhausted; no further data will be produced. */
    END_OF_INPUT
}

/**
 * Whether a source's data is finite or keeps arriving.
 */
enum Boundedness {
    /** The source produces a finite amount of data. */
    BOUNDED,
    /** The source keeps producing data with no predetermined end. */
    CONTINUOUS_UNBOUNDED;
}

/**
 * Sink for everything a source reader emits: records (with or without timestamps), watermarks,
 * and idle/active signals.
 *
 * @param <T> type of the records emitted
 */
interface ReaderOutput<T> {

    /**
     * Emits a record without an explicit timestamp.
     *
     * @param element record to emit
     */
    void collect(T element);

    /**
     * Emits a record together with its timestamp.
     *
     * @param element record to emit
     * @param timestamp timestamp attached to the record
     */
    void collect(T element, long timestamp);

    /**
     * Emits a watermark.
     *
     * @param watermark watermark to emit
     */
    void emitWatermark(Watermark watermark);

    /**
     * Marks this output as idle.
     */
    void markIdle();

    /**
     * Marks this output as active again.
     */
    void markActive();
}

File Connectors

File-based source and sink connectors.

/**
 * Source that reads records from files.
 *
 * <p>Only the static factory methods are listed here. The {@link Source} methods this class
 * implements, and all method bodies, are omitted from this listing.
 *
 * @param <T> type of the records read from the files
 */
class FileSource<T> implements Source<T, FileSourceSplit, PendingSplitsCheckpoint> {
    /**
     * Creates a file source builder that reads records with a {@link StreamFormat}.
     *
     * @param streamFormat stream format used to read records from the files
     * @param <T> element type
     * @return file source builder
     */
    public static <T> FileSourceBuilder<T> forRecordStreamFormat(StreamFormat<T> streamFormat);

    /**
     * Creates a file source builder that reads records with a bulk format.
     *
     * @param bulkFormat bulk format used to read the files
     * @param <T> element type
     * @return file source builder
     */
    public static <T> FileSourceBuilder<T> forBulkFileFormat(BulkFormat<T, ?> bulkFormat);
}

/**
 * Builder for {@link FileSource} instances.
 *
 * <p>Only method signatures are listed here; the method bodies are omitted.
 *
 * @param <T> type of the records the resulting source emits
 */
class FileSourceBuilder<T> {
    /**
     * Sets the file paths to read.
     *
     * @param inputPaths input paths
     * @return the builder
     */
    public FileSourceBuilder<T> setFilePaths(Path... inputPaths);

    /**
     * Sets the file paths to read.
     *
     * @param inputPaths input paths
     * @return the builder
     */
    public FileSourceBuilder<T> setFilePaths(Collection<Path> inputPaths);

    /**
     * Monitors a path for new files, checking for them every {@code discoveryInterval}.
     *
     * @param monitoredPath path to monitor for new files
     * @param discoveryInterval interval between checks for new files
     * @return the builder
     */
    public FileSourceBuilder<T> monitorContinuously(Path monitoredPath, Duration discoveryInterval);

    /**
     * Sets the filter that decides which file paths are read.
     *
     * @param pathFilter path filter
     * @return the builder
     */
    public FileSourceBuilder<T> setFilePathFilter(PathFilter pathFilter);

    /**
     * Builds the file source from the configured settings.
     *
     * @return the file source
     */
    public FileSource<T> build();
}

/**
 * Format that decodes records one at a time from a file's input stream.
 *
 * @param <T> type of the decoded records
 */
interface StreamFormat<T> {

    /**
     * Tells whether a file in this format may be divided into several splits.
     *
     * @return {@code true} if the format is splittable, {@code false} otherwise
     */
    boolean isSplittable();

    /**
     * Opens a reader over the given input stream.
     *
     * @param config configuration for the reader
     * @param inputStream stream positioned over the file's data
     * @param fileLen total length of the file
     * @param splitEnd position at which the current split ends
     * @return a reader for the records in the stream
     * @throws IOException if the reader cannot be created
     */
    Reader<T> createReader(Configuration config, FSDataInputStream inputStream, long fileLen, long splitEnd) throws IOException;

    /**
     * Record-at-a-time reader produced by {@link StreamFormat#createReader}.
     *
     * @param <T> type of the decoded records
     */
    interface Reader<T> extends AutoCloseable {

        /**
         * Decodes the next record.
         *
         * @return the next record, or {@code null} once the end of the split is reached
         * @throws IOException if decoding fails
         */
        T read() throws IOException;

        /**
         * Closes the reader.
         *
         * @throws IOException if closing fails
         */
        @Override
        void close() throws IOException;
    }
}

Committable Types

Types used in the sink framework for two-phase commit scenarios.

/**
 * Marker type for data produced during the first phase of a two-phase commit and finalized by a
 * committer. It declares no methods.
 */
interface Committable {
}

/**
 * Marker type for state snapshotted from a sink writer. It declares no methods.
 */
interface WriterState {
}

/**
 * Performs the commit step of a two-phase commit for individual committables.
 *
 * @param <CommittableT> type of the committables handled
 */
interface Committer<CommittableT> extends AutoCloseable {

    /**
     * Commits the given committables.
     *
     * @param committables committables to commit
     * @return the committables that should be retried
     * @throws IOException if committing fails
     * @throws InterruptedException if the committing thread is interrupted
     */
    List<CommittableT> commit(List<CommittableT> committables) throws IOException, InterruptedException;

    /**
     * Closes the committer.
     *
     * @throws Exception if closing fails
     */
    @Override
    void close() throws Exception;
}

/**
 * Combines individual committables into global committables and commits those.
 *
 * @param <CommittableT> type of the individual committables
 * @param <GlobalCommittableT> type of the combined, global committables
 */
interface GlobalCommitter<CommittableT, GlobalCommittableT> extends AutoCloseable {

    /**
     * Merges a list of committables into a single global committable.
     *
     * @param committables committables to merge
     * @return the resulting global committable
     * @throws IOException if the committables cannot be combined
     */
    GlobalCommittableT combine(List<CommittableT> committables) throws IOException;

    /**
     * Commits the given global committables.
     *
     * @param globalCommittables global committables to commit
     * @return the global committables that should be retried
     * @throws IOException if committing fails
     * @throws InterruptedException if the committing thread is interrupted
     */
    List<GlobalCommittableT> commit(List<GlobalCommittableT> globalCommittables) throws IOException, InterruptedException;

    /**
     * Closes the global committer.
     *
     * @throws Exception if closing fails
     */
    @Override
    void close() throws Exception;
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-parent

docs

configuration.md

connectors.md

core-functions.md

datastream-traditional.md

datastream-v2.md

index.md

state-management.md

table-api.md

windowing.md

tile.json