CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-core

Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications

Pending
Overview
Eval results
Files

docs/connectors.md

Connectors

Apache Flink Core provides comprehensive connector APIs for building data sources and sinks that integrate with external systems. These APIs enable developers to create efficient, fault-tolerant connectors with features like checkpointing, parallelism, and exactly-once semantics.

Source Connector Framework

Basic Source Interface

The foundation for all Flink data sources.

import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Basic source implementation
// Basic source implementation: the Source object is the serializable factory that
// the runtime uses to create the enumerator (on the JobManager) and the readers
// (one per subtask); it holds no runtime state itself.
public class CustomSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {
    
    // Declares whether the source is finite (batch) or infinite (streaming).
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED; // or BOUNDED
    }
    
    // Creates the per-subtask reader that pulls records out of assigned splits.
    @Override
    public SourceReader<MyRecord, MySourceSplit> createReader(SourceReaderContext readerContext) {
        return new MySourceReader(readerContext);
    }
    
    // Creates a fresh enumerator for a brand-new job (no checkpoint to restore).
    @Override
    public SplitEnumerator<MySourceSplit, MyEnumeratorState> createEnumerator(
            SplitEnumeratorContext<MySourceSplit> enumContext) {
        return new MySplitEnumerator(enumContext);
    }
    
    // Re-creates the enumerator from checkpointed state after a failure/restart.
    @Override
    public SplitEnumerator<MySourceSplit, MyEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<MySourceSplit> enumContext,
            MyEnumeratorState checkpoint) {
        return new MySplitEnumerator(enumContext, checkpoint);
    }
    
    // Serializer used to checkpoint splits that are in flight to readers.
    @Override
    public SimpleVersionedSerializer<MySourceSplit> getSplitSerializer() {
        return new MySourceSplitSerializer();
    }
    
    // Serializer used to checkpoint the enumerator's own state.
    @Override
    public SimpleVersionedSerializer<MyEnumeratorState> getEnumeratorCheckpointSerializer() {
        return new MyEnumeratorStateSerializer();
    }
}

Source Split Definition

Define how data is partitioned and processed.

import org.apache.flink.api.connector.source.SourceSplit;

// Custom source split
// Custom source split: an immutable description of one byte range of one file.
public class MySourceSplit implements SourceSplit {

    private final String splitId;
    private final String filepath;
    private final long startOffset;
    private final long endOffset;

    public MySourceSplit(String splitId, String filepath, long startOffset, long endOffset) {
        this.splitId = splitId;
        this.filepath = filepath;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    /** Unique id the framework uses to track this split across checkpoints. */
    @Override
    public String splitId() {
        return splitId;
    }

    /** Path of the file backing this split. */
    public String getFilepath() {
        return filepath;
    }

    /** First byte offset of the assigned range. */
    public long getStartOffset() {
        return startOffset;
    }

    /** Last byte offset of the assigned range. */
    public long getEndOffset() {
        return endOffset;
    }

    @Override
    public String toString() {
        return String.format("MySourceSplit{id='%s', file='%s', range=[%d, %d]}",
            splitId, filepath, startOffset, endOffset);
    }
}

// Split serializer for checkpointing
// Split serializer for checkpointing.
// Wire format (version 1): [UTF splitId][UTF filepath][long start][long end].
public class MySourceSplitSerializer implements SimpleVersionedSerializer<MySourceSplit> {

    private static final int CURRENT_VERSION = 1;

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public byte[] serialize(MySourceSplit split) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            
            out.writeUTF(split.splitId());
            out.writeUTF(split.getFilepath());
            out.writeLong(split.getStartOffset());
            out.writeLong(split.getEndOffset());
            out.flush();
            
