or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md execution-graph.md high-availability.md index.md job-management.md message-passing.md metrics.md mini-cluster.md rpc-framework.md state-management.md task-execution.md

data-exchange.mddocs/

0

# Data Exchange and Networking

1

2

The Data Exchange and Networking system provides the fundamental infrastructure for inter-task communication in Flink clusters. This system defines data exchange patterns, result partition types, and networking mechanisms that enable efficient data transfer between operators across the distributed execution environment.

3

4

## Data Exchange Modes

5

6

### DataExchangeMode

7

8

Enumeration defining different data exchange patterns that control how data flows between operators.

9

10

```java { .api }

11

public enum DataExchangeMode {

12

PIPELINED, // Streamed data exchange with back-pressure

13

BATCH, // Decoupled data exchange with full result materialization

14

PIPELINE_WITH_BATCH_FALLBACK; // Pipelined with batch fallback for recovery

15

16

public static DataExchangeMode getForForwardExchange(ExecutionMode mode);

17

public static DataExchangeMode getForShuffleOrBroadcast(ExecutionMode mode);

18

public static DataExchangeMode getPipelineBreakingExchange(ExecutionMode mode);

19

public static DataExchangeMode select(ExecutionMode executionMode, ShipStrategyType shipStrategy, boolean breakPipeline);

20

}

21

```

22

23

### DistributionPattern

24

25

Defines how data is distributed between upstream and downstream operators.

26

27

```java { .api }

28

public enum DistributionPattern {

29

POINTWISE, // Each upstream subtask sends data to one downstream subtask

30

ALL_TO_ALL; // Each upstream subtask sends data to all downstream subtasks

31

32

public boolean isPointwise();

33

public boolean isAllToAll();

34

}

35

```

36

37

## Result Partition System

38

39

### ResultPartitionType

40

41

Enumeration of result partition types that determine data exchange characteristics and buffering behavior.

42

43

```java { .api }

44

public enum ResultPartitionType {

45

BLOCKING(true, false, false), // Data is fully produced before consumption

46

PIPELINED(false, true, false), // Data is consumed as it's produced

47

PIPELINED_BOUNDED(false, true, true); // Pipelined with bounded buffers

48

49

private final boolean isBlocking;

50

private final boolean isPipelined;

51

private final boolean isBounded;

52

53

public boolean isBlocking();

54

public boolean isPipelined();

55

public boolean isBounded();

56

57

public boolean hasBackPressure();

58

public boolean requiresFiniteStreams();

59

}

60

```

61

62

### ResultPartition

63

64

Represents a partition of results produced by an operator that can be consumed by downstream tasks.

65

66

```java { .api }

67

public class ResultPartition implements AutoCloseable {

68

public ResultPartition(

69

String owningTaskName,

70

TaskActions taskActions,

71

JobID jobId,

72

ResultPartitionID partitionId,

73

ResultPartitionType partitionType,

74

int numberOfSubpartitions,

75

int numberOfQueuedBuffers,

76

ResultPartitionManager partitionManager,

77

@Nullable ResultPartitionMetrics metrics

78

);

79

80

public void setup() throws IOException;

81

public void finish() throws IOException;

82

public void release();

83

public void release(Throwable cause);

84

85

public BufferBuilder getBufferBuilder() throws IOException, InterruptedException;

86

public BufferBuilder tryGetBufferBuilder() throws IOException;

87

88

public void flushAll();

89

public void flush(int targetSubpartition);

90

91

public ResultPartitionID getPartitionId();

92

public ResultPartitionType getResultType();

93

public int getNumberOfSubpartitions();

94

95

public boolean isReleased();

96

public Throwable getFailureCause();

97

98

@Override

99

public void close();

100

}

101

```

102

103

### IntermediateDataSet

104

105

Represents a data set produced by one job vertex and consumed by another, defining the connection in the job graph.

106

107

```java { .api }

108

public class IntermediateDataSet implements Serializable {

109

public IntermediateDataSet(IntermediateDataSetID id, ResultPartitionType resultType, JobVertex producer);

110

111

public IntermediateDataSetID getId();

112

public JobVertex getProducer();

113

public List<JobEdge> getConsumers();

114

115

public ResultPartitionType getResultType();

116

117

public void addConsumer(JobEdge edge);

118

119

public int getConsumerParallelism();

120

public DistributionPattern getDistributionPattern();

121

}

122

```

123

124

## Networking Infrastructure

125

126

### ResultPartitionWriter

