tessl/maven-org-apache-spark--spark-network-common-2-12

Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.

Shuffle Database

The shuffle database API provides specialized key-value storage for shuffle data in Apache Spark. It supports both LevelDB and RocksDB backends, offering durable, high-performance storage optimized for Spark's shuffle bookkeeping, with iteration support and store-format version management.

Capabilities

Database Interface

Core interface for key-value database operations used in shuffle data management.

public interface DB extends Closeable {
    /**
     * Store a key-value pair in the database
     * @param key - byte array representing the key
     * @param value - byte array representing the value to store
     * @throws IOException if the put operation fails
     */
    void put(byte[] key, byte[] value) throws IOException;
    
    /**
     * Retrieve a value by its key from the database
     * @param key - byte array representing the key to look up
     * @return byte array containing the value, or null if key not found
     * @throws IOException if the get operation fails
     */
    byte[] get(byte[] key) throws IOException;
    
    /**
     * Delete a key-value pair from the database
     * @param key - byte array representing the key to delete
     * @throws IOException if the delete operation fails
     */
    void delete(byte[] key) throws IOException;
    
    /**
     * Create an iterator for traversing all key-value pairs in the database
     * @return DBIterator for iterating over database entries
     */
    DBIterator iterator();
    
    /**
     * Close the database and release all associated resources
     * @throws IOException if the close operation fails
     */
    void close() throws IOException;
}

Database Iterator

Iterator interface for traversing database entries with proper resource management.

public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {
    /**
     * Check if there are more entries to iterate over
     * @return boolean indicating if more entries exist
     */
    @Override
    boolean hasNext();
    
    /**
     * Get the next key-value pair from the iterator
     * @return Map.Entry containing the next key-value pair
     * @throws NoSuchElementException if no more entries exist
     */
    @Override
    Map.Entry<byte[], byte[]> next();
    
    /**
     * Close the iterator and release associated resources
     * @throws IOException if the close operation fails
     */
    @Override
    void close() throws IOException;
}

Database Backends

Database Backend Enumeration

Enumeration of supported database backend implementations.

public enum DBBackend {
    LEVELDB("leveldb"),
    ROCKSDB("rocksdb");
    
    private final String name;
    
    DBBackend(String name) {
        this.name = name;
    }
    
    /**
     * Generate a filename for the database with the given prefix
     * @param prefix - String prefix for the database filename
     * @return String representing the complete filename
     */
    public String fileName(String prefix) {
        return prefix + "." + name;
    }
    
    /**
     * Get a database backend by name
     * @param value - String name of the backend ("leveldb" or "rocksdb")
     * @return DBBackend corresponding to the name
     * @throws IllegalArgumentException if the name is not recognized
     */
    public static DBBackend byName(String value) {
        for (DBBackend backend : values()) {
            if (backend.name.equals(value)) {
                return backend;
            }
        }
        throw new IllegalArgumentException("Unknown DB backend: " + value);
    }
}
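
Given the enum above, backend selection from a configuration string reduces to byName plus fileName. The property name in this sketch is illustrative, not an official Spark key; only byName and fileName come from the API above.

```java
import org.apache.spark.network.shuffledb.DBBackend;

// Resolve a backend from a lower-case configuration value
// (the property name here is illustrative).
String configured = System.getProperty("shuffle.db.backend", "rocksdb");
DBBackend backend = DBBackend.byName(configured);

// Per the fileName() implementation above: prefix + "." + backend name
String dbFileName = backend.fileName("registeredExecutors"); // "registeredExecutors.rocksdb"

// Unrecognized names fail fast with IllegalArgumentException
try {
    DBBackend.byName("bdb");
} catch (IllegalArgumentException e) {
    System.err.println(e.getMessage()); // "Unknown DB backend: bdb"
}
```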

LevelDB Implementation

LevelDB-based database implementation for shuffle data storage.

public class LevelDB implements DB {
    /**
     * Create a LevelDB database instance
     * @param file - File path where the database should be stored
     * @throws IOException if database creation or opening fails
     */
    public LevelDB(File file) throws IOException;
    
    @Override
    public void put(byte[] key, byte[] value) throws IOException;
    
    @Override
    public byte[] get(byte[] key) throws IOException;
    
    @Override
    public void delete(byte[] key) throws IOException;
    
    @Override
    public DBIterator iterator();
    
    @Override
    public void close() throws IOException;
    
    /**
     * Get the database file location
     * @return File representing the database location
     */
    public File getFile();
    
    /**
     * Check if the database is closed
     * @return boolean indicating if the database is closed
     */
    public boolean isClosed();
}

LevelDB Iterator

Iterator implementation for LevelDB databases.

public class LevelDBIterator implements DBIterator {
    /**
     * Create a LevelDB iterator (typically created through LevelDB.iterator())
     * @param db - LevelDB instance to iterate over
     */
    LevelDBIterator(LevelDB db);
    
    @Override
    public boolean hasNext();
    
    @Override
    public Map.Entry<byte[], byte[]> next();
    
    @Override
    public void close() throws IOException;
}

RocksDB Implementation

RocksDB-based database implementation for shuffle data storage with enhanced performance features.

public class RocksDB implements DB {
    /**
     * Create a RocksDB database instance
     * @param file - File path where the database should be stored
     * @throws IOException if database creation or opening fails
     */
    public RocksDB(File file) throws IOException;
    
    @Override
    public void put(byte[] key, byte[] value) throws IOException;
    
    @Override
    public byte[] get(byte[] key) throws IOException;
    
    @Override
    public void delete(byte[] key) throws IOException;
    
    @Override
    public DBIterator iterator();
    
    @Override
    public void close() throws IOException;
    
    /**
     * Get the database file location
     * @return File representing the database location
     */
    public File getFile();
    
    /**
     * Check if the database is closed
     * @return boolean indicating if the database is closed
     */
    public boolean isClosed();
    
    /**
     * Perform a manual compaction of the database
     * @throws IOException if compaction fails
     */
    public void compactRange() throws IOException;
}

RocksDB Iterator

Iterator implementation for RocksDB databases.

public class RocksDBIterator implements DBIterator {
    /**
     * Create a RocksDB iterator (typically created through RocksDB.iterator())
     * @param db - RocksDB instance to iterate over
     */
    RocksDBIterator(RocksDB db);
    
    @Override
    public boolean hasNext();
    
    @Override
    public Map.Entry<byte[], byte[]> next();
    
    @Override
    public void close() throws IOException;
}

Version Management

Store Version

Version management for shuffle store data with backward compatibility support.

public class StoreVersion {
    /**
     * Current version of the store format
     */
    public static final StoreVersion CURRENT = new StoreVersion(1, 0);
    
    /**
     * Create a store version
     * @param major - Major version number
     * @param minor - Minor version number
     */
    public StoreVersion(int major, int minor);
    
    /**
     * Get the major version number
     * @return int representing the major version
     */
    public int major();
    
    /**
     * Get the minor version number
     * @return int representing the minor version
     */
    public int minor();
    
    /**
     * Check if this version is compatible with another version
     * @param other - StoreVersion to check compatibility against
     * @return boolean indicating if versions are compatible
     */
    public boolean isCompatible(StoreVersion other);
    
    /**
     * Write the version to a byte array
     * @return byte array containing the encoded version
     */
    public byte[] toBytes();
    
    /**
     * Read a version from a byte array
     * @param bytes - byte array containing the encoded version
     * @return StoreVersion decoded from the bytes
     * @throws IllegalArgumentException if bytes are invalid
     */
    public static StoreVersion fromBytes(byte[] bytes);
    
    @Override
    public String toString();
    
    @Override
    public boolean equals(Object obj);
    
    @Override
    public int hashCode();
}

Database Providers

Database Provider Interface

Base interface for database provider implementations.

public interface DBProvider {
    /**
     * Initialize the database provider with configuration
     * @param dbFile - File location for the database
     * @param version - StoreVersion for the database format
     * @throws IOException if initialization fails
     */
    void init(File dbFile, StoreVersion version) throws IOException;
    
    /**
     * Get the database instance
     * @return DB instance for database operations
     * @throws IOException if database access fails
     */
    DB getDB() throws IOException;
    
    /**
     * Close the database provider and cleanup resources
     * @throws IOException if cleanup fails
     */
    void close() throws IOException;
}

LevelDB Provider

Provider implementation for LevelDB databases.

public class LevelDBProvider implements DBProvider {
    /**
     * Create a LevelDB provider instance
     */
    public LevelDBProvider();
    
    @Override
    public void init(File dbFile, StoreVersion version) throws IOException;
    
    @Override
    public DB getDB() throws IOException;
    
    @Override
    public void close() throws IOException;
    
    /**
     * Check if LevelDB is available on the system
     * @return boolean indicating if LevelDB native libraries are available
     */
    public static boolean isAvailable();
}

RocksDB Provider

Provider implementation for RocksDB databases.

public class RocksDBProvider implements DBProvider {
    /**
     * Create a RocksDB provider instance
     */
    public RocksDBProvider();
    
    @Override
    public void init(File dbFile, StoreVersion version) throws IOException;
    
    @Override
    public DB getDB() throws IOException;
    
    @Override
    public void close() throws IOException;
    
    /**
     * Check if RocksDB is available on the system
     * @return boolean indicating if RocksDB native libraries are available
     */
    public static boolean isAvailable();
}

Usage Examples

Basic Database Operations

import org.apache.spark.network.shuffledb.*;
import java.io.File;

// Create database directory
File dbDir = new File("shuffle-data");
dbDir.mkdirs();

// Create LevelDB instance
try (LevelDB levelDB = new LevelDB(dbDir)) {
    // Store key-value pairs
    String key1 = "shuffle-block-1";
    String value1 = "block data content 1";
    levelDB.put(key1.getBytes(), value1.getBytes());
    
    String key2 = "shuffle-block-2";
    String value2 = "block data content 2";
    levelDB.put(key2.getBytes(), value2.getBytes());
    
    // Retrieve values
    byte[] retrievedValue1 = levelDB.get(key1.getBytes());
    if (retrievedValue1 != null) {
        System.out.println("Retrieved: " + new String(retrievedValue1));
    }
    
    // Check if key exists
    byte[] nonExistentValue = levelDB.get("non-existent-key".getBytes());
    System.out.println("Non-existent key result: " + (nonExistentValue == null ? "null" : "found"));
    
    // Delete a key
    levelDB.delete(key2.getBytes());
    
    // Verify deletion
    byte[] deletedValue = levelDB.get(key2.getBytes());
    System.out.println("Deleted key result: " + (deletedValue == null ? "deleted" : "still exists"));
    
} catch (IOException e) {
    System.err.println("Database operation failed: " + e.getMessage());
}

Database Iteration

import org.apache.spark.network.shuffledb.*;
import java.util.Map;

// Populate database with test data
try (RocksDB rocksDB = new RocksDB(new File("iteration-test"))) {
    // Store multiple key-value pairs
    for (int i = 0; i < 10; i++) {
        String key = "key-" + String.format("%03d", i);
        String value = "value-" + i;
        rocksDB.put(key.getBytes(), value.getBytes());
    }
    
    // Iterate over all entries
    System.out.println("Database contents:");
    try (DBIterator iterator = rocksDB.iterator()) {
        while (iterator.hasNext()) {
            Map.Entry<byte[], byte[]> entry = iterator.next();
            String key = new String(entry.getKey());
            String value = new String(entry.getValue());
            System.out.println("  " + key + " = " + value);
        }
    }
    
} catch (IOException e) {
    System.err.println("Database iteration failed: " + e.getMessage());
}

Backend Selection and Availability

// Check backend availability
System.out.println("LevelDB available: " + LevelDBProvider.isAvailable());
System.out.println("RocksDB available: " + RocksDBProvider.isAvailable());

// Select backend based on availability and preference
DBBackend selectedBackend;
if (RocksDBProvider.isAvailable()) {
    selectedBackend = DBBackend.ROCKSDB;
    System.out.println("Using RocksDB backend");
} else if (LevelDBProvider.isAvailable()) {
    selectedBackend = DBBackend.LEVELDB;
    System.out.println("Using LevelDB backend");
} else {
    throw new RuntimeException("No database backend available");
}

// Create database with selected backend
File dbFile = new File("shuffle-db");
String filename = selectedBackend.fileName("shuffle-store");
File fullDbPath = new File(dbFile, filename);

DB database;
switch (selectedBackend) {
    case LEVELDB:
        database = new LevelDB(fullDbPath);
        break;
    case ROCKSDB:
        database = new RocksDB(fullDbPath);
        break;
    default:
        throw new IllegalArgumentException("Unsupported backend: " + selectedBackend);
}

// Use database...
try {
    database.put("test-key".getBytes(), "test-value".getBytes());
    System.out.println("Database created and tested successfully");
} finally {
    database.close();
}

Database Provider Pattern

import org.apache.spark.network.shuffledb.*;

// Create database provider
DBProvider provider;
if (RocksDBProvider.isAvailable()) {
    provider = new RocksDBProvider();
} else {
    provider = new LevelDBProvider();
}

try {
    // Initialize provider with version
    File dbLocation = new File("versioned-shuffle-db");
    StoreVersion version = StoreVersion.CURRENT;
    provider.init(dbLocation, version);
    
    // Get database instance
    DB database = provider.getDB();
    
    // Use database for shuffle operations
    String blockId = "shuffle_1_2_0";
    byte[] blockData = "compressed shuffle block data".getBytes();
    database.put(blockId.getBytes(), blockData);
    
    // Retrieve and verify
    byte[] retrievedData = database.get(blockId.getBytes());
    System.out.println("Block stored and retrieved successfully: " + (retrievedData != null));
    
} catch (IOException e) {
    System.err.println("Provider operation failed: " + e.getMessage());
} finally {
    try {
        provider.close();
    } catch (IOException e) {
        System.err.println("Provider cleanup failed: " + e.getMessage());
    }
}

Version Management

// Work with store versions
StoreVersion currentVersion = StoreVersion.CURRENT;
System.out.println("Current version: " + currentVersion);
System.out.println("Major: " + currentVersion.major() + ", Minor: " + currentVersion.minor());

// Create custom version
StoreVersion customVersion = new StoreVersion(1, 1);
System.out.println("Custom version: " + customVersion);

// Check compatibility
boolean compatible = currentVersion.isCompatible(customVersion);
System.out.println("Versions compatible: " + compatible);

// Serialize version
byte[] versionBytes = currentVersion.toBytes();
System.out.println("Serialized version length: " + versionBytes.length + " bytes");

// Deserialize version
StoreVersion deserializedVersion = StoreVersion.fromBytes(versionBytes);
System.out.println("Deserialized version: " + deserializedVersion);
System.out.println("Versions equal: " + currentVersion.equals(deserializedVersion));

Batch Operations

// Perform batch operations for better performance
try (RocksDB rocksDB = new RocksDB(new File("batch-operations"))) {
    
    // Batch insert operation
    System.out.println("Performing batch insert...");
    long startTime = System.currentTimeMillis();
    
    for (int partition = 0; partition < 100; partition++) {
        for (int block = 0; block < 50; block++) {
            String key = String.format("shuffle_%d_%d_%d", 1, partition, block);
            String value = "block-data-" + partition + "-" + block;
            rocksDB.put(key.getBytes(), value.getBytes());
        }
    }
    
    long insertTime = System.currentTimeMillis() - startTime;
    System.out.println("Batch insert completed in " + insertTime + "ms");
    
    // Batch read operation
    System.out.println("Performing batch read...");
    startTime = System.currentTimeMillis();
    
    int foundCount = 0;
    for (int partition = 0; partition < 100; partition++) {
        for (int block = 0; block < 50; block++) {
            String key = String.format("shuffle_%d_%d_%d", 1, partition, block);
            byte[] value = rocksDB.get(key.getBytes());
            if (value != null) {
                foundCount++;
            }
        }
    }
    
    long readTime = System.currentTimeMillis() - startTime;
    System.out.println("Batch read completed in " + readTime + "ms");
    System.out.println("Found " + foundCount + " entries");
    
    // Manual compaction (RocksDB-specific; not part of the DB interface)
    System.out.println("Performing manual compaction...");
    rocksDB.compactRange();
    System.out.println("Compaction completed");
    
} catch (IOException e) {
    System.err.println("Batch operation failed: " + e.getMessage());
}

Database Cleanup and Resource Management

// Proper resource management pattern
public class ShuffleDataManager {
    private DB database;
    private final File dbLocation;
    
    public ShuffleDataManager(File dbLocation, DBBackend backend) throws IOException {
        this.dbLocation = dbLocation;
        
        switch (backend) {
            case LEVELDB:
                this.database = new LevelDB(dbLocation);
                break;
            case ROCKSDB:
                this.database = new RocksDB(dbLocation);
                break;
            default:
                throw new IllegalArgumentException("Unsupported backend: " + backend);
        }
    }
    
    public void storeShuffleBlock(String blockId, byte[] data) throws IOException {
        database.put(blockId.getBytes(), data);
    }
    
    public byte[] getShuffleBlock(String blockId) throws IOException {
        return database.get(blockId.getBytes());
    }
    
    public void deleteShuffleBlock(String blockId) throws IOException {
        database.delete(blockId.getBytes());
    }
    
    public void cleanup() {
        if (database != null) {
            try {
                database.close();
            } catch (IOException e) {
                System.err.println("Failed to close database: " + e.getMessage());
            }
        }
    }
    
    // For testing: cleanup database files
    public void deleteDatabase() {
        cleanup();
        if (dbLocation.exists()) {
            deleteRecursively(dbLocation);
        }
    }
    
    private void deleteRecursively(File file) {
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteRecursively(child);
                }
            }
        }
        file.delete();
    }
}

// Usage
try {
    ShuffleDataManager manager = new ShuffleDataManager(
        new File("managed-shuffle-db"), 
        DBBackend.ROCKSDB
    );
    
    // Store shuffle data
    manager.storeShuffleBlock("block-1", "shuffle data 1".getBytes());
    manager.storeShuffleBlock("block-2", "shuffle data 2".getBytes());
    
    // Retrieve shuffle data
    byte[] block1Data = manager.getShuffleBlock("block-1");
    System.out.println("Retrieved block 1: " + new String(block1Data));
    
    // Cleanup
    manager.cleanup();
    
} catch (IOException e) {
    System.err.println("Shuffle data manager failed: " + e.getMessage());
}

Best Practices

Performance Optimization

  1. Batch Operations: Group related put/get/delete calls against a single open database instance; the DB interface exposes single-key operations only, so avoid reopening the database between calls
  2. Iterator Management: Always close iterators to prevent resource leaks
  3. Key Design: Use consistent key naming schemes for better locality
  4. Compaction: Use manual compaction for RocksDB in write-heavy scenarios
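
Point 3 (key design) can be made concrete. Below is a sketch of one possible key scheme, with fixed-width, zero-padded numeric fields so that byte-wise key ordering matches logical ordering; the helper class and key layout are illustrative, not part of this API.

```java
import java.util.Locale;

// Illustrative key scheme: zero-padded fields keep lexicographic (byte-wise)
// ordering aligned with (shuffleId, mapId, reduceId) order, which groups
// related entries together during iteration.
class ShuffleKeys {
    static String blockKey(int shuffleId, int mapId, int reduceId) {
        return String.format(Locale.ROOT, "shuffle_%06d_%06d_%06d",
                shuffleId, mapId, reduceId);
    }
}
```

With this scheme, blockKey(1, 2, 0) sorts before blockKey(1, 10, 0), whereas the unpadded strings "shuffle_1_2_0" and "shuffle_1_10_0" would compare in the opposite order.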

Resource Management

  1. Database Lifecycle: Always close databases using try-with-resources or explicit close() calls
  2. Provider Pattern: Use DBProvider for better abstraction and configuration management
  3. Version Compatibility: Check version compatibility when opening existing databases
  4. Backend Selection: Choose backend based on performance requirements and availability
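
Point 3 above (version compatibility) might be implemented along these lines; the reserved key name "StoreVersion" and the helper method are assumptions for illustration, not part of this API.

```java
import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.StoreVersion;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class VersionCheck {
    // Reserved key under which the format version is stored (illustrative name)
    private static final byte[] VERSION_KEY =
        "StoreVersion".getBytes(StandardCharsets.UTF_8);

    /** Verify an existing database's format version, or stamp a fresh one. */
    static void checkOrStampVersion(DB db) throws IOException {
        byte[] stored = db.get(VERSION_KEY);
        if (stored == null) {
            // Fresh database: record the current format version
            db.put(VERSION_KEY, StoreVersion.CURRENT.toBytes());
            return;
        }
        StoreVersion onDisk = StoreVersion.fromBytes(stored);
        if (!StoreVersion.CURRENT.isCompatible(onDisk)) {
            throw new IOException("Incompatible store version: " + onDisk);
        }
    }
}
```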

Error Handling

// Robust error handling pattern
public void robustDatabaseOperation(DB database, String key, byte[] value) {
    try {
        database.put(key.getBytes(), value);
        System.out.println("Successfully stored key: " + key);
    } catch (IOException e) {
        System.err.println("Failed to store key " + key + ": " + e.getMessage());
        // Implement retry logic or fallback behavior
        handleDatabaseError(e, key, value);
    }
}

private void handleDatabaseError(IOException e, String key, byte[] value) {
    // Log error details
    System.err.println("Database error details: " + e.getClass().getSimpleName());
    
    // Implement retry with exponential backoff
    // Or write to backup storage
    // Or queue for later processing
}
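
The retry-with-exponential-backoff strategy mentioned in the comments above can be sketched generically; the helper below is illustrative and not part of the Spark API.

```java
import java.io.IOException;

// Illustrative retry helper for transient I/O failures; a sketch, not part
// of the Spark API.
class RetryingWrites {

    @FunctionalInterface
    interface IOAction {
        void run() throws IOException;
    }

    /**
     * Run an I/O action, retrying on IOException up to maxAttempts times,
     * sleeping baseDelayMs, 2*baseDelayMs, 4*baseDelayMs, ... between tries.
     */
    static void withRetries(IOAction action, int maxAttempts, long baseDelayMs)
            throws IOException, InterruptedException {
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }
        IOException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                action.run();
                return; // success
            } catch (IOException e) {
                last = e;
                if (attempt < maxAttempts - 1) {
                    Thread.sleep(baseDelayMs << attempt); // exponential backoff
                }
            }
        }
        throw last; // all attempts failed
    }
}
```

Usage against the DB interface might look like `RetryingWrites.withRetries(() -> database.put(key, value), 3, 100);`.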

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12