            return baos.toByteArray();
        }
    }

    @Override
    public MySourceSplit deserialize(int version, byte[] serialized) throws IOException {
        // BUGFIX: the version argument was previously ignored. Reject bytes written
        // by an unknown serializer version instead of silently misreading the layout.
        if (version != CURRENT_VERSION) {
            throw new IOException("Unsupported split serializer version: " + version);
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
             DataInputStream in = new DataInputStream(bais)) {
            
            String splitId = in.readUTF();
            String filepath = in.readUTF();
            long startOffset = in.readLong();
            long endOffset = in.readLong();
            
            return new MySourceSplit(splitId, filepath, startOffset, endOffset);
        }
    }
}

Split Enumerator

Discovers and assigns splits to source readers.

import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

/**
 * Split enumerator: runs on the coordinator, discovers input files and hands
 * them out (one split per file) to requesting readers.
 *
 * Fixes over the original version:
 * - addSplitsBack/assignSplitsToReader no longer NPE when a reader has no
 *   assignment entry yet (e.g. after restoring from a checkpoint).
 * - addReader uses putIfAbsent so a duplicate registration cannot wipe out
 *   previously tracked assignments.
 * - snapshotState copies the live collections so later mutation cannot leak
 *   into an already-taken checkpoint.
 */
public class MySplitEnumerator implements SplitEnumerator<MySourceSplit, MyEnumeratorState> {
    private final SplitEnumeratorContext<MySourceSplit> context;
    // Files not yet assigned to any reader.
    private final Set<String> remainingFiles;
    // subtaskId -> files currently assigned to that reader.
    private final Map<Integer, Set<String>> readerAssignments;
    
    /** Fresh enumerator: discovers the input files up front. */
    public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context) {
        this.context = context;
        this.remainingFiles = discoverFiles();
        this.readerAssignments = new HashMap<>();
    }
    
    /** Enumerator restored from a checkpoint taken by snapshotState(). */
    public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context, 
                           MyEnumeratorState restoredState) {
        this.context = context;
        this.remainingFiles = restoredState.getRemainingFiles();
        this.readerAssignments = restoredState.getReaderAssignments();
    }
    
    @Override
    public void start() {
        // Hand out initial splits to every reader that is already registered.
        assignSplitsToReaders();
    }
    
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        // Take an arbitrary remaining file via the iterator (single pass, no re-lookup).
        Iterator<String> it = remainingFiles.iterator();
        if (it.hasNext()) {
            String nextFile = it.next();
            it.remove();
            
            MySourceSplit split = createSplitFromFile(nextFile, subtaskId);
            context.assignSplit(split, subtaskId);
            
            // Track the assignment so it can be undone in addSplitsBack().
            readerAssignments.computeIfAbsent(subtaskId, k -> new HashSet<>()).add(nextFile);
        } else {
            // Nothing left: tell the reader it may finish once its splits drain.
            context.signalNoMoreSplits(subtaskId);
        }
    }
    
    @Override
    public void addSplitsBack(List<MySourceSplit> splits, int subtaskId) {
        // A reader failed: return its splits to the pool for reassignment.
        Set<String> assigned = readerAssignments.get(subtaskId);
        for (MySourceSplit split : splits) {
            remainingFiles.add(split.getFilepath());
            if (assigned != null) { // guard: reader may have no tracked entry after restore
                assigned.remove(split.getFilepath());
            }
        }
    }
    
    @Override
    public void addReader(int subtaskId) {
        // putIfAbsent: a duplicate registration must not discard tracked assignments.
        readerAssignments.putIfAbsent(subtaskId, new HashSet<>());
        assignSplitsToReader(subtaskId);
    }
    
    @Override
    public MyEnumeratorState snapshotState(long checkpointId) throws Exception {
        // Defensive copies: the checkpoint must not alias the live mutable state.
        Map<Integer, Set<String>> assignmentsCopy = new HashMap<>();
        for (Map.Entry<Integer, Set<String>> e : readerAssignments.entrySet()) {
            assignmentsCopy.put(e.getKey(), new HashSet<>(e.getValue()));
        }
        return new MyEnumeratorState(new HashSet<>(remainingFiles), assignmentsCopy);
    }
    
    @Override
    public void close() throws IOException {
        // No external resources held in this example.
    }
    
    /** Gives each currently registered reader one initial split, if available. */
    private void assignSplitsToReaders() {
        for (int readerId : context.registeredReaders().keySet()) {
            assignSplitsToReader(readerId);
        }
    }
    
    /** Assigns at most one remaining file to the given reader. */
    private void assignSplitsToReader(int readerId) {
        Iterator<String> it = remainingFiles.iterator();
        if (it.hasNext()) {
            String file = it.next();
            it.remove();
            
            MySourceSplit split = createSplitFromFile(file, readerId);
            context.assignSplit(split, readerId);
            // computeIfAbsent avoids the NPE the plain get(readerId).add(...) had
            // when the reader was not yet tracked.
            readerAssignments.computeIfAbsent(readerId, k -> new HashSet<>()).add(file);
        }
    }
    
    /** Builds a whole-file split; the id embeds the reader for traceability. */
    private MySourceSplit createSplitFromFile(String filepath, int readerId) {
        String splitId = String.format("%s-%d", filepath, readerId);
        return new MySourceSplit(splitId, filepath, 0, getFileSize(filepath));
    }
    
    private Set<String> discoverFiles() {
        // Discover files to process (hard-coded for the example).
        return new HashSet<>(Arrays.asList("file1.txt", "file2.txt", "file3.txt"));
    }
    
    private long getFileSize(String filepath) {
        // Get file size for split calculation (fixed for the example).
        return 1024 * 1024; // 1MB example
    }
}