127

128

Interface for writing data to result partitions, providing the output mechanism for operators.

129

130

```java { .api }

131

public interface ResultPartitionWriter extends AutoCloseable {

132

ResultPartition getPartition();

133

134

BufferBuilder getBufferBuilder() throws IOException, InterruptedException;

135

BufferBuilder tryGetBufferBuilder() throws IOException;

136

137

void flushAll();

138

void flushAllSubpartitions(boolean finishProducers);

139

140

void fail(Throwable throwable);

141

void finish() throws IOException;

142

143

@Override

144

void close();

145

}

146

```

147

148

### InputGate

149

150

Abstract base class for input gates that manage reading data from multiple input channels.

151

152

```java { .api }

153

public abstract class InputGate implements AutoCloseable {

154

public abstract int getNumberOfInputChannels();

155

public abstract boolean isFinished();

156

157

public abstract Optional<BufferOrEvent> getNext() throws IOException, InterruptedException;

158

public abstract Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException;

159

160

public abstract void sendTaskEvent(TaskEvent event) throws IOException;

161

public abstract void registerListener(InputGateListener listener);

162

163

public abstract int getPageSize();

164

165

@Override

166

public abstract void close() throws Exception;

167

}

168

```

169

170

### InputChannel

171

172

Abstract base class representing an input channel that reads data from a specific result partition.

173

174

```java { .api }

175

public abstract class InputChannel {

176

protected final InputGate inputGate;

177

protected final int channelIndex;

178

protected final ResultPartitionID partitionId;

179

protected final int initialBackoff;

180

protected final int maxBackoff;

181

182

public InputChannel(

183

InputGate inputGate,

184

int channelIndex,

185

ResultPartitionID partitionId,

186

int initialBackoff,

187

int maxBackoff

188

);

189

190

public int getChannelIndex();

191

public ResultPartitionID getPartitionId();

192

193

public abstract Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException;

194

195

public abstract void sendTaskEvent(TaskEvent event) throws IOException;

196

197

public abstract boolean isReleased();

198

public abstract void releaseAllResources() throws IOException;

199

200

public abstract int getBuffersInUseCount();

201

}

202

```

203

204

## Buffer Management

205

206

### Buffer

207

208

Interface representing a buffer containing data or events in the network stack.

209

210

```java { .api }

211

public interface Buffer extends AutoCloseable {

212

boolean isBuffer();

213

boolean isEvent();

214

215

MemorySegment getMemorySegment();

216

int getMemorySegmentOffset();

217

BufferRecycler getRecycler();

218

219

void recycleBuffer();

220

boolean isRecycled();

221

222

Buffer retainBuffer();

223

Buffer readOnlySlice();

224

Buffer readOnlySlice(int index, int length);

225

226

int getMaxCapacity();

227

int getSize();

228

void setSize(int writerIndex);

229

230

int getReaderIndex();

231

void setReaderIndex(int readerIndex);

232

233

ByteBuffer getNioBufferReadable();

234

ByteBuffer getNioBuffer(int index, int length);

235

236

@Override

237

void close();

238

}

239

```

240

241

### BufferBuilder

242

243

Interface for building buffers by appending data incrementally.

244

245

```java { .api }

246

public interface BufferBuilder extends AutoCloseable {

247

boolean append(ByteBuffer source) throws IOException;

248

boolean appendAndCommit(ByteBuffer source) throws IOException;

249

250

BufferConsumer createBufferConsumer();

251

boolean isFull();

252

boolean isFinished();

253

254

int getWrittenBytes();

255

int getMaxCapacity();

256

257

void finish();

258

259

@Override

260

void close();

261

}

262

```

263

264

### BufferPool

265

266

Interface for managing pools of network buffers to control memory usage.

267

268

```java { .api }

269

public interface BufferPool extends AutoCloseable {

270

Buffer requestBuffer() throws IOException;

271

Buffer requestBuffer(boolean isBlocking) throws IOException, InterruptedException;

272

BufferBuilder requestBufferBuilder() throws IOException;

273

BufferBuilder requestBufferBuilder(boolean isBlocking) throws IOException, InterruptedException;

274

275

void recycle(MemorySegment memorySegment);

276

277

boolean addBufferListener(BufferListener listener);

278

279

boolean isDestroyed();

280

281

int getNumberOfRequestedMemorySegments();

282

int getNumberOfAvailableMemorySegments();

283

int getNumBuffers();

284

int getMaxNumberOfMemorySegments();

285

286

void setNumBuffers(int numBuffers) throws IOException;

287

288

@Override

289

void close();

290

}

291

```

