Core utility classes and functions for Apache Spark including exception handling, logging, storage configuration, and Java API integration
—
Essential utilities for network operations and common Java tasks, providing helper methods for resource management, data conversion, and file operations in distributed environments.
Core utility class providing static methods for common operations in network and distributed computing contexts.
/**
* General utilities for network operations and common Java tasks
* Package: org.apache.spark.network.util
*/
public class JavaUtils {
/** Default driver memory size in megabytes */
public static final long DEFAULT_DRIVER_MEM_MB = 1024;
/**
* Closes a Closeable resource without throwing exceptions
* Logs any IOException that occurs during closing
* @param closeable - Resource to close (can be null)
*/
public static void closeQuietly(Closeable closeable);
/**
* Returns a non-negative hash code for any object
* Handles null objects and ensures result is never negative
* @param obj - Object to hash (can be null)
* @return Non-negative hash code
*/
public static int nonNegativeHash(Object obj);
/**
* Converts a string to ByteBuffer using UTF-8 encoding
* @param s - String to convert
* @return ByteBuffer containing UTF-8 encoded string
*/
public static ByteBuffer stringToBytes(String s);
/**
* Converts a ByteBuffer to string using UTF-8 decoding
* @param b - ByteBuffer to convert
* @return Decoded string
*/
public static String bytesToString(ByteBuffer b);
/**
* Recursively deletes files and directories
* @param file - File or directory to delete
* @throws IOException - If deletion fails
*/
public static void deleteRecursively(File file) throws IOException;
/**
* Recursively deletes files and directories with filename filter
* @param file - File or directory to delete
* @param filter - Filename filter for selective deletion
* @throws IOException - If deletion fails
*/
public static void deleteRecursively(File file, FilenameFilter filter) throws IOException;
}Usage Examples:
import org.apache.spark.network.util.JavaUtils;
import java.io.*;
import java.nio.ByteBuffer;
// Resource management with automatic cleanup
FileInputStream fis = null;
try {
fis = new FileInputStream("data.txt");
// Process file...
} catch (IOException e) {
// Handle error...
} finally {
// Safe cleanup - no exceptions thrown
JavaUtils.closeQuietly(fis);
}
// Safe hashing for distributed operations
// Key wrapper whose hash code is guaranteed non-negative, making it safe
// to feed directly into modulo-based partition assignment.
public class PartitionKey {

    private final String key;

    public PartitionKey(String key) {
        this.key = key;
    }

    /** Hash code delegated to JavaUtils so the result is never negative. */
    @Override
    public int hashCode() {
        return JavaUtils.nonNegativeHash(key);
    }

    /** Maps this key onto a bucket in the range [0, numPartitions). */
    public int getPartition(int numPartitions) {
        int bucket = JavaUtils.nonNegativeHash(key) % numPartitions;
        return bucket;
    }
}
// String and ByteBuffer conversions
// Round-trips a message through the UTF-8 ByteBuffer helpers: encode,
// transmit, then decode whatever comes back and print it.
public void processNetworkData() {
    String message = "Hello Spark";
    // Encode for the wire and hand off to the transport layer.
    ByteBuffer outgoing = JavaUtils.stringToBytes(message);
    sendOverNetwork(outgoing);
    // Decode the reply with the symmetric helper.
    ByteBuffer received = receiveFromNetwork();
    String decoded = JavaUtils.bytesToString(received);
    System.out.println("Received: " + decoded);
}
// File cleanup in distributed environments
/**
 * Recursively removes a temporary directory, logging (not propagating)
 * any deletion failure.
 * @param tempDir - Path of the directory tree to remove
 */
