CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

docs/data-exchange.md

Data Exchange and Networking

The Data Exchange and Networking system provides the fundamental infrastructure for inter-task communication in Flink clusters. This system defines data exchange patterns, result partition types, and networking mechanisms that enable efficient data transfer between operators across the distributed execution environment.

Data Exchange Modes

DataExchangeMode

Enumeration defining different data exchange patterns that control how data flows between operators.

public enum DataExchangeMode {
    PIPELINED,                      // Streamed data exchange with back-pressure
    BATCH,                          // Decoupled data exchange with full result materialization
    PIPELINE_WITH_BATCH_FALLBACK;   // Pipelined with batch fallback for recovery
    
    // Exchange mode for a forward (one-to-one) connection under the given execution mode.
    public static DataExchangeMode getForForwardExchange(ExecutionMode mode);
    // Exchange mode for shuffle or broadcast connections under the given execution mode.
    public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode);
    // Exchange mode used when the connection must break the pipeline.
    public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode);
    // Selects the mode from execution mode, ship strategy, and whether the pipeline must break.
    public static DataExchangeMode select(ExecutionMode executionMode, ShipStrategyType shipStrategy, boolean breakPipeline);
}

DistributionPattern

Defines how data is distributed between upstream and downstream operators.

public enum DistributionPattern {
    POINTWISE,     // Each upstream subtask sends data to one downstream subtask
    ALL_TO_ALL;    // Each upstream subtask sends data to all downstream subtasks
    
    public boolean isPointwise();  // true iff this is POINTWISE
    public boolean isAllToAll();   // true iff this is ALL_TO_ALL
}

Result Partition System

ResultPartitionType

Enumeration of result partition types that determine data exchange characteristics and buffering behavior.

public enum ResultPartitionType {
    // Constructor flags correspond to (isBlocking, isPipelined, isBounded).
    BLOCKING(true, false, false),           // Data is fully produced before consumption
    PIPELINED(false, true, false),          // Data is consumed as it's produced  
    PIPELINED_BOUNDED(false, true, true);   // Pipelined with bounded buffers
    
    private final boolean isBlocking;
    private final boolean isPipelined;
    private final boolean isBounded;
    
    public boolean isBlocking();
    public boolean isPipelined();
    public boolean isBounded();
    
    // Whether consumers can throttle producers; presumably true for pipelined types — confirm against implementation.
    public boolean hasBackPressure();
    // Whether the partition can only hold finite streams — NOTE(review): semantics not shown here; verify.
    public boolean requiresFiniteStreams();
}

ResultPartition

Represents a partition of results produced by an operator that can be consumed by downstream tasks.

public class ResultPartition implements AutoCloseable {
    // Creates a partition with the given number of subpartitions; metrics may be null.
    public ResultPartition(
        String owningTaskName,
        TaskActions taskActions,
        JobID jobId,
        ResultPartitionID partitionId,
        ResultPartitionType partitionType,
        int numberOfSubpartitions,
        int numberOfQueuedBuffers,
        ResultPartitionManager partitionManager,
        @Nullable ResultPartitionMetrics metrics
    );
    
    public void setup() throws IOException;   // initialize before first write
    public void finish() throws IOException;  // mark all data as produced
    public void release();                    // release resources normally
    public void release(Throwable cause);     // release after a failure, recording the cause
    
    // Blocking vs. non-blocking acquisition of a buffer to write into.
    public BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
    public BufferBuilder tryGetBufferBuilder() throws IOException;
    
    public void flushAll();
    public void flush(int targetSubpartition);
    
    public ResultPartitionID getPartitionId();
    public ResultPartitionType getResultType();
    public int getNumberOfSubpartitions();
    
    public boolean isReleased();
    public Throwable getFailureCause();  // cause recorded by release(Throwable), if any
    
    @Override
    public void close();
}

IntermediateDataSet

Represents a data set produced by one job vertex and consumed by another, defining the connection in the job graph.

public class IntermediateDataSet implements Serializable {
    public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer);
    
    public IntermediateDataSetID getId();
    public JobVertex getProducer();       // vertex that produces this data set
    public List<JobEdge> getConsumers();  // edges through which it is consumed
    
    public ResultPartitionType getResultType();
    
    // Registers a consuming edge for this data set.
    public void addConsumer(JobEdge edge);
    
    public int getConsumerParallelism();
    public DistributionPattern getDistributionPattern();
}