292

293

## Task Events and Communication

294

295

### TaskEvent

296

297

Base class for events that can be sent between tasks through the network stack.

298

299

```java { .api }

300

public abstract class TaskEvent implements Serializable {

301

public static final int MAX_SIZE = 1024;

302

303

// Subclasses implement specific event types

304

}

305

```

306

307

### BufferOrEvent

308

309

Container that holds either a data buffer or a task event from the network stack.

310

311

```java { .api }

312

public class BufferOrEvent {

313

public BufferOrEvent(Buffer buffer, int channelIndex);

314

public BufferOrEvent(AbstractEvent event, int channelIndex);

315

316

public boolean isBuffer();

317

public boolean isEvent();

318

319

public Buffer getBuffer();

320

public AbstractEvent getEvent();

321

322

public int getChannelIndex();

323

324

public void recycleBuffer();

325

326

public Optional<Buffer> getOptionalBuffer();

327

public Optional<AbstractEvent> getOptionalEvent();

328

}

329

```

330

331

### InputGateListener

332

333

Interface for listening to input gate events and buffer availability.

334

335

```java { .api }

336

public interface InputGateListener {

337

void notifyInputGateNonEmpty(InputGate inputGate);

338

}

339

```

340

341

## Partitioning Strategies

342

343

### ChannelSelector

344

345

Interface for selecting output channels based on record content, implementing different partitioning strategies.

346

347

```java { .api }

348

public interface ChannelSelector<T> {

349

int[] selectChannels(T record, int numberOfOutputChannels);

350

351

boolean isBroadcast();

352

}

353

```

354

355

### OutputEmitter

356

357

Manages the emission of records to downstream tasks with appropriate partitioning.

358

359

```java { .api }

360

public class OutputEmitter<T> {

361

public OutputEmitter(

362

ShipStrategyType strategy,

363

ChannelSelector<T> channelSelector

364

);

365

366

public void emit(T record) throws IOException, InterruptedException;

367

public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException;

368

369

public void flush() throws IOException;

370

public void close();

371

372

public void clearBuffers();

373

374

public ShipStrategyType getShipStrategy();

375

}

376

```

377

378

## Shipping Strategies

379

380

### ShipStrategyType

381

382

Enumeration of data shipping strategies that determine how records are distributed to downstream tasks.

383

384

```java { .api }

385

public enum ShipStrategyType {

386

FORWARD, // Direct forwarding to single downstream task

387

BROADCAST, // Send to all downstream tasks

388

PARTITION_HASH, // Hash-based partitioning

389

PARTITION_RANGE,// Range-based partitioning

390

PARTITION_RANDOM,// Random distribution

391

PARTITION_FORCED_REBALANCE, // Forced rebalancing

392

PARTITION_CUSTOM; // Custom partitioning logic

393

394

public boolean isNetworkStrategy();

395

public boolean isForward();

396

public boolean isBroadcast();

397

public boolean isPartitioned();

398

}

399

```

400

401

## Usage Examples

402

403

### Configuring Data Exchange Patterns

404

405

```java

406

import org.apache.flink.runtime.io.network.DataExchangeMode;

407

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

408

import org.apache.flink.runtime.jobgraph.*;

409

410

// Create job vertices

411

JobVertex sourceVertex = new JobVertex("Source");

412

JobVertex mapVertex = new JobVertex("Map");

413

JobVertex sinkVertex = new JobVertex("Sink");

414

415

// Configure different result partition types

416

IntermediateDataSet sourceOutput = sourceVertex.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

417

IntermediateDataSet mapOutput = mapVertex.createAndAddResultDataSet(ResultPartitionType.BLOCKING);

418

419

// Create edges with different distribution patterns

420

JobEdge sourceToMap = new JobEdge(sourceOutput, mapVertex, DistributionPattern.ALL_TO_ALL);

421

sourceToMap.setShipStrategy(ShipStrategyType.PARTITION_HASH);

422

423

JobEdge mapToSink = new JobEdge(mapOutput, sinkVertex, DistributionPattern.FORWARD);

424

mapToSink.setShipStrategy(ShipStrategyType.FORWARD);

425

426

// Add edges to job graph

427

JobGraph jobGraph = new JobGraph("Data Exchange Example");

428

jobGraph.addVertex(sourceVertex);

429

jobGraph.addVertex(mapVertex);

430

jobGraph.addVertex(sinkVertex);

431

jobGraph.addEdge(sourceToMap);

432

jobGraph.addEdge(mapToSink);

433

434

// Configure global data exchange mode

435

Configuration jobConfig = new Configuration();

436

jobConfig.setString("execution.batch-shuffle-mode", DataExchangeMode.ALL_EDGES_BLOCKING.toString());

437

jobGraph.setJobConfiguration(jobConfig);

438

```

