Spark Unsafe provides high-performance utilities for array operations, including optimized byte array methods, memory-backed long arrays, and key-value iterators. These utilities are designed for maximum performance in data processing workloads by leveraging unsafe memory operations and word-aligned access patterns.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import org.apache.spark.unsafe.KVIterator;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.ByteArray;
// Power of 2 calculations
long nextPower = ByteArrayMethods.nextPowerOf2(100); // Returns 128
long powerOf16 = ByteArrayMethods.nextPowerOf2(16); // Returns 16 (already a power of 2)
// Word alignment calculations
int aligned1 = ByteArrayMethods.roundNumberOfBytesToNearestWord(15); // Returns 16
int aligned2 = ByteArrayMethods.roundNumberOfBytesToNearestWord(24); // Returns 24 (already aligned)
// High-performance array comparison
byte[] array1 = "Hello, World!".getBytes(StandardCharsets.UTF_8);
byte[] array2 = "Hello, World!".getBytes(StandardCharsets.UTF_8);
byte[] array3 = "Different".getBytes(StandardCharsets.UTF_8);
// arrayEquals works on raw base-object/offset pairs, so the caller is
// responsible for making sure `length` bytes exist on BOTH sides. Comparing
// the lengths first keeps the unsafe read inside both arrays.
boolean equal1 = array1.length == array2.length
    && ByteArrayMethods.arrayEquals(
        array1, Platform.BYTE_ARRAY_OFFSET,
        array2, Platform.BYTE_ARRAY_OFFSET,
        array1.length
    ); // true
// array3 is shorter than array1: the length check short-circuits, so the
// comparison is never asked to read past the end of array3.
boolean equal2 = array1.length == array3.length
    && ByteArrayMethods.arrayEquals(
        array1, Platform.BYTE_ARRAY_OFFSET,
        array3, Platform.BYTE_ARRAY_OFFSET,
        array1.length
    ); // false
// Create memory block for long array
HeapMemoryAllocator allocator = new HeapMemoryAllocator();
MemoryBlock memory = allocator.allocate(80); // 10 longs * 8 bytes each
try {
  // Create long array backed by memory block
  LongArray longArray = new LongArray(memory);
  // Basic operations
  long capacity = longArray.size(); // Number of longs this array can hold
  System.out.println("Array capacity: " + capacity);
  // Fill array with data
  for (int i = 0; i < capacity; i++) {
    longArray.set(i, i * 10L);
  }
  // Read data from array
  for (int i = 0; i < capacity; i++) {
    long value = longArray.get(i);
    System.out.println("Index " + i + ": " + value);
  }
  // Zero out the entire array
  longArray.zeroOut();
  // Verify array is zeroed
  for (int i = 0; i < capacity; i++) {
    long value = longArray.get(i);
    assert value == 0L;
  }
} finally {
  // Clean up: release the block even if one of the operations above throws
  allocator.free(memory);
}
// Working with memory-backed arrays and direct access
MemoryBlock block = allocator.allocate(1024);
LongArray array = new LongArray(block);
// Get direct memory access information
Object baseObject = array.getBaseObject();
long baseOffset = array.getBaseOffset();
MemoryBlock underlyingBlock = array.memoryBlock();
// Use Platform class for direct memory access
Platform.putLong(baseObject, baseOffset, 12345L);
long directValue = Platform.getLong(baseObject, baseOffset);
// Compare with array methods
array.set(0, 12345L);
long arrayValue = array.get(0);
assert directValue == arrayValue; // Both approaches yield same resultpublic class ByteArrayMethods {
/**
 * Maximum safe array length for word-aligned arrays.
 * The concrete value is not shown in this listing.
 */
public static final int MAX_ROUNDED_ARRAY_LENGTH;
/**
 * Returns the next power of 2 greater than or equal to the input.
 * For inputs already a power of 2, returns the input unchanged.
 * For example, {@code nextPowerOf2(100)} returns 128 and
 * {@code nextPowerOf2(16)} returns 16.
 *
 * @param num the value to round up
 * @return the smallest power of 2 that is greater than or equal to {@code num}
 */
