The KVStore interface provides the fundamental operations for persisting, retrieving, and managing data in the key-value store. All operations are thread-safe and support automatic serialization and indexing.
KVStore is the main abstraction for local key/value store operations, with automatic serialization and indexing support.
/**
 * Abstraction for a local key/value store for storing app data.
 * Thread-safe for both reads and writes.
 */
public interface KVStore extends Closeable {

  /**
   * Returns app-specific metadata from the store.
   *
   * @param <T> the metadata type
   * @param klass the class type of the metadata to retrieve
   * @return the metadata object, or {@code null} if none has been set
   * @throws Exception if retrieval fails
   */
  <T> T getMetadata(Class<T> klass) throws Exception;

  /**
   * Writes the given value in the store metadata key.
   *
   * @param value the metadata object to store
   * @throws Exception if the write fails
   */
  void setMetadata(Object value) throws Exception;

  /**
   * Reads a specific instance of an object by its natural key.
   *
   * @param <T> the type of the object to read
   * @param klass the class type to read
   * @param naturalKey the object's natural key (must not be null)
   * @return the retrieved object
   * @throws java.util.NoSuchElementException if the key does not exist
   * @throws Exception if the read fails
   */
  <T> T read(Class<T> klass, Object naturalKey) throws Exception;

  /**
   * Writes the given object to the store, including indexed fields.
   *
   * @param value the object to write (must not be null)
   * @throws Exception if the write fails
   */
  void write(Object value) throws Exception;

  /**
   * Removes an object and all related data from the store.
   *
   * @param type the object's type
   * @param naturalKey the object's natural key (must not be null)
   * @throws java.util.NoSuchElementException if the key does not exist
   * @throws Exception if the deletion fails
   */
  void delete(Class<?> type, Object naturalKey) throws Exception;

  /**
   * Returns a configurable view for iterating over entities.
   *
   * @param <T> the entity type
   * @param type the class type to iterate over
   * @return a configurable view instance
   * @throws Exception if view creation fails
   */
  <T> KVStoreView<T> view(Class<T> type) throws Exception;

  /**
   * Returns the number of items of the given type in the store.
   *
   * @param type the class type to count
   * @return the count of items
   * @throws Exception if the count fails
   */
  long count(Class<?> type) throws Exception;

  /**
   * Returns the number of items matching the given indexed value.
   *
   * @param type the class type to count
   * @param index the index name to query
   * @param indexedValue the value to match in the index
   * @return the count of matching items
   * @throws Exception if the count fails
   */
  long count(Class<?> type, String index, Object indexedValue) throws Exception;

  /**
   * Efficiently removes multiple items by index values.
   *
   * @param <T> the item type
   * @param klass the class type of items to remove
   * @param index the index name to use for matching
   * @param indexValues collection of index values to remove
   * @return {@code true} if any items were removed
   * @throws Exception if the removal fails
   */
  <T> boolean removeAllByIndexValues(Class<T> klass, String index, Collection<?> indexValues)
      throws Exception;

  /**
   * Closes the store and releases its resources.
   *
   * <p>Declared with {@link java.io.IOException} rather than {@code Exception}: this method
   * overrides {@link Closeable#close()}, and an override may not declare a broader checked
   * exception than the method it overrides.
   *
   * @throws java.io.IOException if close fails
   */
  @Override
  void close() throws java.io.IOException;
}
Usage Examples:
import org.apache.spark.util.kvstore.*;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;

// Create store instance
KVStore store = new LevelDB(new File("/path/to/store"));

// Basic CRUD operations
public class Task {
  // Un-named @KVIndex: registered under KVIndex.NATURAL_INDEX_NAME, i.e. the natural key
  // that read() and delete() look objects up by.
  @KVIndex
  public String id;

  // Secondary index named "status", usable with count(type, index, value).
  @KVIndex("status")
  public String status;

  public Task(String id, String status) {
    this.id = id;
    this.status = status;
  }
}

// Write data
Task task = new Task("task-1", "running");
store.write(task);

// Read data (looked up by natural key)
Task retrieved = store.read(Task.class, "task-1");

// Count items
long totalTasks = store.count(Task.class);
long runningTasks = store.count(Task.class, "status", "running");

// Batch operations
// The index argument is an index NAME, not a field name. Task declares no index called
// "id"; its id field is the natural index, so it is addressed via NATURAL_INDEX_NAME.
Collection<String> completedIds = Arrays.asList("task-2", "task-3");
store.removeAllByIndexValues(Task.class, KVIndex.NATURAL_INDEX_NAME, completedIds);

// Metadata operations
// AppConfig stands in for any application-defined metadata class (not defined in this example).
AppConfig config = new AppConfig("v1.0", true);
store.setMetadata(config);
AppConfig retrievedConfig = store.getMetadata(AppConfig.class);

// Always close when done
store.close();
The KVStore automatically handles serialization using Jackson with GZIP compression for non-string data.
/**
 * Serializer for translating between app-defined types and disk storage.
 * Based on Jackson ObjectMapper with automatic GZIP compression for non-strings.
 *
 * NOTE: this is a signature listing; method bodies are intentionally omitted.
 */
public class KVStoreSerializer {
// Jackson mapper used for the JSON encoding; protected, so subclasses can access it.
protected final ObjectMapper mapper;
// No-argument constructor.
public KVStoreSerializer();
/**
 * Serializes an object to bytes. Strings are encoded as UTF-8;
 * every other object is written as Jackson JSON and GZIP-compressed.
 * @param o The object to serialize
 * @return Serialized byte array
 * @throws Exception if serialization fails
 */
public byte[] serialize(Object o) throws Exception;
/**
 * Deserializes bytes to an object. Handles UTF-8 strings and
 * GZIP-compressed JSON for other objects (the inverse of serialize(Object)).
 * @param data The byte array to deserialize
 * @param klass The target class type
 * @return The deserialized object
 * @throws Exception if deserialization fails
 */
public <T> T deserialize(byte[] data, Class<T> klass) throws Exception;
/**
 * Serializes a long value to UTF-8 string bytes.
 * Declared without an access modifier (package-private) and final.
 * @param value The long value to serialize
 * @return Serialized byte array
 */
final byte[] serialize(long value);
/**
 * Deserializes UTF-8 string bytes to a long value (the inverse of serialize(long)).
 * Declared without an access modifier (package-private) and final.
 * @param data The byte array to deserialize
 * @return The long value
 */
final long deserializeLong(byte[] data);
}
/**
 * Exception thrown when store version is incompatible.
 * It extends IOException, so it is a checked exception and is also caught by
 * handlers for IOException.
 */
public class UnsupportedStoreVersionException extends IOException {
// Standard IOException constructors
}
Error Handling Examples:
// read() reports a missing natural key with NoSuchElementException.
// NOTE(review): read() is declared `throws Exception`, so this snippet presumably sits in a
// method that declares `throws Exception` (or needs a broader catch) — confirm.
try {
Task task = store.read(Task.class, "nonexistent");
} catch (NoSuchElementException e) {
System.out.println("Task not found: " + e.getMessage());
}
// UnsupportedStoreVersionException signals that the store version is incompatible.
// NOTE(review): the store opened here is never closed within this snippet, and it is unclear
// that an invalid path (as opposed to an existing store written by another version) would
// raise this particular exception — confirm against the LevelDB constructor.
try {
KVStore store = new LevelDB(new File("/invalid/path"));
} catch (UnsupportedStoreVersionException e) {
System.out.println("Store version incompatible: " + e.getMessage());
}