A local key/value store abstraction library for Apache Spark applications that provides automatic serialization, indexing, and key management features.
In-memory and persistent storage backends with different performance and durability characteristics. Choose the appropriate implementation based on your application's performance, persistence, and memory requirements.
Fast in-memory storage implementation that keeps all data deserialized in memory for maximum performance.
/**
* Implementation of KVStore that keeps data deserialized in memory. This store does not index
* data; instead, whenever iterating over an indexed field, the stored data is copied and sorted
* according to the index. This saves memory but makes iteration more expensive.
*/
public class InMemoryStore implements KVStore {
/**
* Creates a new in-memory store.
*/
public InMemoryStore();
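}
Characteristics:
- Objects are kept deserialized on the heap, so reads and writes involve no serialization or disk I/O.
- No indices are maintained: each iteration over an indexed field copies the stored objects and sorts them by that field.
- Nothing is persisted; all data is lost when the store is closed or the application exits.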
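The trade-off described in the InMemoryStore Javadoc (no indices kept, copy and sort on every indexed iteration) can be illustrated with a small, self-contained sketch. This is not Spark's implementation. `SortOnReadStore`, `iterateBy`, and the `Person` record are hypothetical names, and a key-extractor function stands in for an indexed field.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of an unindexed store: objects live in a map keyed by
// their natural key, and an "index" is only a key extractor applied at iteration time.
public class SortOnReadStore<T> {
    private final Map<Object, T> data = new HashMap<>();
    private final Function<T, Object> naturalKey;

    public SortOnReadStore(Function<T, Object> naturalKey) {
        this.naturalKey = naturalKey;
    }

    // Writes only touch the map; there is no index to update.
    public void write(T value) {
        data.put(naturalKey.apply(value), value);
    }

    // Copies all stored objects and sorts the copy by the index field.
    // Costs O(n log n) per call, but no memory is held for indices between calls.
    public <K extends Comparable<K>> List<T> iterateBy(Function<T, K> indexKey) {
        List<T> copy = new ArrayList<>(data.values());
        copy.sort(Comparator.comparing(indexKey));
        return copy;
    }

    public record Person(String id, String name, int age) {}

    public static void main(String[] args) {
        SortOnReadStore<Person> store = new SortOnReadStore<>(Person::id);
        store.write(new Person("p1", "Alice", 30));
        store.write(new Person("p2", "Bob", 25));
        store.write(new Person("p3", "Carol", 35));
        for (Person p : store.iterateBy(Person::age)) {
            System.out.println(p.name() + " is " + p.age());  // Bob, then Alice, then Carol
        }
    }
}
```

Every call to `iterateBy` pays for a full copy and sort, while writes stay cheap and no index memory is held between iterations. That is the "saves memory but makes iteration more expensive" exchange the Javadoc describes.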
Usage Examples:
// Create an in-memory store
KVStore store = new InMemoryStore();
// Fast writes - data kept in memory
Person person = new Person("p1", "Alice", 30);
store.write(person); // Very fast
// Fast reads - no disk I/O
Person retrieved = store.read(Person.class, "p1"); // Very fast
// No index is stored, so each iteration copies the Person objects and sorts them by "age"
for (Person p : store.view(Person.class).index("age")) {
System.out.println(p.name + " is " + p.age);
}
// Data is lost when store is closed or application exits
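store.close(); // All data lost
Best Use Cases:
- Data that only needs to live as long as the application process.
- Datasets small enough to fit comfortably in the JVM heap.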
Persistent storage implementation using LevelDB for durability and efficient indexed access.
/**
* Implementation of KVStore that uses LevelDB as the underlying data store.
*/
public class LevelDB implements KVStore {
/**
* Creates a LevelDB store at the specified path with the default serializer.
* @param path directory where the LevelDB files will be stored
* @throws Exception if store creation or initialization fails
*/
public LevelDB(File path) throws Exception;
/**
* Creates a LevelDB store at the specified path with a custom serializer.
* @param path directory where the LevelDB files will be stored
* @param serializer custom serializer used to persist objects
* @throws Exception if store creation or initialization fails
*/
public LevelDB(File path, KVStoreSerializer serializer) throws Exception;
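}
Characteristics:
- Objects are serialized (with the default or a custom KVStoreSerializer) and stored on disk under the given directory, so data survives application restarts.
- Indices are maintained on disk as objects are written, so indexed iteration reads entries in index order instead of sorting them.
- The store version is checked when an existing store is opened.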
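By contrast, a store that maintains its index at write time can answer a range query such as `first(25).last(35)` by seeking to the first matching value and scanning in order. The sketch below models that idea with a `TreeMap`. It is an in-memory analogy under that assumption, not LevelDB's on-disk key layout, and `IndexedStore`, `range`, and `Person` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical illustration of a write-time index: a TreeMap keeps index values
// sorted, so a range query seeks to the first value and scans in order.
public class IndexedStore<T, K extends Comparable<K>> {
    private final Map<String, T> data = new HashMap<>();
    // index value -> (natural key -> object); the inner TreeMap orders ties by natural key
    private final NavigableMap<K, NavigableMap<String, T>> index = new TreeMap<>();
    private final Function<T, String> naturalKey;
    private final Function<T, K> indexKey;

    public IndexedStore(Function<T, String> naturalKey, Function<T, K> indexKey) {
        this.naturalKey = naturalKey;
        this.indexKey = indexKey;
    }

    // Each write also updates the index, removing the entry for any previous version.
    public void write(T value) {
        String key = naturalKey.apply(value);
        T old = data.put(key, value);
        if (old != null) {
            K oldIndex = indexKey.apply(old);
            NavigableMap<String, T> bucket = index.get(oldIndex);
            bucket.remove(key);
            if (bucket.isEmpty()) {
                index.remove(oldIndex);
            }
        }
        index.computeIfAbsent(indexKey.apply(value), k -> new TreeMap<>()).put(key, value);
    }

    // Returns objects whose index value lies in [first, last], in index order; no sort needed.
    public List<T> range(K first, K last) {
        List<T> out = new ArrayList<>();
        for (NavigableMap<String, T> bucket : index.subMap(first, true, last, true).values()) {
            out.addAll(bucket.values());
        }
        return out;
    }

    public record Person(String id, String name, int age) {}

    public static void main(String[] args) {
        IndexedStore<Person, Integer> store = new IndexedStore<>(Person::id, Person::age);
        store.write(new Person("p1", "Alice", 30));
        store.write(new Person("p2", "Bob", 20));
        store.write(new Person("p3", "Carol", 35));
        for (Person p : store.range(25, 35)) {
            System.out.println(p.name() + " is " + p.age());  // Alice, then Carol
        }
    }
}
```

Here the cost moves to the write path: every write touches the index, and an update must delete the stale index entry. In return, range queries avoid a copy-and-sort.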
Usage Examples:
// Create a persistent LevelDB store
KVStore store = new LevelDB(new File("./my-data-store"));
// Or with a custom serializer (MyCustomSerializer is a placeholder for your own KVStoreSerializer implementation)
KVStoreSerializer customSerializer = new MyCustomSerializer();
KVStore storeWithCustomSerializer = new LevelDB(new File("./custom-store"), customSerializer);
// Writes are persisted to disk
Person person = new Person("p1", "Alice", 30);
store.write(person); // Data written to disk
// Data survives application restart
store.close();
// Reopen the same store later
KVStore reopenedStore = new LevelDB(new File("./my-data-store"));
Person retrieved = reopenedStore.read(Person.class, "p1"); // Data still there!
// Efficient indexed queries using disk-based indices
for (Person p : reopenedStore.view(Person.class).index("age").first(25).last(35)) {
System.out.println(p.name + " is " + p.age); // Uses persistent index
}
reopenedStore.close();
Best Use Cases:
- Data that must survive application restarts.
- Datasets too large to hold entirely in memory.
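A LevelDB store records its format version and checks it when an existing store is opened. If the version does not match, opening fails with UnsupportedStoreVersionException. The store also reserves internal keys for metadata and type aliases.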
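The open-time version check can be sketched as follows. The sketch assumes that a fresh store is stamped with the current version and that an existing store with a different stamp is rejected. A `HashMap` stands in for the LevelDB key space, and the hypothetical `IncompatibleVersionException` stands in for `UnsupportedStoreVersionException`. The 8-byte long encoding is also an assumption, not necessarily how the real store encodes its version.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an open-time version check. A HashMap stands in for the
// LevelDB key space, and IncompatibleVersionException stands in for
// UnsupportedStoreVersionException. Encoding the version as an 8-byte long is an assumption.
public class VersionCheck {
    static final long STORE_VERSION = 1L;
    static final String STORE_VERSION_KEY = "__version__";

    static class IncompatibleVersionException extends IOException {
        IncompatibleVersionException(long found) {
            super("store version " + found + " is not supported (expected " + STORE_VERSION + ")");
        }
    }

    static byte[] encode(long version) {
        return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
    }

    // Run when a store is opened: stamp a new store, accept a matching one, reject anything else.
    static void checkVersion(Map<String, byte[]> db) throws IncompatibleVersionException {
        byte[] stored = db.get(STORE_VERSION_KEY);
        if (stored == null) {
            db.put(STORE_VERSION_KEY, encode(STORE_VERSION));  // new store: record the current version
            return;
        }
        long found = ByteBuffer.wrap(stored).getLong();
        if (found != STORE_VERSION) {
            throw new IncompatibleVersionException(found);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, byte[]> db = new HashMap<>();
        checkVersion(db);                          // new store: version 1 is written
        checkVersion(db);                          // reopened: versions match
        db.put(STORE_VERSION_KEY, encode(2L));     // simulate a store written by another version
        try {
            checkVersion(db);
        } catch (IncompatibleVersionException e) {
            System.out.println("Cannot open: " + e.getMessage());
        }
    }
}
```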
/**
* Exception thrown when the store implementation is not compatible with the underlying data.
*/
public class UnsupportedStoreVersionException extends IOException {
}
/**
* Internal constants used by the LevelDB implementation for store management
* (UTF_8 refers to java.nio.charset.StandardCharsets.UTF_8)
*/
static final long STORE_VERSION = 1L;
static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
Usage Examples:
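// new LevelDB(...) throws Exception, so catch the version error first, then anything else
try (KVStore store = new LevelDB(new File("./legacy-store"))) {
// Store opened successfully; it is closed automatically at the end of this block
} catch (UnsupportedStoreVersionException e) {
// The store was created with an incompatible version
System.err.println("Store version incompatible - data migration required");
// Handle version incompatibility (backup, migrate, etc.)
} catch (Exception e) {
// Other failures, such as I/O errors
System.err.println("Could not open store: " + e.getMessage());
}
Write Performance: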
// Benchmark: writing 10,000 objects (a rough single run; results depend on hardware and JVM warm-up)
// InMemoryStore - expected to be fastest
KVStore memStore = new InMemoryStore();
long startTime = System.nanoTime(); // monotonic clock, suited to measuring elapsed time
for (int i = 0; i < 10000; i++) {
memStore.write(new Person("p" + i, "Name" + i, 20 + (i % 50)));
}
long memTime = (System.nanoTime() - startTime) / 1_000_000;
System.out.println("InMemoryStore: " + memTime + "ms");
// LevelDB - slower but persistent: each write serializes the object and updates its indices on disk
// (start from an empty directory so data from earlier runs does not skew the result)
KVStore levelStore = new LevelDB(new File("./benchmark-store"));
startTime = System.nanoTime();
for (int i = 0; i < 10000; i++) {
levelStore.write(new Person("p" + i, "Name" + i, 20 + (i % 50)));
}
long levelTime = (System.nanoTime() - startTime) / 1_000_000;
System.out.println("LevelDB: " + levelTime + "ms");
Query Performance:
// Indexed range queries (ages 30 to 40)
// InMemoryStore - copies and sorts the objects each time an iteration starts
for (Person p : memStore.view(Person.class).index("age").first(30).last(40)) {
// Objects are visited in sorted order after the copy-and-sort step
}
// LevelDB - uses the on-disk indices built at write time
for (Person p : levelStore.view(Person.class).index("age").first(30).last(40)) {
// Entries are read in index order; no sorting is needed
}
InMemoryStore Memory Usage:
KVStore memStore = new InMemoryStore();
// All objects kept in memory (LargeObject and generateLargeData() are placeholders for your own code)
for (int i = 0; i < 100000; i++) {
memStore.write(new LargeObject("obj" + i, generateLargeData()));
// Heap usage grows with each write
}
// Heap usage ≈ number of objects × average object size, plus the temporary copy made for each indexed iteration
// Monitor with: Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()
LevelDB Memory Usage:
KVStore levelStore = new LevelDB(new File("./large-store"));
// Objects stored on disk, minimal memory usage
for (int i = 0; i < 100000; i++) {
levelStore.write(new LargeObject("obj" + i, generateLargeData()));
// Memory usage remains relatively constant
}
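// Memory usage = cache size + working set (typically much smaller than the data size)
Choose InMemoryStore when:
- The data does not need to survive an application restart.
- The whole dataset fits comfortably in memory.
- Fast reads and writes matter more than durability.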
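Choose LevelDB when:
- The data must persist across application restarts.
- The dataset is too large to hold in memory.
- Indexed range queries are frequent enough that sorting on every iteration would be costly.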
Hybrid Approach:
// Use both stores for different purposes
KVStore cache = new InMemoryStore(); // For frequently accessed data
KVStore persistent = new LevelDB(new File("./data")); // For permanent storage
// Cache frequently accessed users in memory
User frequentUser = persistent.read(User.class, "frequent_user_123");
cache.write(frequentUser); // Cache for fast access
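// Read from cache first, fall back to persistent store
String userId = "some_user_456"; // hypothetical key to look up
User user;
try {
user = cache.read(User.class, userId); // Try cache first
} catch (NoSuchElementException e) { // java.util.NoSuchElementException
user = persistent.read(User.class, userId); // Fall back to persistent; throws again if the key is missing there too
cache.write(user); // Cache for next time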
}
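The Hybrid Approach can be condensed into a self-contained read-through/write-through sketch. `MapStore` is a hypothetical stand-in for both `KVStore` instances. Its `read` throws `NoSuchElementException` on a miss, which is the behavior the fallback code in the Hybrid Approach relies on. Writing through to both stores is a design choice added here to keep the cache from serving values the persistent store does not hold. The original example only populates the cache on reads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch of the cache-then-persistent pattern. MapStore stands in for
// both KVStore instances; its read throws NoSuchElementException when the key is absent.
public class ReadThroughCache {
    static class MapStore {
        private final Map<String, String> data = new HashMap<>();
        int reads = 0;  // number of read calls, to show when the fallback is used

        String read(String key) {
            reads++;
            String value = data.get(key);
            if (value == null) {
                throw new NoSuchElementException(key);
            }
            return value;
        }

        void write(String key, String value) {
            data.put(key, value);
        }
    }

    final MapStore cache = new MapStore();
    final MapStore persistent = new MapStore();

    // Write-through: update the persistent store first, then the cache,
    // so the cache never holds a value the persistent store lacks.
    void write(String key, String value) {
        persistent.write(key, value);
        cache.write(key, value);
    }

    // Read-through: try the cache; on a miss, read the persistent store and cache the result.
    // A key missing from both stores still surfaces as NoSuchElementException.
    String read(String key) {
        try {
            return cache.read(key);
        } catch (NoSuchElementException miss) {
            String value = persistent.read(key);
            cache.write(key, value);
            return value;
        }
    }

    public static void main(String[] args) {
        ReadThroughCache store = new ReadThroughCache();
        store.persistent.write("user_1", "Alice");     // present only in the persistent store
        System.out.println(store.read("user_1"));      // cache miss: falls back, then caches
        System.out.println(store.read("user_1"));      // cache hit
        System.out.println("persistent reads: " + store.persistent.reads);  // prints "persistent reads: 1"
    }
}
```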