or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md, connectors.md, event-time-watermarks.md, execution-jobs.md, functions-and-operators.md, index.md, state-management.md, type-system-serialization.md, utilities.md

utilities.mddocs/

0

# Core Utilities

1

2

Apache Flink Core provides a rich set of utility classes and interfaces for common operations like I/O handling, memory management, filesystem operations, and general-purpose utilities. These components form the foundation for many Flink operations and can be used to build efficient applications.

3

4

## Collection and Iterator Utilities

5

6

### Collector Interface

7

8

The primary mechanism for emitting records in Flink functions.

9

10

```java { .api }

11

import org.apache.flink.util.Collector;

12

13

// Using collector in FlatMapFunction

14

public class TokenizerFunction implements FlatMapFunction<String, String> {

15

@Override

16

public void flatMap(String value, Collector<String> out) throws Exception {

17

for (String word : value.split("\\s+")) {

18

if (!word.isEmpty()) {

19

out.collect(word);

20

}

21

}

22

}

23

}

24

25

// Custom collector implementation

26

public class FilteringCollector<T> implements Collector<T> {

27

private final Collector<T> delegate;

28

private final Predicate<T> filter;

29

30

public FilteringCollector(Collector<T> delegate, Predicate<T> filter) {

31

this.delegate = delegate;

32

this.filter = filter;

33

}

34

35

@Override

36

public void collect(T record) {

37

if (filter.test(record)) {

38

delegate.collect(record);

39

}

40

}

41

42

@Override

43

public void close() {

44

delegate.close();

45

}

46

}

47

48

// Batching collector

49

public class BatchingCollector<T> implements Collector<T> {

50

private final Collector<List<T>> delegate;

51

private final int batchSize;

52

private final List<T> buffer;

53

54

public BatchingCollector(Collector<List<T>> delegate, int batchSize) {

55

this.delegate = delegate;

56

this.batchSize = batchSize;

57

this.buffer = new ArrayList<>(batchSize);

58

}

59

60

@Override

61

public void collect(T record) {

62

buffer.add(record);

63

if (buffer.size() >= batchSize) {

64

flush();

65

}

66

}

67

68

@Override

69

public void close() {

70

if (!buffer.isEmpty()) {

71

flush();

72

}

73

delegate.close();

74

}

75

76

private void flush() {

77

if (!buffer.isEmpty()) {

78

delegate.collect(new ArrayList<>(buffer));

79

buffer.clear();

80

}

81

}

82

}

83

```

84

85

### Closeable Iterators and Iterables

86

87

Handle resource cleanup for iterators backed by native resources.

88

89

