Core networking functionality for Apache Spark including transport layers, buffer management, client-server communication, cryptographic protocols, SASL authentication, and shuffle database support using Netty for high-performance network I/O.
—
The shuffle database API provides specialized key-value database functionality for handling shuffle data storage in Apache Spark. It supports both LevelDB and RocksDB backends, offering high-performance storage solutions optimized for Spark's shuffle operations with features like atomic operations, iteration support, and version management.
Core interface for key-value database operations used in shuffle data management.
/**
 * Key-value store abstraction over raw byte-array keys and values, used for
 * persisting shuffle state. Extends Closeable so instances can be managed with
 * try-with-resources. Backed in this package by LevelDB and RocksDB.
 */
public interface DB extends Closeable {
/**
 * Store a key-value pair in the database
 * @param key - byte array representing the key
 * @param value - byte array representing the value to store
 * @throws IOException if the put operation fails
 */
void put(byte[] key, byte[] value) throws IOException;
/**
 * Retrieve a value by its key from the database
 * @param key - byte array representing the key to look up
 * @return byte array containing the value, or null if key not found
 * @throws IOException if the get operation fails
 */
byte[] get(byte[] key) throws IOException;
/**
 * Delete a key-value pair from the database
 * @param key - byte array representing the key to delete
 * @throws IOException if the delete operation fails
 */
void delete(byte[] key) throws IOException;
/**
 * Create an iterator for traversing all key-value pairs in the database
 * @return DBIterator for iterating over database entries; callers should close it
 */
DBIterator iterator();
/**
 * Close the database and release all associated resources
 * @throws IOException if the close operation fails
 */
void close() throws IOException;
}
Iterator interface for traversing database entries with proper resource management.
/**
 * Iterator over database entries exposed as Map.Entry&lt;byte[], byte[]&gt; pairs.
 * Also Closeable: callers should close() it (ideally via try-with-resources)
 * to release any underlying iterator resources.
 */
public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {
/**
 * Check if there are more entries to iterate over
 * @return boolean indicating if more entries exist
 */
@Override
boolean hasNext();
/**
 * Get the next key-value pair from the iterator
 * @return Map.Entry containing the next key-value pair
 * @throws NoSuchElementException if no more entries exist
 */
@Override
Map.Entry<byte[], byte[]> next();
/**
 * Close the iterator and release associated resources
 * @throws IOException if the close operation fails
 */
@Override
void close() throws IOException;
}
Enumeration of supported database backend implementations.
/**
 * Enumeration of supported key-value store backends for shuffle data.
 * Each constant carries the lowercase identifier used in configuration
 * values and generated database file names.
 */
public enum DBBackend {
    LEVELDB("leveldb"),
    ROCKSDB("rocksdb");

    /** Lowercase identifier, e.g. "leveldb"; also used as the file-name suffix. */
    private final String id;

    DBBackend(String id) {
        this.id = id;
    }

    /**
     * Generate a filename for the database with the given prefix.
     *
     * @param prefix prefix for the database filename
     * @return the prefix followed by "." and the backend identifier
     */
    public String fileName(String prefix) {
        return prefix + "." + id;
    }

    /**
     * Get the lowercase backend identifier.
     *
     * <p>Named {@code getName()} rather than {@code name()} because
     * {@link Enum#name()} is final and cannot be overridden; the original
     * declaration of a custom {@code name()} would not compile.
     *
     * @return the backend identifier, e.g. "leveldb"
     */
    public String getName() {
        return id;
    }

    /**
     * Look up a backend by its identifier, case-insensitively.
     *
     * @param value name of the backend ("leveldb" or "rocksdb", any case)
     * @return the DBBackend corresponding to the name
     * @throws IllegalArgumentException if the name is not recognized
     */
    public static DBBackend byName(String value) {
        for (DBBackend backend : values()) {
            if (backend.id.equalsIgnoreCase(value)) {
                return backend;
            }
        }
        throw new IllegalArgumentException("Unknown DB backend: " + value);
    }
}
/**
 * LevelDB-backed implementation of the DB interface for shuffle data storage.
 * NOTE(review): this is an API summary — method bodies are omitted in this
 * document, so behavior beyond the documented contracts cannot be confirmed here.
 */