public void cleanupTempFiles(String tempDir) {
File tempDirectory = new File(tempDir);
// Recursively delete all temporary files
// May throw IOException if deletion fails
try {
JavaUtils.deleteRecursively(tempDirectory);
} catch (IOException e) {
System.err.println("Failed to delete temp directory: " + e.getMessage());
}
// NOTE(review): this success message prints even when deletion failed
// above — confirm that is intended.
System.out.println("Cleanup completed for: " + tempDir);
}Safe resource cleanup utilities that handle exceptions gracefully.
/**
* Closes a Closeable resource without throwing exceptions
* @param closeable - Resource to close (can be null)
*/
public static void closeQuietly(Closeable closeable);Usage Examples:
import org.apache.spark.network.util.JavaUtils;
import java.io.*;
import java.net.*;
// Database connection cleanup
/**
 * Queries all users and processes each row, guaranteeing resource cleanup
 * in reverse order of acquisition (ResultSet, Statement, Connection).
 *
 * NOTE(review): JDBC's ResultSet/PreparedStatement/Connection implement
 * AutoCloseable, not java.io.Closeable — passing them to
 * closeQuietly(Closeable) as shown will not compile; verify against the
 * actual JavaUtils signature, or use try-with-resources instead.
 */
public void processDatabase() {
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
conn = getConnection();
stmt = conn.prepareStatement("SELECT * FROM users");
rs = stmt.executeQuery();
while (rs.next()) {
processRow(rs);
}
} catch (SQLException e) {
handleError(e);
} finally {
// Safe cleanup of all resources
JavaUtils.closeQuietly(rs);
JavaUtils.closeQuietly(stmt);
JavaUtils.closeQuietly(conn);
}
}
// Network resource cleanup
/**
 * Downloads the resource at {@code url} into "downloaded.dat", copying in
 * 8 KB chunks; both streams are closed quietly regardless of outcome.
 * @param url - Location of the resource to fetch
 */
public void downloadData(String url) {
    InputStream input = null;
    OutputStream output = null;
    try {
        URLConnection connection = new URL(url).openConnection();
        input = connection.getInputStream();
        output = new FileOutputStream("downloaded.dat");
        byte[] chunk = new byte[8192];
        // Copy until end of stream.
        for (int n = input.read(chunk); n != -1; n = input.read(chunk)) {
            output.write(chunk, 0, n);
        }
    } catch (IOException e) {
        handleError(e);
    } finally {
        // Neither close can throw out of this method.
        JavaUtils.closeQuietly(input);
        JavaUtils.closeQuietly(output);
    }
}
// Multiple resource cleanup pattern
/**
 * Opens every listed file, processes them together, then closes all
 * streams quietly.
 * @param filePaths - Paths of the files to open
 */
public void processMultipleFiles(List<String> filePaths) {
List<FileInputStream> streams = new ArrayList<>();
try {
// Open all files
for (String path : filePaths) {
streams.add(new FileInputStream(path));
}
// Process files...
} catch (IOException e) {
handleError(e);
} finally {
// Cleanup all streams
// If opening failed partway, only the streams opened so far are in the
// list, so everything acquired is still released here.
for (FileInputStream stream : streams) {
JavaUtils.closeQuietly(stream);
}
}
}Non-negative hash code generation for safe partitioning and distributed operations.
/**
* Returns a non-negative hash code for any object
* @param obj - Object to hash (can be null)
* @return Non-negative hash code
*/
public static int nonNegativeHash(Object obj);Usage Examples:
import org.apache.spark.network.util.JavaUtils;
// Safe partitioning in distributed systems
/** Assigns arbitrary keys to a fixed number of partitions via hashing. */
public class DataPartitioner {

    private final int numPartitions;

    public DataPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /**
     * @param key - Key to place (may be null; the hash helper handles it)
     * @return A partition index in the range [0, numPartitions)
     */
    public int getPartition(Object key) {
        int hash = JavaUtils.nonNegativeHash(key);
        return hash % numPartitions;
    }
}
// Hash-based bucketing
/** Stateless hash-based bucketing helper; tolerates null keys. */
public class HashBucket {

    /**
     * @param key - Key to bucket (null-safe via the non-negative hash)
     * @param numBuckets - Total bucket count, must be positive
     * @return A bucket index in [0, numBuckets)
     */
    public static int getBucket(String key, int numBuckets) {
        int hash = JavaUtils.nonNegativeHash(key);
        return hash % numBuckets;
    }
}
// Consistent hashing for load balancing
/**
 * Hash-based request routing: the same request id always maps to the same
 * server while the server list is unchanged.
 */