```java { .api }

90

import org.apache.flink.util.CloseableIterator;

91

import org.apache.flink.util.CloseableIterable;

92

93

// File-based closeable iterator

94

public class FileLineIterator implements CloseableIterator<String> {

95

private final BufferedReader reader;

96

private String nextLine;

97

private boolean closed = false;

98

99

public FileLineIterator(Path filePath) throws IOException {

100

this.reader = Files.newBufferedReader(filePath);

101

this.nextLine = reader.readLine();

102

}

103

104

@Override

105

public boolean hasNext() {

106

return !closed && nextLine != null;

107

}

108

109

@Override

110

public String next() {

111

if (!hasNext()) {

112

throw new NoSuchElementException();

113

}

114

115

String current = nextLine;

116

try {

117

nextLine = reader.readLine();

118

} catch (IOException e) {

119

throw new RuntimeException("Error reading file", e);

120

}

121

122

return current;

123

}

124

125

@Override

126

public void close() throws IOException {

127

if (!closed) {

128

closed = true;

129

reader.close();

130

}

131

}

132

}

133

134

// Directory-based closeable iterable

135

public class DirectoryIterable implements CloseableIterable<Path> {

136

private final Path directory;

137

138

public DirectoryIterable(Path directory) {

139

this.directory = directory;

140

}

141

142

@Override

143

public CloseableIterator<Path> iterator() {

144

try {

145

Stream<Path> stream = Files.list(directory);

146

Iterator<Path> iterator = stream.iterator();

147

148

return new CloseableIterator<Path>() {

149

@Override

150

public boolean hasNext() {

151

return iterator.hasNext();

152

}

153

154

@Override

155

public Path next() {

156

return iterator.next();

157

}

158

159

@Override

160

public void close() {

161

stream.close();

162

}

163

};

164

} catch (IOException e) {

165

throw new RuntimeException("Error listing directory", e);

166

}

167

}

168

169

@Override

170

public void close() throws IOException {

171

// Nothing specific to close for directory itself

172

}

173

}

174

175

// Using closeable iterables safely

176

public class CloseableIterableExample {

177

178

public static void processFiles(Path directory) {

179

try (DirectoryIterable iterable = new DirectoryIterable(directory)) {

180

try (CloseableIterator<Path> iterator = iterable.iterator()) {

181

while (iterator.hasNext()) {

182

Path file = iterator.next();

183

System.out.println("Processing file: " + file);

184

185

// Process file with another closeable iterator

186

if (Files.isRegularFile(file)) {

187

try (FileLineIterator lineIterator = new FileLineIterator(file)) {

188

while (lineIterator.hasNext()) {

189

String line = lineIterator.next();

190

processLine(line);

191

}

192

}

193

}

194

}

195

}

196

} catch (IOException e) {

197

System.err.println("Error processing files: " + e.getMessage());

198

}

199

}

200

201

private static void processLine(String line) {

202

// Process individual line

203

System.out.println("Line: " + line);

204

}

205

}

206

```

207

208

## Memory and I/O Utilities

209

210

### Memory Management

211

212

```java { .api }

213

import org.apache.flink.core.memory.MemorySegment;

214

import org.apache.flink.core.memory.HybridMemorySegment;

215

216

// Working with memory segments

217

public class MemorySegmentExample {

218

219

public static void basicMemorySegmentOperations() {

220

// Allocate memory segment

221

byte[] buffer = new byte[1024];

222

MemorySegment segment = MemorySegment.wrap(buffer);

223

224

// Write primitives

225

segment.putInt(0, 42);

226

segment.putLong(4, 123456789L);

227

segment.putDouble(12, 3.14159);

228

229

// Write bytes

230

byte[] data = "Hello, Flink!".getBytes();

231

segment.put(20, data);

232

233

// Read primitives

234

int intValue = segment.getInt(0);

235

long longValue = segment.getLong(4);

236

double doubleValue = segment.getDouble(12);

237

238

// Read bytes

239

byte[] readData = new byte[data.length];

240

segment.get(20, readData);

241

String text = new String(readData);

242

243

System.out.println("Int: " + intValue);

244

System.out.println("Long: " + longValue);

245

System.out.println("Double: " + doubleValue);

246

System.out.println("Text: " + text);

247

}

248

249

public static void memorySegmentUtilities() {

250

MemorySegment segment1 = MemorySegment.wrap(new byte[100]);

251

MemorySegment segment2 = MemorySegment.wrap(new byte[100]);

252

253

// Fill with pattern

254

segment1.put(0, (byte) 0xAA, 50); // Fill first 50 bytes with 0xAA

255

256

// Copy between segments

257

segment1.copyTo(0, segment2, 0, 50);

258

259

// Compare segments

260

int comparison = segment1.compare(segment2, 0, 0, 50);

261

System.out.println("Segments equal: " + (comparison == 0));

262

263

// Swap bytes

264

segment1.swapBytes(new byte[100], segment2, 0, 0, 50);

265

}

266

}

267

```

268

269

### Input/Output Utilities

270

271