Networking Infrastructure

ResultPartitionWriter

Interface for writing data to result partitions, providing the output mechanism for operators.

public interface ResultPartitionWriter extends AutoCloseable {
    ResultPartition getPartition();
    
    // Blocking vs. non-blocking acquisition of an output buffer.
    BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
    BufferBuilder tryGetBufferBuilder() throws IOException;
    
    void flushAll();
    void flushAllSubpartitions(boolean finishProducers);
    
    void fail(Throwable throwable);       // mark the writer as failed with the given cause
    void finish() throws IOException;     // signal that all data has been produced
    
    @Override
    void close();
}

InputGate

Abstract base class for input gates that manage reading data from multiple input channels.

public abstract class InputGate implements AutoCloseable {
    public abstract int getNumberOfInputChannels();
    public abstract boolean isFinished();
    
    // getNext blocks until data or end-of-input; pollNext returns empty when nothing is available.
    public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;
    public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;
    
    // Sends the event backwards to all producing tasks.
    public abstract void sendTaskEvent(TaskEvent event) throws IOException;
    public abstract void registerListener(InputGateListener listener);
    
    public abstract int getPageSize();  // size of network buffers/pages, in bytes
    
    @Override
    public abstract void close() throws Exception;
}

InputChannel

Abstract base class representing an input channel that reads data from a specific result partition.

public abstract class InputChannel {
    protected final InputGate inputGate;
    protected final int channelIndex;
    protected final ResultPartitionID partitionId;
    // Backoff bounds, presumably for retrying partition requests — TODO confirm units (ms?).
    protected final int initialBackoff;
    protected final int maxBackoff;
    
    public InputChannel(
        InputGate inputGate,
        int channelIndex, 
        ResultPartitionID partitionId,
        int initialBackoff,
        int maxBackoff
    );
    
    public int getChannelIndex();
    public ResultPartitionID getPartitionId();
    
    // Returns the next buffer with an availability hint, or empty if none is ready.
    public abstract Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException;
    
    public abstract void sendTaskEvent(TaskEvent event) throws IOException;
    
    public abstract boolean isReleased();
    public abstract void releaseAllResources() throws IOException;
    
    public abstract int getBuffersInUseCount();
}

Buffer Management

Buffer

Interface representing a buffer containing data or events in the network stack.

public interface Buffer extends AutoCloseable {
    // A buffer carries either serialized records (isBuffer) or a serialized event (isEvent).
    boolean isBuffer();
    boolean isEvent();
    
    MemorySegment getMemorySegment();
    int getMemorySegmentOffset();
    BufferRecycler getRecycler();
    
    // Returns the underlying memory to its pool; presumably reference-counted — confirm.
    void recycleBuffer();
    boolean isRecycled();
    
    Buffer retainBuffer();                       // increases the reference/retain count
    Buffer readOnlySlice();                      // read-only view over the readable region
    Buffer readOnlySlice(int index, int length); // read-only view over [index, index+length)
    
    int getMaxCapacity();
    int getSize();
    void setSize(int writerIndex);
    
    int getReaderIndex();
    void setReaderIndex(int readerIndex);
    
    ByteBuffer getNioBufferReadable();
    ByteBuffer getNioBuffer(int index, int length);
    
    @Override
    void close();
}

BufferBuilder

Interface for building buffers by appending data incrementally.

public interface BufferBuilder extends AutoCloseable {
    // Appends as much of source as fits; append vs. appendAndCommit differ in when
    // data becomes visible to the consumer — NOTE(review): exact semantics not shown here.
    boolean append(ByteBuffer source) throws IOException;
    boolean appendAndCommit(ByteBuffer source) throws IOException;
    
    // Creates the read side for data written through this builder.
    BufferConsumer createBufferConsumer();
    boolean isFull();
    boolean isFinished();
    
    int getWrittenBytes();
    int getMaxCapacity();
    
    void finish();  // no further appends after this
    
    @Override
    void close();
}

