# Network Utilities

Essential utilities for network operations and common Java tasks, providing helper methods for resource management, data conversion, and file operations in distributed environments.

## Capabilities

### JavaUtils Class

Core utility class providing static methods for common operations in network and distributed computing contexts.
```java { .api }
/**
 * General utilities for network operations and common Java tasks
 * Package: org.apache.spark.network.util
 */
public class JavaUtils {

  /** Default driver memory size in megabytes */
  public static final long DEFAULT_DRIVER_MEM_MB = 1024;

  /**
   * Closes a Closeable resource without throwing exceptions
   * Logs any IOException that occurs during closing
   * @param closeable - Resource to close (can be null)
   */
  public static void closeQuietly(Closeable closeable);

  /**
   * Returns a non-negative hash code for any object
   * Handles null objects and ensures result is never negative
   * @param obj - Object to hash (can be null)
   * @return Non-negative hash code
   */
  public static int nonNegativeHash(Object obj);

  /**
   * Converts a string to ByteBuffer using UTF-8 encoding
   * @param s - String to convert
   * @return ByteBuffer containing UTF-8 encoded string
   */
  public static ByteBuffer stringToBytes(String s);

  /**
   * Converts a ByteBuffer to string using UTF-8 decoding
   * @param b - ByteBuffer to convert
   * @return Decoded string
   */
  public static String bytesToString(ByteBuffer b);

  /**
   * Recursively deletes files and directories
   * @param file - File or directory to delete
   * @throws IOException - If deletion fails
   */
  public static void deleteRecursively(File file) throws IOException;

  /**
   * Recursively deletes files and directories with filename filter
   * @param file - File or directory to delete
   * @param filter - Filename filter for selective deletion
   * @throws IOException - If deletion fails
   */
  public static void deleteRecursively(File file, FilenameFilter filter) throws IOException;
}
```
65
66
**Usage Examples:**
67
68
```java
69
import org.apache.spark.network.util.JavaUtils;
70
import java.io.*;
71
import java.nio.ByteBuffer;
72
73
// Resource management with automatic cleanup
74
FileInputStream fis = null;
75
try {
76
fis = new FileInputStream("data.txt");
77
// Process file...
78
} catch (IOException e) {
79
// Handle error...
80
} finally {
81
// Safe cleanup - no exceptions thrown
82
JavaUtils.closeQuietly(fis);
83
}
84
85
// Safe hashing for distributed operations
86
public class PartitionKey {
87
private final String key;
88
89
public PartitionKey(String key) {
90
this.key = key;
91
}
92
93
@Override
94
public int hashCode() {
95
// Always returns non-negative hash
96
return JavaUtils.nonNegativeHash(key);
97
}
98
99
public int getPartition(int numPartitions) {
100
return JavaUtils.nonNegativeHash(key) % numPartitions;
101
}
102
}
103
104
// String and ByteBuffer conversions
105
public void processNetworkData() {
106
// Convert string for network transmission
107
String message = "Hello Spark";
108
ByteBuffer buffer = JavaUtils.stringToBytes(message);
109
110
// Send buffer over network...
111
sendOverNetwork(buffer);
112
113
// Convert received buffer back to string
114
ByteBuffer received = receiveFromNetwork();
115
String decoded = JavaUtils.bytesToString(received);
116
System.out.println("Received: " + decoded);
117
}
118
119
// File cleanup in distributed environments
120
public void cleanupTempFiles(String tempDir) {
121
File tempDirectory = new File(tempDir);
122
123
// Recursively delete all temporary files
124
// May throw IOException if deletion fails
125
try {
126
JavaUtils.deleteRecursively(tempDirectory);
127
} catch (IOException e) {
128
System.err.println("Failed to delete temp directory: " + e.getMessage());
129
}
130
131
System.out.println("Cleanup completed for: " + tempDir);
132
}
133
```
134
135
### Closeable Resource Management

Safe resource cleanup utilities that handle exceptions gracefully.