```java { .api }

272

import org.apache.flink.core.io.IOReadableWritable;

273

import org.apache.flink.core.memory.DataInputView;

274

import org.apache.flink.core.memory.DataOutputView;

275

276

// Custom serializable object

277

public class SerializableRecord implements IOReadableWritable {

278

private String name;

279

private int value;

280

private long timestamp;

281

282

public SerializableRecord() {

283

// Default constructor required

284

}

285

286

public SerializableRecord(String name, int value, long timestamp) {

287

this.name = name;

288

this.value = value;

289

this.timestamp = timestamp;

290

}

291

292

@Override

293

public void write(DataOutputView out) throws IOException {

294

out.writeUTF(name != null ? name : "");

295

out.writeInt(value);

296

out.writeLong(timestamp);

297

}

298

299

@Override

300

public void read(DataInputView in) throws IOException {

301

this.name = in.readUTF();

302

this.value = in.readInt();

303

this.timestamp = in.readLong();

304

305

// Handle empty string case

306

if (this.name.isEmpty()) {

307

this.name = null;

308

}

309

}

310

311

// Getters and setters

312

public String getName() { return name; }

313

public void setName(String name) { this.name = name; }

314

315

public int getValue() { return value; }

316

public void setValue(int value) { this.value = value; }

317

318

public long getTimestamp() { return timestamp; }

319

public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

320

321

@Override

322

public String toString() {

323

return "SerializableRecord{name='" + name + "', value=" + value +

324

", timestamp=" + timestamp + "}";

325

}

326

}

327

328

// I/O utilities for serialization

329

public class IOUtils {

330

331

public static byte[] serialize(IOReadableWritable object) throws IOException {

332

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

333

DataOutputStream dos = new DataOutputStream(baos)) {

334

335

DataOutputView outputView = new DataOutputViewStreamWrapper(dos);

336

object.write(outputView);

337

338

return baos.toByteArray();

339

}

340

}

341

342

public static <T extends IOReadableWritable> T deserialize(byte[] data,

343

Class<T> clazz) throws IOException {

344

try (ByteArrayInputStream bais = new ByteArrayInputStream(data);

345

DataInputStream dis = new DataInputStream(bais)) {

346

347

T object = clazz.newInstance();

348

DataInputView inputView = new DataInputViewStreamWrapper(dis);

349

object.read(inputView);

350

351

return object;

352

} catch (InstantiationException | IllegalAccessException e) {

353

throw new IOException("Failed to instantiate object", e);

354

}

355

}

356

357

public static void serializeToFile(IOReadableWritable object, Path filePath)

358

throws IOException {

359

try (FileOutputStream fos = Files.newOutputStream(filePath);

360

DataOutputStream dos = new DataOutputStream(fos)) {

361

362

DataOutputView outputView = new DataOutputViewStreamWrapper(dos);

363

object.write(outputView);

364

}

365

}

366

367

public static <T extends IOReadableWritable> T deserializeFromFile(Path filePath,

368

Class<T> clazz) throws IOException {

369

try (FileInputStream fis = Files.newInputStream(filePath);

370

DataInputStream dis = new DataInputStream(fis)) {

371

372

T object = clazz.newInstance();

373

DataInputView inputView = new DataInputViewStreamWrapper(dis);

374

object.read(inputView);

375

376

return object;

377

} catch (InstantiationException | IllegalAccessException e) {

378

throw new IOException("Failed to instantiate object", e);

379

}

380

}

381

}

382

```

383

384

## Filesystem Operations

385

386

### Filesystem Abstractions

387

388