public class LoadBalancer {

    private final List<String> servers;

    /**
     * @param servers - Candidate servers; must contain at least one entry
     * @throws IllegalArgumentException - If the list is null or empty, which
     *         would otherwise surface later as an ArithmeticException
     *         (division by zero) on the first getServer call
     */
    public LoadBalancer(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            // Fail fast at construction instead of on first use.
            throw new IllegalArgumentException("servers must not be empty");
        }
        this.servers = servers;
    }

    /**
     * @param requestId - Identifier to route (null-safe via the hash helper)
     * @return The server responsible for this request id
     */
    public String getServer(String requestId) {
        int index = JavaUtils.nonNegativeHash(requestId) % servers.size();
        return servers.get(index);
    }
}
// Safe hash code implementation
/**
 * Composite key combining an id and a timestamp with a non-negative hash.
 *
 * NOTE(review): hashCode is overridden without equals — two keys with the
 * same id/timestamp will hash alike but compare unequal; confirm whether
 * an equals override was intended.
 */
public class DistributedKey {
private final String id;
private final long timestamp;
public DistributedKey(String id, long timestamp) {
this.id = id;
this.timestamp = timestamp;
}
@Override
public int hashCode() {
// Combine multiple fields safely
// (the long timestamp is autoboxed to Long for the Object-typed helper)
int hash1 = JavaUtils.nonNegativeHash(id);
int hash2 = JavaUtils.nonNegativeHash(timestamp);
return JavaUtils.nonNegativeHash(hash1 ^ hash2);
}
}UTF-8 string and ByteBuffer conversion utilities for network operations.
/**
* Converts a string to ByteBuffer using UTF-8 encoding
* @param s - String to convert
* @return ByteBuffer containing UTF-8 encoded string
*/
public static ByteBuffer stringToBytes(String s);
/**
* Converts a ByteBuffer to string using UTF-8 decoding
* @param b - ByteBuffer to convert
* @return Decoded string
*/
public static String bytesToString(ByteBuffer b);Usage Examples:
import org.apache.spark.network.util.JavaUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
// Network message serialization
/** Symmetric UTF-8 (de)serialization for network messages. */
public class MessageSerializer {

    /** Encodes {@code message} as a UTF-8 ByteBuffer. */
    public ByteBuffer serialize(String message) {
        return JavaUtils.stringToBytes(message);
    }

    /** Decodes a UTF-8 ByteBuffer back into a string. */
    public String deserialize(ByteBuffer buffer) {
        return JavaUtils.bytesToString(buffer);
    }
}
// Efficient string caching with ByteBuffer
/**
 * Caches the UTF-8 encoding of frequently used strings.
 *
 * A ByteBuffer carries mutable position/limit state, so the cached
 * instance must never be handed out directly: a caller that consumed it
 * would advance its position and corrupt the cache for everyone else.
 * getEncoded therefore returns an independent duplicate (shared content,
 * private position/limit).
 */
public class StringCache {

    private final ConcurrentHashMap<String, ByteBuffer> cache = new ConcurrentHashMap<>();

    /**
     * @param str - String to encode (cached on first use)
     * @return A read-safe duplicate of the cached UTF-8 encoding
     */
    public ByteBuffer getEncoded(String str) {
        // Bug fix: duplicate() prevents callers from mutating the shared
        // cached buffer's position/limit.
        return cache.computeIfAbsent(str, JavaUtils::stringToBytes).duplicate();
    }

    /** Pre-populates the cache with the encoding of {@code str}. */
    public void put(String str) {
        cache.put(str, JavaUtils.stringToBytes(str));
    }
}
// Protocol buffer-like message handling
/** Length-prefixed string messaging over raw streams (UTF-8 payload). */
public class NetworkProtocol {