BufferPool

Interface for managing pools of network buffers to control memory usage.

public interface BufferPool extends AutoCloseable {
    // Non-blocking request may return null; blocking variants wait for availability.
    Buffer requestBuffer() throws IOException;
    Buffer requestBuffer(boolean isBlocking) throws IOException, InterruptedException;
    BufferBuilder requestBufferBuilder() throws IOException;
    BufferBuilder requestBufferBuilder(boolean isBlocking) throws IOException, InterruptedException;
    
    void recycle(MemorySegment memorySegment);
    
    // Registers a listener notified when buffers become available; returns whether registered.
    boolean addBufferListener(BufferListener listener);
    
    boolean isDestroyed();
    
    int getNumberOfRequestedMemorySegments();
    int getNumberOfAvailableMemorySegments();
    int getNumBuffers();
    int getMaxNumberOfMemorySegments();
    
    // Resizes the pool's buffer quota.
    void setNumBuffers(int numBuffers) throws IOException;
    
    @Override
    void close();
}

Task Events and Communication

TaskEvent

Base class for events that can be sent between tasks through the network stack.

public abstract class TaskEvent implements Serializable {
    // Upper bound on event size — presumably the serialized size in bytes; verify.
    public static final int MAX_SIZE = 1024;
    
    // Subclasses implement specific event types
}

BufferOrEvent

Container that holds either a data buffer or a task event from the network stack.

public class BufferOrEvent {
    // Exactly one of buffer/event is held; channelIndex identifies the source channel.
    public BufferOrEvent(Buffer buffer, int channelIndex);
    public BufferOrEvent(AbstractEvent event, int channelIndex);
    
    public boolean isBuffer();
    public boolean isEvent();
    
    public Buffer getBuffer();
    public AbstractEvent getEvent();
    
    public int getChannelIndex();
    
    public void recycleBuffer();
    
    // Optional accessors avoid null checks when the kind is unknown.
    public Optional<Buffer> getOptionalBuffer();
    public Optional<AbstractEvent> getOptionalEvent();
}

InputGateListener

Interface for listening to input gate events and buffer availability.

public interface InputGateListener {
    // Called when the gate transitions from empty to having data available.
    void notifyInputGateNonEmpty(InputGate inputGate);
}

Partitioning Strategies

ChannelSelector

Interface for selecting output channels based on record content, implementing different partitioning strategies.

public interface ChannelSelector<T> {
    // Returns the indices of the output channels the record should be sent to.
    int[] selectChannels(T record, int numberOfOutputChannels);
    
    boolean isBroadcast();  // true if every record goes to all channels
}

OutputEmitter

Manages the emission of records to downstream tasks with appropriate partitioning.

public class OutputEmitter<T> {
    public OutputEmitter(
        ShipStrategyType strategy,
        ChannelSelector<T> channelSelector
    );
    
    // Emits one record to the channels chosen by the selector/strategy.
    public void emit(T record) throws IOException, InterruptedException;
    // Sends the event to all downstream channels.
    public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException;
    
    public void flush() throws IOException;
    public void close();
    
    public void clearBuffers();
    
    public ShipStrategyType getShipStrategy();
}

Shipping Strategies

ShipStrategyType

Enumeration of data shipping strategies that determine how records are distributed to downstream tasks.

public enum ShipStrategyType {
    FORWARD,                    // Direct forwarding to single downstream task
    BROADCAST,                  // Send to all downstream tasks
    PARTITION_HASH,             // Hash-based partitioning
    PARTITION_RANGE,            // Range-based partitioning
    PARTITION_RANDOM,           // Random distribution
    PARTITION_FORCED_REBALANCE, // Forced rebalancing
    PARTITION_CUSTOM;           // Custom partitioning logic
    
    public boolean isNetworkStrategy();  // true if records may cross the network
    public boolean isForward();
    public boolean isBroadcast();
    public boolean isPartitioned();
}

Usage Examples

Configuring Data Exchange Patterns

import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.*;

// Create job vertices
JobVertex sourceVertex = new JobVertex("Source");
JobVertex mapVertex = new JobVertex("Map");
JobVertex sinkVertex = new JobVertex("Sink");

