Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The Data Exchange and Networking system provides the fundamental infrastructure for inter-task communication in Flink clusters. This system defines data exchange patterns, result partition types, and networking mechanisms that enable efficient data transfer between operators across the distributed execution environment.
Enumeration defining different data exchange patterns that control how data flows between operators.
public enum DataExchangeMode {
PIPELINED, // Streamed data exchange with back-pressure
BATCH, // Decoupled data exchange with full result materialization
PIPELINE_WITH_BATCH_FALLBACK; // Pipelined with batch fallback for recovery
public static DataExchangeMode getForForwardExchange(ExecutionMode mode);
public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode);
public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode);
public static DataExchangeMode select(ExecutionMode executionMode, ShipStrategyType shipStrategy, boolean breakPipeline);
}Defines how data is distributed between upstream and downstream operators.
public enum DistributionPattern {
POINTWISE, // Each upstream subtask sends data to one downstream subtask
ALL_TO_ALL; // Each upstream subtask sends data to all downstream subtasks
public boolean isPointwise();
public boolean isAllToAll();
}Enumeration of result partition types that determine data exchange characteristics and buffering behavior.
public enum ResultPartitionType {
BLOCKING(true, false, false), // Data is fully produced before consumption
PIPELINED(false, true, false), // Data is consumed as it's produced
PIPELINED_BOUNDED(false, true, true); // Pipelined with bounded buffers
private final boolean isBlocking;
private final boolean isPipelined;
private final boolean isBounded;
public boolean isBlocking();
public boolean isPipelined();
public boolean isBounded();
public boolean hasBackPressure();
public boolean requiresFiniteStreams();
}Represents a partition of results produced by an operator that can be consumed by downstream tasks.
public class ResultPartition implements AutoCloseable {
public ResultPartition(
String owningTaskName,
TaskActions taskActions,
JobID jobId,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
int numberOfQueuedBuffers,
ResultPartitionManager partitionManager,
@Nullable ResultPartitionMetrics metrics
);
public void setup() throws IOException;
public void finish() throws IOException;
public void release();
public void release(Throwable cause);
public BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
public BufferBuilder tryGetBufferBuilder() throws IOException;
public void flushAll();
public void flush(int targetSubpartition);
public ResultPartitionID getPartitionId();
public ResultPartitionType getResultType();
public int getNumberOfSubpartitions();
public boolean isReleased();
public Throwable getFailureCause();
@Override
public void close();
}Represents a data set produced by one job vertex and consumed by another, defining the connection in the job graph.
public class IntermediateDataSet implements Serializable {
public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer);
public IntermediateDataSetID getId();
public JobVertex getProducer();
public List<JobEdge> getConsumers();
public ResultPartitionType getResultType();
public void addConsumer(JobEdge edge);
public int getConsumerParallelism();
public DistributionPattern getDistributionPattern();
}Interface for writing data to result partitions, providing the output mechanism for operators.
public interface ResultPartitionWriter extends AutoCloseable {
ResultPartition getPartition();
BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
BufferBuilder tryGetBufferBuilder() throws IOException;
void flushAll();
void flushAllSubpartitions(boolean finishProducers);
void fail(Throwable throwable);
void finish() throws IOException;
@Override
void close();
}Abstract base class for input gates that manage reading data from multiple input channels.
public abstract class InputGate implements AutoCloseable {
public abstract int getNumberOfInputChannels();
public abstract boolean isFinished();
public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;
public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;
public abstract void sendTaskEvent(TaskEvent event) throws IOException;
public abstract void registerListener(InputGateListener listener);
public abstract int getPageSize();
@Override
public abstract void close() throws Exception;
}Abstract base class representing an input channel that reads data from a specific result partition.
public abstract class InputChannel {
protected final InputGate inputGate;
protected final int channelIndex;
protected final ResultPartitionID partitionId;
protected final int initialBackoff;
protected final int maxBackoff;
public InputChannel(
InputGate inputGate,
int channelIndex,
ResultPartitionID partitionId,
int initialBackoff,
int maxBackoff
);
public int getChannelIndex();
public ResultPartitionID getPartitionId();
public abstract Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException;
public abstract void sendTaskEvent(TaskEvent event) throws IOException;
public abstract boolean isReleased();
public abstract void releaseAllResources() throws IOException;
public abstract int getBuffersInUseCount();
}Interface representing a buffer containing data or events in the network stack.
public interface Buffer extends AutoCloseable {
boolean isBuffer();
boolean isEvent();
MemorySegment getMemorySegment();
int getMemorySegmentOffset();
BufferRecycler getRecycler();
void recycleBuffer();
boolean isRecycled();
Buffer retainBuffer();
Buffer readOnlySlice();
Buffer readOnlySlice(int index, int length);
int getMaxCapacity();
int getSize();
void setSize(int writerIndex);
int getReaderIndex();
void setReaderIndex(int readerIndex);
ByteBuffer getNioBufferReadable();
ByteBuffer getNioBuffer(int index, int length);
@Override
void close();
}Interface for building buffers by appending data incrementally.
public interface BufferBuilder extends AutoCloseable {
boolean append(ByteBuffer source) throws IOException;
boolean appendAndCommit(ByteBuffer source) throws IOException;
BufferConsumer createBufferConsumer();
boolean isFull();
boolean isFinished();
int getWrittenBytes();
int getMaxCapacity();
void finish();
@Override
void close();
}Interface for managing pools of network buffers to control memory usage.
public interface BufferPool extends AutoCloseable {
Buffer requestBuffer() throws IOException;
Buffer requestBuffer(boolean isBlocking) throws IOException, InterruptedException;
BufferBuilder requestBufferBuilder() throws IOException;
BufferBuilder requestBufferBuilder(boolean isBlocking) throws IOException, InterruptedException;
void recycle(MemorySegment memorySegment);
boolean addBufferListener(BufferListener listener);
boolean isDestroyed();
int getNumberOfRequestedMemorySegments();
int getNumberOfAvailableMemorySegments();
int getNumBuffers();
int getMaxNumberOfMemorySegments();
void setNumBuffers(int numBuffers) throws IOException;
@Override
void close();
}Base class for events that can be sent between tasks through the network stack.
public abstract class TaskEvent implements Serializable {
public static final int MAX_SIZE = 1024;
// Subclasses implement specific event types
}Container that holds either a data buffer or a task event from the network stack.
public class BufferOrEvent {
public BufferOrEvent(Buffer buffer, int channelIndex);
public BufferOrEvent(AbstractEvent event, int channelIndex);
public boolean isBuffer();
public boolean isEvent();
public Buffer getBuffer();
public AbstractEvent getEvent();
public int getChannelIndex();
public void recycleBuffer();
public Optional<Buffer> getOptionalBuffer();
public Optional<AbstractEvent> getOptionalEvent();
}Interface for listening to input gate events and buffer availability.
public interface InputGateListener {
void notifyInputGateNonEmpty(InputGate inputGate);
}Interface for selecting output channels based on record content, implementing different partitioning strategies.
public interface ChannelSelector<T> {
int[] selectChannels(T record, int numberOfOutputChannels);
boolean isBroadcast();
}Manages the emission of records to downstream tasks with appropriate partitioning.
public class OutputEmitter<T> {
public OutputEmitter(
ShipStrategyType strategy,
ChannelSelector<T> channelSelector
);
public void emit(T record) throws IOException, InterruptedException;
public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException;
public void flush() throws IOException;
public void close();
public void clearBuffers();
public ShipStrategyType getShipStrategy();
}Enumeration of data shipping strategies that determine how records are distributed to downstream tasks.
public enum ShipStrategyType {
FORWARD, // Direct forwarding to single downstream task
BROADCAST, // Send to all downstream tasks
PARTITION_HASH, // Hash-based partitioning
PARTITION_RANGE,// Range-based partitioning
PARTITION_RANDOM,// Random distribution
PARTITION_FORCED_REBALANCE, // Forced rebalancing
PARTITION_CUSTOM; // Custom partitioning logic
public boolean isNetworkStrategy();
public boolean isForward();
public boolean isBroadcast();
public boolean isPartitioned();
}import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.*;
// Create the job vertices that make up the topology: source -> map -> sink.
JobVertex sourceVertex = new JobVertex("Source");
JobVertex mapVertex = new JobVertex("Map");
JobVertex sinkVertex = new JobVertex("Sink");
// Configure different result partition types per producing vertex.
IntermediateDataSet sourceOutput = sourceVertex.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
IntermediateDataSet mapOutput = mapVertex.createAndAddResultDataSet(ResultPartitionType.BLOCKING);
// Create edges with different distribution patterns.
JobEdge sourceToMap = new JobEdge(sourceOutput, mapVertex, DistributionPattern.ALL_TO_ALL);
sourceToMap.setShipStrategy(ShipStrategyType.PARTITION_HASH);
// FIX: DistributionPattern declares only POINTWISE and ALL_TO_ALL (no FORWARD);
// a one-to-one connection is POINTWISE distribution plus the FORWARD ship strategy.
JobEdge mapToSink = new JobEdge(mapOutput, sinkVertex, DistributionPattern.POINTWISE);
mapToSink.setShipStrategy(ShipStrategyType.FORWARD);
// Assemble the job graph from the vertices and edges.
JobGraph jobGraph = new JobGraph("Data Exchange Example");
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);
jobGraph.addVertex(sinkVertex);
jobGraph.addEdge(sourceToMap);
jobGraph.addEdge(mapToSink);
// Configure global data exchange mode.
Configuration jobConfig = new Configuration();
// FIX: DataExchangeMode declares PIPELINED, BATCH and PIPELINE_WITH_BATCH_FALLBACK;
// ALL_EDGES_BLOCKING does not exist — BATCH is the fully-materialized exchange mode.
jobConfig.setString("execution.batch-shuffle-mode", DataExchangeMode.BATCH.toString());
jobGraph.setJobConfiguration(jobConfig);import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
public class CustomResultPartitionWriter implements ResultPartitionWriter {
    private final ResultPartition partition;
    private final BufferPool bufferPool;
    // volatile: finish()/fail() may be invoked from a different thread than the writer
    private volatile boolean finished = false;

