Memory allocation and management supporting both heap and off-heap memory with object pooling for large allocations and debugging capabilities. The memory management system provides efficient allocation strategies optimized for Spark's big data processing workloads.
Core interface for memory allocation with debugging support and multiple implementation strategies.
public interface MemoryAllocator {
public abstract MemoryBlock allocate(long size) throws OutOfMemoryError;
public abstract void free(MemoryBlock memory);
public static final boolean MEMORY_DEBUG_FILL_ENABLED;
public static final byte MEMORY_DEBUG_FILL_CLEAN_VALUE;
public static final byte MEMORY_DEBUG_FILL_FREED_VALUE;
public static final MemoryAllocator UNSAFE;
public static final MemoryAllocator HEAP;
}

Base class representing a memory location with an object reference and an offset for unified memory addressing.
public class MemoryLocation {
public MemoryLocation(Object obj, long offset);
public MemoryLocation();
public void setObjAndOffset(Object newObj, long newOffset);
public final Object getBaseObject();
public final long getBaseOffset();
}

Represents a contiguous memory block with a fixed size, extending MemoryLocation with size information and utility operations.
public class MemoryBlock extends MemoryLocation {
public MemoryBlock(Object obj, long offset, long length);
public static MemoryBlock fromLongArray(final long[] array);
public long size();
public void fill(byte value);
public int pageNumber;
public static final int NO_PAGE_NUMBER = -1;
public static final int FREED_IN_TMM_PAGE_NUMBER = -2;
public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
}

Memory allocator using the JVM heap, with object pooling for large allocations to improve performance and reduce garbage collection pressure.
public class HeapMemoryAllocator implements MemoryAllocator {
public MemoryBlock allocate(long size) throws OutOfMemoryError;
public void free(MemoryBlock memory);
}

Memory allocator using off-heap memory via Unsafe for maximum performance and to avoid JVM heap limitations.
public class UnsafeMemoryAllocator implements MemoryAllocator {
public MemoryBlock allocate(long size) throws OutOfMemoryError;
public void free(MemoryBlock memory);
}

import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
// Example: obtain the two shared allocator instances exposed as constants on
// MemoryAllocator and request one block from each.
// NOTE(review): the size argument is presumably a byte count — confirm.

// Use heap allocator for smaller allocations
MemoryAllocator heapAllocator = MemoryAllocator.HEAP;
MemoryBlock heapBlock = heapAllocator.allocate(1024);
// Use unsafe allocator for larger, high-performance allocations
MemoryAllocator unsafeAllocator = MemoryAllocator.UNSAFE;
MemoryBlock unsafeBlock = unsafeAllocator.allocate(1024 * 1024);
// Clean up: hand each block back to the same allocator that produced it
heapAllocator.free(heapBlock);
unsafeAllocator.free(unsafeBlock);

import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.Platform;
// Example: wrap an existing long[] in a MemoryBlock, inspect it, and access
// its contents through Platform.

// Create memory block from long array
long[] data = {1L, 2L, 3L, 4L, 5L};
MemoryBlock block = MemoryBlock.fromLongArray(data);
// Access memory block properties: the base object/offset pair that addresses
// the block, plus its size
Object baseObject = block.getBaseObject();
long baseOffset = block.getBaseOffset();
long size = block.size();
// Fill block with specific value
// NOTE(review): since the block wraps `data`, this presumably overwrites the
// array's original contents — confirm.
block.fill((byte) 0x42);
// Direct memory access through Platform: read the long located at the very
// start of the block
long value = Platform.getLong(baseObject, baseOffset);
Platform.putLong(baseObject, baseOffset + 8, 999L);

import org.apache.spark.unsafe.memory.MemoryLocation;
// Example: MemoryLocation as a plain holder for a (base object, offset) pair.
// NOTE(review): the offsets 0 and 100 are used here only as stored values. If
// the pair is later passed to Platform accessors, offsets into an on-heap
// byte[] presumably need the array base offset added (Platform.BYTE_ARRAY_OFFSET)
// — confirm before using these raw offsets for memory access.

// Create memory location
byte[] array = new byte[1000];
MemoryLocation location = new MemoryLocation(array, 0);
// Update location: replaces both the base object and the offset in one call
location.setObjAndOffset(array, 100);
// Access location properties
Object obj = location.getBaseObject();
long offset = location.getBaseOffset();

import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryAllocator;
// Example: the pageNumber field and its sentinel values.
MemoryBlock block = MemoryAllocator.UNSAFE.allocate(1024);
// Set page number for TaskMemoryManager integration
block.pageNumber = 42;
// Check page status
if (block.pageNumber == MemoryBlock.NO_PAGE_NUMBER) {
// Block is not managed by TaskMemoryManager
}
// A block that still carries a TaskMemoryManager page number must not be
// handed straight to the allocator: UnsafeMemoryAllocator.free() asserts that
// the page number is NO_PAGE_NUMBER or FREED_IN_TMM_PAGE_NUMBER. Pages owned by
// a TaskMemoryManager are released through it; for this standalone example we
// clear the page number before freeing directly.
block.pageNumber = MemoryBlock.NO_PAGE_NUMBER;
// Free and check status
MemoryAllocator.UNSAFE.free(block);
if (block.pageNumber == MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) {
// Block has been freed
}

import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
// Example: effect of the debug-fill switch on allocate() and free().
// When MEMORY_DEBUG_FILL_ENABLED is true, allocated memory is filled
// with MEMORY_DEBUG_FILL_CLEAN_VALUE and freed memory with
// MEMORY_DEBUG_FILL_FREED_VALUE for debugging purposes
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
MemoryBlock block = MemoryAllocator.UNSAFE.allocate(1024);
// Freshly allocated block is filled with 0xa5 (MEMORY_DEBUG_FILL_CLEAN_VALUE)
MemoryAllocator.UNSAFE.free(block);
// Block is filled with 0x5a (MEMORY_DEBUG_FILL_FREED_VALUE)
// NOTE(review): this block is off-heap and has been released at this point, so
// the 0x5a pattern presumably must not be read back through `block` — it is a
// marker for spotting use-after-free in a debugger or memory dump. Confirm.
}