Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
—
Apache Flink Core provides comprehensive connector APIs for building data sources and sinks that integrate with external systems. These APIs enable developers to create efficient, fault-tolerant connectors with features like checkpointing, parallelism, and exactly-once semantics.
The foundation for all Flink data sources.
import org.apache.flink.api.connector.source.*;
import org.apache.flink.core.io.SimpleVersionedSerializer;
/**
 * Basic source implementation wiring together the three connector building
 * blocks: splits (MySourceSplit), a split enumerator (MySplitEnumerator), and a
 * per-subtask reader (MySourceReader), plus the serializers Flink needs to
 * checkpoint splits and enumerator state.
 */
public class CustomSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {

    @Override
    public Boundedness getBoundedness() {
        // Streaming-style source; return Boundedness.BOUNDED for finite input.
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<MyRecord, MySourceSplit> createReader(SourceReaderContext ctx) {
        return new MySourceReader(ctx);
    }

    @Override
    public SplitEnumerator<MySourceSplit, MyEnumeratorState> createEnumerator(
            SplitEnumeratorContext<MySourceSplit> ctx) {
        // Fresh start: the enumerator discovers its own splits.
        return new MySplitEnumerator(ctx);
    }

    @Override
    public SplitEnumerator<MySourceSplit, MyEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<MySourceSplit> ctx, MyEnumeratorState checkpoint) {
        // Recovery: resume split assignment from the checkpointed state.
        return new MySplitEnumerator(ctx, checkpoint);
    }

    @Override
    public SimpleVersionedSerializer<MySourceSplit> getSplitSerializer() {
        return new MySourceSplitSerializer();
    }

    @Override
    public SimpleVersionedSerializer<MyEnumeratorState> getEnumeratorCheckpointSerializer() {
        return new MyEnumeratorStateSerializer();
    }
}Define how data is partitioned and processed.
import org.apache.flink.api.connector.source.SourceSplit;
// Custom source split: one contiguous byte range of one file.
public class MySourceSplit implements SourceSplit {

    private final String splitId;     // stable unique id, survives checkpoints
    private final String filepath;    // file this split reads from
    private final long startOffset;   // first byte of the range
    private final long endOffset;     // last byte of the range (inclusive/exclusive per reader — confirm)

    public MySourceSplit(String id, String path, long start, long end) {
        this.splitId = id;
        this.filepath = path;
        this.startOffset = start;
        this.endOffset = end;
    }

    @Override
    public String splitId() {
        return splitId;
    }

    // Getters
    public String getFilepath() { return filepath; }
    public long getStartOffset() { return startOffset; }
    public long getEndOffset() { return endOffset; }

    @Override
    public String toString() {
        return String.format("MySourceSplit{id='%s', file='%s', range=[%d, %d]}",
                splitId, filepath, startOffset, endOffset);
    }
}
// Split serializer for checkpointing
public class MySourceSplitSerializer implements SimpleVersionedSerializer<MySourceSplit> {

    /** Bump when the wire format changes; deserialize() must then handle old versions. */
    private static final int CURRENT_VERSION = 1;

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public byte[] serialize(MySourceSplit split) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.splitId());
            out.writeUTF(split.getFilepath());
            out.writeLong(split.getStartOffset());
            out.writeLong(split.getEndOffset());
            out.flush(); // make sure everything reached the byte buffer before snapshotting it
            return baos.toByteArray();
        }
    }

    @Override
    public MySourceSplit deserialize(int version, byte[] serialized) throws IOException {
        // Fix: the version argument was previously ignored; reject formats this
        // serializer does not understand instead of silently misreading the bytes.
        if (version != CURRENT_VERSION) {
            throw new IOException("Unrecognized MySourceSplit serializer version: " + version);
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            String splitId = in.readUTF();
            String filepath = in.readUTF();
            long startOffset = in.readLong();
            long endOffset = in.readLong();
            return new MySourceSplit(splitId, filepath, startOffset, endOffset);
        }
    }
}Discovers and assigns splits to source readers.
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
/**
 * Split enumerator: discovers input files and hands them out one at a time to
 * requesting readers, tracking assignments for checkpointing and failover.
 */