Source Reader

Reads records from assigned splits.

import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;

/**
 * Per-subtask source reader: opens a file reader per assigned split and emits
 * records from all active readers in round-robin fashion.
 *
 * BUGFIX: the original returned END_OF_INPUT as soon as the active readers
 * drained, even though the enumerator might still assign more splits. A reader
 * may only signal END_OF_INPUT after notifyNoMoreSplits() has been called.
 */
public class MySourceReader implements SourceReader<MyRecord, MySourceSplit> {
    private final SourceReaderContext context;
    private final Queue<MySourceSplit> pendingSplits;
    private final Map<String, MyFileReader> activeReaders;
    // Set once the enumerator guarantees no further splits will arrive.
    private boolean noMoreSplits;
    
    public MySourceReader(SourceReaderContext context) {
        this.context = context;
        this.pendingSplits = new LinkedList<>();
        this.activeReaders = new HashMap<>();
        this.noMoreSplits = false;
    }
    
    @Override
    public void start() {
        // No initialization required for this example reader.
    }
    
    @Override
    public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
        if (activeReaders.isEmpty() && pendingSplits.isEmpty()) {
            if (noMoreSplits) {
                // Enumerator confirmed there is nothing left to read.
                return InputStatus.END_OF_INPUT;
            }
            // Ask the enumerator for more work and yield until it arrives.
            context.sendSplitRequest();
            return InputStatus.NOTHING_AVAILABLE;
        }
        
        // Open a file reader for every newly assigned split.
        while (!pendingSplits.isEmpty()) {
            MySourceSplit split = pendingSplits.poll();
            activeReaders.put(split.splitId(), new MyFileReader(split));
        }
        
        // Emit at most one record per active reader per poll.
        boolean hasData = false;
        Iterator<Map.Entry<String, MyFileReader>> iterator = activeReaders.entrySet().iterator();
        
        while (iterator.hasNext()) {
            MyFileReader reader = iterator.next().getValue();
            
            MyRecord record = reader.readNext();
            if (record != null) {
                output.collect(record);
                hasData = true;
            } else if (reader.isFinished()) {
                // Split is exhausted; close and drop its reader.
                reader.close();
                iterator.remove();
            }
        }
        
        if (activeReaders.isEmpty() && noMoreSplits) {
            return InputStatus.END_OF_INPUT;
        }
        