```java { .api }

389

import org.apache.flink.core.fs.FileSystem;

390

import org.apache.flink.core.fs.Path;

391

import org.apache.flink.core.fs.FSDataInputStream;

392

import org.apache.flink.core.fs.FSDataOutputStream;

393

394

// Filesystem operations

395

public class FileSystemUtils {

396

397

public static void fileSystemOperations() throws IOException {

398

// Get filesystem for path

399

Path remotePath = new Path("hdfs://namenode:8020/data/input.txt");

400

FileSystem fs = remotePath.getFileSystem();

401

402

// Check if file/directory exists

403

boolean exists = fs.exists(remotePath);

404

System.out.println("File exists: " + exists);

405

406

// Get file status

407

if (exists) {

408

FileStatus status = fs.getFileStatus(remotePath);

409

System.out.println("File size: " + status.getLen());

410

System.out.println("Is directory: " + status.isDir());

411

System.out.println("Modification time: " + status.getModificationTime());

412

}

413

414

// List files in directory

415

Path directory = new Path("hdfs://namenode:8020/data/");

416

if (fs.exists(directory)) {

417

FileStatus[] files = fs.listStatus(directory);

418

for (FileStatus file : files) {

419

System.out.println("File: " + file.getPath() +

420

" (size: " + file.getLen() + ")");

421

}

422

}

423

}

424

425

public static void readFromFileSystem() throws IOException {

426

Path inputPath = new Path("hdfs://namenode:8020/data/input.txt");

427

FileSystem fs = inputPath.getFileSystem();

428

429

try (FSDataInputStream inputStream = fs.open(inputPath);

430

BufferedReader reader = new BufferedReader(

431

new InputStreamReader(inputStream))) {

432

433

String line;

434

while ((line = reader.readLine()) != null) {

435

System.out.println("Read line: " + line);

436

}

437

}

438

}

439

440

public static void writeToFileSystem() throws IOException {

441

Path outputPath = new Path("hdfs://namenode:8020/data/output.txt");

442

FileSystem fs = outputPath.getFileSystem();

443

444

try (FSDataOutputStream outputStream = fs.create(outputPath,

445

FileSystem.WriteMode.OVERWRITE);

446

PrintWriter writer = new PrintWriter(

447

new OutputStreamWriter(outputStream))) {

448

449

writer.println("Hello, Flink FileSystem!");

450

writer.println("Writing to: " + outputPath);

451

}

452

}

453

454

public static void copyFiles() throws IOException {

455

Path sourcePath = new Path("file:///local/source.txt");

456

Path destPath = new Path("hdfs://namenode:8020/data/copied.txt");

457

458

FileSystem sourceFs = sourcePath.getFileSystem();

459

FileSystem destFs = destPath.getFileSystem();

460

461

try (FSDataInputStream input = sourceFs.open(sourcePath);

462

FSDataOutputStream output = destFs.create(destPath,

463

FileSystem.WriteMode.OVERWRITE)) {

464

465

byte[] buffer = new byte[8192];

466

int bytesRead;

467

while ((bytesRead = input.read(buffer)) != -1) {

468

output.write(buffer, 0, bytesRead);

469

}

470

}

471

}

472

473

public static void atomicFileOperations() throws IOException {

474

Path tempPath = new Path("hdfs://namenode:8020/data/temp_file.txt");

475

Path finalPath = new Path("hdfs://namenode:8020/data/final_file.txt");

476

477

FileSystem fs = tempPath.getFileSystem();

478

479

// Write to temporary file first

480

try (FSDataOutputStream output = fs.create(tempPath,

481

FileSystem.WriteMode.OVERWRITE);

482

PrintWriter writer = new PrintWriter(

483

new OutputStreamWriter(output))) {

484

485

writer.println("Critical data");

486

writer.println("Must be written atomically");

487

}

488

489

// Atomically rename to final location

490

if (fs.rename(tempPath, finalPath)) {

491

System.out.println("File written atomically");

492

} else {

493

System.err.println("Failed to rename file");

494

fs.delete(tempPath, false); // Clean up temp file

495

}

496

}

497

}

498

```

499

500

## Utility Classes and Helpers

501

502

### Auto-Closeable Management

503

504