// Configure different result partition types
IntermediateDataSet sourceOutput = sourceVertex.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
IntermediateDataSet mapOutput = mapVertex.createAndAddResultDataSet(ResultPartitionType.BLOCKING);

// Create edges with different distribution patterns
JobEdge sourceToMap = new JobEdge(sourceOutput, mapVertex, DistributionPattern.ALL_TO_ALL);
sourceToMap.setShipStrategy(ShipStrategyType.PARTITION_HASH);

// FIX: DistributionPattern defines only POINTWISE and ALL_TO_ALL; there is no
// FORWARD constant. A one-to-one connection is expressed as POINTWISE, with the
// FORWARD ship strategy selecting direct forwarding on the edge.
JobEdge mapToSink = new JobEdge(mapOutput, sinkVertex, DistributionPattern.POINTWISE);
mapToSink.setShipStrategy(ShipStrategyType.FORWARD);

// Add edges to job graph
JobGraph jobGraph = new JobGraph("Data Exchange Example");
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);
jobGraph.addVertex(sinkVertex);
jobGraph.addEdge(sourceToMap);
jobGraph.addEdge(mapToSink);

// Configure global data exchange mode
// FIX: DataExchangeMode defines PIPELINED, BATCH, and PIPELINE_WITH_BATCH_FALLBACK;
// ALL_EDGES_BLOCKING does not exist on this enum. BATCH is the fully materialized
// (blocking) exchange defined here.
Configuration jobConfig = new Configuration();
jobConfig.setString("execution.batch-shuffle-mode", DataExchangeMode.BATCH.toString());
jobGraph.setJobConfiguration(jobConfig);

Custom Result Partition Writer

import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;

/**
 * Example {@link ResultPartitionWriter} that delegates buffer acquisition to a
 * {@link BufferPool} and lifecycle calls to the wrapped {@link ResultPartition}.
 * Thread-safety: {@code finished} is volatile, but finish/close are not atomic
 * with respect to concurrent getBufferBuilder() calls.
 */
public class CustomResultPartitionWriter implements ResultPartitionWriter {
    private final ResultPartition partition;
    private final BufferPool bufferPool;
    private volatile boolean finished = false;
    
    public CustomResultPartitionWriter(ResultPartition partition, BufferPool bufferPool) {
        this.partition = partition;
        this.bufferPool = bufferPool;
    }
    
    @Override
    public ResultPartition getPartition() {
        return partition;
    }
    
    /**
     * Blocks until a buffer is available.
     *
     * @throws IllegalStateException if the writer has already been finished
     * @throws IOException if the pool cannot provide a buffer
     */
    @Override
    public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
        if (finished) {
            throw new IllegalStateException("Writer has been finished");
        }
        
        // Request buffer from pool (blocking variant).
        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(true);
        if (bufferBuilder == null) {
            throw new IOException("Failed to get buffer from pool");
        }
        
        return bufferBuilder;
    }
    
    /** Non-blocking variant; returns null when finished or no buffer is available. */
    @Override
    public BufferBuilder tryGetBufferBuilder() throws IOException {
        if (finished) {
            return null;
        }
        
        return bufferPool.requestBufferBuilder(false);
    }
    
    @Override
    public void flushAll() {
        partition.flushAll();
    }
    
    @Override
    public void flushAllSubpartitions(boolean finishProducers) {
        partition.flushAll();
        if (finishProducers) {
            try {
                finish();
            } catch (IOException e) {
                throw new RuntimeException("Failed to finish partition", e);
            }
        }
    }
    
    @Override
    public void fail(Throwable throwable) {
        finished = true;
        // FIX: ResultPartition exposes no fail(Throwable); the failure path in its
        // API is release(Throwable cause), which records the cause for
        // getFailureCause() and releases the partition's resources.
        partition.release(throwable);
    }
    
    /** Idempotent: marks the writer finished and finishes the partition once. */
    @Override
    public void finish() throws IOException {
        if (!finished) {
            finished = true;
            partition.finish();
        }
    }
    
    @Override
    public void close() {
        try {
            finish();
        } catch (IOException e) {
            // Log error but don't throw in close()
            System.err.println("Error finishing partition during close: " + e.getMessage());
        }
    }
    
    public boolean isFinished() {
        return finished;
    }
}