        return hasData ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
    }
    
    @Override
    public List<MySourceSplit> snapshotState(long checkpointId) {
        // Checkpoint every split that has not been fully consumed yet.
        List<MySourceSplit> splitsToSnapshot = new ArrayList<>(pendingSplits);
        
        // Include splits that are only partially processed.
        for (MyFileReader reader : activeReaders.values()) {
            if (!reader.isFinished()) {
                splitsToSnapshot.add(reader.getCurrentSplit());
            }
        }
        
        return splitsToSnapshot;
    }
    
    @Override
    public void addSplits(List<MySourceSplit> splits) {
        pendingSplits.addAll(splits);
    }
    
    @Override
    public void notifyNoMoreSplits() {
        // Remember the signal so pollNext() can eventually report END_OF_INPUT.
        noMoreSplits = true;
    }
    
    @Override
    public void close() throws Exception {
        for (MyFileReader reader : activeReaders.values()) {
            reader.close();
        }
        activeReaders.clear();
    }
}

Built-in Sources

Using pre-built source implementations.

import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BuiltInSourceExamples {

    /** Streams the numbers 1..1,000,000 from the built-in sequence source. */
    public static void numberSequenceExample() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> numbers = env.fromSource(
            new NumberSequenceSource(1, 1000000),
            WatermarkStrategy.noWatermarks(),
            "number-sequence");

        numbers.print();
    }

    /** Wires a custom bounded file source into the environment (no watermarks). */
    public static DataStream<String> createBoundedFileSource(StreamExecutionEnvironment env) {
        return env.fromSource(
            new CustomFileSource("/path/to/files"),
            WatermarkStrategy.noWatermarks(),
            "file-source");
    }

    /** Wires a custom unbounded source with bounded-out-of-orderness watermarks. */
    public static DataStream<Event> createUnboundedEventSource(StreamExecutionEnvironment env) {
        WatermarkStrategy<Event> strategy =
            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        return env.fromSource(new CustomEventSource("kafka-topic"), strategy, "event-source");
    }
}

Sink Connector Framework

Basic Sink Interface

The foundation for all Flink data sinks.

import org.apache.flink.api.connector.sink2.*;

// Simple stateless sink
// Simple stateless sink: its only job is to create the per-subtask writer.
public class MyBasicSink implements Sink<MyRecord> {

    /** Creates the writer that performs the actual record delivery. */
    @Override
    public SinkWriter<MyRecord> createWriter(InitContext context) throws IOException {
        MyBasicSinkWriter writer = new MyBasicSinkWriter(context);
        return writer;
    }
}

// Basic sink writer implementation
// Basic sink writer implementation: one writer instance per sink subtask,
// delegating every record straight to a database connection.
public class MyBasicSinkWriter implements SinkWriter<MyRecord> {
    private final InitContext context;
    private final DatabaseConnection connection;
    
    public MyBasicSinkWriter(InitContext context) throws IOException {
        this.context = context;
        this.connection = new DatabaseConnection();
    }
    
    // Called once per incoming record; inserts it into the external system.
    @Override
    public void write(MyRecord element, Context context) throws IOException, InterruptedException {
        // Write record to external system
        connection.insert(element);
    }
    
    // Called on checkpoint and at end of input; pushes out buffered data.
    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        // Flush any buffered data
        connection.flush();
    }
    
    // Releases the connection when the subtask shuts down.
    @Override
    public void close() throws Exception {
        connection.close();
    }
}

Stateful Sink with Checkpointing

Handle state for exactly-once guarantees.

import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;

// Sink supporting writer state
// Sink supporting writer state: adds checkpoint/restore of writer-local state
// on top of the plain Sink contract.
public class MyStatefulSink implements Sink<MyRecord>, SupportsWriterState<MyRecord, MyWriterState> {

    /** Creates a brand-new writer with empty state (fresh job start). */
    @Override
    public StatefulSinkWriter<MyRecord, MyWriterState> createWriter(InitContext context) 
            throws IOException {
        MyStatefulSinkWriter writer = new MyStatefulSinkWriter(context);
        return writer;
    }