public static long nextPowerOf2(long num);
/**
 * Rounds byte count up to the nearest 8-byte (word) boundary.
 * For example, 15 is rounded up to 16, while 24 is returned unchanged.
 *
 * @param numBytes byte count to round
 * @return {@code numBytes} rounded up to a multiple of 8
 */
public static int roundNumberOfBytesToNearestWord(int numBytes);
/**
 * Rounds byte count up to the nearest 8-byte (word) boundary.
 * This is the {@code long} overload of the {@code int} variant.
 *
 * @param numBytes byte count to round
 * @return {@code numBytes} rounded up to a multiple of 8
 */
public static long roundNumberOfBytesToNearestWord(long numBytes);
/**
 * High-performance byte array equality comparison using unsafe operations.
 * Compares arrays in word-sized chunks for maximum performance.
 *
 * <p>The regions are addressed as base-object/offset pairs; for a heap
 * {@code byte[]} the usage examples pass {@code Platform.BYTE_ARRAY_OFFSET}
 * as the offset. NOTE(review): the arguments do not appear to be
 * bounds-checked, so callers should make sure both regions really contain
 * {@code length} bytes (e.g. by comparing array lengths first) - confirm
 * against the implementation.
 *
 * @param leftBase Base object for left array (array itself for heap arrays)
 * @param leftOffset Offset within left base object
 * @param rightBase Base object for right array
 * @param rightOffset Offset within right base object
 * @param length Number of bytes to compare
 * @return true if arrays are equal, false otherwise
 */
public static boolean arrayEquals(Object leftBase, long leftOffset,
Object rightBase, long rightOffset, long length);
}public final class LongArray {
/**
 * Creates a long array backed by the specified memory block.
 * The memory block must be at least 8-byte aligned and have sufficient space.
 *
 * <p>In the usage examples the block is still freed by the caller through the
 * allocator that created it, so the caller must keep the block valid for as
 * long as the array is in use.
 *
 * @param memory the memory block that provides the storage for this array
 */
public LongArray(MemoryBlock memory);
/**
 * Returns the underlying memory block backing this array.
 *
 * @return the memory block this array was created with
 */
public MemoryBlock memoryBlock();
/**
 * Returns the base object for direct memory access.
 * For heap-allocated arrays, this is the underlying byte array.
 * For off-heap arrays, this is null.
 *
 * @return the base object to pass, together with {@link #getBaseOffset()},
 *         to {@code Platform} accessors such as {@code Platform.getLong}
 */
public Object getBaseObject();
/**
 * Returns the base offset for direct memory access.
 *
 * @return the offset to use together with {@link #getBaseObject()}; the usage
 *         example reads and writes the first element at exactly this offset
 */
public long getBaseOffset();
/**
 * Returns the number of long elements this array can hold.
 * This is the memory block size divided by 8.
 * For example, an 80-byte block holds 10 longs.
 *
 * @return the capacity of this array, in elements
 */
public long size();
/**
 * Fills the entire array with zeros using optimized memory operations.
 * After this call every element reads back as {@code 0L}.
 */
public void zeroOut();
/**
 * Sets the value at the specified index.
 * Per the notes in this document no bounds checking is performed, so the
 * caller must keep {@code index} below {@link #size()}.
 *
 * @param index Array index (0-based)
 * @param value Long value to store
 */
public void set(int index, long value);
/**
 * Gets the value at the specified index.
 * Per the notes in this document no bounds checking is performed, so the
 * caller must keep {@code index} below {@link #size()}.
 *
 * @param index Array index (0-based)
 * @return Long value at the specified index
 */
public long get(int index);
}
Word-Aligned Comparison: The arrayEquals method compares arrays in 8-byte chunks when possible, which is significantly faster than byte-by-byte comparison.
SIMD Optimization: On supported platforms, the JVM may use SIMD instructions for bulk operations.
Cache Efficiency: Word-aligned access patterns improve CPU cache utilization.
Direct Memory Access: Bypasses array bounds checking for maximum performance.
Memory Layout: Uses contiguous memory layout for optimal cache performance.
Bulk Operations: The zeroOut() method uses optimized memory filling operations.
Explicit Memory Management: LongArray is backed by a MemoryBlock, which must be explicitly managed.
Bounds Checking: LongArray does not perform bounds checking for performance reasons. Ensure indices are within the valid range.
Memory Alignment: LongArray requires 8-byte aligned memory blocks for correct operation.
Thread Safety: Neither ByteArrayMethods nor LongArray provide thread safety guarantees.
Memory Block Lifetime: Ensure the MemoryBlock backing a LongArray remains valid during array usage.
Platform Dependencies: Performance characteristics may vary across different JVM implementations and platforms.
// Size the allocation: 8 bytes per long, rounded up to a whole word
final int numElements = 1000;
final long alignedBytes =
    ByteArrayMethods.roundNumberOfBytesToNearestWord(numElements * 8L);