    /** Creates a writer that serves buffers from {@code bufferPool} for {@code partition}. */
    public CustomResultPartitionWriter(ResultPartition partition, BufferPool bufferPool) {
        this.partition = partition;
        this.bufferPool = bufferPool;
    }

    @Override
    public ResultPartition getPartition() {
        return partition;
    }

    /**
     * Blocks until a buffer is available from the pool.
     *
     * @throws IllegalStateException if the writer was already finished
     * @throws IOException if the pool cannot provide a buffer
     * @throws InterruptedException if interrupted while waiting for a buffer
     */
    @Override
    public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
        if (finished) {
            throw new IllegalStateException("Writer has been finished");
        }
        // Request buffer from pool (blocking).
        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(true);
        if (bufferBuilder == null) {
            throw new IOException("Failed to get buffer from pool");
        }
        return bufferBuilder;
    }

    /** Non-blocking variant; returns {@code null} when finished or the pool is exhausted. */
    @Override
    public BufferBuilder tryGetBufferBuilder() throws IOException {
        if (finished) {
            return null;
        }
        return bufferPool.requestBufferBuilder(false);
    }

    @Override
    public void flushAll() {
        partition.flushAll();
    }

    /** Flushes all subpartitions, optionally also finishing the partition. */
    @Override
    public void flushAllSubpartitions(boolean finishProducers) {
        partition.flushAll();
        if (finishProducers) {
            try {
                finish();
            } catch (IOException e) {
                throw new RuntimeException("Failed to finish partition", e);
            }
        }
    }

    /** Marks the writer failed and releases the partition with the given cause. */
    @Override
    public void fail(Throwable throwable) {
        finished = true;
        // FIX: ResultPartition exposes release(Throwable), not fail(Throwable);
        // releasing with the cause propagates the failure to consumers.
        partition.release(throwable);
    }

    /** Finishes the partition exactly once; subsequent calls are no-ops. */
    @Override
    public void finish() throws IOException {
        if (!finished) {
            finished = true;
            partition.finish();
        }
    }

    /** Closes the writer by finishing the partition; close() must not throw. */
    @Override
    public void close() {
        try {
            finish();
        } catch (IOException e) {
            // Best-effort cleanup: log and swallow so close() stays exception-free.
            System.err.println("Error finishing partition during close: " + e.getMessage());
        }
    }

    /** @return whether {@link #finish()} or {@link #fail(Throwable)} has been called */
    public boolean isFinished() {
        return finished;
    }
}import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
public class CustomInputGate extends InputGate {
    private final InputChannel[] inputChannels;
    // Channels that currently have queued data; LinkedHashSet gives FIFO fairness.
    private final Set<InputChannel> channelsWithData = new LinkedHashSet<>();
    // Guards channelsWithData and the finished flag for compound operations.
    private final Object lock = new Object();
    private volatile boolean finished = false;
    // FIX: this field was referenced by registerListener()/notifyChannelNonEmpty()
    // but never declared (compile error); volatile so a registration is visible
    // to threads calling notifyChannelNonEmpty().
    private volatile InputGateListener inputGateListener;