public class LevelDB implements DB {
/**
 * Create a LevelDB database instance
 * @param file - File path where the database should be stored
 * @throws IOException if database creation or opening fails
 */
public LevelDB(File file) throws IOException;
@Override
public void put(byte[] key, byte[] value) throws IOException;
@Override
public byte[] get(byte[] key) throws IOException;
@Override
public void delete(byte[] key) throws IOException;
@Override
public DBIterator iterator();
@Override
public void close() throws IOException;
/**
 * Get the database file location
 * @return File representing the database location
 */
public File getFile();
/**
 * Check if the database is closed
 * @return boolean indicating if the database is closed
 */
public boolean isClosed();
}
Iterator implementation for LevelDB databases.
/**
 * DBIterator implementation over a LevelDB instance. Package-private
 * constructor: obtain instances via LevelDB.iterator().
 * NOTE(review): API summary stub — method bodies omitted in this document.
 */
public class LevelDBIterator implements DBIterator {
/**
 * Create a LevelDB iterator (typically created through LevelDB.iterator())
 * @param db - LevelDB instance to iterate over
 */
LevelDBIterator(LevelDB db);
@Override
public boolean hasNext();
@Override
public Map.Entry<byte[], byte[]> next();
@Override
public void close() throws IOException;
}
RocksDB-based database implementation for shuffle data storage with enhanced performance features.
/**
 * RocksDB-backed implementation of the DB interface for shuffle data storage.
 * Adds compactRange() on top of the DB contract for manual compaction.
 * NOTE(review): API summary stub — method bodies omitted in this document.
 */
public class RocksDB implements DB {
/**
 * Create a RocksDB database instance
 * @param file - File path where the database should be stored
 * @throws IOException if database creation or opening fails
 */
public RocksDB(File file) throws IOException;
@Override
public void put(byte[] key, byte[] value) throws IOException;
@Override
public byte[] get(byte[] key) throws IOException;
@Override
public void delete(byte[] key) throws IOException;
@Override
public DBIterator iterator();
@Override
public void close() throws IOException;
/**
 * Get the database file location
 * @return File representing the database location
 */
public File getFile();
/**
 * Check if the database is closed
 * @return boolean indicating if the database is closed
 */
public boolean isClosed();
/**
 * Perform a manual compaction of the database
 * @throws IOException if compaction fails
 */
public void compactRange() throws IOException;
}
Iterator implementation for RocksDB databases.
/**
 * DBIterator implementation over a RocksDB instance. Package-private
 * constructor: obtain instances via RocksDB.iterator().
 * NOTE(review): API summary stub — method bodies omitted in this document.
 */
public class RocksDBIterator implements DBIterator {
/**
 * Create a RocksDB iterator (typically created through RocksDB.iterator())
 * @param db - RocksDB instance to iterate over
 */
RocksDBIterator(RocksDB db);
@Override
public boolean hasNext();
@Override
public Map.Entry<byte[], byte[]> next();
@Override
public void close() throws IOException;
}
Version management for shuffle store data with backward compatibility support.
/**
 * Major/minor version descriptor for the on-disk shuffle store format,
 * with byte-array serialization for persisting the version alongside data.
 * NOTE(review): API summary stub — method bodies omitted in this document.
 */