```java { .api }
/**
 * Closes a Closeable resource without throwing exceptions
 * @param closeable - Resource to close (can be null)
 */
public static void closeQuietly(Closeable closeable);
```
146
147
**Usage Examples:**
148
149
```java
150
import org.apache.spark.network.util.JavaUtils;
151
import java.io.*;
152
import java.net.*;
153
154
// Database connection cleanup
155
public void processDatabase() {
156
Connection conn = null;
157
PreparedStatement stmt = null;
158
ResultSet rs = null;
159
160
try {
161
conn = getConnection();
162
stmt = conn.prepareStatement("SELECT * FROM users");
163
rs = stmt.executeQuery();
164
165
while (rs.next()) {
166
processRow(rs);
167
}
168
} catch (SQLException e) {
169
handleError(e);
170
} finally {
171
// Safe cleanup of all resources
172
JavaUtils.closeQuietly(rs);
173
JavaUtils.closeQuietly(stmt);
174
JavaUtils.closeQuietly(conn);
175
}
176
}
177
178
// Network resource cleanup
179
public void downloadData(String url) {
180
InputStream input = null;
181
OutputStream output = null;
182
183
try {
184
URLConnection connection = new URL(url).openConnection();
185
input = connection.getInputStream();
186
output = new FileOutputStream("downloaded.dat");
187
188
byte[] buffer = new byte[8192];
189
int bytesRead;
190
while ((bytesRead = input.read(buffer)) != -1) {
191
output.write(buffer, 0, bytesRead);
192
}
193
} catch (IOException e) {
194
handleError(e);
195
} finally {
196
// Cleanup both streams safely
197
JavaUtils.closeQuietly(input);
198
JavaUtils.closeQuietly(output);
199
}
200
}
201
202
// Multiple resource cleanup pattern
203
public void processMultipleFiles(List<String> filePaths) {
204
List<FileInputStream> streams = new ArrayList<>();
205
206
try {
207
// Open all files
208
for (String path : filePaths) {
209
streams.add(new FileInputStream(path));
210
}
211
212
// Process files...
213
214
} catch (IOException e) {
215
handleError(e);
216
} finally {
217
// Cleanup all streams
218
for (FileInputStream stream : streams) {
219
JavaUtils.closeQuietly(stream);
220
}
221
}
222
}
223
```
224
225
### Hash Code Utilities

Non-negative hash code generation for safe partitioning and distributed operations.

```java { .api }
/**
 * Returns a non-negative hash code for any object
 * @param obj - Object to hash (can be null)
 * @return Non-negative hash code
 */
public static int nonNegativeHash(Object obj);
```
237
238
**Usage Examples:**
239
240
```java
241
import org.apache.spark.network.util.JavaUtils;
242
243
// Safe partitioning in distributed systems
244
public class DataPartitioner {
245
private final int numPartitions;
246
247
public DataPartitioner(int numPartitions) {
248
this.numPartitions = numPartitions;
249
}
250
251
public int getPartition(Object key) {
252
// Always returns value in range [0, numPartitions)
253
return JavaUtils.nonNegativeHash(key) % numPartitions;
254
}
255
}
256
257
// Hash-based bucketing
258
public class HashBucket {
259
public static int getBucket(String key, int numBuckets) {
260
// Safe for negative hash codes and null keys
261
return JavaUtils.nonNegativeHash(key) % numBuckets;
262
}
263
}
264
265
// Consistent hashing for load balancing
266
public class LoadBalancer {
267
private final List<String> servers;
268
269
public LoadBalancer(List<String> servers) {
270
this.servers = servers;
271
}
272
273
public String getServer(String requestId) {
274
int index = JavaUtils.nonNegativeHash(requestId) % servers.size();
275
return servers.get(index);
276
}
277
}
278
279
// Safe hash code implementation
280
public class DistributedKey {
281
private final String id;
282
private final long timestamp;
283
284
public DistributedKey(String id, long timestamp) {
285
this.id = id;
286
this.timestamp = timestamp;
287
}
288
289
@Override
290
public int hashCode() {
291
// Combine multiple fields safely
292
int hash1 = JavaUtils.nonNegativeHash(id);
293
int hash2 = JavaUtils.nonNegativeHash(timestamp);
294
return JavaUtils.nonNegativeHash(hash1 ^ hash2);
295
}
296
}
297
```
298
299
### Data Conversion Utilities

UTF-8 string and ByteBuffer conversion utilities for network operations.