    public CustomInputGate(String owningTaskName, JobID jobID,
            IntermediateDataSetID consumedResultId,
            ResultPartitionType consumedPartitionType,
            int consumedSubpartitionIndex,
            InputChannel[] inputChannels) {
        super(owningTaskName, jobID, consumedResultId, consumedPartitionType, consumedSubpartitionIndex);
        this.inputChannels = inputChannels;
    }

    @Override
    public int getNumberOfInputChannels() {
        return inputChannels.length;
    }

    @Override
    public boolean isFinished() {
        synchronized (lock) {
            return finished;
        }
    }

    /**
     * Blocking read: returns the next buffer or event, waiting when no channel
     * currently has data; returns empty once all partitions are consumed.
     */
    @Override
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        while (true) {
            synchronized (lock) {
                if (finished) {
                    return Optional.empty();
                }
                InputChannel channelToRead = getChannelWithData();
                if (channelToRead != null) {
                    Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();
                    if (result.isPresent()) {
                        return processBuffer(result.get(), channelToRead);
                    }
                }
            }
            // Block outside the dispatch section until a producer signals data.
            waitForData();
        }
    }

    /** Non-blocking read: empty when finished or when no channel has data right now. */
    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        synchronized (lock) {
            if (finished) {
                return Optional.empty();
            }
            InputChannel channelToRead = getChannelWithData();
            if (channelToRead != null) {
                Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();
                if (result.isPresent()) {
                    return processBuffer(result.get(), channelToRead);
                }
            }
            return Optional.empty();
        }
    }

    /**
     * Converts a polled buffer into a BufferOrEvent. Event buffers are
     * deserialized and recycled; an EndOfPartitionEvent additionally releases
     * its channel and may finish the whole gate.
     * FIX: this helper was called from pollNext() but never defined (compile error).
     */
    private Optional<BufferOrEvent> processBuffer(BufferAndAvailability bufferAndAvailability,
            InputChannel channel) throws IOException {
        if (bufferAndAvailability.buffer().isBuffer()) {
            return Optional.of(new BufferOrEvent(
                    bufferAndAvailability.buffer(),
                    channel.getChannelIndex()
            ));
        }
        // Event: deserialize, then recycle the backing buffer.
        AbstractEvent event = EventSerializer.fromBuffer(
                bufferAndAvailability.buffer(),
                getClass().getClassLoader()
        );
        bufferAndAvailability.buffer().recycleBuffer();
        // End-of-partition: release this channel and check whether the gate is done.
        if (event.getClass() == EndOfPartitionEvent.class) {
            channel.releaseAllResources();
            checkForFinished();
        }
        return Optional.of(new BufferOrEvent(
                event,
                channel.getChannelIndex()
        ));
    }

    /** Broadcasts the task event to every input channel. */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        for (InputChannel channel : inputChannels) {
            channel.sendTaskEvent(event);
        }
    }

    @Override
    public void registerListener(InputGateListener listener) {
        this.inputGateListener = listener;
    }

    @Override
    public int getPageSize() {
        return 32768; // Default page size
    }

    /** Marks the gate finished and releases all channels, continuing on errors. */
    @Override
    public void close() throws Exception {
        synchronized (lock) {
            finished = true;
            // FIX: wake any reader blocked in waitForData(); the original never
            // notified here, leaving getNext() callers waiting forever.
            lock.notifyAll();
            for (InputChannel channel : inputChannels) {
                try {
                    channel.releaseAllResources();
                } catch (Exception e) {
                    // Log but continue cleanup of the remaining channels.
                    System.err.println("Error releasing channel: " + e.getMessage());
                }
            }
        }
    }

    /** Removes and returns one channel with queued data, or null; caller holds lock. */
    private InputChannel getChannelWithData() {
        Iterator<InputChannel> iterator = channelsWithData.iterator();
        if (iterator.hasNext()) {
            InputChannel channel = iterator.next();
            iterator.remove();
            return channel;
        }
        return null;
    }

    /** Waits until some channel reports data or the gate finishes. */
    private void waitForData() throws InterruptedException {
        synchronized (lock) {
            while (channelsWithData.isEmpty() && !finished) {
                lock.wait();
            }
        }
    }

    /** Finishes the gate once every channel has been released. */
    private void checkForFinished() {
        boolean allReleased = true;
        for (InputChannel channel : inputChannels) {
            if (!channel.isReleased()) {
                allReleased = false;
                break;
            }
        }
        if (allReleased) {
            // FIX: set the flag inside the lock so a waiter cannot observe the
            // notification without also observing finished == true.
            synchronized (lock) {
                finished = true;
                lock.notifyAll();
            }
        }
    }

    /** Called by channels when they have data; wakes readers and the listener. */
    public void notifyChannelNonEmpty(InputChannel channel) {
        synchronized (lock) {
            channelsWithData.add(channel);
            lock.notifyAll();
        }
        // Read the volatile field once to avoid a check-then-act race.
        InputGateListener listener = inputGateListener;
        if (listener != null) {
            listener.notifyInputGateNonEmpty(this);
        }
    }
}import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
public class BufferPoolManager {
    // Global pool of network memory segments shared by all local pools.
    private final NetworkBufferPool networkBufferPool;
    // Named local pools carved out of the network pool.
    private final Map<String, BufferPool> bufferPools = new ConcurrentHashMap<>();