Input Gate Implementation

import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;

/**
 * Example InputGate that tracks which channels have data in a LinkedHashSet
 * (FIFO fairness across channels) and blocks on an internal monitor until data
 * arrives. All mutable state is guarded by {@code lock}.
 */
public class CustomInputGate extends InputGate {
    private final InputChannel[] inputChannels;
    private final Set<InputChannel> channelsWithData = new LinkedHashSet<>();
    private final Object lock = new Object();
    private volatile boolean finished = false;
    // FIX: this field was used by registerListener() and notifyChannelNonEmpty()
    // but was never declared.
    private volatile InputGateListener inputGateListener;
    
    public CustomInputGate(String owningTaskName, JobID jobID, 
                          IntermediateDataSetID consumedResultId,
                          ResultPartitionType consumedPartitionType,
                          int consumedSubpartitionIndex,
                          InputChannel[] inputChannels) {
        super(owningTaskName, jobID, consumedResultId, consumedPartitionType, consumedSubpartitionIndex);
        this.inputChannels = inputChannels;
    }
    
    @Override
    public int getNumberOfInputChannels() {
        return inputChannels.length;
    }
    
    @Override
    public boolean isFinished() {
        synchronized (lock) {
            return finished;
        }
    }
    
    /** Blocks until a buffer/event is available or the gate is finished (empty result). */
    @Override
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        while (true) {
            synchronized (lock) {
                if (finished) {
                    return Optional.empty();
                }
                
                // Check channels with available data
                InputChannel channelToRead = getChannelWithData();
                if (channelToRead != null) {
                    Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();
                    if (result.isPresent()) {
                        return processBuffer(result.get(), channelToRead);
                    }
                }
            }
            
            // Wait for data availability
            waitForData();
        }
    }
    
    /** Non-blocking variant of getNext(). */
    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        synchronized (lock) {
            if (finished) {
                return Optional.empty();
            }
            
            InputChannel channelToRead = getChannelWithData();
            if (channelToRead != null) {
                Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();
                if (result.isPresent()) {
                    return processBuffer(result.get(), channelToRead);
                }
            }
            
            return Optional.empty();
        }
    }
    
    /**
     * Converts a BufferAndAvailability into a BufferOrEvent, deserializing and
     * recycling event buffers and releasing the channel on end-of-partition.
     * FIX: this helper was referenced by pollNext() but never defined; the same
     * logic was previously inlined in getNext(). Must be called with {@code lock} held.
     */
    private Optional<BufferOrEvent> processBuffer(BufferAndAvailability bufferAndAvailability,
                                                  InputChannel channel) throws IOException {
        if (bufferAndAvailability.buffer().isBuffer()) {
            return Optional.of(new BufferOrEvent(
                bufferAndAvailability.buffer(),
                channel.getChannelIndex()
            ));
        }
        
        // Events travel serialized inside buffers; deserialize then recycle.
        AbstractEvent event = EventSerializer.fromBuffer(
            bufferAndAvailability.buffer(),
            getClass().getClassLoader()
        );
        
        bufferAndAvailability.buffer().recycleBuffer();
        
        // End-of-partition: release the channel and check whether the whole gate is done.
        if (event.getClass() == EndOfPartitionEvent.class) {
            channel.releaseAllResources();
            checkForFinished();
        }
        
        return Optional.of(new BufferOrEvent(
            event,
            channel.getChannelIndex()
        ));
    }
    
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        for (InputChannel channel : inputChannels) {
            channel.sendTaskEvent(event);
        }
    }
    
    @Override
    public void registerListener(InputGateListener listener) {
        // Implementation for listener registration
        this.inputGateListener = listener;
    }
    
    @Override
    public int getPageSize() {
        return 32768; // Default page size
    }
    
    @Override
    public void close() throws Exception {
        synchronized (lock) {
            finished = true;
            
            for (InputChannel channel : inputChannels) {
                try {
                    channel.releaseAllResources();
                } catch (Exception e) {
                    // Log but continue cleanup
                    System.err.println("Error releasing channel: " + e.getMessage());
                }
            }
            
            // FIX: wake any reader parked in waitForData(); previously close()
            // never notified, so a blocked getNext() could hang forever.
            lock.notifyAll();
        }
    }
    
    /** Pops the next channel with data in FIFO order, or null. Caller must hold {@code lock}. */
    private InputChannel getChannelWithData() {
        Iterator<InputChannel> iterator = channelsWithData.iterator();
        if (iterator.hasNext()) {
            InputChannel channel = iterator.next();
            iterator.remove();
            return channel;
        }
        return null;
    }
    
    private void waitForData() throws InterruptedException {
        synchronized (lock) {
            while (channelsWithData.isEmpty() && !finished) {
                lock.wait();
            }
        }
    }
    
    /** Marks the gate finished once every channel has been released. */
    private void checkForFinished() {
        boolean allFinished = true;
        for (InputChannel channel : inputChannels) {
            if (!channel.isReleased()) {
                allFinished = false;
                break;
            }
        }
        
        if (allFinished) {
            // FIX: set 'finished' inside the synchronized block so the write is
            // ordered with the notifyAll() that wakes waiting readers.
            synchronized (lock) {
                finished = true;
                lock.notifyAll();
            }
        }
    }
    
    /** Callback from channels when new data arrives; wakes blocked readers. */
    public void notifyChannelNonEmpty(InputChannel channel) {
        synchronized (lock) {
            channelsWithData.add(channel);
            lock.notifyAll();
        }
        
        // Notify input gate listener
        if (inputGateListener != null) {
            inputGateListener.notifyInputGateNonEmpty(this);
        }
    }
}