public class MySplitEnumerator implements SplitEnumerator<MySourceSplit, MyEnumeratorState> {

    private final SplitEnumeratorContext<MySourceSplit> context;
    /** Files not yet handed to any reader. */
    private final Set<String> remainingFiles;
    /** subtask id -> files currently assigned to that reader. */
    private final Map<Integer, Set<String>> readerAssignments;

    public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context) {
        this.context = context;
        this.remainingFiles = discoverFiles();
        this.readerAssignments = new HashMap<>();
    }

    public MySplitEnumerator(SplitEnumeratorContext<MySourceSplit> context,
            MyEnumeratorState restoredState) {
        this.context = context;
        // Fix: copy restored collections into fresh mutable ones so this
        // enumerator never mutates (possibly shared or immutable) checkpoint state.
        this.remainingFiles = new HashSet<>(restoredState.getRemainingFiles());
        this.readerAssignments = new HashMap<>();
        restoredState.getReaderAssignments()
                .forEach((reader, files) -> readerAssignments.put(reader, new HashSet<>(files)));
    }

    @Override
    public void start() {
        // Initialize and assign initial splits
        assignSplitsToReaders();
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        // Assign more splits when requested; tell the reader when none remain.
        if (!assignNextSplit(subtaskId)) {
            context.signalNoMoreSplits(subtaskId);
        }
    }

    @Override
    public void addSplitsBack(List<MySourceSplit> splits, int subtaskId) {
        // Splits returned on reader failure: make their files assignable again.
        Set<String> assigned = readerAssignments.get(subtaskId);
        for (MySourceSplit split : splits) {
            remainingFiles.add(split.getFilepath());
            // Fix: guard against an unknown subtask id — the old
            // readerAssignments.get(subtaskId).remove(...) could throw an NPE.
            if (assigned != null) {
                assigned.remove(split.getFilepath());
            }
        }
    }

    @Override
    public void addReader(int subtaskId) {
        // New reader registered: give it an empty assignment set and initial work.
        readerAssignments.computeIfAbsent(subtaskId, k -> new HashSet<>());
        assignNextSplit(subtaskId);
    }

    @Override
    public MyEnumeratorState snapshotState(long checkpointId) throws Exception {
        // Fix: snapshot deep copies so later mutation of the live collections
        // cannot corrupt an in-flight checkpoint.
        Map<Integer, Set<String>> assignmentsCopy = new HashMap<>();
        readerAssignments.forEach((reader, files) -> assignmentsCopy.put(reader, new HashSet<>(files)));
        return new MyEnumeratorState(new HashSet<>(remainingFiles), assignmentsCopy);
    }

    @Override
    public void close() throws IOException {
        // Cleanup resources
    }

    /** Hands one pending split to every currently registered reader. */
    private void assignSplitsToReaders() {
        for (int readerId : context.registeredReaders().keySet()) {
            assignNextSplit(readerId);
        }
    }

    /**
     * Assigns one pending file to the given reader.
     *
     * @return true when a split was assigned, false when no files remain
     */
    private boolean assignNextSplit(int readerId) {
        Iterator<String> it = remainingFiles.iterator();
        if (!it.hasNext()) {
            return false;
        }
        String file = it.next();
        it.remove();
        MySourceSplit split = createSplitFromFile(file, readerId);
        context.assignSplit(split, readerId);
        // Fix: computeIfAbsent avoids the NPE the old get(readerId).add(file)
        // could throw for readers not yet present in the map (e.g. after restore).
        readerAssignments.computeIfAbsent(readerId, k -> new HashSet<>()).add(file);
        return true;
    }

    private MySourceSplit createSplitFromFile(String filepath, int readerId) {
        String splitId = String.format("%s-%d", filepath, readerId);
        // Calculate file offsets based on parallelism
        return new MySourceSplit(splitId, filepath, 0, getFileSize(filepath));
    }

    private Set<String> discoverFiles() {
        // Discover files to process
        return new HashSet<>(Arrays.asList("file1.txt", "file2.txt", "file3.txt"));
    }

    private long getFileSize(String filepath) {
        // Get file size for split calculation
        return 1024 * 1024; // 1MB example
    }
}Reads records from assigned splits.
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
/**
 * Source reader: opens a file reader per assigned split, emits records, and
 * requests more splits from the enumerator when it runs dry.
 */