    /** Creates a manager backed by {@code totalBuffers} segments of {@code bufferSize} bytes. */
    public BufferPoolManager(int totalBuffers, int bufferSize) {
        this.networkBufferPool = new NetworkBufferPool(totalBuffers, bufferSize);
    }

    /**
     * Creates a named buffer pool with the given min/max segment counts.
     * Reusing an existing name replaces (and closes) the previous pool.
     *
     * @throws IOException if the network pool cannot satisfy the request
     */
    public BufferPool createBufferPool(String poolName, int minBuffers, int maxBuffers) throws IOException {
        BufferPool bufferPool = networkBufferPool.createBufferPool(minBuffers, maxBuffers);
        // FIX: a duplicate name previously leaked the displaced pool; close it so
        // its segments are returned to the network pool.
        BufferPool previous = bufferPools.put(poolName, bufferPool);
        if (previous != null) {
            previous.close();
        }
        System.out.println("Created buffer pool '" + poolName + "' with " +
        minBuffers + "-" + maxBuffers + " buffers");
        return bufferPool;
    }

    /** Closes and removes the named pool; no-op if it does not exist. */
    public void destroyBufferPool(String poolName) {
        BufferPool bufferPool = bufferPools.remove(poolName);
        if (bufferPool != null) {
            bufferPool.close();
            System.out.println("Destroyed buffer pool '" + poolName + "'");
        }
    }

