A key-value store abstraction for storing application data locally with automatic serialization, indexing, and support for multiple storage backends.
Apache Spark KVStore provides multiple storage backend implementations to suit different deployment scenarios, from development and testing to high-performance production environments. Each backend implements the same KVStore interface while offering different performance characteristics and resource requirements.
In-memory implementation that keeps all data deserialized in memory. Ideal for development, testing, and scenarios where data fits comfortably in memory.
/**
 * KVStore that holds every stored object in memory, in deserialized form.
 * Nothing is written to disk, so the contents do not outlive the store instance.
 */
public class InMemoryStore implements KVStore {
  /** Builds a new, empty in-memory store. */
  public InMemoryStore();

  // Every other operation comes from the KVStore interface.
}
Characteristics:
Production-ready persistent storage using Google's LevelDB. Provides excellent read performance and efficient storage with compression.
/**
 * KVStore backed by LevelDB.
 * Data is kept on disk, compressed, with efficient index support.
 */
public class LevelDB implements KVStore {
  /** Version number of the on-disk layout, used for compatibility checks. */
  public static final long STORE_VERSION = 1L;

  /** Key under which the store version is recorded. */
  public static final byte[] STORE_VERSION_KEY;

  /**
   * Creates a store backed by LevelDB files, using the supplied serializer.
   *
   * @param path directory that will hold the LevelDB files
   * @param serializer converts application objects to and from their stored form
   * @throws Exception if the store cannot be created
   */
  public LevelDB(File path, KVStoreSerializer serializer) throws Exception;

  /**
   * Creates a store backed by LevelDB files, using the default serializer.
   *
   * @param path directory that will hold the LevelDB files
   * @throws Exception if the store cannot be created
   */
  public LevelDB(File path) throws Exception;

  // Every other operation comes from the KVStore interface.
}
Characteristics:
High-performance persistent storage using Facebook's RocksDB. Optimized for SSD storage and high-throughput scenarios.
/**
 * KVStore backed by RocksDB.
 * Persistent, high-throughput storage tuned for modern hardware.
 */
public class RocksDB implements KVStore {
  /** Version number of the on-disk layout, used for compatibility checks. */
  public static final long STORE_VERSION = 1L;

  /** Key under which the store version is recorded. */
  public static final byte[] STORE_VERSION_KEY;

  /**
   * Creates a store backed by RocksDB files, using the supplied serializer.
   *
   * @param path directory that will hold the RocksDB files
   * @param serializer converts application objects to and from their stored form
   * @throws Exception if the store cannot be created
   */
  public RocksDB(File path, KVStoreSerializer serializer) throws Exception;

  /**
   * Creates a store backed by RocksDB files, using the default serializer.
   *
   * @param path directory that will hold the RocksDB files
   * @throws Exception if the store cannot be created
   */
  public RocksDB(File path) throws Exception;

  // Every other operation comes from the KVStore interface.
}
Characteristics:
Usage Examples:
import org.apache.spark.util.kvstore.*;
import java.io.File;
// In-memory store for testing
KVStore memoryStore = new InMemoryStore();
// LevelDB for moderate production use
KVStore levelStore = new LevelDB(new File("/data/app/leveldb"));
// RocksDB for high-performance scenarios
KVStore rocksStore = new RocksDB(new File("/data/app/rocksdb"));

// All stores implement the same interface.
// KVStore's write/read/count all declare `throws Exception`, so this method
// must propagate it (or handle it) to compile.
public void processData(KVStore store) throws Exception {
  // Same code works with any backend
  store.write(new MyData("key1", "value1"));
  MyData data = store.read(MyData.class, "key1");
  long count = store.count(MyData.class);
}
// Create custom serializer by extending KVStoreSerializer
/**
 * Serializer whose inherited Jackson mapper also understands java.time types.
 */
public class CustomKVStoreSerializer extends KVStoreSerializer {
  public CustomKVStoreSerializer() {
    // The implicit super() call has already run, so the inherited ObjectMapper
    // can be customized right away.
    mapper.registerModule(new com.fasterxml.jackson.datatype.jsr310.JavaTimeModule());
  }
}
// Use with any persistent backend
KVStore store = new LevelDB(new File("/data/store"), new CustomKVStoreSerializer());
// or, equivalently:
// KVStore store = new RocksDB(new File("/data/store"), new CustomKVStoreSerializer());

// Store objects with Java 8 time types
public class Event {
  @KVIndex
  public String id;

  // Serialized through the JavaTimeModule registered by CustomKVStoreSerializer.
  // Deliberately not annotated with @KVIndex: the disk-backed stores only accept
  // String, integral, Boolean and array values as index keys, not Instant.
  public java.time.Instant timestamp;

  /** No-arg constructor, needed by Jackson to rebuild Event objects on read. */
  public Event() {}