    /**
     * Writes {@code message} as a 4-byte length prefix followed by its
     * UTF-8 bytes.
     * @throws IOException - If the stream write fails
     */
    public void sendMessage(String message, OutputStream out) throws IOException {
        ByteBuffer buffer = JavaUtils.stringToBytes(message);
        // Length prefix first so the receiver knows how much to read.
        out.write(intToBytes(buffer.remaining()));
        // Bug fix: the payload begins at arrayOffset() + position(), not at
        // arrayOffset() alone — the original was only correct for buffers
        // whose position happened to be zero.
        // (assumes stringToBytes returns an array-backed buffer — TODO confirm)
        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
    }

    /**
     * Reads one length-prefixed message and decodes it as UTF-8.
     * @throws IOException - On stream errors, premature EOF, or a corrupt
     *         (negative) length prefix
     */
    public String receiveMessage(InputStream in) throws IOException {
        int length = bytesToInt(readBytes(in, 4));
        if (length < 0) {
            // Bug fix: a negative length means a corrupt or hostile prefix;
            // without this check "new byte[length]" would fail with an
            // obscure NegativeArraySizeException.
            throw new IOException("Corrupt message length: " + length);
        }
        byte[] data = readBytes(in, length);
        ByteBuffer buffer = ByteBuffer.wrap(data);
        return JavaUtils.bytesToString(buffer);
    }

    /** Reads exactly {@code count} bytes or throws EOFException. */
    private byte[] readBytes(InputStream in, int count) throws IOException {
        byte[] bytes = new byte[count];
        int totalRead = 0;
        while (totalRead < count) {
            int read = in.read(bytes, totalRead, count - totalRead);
            if (read == -1) throw new EOFException();
            totalRead += read;
        }
        return bytes;
    }
}
// Binary data processing
/** Decodes binary data to text, transforms it, and re-encodes it. */
public class BinaryProcessor {
/**
 * Uppercases and trims the UTF-8 text held in {@code binaryData}.
 * NOTE(review): toUpperCase() uses the default locale — for
 * locale-independent behavior this should be toUpperCase(Locale.ROOT);
 * also, the re-encoded result is currently unused. Confirm intent.
 */
public void processTextData(ByteBuffer binaryData) {
// Convert binary data to text for processing
String text = JavaUtils.bytesToString(binaryData);
// Process as text
String processed = text.toUpperCase().trim();
// Convert back to binary if needed
ByteBuffer result = JavaUtils.stringToBytes(processed);
// Store or transmit result...
}
}Recursive file deletion utilities for cleanup operations in distributed environments.
/**
 * Recursively deletes files and directories
 * @param file - File or directory to delete
 * @throws IOException - If deletion fails
 */
public static void deleteRecursively(File file) throws IOException;Usage Examples:
import org.apache.spark.network.util.JavaUtils;
import java.io.File;
// Temporary directory cleanup
/** Creates and cleans per-use temporary directories under a base path. */
public class TempDirectoryManager {

    private final String tempBasePath;

    public TempDirectoryManager(String tempBasePath) {
        this.tempBasePath = tempBasePath;
    }

    /**
     * Creates a uniquely named temp directory under the base path.
     * @param prefix - Name prefix for the new directory
     * @return The created directory
     * @throws IllegalStateException - If the directory cannot be created
     */
    public File createTempDir(String prefix) {
        File tempDir = new File(tempBasePath, prefix + "_" + System.currentTimeMillis());
        // Bug fix: the original ignored mkdirs()' return value and could
        // silently hand back a directory that does not exist.
        if (!tempDir.mkdirs() && !tempDir.isDirectory()) {
            throw new IllegalStateException("Failed to create temp dir: " + tempDir);
        }
        return tempDir;
    }

    /** Recursively deletes one temp directory; failures are logged. */
    public void cleanup(File tempDir) {
        try {
            JavaUtils.deleteRecursively(tempDir);
        } catch (IOException e) {
            System.err.println("Failed to delete directory: " + e.getMessage());
        }
    }

    /** Recursively deletes the whole base directory, if present. */
    public void cleanupAll() {
        File baseDir = new File(tempBasePath);
        if (baseDir.exists()) {
            try {
                JavaUtils.deleteRecursively(baseDir);
            } catch (IOException e) {
                System.err.println("Failed to delete base directory: " + e.getMessage());
            }
        }
    }
}
// Build artifact cleanup
/** Removes common build-output directories from a project tree. */
public class BuildCleaner {