```java { .api }

505

import org.apache.flink.util.AbstractAutoCloseableRegistry;

506

import org.apache.flink.util.AutoCloseableAsync;

507

508

// Resource registry for managing multiple resources

509

public class ResourceManager extends AbstractAutoCloseableRegistry<Closeable, IOException> {

510

511

@Override

512

protected void doClose() throws IOException {

513

IOException exception = null;

514

515

for (Closeable resource : getCloseableIterator()) {

516

try {

517

resource.close();

518

} catch (IOException e) {

519

if (exception == null) {

520

exception = e;

521

} else {

522

exception.addSuppressed(e);

523

}

524

}

525

}

526

527

if (exception != null) {

528

throw exception;

529

}

530

}

531

}

532

533

// Async closeable implementation

534

public class AsyncResource implements AutoCloseableAsync {

535

private final ExecutorService executor;

536

private volatile boolean closed = false;

537

538

public AsyncResource() {

539

this.executor = Executors.newSingleThreadExecutor();

540

}

541

542

public void doWork() {

543

if (closed) {

544

throw new IllegalStateException("Resource is closed");

545

}

546

547

executor.submit(() -> {

548

// Simulate async work

549

try {

550

Thread.sleep(1000);

551

System.out.println("Work completed");

552

} catch (InterruptedException e) {

553

Thread.currentThread().interrupt();

554

}

555

});

556

}

557

558

@Override

559

public CompletableFuture<Void> closeAsync() {

560

if (closed) {

561

return CompletableFuture.completedFuture(null);

562

}

563

564

closed = true;

565

566

return CompletableFuture.runAsync(() -> {

567

executor.shutdown();

568

try {

569

if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {

570

executor.shutdownNow();

571

}

572

} catch (InterruptedException e) {

573

executor.shutdownNow();

574

Thread.currentThread().interrupt();

575

}

576

});

577

}

578

}

579

580

// Usage example

581

public class ResourceManagementExample {

582

583

public static void manageResources() {

584

try (ResourceManager resourceManager = new ResourceManager()) {

585

586

// Register multiple resources

587

FileInputStream fis1 = new FileInputStream("file1.txt");

588

FileInputStream fis2 = new FileInputStream("file2.txt");

589

Socket socket = new Socket("localhost", 8080);

590

591

resourceManager.registerCloseable(fis1);

592

resourceManager.registerCloseable(fis2);

593

resourceManager.registerCloseable(socket);

594

595

// Use resources

596

processFiles(fis1, fis2);

597

communicateWithServer(socket);

598

599

// All resources will be closed automatically

600

601

} catch (IOException e) {

602

System.err.println("Error managing resources: " + e.getMessage());

603

}

604

}

605

606

public static void manageAsyncResources() {

607

AsyncResource resource1 = new AsyncResource();

608

AsyncResource resource2 = new AsyncResource();

609

610

try {

611

// Use resources

612

resource1.doWork();

613

resource2.doWork();

614

615

// Close asynchronously

616

CompletableFuture<Void> closeAll = CompletableFuture.allOf(

617

resource1.closeAsync(),

618

resource2.closeAsync()

619

);

620

621

closeAll.get(1, TimeUnit.MINUTES);

622

System.out.println("All async resources closed");

623

624

} catch (Exception e) {

625

System.err.println("Error managing async resources: " + e.getMessage());

626

}

627

}

628

629

private static void processFiles(FileInputStream... streams) {

630

// Process files

631

}

632

633

private static void communicateWithServer(Socket socket) {

634

// Communicate with server

635

}

636

}

637

```

638

639

### Reference Counting

640

641