  public Event(String id, java.time.Instant timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }

  /**
   * Key for the "timestamp" index: epoch milliseconds, which every backend can index.
   * The name has no "get" prefix, so Jackson does not serialize it as an extra property.
   */
  @KVIndex("timestamp")
  public long timestampMillis() {
    return timestamp.toEpochMilli();
  }
}
store.write(new Event("event1", java.time.Instant.now()));

// LevelDB tuning: the LevelDB constructors accept only a path and an optional
// serializer. Setting a JVM system property such as "leveldb.writebuffer.size"
// does not configure this store, so no such knob is shown here.
LevelDB levelStore = new LevelDB(new File("/data/leveldb"));

// RocksDB automatically uses optimized settings for:
// - Bloom filters for faster key lookups
// - Multi-level compression (LZ4 + ZSTD)
// - Index compression disabled for faster access
// - Optimized block restart intervals
KVStore rocksStore = new RocksDB(new File("/data/rocksdb"));

// Bulk writes: KVStore.write() stores one object per call and does not batch.
// The LevelDB and RocksDB classes add writeAll(List<?>), which commits the
// objects in write batches. writeAll rejects an empty list, hence the guard.
List<MyData> largeBatch = generateLargeDataset();
if (!largeBatch.isEmpty()) {
  levelStore.writeAll(largeBatch);
}
/**
 * Choose storage backend based on requirements:
 */
// Development and Testing
if (environment.equals("test") || dataSize < 100_000) {
return new InMemoryStore();
}
// Production with moderate data volumes (< 10GB)
if (dataSize < 10_000_000 && !requiresHighThroughput) {
return new LevelDB(dataPath);
}
// High-performance production (large data, high throughput)
if (hasSSDs && requiresHighThroughput) {
return new RocksDB(dataPath);
}
// Default safe choice for production
return new LevelDB(dataPath);// All stores should be properly closed
// Preferred: try-with-resources closes the store when the block exits,
// whether normally or by exception.
try (KVStore store = new LevelDB(new File("/data/store"))) {
  store.write(data);
  MyData result = store.read(MyData.class, key);
} catch (Exception ex) {
  // Also reached if opening the store itself fails.
  logger.error("Store operation failed", ex);
}

// Alternative: explicit try/finally when the store's lifetime spans more code.
KVStore store = new RocksDB(new File("/data/store"));
try {
  processData(store);
} finally {
  store.close(); // runs on every path, releasing the store's resources
}
/**
 * Utility to migrate data between different storage backends.
 */
public class StoreMigration {
public static void migrate(KVStore source, KVStore target, Class<?>... types)
throws Exception {
for (Class<?> type : types) {
// Copy metadata
Object metadata = source.getMetadata(type);
if (metadata != null) {
target.setMetadata(metadata);
}
// Copy all instances of each type
KVStoreView<?> view = source.view(type);
for (Object item : view) {
target.write(item);
}
}
}
}
// Example: Upgrade from LevelDB to RocksDB
KVStore oldStore = new LevelDB(new File("/data/old"));
KVStore newStore = new RocksDB(new File("/data/new"));
try {
StoreMigration.migrate(oldStore, newStore,
Employee.class, Task.class, Project.class);
} finally {
oldStore.close();
newStore.close();
}// Store statistics and health checks
/**
 * Prints per-type item counts and flags a store that answers slowly.
 */
public final class StoreMonitoring {
  /** Total count latency, in milliseconds, above which the store is reported as slow. */
  private static final long SLOW_RESPONSE_MILLIS = 1000;

  private StoreMonitoring() {}

  /**
   * Prints how many items of each type {@code store} holds, and how long those reads took.
   *
   * <p>The reported response time is the summed duration of the {@code count} calls. It is
   * measured with {@link System#nanoTime()}, which, unlike {@code currentTimeMillis()}, is
   * not affected by wall-clock adjustments. {@code getMetadata(String.class)} is not used as
   * a probe: it fails on stores whose metadata object is not a String.
   *
   * @param store store to inspect
   * @param types classes whose item counts are reported
   * @throws Exception if any {@code count} call fails
   */
  public static void checkStoreHealth(KVStore store, Class<?>... types)
      throws Exception {
    System.out.println("Store Health Check:");
    long countNanos = 0;
    for (Class<?> type : types) {
      long start = System.nanoTime();
      long count = store.count(type);
      countNanos += System.nanoTime() - start;
      System.out.println(type.getSimpleName() + ": " + count + " items");
    }
    long responseTime = countNanos / 1_000_000;
    System.out.println("Response time: " + responseTime + "ms");
    if (responseTime > SLOW_RESPONSE_MILLIS) {
      System.out.println("WARNING: Store is responding slowly");
    }
  }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-kvstore-2-13