public class MySourceReader implements SourceReader<MyRecord, MySourceSplit> {

    private final SourceReaderContext context;
    private final Queue<MySourceSplit> pendingSplits;
    private final Map<String, MyFileReader> activeReaders;
    // Fix: remember the enumerator's "no more splits" signal; without it the
    // reader cannot distinguish "idle for now" from "input exhausted".
    private boolean noMoreSplits;

    public MySourceReader(SourceReaderContext context) {
        this.context = context;
        this.pendingSplits = new LinkedList<>();
        this.activeReaders = new HashMap<>();
        this.noMoreSplits = false;
    }

    @Override
    public void start() {
        // Initialize reader
    }

    @Override
    public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
        // Nothing assigned and nothing in flight: ask for work, or finish when
        // the enumerator already said no further splits will ever arrive.
        if (activeReaders.isEmpty() && pendingSplits.isEmpty()) {
            if (noMoreSplits) {
                return InputStatus.END_OF_INPUT;
            }
            context.sendSplitRequest();
            return InputStatus.NOTHING_AVAILABLE;
        }
        // Open a file reader for every newly assigned split.
        while (!pendingSplits.isEmpty()) {
            MySourceSplit split = pendingSplits.poll();
            activeReaders.put(split.splitId(), new MyFileReader(split));
        }
        // Pull at most one record from each active reader; retire exhausted ones.
        boolean hasData = false;
        Iterator<Map.Entry<String, MyFileReader>> iterator = activeReaders.entrySet().iterator();
        while (iterator.hasNext()) {
            MyFileReader reader = iterator.next().getValue();
            MyRecord record = reader.readNext();
            if (record != null) {
                output.collect(record);
                hasData = true;
            } else if (reader.isFinished()) {
                // Split is exhausted
                reader.close();
                iterator.remove();
            }
        }
        if (activeReaders.isEmpty()) {
            // Fix: the old code returned END_OF_INPUT unconditionally here,
            // finishing the subtask even though the enumerator might still
            // assign more splits. Only end after notifyNoMoreSplits().
            if (noMoreSplits) {
                return InputStatus.END_OF_INPUT;
            }
            context.sendSplitRequest();
            return InputStatus.NOTHING_AVAILABLE;
        }
        return hasData ? InputStatus.MORE_AVAILABLE : InputStatus.NOTHING_AVAILABLE;
    }

    @Override
    public List<MySourceSplit> snapshotState(long checkpointId) {
        // Checkpoint = splits not yet opened + splits still being read.
        List<MySourceSplit> splitsToSnapshot = new ArrayList<>(pendingSplits);
        for (MyFileReader reader : activeReaders.values()) {
            if (!reader.isFinished()) {
                splitsToSnapshot.add(reader.getCurrentSplit());
            }
        }
        return splitsToSnapshot;
    }

    @Override
    public void addSplits(List<MySourceSplit> splits) {
        pendingSplits.addAll(splits);
    }

    @Override
    public void notifyNoMoreSplits() {
        // Fix: record the signal so pollNext() can terminate cleanly.
        noMoreSplits = true;
    }

    @Override
    public void close() throws Exception {
        for (MyFileReader reader : activeReaders.values()) {
            reader.close();
        }
        activeReaders.clear();
    }
}Using pre-built source implementations.
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** Examples of plugging sources into a job via StreamExecutionEnvironment#fromSource. */
public class BuiltInSourceExamples {

    /** Streams the longs 1..1,000,000 from Flink's built-in sequence source. */
    public static void numberSequenceExample() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        NumberSequenceSource sequence = new NumberSequenceSource(1, 1000000);
        DataStream<Long> numbers = env.fromSource(
                sequence,
                WatermarkStrategy.noWatermarks(),
                "number-sequence");
        numbers.print();
    }

    /** Bounded custom source: reads the files under the given path once, no watermarks. */
    public static DataStream<String> createBoundedFileSource(StreamExecutionEnvironment env) {
        return env.fromSource(
                new CustomFileSource("/path/to/files"),
                WatermarkStrategy.noWatermarks(),
                "file-source");
    }

    /** Unbounded custom source with event-time watermarks (5s out-of-orderness). */
    public static DataStream<Event> createUnboundedEventSource(StreamExecutionEnvironment env) {
        WatermarkStrategy<Event> strategy =
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, previousTimestamp) -> event.getTimestamp());
        return env.fromSource(
                new CustomEventSource("kafka-topic"),
                strategy,
                "event-source");
    }
}The foundation for all Flink data sinks.
import org.apache.flink.api.connector.sink2.*;
// Simple stateless sink: one writer per parallel subtask, no checkpoint state.
public class MyBasicSink implements Sink<MyRecord> {