    /** Re-creates a writer, replaying the state captured at the last checkpoint. */
    @Override
    public StatefulSinkWriter<MyRecord, MyWriterState> restoreWriter(
            InitContext context, 
            Collection<MyWriterState> recoveredState) throws IOException {
        MyStatefulSinkWriter writer = new MyStatefulSinkWriter(context, recoveredState);
        return writer;
    }

    /** Serializer the framework uses to persist the checkpointed writer state. */
    @Override
    public SimpleVersionedSerializer<MyWriterState> getWriterStateSerializer() {
        return new MyWriterStateSerializer();
    }
}

// Stateful sink writer
/**
 * Stateful sink writer: buffers records and per-key counters, flushing in
 * batches and exposing both as checkpointable state.
 */
public class MyStatefulSinkWriter implements StatefulSinkWriter<MyRecord, MyWriterState> {

    /** Buffer size that triggers an eager batch write (was a magic number). */
    private static final int MAX_BUFFERED_RECORDS = 1000;

    private final List<MyRecord> pendingRecords;
    private final Map<String, Long> processedCounts;
    
    /** Fresh writer with empty buffers (no prior checkpoint). */
    public MyStatefulSinkWriter(InitContext context) {
        this.pendingRecords = new ArrayList<>();
        this.processedCounts = new HashMap<>();
    }
    
    /** Writer rebuilt from checkpointed state after a restart. */
    public MyStatefulSinkWriter(InitContext context, Collection<MyWriterState> recoveredState) {
        this.pendingRecords = new ArrayList<>();
        this.processedCounts = new HashMap<>();
        
        // Merge all recovered state handles (possibly from several prior subtasks).
        for (MyWriterState state : recoveredState) {
            pendingRecords.addAll(state.getPendingRecords());
            processedCounts.putAll(state.getProcessedCounts());
        }
    }
    
    @Override
    public void write(MyRecord element, Context context) throws IOException, InterruptedException {
        pendingRecords.add(element);
        
        // Track per-key record counts as part of the writer state.
        String key = element.getKey();
        processedCounts.merge(key, 1L, Long::sum);
        
        // Batch write once the buffer reaches the configured threshold.
        if (pendingRecords.size() >= MAX_BUFFERED_RECORDS) {
            flushPendingRecords();
        }
    }
    
    /**
     * Snapshots defensive copies of the buffers so later mutations cannot
     * alter an already-taken checkpoint.
     */
    @Override
    public List<MyWriterState> snapshotState(long checkpointId) throws IOException {
        MyWriterState state = new MyWriterState(
            new ArrayList<>(pendingRecords),
            new HashMap<>(processedCounts),
            checkpointId
        );
        
        return Collections.singletonList(state);
    }
    
    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        flushPendingRecords();
    }
    
    @Override
    public void close() throws Exception {
        // Best effort: push out anything still buffered before shutting down.
        flushPendingRecords();
    }
    
    /** Writes and clears the buffered records; no-op when the buffer is empty. */
    private void flushPendingRecords() throws IOException {
        if (!pendingRecords.isEmpty()) {
            for (MyRecord record : pendingRecords) {
                writeToExternalSystem(record);
            }
            pendingRecords.clear();
        }
    }
    
    private void writeToExternalSystem(MyRecord record) throws IOException {
        // Implementation specific to external system
    }
}

Two-Phase Commit Sink

Implement exactly-once semantics with two-phase commit.

import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Committer;

// Sink with two-phase commit
// Sink with two-phase commit: the writer stages records in a transaction
// (phase one) and the committer finalizes it after the checkpoint (phase two).
public class MyTransactionalSink implements 
        Sink<MyRecord>, 
        SupportsCommitter<MyCommittable> {

    /** Phase one: creates the writer that stages records into transactions. */
    @Override
    public CommittingSinkWriter<MyRecord, MyCommittable> createWriter(InitContext context) 
            throws IOException {
        MyCommittingSinkWriter writer = new MyCommittingSinkWriter(context);
        return writer;
    }

    /** Phase two: creates the committer that finalizes prepared transactions. */
    @Override
    public Committer<MyCommittable> createCommitter() throws IOException {
        MyCommitter committer = new MyCommitter();
        return committer;
    }

    /** Serializer used to persist committables between phases across failures. */
    @Override
    public SimpleVersionedSerializer<MyCommittable> getCommittableSerializer() {
        return new MyCommittableSerializer();
    }
}