public class StoreVersion {
/**
 * Current version of the store format
 */
public static final StoreVersion CURRENT = new StoreVersion(1, 0);
/**
 * Create a store version
 * @param major - Major version number
 * @param minor - Minor version number
 */
public StoreVersion(int major, int minor);
/**
 * Get the major version number
 * @return int representing the major version
 */
public int major();
/**
 * Get the minor version number
 * @return int representing the minor version
 */
public int minor();
/**
 * Check if this version is compatible with another version
 * @param other - StoreVersion to check compatibility against
 * @return boolean indicating if versions are compatible
 */
// NOTE(review): the exact compatibility rule (e.g. major-only match vs exact match)
// is not visible here — confirm against the implementation before relying on it.
public boolean isCompatible(StoreVersion other);
/**
 * Write the version to a byte array
 * @return byte array containing the encoded version
 */
public byte[] toBytes();
/**
 * Read a version from a byte array
 * @param bytes - byte array containing the encoded version
 * @return StoreVersion decoded from the bytes
 * @throws IllegalArgumentException if bytes are invalid
 */
public static StoreVersion fromBytes(byte[] bytes);
@Override
public String toString();
@Override
public boolean equals(Object obj);
@Override
public int hashCode();
}
Base interface for database provider implementations.
/**
 * Factory/lifecycle interface for obtaining a versioned shuffle database:
 * initialize with a file location and StoreVersion, fetch the DB handle,
 * and close when finished.
 * NOTE(review): the required calling order (init before getDB) is implied by
 * the method contracts — confirm against implementations.
 */
public interface DBProvider {
/**
 * Initialize the database provider with configuration
 * @param dbFile - File location for the database
 * @param version - StoreVersion for the database format
 * @throws IOException if initialization fails
 */
void init(File dbFile, StoreVersion version) throws IOException;
/**
 * Get the database instance
 * @return DB instance for database operations
 * @throws IOException if database access fails
 */
DB getDB() throws IOException;
/**
 * Close the database provider and cleanup resources
 * @throws IOException if cleanup fails
 */
void close() throws IOException;
}
Provider implementation for LevelDB databases.
/**
 * DBProvider implementation backed by LevelDB, with a static probe for
 * native-library availability.
 * NOTE(review): API summary stub — method bodies omitted in this document.
 */
public class LevelDBProvider implements DBProvider {
/**
 * Create a LevelDB provider instance
 */
public LevelDBProvider();
@Override
public void init(File dbFile, StoreVersion version) throws IOException;
@Override
public DB getDB() throws IOException;
@Override
public void close() throws IOException;
/**
 * Check if LevelDB is available on the system
 * @return boolean indicating if LevelDB native libraries are available
 */
public static boolean isAvailable();
}
Provider implementation for RocksDB databases.
/**
 * DBProvider implementation backed by RocksDB, with a static probe for
 * native-library availability.
 * NOTE(review): API summary stub — method bodies omitted in this document.
 */