Buffer Pool Management

import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

/**
 * Example helper that creates named local buffer pools backed by a shared
 * NetworkBufferPool, and tears everything down on shutdown.
 */
public class BufferPoolManager {
    private final NetworkBufferPool networkBufferPool;
    private final Map<String, BufferPool> bufferPools = new ConcurrentHashMap<>();
    
    public BufferPoolManager(int totalBuffers, int bufferSize) {
        this.networkBufferPool = new NetworkBufferPool(totalBuffers, bufferSize);
    }
    
    /** Creates a local pool with the given size bounds and registers it under poolName. */
    public BufferPool createBufferPool(String poolName, int minBuffers, int maxBuffers) throws IOException {
        BufferPool newPool = networkBufferPool.createBufferPool(minBuffers, maxBuffers);
        bufferPools.put(poolName, newPool);
        
        System.out.println("Created buffer pool '" + poolName + "' with " + 
                          minBuffers + "-" + maxBuffers + " buffers");
        
        return newPool;
    }
    
    /** Removes and closes the pool registered under poolName, if any. */
    public void destroyBufferPool(String poolName) {
        BufferPool removed = bufferPools.remove(poolName);
        if (removed == null) {
            return;
        }
        removed.close();
        System.out.println("Destroyed buffer pool '" + poolName + "'");
    }
    
    /** Dumps global and per-pool buffer counts to stdout. */
    public void printBufferPoolStats() {
        System.out.println("=== Buffer Pool Statistics ===");
        System.out.println("Network Pool - Total: " + networkBufferPool.getTotalNumberOfMemorySegments() +
                          ", Available: " + networkBufferPool.getNumberOfAvailableMemorySegments());
        
        bufferPools.forEach((name, pool) ->
            System.out.println("Pool '" + name + "' - " +
                              "Requested: " + pool.getNumberOfRequestedMemorySegments() +
                              ", Available: " + pool.getNumberOfAvailableMemorySegments() +
                              ", Max: " + pool.getMaxNumberOfMemorySegments()));
    }
    
    /** Closes every registered pool, then destroys the shared network pool. */
    public void shutdown() {
        // Close all buffer pools
        bufferPools.values().forEach(BufferPool::close);
        bufferPools.clear();
        
        // Close network buffer pool
        networkBufferPool.destroyAllBufferPools();
        networkBufferPool.destroy();
    }
}

Common Patterns

Backpressure Handling

/**
 * Example writer that retries buffer acquisition under back-pressure, counting
 * stalls and failing after a bounded wait.
 */
