tessl install tessl/maven-org-apache-spark--spark-unsafe_2-13@3.5.0Low-level unsafe operations and optimized data structures for Apache Spark's internal memory management and performance-critical operations.
Abstract memory allocators supporting both heap and off-heap memory allocation with pooling, debug capabilities, and memory block abstractions. This system provides a unified interface for memory management that integrates with Spark's TaskMemoryManager for efficient resource tracking and cleanup.
Abstract interface for memory allocation supporting both heap and off-heap implementations with consistent debugging and error handling capabilities.
public interface MemoryAllocator {
MemoryBlock allocate(long size) throws OutOfMemoryError;
void free(MemoryBlock memory);
}Pre-configured allocator instances for different memory types, providing easy access to heap and off-heap allocation strategies.
// Static allocator instances
public static final MemoryAllocator UNSAFE; // Off-heap allocator
public static final MemoryAllocator HEAP; // On-heap allocatorUsage Example:
// Use off-heap memory
MemoryAllocator offHeap = MemoryAllocator.UNSAFE;
MemoryBlock block = offHeap.allocate(1024);
// ... use memory ...
offHeap.free(block);
// Use on-heap memory
MemoryAllocator onHeap = MemoryAllocator.HEAP;
MemoryBlock heapBlock = onHeap.allocate(2048);
// ... use memory ...
onHeap.free(heapBlock);Debug capabilities for tracking memory allocation and detecting memory corruption issues in development and testing environments.
public static final boolean MEMORY_DEBUG_FILL_ENABLED;
public static final byte MEMORY_DEBUG_FILL_CLEAN_VALUE = (byte)0xa5;
public static final byte MEMORY_DEBUG_FILL_FREED_VALUE = (byte)0x5a;Usage Example:
// Check if debug filling is enabled
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
// Allocated memory will be filled with MEMORY_DEBUG_FILL_CLEAN_VALUE
// Freed memory will be filled with MEMORY_DEBUG_FILL_FREED_VALUE
System.out.println("Memory debug filling is active");
}Base class for representing memory locations that can be either on-heap (object + offset) or off-heap (absolute address), providing a unified interface for memory access.
public class MemoryLocation {
public MemoryLocation(@Nullable Object obj, long offset);
public MemoryLocation();
public void setObjAndOffset(Object newObj, long newOffset);
public final Object getBaseObject();
public final long getBaseOffset();
}Usage Examples:
// On-heap memory location
byte[] array = new byte[1024];
MemoryLocation heapLocation = new MemoryLocation(array, Platform.BYTE_ARRAY_OFFSET);
// Off-heap memory location
long address = Platform.allocateMemory(1024);
MemoryLocation offHeapLocation = new MemoryLocation(null, address);
// Update location
heapLocation.setObjAndOffset(array, Platform.BYTE_ARRAY_OFFSET + 64);
// Access location data
Object baseObj = heapLocation.getBaseObject(); // array
long offset = heapLocation.getBaseOffset(); // Platform.BYTE_ARRAY_OFFSET + 64Fixed-size memory blocks with page number tracking for integration with TaskMemoryManager, supporting both heap and off-heap memory with consistent lifecycle management.
public class MemoryBlock extends MemoryLocation {
public static final int NO_PAGE_NUMBER = -1;
public static final int FREED_IN_TMM_PAGE_NUMBER = -2;
public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
public int pageNumber;
public MemoryBlock(@Nullable Object obj, long offset, long length);
public long size();
public static MemoryBlock fromLongArray(final long[] array);
public void fill(byte value);
}Usage Examples:
// Create memory block from existing array
long[] dataArray = new long[128];
MemoryBlock block = MemoryBlock.fromLongArray(dataArray);
// Create memory block from allocated memory
long address = Platform.allocateMemory(1024);
MemoryBlock offHeapBlock = new MemoryBlock(null, address, 1024);
// Fill block with specific value
block.fill((byte)0xFF);
// Check block size
long blockSize = block.size(); // 128 * 8 = 1024 bytes
// Page number management for TaskMemoryManager integration
block.pageNumber = 42; // Set page number
if (block.pageNumber == MemoryBlock.NO_PAGE_NUMBER) {
// Block not managed by TaskMemoryManager
}On-heap memory allocator using JVM long arrays with intelligent pooling for large allocations, providing memory management that works within the Java garbage collector.
public class HeapMemoryAllocator implements MemoryAllocator {
public MemoryBlock allocate(long size) throws OutOfMemoryError;
public void free(MemoryBlock memory);
}Usage Example:
HeapMemoryAllocator heapAllocator = new HeapMemoryAllocator();
// Allocate heap memory - internally uses long arrays
MemoryBlock heapBlock = heapAllocator.allocate(8192);
// Use the memory
Platform.putLong(heapBlock.getBaseObject(),
heapBlock.getBaseOffset(), 12345L);
// Free the memory (may be pooled for reuse)
heapAllocator.free(heapBlock);Off-heap memory allocator using sun.misc.Unsafe for direct system memory allocation, providing memory that exists outside the JVM heap and garbage collection.
public class UnsafeMemoryAllocator implements MemoryAllocator {
public MemoryBlock allocate(long size) throws OutOfMemoryError;
public void free(MemoryBlock memory);
}Usage Example:
UnsafeMemoryAllocator unsafeAllocator = new UnsafeMemoryAllocator();
// Allocate off-heap memory
MemoryBlock offHeapBlock = unsafeAllocator.allocate(4096);
// Use the memory - note base object is null for off-heap
Platform.putInt(offHeapBlock.getBaseObject(), // null
offHeapBlock.getBaseOffset(), // absolute address
42);
// Free the memory
unsafeAllocator.free(offHeapBlock);Convenient factory methods for creating memory blocks from existing data structures, enabling integration with existing Java collections and arrays.
Usage Examples:
// Create memory block from long array
long[] data = {1, 2, 3, 4, 5, 6, 7, 8};
MemoryBlock arrayBlock = MemoryBlock.fromLongArray(data);
// Access the underlying data
Object baseObj = arrayBlock.getBaseObject(); // the long[] array
long baseOffset = arrayBlock.getBaseOffset(); // Platform.LONG_ARRAY_OFFSET
long size = arrayBlock.size(); // 64 bytes (8 longs * 8 bytes)
// Direct memory access to array elements
long firstElement = Platform.getLong(baseObj, baseOffset);
long secondElement = Platform.getLong(baseObj, baseOffset + 8);Memory blocks support page number tracking for seamless integration with Spark's TaskMemoryManager, enabling efficient memory accounting and cleanup in distributed computing scenarios.
Usage Example:
// Create memory block for TaskMemoryManager integration
MemoryBlock managedBlock = allocator.allocate(1024);
// TaskMemoryManager assigns page number
managedBlock.pageNumber = taskMemoryManager.allocatePage(managedBlock);
// Check management status
if (managedBlock.pageNumber != MemoryBlock.NO_PAGE_NUMBER) {
// Block is managed by TaskMemoryManager
// Memory will be automatically freed when task completes
}
// Manual cleanup sets specific page numbers
managedBlock.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;