```java { .api }
/**
 * Converts a string to ByteBuffer using UTF-8 encoding
 * @param s - String to convert
 * @return ByteBuffer containing UTF-8 encoded string
 */
public static ByteBuffer stringToBytes(String s);

/**
 * Converts a ByteBuffer to string using UTF-8 decoding
 * @param b - ByteBuffer to convert
 * @return Decoded string
 */
public static String bytesToString(ByteBuffer b);
```
318
319
**Usage Examples:**
320
321
```java
322
import org.apache.spark.network.util.JavaUtils;
323
import java.nio.ByteBuffer;
324
import java.util.concurrent.ConcurrentHashMap;
325
326
// Network message serialization
327
public class MessageSerializer {
328
329
public ByteBuffer serialize(String message) {
330
return JavaUtils.stringToBytes(message);
331
}
332
333
public String deserialize(ByteBuffer buffer) {
334
return JavaUtils.bytesToString(buffer);
335
}
336
}
337
338
// Efficient string caching with ByteBuffer
339
public class StringCache {
340
private final ConcurrentHashMap<String, ByteBuffer> cache = new ConcurrentHashMap<>();
341
342
public ByteBuffer getEncoded(String str) {
343
return cache.computeIfAbsent(str, JavaUtils::stringToBytes);
344
}
345
346
public void put(String str) {
347
cache.put(str, JavaUtils.stringToBytes(str));
348
}
349
}
350
351
// Protocol buffer-like message handling
352
public class NetworkProtocol {
353
354
public void sendMessage(String message, OutputStream out) throws IOException {
355
ByteBuffer buffer = JavaUtils.stringToBytes(message);
356
357
// Write length first
358
out.write(intToBytes(buffer.remaining()));
359
360
// Write data
361
out.write(buffer.array(), buffer.arrayOffset(), buffer.remaining());
362
}
363
364
public String receiveMessage(InputStream in) throws IOException {
365
// Read length
366
int length = bytesToInt(readBytes(in, 4));
367
368
// Read data
369
byte[] data = readBytes(in, length);
370
ByteBuffer buffer = ByteBuffer.wrap(data);
371
372
return JavaUtils.bytesToString(buffer);
373
}
374
375
private byte[] readBytes(InputStream in, int count) throws IOException {
376
byte[] bytes = new byte[count];
377
int totalRead = 0;
378
while (totalRead < count) {
379
int read = in.read(bytes, totalRead, count - totalRead);
380
if (read == -1) throw new EOFException();
381
totalRead += read;
382
}
383
return bytes;
384
}
385
}
386
387
// Binary data processing
388
public class BinaryProcessor {
389
390
public void processTextData(ByteBuffer binaryData) {
391
// Convert binary data to text for processing
392
String text = JavaUtils.bytesToString(binaryData);
393
394
// Process as text
395
String processed = text.toUpperCase().trim();
396
397
// Convert back to binary if needed
398
ByteBuffer result = JavaUtils.stringToBytes(processed);
399
400
// Store or transmit result...
401
}
402
}
403
```
404
405
### File System Utilities

Recursive file deletion utilities for cleanup operations in distributed environments.

```java { .api }
/**
 * Recursively deletes files and directories
 * @param file - File or directory to delete
 * @throws IOException - If deletion fails
 */
public static void deleteRecursively(File file) throws IOException;
```
416
417
**Usage Examples:**
418
419
```java
420
import org.apache.spark.network.util.JavaUtils;
421
import java.io.File;
422
423
// Temporary directory cleanup
424
public class TempDirectoryManager {
425
private final String tempBasePath;
426
427
public TempDirectoryManager(String tempBasePath) {
428
this.tempBasePath = tempBasePath;
429
}
430
431
public File createTempDir(String prefix) {
432
File tempDir = new File(tempBasePath, prefix + "_" + System.currentTimeMillis());
433
tempDir.mkdirs();
434
return tempDir;
435
}
436
437
public void cleanup(File tempDir) {
438
// Recursively delete entire directory tree
439
try {
440
JavaUtils.deleteRecursively(tempDir);
441
} catch (IOException e) {
442
System.err.println("Failed to delete directory: " + e.getMessage());
443
}
444
}
445
446
public void cleanupAll() {
447
File baseDir = new File(tempBasePath);
448
if (baseDir.exists()) {
449
// Delete all temporary directories
450
try {
451
JavaUtils.deleteRecursively(baseDir);
452
} catch (IOException e) {
453
System.err.println("Failed to delete base directory: " + e.getMessage());
454
}
455
}
456
}
457
}
458
459
// Build artifact cleanup
460
public class BuildCleaner {
461
462
public void cleanProject(String projectPath) {
463
// Clean common build directories
464
cleanDirectory(projectPath + "/target");
465
cleanDirectory(projectPath + "/build");
466
cleanDirectory(projectPath + "/.gradle");
467
cleanDirectory(projectPath + "/node_modules");
468
}
469
470
private void cleanDirectory(String path) {
471
File dir = new File(path);
472
if (dir.exists()) {
473
System.out.println("Cleaning: " + path);
474
try {
475
JavaUtils.deleteRecursively(dir);
476
} catch (IOException e) {
477
System.err.println("Failed to clean " + path + ": " + e.getMessage());
478
}
479
}
480
}
481
}
482
483
// Log file rotation cleanup
484
public class LogCleaner {
485
486
public void cleanOldLogs(String logDir, int daysToKeep) {
487
File logDirectory = new File(logDir);
488
489
if (!logDirectory.exists()) return;
490
491
long cutoffTime = System.currentTimeMillis() - (daysToKeep * 24 * 60 * 60 * 1000L);
492
493
File[] files = logDirectory.listFiles();
494
if (files != null) {
495
for (File file : files) {
496
if (file.lastModified() < cutoffTime) {
497
System.out.println("Deleting old log: " + file.getName());
498
if (file.isDirectory()) {
499
try {
500
JavaUtils.deleteRecursively(file);
501
} catch (IOException e) {
502
System.err.println("Failed to delete log directory: " + e.getMessage());
503
}
504
} else {
505
file.delete();
506
}
507
}
508
}
509
}
510
}
511
}
512
513
// Shutdown cleanup hook
514
public class ApplicationCleaner {
515
private final List<File> tempFiles = new ArrayList<>();
516
517
public ApplicationCleaner() {
518
// Register shutdown hook for cleanup
519
Runtime.getRuntime().addShutdownHook(new Thread(this::cleanup));
520
}
521
522
public void addTempFile(File tempFile) {
523
tempFiles.add(tempFile);
524
}
525
526
private void cleanup() {
527
System.out.println("Cleaning up temporary files...");
528
for (File tempFile : tempFiles) {
529
try {
530
JavaUtils.deleteRecursively(tempFile);
531
} catch (IOException e) {
532
System.err.println("Failed to delete temp file: " + e.getMessage());
533
}
534
}
535
}
536
}
537
```
538
539
## MemoryMode Enum