public class RocksDBProvider implements DBProvider {
/**
 * Create a RocksDB provider instance
 */
public RocksDBProvider();
@Override
public void init(File dbFile, StoreVersion version) throws IOException;
@Override
public DB getDB() throws IOException;
@Override
public void close() throws IOException;
/**
 * Check if RocksDB is available on the system
 * @return boolean indicating if RocksDB native libraries are available
 */
public static boolean isAvailable();
}
import org.apache.spark.network.shuffledb.*;
import java.io.File;
// Example: basic CRUD operations against a LevelDB-backed store.
// Create database directory
File dbDir = new File("shuffle-data");
// NOTE(review): mkdirs() return value is ignored — a creation failure surfaces later as IOException.
dbDir.mkdirs();
// Create LevelDB instance
// try-with-resources guarantees the DB is closed even if an operation throws.
try (LevelDB levelDB = new LevelDB(dbDir)) {
// Store key-value pairs
String key1 = "shuffle-block-1";
String value1 = "block data content 1";
// NOTE(review): getBytes() uses the platform default charset; prefer StandardCharsets.UTF_8.
levelDB.put(key1.getBytes(), value1.getBytes());
String key2 = "shuffle-block-2";
String value2 = "block data content 2";
levelDB.put(key2.getBytes(), value2.getBytes());
// Retrieve values
byte[] retrievedValue1 = levelDB.get(key1.getBytes());
if (retrievedValue1 != null) {
System.out.println("Retrieved: " + new String(retrievedValue1));
}
// Check if key exists
// get() returns null for absent keys rather than throwing.
byte[] nonExistentValue = levelDB.get("non-existent-key".getBytes());
System.out.println("Non-existent key result: " + (nonExistentValue == null ? "null" : "found"));
// Delete a key
levelDB.delete(key2.getBytes());
// Verify deletion
byte[] deletedValue = levelDB.get(key2.getBytes());
System.out.println("Deleted key result: " + (deletedValue == null ? "deleted" : "still exists"));
} catch (IOException e) {
System.err.println("Database operation failed: " + e.getMessage());
}
import org.apache.spark.network.shuffledb.*;
import java.util.Map;
// Example: populate a RocksDB store and iterate over every entry.
// Populate database with test data
try (RocksDB rocksDB = new RocksDB(new File("iteration-test"))) {
// Store multiple key-value pairs
for (int i = 0; i < 10; i++) {
// Zero-padded keys ("key-000"...) keep lexicographic order aligned with numeric order.
String key = "key-" + String.format("%03d", i);
String value = "value-" + i;
rocksDB.put(key.getBytes(), value.getBytes());
}
// Iterate over all entries
System.out.println("Database contents:");
// DBIterator is Closeable; try-with-resources releases the underlying iterator.
try (DBIterator iterator = rocksDB.iterator()) {
while (iterator.hasNext()) {
Map.Entry<byte[], byte[]> entry = iterator.next();
// NOTE(review): new String(byte[]) uses the platform default charset.
String key = new String(entry.getKey());
String value = new String(entry.getValue());
System.out.println(" " + key + " = " + value);
}
}
} catch (IOException e) {
System.err.println("Database iteration failed: " + e.getMessage());
}
// Check backend availability
// Example: pick an available backend (RocksDB preferred), then open and smoke-test a store.
System.out.println("LevelDB available: " + LevelDBProvider.isAvailable());
System.out.println("RocksDB available: " + RocksDBProvider.isAvailable());
// Select backend based on availability and preference
DBBackend selectedBackend;
if (RocksDBProvider.isAvailable()) {
selectedBackend = DBBackend.ROCKSDB;
System.out.println("Using RocksDB backend");
} else if (LevelDBProvider.isAvailable()) {
selectedBackend = DBBackend.LEVELDB;
System.out.println("Using LevelDB backend");
} else {
throw new RuntimeException("No database backend available");
}
// Create database with selected backend
File dbFile = new File("shuffle-db");
// fileName() appends "." plus the backend identifier, e.g. "shuffle-store.rocksdb".
String filename = selectedBackend.fileName("shuffle-store");
File fullDbPath = new File(dbFile, filename);
DB database;
switch (selectedBackend) {
case LEVELDB:
database = new LevelDB(fullDbPath);
break;
case ROCKSDB:
database = new RocksDB(fullDbPath);
break;
default:
throw new IllegalArgumentException("Unsupported backend: " + selectedBackend);
}
// Use database...
// try/finally guarantees close() runs even if the put fails.
try {
database.put("test-key".getBytes(), "test-value".getBytes());
System.out.println("Database created and tested successfully");
} finally {
database.close();
}
import org.apache.spark.network.shuffledb.*;
// Example: provider-based lifecycle — init with a StoreVersion, use the DB, close in finally.
// Create database provider
DBProvider provider;
if (RocksDBProvider.isAvailable()) {
provider = new RocksDBProvider();
} else {
provider = new LevelDBProvider();
}
try {
// Initialize provider with version
File dbLocation = new File("versioned-shuffle-db");
StoreVersion version = StoreVersion.CURRENT;
provider.init(dbLocation, version);
// Get database instance
DB database = provider.getDB();
// Use database for shuffle operations
String blockId = "shuffle_1_2_0";
byte[] blockData = "compressed shuffle block data".getBytes();
database.put(blockId.getBytes(), blockData);
// Retrieve and verify
byte[] retrievedData = database.get(blockId.getBytes());
System.out.println("Block stored and retrieved successfully: " + (retrievedData != null));
} catch (IOException e) {
System.err.println("Provider operation failed: " + e.getMessage());
} finally {
// close() itself may throw IOException, so it is wrapped rather than propagated.
try {
provider.close();
} catch (IOException e) {
System.err.println("Provider cleanup failed: " + e.getMessage());
}
}
// Work with store versions
// Example: inspect, compare, and round-trip-serialize store versions.
StoreVersion currentVersion = StoreVersion.CURRENT;
System.out.println("Current version: " + currentVersion);
System.out.println("Major: " + currentVersion.major() + ", Minor: " + currentVersion.minor());
// Create custom version
StoreVersion customVersion = new StoreVersion(1, 1);
System.out.println("Custom version: " + customVersion);
// Check compatibility
// NOTE(review): the exact compatibility rule is not visible in this document.
boolean compatible = currentVersion.isCompatible(customVersion);
System.out.println("Versions compatible: " + compatible);
// Serialize version
byte[] versionBytes = currentVersion.toBytes();
System.out.println("Serialized version length: " + versionBytes.length + " bytes");
// Deserialize version; fromBytes should reproduce an equal StoreVersion.
StoreVersion deserializedVersion = StoreVersion.fromBytes(versionBytes);
System.out.println("Deserialized version: " + deserializedVersion);
System.out.println("Versions equal: " + currentVersion.equals(deserializedVersion));
// Perform batch operations for better performance
// Example: bulk insert/read of 5,000 shuffle entries, followed by a manual compaction.
try (RocksDB rocksDB = new RocksDB(new File("batch-operations"))) {
// Batch insert operation
System.out.println("Performing batch insert...");
long startTime = System.currentTimeMillis();
for (int partition = 0; partition < 100; partition++) {
for (int block = 0; block < 50; block++) {
// Key layout: shuffle_<shuffleId>_<partition>_<block>
String key = String.format("shuffle_%d_%d_%d", 1, partition, block);
String value = "block-data-" + partition + "-" + block;
// NOTE(review): getBytes() uses the platform default charset; prefer an explicit charset.
rocksDB.put(key.getBytes(), value.getBytes());
}
}
long insertTime = System.currentTimeMillis() - startTime;
System.out.println("Batch insert completed in " + insertTime + "ms");
// Batch read operation
System.out.println("Performing batch read...");
startTime = System.currentTimeMillis();
int foundCount = 0;
for (int partition = 0; partition < 100; partition++) {
for (int block = 0; block < 50; block++) {
String key = String.format("shuffle_%d_%d_%d", 1, partition, block);
byte[] value = rocksDB.get(key.getBytes());
if (value != null) {
foundCount++;
}
}
}
long readTime = System.currentTimeMillis() - startTime;
System.out.println("Batch read completed in " + readTime + "ms");
System.out.println("Found " + foundCount + " entries");
// Manual compaction: compactRange() is RocksDB-specific. The variable is statically
// typed RocksDB, so the original `rocksDB instanceof RocksDB` guard was always true
// and has been removed as a dead check.
System.out.println("Performing manual compaction...");
rocksDB.compactRange();
System.out.println("Compaction completed");
} catch (IOException e) {
System.err.println("Batch operation failed: " + e.getMessage());
}
// Proper resource management pattern
/**
 * Example resource-management wrapper around a shuffle key-value store.
 * Owns a single DB instance (selected by backend) and exposes simple
 * block-level store/get/delete operations plus explicit cleanup.
 */