439

440

### Custom Result Partition Writer

441

442

```java

443

import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;

444

import org.apache.flink.runtime.io.network.buffer.BufferBuilder;

445

446

public class CustomResultPartitionWriter implements ResultPartitionWriter {

447

private final ResultPartition partition;

448

private final BufferPool bufferPool;

449

private volatile boolean finished = false;

450

451

public CustomResultPartitionWriter(ResultPartition partition, BufferPool bufferPool) {

452

this.partition = partition;

453

this.bufferPool = bufferPool;

454

}

455

456

@Override

457

public ResultPartition getPartition() {

458

return partition;

459

}

460

461

@Override

462

public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {

463

if (finished) {

464

throw new IllegalStateException("Writer has been finished");

465

}

466

467

// Request buffer from pool

468

BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(true);

469

if (bufferBuilder == null) {

470

throw new IOException("Failed to get buffer from pool");

471

}

472

473

return bufferBuilder;

474

}

475

476

@Override

477

public BufferBuilder tryGetBufferBuilder() throws IOException {

478

if (finished) {

479

return null;

480

}

481

482

return bufferPool.requestBufferBuilder(false);

483

}

484

485

@Override

486

public void flushAll() {

487

partition.flushAll();

488

}

489

490

@Override

491

public void flushAllSubpartitions(boolean finishProducers) {

492

partition.flushAll();

493

if (finishProducers) {

494

try {

495

finish();

496

} catch (IOException e) {

497

throw new RuntimeException("Failed to finish partition", e);

498

}

499

}

500

}

501

502

@Override

503

public void fail(Throwable throwable) {

504

finished = true;

505

partition.fail(throwable);

506

}

507

508

@Override

509

public void finish() throws IOException {

510

if (!finished) {

511

finished = true;

512

partition.finish();

513

}

514

}

515

516

@Override

517

public void close() {

518

try {

519

finish();

520

} catch (IOException e) {

521

// Log error but don't throw in close()

522

System.err.println("Error finishing partition during close: " + e.getMessage());

523

}

524

}

525

526

public boolean isFinished() {

527

return finished;

528

}

529

}

530

```

531

532

### Input Gate Implementation

533

534

