0
# Data Exchange and Networking
1
2
The Data Exchange and Networking system provides the fundamental infrastructure for inter-task communication in Flink clusters. This system defines data exchange patterns, result partition types, and networking mechanisms that enable efficient data transfer between operators across the distributed execution environment.
3
4
## Data Exchange Modes
5
6
### DataExchangeMode
7
8
Enumeration defining different data exchange patterns that control how data flows between operators.
9
10
```java { .api }
11
public enum DataExchangeMode {
12
PIPELINED, // Streamed data exchange with back-pressure
13
BATCH, // Decoupled data exchange with full result materialization
14
PIPELINE_WITH_BATCH_FALLBACK; // Pipelined with batch fallback for recovery
15
16
public static DataExchangeMode getForForwardExchange(ExecutionMode mode);
17
public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode);
18
public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode);
19
public static DataExchangeMode select(ExecutionMode executionMode, ShipStrategyType shipStrategy, boolean breakPipeline);
20
}
21
```
22
23
### DistributionPattern
24
25
Defines how data is distributed between upstream and downstream operators.
26
27
```java { .api }
28
public enum DistributionPattern {
29
POINTWISE, // Each upstream subtask sends data to one or more downstream subtasks (a subset, not all of them)
30
ALL_TO_ALL; // Each upstream subtask sends data to all downstream subtasks
31
32
public boolean isPointwise();
33
public boolean isAllToAll();
34
}
35
```
36
37
## Result Partition System
38
39
### ResultPartitionType
40
41
Enumeration of result partition types that determine data exchange characteristics and buffering behavior.
42
43
```java { .api }
44
public enum ResultPartitionType {
45
BLOCKING(true, false, false), // Data is fully produced before consumption
46
PIPELINED(false, true, false), // Data is consumed as it's produced
47
PIPELINED_BOUNDED(false, true, true); // Pipelined with bounded buffers
48
49
private final boolean isBlocking;
50
private final boolean isPipelined;
51
private final boolean isBounded;
52
53
public boolean isBlocking();
54
public boolean isPipelined();
55
public boolean isBounded();
56
57
public boolean hasBackPressure();
58
public boolean requiresFiniteStreams();
59
}
60
```
61
62
### ResultPartition
63
64
Represents a partition of results produced by an operator that can be consumed by downstream tasks.
65
66
```java { .api }
67
public class ResultPartition implements AutoCloseable {
68
public ResultPartition(
69
String owningTaskName,
70
TaskActions taskActions,
71
JobID jobId,
72
ResultPartitionID partitionId,
73
ResultPartitionType partitionType,
74
int numberOfSubpartitions,
75
int numberOfQueuedBuffers,
76
ResultPartitionManager partitionManager,
77
@Nullable ResultPartitionMetrics metrics
78
);
79
80
public void setup() throws IOException;
81
public void finish() throws IOException;
82
public void release();
83
public void release(Throwable cause);
84
85
public BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
86
public BufferBuilder tryGetBufferBuilder() throws IOException;
87
88
public void flushAll();
89
public void flush(int targetSubpartition);
90
91
public ResultPartitionID getPartitionId();
92
public ResultPartitionType getResultType();
93
public int getNumberOfSubpartitions();
94
95
public boolean isReleased();
96
public Throwable getFailureCause();
97
98
@Override
99
public void close();
100
}
101
```
102
103
### IntermediateDataSet
104
105
Represents a data set produced by one job vertex and consumed by another, defining the connection in the job graph.
106
107
```java { .api }
108
public class IntermediateDataSet implements Serializable {
109
public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer);
110
111
public IntermediateDataSetID getId();
112
public JobVertex getProducer();
113
public List<JobEdge> getConsumers();
114
115
public ResultPartitionType getResultType();
116
117
public void addConsumer(JobEdge edge);
118
119
public int getConsumerParallelism();
120
public DistributionPattern getDistributionPattern();
121
}
122
```
123
124
## Networking Infrastructure
125
126
### ResultPartitionWriter
127
128
Interface for writing data to result partitions, providing the output mechanism for operators.
129
130
```java { .api }
131
public interface ResultPartitionWriter extends AutoCloseable {
132
ResultPartition getPartition();
133
134
BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
135
BufferBuilder tryGetBufferBuilder() throws IOException;
136
137
void flushAll();
138
void flushAllSubpartitions(boolean finishProducers);
139
140
void fail(Throwable throwable);
141
void finish() throws IOException;
142
143
@Override
144
void close();
145
}
146
```
147
148
### InputGate
149
150
Abstract base class for input gates that manage reading data from multiple input channels.
151
152
```java { .api }
153
public abstract class InputGate implements AutoCloseable {
154
public abstract int getNumberOfInputChannels();
155
public abstract boolean isFinished();
156
157
public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;
158
public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;
159
160
public abstract void sendTaskEvent(TaskEvent event) throws IOException;
161
public abstract void registerListener(InputGateListener listener);
162
163
public abstract int getPageSize();
164
165
@Override
166
public abstract void close() throws Exception;
167
}
168
```
169
170
### InputChannel
171
172
Abstract base class representing an input channel that reads data from a specific result partition.
173
174
```java { .api }
175
public abstract class InputChannel {
176
protected final InputGate inputGate;
177
protected final int channelIndex;
178
protected final ResultPartitionID partitionId;
179
protected final int initialBackoff;
180
protected final int maxBackoff;
181
182
public InputChannel(
183
InputGate inputGate,
184
int channelIndex,
185
ResultPartitionID partitionId,
186
int initialBackoff,
187
int maxBackoff
188
);
189
190
public int getChannelIndex();
191
public ResultPartitionID getPartitionId();
192
193
public abstract Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException;
194
195
public abstract void sendTaskEvent(TaskEvent event) throws IOException;
196
197
public abstract boolean isReleased();
198
public abstract void releaseAllResources() throws IOException;
199
200
public abstract int getBuffersInUseCount();
201
}
202
```
203
204
## Buffer Management
205
206
### Buffer
207
208
Interface representing a buffer containing data or events in the network stack.
209
210
```java { .api }
211
public interface Buffer extends AutoCloseable {
212
boolean isBuffer();
213
boolean isEvent();
214
215
MemorySegment getMemorySegment();
216
int getMemorySegmentOffset();
217
BufferRecycler getRecycler();
218
219
void recycleBuffer();
220
boolean isRecycled();
221
222
Buffer retainBuffer();
223
Buffer readOnlySlice();
224
Buffer readOnlySlice(int index, int length);
225
226
int getMaxCapacity();
227
int getSize();
228
void setSize(int writerIndex);
229
230
int getReaderIndex();
231
void setReaderIndex(int readerIndex);
232
233
ByteBuffer getNioBufferReadable();
234
ByteBuffer getNioBuffer(int index, int length);
235
236
@Override
237
void close();
238
}
239
```
240
241
### BufferBuilder
242
243
Interface for building buffers by appending data incrementally.
244
245
```java { .api }
246
public interface BufferBuilder extends AutoCloseable {
247
boolean append(ByteBuffer source) throws IOException;
248
boolean appendAndCommit(ByteBuffer source) throws IOException;
249
250
BufferConsumer createBufferConsumer();
251
boolean isFull();
252
boolean isFinished();
253
254
int getWrittenBytes();
255
int getMaxCapacity();
256
257
void finish();
258
259
@Override
260
void close();
261
}
262
```
263
264
### BufferPool
265
266
Interface for managing pools of network buffers to control memory usage.
267
268
```java { .api }
269
public interface BufferPool extends AutoCloseable {
270
Buffer requestBuffer() throws IOException;
271
Buffer requestBuffer(boolean isBlocking) throws IOException, InterruptedException;
272
BufferBuilder requestBufferBuilder() throws IOException;
273
BufferBuilder requestBufferBuilder(boolean isBlocking) throws IOException, InterruptedException;
274
275
void recycle(MemorySegment memorySegment);
276
277
boolean addBufferListener(BufferListener listener);
278
279
boolean isDestroyed();
280
281
int getNumberOfRequestedMemorySegments();
282
int getNumberOfAvailableMemorySegments();
283
int getNumBuffers();
284
int getMaxNumberOfMemorySegments();
285
286
void setNumBuffers(int numBuffers) throws IOException;
287
288
@Override
289
void close();
290
}
291
```
292
293
## Task Events and Communication
294
295
### TaskEvent
296
297
Base class for events that can be sent between tasks through the network stack.
298
299
```java { .api }
300
public abstract class TaskEvent implements Serializable {
301
public static final int MAX_SIZE = 1024;
302
303
// Subclasses implement specific event types
304
}
305
```
306
307
### BufferOrEvent
308
309
Container that holds either a data buffer or a task event from the network stack.
310
311
```java { .api }
312
public class BufferOrEvent {
313
public BufferOrEvent(Buffer buffer, int channelIndex);
314
public BufferOrEvent(AbstractEvent event, int channelIndex);
315
316
public boolean isBuffer();
317
public boolean isEvent();
318
319
public Buffer getBuffer();
320
public AbstractEvent getEvent();
321
322
public int getChannelIndex();
323
324
public void recycleBuffer();
325
326
public Optional<Buffer> getOptionalBuffer();
327
public Optional<AbstractEvent> getOptionalEvent();
328
}
329
```
330
331
### InputGateListener
332
333
Interface for listening to input gate events and buffer availability.
334
335
```java { .api }
336
public interface InputGateListener {
337
void notifyInputGateNonEmpty(InputGate inputGate);
338
}
339
```
340
341
## Partitioning Strategies
342
343
### ChannelSelector
344
345
Interface for selecting output channels based on record content, implementing different partitioning strategies.
346
347
```java { .api }
348
public interface ChannelSelector<T> {
349
int[] selectChannels(T record, int numberOfOutputChannels);
350
351
boolean isBroadcast();
352
}
353
```
354
355
### OutputEmitter
356
357
Manages the emission of records to downstream tasks with appropriate partitioning.
358
359
```java { .api }
360
public class OutputEmitter<T> {
361
public OutputEmitter(
362
ShipStrategyType strategy,
363
ChannelSelector<T> channelSelector
364
);
365
366
public void emit(T record) throws IOException, InterruptedException;
367
public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException;
368
369
public void flush() throws IOException;
370
public void close();
371
372
public void clearBuffers();
373
374
public ShipStrategyType getShipStrategy();
375
}
376
```
377
378
## Shipping Strategies
379
380
### ShipStrategyType
381
382
Enumeration of data shipping strategies that determine how records are distributed to downstream tasks.
383
384
```java { .api }
385
public enum ShipStrategyType {
386
FORWARD, // Direct forwarding to single downstream task
387
BROADCAST, // Send to all downstream tasks
388
PARTITION_HASH, // Hash-based partitioning
389
PARTITION_RANGE,// Range-based partitioning
390
PARTITION_RANDOM,// Random distribution
391
PARTITION_FORCED_REBALANCE, // Forced rebalancing
392
PARTITION_CUSTOM; // Custom partitioning logic
393
394
public boolean isNetworkStrategy();
395
public boolean isForward();
396
public boolean isBroadcast();
397
public boolean isPartitioned();
398
}
399
```
400
401
## Usage Examples
402
403
### Configuring Data Exchange Patterns
404
405
```java
406
import org.apache.flink.runtime.io.network.DataExchangeMode;
407
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
408
import org.apache.flink.runtime.jobgraph.*;
409
410
// Create job vertices
411
JobVertex sourceVertex = new JobVertex("Source");
412
JobVertex mapVertex = new JobVertex("Map");
413
JobVertex sinkVertex = new JobVertex("Sink");
414
415
// Configure different result partition types
416
IntermediateDataSet sourceOutput = sourceVertex.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
417
IntermediateDataSet mapOutput = mapVertex.createAndAddResultDataSet(ResultPartitionType.BLOCKING);
418
419
// Create edges with different distribution patterns
420
JobEdge sourceToMap = new JobEdge(sourceOutput, mapVertex, DistributionPattern.ALL_TO_ALL);
421
sourceToMap.setShipStrategy(ShipStrategyType.PARTITION_HASH);
422
423
JobEdge mapToSink = new JobEdge(mapOutput, sinkVertex, DistributionPattern.FORWARD);
424
mapToSink.setShipStrategy(ShipStrategyType.FORWARD);
425
426
// Add edges to job graph
427
JobGraph jobGraph = new JobGraph("Data Exchange Example");
428
jobGraph.addVertex(sourceVertex);
429
jobGraph.addVertex(mapVertex);
430
jobGraph.addVertex(sinkVertex);
431
jobGraph.addEdge(sourceToMap);
432
jobGraph.addEdge(mapToSink);
433
434
// Configure global data exchange mode
435
Configuration jobConfig = new Configuration();
436
jobConfig.setString("execution.batch-shuffle-mode", DataExchangeMode.ALL_EDGES_BLOCKING.toString());
437
jobGraph.setJobConfiguration(jobConfig);
438
```
439
440
### Custom Result Partition Writer
441
442
```java
443
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
444
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
445
446
public class CustomResultPartitionWriter implements ResultPartitionWriter {
447
private final ResultPartition partition;
448
private final BufferPool bufferPool;
449
private volatile boolean finished = false;
450
451
public CustomResultPartitionWriter(ResultPartition partition, BufferPool bufferPool) {
452
this.partition = partition;
453
this.bufferPool = bufferPool;
454
}
455
456
@Override
457
public ResultPartition getPartition() {
458
return partition;
459
}
460
461
@Override
462
public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
463
if (finished) {
464
throw new IllegalStateException("Writer has been finished");
465
}
466
467
// Request buffer from pool
468
BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(true);
469
if (bufferBuilder == null) {
470
throw new IOException("Failed to get buffer from pool");
471
}
472
473
return bufferBuilder;
474
}
475
476
@Override
477
public BufferBuilder tryGetBufferBuilder() throws IOException {
478
if (finished) {
479
return null;
480
}
481
482
return bufferPool.requestBufferBuilder(false);
483
}
484
485
@Override
486
public void flushAll() {
487
partition.flushAll();
488
}
489
490
@Override
491
public void flushAllSubpartitions(boolean finishProducers) {
492
partition.flushAll();
493
if (finishProducers) {
494
try {
495
finish();
496
} catch (IOException e) {
497
throw new RuntimeException("Failed to finish partition", e);
498
}
499
}
500
}
501
502
@Override
503
public void fail(Throwable throwable) {
504
finished = true;
505
partition.fail(throwable);
506
}
507
508
@Override
509
public void finish() throws IOException {
510
if (!finished) {
511
finished = true;
512
partition.finish();
513
}
514
}
515
516
@Override
517
public void close() {
518
try {
519
finish();
520
} catch (IOException e) {
521
// Log error but don't throw in close()
522
System.err.println("Error finishing partition during close: " + e.getMessage());
523
}
524
}
525
526
public boolean isFinished() {
527
return finished;
528
}
529
}
530
```
531
532
### Input Gate Implementation
533
534
```java
535
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
536
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
537
538
public class CustomInputGate extends InputGate {
539
private final InputChannel[] inputChannels;
540
private final Set<InputChannel> channelsWithData = new LinkedHashSet<>();
541
private final Object lock = new Object();
542
private volatile boolean finished = false;
543
544
public CustomInputGate(String owningTaskName, JobID jobID,
545
IntermediateDataSetID consumedResultId,
546
ResultPartitionType consumedPartitionType,
547
int consumedSubpartitionIndex,
548
InputChannel[] inputChannels) {
549
super(owningTaskName, jobID, consumedResultId, consumedPartitionType, consumedSubpartitionIndex);
550
this.inputChannels = inputChannels;
551
}
552
553
@Override
554
public int getNumberOfInputChannels() {
555
return inputChannels.length;
556
}
557
558
@Override
559
public boolean isFinished() {
560
synchronized (lock) {
561
return finished;
562
}
563
}
564
565
@Override
566
public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
567
while (true) {
568
synchronized (lock) {
569
if (finished) {
570
return Optional.empty();
571
}
572
573
// Check channels with available data
574
InputChannel channelToRead = getChannelWithData();
575
if (channelToRead != null) {
576
Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();
577
if (result.isPresent()) {
578
BufferAndAvailability bufferAndAvailability = result.get();
579
580
// Handle the buffer
581
if (bufferAndAvailability.buffer().isBuffer()) {
582
return Optional.of(new BufferOrEvent(
583
bufferAndAvailability.buffer(),
584
channelToRead.getChannelIndex()
585
));
586
} else {
587
// Handle events
588
AbstractEvent event = EventSerializer.fromBuffer(
589
bufferAndAvailability.buffer(),
590
getClass().getClassLoader()
591
);
592
593
bufferAndAvailability.buffer().recycleBuffer();
594
595
// Check if this is an end-of-partition event
596
if (event.getClass() == EndOfPartitionEvent.class) {
597
channelToRead.releaseAllResources();
598
checkForFinished();
599
}
600
601
return Optional.of(new BufferOrEvent(
602
event,
603
channelToRead.getChannelIndex()
604
));
605
}
606
}
607
}
608
}
609
610
// Wait for data availability
611
waitForData();
612
}
613
}
614
615
@Override
616
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
617
synchronized (lock) {
618
if (finished) {
619
return Optional.empty();
620
}
621
622
InputChannel channelToRead = getChannelWithData();
623
if (channelToRead != null) {
624
Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();
625
if (result.isPresent()) {
626
// Similar processing as in getNext()
627
return processBuffer(result.get(), channelToRead);
628
}
629
}
630
631
return Optional.empty();
632
}
633
}
634
635
@Override
636
public void sendTaskEvent(TaskEvent event) throws IOException {
637
for (InputChannel channel : inputChannels) {
638
channel.sendTaskEvent(event);
639
}
640
}
641
642
@Override
643
public void registerListener(InputGateListener listener) {
644
// Implementation for listener registration
645
this.inputGateListener = listener;
646
}
647
648
@Override
649
public int getPageSize() {
650
return 32768; // Default page size
651
}
652
653
@Override
654
public void close() throws Exception {
655
synchronized (lock) {
656
finished = true;
657
658
for (InputChannel channel : inputChannels) {
659
try {
660
channel.releaseAllResources();
661
} catch (Exception e) {
662
// Log but continue cleanup
663
System.err.println("Error releasing channel: " + e.getMessage());
664
}
665
}
666
}
667
}
668
669
private InputChannel getChannelWithData() {
670
Iterator<InputChannel> iterator = channelsWithData.iterator();
671
if (iterator.hasNext()) {
672
InputChannel channel = iterator.next();
673
iterator.remove();
674
return channel;
675
}
676
return null;
677
}
678
679
private void waitForData() throws InterruptedException {
680
synchronized (lock) {
681
while (channelsWithData.isEmpty() && !finished) {
682
lock.wait();
683
}
684
}
685
}
686
687
private void checkForFinished() {
688
boolean allFinished = true;
689
for (InputChannel channel : inputChannels) {
690
if (!channel.isReleased()) {
691
allFinished = false;
692
break;
693
}
694
}
695
696
if (allFinished) {
697
finished = true;
698
synchronized (lock) {
699
lock.notifyAll();
700
}
701
}
702
}
703
704
public void notifyChannelNonEmpty(InputChannel channel) {
705
synchronized (lock) {
706
channelsWithData.add(channel);
707
lock.notifyAll();
708
}
709
710
// Notify input gate listener
711
if (inputGateListener != null) {
712
inputGateListener.notifyInputGateNonEmpty(this);
713
}
714
}
715
}
716
```
717
718
### Buffer Pool Management
719
720
```java
721
import org.apache.flink.runtime.io.network.buffer.BufferPool;
722
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
723
724
public class BufferPoolManager {
725
private final NetworkBufferPool networkBufferPool;
726
private final Map<String, BufferPool> bufferPools = new ConcurrentHashMap<>();
727
728
public BufferPoolManager(int totalBuffers, int bufferSize) {
729
this.networkBufferPool = new NetworkBufferPool(totalBuffers, bufferSize);
730
}
731
732
public BufferPool createBufferPool(String poolName, int minBuffers, int maxBuffers) throws IOException {
733
BufferPool bufferPool = networkBufferPool.createBufferPool(minBuffers, maxBuffers);
734
bufferPools.put(poolName, bufferPool);
735
736
System.out.println("Created buffer pool '" + poolName + "' with " +
737
minBuffers + "-" + maxBuffers + " buffers");
738
739
return bufferPool;
740
}
741
742
public void destroyBufferPool(String poolName) {
743
BufferPool bufferPool = bufferPools.remove(poolName);
744
if (bufferPool != null) {
745
bufferPool.close();
746
System.out.println("Destroyed buffer pool '" + poolName + "'");
747
}
748
}
749
750
public void printBufferPoolStats() {
751
System.out.println("=== Buffer Pool Statistics ===");
752
System.out.println("Network Pool - Total: " + networkBufferPool.getTotalNumberOfMemorySegments() +
753
", Available: " + networkBufferPool.getNumberOfAvailableMemorySegments());
754
755
for (Map.Entry<String, BufferPool> entry : bufferPools.entrySet()) {
756
BufferPool pool = entry.getValue();
757
System.out.println("Pool '" + entry.getKey() + "' - " +
758
"Requested: " + pool.getNumberOfRequestedMemorySegments() +
759
", Available: " + pool.getNumberOfAvailableMemorySegments() +
760
", Max: " + pool.getMaxNumberOfMemorySegments());
761
}
762
}
763
764
public void shutdown() {
765
// Close all buffer pools
766
for (BufferPool pool : bufferPools.values()) {
767
pool.close();
768
}
769
bufferPools.clear();
770
771
// Close network buffer pool
772
networkBufferPool.destroyAllBufferPools();
773
networkBufferPool.destroy();
774
}
775
}
776
```
777
778
## Common Patterns
779
780
### Backpressure Handling
781
782
```java
783
public class BackpressureAwareWriter {
784
private final ResultPartitionWriter writer;
785
private final AtomicLong backpressureCount = new AtomicLong(0);
786
private final long maxBackpressureWait = 5000; // 5 seconds
787
788
public BackpressureAwareWriter(ResultPartitionWriter writer) {
789
this.writer = writer;
790
}
791
792
public void writeWithBackpressure(ByteBuffer data) throws IOException, InterruptedException {
793
long startTime = System.currentTimeMillis();
794
795
while (true) {
796
try {
797
BufferBuilder bufferBuilder = writer.tryGetBufferBuilder();
798
if (bufferBuilder != null) {
799
// Successfully got buffer, write data
800
boolean success = bufferBuilder.append(data);
801
if (success) {
802
bufferBuilder.finish();
803
return;
804
} else {
805
// Buffer full, need another buffer
806
bufferBuilder.finish();
807
continue;
808
}
809
}
810
811
// No buffer available - backpressure
812
long elapsedTime = System.currentTimeMillis() - startTime;
813
if (elapsedTime > maxBackpressureWait) {
814
throw new IOException("Backpressure timeout: unable to get buffer after " +
815
maxBackpressureWait + "ms");
816
}
817
818
backpressureCount.incrementAndGet();
819
Thread.sleep(10); // Brief wait before retry
820
821
} catch (IOException e) {
822
throw e;
823
} catch (Exception e) {
824
throw new IOException("Failed to write data", e);
825
}
826
}
827
}
828
829
public long getBackpressureCount() {
830
return backpressureCount.get();
831
}
832
}
833
```
834
835
### Network Metrics Collection
836
837
```java
838
public class NetworkMetricsCollector {
839
private final Counter buffersRequested;
840
private final Counter buffersRecycled;
841
private final Histogram bufferWaitTime;
842
private final Gauge<Integer> buffersInUse;
843
844
private final AtomicInteger currentBuffersInUse = new AtomicInteger(0);
845
846
public NetworkMetricsCollector(MetricGroup metricGroup) {
847
this.buffersRequested = metricGroup.counter("buffers_requested");
848
this.buffersRecycled = metricGroup.counter("buffers_recycled");
849
this.bufferWaitTime = metricGroup.histogram("buffer_wait_time_ms",
850
new DescriptiveStatisticsHistogram(1000));
851
this.buffersInUse = metricGroup.gauge("buffers_in_use", currentBuffersInUse::get);
852
}
853
854
public void recordBufferRequest(long waitTimeMs) {
855
buffersRequested.inc();
856
bufferWaitTime.update(waitTimeMs);
857
currentBuffersInUse.incrementAndGet();
858
}
859
860
public void recordBufferRecycle() {
861
buffersRecycled.inc();
862
currentBuffersInUse.decrementAndGet();
863
}
864
865
public NetworkStats getNetworkStats() {
866
NetworkStats stats = new NetworkStats();
867
stats.buffersRequested = buffersRequested.getCount();
868
stats.buffersRecycled = buffersRecycled.getCount();
869
stats.buffersInUse = currentBuffersInUse.get();
870
stats.avgBufferWaitTime = bufferWaitTime.getStatistics().getMean();
871
return stats;
872
}
873
874
public static class NetworkStats {
875
public long buffersRequested;
876
public long buffersRecycled;
877
public int buffersInUse;
878
public double avgBufferWaitTime;
879
}
880
}
881
```