    @Override
    public SinkWriter<MyRecord> createWriter(InitContext context) throws IOException {
        // The writer owns the external connection for this subtask.
        return new MyBasicSinkWriter(context);
    }
}
// Basic sink writer implementation: forwards every record straight to a database connection.
public class MyBasicSinkWriter implements SinkWriter<MyRecord> {

    private final InitContext initContext;
    private final DatabaseConnection connection;

    public MyBasicSinkWriter(InitContext initContext) throws IOException {
        this.initContext = initContext;
        this.connection = new DatabaseConnection();
    }

    @Override
    public void write(MyRecord element, Context context) throws IOException, InterruptedException {
        // One insert per record; the connection may buffer internally.
        connection.insert(element);
    }

    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        // Push anything the connection buffered out to the external system.
        connection.flush();
    }

    @Override
    public void close() throws Exception {
        connection.close();
    }
}Handle state for exactly-once guarantees.
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.SupportsWriterState;
// Sink supporting writer state: buffered records survive failover via checkpoints.
public class MyStatefulSink implements Sink<MyRecord>, SupportsWriterState<MyRecord, MyWriterState> {

    @Override
    public StatefulSinkWriter<MyRecord, MyWriterState> createWriter(InitContext context)
            throws IOException {
        // Fresh writer with empty buffers.
        return new MyStatefulSinkWriter(context);
    }

    @Override
    public StatefulSinkWriter<MyRecord, MyWriterState> restoreWriter(
            InitContext context,
            Collection<MyWriterState> recoveredState) throws IOException {
        // Resume from the state captured at the last successful checkpoint.
        return new MyStatefulSinkWriter(context, recoveredState);
    }

    @Override
    public SimpleVersionedSerializer<MyWriterState> getWriterStateSerializer() {
        return new MyWriterStateSerializer();
    }
}
// Stateful sink writer: buffers records and checkpoints the buffer + per-key counts.
public class MyStatefulSinkWriter implements StatefulSinkWriter<MyRecord, MyWriterState> {

    /** Flush threshold for the in-memory buffer. */
    private static final int MAX_BUFFERED_RECORDS = 1000;

    /** Records accepted but not yet written to the external system. */
    private final Deque<MyRecord> pendingRecords;
    /** Per-key count of accepted records (part of the checkpointed state). */
    private final Map<String, Long> processedCounts;

    public MyStatefulSinkWriter(InitContext context) {
        this.pendingRecords = new ArrayDeque<>();
        this.processedCounts = new HashMap<>();
    }

    public MyStatefulSinkWriter(InitContext context, Collection<MyWriterState> recoveredState) {
        this.pendingRecords = new ArrayDeque<>();
        this.processedCounts = new HashMap<>();
        // Restore state; several state objects arrive after a scale-in.
        for (MyWriterState state : recoveredState) {
            pendingRecords.addAll(state.getPendingRecords());
            // Fix: putAll would overwrite counts for keys present in more than
            // one recovered state; merge sums them instead.
            state.getProcessedCounts()
                    .forEach((key, count) -> processedCounts.merge(key, count, Long::sum));
        }
    }

    @Override
    public void write(MyRecord element, Context context) throws IOException, InterruptedException {
        pendingRecords.add(element);
        processedCounts.merge(element.getKey(), 1L, Long::sum);
        // Batch write when the buffer is full.
        if (pendingRecords.size() >= MAX_BUFFERED_RECORDS) {
            flushPendingRecords();
        }
    }