```java

535

import org.apache.flink.runtime.io.network.partition.consumer.InputGate;

536

import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;

537

538

public class CustomInputGate extends InputGate {

539

private final InputChannel[] inputChannels;

540

private final Set<InputChannel> channelsWithData = new LinkedHashSet<>();

541

private final Object lock = new Object();

542

private volatile boolean finished = false;

543

544

public CustomInputGate(String owningTaskName, JobID jobID,

545

IntermediateDataSetID consumedResultId,

546

ResultPartitionType consumedPartitionType,

547

int consumedSubpartitionIndex,

548

InputChannel[] inputChannels) {

549

super(owningTaskName, jobID, consumedResultId, consumedPartitionType, consumedSubpartitionIndex);

550

this.inputChannels = inputChannels;

551

}

552

553

@Override

554

public int getNumberOfInputChannels() {

555

return inputChannels.length;

556

}

557

558

@Override

559

public boolean isFinished() {

560

synchronized (lock) {

561

return finished;

562

}

563

}

564

565

@Override

566

public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {

567

while (true) {

568

synchronized (lock) {

569

if (finished) {

570

return Optional.empty();

571

}

572

573

// Check channels with available data

574

InputChannel channelToRead = getChannelWithData();

575

if (channelToRead != null) {

576

Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();

577

if (result.isPresent()) {

578

BufferAndAvailability bufferAndAvailability = result.get();

579

580

// Handle the buffer

581

if (bufferAndAvailability.buffer().isBuffer()) {

582

return Optional.of(new BufferOrEvent(

583

bufferAndAvailability.buffer(),

584

channelToRead.getChannelIndex()

585

));

586

} else {

587

// Handle events

588

AbstractEvent event = EventSerializer.fromBuffer(

589

bufferAndAvailability.buffer(),

590

getClass().getClassLoader()

591

);

592

593

bufferAndAvailability.buffer().recycleBuffer();

594

595

// Check if this is an end-of-partition event

596

if (event.getClass() == EndOfPartitionEvent.class) {

597

channelToRead.releaseAllResources();

598

checkForFinished();

599

}

600

601

return Optional.of(new BufferOrEvent(

602

event,

603

channelToRead.getChannelIndex()

604

));

605

}

606

}

607

}

608

}

609

610

// Wait for data availability

611

waitForData();

612

}

613

}

614

615

@Override

616

public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {

617

synchronized (lock) {

618

if (finished) {

619

return Optional.empty();

620

}

621

622

InputChannel channelToRead = getChannelWithData();

623

if (channelToRead != null) {

624

Optional<BufferAndAvailability> result = channelToRead.getNextBuffer();

625

if (result.isPresent()) {

626

// Similar processing as in getNext()

627

return processBuffer(result.get(), channelToRead);

628

}

629

}

630

631

return Optional.empty();

632

}

633

}

634

635

@Override

636

public void sendTaskEvent(TaskEvent event) throws IOException {

637

for (InputChannel channel : inputChannels) {

638

channel.sendTaskEvent(event);

639

}

640

}

641

642

@Override

643

public void registerListener(InputGateListener listener) {

644

// Implementation for listener registration

645

this.inputGateListener = listener;

646

}

647

648

@Override

649

public int getPageSize() {

650

return 32768; // Default page size

651

}

652

653

@Override

654

public void close() throws Exception {

655

synchronized (lock) {

656

finished = true;

657

658

for (InputChannel channel : inputChannels) {

659

try {

660

channel.releaseAllResources();

661

} catch (Exception e) {

662

// Log but continue cleanup

663

System.err.println("Error releasing channel: " + e.getMessage());

664

}

665

}

666

}

667

}

668

669

private InputChannel getChannelWithData() {

670

Iterator<InputChannel> iterator = channelsWithData.iterator();

671

if (iterator.hasNext()) {

672

InputChannel channel = iterator.next();

673

iterator.remove();

674

return channel;

675

}

676

return null;

677

}

678

679

private void waitForData() throws InterruptedException {

680

synchronized (lock) {

681

while (channelsWithData.isEmpty() && !finished) {

682

lock.wait();

683

}

684

}

685

}

686

687

private void checkForFinished() {

688

boolean allFinished = true;

689

for (InputChannel channel : inputChannels) {

690

if (!channel.isReleased()) {

691

allFinished = false;

692

break;

693

}

694

}

695

696

if (allFinished) {

697

finished = true;

698

synchronized (lock) {

699

lock.notifyAll();

700

}

701

}

702

}

703

704

public void notifyChannelNonEmpty(InputChannel channel) {

705

synchronized (lock) {

706

channelsWithData.add(channel);

707

lock.notifyAll();

708

}

709

710

// Notify input gate listener

711

if (inputGateListener != null) {

712

inputGateListener.notifyInputGateNonEmpty(this);

713

}

714

}

715

}

716

```

717

718

### Buffer Pool Management

719

720

```java

721

import org.apache.flink.runtime.io.network.buffer.BufferPool;

722

import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

723

724

public class BufferPoolManager {

725

private final NetworkBufferPool networkBufferPool;

726

private final Map<String, BufferPool> bufferPools = new ConcurrentHashMap<>();

727

728

public BufferPoolManager(int totalBuffers, int bufferSize) {

729

this.networkBufferPool = new NetworkBufferPool(totalBuffers, bufferSize);

730

}

731

732

public BufferPool createBufferPool(String poolName, int minBuffers, int maxBuffers) throws IOException {

733

BufferPool bufferPool = networkBufferPool.createBufferPool(minBuffers, maxBuffers);

734

bufferPools.put(poolName, bufferPool);

735

736

System.out.println("Created buffer pool '" + poolName + "' with " +

737

minBuffers + "-" + maxBuffers + " buffers");

738

739

return bufferPool;

740

}

741

742

public void destroyBufferPool(String poolName) {

743

BufferPool bufferPool = bufferPools.remove(poolName);

744

if (bufferPool != null) {

745

bufferPool.close();

746

System.out.println("Destroyed buffer pool '" + poolName + "'");

747

}

748

}

749

750

public void printBufferPoolStats() {

751

System.out.println("=== Buffer Pool Statistics ===");

752

System.out.println("Network Pool - Total: " + networkBufferPool.getTotalNumberOfMemorySegments() +

753

", Available: " + networkBufferPool.getNumberOfAvailableMemorySegments());

754

755

for (Map.Entry<String, BufferPool> entry : bufferPools.entrySet()) {

756

BufferPool pool = entry.getValue();

757

System.out.println("Pool '" + entry.getKey() + "' - " +

758

"Requested: " + pool.getNumberOfRequestedMemorySegments() +

759

", Available: " + pool.getNumberOfAvailableMemorySegments() +

760

", Max: " + pool.getMaxNumberOfMemorySegments());

761

}

762

}

763

764

public void shutdown() {

765

// Close all buffer pools

766

for (BufferPool pool : bufferPools.values()) {

767

pool.close();

768

}

769

bufferPools.clear();

770

771

// Close network buffer pool

772

networkBufferPool.destroyAllBufferPools();

773

networkBufferPool.destroy();

774

}

775

}

776

```