public class BackpressureAwareWriter {
    private final ResultPartitionWriter writer;
    private final AtomicLong backpressureCount = new AtomicLong(0);
    private final long maxBackpressureWait = 5000; // 5 seconds
    
    public BackpressureAwareWriter(ResultPartitionWriter writer) {
        this.writer = writer;
    }
    
    /**
     * Writes {@code data}, spinning with a short sleep while no buffer is
     * available, and giving up after {@code maxBackpressureWait} ms.
     *
     * @throws IOException on write failure or back-pressure timeout
     * @throws InterruptedException if interrupted while waiting for a buffer
     */
    public void writeWithBackpressure(ByteBuffer data) throws IOException, InterruptedException {
        long startTime = System.currentTimeMillis();
        
        while (true) {
            try {
                BufferBuilder bufferBuilder = writer.tryGetBufferBuilder();
                if (bufferBuilder != null) {
                    // Successfully got buffer, write data
                    boolean success = bufferBuilder.append(data);
                    if (success) {
                        bufferBuilder.finish();
                        return;
                    } else {
                        // Buffer full, need another buffer
                        bufferBuilder.finish();
                        continue;
                    }
                }
                
                // No buffer available - backpressure
                long elapsedTime = System.currentTimeMillis() - startTime;
                if (elapsedTime > maxBackpressureWait) {
                    throw new IOException("Backpressure timeout: unable to get buffer after " + 
                                        maxBackpressureWait + "ms");
                }
                
                backpressureCount.incrementAndGet();
                Thread.sleep(10); // Brief wait before retry
                
            } catch (IOException | InterruptedException e) {
                // FIX: the previous broad catch (Exception) wrapped
                // InterruptedException into an IOException, swallowing the
                // interrupt even though this method declares throws
                // InterruptedException. Propagate both unchanged.
                throw e;
            } catch (Exception e) {
                // Unexpected runtime failures are surfaced as I/O errors,
                // preserving the original cause.
                throw new IOException("Failed to write data", e);
            }
        }
    }
    
    /** Number of times a buffer request found the pool empty (back-pressure events). */
    public long getBackpressureCount() {
        return backpressureCount.get();
    }
}

Network Metrics Collection

/**
 * Example metrics facade tracking buffer requests/recycles, wait-time
 * distribution, and the current number of buffers in use.
 */
public class NetworkMetricsCollector {
    private final Counter buffersRequested;
    private final Counter buffersRecycled;
    private final Histogram bufferWaitTime;
    private final Gauge<Integer> buffersInUse;
    
    private final AtomicInteger currentBuffersInUse = new AtomicInteger(0);
    
    public NetworkMetricsCollector(MetricGroup metricGroup) {
        buffersRequested = metricGroup.counter("buffers_requested");
        buffersRecycled = metricGroup.counter("buffers_recycled");
        bufferWaitTime = metricGroup.histogram("buffer_wait_time_ms", 
                                               new DescriptiveStatisticsHistogram(1000));
        buffersInUse = metricGroup.gauge("buffers_in_use", currentBuffersInUse::get);
    }
    
    /** Records one buffer acquisition and how long the caller waited for it. */
    public void recordBufferRequest(long waitTimeMs) {
        buffersRequested.inc();
        bufferWaitTime.update(waitTimeMs);
        currentBuffersInUse.incrementAndGet();
    }
    
    /** Records one buffer being returned to the pool. */
    public void recordBufferRecycle() {
        buffersRecycled.inc();
        currentBuffersInUse.decrementAndGet();
    }
    
    /** Snapshot of the current counters; values are not taken atomically. */
    public NetworkStats getNetworkStats() {
        NetworkStats snapshot = new NetworkStats();
        snapshot.buffersRequested = buffersRequested.getCount();
        snapshot.buffersRecycled = buffersRecycled.getCount();
        snapshot.buffersInUse = currentBuffersInUse.get();
        snapshot.avgBufferWaitTime = bufferWaitTime.getStatistics().getMean();
        return snapshot;
    }
    
    /** Plain value holder for a metrics snapshot. */
    public static class NetworkStats {
        public long buffersRequested;
        public long buffersRecycled;
        public int buffersInUse;
        public double avgBufferWaitTime;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json