    @Override
    public List<MyWriterState> snapshotState(long checkpointId) throws IOException {
        // Snapshot copies so later mutation cannot corrupt the checkpoint.
        MyWriterState state = new MyWriterState(
                new ArrayList<>(pendingRecords),
                new HashMap<>(processedCounts),
                checkpointId);
        return Collections.singletonList(state);
    }

    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        flushPendingRecords();
    }

    @Override
    public void close() throws Exception {
        flushPendingRecords();
    }

    private void flushPendingRecords() throws IOException {
        // Fix: drop each record only after it was written successfully. The old
        // version cleared the buffer after the whole loop, so a failure midway
        // re-wrote the already-delivered prefix on the next flush attempt.
        MyRecord next;
        while ((next = pendingRecords.peek()) != null) {
            writeToExternalSystem(next);
            pendingRecords.poll();
        }
    }

    private void writeToExternalSystem(MyRecord record) throws IOException {
        // Implementation specific to external system
    }
}Implement exactly-once semantics with two-phase commit.
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
// Sink with two-phase commit: the writer stages records in a transaction
// (phase 1) and the committer finalizes it once the checkpoint completes (phase 2).
public class MyTransactionalSink
        implements Sink<MyRecord>, SupportsCommitter<MyCommittable> {

    @Override
    public CommittingSinkWriter<MyRecord, MyCommittable> createWriter(InitContext context)
            throws IOException {
        return new MyCommittingSinkWriter(context);
    }

    @Override
    public Committer<MyCommittable> createCommitter() throws IOException {
        return new MyCommitter();
    }

    @Override
    public SimpleVersionedSerializer<MyCommittable> getCommittableSerializer() {
        // Committables cross subtask boundaries, so they need a serializer too.
        return new MyCommittableSerializer();
    }
}
// Committing sink writer (first phase): stages records in an open transaction
// and emits a committable per checkpoint for the Committer to finalize.
public class MyCommittingSinkWriter implements CommittingSinkWriter<MyRecord, MyCommittable> {

    private final String transactionId;
    private final List<MyRecord> currentBatch;
    private final DatabaseTransaction transaction;

    public MyCommittingSinkWriter(InitContext context) throws IOException {
        this.transactionId = generateTransactionId(context);
        this.currentBatch = new ArrayList<>();
        this.transaction = new DatabaseTransaction(transactionId);
    }

    @Override
    public void write(MyRecord element, Context context) throws IOException, InterruptedException {
        // Stage the record inside the open transaction immediately.
        currentBatch.add(element);
        transaction.prepare(element);
    }

    /**
     * Invoked by Flink at checkpoint time, after flush(): hands the staged batch
     * to the Committer. Nothing becomes externally visible until the Committer
     * commits the transaction.
     */
    @Override
    public Collection<MyCommittable> prepareCommit() throws IOException, InterruptedException {
        if (currentBatch.isEmpty()) {
            return Collections.emptyList();
        }
        // Prepare transaction for commit
        transaction.prepareForCommit();
        MyCommittable committable = new MyCommittable(
                transactionId,
                new ArrayList<>(currentBatch),
                System.currentTimeMillis());
        currentBatch.clear();
        return Collections.singletonList(committable);
    }

    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        // Fix: the previous implementation called prepareCommit() here and
        // discarded the returned committables, so that batch was never handed
        // to the Committer (the records would never be committed) and the
        // transaction was prepared twice. The framework itself calls
        // prepareCommit() right after flush(); since write() already stages
        // every record in the transaction, there is nothing buffered to flush.
    }

    @Override
    public void close() throws Exception {
        transaction.close();
    }

    private String generateTransactionId(InitContext context) {
        return String.format("txn_%d_%d_%d",
                context.getSubtaskId(),
                context.getAttemptNumber(),
                System.currentTimeMillis());
    }
}
// Committer (second phase): makes the prepared transactions durable after the
// checkpoint has completed.
public class MyCommitter implements Committer<MyCommittable> {

    @Override
    public void commit(Collection<CommitRequest<MyCommittable>> requests)
            throws IOException, InterruptedException {
        // Each request carries one committable produced by prepareCommit().
        for (CommitRequest<MyCommittable> request : requests) {
            MyCommittable committable = request.getCommittable();
            try {
                // Re-attach to the prepared transaction and commit it.
                DatabaseTransaction resumed =
                        DatabaseTransaction.resume(committable.getTransactionId());
                resumed.commit();
            } catch (Exception e) {
                // Propagate with the transaction id so the failure is traceable.
                throw new IOException("Failed to commit transaction: " +
                        committable.getTransactionId(), e);
            }
        }
    }