// Committing sink writer (first phase)
/**
 * Committing sink writer (first phase of the two-phase commit): stages every
 * record in an open database transaction and seals the batch into a
 * committable at checkpoint time.
 */
public class MyCommittingSinkWriter implements CommittingSinkWriter<MyRecord, MyCommittable> {
    private final String transactionId;
    private final List<MyRecord> currentBatch;
    private final DatabaseTransaction transaction;
    
    public MyCommittingSinkWriter(InitContext context) throws IOException {
        this.transactionId = generateTransactionId(context);
        this.currentBatch = new ArrayList<>();
        this.transaction = new DatabaseTransaction(transactionId);
    }
    
    @Override
    public void write(MyRecord element, Context context) throws IOException, InterruptedException {
        // Stage the record; it only becomes visible once the committer commits.
        currentBatch.add(element);
        transaction.prepare(element);
    }
    
    /**
     * Seals the current batch into a committable. The framework calls this
     * after flush() on each checkpoint and hands the result to the Committer.
     */
    @Override
    public Collection<MyCommittable> prepareCommit() throws IOException, InterruptedException {
        if (currentBatch.isEmpty()) {
            return Collections.emptyList();
        }
        
        // Prepare the open transaction so the committer can later resume it.
        transaction.prepareForCommit();
        
        MyCommittable committable = new MyCommittable(
            transactionId,
            new ArrayList<>(currentBatch),
            System.currentTimeMillis()
        );
        
        currentBatch.clear();
        return Collections.singletonList(committable);
    }
    
    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        // BUGFIX: the original called prepareCommit() here and DISCARDED the
        // returned committables, silently dropping the in-flight transaction.
        // Records are already staged in write(), and the framework invokes
        // prepareCommit() itself right after flush(), so nothing is needed here.
    }
    
    @Override
    public void close() throws Exception {
        transaction.close();
    }
    
    /** Builds a transaction id unique per subtask, attempt, and wall-clock time. */
    private String generateTransactionId(InitContext context) {
        return String.format("txn_%d_%d_%d", 
            context.getSubtaskId(), 
            context.getAttemptNumber(),
            System.currentTimeMillis());
    }
}

// Committer (second phase)
// Committer (second phase): finalizes the transactions prepared by the writer.
public class MyCommitter implements Committer<MyCommittable> {

    @Override
    public void commit(Collection<CommitRequest<MyCommittable>> requests) 
            throws IOException, InterruptedException {
        for (CommitRequest<MyCommittable> request : requests) {
            commitSingle(request.getCommittable());
        }
    }

    /** Resumes and commits one prepared transaction, wrapping failures as I/O errors. */
    private void commitSingle(MyCommittable committable) throws IOException {
        try {
            DatabaseTransaction transaction = 
                DatabaseTransaction.resume(committable.getTransactionId());
            transaction.commit();
            
        } catch (Exception e) {
            throw new IOException("Failed to commit transaction: " + 
                committable.getTransactionId(), e);
        }
    }

    @Override
    public void close() throws Exception {
        // No resources held in this example.
    }
}

Advanced Connector Features

Dynamic Parallelism Inference

import org.apache.flink.api.connector.source.DynamicParallelismInference;