```java { .api }

642

import org.apache.flink.util.RefCounted;

643

644

// Reference counted resource

645

public class RefCountedResource implements RefCounted {

646

private final AtomicInteger refCount = new AtomicInteger(1);

647

private final String resourceName;

648

private volatile boolean disposed = false;

649

650

public RefCountedResource(String resourceName) {

651

this.resourceName = resourceName;

652

System.out.println("Created resource: " + resourceName);

653

}

654

655

@Override

656

public void retain() {

657

if (disposed) {

658

throw new IllegalStateException("Resource already disposed: " + resourceName);

659

}

660

661

int newCount = refCount.incrementAndGet();

662

System.out.println("Retained " + resourceName + ", ref count: " + newCount);

663

}

664

665

@Override

666

public boolean release() {

667

if (disposed) {

668

return false;

669

}

670

671

int newCount = refCount.decrementAndGet();

672

System.out.println("Released " + resourceName + ", ref count: " + newCount);

673

674

if (newCount == 0) {

675

dispose();

676

return true;

677

} else if (newCount < 0) {

678

throw new IllegalStateException("Reference count became negative: " + newCount);

679

}

680

681

return false;

682

}

683

684

public void use() {

685

if (disposed) {

686

throw new IllegalStateException("Cannot use disposed resource: " + resourceName);

687

}

688

689

System.out.println("Using resource: " + resourceName);

690

// Simulate resource usage

691

}

692

693

private void dispose() {

694

if (!disposed) {

695

disposed = true;

696

System.out.println("Disposed resource: " + resourceName);

697

// Cleanup logic here

698

}

699

}

700

701

public boolean isDisposed() {

702

return disposed;

703

}

704

}

705

706

// Reference counted resource manager

707

public class RefCountedResourceManager {

708

709

public static void demonstrateRefCounting() {

710

RefCountedResource resource = new RefCountedResource("SharedBuffer");

711

712

// Simulate multiple consumers

713

Thread consumer1 = new Thread(() -> {

714

resource.retain();

715

try {

716

resource.use();

717

Thread.sleep(1000);

718

} catch (InterruptedException e) {

719

Thread.currentThread().interrupt();

720

} finally {

721

resource.release();

722

}

723

});

724

725

Thread consumer2 = new Thread(() -> {

726

resource.retain();

727

try {

728

resource.use();

729

Thread.sleep(1500);

730

} catch (InterruptedException e) {

731

Thread.currentThread().interrupt();

732

} finally {

733

resource.release();

734

}

735

});

736

737

consumer1.start();

738

consumer2.start();

739

740

// Original reference

741

try {

742

consumer1.join();

743

consumer2.join();

744

} catch (InterruptedException e) {

745

Thread.currentThread().interrupt();

746

}

747

748

// Release original reference

749

boolean disposed = resource.release();

750

System.out.println("Resource disposed: " + disposed);

751

}

752

}

753

```

754

755

## Functional Utilities

756

757

### Exception Handling

758

759

```java { .api }

760

import org.apache.flink.util.function.FunctionWithException;

761

import org.apache.flink.util.function.ConsumerWithException;

762

763

// Functional interfaces that can throw exceptions

764

public class FunctionalUtilities {

765

766

public static <T, R> Function<T, R> wrapFunction(FunctionWithException<T, R, Exception> function) {

767

return input -> {

768

try {

769

return function.apply(input);

770

} catch (Exception e) {

771

throw new RuntimeException("Function execution failed", e);

772

}

773

};

774

}

775

776

public static <T> Consumer<T> wrapConsumer(ConsumerWithException<T, Exception> consumer) {

777

return input -> {

778

try {

779

consumer.accept(input);

780

} catch (Exception e) {

781

throw new RuntimeException("Consumer execution failed", e);

782

}

783

};

784

}

785

786

public static void functionalExceptionHandling() {

787

List<String> filePaths = Arrays.asList("file1.txt", "file2.txt", "file3.txt");

788

789

// Using wrapped function that can throw exceptions

790

List<Integer> lineCounts = filePaths.stream()

791

.map(wrapFunction(path -> {

792

try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {

793

return (int) reader.lines().count();

794

}

795

}))

796

.collect(Collectors.toList());

797

798

System.out.println("Line counts: " + lineCounts);

799

800

// Using wrapped consumer

801

filePaths.forEach(wrapConsumer(path -> {

802

try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {

803

long lines = reader.lines().count();

804

System.out.println(path + " has " + lines + " lines");

805

}

806

}));

807

}

808

}

809

```