777

778

## Common Patterns

779

780

### Backpressure Handling

781

782

```java

783

public class BackpressureAwareWriter {

784

private final ResultPartitionWriter writer;

785

private final AtomicLong backpressureCount = new AtomicLong(0);

786

private final long maxBackpressureWait = 5000; // 5 seconds

787

788

public BackpressureAwareWriter(ResultPartitionWriter writer) {

789

this.writer = writer;

790

}

791

792

public void writeWithBackpressure(ByteBuffer data) throws IOException, InterruptedException {

793

long startTime = System.currentTimeMillis();

794

795

while (true) {

796

try {

797

BufferBuilder bufferBuilder = writer.tryGetBufferBuilder();

798

if (bufferBuilder != null) {

799

// Successfully got buffer, write data

800

boolean success = bufferBuilder.append(data);

801

if (success) {

802

bufferBuilder.finish();

803

return;

804

} else {

805

// Buffer full, need another buffer

806

bufferBuilder.finish();

807

continue;

808

}

809

}

810

811

// No buffer available - backpressure

812

long elapsedTime = System.currentTimeMillis() - startTime;

813

if (elapsedTime > maxBackpressureWait) {

814

throw new IOException("Backpressure timeout: unable to get buffer after " +

815

maxBackpressureWait + "ms");

816

}

817

818

backpressureCount.incrementAndGet();

819

Thread.sleep(10); // Brief wait before retry

820

821

} catch (IOException e) {

822

throw e;

823

} catch (Exception e) {

824

throw new IOException("Failed to write data", e);

825

}

826

}

827

}

828

829

public long getBackpressureCount() {

830

return backpressureCount.get();

831

}

832

}

833

```

834

835

### Network Metrics Collection

836

837

```java

838

public class NetworkMetricsCollector {

839

private final Counter buffersRequested;

840

private final Counter buffersRecycled;

841

private final Histogram bufferWaitTime;

842

private final Gauge<Integer> buffersInUse;

843

844

private final AtomicInteger currentBuffersInUse = new AtomicInteger(0);

845

846

public NetworkMetricsCollector(MetricGroup metricGroup) {

847

this.buffersRequested = metricGroup.counter("buffers_requested");

848

this.buffersRecycled = metricGroup.counter("buffers_recycled");

849

this.bufferWaitTime = metricGroup.histogram("buffer_wait_time_ms",

850

new DescriptiveStatisticsHistogram(1000));

851

this.buffersInUse = metricGroup.gauge("buffers_in_use", currentBuffersInUse::get);

852

}

853

854

public void recordBufferRequest(long waitTimeMs) {

855

buffersRequested.inc();

856

bufferWaitTime.update(waitTimeMs);

857

currentBuffersInUse.incrementAndGet();

858

}

859

860

public void recordBufferRecycle() {

861

buffersRecycled.inc();

862

currentBuffersInUse.decrementAndGet();

863

}

864

865

public NetworkStats getNetworkStats() {

866

NetworkStats stats = new NetworkStats();

867

stats.buffersRequested = buffersRequested.getCount();

868

stats.buffersRecycled = buffersRecycled.getCount();

869

stats.buffersInUse = currentBuffersInUse.get();

870

stats.avgBufferWaitTime = bufferWaitTime.getStatistics().getMean();

871

return stats;

872

}

873

874

public static class NetworkStats {

875

public long buffersRequested;

876

public long buffersRecycled;

877

public int buffersInUse;

878

public double avgBufferWaitTime;

879

}

880

}

881

```