    /** Prints usage statistics for the network pool and every named pool. */
    public void printBufferPoolStats() {
        System.out.println("=== Buffer Pool Statistics ===");
        System.out.println("Network Pool - Total: " + networkBufferPool.getTotalNumberOfMemorySegments() +
        ", Available: " + networkBufferPool.getNumberOfAvailableMemorySegments());
        for (Map.Entry<String, BufferPool> entry : bufferPools.entrySet()) {
            BufferPool pool = entry.getValue();
            System.out.println("Pool '" + entry.getKey() + "' - " +
            "Requested: " + pool.getNumberOfRequestedMemorySegments() +
            ", Available: " + pool.getNumberOfAvailableMemorySegments() +
            ", Max: " + pool.getMaxNumberOfMemorySegments());
        }
    }

    /** Closes every named pool, then tears down the network pool itself. */
    public void shutdown() {
        // Close all buffer pools created through this manager.
        for (BufferPool pool : bufferPools.values()) {
            pool.close();
        }
        bufferPools.clear();
        // Defensive: destroy any remaining pools, then free all segments.
        networkBufferPool.destroyAllBufferPools();
        networkBufferPool.destroy();
    }
}public class BackpressureAwareWriter {
private final ResultPartitionWriter writer;
// Number of times a buffer request had to be retried due to backpressure.
private final AtomicLong backpressureCount = new AtomicLong(0);
// Give up after this many milliseconds without obtaining a buffer.
private final long maxBackpressureWait = 5000; // 5 seconds

public BackpressureAwareWriter(ResultPartitionWriter writer) {
    this.writer = writer;
}

/**
 * Writes {@code data}, retrying on buffer exhaustion until
 * {@code maxBackpressureWait} elapses.
 *
 * @throws IOException on backpressure timeout or write failure
 * @throws InterruptedException if interrupted while waiting for a buffer
 */
public void writeWithBackpressure(ByteBuffer data) throws IOException, InterruptedException {
    long startTime = System.currentTimeMillis();
    while (true) {
        BufferBuilder bufferBuilder = writer.tryGetBufferBuilder();
        if (bufferBuilder != null) {
            // Successfully got a buffer: append as much of the data as fits.
            boolean success = bufferBuilder.append(data);
            bufferBuilder.finish();
            if (success) {
                return;
            }
            // Buffer filled before the data was fully consumed; get another buffer.
            continue;
        }
        // No buffer available - backpressure.
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (elapsedTime > maxBackpressureWait) {
            throw new IOException("Backpressure timeout: unable to get buffer after " +
            maxBackpressureWait + "ms");
        }
        backpressureCount.incrementAndGet();
        // FIX: the original caught Exception here and wrapped InterruptedException
        // into an IOException, swallowing the interrupt even though this method
        // declares InterruptedException; Thread.sleep now propagates it directly.
        Thread.sleep(10); // Brief wait before retry
    }
}

/** @return how many times buffer acquisition was retried due to backpressure */
public long getBackpressureCount() {
    return backpressureCount.get();
}
}public class NetworkMetricsCollector {
// Metric handles for tracking network-buffer usage.
private final Counter buffersRequested;
private final Counter buffersRecycled;
private final Histogram bufferWaitTime;
private final Gauge<Integer> buffersInUse;
// Live count backing the buffers_in_use gauge.
private final AtomicInteger currentBuffersInUse = new AtomicInteger(0);

/** Registers all network metrics on the given metric group. */
public NetworkMetricsCollector(MetricGroup metricGroup) {
    buffersRequested = metricGroup.counter("buffers_requested");
    buffersRecycled = metricGroup.counter("buffers_recycled");
    bufferWaitTime = metricGroup.histogram("buffer_wait_time_ms",
            new DescriptiveStatisticsHistogram(1000));
    buffersInUse = metricGroup.gauge("buffers_in_use", currentBuffersInUse::get);
}

/** Records one buffer request together with the time spent waiting for it. */
public void recordBufferRequest(long waitTimeMs) {
    buffersRequested.inc();
    bufferWaitTime.update(waitTimeMs);
    currentBuffersInUse.incrementAndGet();
}

/** Records one buffer being returned to its pool. */
public void recordBufferRecycle() {
    buffersRecycled.inc();
    currentBuffersInUse.decrementAndGet();
}

/** @return a point-in-time snapshot of the collected network statistics */
public NetworkStats getNetworkStats() {
    final NetworkStats snapshot = new NetworkStats();
    snapshot.buffersRequested = buffersRequested.getCount();
    snapshot.buffersRecycled = buffersRecycled.getCount();
    snapshot.buffersInUse = currentBuffersInUse.get();
    snapshot.avgBufferWaitTime = bufferWaitTime.getStatistics().getMean();
    return snapshot;
}

/** Plain value holder for a snapshot of the network metrics. */
public static class NetworkStats {
    public long buffersRequested;
    public long buffersRecycled;
    public int buffersInUse;
    public double avgBufferWaitTime;
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10