    @Override
    public void close() throws Exception {
        // Cleanup resources
    }
}import org.apache.flink.api.connector.source.DynamicParallelismInference;
/**
 * Source that lets Flink pick its parallelism at job-submission time.
 * NOTE: the Source interface methods themselves are omitted in this example;
 * see CustomSource earlier in this document for a full implementation.
 */
public class DynamicSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState>,
        DynamicParallelismInference {

    @Override
    public int inferParallelism(SourceReaderFactory readerFactory) {
        // Parallelism is capped by both the data layout (at most one reader per
        // file) and the external system's recommended limit.
        int availableFiles = discoverAvailableFiles();
        return Math.min(availableFiles, getMaxRecommendedParallelism());
    }

    /** Counts the data partitions/files currently available. */
    private int discoverAvailableFiles() {
        return 10; // Example
    }

    /** Upper bound from external-system limits or performance characteristics. */
    private int getMaxRecommendedParallelism() {
        return 50;
    }
}import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
// Source reader that throttles record emission to a fixed per-second rate.
public class RateLimitedSourceReader implements SourceReader<MyRecord, MySourceSplit> {
private final SourceReaderContext context;
// NOTE(review): in Flink's API, RateLimiterStrategy is a factory that creates
// RateLimiter instances; calling tryAcquire(...) directly on the strategy may
// not match the real interface — confirm against the ratelimit package docs.
private final RateLimiterStrategy rateLimiter;
public RateLimitedSourceReader(SourceReaderContext context) {
this.context = context;
this.rateLimiter = RateLimiterStrategy.perSecond(1000); // 1000 records/sec
}
// NOTE(review): only pollNext is shown; the remaining SourceReader methods
// (start, snapshotState, addSplits, notifyNoMoreSplits, close) are omitted here.
@Override
public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
// Check rate limit before processing
if (!rateLimiter.tryAcquire(1)) {
// Budget for this interval is spent: back off without reading anything.
return InputStatus.NOTHING_AVAILABLE;
}
// Regular record processing
MyRecord record = readNextRecord();
if (record != null) {
output.collect(record);
return InputStatus.MORE_AVAILABLE;
}
return InputStatus.NOTHING_AVAILABLE;
}
private MyRecord readNextRecord() {
// Read from external source
return null; // Implementation specific
}
}public class RobustSinkWriter implements SinkWriter<MyRecord> {
// Retrying writer: transient failures are retried with backoff, permanent
// failures are diverted to a dead-letter queue instead of failing the job.
// NOTE(review): RetryPolicy, DeadLetterQueue, RetryableException and the
// Transient/PermanentException types are illustrative helpers, not Flink API.
private final RetryPolicy retryPolicy;
private final DeadLetterQueue<MyRecord> dlq;
public RobustSinkWriter(InitContext context) {
// Up to 3 attempts with exponential backoff bounded between 1s and 30s.
this.retryPolicy = RetryPolicy.builder()
.maxAttempts(3)
.backoff(Duration.ofSeconds(1), Duration.ofSeconds(30))
.build();
this.dlq = new DeadLetterQueue<>();
}
@Override
public void write(MyRecord element, Context context) throws IOException, InterruptedException {
retryPolicy.execute(() -> {
try {
writeToExternalSystem(element);
} catch (TransientException e) {
// Surface as retryable so the policy schedules another attempt.
throw new RetryableException("Transient error, will retry", e);
} catch (PermanentException e) {
// Send to dead letter queue
dlq.send(element, e);
return; // Don't retry permanent failures
}
});
}
private void writeToExternalSystem(MyRecord record) throws Exception {
// Implementation specific
}
}public class InstrumentedSourceReader implements SourceReader<MyRecord, MySourceSplit> {
// Metrics for observability: record/error counters, a read-latency histogram,
// and a gauge exposing the backlog of splits waiting to be read.
private final Counter recordsRead;
private final Counter errors;
private final Histogram readLatency;
private final Gauge<Integer> pendingSplits;
// Fix: the pending_splits gauge below referenced this queue, but it was never
// declared anywhere, so the class could not compile. Splits assigned to this
// reader (via addSplits, omitted here) should be enqueued into it.
private final Queue<MySourceSplit> pendingSplitQueue = new LinkedList<>();

public InstrumentedSourceReader(SourceReaderContext context) {
    MetricGroup metricGroup = context.metricGroup();
    this.recordsRead = metricGroup.counter("records_read");
    this.errors = metricGroup.counter("errors");
    // NOTE(review): MetricGroup#histogram normally takes (name, Histogram
    // implementation) — confirm against the metrics API.
    this.readLatency = metricGroup.histogram("read_latency");
    // The gauge is sampled lazily, so it always reports the current backlog.
    this.pendingSplits = metricGroup.gauge("pending_splits",
            () -> this.pendingSplitQueue.size());
}

@Override
public InputStatus pollNext(ReaderOutput<MyRecord> output) throws Exception {
    long startTime = System.nanoTime();
    try {
        MyRecord record = readRecord();
        if (record != null) {
            output.collect(record);
            recordsRead.inc();
            readLatency.update(System.nanoTime() - startTime);
            return InputStatus.MORE_AVAILABLE;
        }
        return InputStatus.NOTHING_AVAILABLE;
    } catch (Exception e) {
        // Count the failure, then rethrow — instrumentation must never swallow it.
        errors.inc();
        throw e;
    }
}
}public class ConfigurableSource implements Source<MyRecord, MySourceSplit, MyEnumeratorState> {
// The immutable configuration this source was constructed with.
private final MySourceConfig config;

public ConfigurableSource(MySourceConfig config) {
    this.config = config;
}

/**
 * Immutable source configuration assembled via a builder. It is Serializable
 * because the config travels with the source to the cluster.
 */
public static class MySourceConfig implements Serializable {

    private final String connectionUrl;
    private final int batchSize;
    private final Duration pollInterval;
    private final boolean enableMetrics;

    private MySourceConfig(Builder builder) {
        this.connectionUrl = builder.connectionUrl;
        this.batchSize = builder.batchSize;
        this.pollInterval = builder.pollInterval;
        this.enableMetrics = builder.enableMetrics;
    }

    public static Builder builder() {
        return new Builder();
    }

    /** Fluent builder; every setter returns {@code this} for chaining. */
    public static class Builder {
        // connectionUrl is mandatory; everything else has a sensible default.
        private String connectionUrl;
        private int batchSize = 1000;
        private Duration pollInterval = Duration.ofSeconds(1);
        private boolean enableMetrics = true;

        public Builder connectionUrl(String url) {
            this.connectionUrl = url;
            return this;
        }

        public Builder batchSize(int size) {
            this.batchSize = size;
            return this;
        }

        public Builder pollInterval(Duration interval) {
            this.pollInterval = interval;
            return this;
        }

        public Builder enableMetrics(boolean enable) {
            this.enableMetrics = enable;
            return this;
        }

        /** Validates required settings and freezes the configuration. */
        public MySourceConfig build() {
            Preconditions.checkNotNull(connectionUrl, "Connection URL is required");
            return new MySourceConfig(this);
        }
    }

    // Getters
    public String getConnectionUrl() { return connectionUrl; }
    public int getBatchSize() { return batchSize; }
    public Duration getPollInterval() { return pollInterval; }
    public boolean isMetricsEnabled() { return enableMetrics; }
}
}
// Usage
// Build a validated config and hand it to the source. Options left unset fall
// back to the builder defaults (batchSize=1000, pollInterval=1s, metrics on).
MySourceConfig config = MySourceConfig.builder()
.connectionUrl("jdbc:postgresql://localhost:5432/mydb")
.batchSize(500)
.pollInterval(Duration.ofSeconds(2))
.enableMetrics(true)
.build();
MySource source = new MySource(config);Apache Flink's connector framework provides a powerful foundation for building efficient, fault-tolerant data sources and sinks. By understanding these APIs and following best practices, you can create connectors that integrate seamlessly with Flink's runtime and provide reliable data processing capabilities.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-core