Spark Unsafe provides low-level memory management that supports both on-heap and off-heap allocation. Its memory management system consists of allocators, memory blocks, and memory locations, designed for high-performance data processing workloads.
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryLocation;
import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.UnsafeMemoryAllocator;

// Use heap memory allocator.
// free() sits in a finally block so the memory is released even if using the block throws.
MemoryAllocator heapAllocator = new HeapMemoryAllocator();
MemoryBlock block = heapAllocator.allocate(1024);
try {
    // Fill the block with data
    block.fill((byte) 0xFF);
} finally {
    // Clean up; the block must not be used after this call.
    heapAllocator.free(block);
}

// Use unsafe (off-heap) memory allocator
MemoryAllocator unsafeAllocator = new UnsafeMemoryAllocator();
MemoryBlock offHeapBlock = unsafeAllocator.allocate(2048);
try {
    // Use the memory block
    System.out.println("Block size: " + offHeapBlock.size());
} finally {
    // Clean up
    unsafeAllocator.free(offHeapBlock);
}

// Use global allocator instances.
// Nested try/finally: if the second allocation fails, the first block is still freed.
MemoryBlock heapBlock = MemoryAllocator.HEAP.allocate(512);
try {
    MemoryBlock unsafeBlock = MemoryAllocator.UNSAFE.allocate(512);
    try {
        // ... use heapBlock and unsafeBlock ...
    } finally {
        MemoryAllocator.UNSAFE.free(unsafeBlock);
    }
} finally {
    MemoryAllocator.HEAP.free(heapBlock);
}

// Create memory location (null base object, zero offset; same as new MemoryLocation())
MemoryLocation location = new MemoryLocation(null, 0);
// Point the location at a byte array (requires the Platform import above)
byte[] data = new byte[100];
location.setObjAndOffset(data, Platform.BYTE_ARRAY_OFFSET);
// Access location properties
Object baseObject = location.getBaseObject();
long baseOffset = location.getBaseOffset();

// Wrap an existing long array in a memory block
long[] array = {1L, 2L, 3L, 4L, 5L};
MemoryBlock arrayBlock = MemoryBlock.fromLongArray(array);
System.out.println("Array block size: " + arrayBlock.size());

/**
 * Allocates and frees {@link MemoryBlock}s.
 *
 * <p>Every block obtained from {@link #allocate(long)} should be released with
 * {@link #free(MemoryBlock)} and must not be used afterwards.
 */
public interface MemoryAllocator {
    // Debug configuration constants

    /** When true, allocated and freed memory is filled with the debug byte patterns below. */
    public static final boolean MEMORY_DEBUG_FILL_ENABLED;

    /** Byte used to fill newly allocated memory when debug filling is enabled. */
    public static final byte MEMORY_DEBUG_FILL_CLEAN_VALUE;

    /** Byte used to fill freed memory when debug filling is enabled. */
    public static final byte MEMORY_DEBUG_FILL_FREED_VALUE;

    // Global allocator instances

    /** Shared allocator for off-heap ("unsafe") memory. */
    public static final MemoryAllocator UNSAFE;

    /** Shared allocator for on-heap memory. */
    public static final MemoryAllocator HEAP;

    /**
     * Allocates a contiguous memory block of the specified size.
     *
     * @param size number of bytes to allocate
     * @return the newly allocated block; release it with {@link #free(MemoryBlock)}
     */
    MemoryBlock allocate(long size);

    /**
     * Frees a previously allocated memory block.
     *
     * <p>The block must not be used after it has been freed.
     *
     * @param memory a block previously returned by {@link #allocate(long)}
     */
    void free(MemoryBlock memory);
}

/**
 * A contiguous region of memory described by a base object, an offset, and a length.
 */
public class MemoryBlock {
    // Page number constants for TaskMemoryManager integration.
    // NOTE(review): the per-constant meanings below are inferred from the names; confirm
    // against TaskMemoryManager before relying on them.

    /** Presumably marks a block that has no TaskMemoryManager page number. */
    public static final int NO_PAGE_NUMBER;