// Source that lets the scheduler pick its parallelism at runtime.
public class DynamicSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState>,
                                      DynamicParallelismInference {

    /** Picks one subtask per available file, capped at the recommended maximum. */
    @Override
    public int inferParallelism(SourceReaderFactory readerFactory) {
        int fileCount = discoverAvailableFiles();
        int cap = getMaxRecommendedParallelism();
        return Math.min(fileCount, cap);
    }

    /** Counts the data partitions/files available to read (fixed for the example). */
    private int discoverAvailableFiles() {
        return 10; // Example
    }

    /** Upper bound derived from external-system limits or performance characteristics. */
    private int getMaxRecommendedParallelism() {
        return 50;
    }
}

Rate Limiting

import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;

// Reader that throttles record emission to a fixed per-second rate.
public class RateLimitedSourceReader implements SourceReader<MyRecord, MySourceSplit> {
    // Runtime context for this reader subtask (unused beyond construction here).
    private final SourceReaderContext context;
    // NOTE(review): in Flink, RateLimiterStrategy is a factory that creates
    // RateLimiter instances per subtask rather than a limiter itself; verify
    // that tryAcquire(int) exists on the strategy in the targeted Flink version.
    private final RateLimiterStrategy rateLimiter;
    
    public RateLimitedSourceReader(SourceReaderContext context) {
        this.context = context;
        this.rateLimiter = RateLimiterStrategy.perSecond(1000); // 1000 records/sec
    }
    
    // Emits at most one record per call, and only when a rate permit is granted.
    @Override
    public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
        // Check rate limit before processing
        if (!rateLimiter.tryAcquire(1)) {
            // Permit denied: back off without reading anything this poll.
            return InputStatus.NOTHING_AVAILABLE;
        }
        
        // Regular record processing
        MyRecord record = readNextRecord();
        if (record != null) {
            output.collect(record);
            return InputStatus.MORE_AVAILABLE;
        }
        
        return InputStatus.NOTHING_AVAILABLE;
    }
    
    private MyRecord readNextRecord() {
        // Read from external source
        return null; // Implementation specific
    }
}

Connector Utilities and Best Practices

Error Handling and Retry Logic

/**
 * Sink writer with retry and dead-letter handling: transient failures are
 * retried with backoff, permanent failures are diverted to a dead letter queue.
 */
public class RobustSinkWriter implements SinkWriter<MyRecord> {
    private final RetryPolicy retryPolicy;
    private final DeadLetterQueue<MyRecord> dlq;
    
    public RobustSinkWriter(InitContext context) {
        // Up to 3 attempts with exponential backoff between 1s and 30s.
        this.retryPolicy = RetryPolicy.builder()
            .maxAttempts(3)
            .backoff(Duration.ofSeconds(1), Duration.ofSeconds(30))
            .build();
        this.dlq = new DeadLetterQueue<>();
    }
    
    @Override
    public void write(MyRecord element, Context context) throws IOException, InterruptedException {
        retryPolicy.execute(() -> {
            try {
                writeToExternalSystem(element);
            } catch (TransientException e) {
                throw new RetryableException("Transient error, will retry", e);
            } catch (PermanentException e) {
                // Divert to the dead letter queue instead of failing the job.
                dlq.send(element, e);
                return; // Don't retry permanent failures
            }
        });
    }
    
    // BUGFIX: SinkWriter also requires flush() and close(); the original class
    // omitted both and therefore did not satisfy the interface (compare
    // MyBasicSinkWriter above).
    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        // No internal buffering: each record is written (with retries) in write().
    }
    
    @Override
    public void close() throws Exception {
        // No long-lived resources held directly by this writer.
    }
    
    private void writeToExternalSystem(MyRecord record) throws Exception {
        // Implementation specific
    }
}

Monitoring and Metrics

/**
 * Reader instrumented with Flink metrics: counts records and errors, records
 * per-poll read latency, and exposes the pending-split backlog as a gauge.
 */
public class InstrumentedSourceReader implements SourceReader<MyRecord, MySourceSplit> {
    private final Counter recordsRead;
    private final Counter errors;
    private final Histogram readLatency;
    private final Gauge<Integer> pendingSplits;
    // BUGFIX: this queue was referenced by the gauge lambda but never declared.
    private final Queue<MySourceSplit> pendingSplitQueue = new LinkedList<>();
    