    /** Deletes every well-known build directory under {@code projectPath}. */
    public void cleanProject(String projectPath) {
        // Same directories, same order as a manual clean would use.
        String[] buildDirs = {"/target", "/build", "/.gradle", "/node_modules"};
        for (String suffix : buildDirs) {
            cleanDirectory(projectPath + suffix);
        }
    }

    /** Recursively deletes one directory if it exists; failures are logged. */
    private void cleanDirectory(String path) {
        File dir = new File(path);
        if (!dir.exists()) {
            return;
        }
        System.out.println("Cleaning: " + path);
        try {
            JavaUtils.deleteRecursively(dir);
        } catch (IOException e) {
            System.err.println("Failed to clean " + path + ": " + e.getMessage());
        }
    }
}
// Log file rotation cleanup
/** Deletes log files and directories older than a retention window. */
public class LogCleaner {

    /**
     * Removes entries in {@code logDir} last modified more than
     * {@code daysToKeep} days ago. Directories are deleted recursively.
     * @param logDir - Directory holding the logs
     * @param daysToKeep - Retention window in days
     */
    public void cleanOldLogs(String logDir, int daysToKeep) {
        File logDirectory = new File(logDir);
        if (!logDirectory.exists()) return;
        // Bug fix: the original computed daysToKeep * 24 * 60 * 60 in int
        // arithmetic before widening to long, overflowing for large
        // retention values; multiplying by 24L first keeps the whole
        // product in long.
        long cutoffTime = System.currentTimeMillis() - (daysToKeep * 24L * 60 * 60 * 1000);
        File[] files = logDirectory.listFiles();
        if (files == null) return;
        for (File file : files) {
            if (file.lastModified() >= cutoffTime) {
                continue;
            }
            System.out.println("Deleting old log: " + file.getName());
            if (file.isDirectory()) {
                try {
                    JavaUtils.deleteRecursively(file);
                } catch (IOException e) {
                    System.err.println("Failed to delete log directory: " + e.getMessage());
                }
            } else if (!file.delete()) {
                // Bug fix: the original silently ignored delete() failures.
                System.err.println("Failed to delete log file: " + file.getName());
            }
        }
    }
}
// Shutdown cleanup hook
/**
 * Tracks temporary files and deletes them all from a JVM shutdown hook.
 *
 * NOTE(review): the constructor leaks {@code this} into the shutdown-hook
 * thread before construction completes, and {@code tempFiles} is a plain
 * ArrayList mutated by addTempFile while the hook may iterate it —
 * confirm whether a thread-safe list (or registration after construction)
 * is needed.
 */
public class ApplicationCleaner {
private final List<File> tempFiles = new ArrayList<>();
public ApplicationCleaner() {
// Register shutdown hook for cleanup
Runtime.getRuntime().addShutdownHook(new Thread(this::cleanup));
}
/** Registers a file or directory to be deleted at JVM shutdown. */
public void addTempFile(File tempFile) {
tempFiles.add(tempFile);
}
// Runs on the shutdown-hook thread; failures are logged, never thrown.
private void cleanup() {
System.out.println("Cleaning up temporary files...");
for (File tempFile : tempFiles) {
try {
JavaUtils.deleteRecursively(tempFile);
} catch (IOException e) {
System.err.println("Failed to delete temp file: " + e.getMessage());
}
}
}
}Enumeration for different memory allocation modes in Spark operations.
/**
* Enumeration for memory allocation modes
*/
public enum MemoryMode {
/** Standard JVM heap memory allocation */
ON_HEAP,
/** Off-heap memory allocation (outside JVM heap) */
OFF_HEAP
}Usage Examples:
import org.apache.spark.memory.MemoryMode;
// Memory mode configuration
/** Chooses and applies heap vs. off-heap memory configuration. */
public class MemoryConfiguration {