    /** Presumably marks a page that has been freed through TaskMemoryManager. */
    public static final int FREED_IN_TMM_PAGE_NUMBER;

    /** Presumably marks a block that has been freed by its MemoryAllocator. */
    public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER;

    /** Optional page number for pages allocated by TaskMemoryManager. */
    public int pageNumber;

    /**
     * Creates a memory block with the specified base object, offset, and length.
     *
     * @param obj    base object of the block
     * @param offset base offset of the block
     * @param length length of the block (presumably in bytes, as reported by {@link #size()})
     */
    public MemoryBlock(Object obj, long offset, long length);

    /**
     * Returns the size of this memory block.
     *
     * @return the block size in bytes
     */
    public long size();

    /**
     * Fills the entire memory block with the specified byte value.
     *
     * @param value byte written to every position of the block
     */
    public void fill(byte value);

    /**
     * Creates a memory block wrapping a long array.
     *
     * @param array the array to wrap
     * @return a block backed by {@code array}
     */
    public static MemoryBlock fromLongArray(long[] array);
}

/**
 * A mutable (base object, offset) pair used for memory access.
 */
public class MemoryLocation {
    /**
     * Creates a memory location with the specified base object and offset.
     *
     * @param obj    base object
     * @param offset base offset
     */
    public MemoryLocation(Object obj, long offset);

    /**
     * Creates a memory location with a null base object and zero offset.
     */
    public MemoryLocation();

    /**
     * Updates the base object and offset of this memory location.
     *
     * @param newObj    new base object
     * @param newOffset new base offset
     */
    public void setObjAndOffset(Object newObj, long newOffset);

    /**
     * Returns the base object for memory access.
     *
     * @return the current base object (may be null)
     */
    public Object getBaseObject();

    /**
     * Returns the base offset for memory access.
     *
     * @return the current base offset
     */
    public long getBaseOffset();
}

/** {@link MemoryAllocator} that allocates memory on the Java heap. */
public class HeapMemoryAllocator implements MemoryAllocator {
    /**
     * Allocates a heap memory block of the specified size.
     *
     * @param size number of bytes to allocate
     * @return the newly allocated block
     */
    @Override
    public MemoryBlock allocate(long size);

    /**
     * Frees a previously allocated heap memory block.
     *
     * @param memory a block previously returned by {@link #allocate(long)}
     */
    @Override
    public void free(MemoryBlock memory);
}

/** {@link MemoryAllocator} that allocates off-heap memory using unsafe operations. */
public class UnsafeMemoryAllocator implements MemoryAllocator {
    /**
     * Allocates an off-heap memory block using unsafe operations.
     *
     * @param size number of bytes to allocate
     * @return the newly allocated block
     */
    @Override
    public MemoryBlock allocate(long size);

    /**
     * Frees a previously allocated off-heap memory block.
     *
     * @param memory a block previously returned by {@link #allocate(long)}
     */
    @Override
    public void free(MemoryBlock memory);
}
Platform.allocateMemory() directly
The memory allocator system includes debug support for tracking memory allocation and deallocation:
- MEMORY_DEBUG_FILL_ENABLED: When true, allocated and freed memory is filled with specific byte patterns.
- MEMORY_DEBUG_FILL_CLEAN_VALUE: Value used to fill newly allocated memory.
- MEMORY_DEBUG_FILL_FREED_VALUE: Value used to fill freed memory.

Memory blocks can be integrated with Spark's TaskMemoryManager for memory tracking:
- The pageNumber field tracks pages allocated by TaskMemoryManager.

Always Free Memory: Pair every allocate() call with a corresponding free() call to prevent memory leaks.
Choose Appropriate Allocator: Use heap allocation for smaller, short-lived objects and off-heap allocation for large, long-lived data structures.
Debug Mode: Enable debug filling in development to catch use-after-free bugs.
Thread Safety: Memory allocators are thread-safe, but individual memory blocks are not.
Memory Block Reuse: Memory blocks should not be used after being freed.