0
# Core Utilities
1
2
Apache Flink Core provides a rich set of utility classes and interfaces for common operations like I/O handling, memory management, filesystem operations, and general-purpose utilities. These components form the foundation for many Flink operations and can be used to build efficient applications.
3
4
## Collection and Iterator Utilities
5
6
### Collector Interface
7
8
`Collector` is the primary mechanism for emitting records from Flink user functions such as `FlatMapFunction`.
9
10
```java { .api }
11
import org.apache.flink.util.Collector;
12
13
// Using collector in FlatMapFunction
14
public class TokenizerFunction implements FlatMapFunction<String, String> {
15
@Override
16
public void flatMap(String value, Collector<String> out) throws Exception {
17
for (String word : value.split("\\s+")) {
18
if (!word.isEmpty()) {
19
out.collect(word);
20
}
21
}
22
}
23
}
24
25
// Custom collector implementation
26
public class FilteringCollector<T> implements Collector<T> {
27
private final Collector<T> delegate;
28
private final Predicate<T> filter;
29
30
public FilteringCollector(Collector<T> delegate, Predicate<T> filter) {
31
this.delegate = delegate;
32
this.filter = filter;
33
}
34
35
@Override
36
public void collect(T record) {
37
if (filter.test(record)) {
38
delegate.collect(record);
39
}
40
}
41
42
@Override
43
public void close() {
44
delegate.close();
45
}
46
}
47
48
// Batching collector
49
public class BatchingCollector<T> implements Collector<T> {
50
private final Collector<List<T>> delegate;
51
private final int batchSize;
52
private final List<T> buffer;
53
54
public BatchingCollector(Collector<List<T>> delegate, int batchSize) {
55
this.delegate = delegate;
56
this.batchSize = batchSize;
57
this.buffer = new ArrayList<>(batchSize);
58
}
59
60
@Override
61
public void collect(T record) {
62
buffer.add(record);
63
if (buffer.size() >= batchSize) {
64
flush();
65
}
66
}
67
68
@Override
69
public void close() {
70
if (!buffer.isEmpty()) {
71
flush();
72
}
73
delegate.close();
74
}
75
76
private void flush() {
77
if (!buffer.isEmpty()) {
78
delegate.collect(new ArrayList<>(buffer));
79
buffer.clear();
80
}
81
}
82
}
83
```
84
85
### Closeable Iterators and Iterables
86
87
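`CloseableIterator` and `CloseableIterable` let callers release the resources that back an iteration (such as open files or directory streams) once they are done with it.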
88
89
```java { .api }
90
import org.apache.flink.util.CloseableIterator;
91
import org.apache.flink.util.CloseableIterable;
92
93
// File-based closeable iterator
94
public class FileLineIterator implements CloseableIterator<String> {
95
private final BufferedReader reader;
96
private String nextLine;
97
private boolean closed = false;
98
99
public FileLineIterator(Path filePath) throws IOException {
100
this.reader = Files.newBufferedReader(filePath);
101
this.nextLine = reader.readLine();
102
}
103
104
@Override
105
public boolean hasNext() {
106
return !closed && nextLine != null;
107
}
108
109
@Override
110
public String next() {
111
if (!hasNext()) {
112
throw new NoSuchElementException();
113
}
114
115
String current = nextLine;
116
try {
117
nextLine = reader.readLine();
118
} catch (IOException e) {
119
throw new RuntimeException("Error reading file", e);
120
}
121
122
return current;
123
}
124
125
@Override
126
public void close() throws IOException {
127
if (!closed) {
128
closed = true;
129
reader.close();
130
}
131
}
132
}
133
134
// Directory-based closeable iterable
135
public class DirectoryIterable implements CloseableIterable<Path> {
136
private final Path directory;
137
138
public DirectoryIterable(Path directory) {
139
this.directory = directory;
140
}
141
142
@Override
143
public CloseableIterator<Path> iterator() {
144
try {
145
Stream<Path> stream = Files.list(directory);
146
Iterator<Path> iterator = stream.iterator();
147
148
return new CloseableIterator<Path>() {
149
@Override
150
public boolean hasNext() {
151
return iterator.hasNext();
152
}
153
154
@Override
155
public Path next() {
156
return iterator.next();
157
}
158
159
@Override
160
public void close() {
161
stream.close();
162
}
163
};
164
} catch (IOException e) {
165
throw new RuntimeException("Error listing directory", e);
166
}
167
}
168
169
@Override
170
public void close() throws IOException {
171
// Nothing specific to close for directory itself
172
}
173
}
174
175
// Using closeable iterables safely
176
public class CloseableIterableExample {
177
178
public static void processFiles(Path directory) {
179
try (DirectoryIterable iterable = new DirectoryIterable(directory)) {
180
try (CloseableIterator<Path> iterator = iterable.iterator()) {
181
while (iterator.hasNext()) {
182
Path file = iterator.next();
183
System.out.println("Processing file: " + file);
184
185
// Process file with another closeable iterator
186
if (Files.isRegularFile(file)) {
187
try (FileLineIterator lineIterator = new FileLineIterator(file)) {
188
while (lineIterator.hasNext()) {
189
String line = lineIterator.next();
190
processLine(line);
191
}
192
}
193
}
194
}
195
}
196
} catch (IOException e) {
197
System.err.println("Error processing files: " + e.getMessage());
198
}
199
}
200
201
private static void processLine(String line) {
202
// Process individual line
203
System.out.println("Line: " + line);
204
}
205
}
206
```
207
208
## Memory and I/O Utilities
209
210
### Memory Management
211
212
```java { .api }
213
import org.apache.flink.core.memory.MemorySegment;
214
import org.apache.flink.core.memory.HybridMemorySegment;
215
216
// Working with memory segments
217
public class MemorySegmentExample {
218
219
public static void basicMemorySegmentOperations() {
220
// Allocate memory segment
221
byte[] buffer = new byte[1024];
222
MemorySegment segment = MemorySegment.wrap(buffer);
223
224
// Write primitives
225
segment.putInt(0, 42);
226
segment.putLong(4, 123456789L);
227
segment.putDouble(12, 3.14159);
228
229
// Write bytes
230
byte[] data = "Hello, Flink!".getBytes();
231
segment.put(20, data);
232
233
// Read primitives
234
int intValue = segment.getInt(0);
235
long longValue = segment.getLong(4);
236
double doubleValue = segment.getDouble(12);
237
238
// Read bytes
239
byte[] readData = new byte[data.length];
240
segment.get(20, readData);
241
String text = new String(readData);
242
243
System.out.println("Int: " + intValue);
244
System.out.println("Long: " + longValue);
245
System.out.println("Double: " + doubleValue);
246
System.out.println("Text: " + text);
247
}
248
249
public static void memorySegmentUtilities() {
250
MemorySegment segment1 = MemorySegment.wrap(new byte[100]);
251
MemorySegment segment2 = MemorySegment.wrap(new byte[100]);
252
253
// Fill with pattern
254
segment1.put(0, (byte) 0xAA, 50); // Fill first 50 bytes with 0xAA
255
256
// Copy between segments
257
segment1.copyTo(0, segment2, 0, 50);
258
259
// Compare segments
260
int comparison = segment1.compare(segment2, 0, 0, 50);
261
System.out.println("Segments equal: " + (comparison == 0));
262
263
// Swap bytes
264
segment1.swapBytes(new byte[100], segment2, 0, 0, 50);
265
}
266
}
267
```
268
269
### Input/Output Utilities
270
271
```java { .api }
272
import org.apache.flink.core.io.IOReadableWritable;
273
import org.apache.flink.core.memory.DataInputView;
274
import org.apache.flink.core.memory.DataOutputView;
275
276
// Custom serializable object
277
public class SerializableRecord implements IOReadableWritable {
278
private String name;
279
private int value;
280
private long timestamp;
281
282
public SerializableRecord() {
283
// Default constructor required
284
}
285
286
public SerializableRecord(String name, int value, long timestamp) {
287
this.name = name;
288
this.value = value;
289
this.timestamp = timestamp;
290
}
291
292
@Override
293
public void write(DataOutputView out) throws IOException {
294
out.writeUTF(name != null ? name : "");
295
out.writeInt(value);
296
out.writeLong(timestamp);
297
}
298
299
@Override
300
public void read(DataInputView in) throws IOException {
301
this.name = in.readUTF();
302
this.value = in.readInt();
303
this.timestamp = in.readLong();
304
305
// Handle empty string case
306
if (this.name.isEmpty()) {
307
this.name = null;
308
}
309
}
310
311
// Getters and setters
312
public String getName() { return name; }
313
public void setName(String name) { this.name = name; }
314
315
public int getValue() { return value; }
316
public void setValue(int value) { this.value = value; }
317
318
public long getTimestamp() { return timestamp; }
319
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
320
321
@Override
322
public String toString() {
323
return "SerializableRecord{name='" + name + "', value=" + value +
324
", timestamp=" + timestamp + "}";
325
}
326
}
327
328
// I/O utilities for serialization
329
public class IOUtils {
330
331
public static byte[] serialize(IOReadableWritable object) throws IOException {
332
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
333
DataOutputStream dos = new DataOutputStream(baos)) {
334
335
DataOutputView outputView = new DataOutputViewStreamWrapper(dos);
336
object.write(outputView);
337
338
return baos.toByteArray();
339
}
340
}
341
342
public static <T extends IOReadableWritable> T deserialize(byte[] data,
343
Class<T> clazz) throws IOException {
344
try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
345
DataInputStream dis = new DataInputStream(bais)) {
346
347
T object = clazz.newInstance();
348
DataInputView inputView = new DataInputViewStreamWrapper(dis);
349
object.read(inputView);
350
351
return object;
352
} catch (InstantiationException | IllegalAccessException e) {
353
throw new IOException("Failed to instantiate object", e);
354
}
355
}
356
357
public static void serializeToFile(IOReadableWritable object, Path filePath)
358
throws IOException {
359
try (FileOutputStream fos = Files.newOutputStream(filePath);
360
DataOutputStream dos = new DataOutputStream(fos)) {
361
362
DataOutputView outputView = new DataOutputViewStreamWrapper(dos);
363
object.write(outputView);
364
}
365
}
366
367
public static <T extends IOReadableWritable> T deserializeFromFile(Path filePath,
368
Class<T> clazz) throws IOException {
369
try (FileInputStream fis = Files.newInputStream(filePath);
370
DataInputStream dis = new DataInputStream(fis)) {
371
372
T object = clazz.newInstance();
373
DataInputView inputView = new DataInputViewStreamWrapper(dis);
374
object.read(inputView);
375
376
return object;
377
} catch (InstantiationException | IllegalAccessException e) {
378
throw new IOException("Failed to instantiate object", e);
379
}
380
}
381
}
382
```
383
384
## Filesystem Operations
385
386
### Filesystem Abstractions
387
388
```java { .api }
389
import org.apache.flink.core.fs.FileSystem;
390
import org.apache.flink.core.fs.Path;
391
import org.apache.flink.core.fs.FSDataInputStream;
392
import org.apache.flink.core.fs.FSDataOutputStream;
393
394
// Filesystem operations
395
public class FileSystemUtils {
396
397
public static void fileSystemOperations() throws IOException {
398
// Get filesystem for path
399
Path remotePath = new Path("hdfs://namenode:8020/data/input.txt");
400
FileSystem fs = remotePath.getFileSystem();
401
402
// Check if file/directory exists
403
boolean exists = fs.exists(remotePath);
404
System.out.println("File exists: " + exists);
405
406
// Get file status
407
if (exists) {
408
FileStatus status = fs.getFileStatus(remotePath);
409
System.out.println("File size: " + status.getLen());
410
System.out.println("Is directory: " + status.isDir());
411
System.out.println("Modification time: " + status.getModificationTime());
412
}
413
414
// List files in directory
415
Path directory = new Path("hdfs://namenode:8020/data/");
416
if (fs.exists(directory)) {
417
FileStatus[] files = fs.listStatus(directory);
418
for (FileStatus file : files) {
419
System.out.println("File: " + file.getPath() +
420
" (size: " + file.getLen() + ")");
421
}
422
}
423
}
424
425
public static void readFromFileSystem() throws IOException {
426
Path inputPath = new Path("hdfs://namenode:8020/data/input.txt");
427
FileSystem fs = inputPath.getFileSystem();
428
429
try (FSDataInputStream inputStream = fs.open(inputPath);
430
BufferedReader reader = new BufferedReader(
431
new InputStreamReader(inputStream))) {
432
433
String line;
434
while ((line = reader.readLine()) != null) {
435
System.out.println("Read line: " + line);
436
}
437
}
438
}
439
440
public static void writeToFileSystem() throws IOException {
441
Path outputPath = new Path("hdfs://namenode:8020/data/output.txt");
442
FileSystem fs = outputPath.getFileSystem();
443
444
try (FSDataOutputStream outputStream = fs.create(outputPath,
445
FileSystem.WriteMode.OVERWRITE);
446
PrintWriter writer = new PrintWriter(
447
new OutputStreamWriter(outputStream))) {
448
449
writer.println("Hello, Flink FileSystem!");
450
writer.println("Writing to: " + outputPath);
451
}
452
}
453
454
public static void copyFiles() throws IOException {
455
Path sourcePath = new Path("file:///local/source.txt");
456
Path destPath = new Path("hdfs://namenode:8020/data/copied.txt");
457
458
FileSystem sourceFs = sourcePath.getFileSystem();
459
FileSystem destFs = destPath.getFileSystem();
460
461
try (FSDataInputStream input = sourceFs.open(sourcePath);
462
FSDataOutputStream output = destFs.create(destPath,
463
FileSystem.WriteMode.OVERWRITE)) {
464
465
byte[] buffer = new byte[8192];
466
int bytesRead;
467
while ((bytesRead = input.read(buffer)) != -1) {
468
output.write(buffer, 0, bytesRead);
469
}
470
}
471
}
472
473
public static void atomicFileOperations() throws IOException {
474
Path tempPath = new Path("hdfs://namenode:8020/data/temp_file.txt");
475
Path finalPath = new Path("hdfs://namenode:8020/data/final_file.txt");
476
477
FileSystem fs = tempPath.getFileSystem();
478
479
// Write to temporary file first
480
try (FSDataOutputStream output = fs.create(tempPath,
481
FileSystem.WriteMode.OVERWRITE);
482
PrintWriter writer = new PrintWriter(
483
new OutputStreamWriter(output))) {
484
485
writer.println("Critical data");
486
writer.println("Must be written atomically");
487
}
488
489
// Atomically rename to final location
490
if (fs.rename(tempPath, finalPath)) {
491
System.out.println("File written atomically");
492
} else {
493
System.err.println("Failed to rename file");
494
fs.delete(tempPath, false); // Clean up temp file
495
}
496
}
497
}
498
```
499
500
## Utility Classes and Helpers
501
502
### Auto-Closeable Management
503
504
```java { .api }
505
import org.apache.flink.util.AbstractAutoCloseableRegistry;
506
import org.apache.flink.util.AutoCloseableAsync;
507
508
// Resource registry for managing multiple resources
509
public class ResourceManager extends AbstractAutoCloseableRegistry<Closeable, IOException> {
510
511
@Override
512
protected void doClose() throws IOException {
513
IOException exception = null;
514
515
for (Closeable resource : getCloseableIterator()) {
516
try {
517
resource.close();
518
} catch (IOException e) {
519
if (exception == null) {
520
exception = e;
521
} else {
522
exception.addSuppressed(e);
523
}
524
}
525
}
526
527
if (exception != null) {
528
throw exception;
529
}
530
}
531
}
532
533
// Async closeable implementation
534
public class AsyncResource implements AutoCloseableAsync {
535
private final ExecutorService executor;
536
private volatile boolean closed = false;
537
538
public AsyncResource() {
539
this.executor = Executors.newSingleThreadExecutor();
540
}
541
542
public void doWork() {
543
if (closed) {
544
throw new IllegalStateException("Resource is closed");
545
}
546
547
executor.submit(() -> {
548
// Simulate async work
549
try {
550
Thread.sleep(1000);
551
System.out.println("Work completed");
552
} catch (InterruptedException e) {
553
Thread.currentThread().interrupt();
554
}
555
});
556
}
557
558
@Override
559
public CompletableFuture<Void> closeAsync() {
560
if (closed) {
561
return CompletableFuture.completedFuture(null);
562
}
563
564
closed = true;
565
566
return CompletableFuture.runAsync(() -> {
567
executor.shutdown();
568
try {
569
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
570
executor.shutdownNow();
571
}
572
} catch (InterruptedException e) {
573
executor.shutdownNow();
574
Thread.currentThread().interrupt();
575
}
576
});
577
}
578
}
579
580
// Usage example
581
public class ResourceManagementExample {
582
583
public static void manageResources() {
584
try (ResourceManager resourceManager = new ResourceManager()) {
585
586
// Register multiple resources
587
FileInputStream fis1 = new FileInputStream("file1.txt");
588
FileInputStream fis2 = new FileInputStream("file2.txt");
589
Socket socket = new Socket("localhost", 8080);
590
591
resourceManager.registerCloseable(fis1);
592
resourceManager.registerCloseable(fis2);
593
resourceManager.registerCloseable(socket);
594
595
// Use resources
596
processFiles(fis1, fis2);
597
communicateWithServer(socket);
598
599
// All resources will be closed automatically
600
601
} catch (IOException e) {
602
System.err.println("Error managing resources: " + e.getMessage());
603
}
604
}
605
606
public static void manageAsyncResources() {
607
AsyncResource resource1 = new AsyncResource();
608
AsyncResource resource2 = new AsyncResource();
609
610
try {
611
// Use resources
612
resource1.doWork();
613
resource2.doWork();
614
615
// Close asynchronously
616
CompletableFuture<Void> closeAll = CompletableFuture.allOf(
617
resource1.closeAsync(),
618
resource2.closeAsync()
619
);
620
621
closeAll.get(1, TimeUnit.MINUTES);
622
System.out.println("All async resources closed");
623
624
} catch (Exception e) {
625
System.err.println("Error managing async resources: " + e.getMessage());
626
}
627
}
628
629
private static void processFiles(FileInputStream... streams) {
630
// Process files
631
}
632
633
private static void communicateWithServer(Socket socket) {
634
// Communicate with server
635
}
636
}
637
```
638
639
### Reference Counting
640
641
```java { .api }
642
import org.apache.flink.util.RefCounted;
643
644
// Reference counted resource
645
public class RefCountedResource implements RefCounted {
646
private final AtomicInteger refCount = new AtomicInteger(1);
647
private final String resourceName;
648
private volatile boolean disposed = false;
649
650
public RefCountedResource(String resourceName) {
651
this.resourceName = resourceName;
652
System.out.println("Created resource: " + resourceName);
653
}
654
655
@Override
656
public void retain() {
657
if (disposed) {
658
throw new IllegalStateException("Resource already disposed: " + resourceName);
659
}
660
661
int newCount = refCount.incrementAndGet();
662
System.out.println("Retained " + resourceName + ", ref count: " + newCount);
663
}
664
665
@Override
666
public boolean release() {
667
if (disposed) {
668
return false;
669
}
670
671
int newCount = refCount.decrementAndGet();
672
System.out.println("Released " + resourceName + ", ref count: " + newCount);
673
674
if (newCount == 0) {
675
dispose();
676
return true;
677
} else if (newCount < 0) {
678
throw new IllegalStateException("Reference count became negative: " + newCount);
679
}
680
681
return false;
682
}
683
684
public void use() {
685
if (disposed) {
686
throw new IllegalStateException("Cannot use disposed resource: " + resourceName);
687
}
688
689
System.out.println("Using resource: " + resourceName);
690
// Simulate resource usage
691
}
692
693
private void dispose() {
694
if (!disposed) {
695
disposed = true;
696
System.out.println("Disposed resource: " + resourceName);
697
// Cleanup logic here
698
}
699
}
700
701
public boolean isDisposed() {
702
return disposed;
703
}
704
}
705
706
// Reference counted resource manager
707
public class RefCountedResourceManager {
708
709
public static void demonstrateRefCounting() {
710
RefCountedResource resource = new RefCountedResource("SharedBuffer");
711
712
// Simulate multiple consumers
713
Thread consumer1 = new Thread(() -> {
714
resource.retain();
715
try {
716
resource.use();
717
Thread.sleep(1000);
718
} catch (InterruptedException e) {
719
Thread.currentThread().interrupt();
720
} finally {
721
resource.release();
722
}
723
});
724
725
Thread consumer2 = new Thread(() -> {
726
resource.retain();
727
try {
728
resource.use();
729
Thread.sleep(1500);
730
} catch (InterruptedException e) {
731
Thread.currentThread().interrupt();
732
} finally {
733
resource.release();
734
}
735
});
736
737
consumer1.start();
738
consumer2.start();
739
740
// Original reference
741
try {
742
consumer1.join();
743
consumer2.join();
744
} catch (InterruptedException e) {
745
Thread.currentThread().interrupt();
746
}
747
748
// Release original reference
749
boolean disposed = resource.release();
750
System.out.println("Resource disposed: " + disposed);
751
}
752
}
753
```
754
755
## Functional Utilities
756
757
### Exception Handling
758
759
```java { .api }
760
import org.apache.flink.util.function.FunctionWithException;
761
import org.apache.flink.util.function.ConsumerWithException;
762
763
// Functional interfaces that can throw exceptions
764
public class FunctionalUtilities {
765
766
public static <T, R> Function<T, R> wrapFunction(FunctionWithException<T, R, Exception> function) {
767
return input -> {
768
try {
769
return function.apply(input);
770
} catch (Exception e) {
771
throw new RuntimeException("Function execution failed", e);
772
}
773
};
774
}
775
776
public static <T> Consumer<T> wrapConsumer(ConsumerWithException<T, Exception> consumer) {
777
return input -> {
778
try {
779
consumer.accept(input);
780
} catch (Exception e) {
781
throw new RuntimeException("Consumer execution failed", e);
782
}
783
};
784
}
785
786
public static void functionalExceptionHandling() {
787
List<String> filePaths = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
788
789
// Using wrapped function that can throw exceptions
790
List<Integer> lineCounts = filePaths.stream()
791
.map(wrapFunction(path -> {
792
try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
793
return (int) reader.lines().count();
794
}
795
}))
796
.collect(Collectors.toList());
797
798
System.out.println("Line counts: " + lineCounts);
799
800
// Using wrapped consumer
801
filePaths.forEach(wrapConsumer(path -> {
802
try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
803
long lines = reader.lines().count();
804
System.out.println(path + " has " + lines + " lines");
805
}
806
}));
807
}
808
}
809
```
810
811
### Visitor Pattern
812
813
```java { .api }
814
import org.apache.flink.util.Visitor;
815
import org.apache.flink.util.Visitable;
816
817
// Tree node that supports visitor pattern
818
public abstract class TreeNode implements Visitable<TreeNode> {
819
protected final String name;
820
821
public TreeNode(String name) {
822
this.name = name;
823
}
824
825
public String getName() {
826
return name;
827
}
828
}
829
830
public class LeafNode extends TreeNode {
831
private final String value;
832
833
public LeafNode(String name, String value) {
834
super(name);
835
this.value = value;
836
}
837
838
public String getValue() {
839
return value;
840
}
841
842
@Override
843
public void accept(Visitor<TreeNode> visitor) {
844
visitor.visit(this);
845
}
846
}
847
848
public class BranchNode extends TreeNode {
849
private final List<TreeNode> children;
850
851
public BranchNode(String name) {
852
super(name);
853
this.children = new ArrayList<>();
854
}
855
856
public void addChild(TreeNode child) {
857
children.add(child);
858
}
859
860
public List<TreeNode> getChildren() {
861
return children;
862
}
863
864
@Override
865
public void accept(Visitor<TreeNode> visitor) {
866
visitor.visit(this);
867
for (TreeNode child : children) {
868
child.accept(visitor);
869
}
870
}
871
}
872
873
// Visitor implementations
874
public class PrintVisitor implements Visitor<TreeNode> {
875
private int depth = 0;
876
877
@Override
878
public void visit(TreeNode node) {
879
String indent = " ".repeat(depth);
880
881
if (node instanceof LeafNode) {
882
LeafNode leaf = (LeafNode) node;
883
System.out.println(indent + leaf.getName() + ": " + leaf.getValue());
884
} else if (node instanceof BranchNode) {
885
BranchNode branch = (BranchNode) node;
886
System.out.println(indent + branch.getName() + "/");
887
depth++;
888
// Children will be visited automatically
889
depth--;
890
}
891
}
892
}
893
894
public class CountingVisitor implements Visitor<TreeNode> {
895
private int leafCount = 0;
896
private int branchCount = 0;
897
898
@Override
899
public void visit(TreeNode node) {
900
if (node instanceof LeafNode) {
901
leafCount++;
902
} else if (node instanceof BranchNode) {
903
branchCount++;
904
}
905
}
906
907
public int getLeafCount() {
908
return leafCount;
909
}
910
911
public int getBranchCount() {
912
return branchCount;
913
}
914
}
915
916
// Usage example
917
public class VisitorPatternExample {
918
919
public static void demonstrateVisitorPattern() {
920
// Build tree
921
BranchNode root = new BranchNode("root");
922
923
BranchNode config = new BranchNode("config");
924
config.addChild(new LeafNode("host", "localhost"));
925
config.addChild(new LeafNode("port", "8080"));
926
927
BranchNode data = new BranchNode("data");
928
data.addChild(new LeafNode("input", "/data/input"));
929
data.addChild(new LeafNode("output", "/data/output"));
930
931
root.addChild(config);
932
root.addChild(data);
933
934
// Use visitors
935
System.out.println("Tree structure:");
936
root.accept(new PrintVisitor());
937
938
CountingVisitor counter = new CountingVisitor();
939
root.accept(counter);
940
System.out.println("Branches: " + counter.getBranchCount());
941
System.out.println("Leaves: " + counter.getLeafCount());
942
}
943
}
944
```
945
946
Apache Flink's utility classes provide essential building blocks for efficient resource management, I/O operations, and common programming patterns. By leveraging these utilities, you can build more robust and efficient Flink applications while following established patterns for resource cleanup, exception handling, and data processing.