    /**
     * Configures memory for the requested allocation mode.
     * @param useOffHeap - true selects OFF_HEAP, false selects ON_HEAP
     */
    public void configureMemory(boolean useOffHeap) {
        MemoryMode mode = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
        if (mode == MemoryMode.OFF_HEAP) {
            System.out.println("Using off-heap memory");
            configureOffHeapMemory();
        } else {
            System.out.println("Using JVM heap memory");
            configureHeapMemory();
        }
    }

    // Configure heap-based memory settings.
    private void configureHeapMemory() {
    }

    // Configure off-heap memory settings.
    private void configureOffHeapMemory() {
    }
}
// Memory allocation strategy
/** Dispatches buffer allocation to a heap or off-heap implementation. */
public class MemoryAllocator {
/**
 * Allocates a buffer of {@code size} bytes in the requested memory mode.
 * @param size - Buffer size in bytes
 * @param mode - Where to allocate (ON_HEAP or OFF_HEAP)
 * @return The allocated buffer
 * @throws IllegalArgumentException - If the mode is unrecognized
 */
public Buffer allocateBuffer(int size, MemoryMode mode) {
switch (mode) {
case ON_HEAP:
return allocateHeapBuffer(size);
case OFF_HEAP:
return allocateOffHeapBuffer(size);
default:
// Defensive: unreachable for the current two-value enum, but guards
// against future additions.
throw new IllegalArgumentException("Unknown memory mode: " + mode);
}
}
private Buffer allocateHeapBuffer(int size) {
// Allocate heap-based buffer
return new HeapBuffer(size);
}
private Buffer allocateOffHeapBuffer(int size) {
// Allocate off-heap buffer
return new OffHeapBuffer(size);
}
}/**
* Default driver memory size in megabytes
*/
public static final long DEFAULT_DRIVER_MEM_MB = 1024;Usage Examples:
import org.apache.spark.network.util.JavaUtils;
// Memory configuration utilities
/** Parses and formats Spark driver memory settings with a safe default. */
public class SparkMemoryConfig {

    /**
     * Parses a driver-memory setting in megabytes, falling back to
     * JavaUtils.DEFAULT_DRIVER_MEM_MB when the value is missing,
     * non-numeric, or not positive.
     * @param configValue - Raw configuration string (may be null)
     * @return Driver memory in MB, always positive
     */
    public long getDriverMemory(String configValue) {
        if (configValue == null || configValue.trim().isEmpty()) {
            // Use default if not specified
            return JavaUtils.DEFAULT_DRIVER_MEM_MB;
        }
        try {
            // Bug fix: parse the trimmed value — the emptiness check above
            // trimmed, but the original parsed the raw string, so a value
            // like " 512 " was rejected as malformed.
            long parsed = Long.parseLong(configValue.trim());
            if (parsed <= 0) {
                // Bug fix: a zero or negative size is never usable; treat it
                // like a malformed value instead of propagating it.
                System.err.println("Invalid memory config: " + configValue +
                ", using default: " + JavaUtils.DEFAULT_DRIVER_MEM_MB);
                return JavaUtils.DEFAULT_DRIVER_MEM_MB;
            }
            return parsed;
        } catch (NumberFormatException e) {
            System.err.println("Invalid memory config: " + configValue +
            ", using default: " + JavaUtils.DEFAULT_DRIVER_MEM_MB);
            return JavaUtils.DEFAULT_DRIVER_MEM_MB;
        }
    }

    /** Renders a memory size, flagging the built-in default. */
    public String formatMemorySize(long memoryMB) {
        if (memoryMB == JavaUtils.DEFAULT_DRIVER_MEM_MB) {
            return memoryMB + "MB (default)";
        } else {
            return memoryMB + "MB";
        }
    }
}// Core utility class
public class JavaUtils {
public static final long DEFAULT_DRIVER_MEM_MB = 1024;
public static void closeQuietly(Closeable closeable);
public static int nonNegativeHash(Object obj);
public static ByteBuffer stringToBytes(String s);
public static String bytesToString(ByteBuffer b);
public static void deleteRecursively(File file) throws IOException;
public static void deleteRecursively(File file, FilenameFilter filter) throws IOException;
}
// Memory allocation mode enumeration
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}Install with Tessl CLI
npx tessl i tessl/maven-spark-common-utils