810

811

### Visitor Pattern

812

813

```java { .api }

814

import org.apache.flink.util.Visitor;

815

import org.apache.flink.util.Visitable;

816

817

// Tree node that supports visitor pattern

818

public abstract class TreeNode implements Visitable<TreeNode> {

819

protected final String name;

820

821

public TreeNode(String name) {

822

this.name = name;

823

}

824

825

public String getName() {

826

return name;

827

}

828

}

829

830

public class LeafNode extends TreeNode {

831

private final String value;

832

833

public LeafNode(String name, String value) {

834

super(name);

835

this.value = value;

836

}

837

838

public String getValue() {

839

return value;

840

}

841

842

@Override

843

public void accept(Visitor<TreeNode> visitor) {

844

visitor.visit(this);

845

}

846

}

847

848

public class BranchNode extends TreeNode {

849

private final List<TreeNode> children;

850

851

public BranchNode(String name) {

852

super(name);

853

this.children = new ArrayList<>();

854

}

855

856

public void addChild(TreeNode child) {

857

children.add(child);

858

}

859

860

public List<TreeNode> getChildren() {

861

return children;

862

}

863

864

@Override

865

public void accept(Visitor<TreeNode> visitor) {

866

visitor.visit(this);

867

for (TreeNode child : children) {

868

child.accept(visitor);

869

}

870

}

871

}

872

873

// Visitor implementations

874

public class PrintVisitor implements Visitor<TreeNode> {

875

private int depth = 0;

876

877

@Override

878

public void visit(TreeNode node) {

879

String indent = " ".repeat(depth);

880

881

if (node instanceof LeafNode) {

882

LeafNode leaf = (LeafNode) node;

883

System.out.println(indent + leaf.getName() + ": " + leaf.getValue());

884

} else if (node instanceof BranchNode) {

885

BranchNode branch = (BranchNode) node;

886

System.out.println(indent + branch.getName() + "/");

887

depth++;

888

// Children will be visited automatically

889

depth--;

890

}

891

}

892

}

893

894

public class CountingVisitor implements Visitor<TreeNode> {

895

private int leafCount = 0;

896

private int branchCount = 0;

897

898

@Override

899

public void visit(TreeNode node) {

900

if (node instanceof LeafNode) {

901

leafCount++;

902

} else if (node instanceof BranchNode) {

903

branchCount++;

904

}

905

}

906

907

public int getLeafCount() {

908

return leafCount;

909

}

910

911

public int getBranchCount() {

912

return branchCount;

913

}

914

}

915

916

// Usage example

917

public class VisitorPatternExample {

918

919

public static void demonstrateVisitorPattern() {

920

// Build tree

921

BranchNode root = new BranchNode("root");

922

923

BranchNode config = new BranchNode("config");

924

config.addChild(new LeafNode("host", "localhost"));

925

config.addChild(new LeafNode("port", "8080"));

926

927

BranchNode data = new BranchNode("data");

928

data.addChild(new LeafNode("input", "/data/input"));

929

data.addChild(new LeafNode("output", "/data/output"));

930

931

root.addChild(config);

932

root.addChild(data);

933

934

// Use visitors

935

System.out.println("Tree structure:");

936

root.accept(new PrintVisitor());

937

938

CountingVisitor counter = new CountingVisitor();

939

root.accept(counter);

940

System.out.println("Branches: " + counter.getBranchCount());

941

System.out.println("Leaves: " + counter.getLeafCount());

942

}

943

}

944

```

945

946

Apache Flink's utility classes provide essential building blocks for efficient resource management, I/O operations, and common programming patterns. By leveraging these utilities, you can build more robust and efficient Flink applications while following established patterns for resource cleanup, exception handling, and data processing.