public class ShuffleDataManager {
    /** Charset for encoding block ids; explicit to avoid platform-default drift. */
    private static final java.nio.charset.Charset KEY_CHARSET =
        java.nio.charset.StandardCharsets.UTF_8;
    /** Backing database; opened in the constructor, closed by cleanup(). Now final. */
    private final DB database;
    /** On-disk location of the database, used by deleteDatabase(). */
    private final File dbLocation;

    /**
     * Open a database of the requested backend at the given location.
     *
     * @param dbLocation file or directory where the database lives
     * @param backend which backend implementation to instantiate
     * @throws IOException if the database cannot be created or opened
     * @throws IllegalArgumentException if the backend is not supported
     */
    public ShuffleDataManager(File dbLocation, DBBackend backend) throws IOException {
        this.dbLocation = dbLocation;
        switch (backend) {
            case LEVELDB:
                this.database = new LevelDB(dbLocation);
                break;
            case ROCKSDB:
                this.database = new RocksDB(dbLocation);
                break;
            default:
                throw new IllegalArgumentException("Unsupported backend: " + backend);
        }
    }

    /** Store a shuffle block's bytes under its id (UTF-8 encoded key). */
    public void storeShuffleBlock(String blockId, byte[] data) throws IOException {
        database.put(blockId.getBytes(KEY_CHARSET), data);
    }