    public InstrumentedSourceReader(SourceReaderContext context) {
        MetricGroup metricGroup = context.metricGroup();
        
        this.recordsRead = metricGroup.counter("records_read");
        this.errors = metricGroup.counter("errors");
        this.readLatency = metricGroup.histogram("read_latency");
        // Gauge reads the live backlog size each time it is sampled.
        this.pendingSplits = metricGroup.gauge("pending_splits", 
            () -> this.pendingSplitQueue.size());
    }
    
    @Override
    public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
        long startTime = System.nanoTime();
        
        try {
            MyRecord record = readRecord();
            if (record != null) {
                output.collect(record);
                recordsRead.inc();
                readLatency.update(System.nanoTime() - startTime);
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.NOTHING_AVAILABLE;
            
        } catch (Exception e) {
            // Count the failure before letting it propagate to the runtime.
            errors.inc();
            throw e;
        }
    }
    
    // BUGFIX: pollNext called readRecord() but the method was never defined.
    private MyRecord readRecord() {
        // Read from the underlying source; implementation specific.
        return null;
    }
}

Connector Configuration

// Source parameterized by an immutable, builder-constructed configuration.
public class ConfigurableSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {

    /** Immutable configuration supplied at construction time. */
    private final MySourceConfig config;

    public ConfigurableSource(MySourceConfig config) {
        this.config = config;
    }

    /** Immutable, serializable source configuration; create via {@code builder()}. */
    public static class MySourceConfig implements Serializable {
        private final String connectionUrl;
        private final int batchSize;
        private final Duration pollInterval;
        private final boolean enableMetrics;

        private MySourceConfig(Builder builder) {
            this.connectionUrl = builder.connectionUrl;
            this.batchSize = builder.batchSize;
            this.pollInterval = builder.pollInterval;
            this.enableMetrics = builder.enableMetrics;
        }

        /** Entry point for the fluent builder. */
        public static Builder builder() {
            return new Builder();
        }

        public String getConnectionUrl() {
            return connectionUrl;
        }

        public int getBatchSize() {
            return batchSize;
        }

        public Duration getPollInterval() {
            return pollInterval;
        }

        public boolean isMetricsEnabled() {
            return enableMetrics;
        }

        /** Fluent builder; only the connection URL is mandatory. */
        public static class Builder {
            private String connectionUrl;                           // required
            private int batchSize = 1000;                           // default batch size
            private Duration pollInterval = Duration.ofSeconds(1);  // default cadence
            private boolean enableMetrics = true;                   // metrics on by default

            public Builder connectionUrl(String url) {
                this.connectionUrl = url;
                return this;
            }

            public Builder batchSize(int size) {
                this.batchSize = size;
                return this;
            }

            public Builder pollInterval(Duration interval) {
                this.pollInterval = interval;
                return this;
            }

            public Builder enableMetrics(boolean enable) {
                this.enableMetrics = enable;
                return this;
            }

            public MySourceConfig build() {
                Preconditions.checkNotNull(connectionUrl, "Connection URL is required");
                return new MySourceConfig(this);
            }
        }
    }
}

// Usage
// Usage: build an immutable configuration and hand it to the source instance.
MySourceConfig config = MySourceConfig.builder()
    .connectionUrl("jdbc:postgresql://localhost:5432/mydb")
    .batchSize(500)
    .pollInterval(Duration.ofSeconds(2))
    .enableMetrics(true)
    .build();

MySource source = new MySource(config);

Apache Flink's connector framework provides a powerful foundation for building efficient, fault-tolerant data sources and sinks. By understanding these APIs and following best practices, you can create connectors that integrate seamlessly with Flink's runtime and provide reliable data processing capabilities.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-core

docs

configuration.md

connectors.md

event-time-watermarks.md

execution-jobs.md

functions-and-operators.md

index.md

state-management.md

type-system-serialization.md

utilities.md

tile.json