Enumeration for different memory allocation modes in Spark operations.

```java { .api }
/**
 * Enumeration for memory allocation modes
 */
public enum MemoryMode {
  /** Standard JVM heap memory allocation */
  ON_HEAP,

  /** Off-heap memory allocation (outside JVM heap) */
  OFF_HEAP
}
```
555
556
**Usage Examples:**
557
558
```java
559
import org.apache.spark.memory.MemoryMode;
560
561
// Memory mode configuration
562
public class MemoryConfiguration {
563
564
public void configureMemory(boolean useOffHeap) {
565
MemoryMode mode = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
566
567
switch (mode) {
568
case ON_HEAP:
569
System.out.println("Using JVM heap memory");
570
configureHeapMemory();
571
break;
572
573
case OFF_HEAP:
574
System.out.println("Using off-heap memory");
575
configureOffHeapMemory();
576
break;
577
}
578
}
579
580
private void configureHeapMemory() {
581
// Configure heap-based memory settings
582
}
583
584
private void configureOffHeapMemory() {
585
// Configure off-heap memory settings
586
}
587
}
588
589
// Memory allocation strategy
590
public class MemoryAllocator {
591
592
public Buffer allocateBuffer(int size, MemoryMode mode) {
593
switch (mode) {
594
case ON_HEAP:
595
return allocateHeapBuffer(size);
596
597
case OFF_HEAP:
598
return allocateOffHeapBuffer(size);
599
600
default:
601
throw new IllegalArgumentException("Unknown memory mode: " + mode);
602
}
603
}
604
605
private Buffer allocateHeapBuffer(int size) {
606
// Allocate heap-based buffer
607
return new HeapBuffer(size);
608
}
609
610
private Buffer allocateOffHeapBuffer(int size) {
611
// Allocate off-heap buffer
612
return new OffHeapBuffer(size);
613
}
614
}
615
```
616
617
## Constants and Configuration

```java { .api }
/**
 * Default driver memory size in megabytes
 */
public static final long DEFAULT_DRIVER_MEM_MB = 1024;
```
625
626
**Usage Examples:**
627
628
```java
629
import org.apache.spark.network.util.JavaUtils;
630
631
// Memory configuration utilities
632
public class SparkMemoryConfig {
633
634
public long getDriverMemory(String configValue) {
635
if (configValue == null || configValue.trim().isEmpty()) {
636
// Use default if not specified
637
return JavaUtils.DEFAULT_DRIVER_MEM_MB;
638
}
639
640
try {
641
return Long.parseLong(configValue);
642
} catch (NumberFormatException e) {
643
System.err.println("Invalid memory config: " + configValue +
644
", using default: " + JavaUtils.DEFAULT_DRIVER_MEM_MB);
645
return JavaUtils.DEFAULT_DRIVER_MEM_MB;
646
}
647
}
648
649
public String formatMemorySize(long memoryMB) {
650
if (memoryMB == JavaUtils.DEFAULT_DRIVER_MEM_MB) {
651
return memoryMB + "MB (default)";
652
} else {
653
return memoryMB + "MB";
654
}
655
}
656
}
657
```
658
659
## Type Definitions

```java { .api }
// Core utility class
public class JavaUtils {
  public static final long DEFAULT_DRIVER_MEM_MB = 1024;

  public static void closeQuietly(Closeable closeable);
  public static int nonNegativeHash(Object obj);
  public static ByteBuffer stringToBytes(String s);
  public static String bytesToString(ByteBuffer b);
  public static void deleteRecursively(File file) throws IOException;
  public static void deleteRecursively(File file, FilenameFilter filter) throws IOException;
}

// Memory allocation mode enumeration
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}
```