    /** Fetch a shuffle block's bytes, or null if the id is unknown. */
    public byte[] getShuffleBlock(String blockId) throws IOException {
        return database.get(blockId.getBytes(KEY_CHARSET));
    }

    /** Remove a shuffle block from the store. */
    public void deleteShuffleBlock(String blockId) throws IOException {
        database.delete(blockId.getBytes(KEY_CHARSET));
    }

    /**
     * Close the database, logging (not propagating) close failures.
     * The field is final and assigned in the constructor, so the original
     * null check was dead code and has been removed.
     */
    public void cleanup() {
        try {
            database.close();
        } catch (IOException e) {
            System.err.println("Failed to close database: " + e.getMessage());
        }
    }

    // For testing: close the DB and remove its on-disk files.
    public void deleteDatabase() {
        cleanup();
        if (dbLocation.exists()) {
            deleteRecursively(dbLocation);
        }
    }

    /** Depth-first delete of a file tree; individual delete failures are ignored. */
    private void deleteRecursively(File file) {
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteRecursively(child);
                }
            }
        }
        file.delete();
    }
}
// Usage
// Example: create a manager, store/retrieve blocks, then release resources.
try {
ShuffleDataManager manager = new ShuffleDataManager(
new File("managed-shuffle-db"),
DBBackend.ROCKSDB
);
// Store shuffle data
manager.storeShuffleBlock("block-1", "shuffle data 1".getBytes());
manager.storeShuffleBlock("block-2", "shuffle data 2".getBytes());
// Retrieve shuffle data
byte[] block1Data = manager.getShuffleBlock("block-1");
// NOTE(review): possible NullPointerException if "block-1" is absent — guard before new String(...).
System.out.println("Retrieved block 1: " + new String(block1Data));
// Cleanup
manager.cleanup();
} catch (IOException e) {
System.err.println("Shuffle data manager failed: " + e.getMessage());
}
// Robust error handling pattern
/**
 * Attempt to persist a key/value pair, reporting success and delegating any
 * failure to the error handler instead of propagating the exception.
 *
 * @param database target database
 * @param key string key (encoded with the platform default charset)
 * @param value raw value bytes to store
 */
public void robustDatabaseOperation(DB database, String key, byte[] value) {
    try {
        byte[] encodedKey = key.getBytes();
        database.put(encodedKey, value);
        System.out.println("Successfully stored key: " + key);
    } catch (IOException e) {
        // Report the failure, then hand off for retry/fallback handling.
        System.err.println("Failed to store key " + key + ": " + e.getMessage());
        handleDatabaseError(e, key, value);
    }
}
/**
 * Centralized failure hook for database write errors. Currently only logs
 * the exception type; retry/backoff, backup storage, or queueing are the
 * suggested extension points noted below.
 */
private void handleDatabaseError(IOException e, String key, byte[] value) {
// Log error details
System.err.println("Database error details: " + e.getClass().getSimpleName());
// Implement retry with exponential backoff
// Or write to backup storage
// Or queue for later processing
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-common-2-12