// Allocate aligned memory
final MemoryAllocator allocator = MemoryAllocator.HEAP;
final MemoryBlock block = allocator.allocate(alignedBytes);
try {
  final LongArray array = new LongArray(block);
  // Never index past what the array reports it can hold
  final long limit = Math.min(numElements, array.size());
  for (int idx = 0; idx < limit; idx++) {
    array.set(idx, idx);
  }
  // Process data...
} finally {
  // Always clean up, whichever way the try block exits
  allocator.free(block);
}
// Compare arrays efficiently using word-aligned operations
/**
 * Checks two byte arrays for equality via {@code ByteArrayMethods.arrayEquals}.
 *
 * @param a first array; must not be null
 * @param b second array; must not be null
 * @return true when both arrays have the same length and the same content
 */
public static boolean fastArrayEquals(byte[] a, byte[] b) {
  final int length = a.length;
  // The length test runs first so the word-wise comparison is only ever
  // asked to read `length` bytes from arrays that both have that many.
  return length == b.length
      && ByteArrayMethods.arrayEquals(
          a, Platform.BYTE_ARRAY_OFFSET,
          b, Platform.BYTE_ARRAY_OFFSET,
          length);
}
// Calculate optimal buffer size
int desiredSize = 1000;
long optimalSize = ByteArrayMethods.nextPowerOf2(desiredSize);
MemoryBlock buffer = allocator.allocate(optimalSize);/**
* Abstract base class for key-value iterators.
* Provides a common interface for iterating over key-value pairs.
*/
public abstract class KVIterator<K, V> {
/**
 * Advances to the next key-value pair.
 * Call this before the first {@link #getKey()} / {@link #getValue()} and
 * again before each subsequent pair.
 *
 * @return true if there is a next pair, false if iteration is complete
 * @throws IOException if an I/O error occurs during iteration
 */
public abstract boolean next() throws IOException;
/**
 * Returns the current key.
 * Must be called after a successful next() call; the result of calling it
 * at any other time is not specified by this class.
 *
 * @return the current key
 */
public abstract K getKey();
/**
 * Returns the current value.
 * Must be called after a successful next() call; the result of calling it
 * at any other time is not specified by this class.
 *
 * @return the current value
 */
public abstract V getValue();
/**
 * Closes the iterator and releases any associated resources.
 * Callers should invoke this once they are done iterating.
 */
public abstract void close();
}public final class ByteArray {
/**
 * Empty byte array constant.
 */
public static final byte[] EMPTY_BYTE;
/**
 * Writes byte array content to specified memory location.
 * The usage example passes a memory block's {@code getBaseObject()} and
 * {@code getBaseOffset()} as the target, with the block allocated to hold
 * {@code src.length} bytes.
 * NOTE(review): presumably the target must have room for all of
 * {@code src} - confirm against the implementation.
 *
 * @param src Source byte array
 * @param target Target base object
 * @param targetOffset Offset within target object
 */
public static void writeToMemory(byte[] src, Object target, long targetOffset);
/**
 * Returns 64-bit prefix of byte array for sorting operations.
 * The usage example orders two arrays by comparing their prefixes with
 * {@code Long.compare}.
 *
 * @param bytes Input byte array
 * @return 64-bit prefix value
 */
public static long getPrefix(byte[] bytes);
/**
 * Extracts substring from byte array using SQL semantics.
 * For example, with the UTF-8 bytes of {@code "Hello, World!"},
 * {@code pos = 8} and {@code len = 5} select the bytes of {@code "World"}.
 *
 * @param bytes Source byte array
 * @param pos Starting position (1-based, SQL-style)
 * @param len Length of substring
 * @return Extracted byte array substring
 */
public static byte[] subStringSQL(byte[] bytes, int pos, int len);
/**
 * Concatenates multiple byte arrays into a single array.
 * The inputs are joined in argument order, e.g. the bytes of
 * {@code "Hello"}, {@code " "} and {@code "World"} yield the bytes of
 * {@code "Hello World"}.
 *
 * @param inputs Variable number of byte arrays to concatenate
 * @return Concatenated byte array
 */
public static byte[] concat(byte[]... inputs);
}// Example implementation of KVIterator
/**
 * Minimal {@link KVIterator} that walks the entries of an in-memory map.
 *
 * <p>{@link #getKey()} and {@link #getValue()} return {@code null} whenever
 * there is no current entry: before the first successful {@link #next()},
 * after {@link #next()} has returned {@code false}, and after {@link #close()}.
 */
public class SimpleKVIterator extends KVIterator<String, Integer> {
  private final Iterator<Map.Entry<String, Integer>> iterator;
  // Entry selected by the last successful next(); null when there is none.
  private Map.Entry<String, Integer> current;

  /**
   * Creates an iterator over the entries of {@code data}.
   *
   * @param data map whose entries are iterated; only its entry-set iterator
   *             is retained, not the map itself
   */
  public SimpleKVIterator(Map<String, Integer> data) {
    this.iterator = data.entrySet().iterator();
  }

  /**
   * Advances to the next entry of the map.
   *
   * @return true if an entry was selected, false once the map is exhausted
   */
  @Override
  public boolean next() {
    if (iterator.hasNext()) {
      current = iterator.next();
      return true;
    }
    // Drop the last entry so getKey()/getValue() do not report stale data
    // once iteration is complete.
    current = null;
    return false;
  }

  /** Returns the key of the current entry, or null if there is none. */
  @Override
  public String getKey() {
    return current != null ? current.getKey() : null;
  }

  /** Returns the value of the current entry, or null if there is none. */
  @Override
  public Integer getValue() {
    return current != null ? current.getValue() : null;
  }

  /** Forgets the current entry; this iterator holds no other resources. */
  @Override
  public void close() {
    current = null;
  }
}
// Usage
// Note: Map.of does not define an iteration order, so pairs may print in any order.
Map<String, Integer> data = Map.of("a", 1, "b", 2, "c", 3);
KVIterator<String, Integer> iterator = new SimpleKVIterator(data);
try {
  // KVIterator.next() is declared to throw IOException, so the enclosing
  // method has to handle or declare it.
  while (iterator.next()) {
    String key = iterator.getKey();
    Integer value = iterator.getValue();
    System.out.println(key + " -> " + value);
  }
} finally {
  // Release the iterator's resources even if iteration fails part-way
  iterator.close();
}
// Working with ByteArray utilities
byte[] data1 = "Hello".getBytes(StandardCharsets.UTF_8);
byte[] data2 = "World".getBytes(StandardCharsets.UTF_8);
// Concatenate arrays (every piece is encoded with the same explicit charset,
// so the result does not depend on the platform default)
byte[] separator = " ".getBytes(StandardCharsets.UTF_8);
byte[] concatenated = ByteArray.concat(data1, separator, data2);
String result = new String(concatenated, StandardCharsets.UTF_8); // "Hello World"
// Get prefix for sorting
long prefix1 = ByteArray.getPrefix(data1);
long prefix2 = ByteArray.getPrefix(data2);
int comparison = Long.compare(prefix1, prefix2);
// SQL-style substring
byte[] fullText = "Hello, World!".getBytes(StandardCharsets.UTF_8);
byte[] substring = ByteArray.subStringSQL(fullText, 8, 5); // "World" (1-based, length 5)
// Write to memory
MemoryAllocator allocator = MemoryAllocator.HEAP;
MemoryBlock block = allocator.allocate(concatenated.length);
try {
  ByteArray.writeToMemory(concatenated, block.getBaseObject(), block.getBaseOffset());
  // Data is now written to memory block
} finally {
  // Release the block whether or not the write succeeded
  allocator.free(block);
}