Local key/value store abstraction for Apache Spark with thread-safe operations, automatic serialization, and indexing capabilities
—
The KVStore indexing system uses annotations to automatically create indices on object fields and methods, enabling efficient sorting, filtering, and hierarchical queries without loading all data into memory.
import java.io.Closeable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.spark.util.kvstore.KVIndex;
import org.apache.spark.util.kvstore.KVStoreIterator;
import org.apache.spark.util.kvstore.KVStoreView;

The @KVIndex annotation marks fields or methods for automatic indexing when objects are stored.
/**
 * Marks a field or method whose value should be indexed when an instance of the enclosing type
 * is written to the store. Every stored type needs one natural index ({@link #NATURAL_INDEX_NAME}).
 *
 * <p>Retained at runtime so the annotations can be read when objects are stored.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
public @interface KVIndex {

  /** Name of the natural index; the default for {@link #value()}. */
  String NATURAL_INDEX_NAME = "__main__";

  /**
   * Name of the index created for the annotated element. Names must not start with an
   * underscore; {@link #NATURAL_INDEX_NAME} is the only exception.
   *
   * @return the index name, {@value #NATURAL_INDEX_NAME} by default
   */
  String value() default NATURAL_INDEX_NAME;

  /**
   * Name of this index's parent index, or {@code ""} (the default) for none. Iterating a child
   * index is narrowed to a single parent value via {@code KVStoreView.parent(...)}.
   *
   * @return the parent index name, or {@code ""}
   */
  String parent() default "";

  /**
   * Whether to copy the instance's data into the index instead of storing only a reference.
   * Copies make reads through the index faster at the cost of extra space.
   *
   * @return {@code true} for a copy index; {@code false} (the default) for a reference index
   */
  boolean copy() default false;
}

Usage Example:
/**
 * Example stored type. Each {@code @KVIndex} creates an index that views can sort or filter by;
 * {@code email} has no annotation and is therefore not indexed.
 */
public class Employee {
  @KVIndex("__main__") // Natural index - required for all types
  private String employeeId;

  @KVIndex("name") // Secondary index for sorting by name
  private String fullName;

  @KVIndex("department") // Index for filtering by department
  private String department;

  @KVIndex(value = "location", parent = "department") // Hierarchical index
  private String officeLocation;

  @KVIndex(value = "salary", copy = true) // Copy index for faster reads
  private int salary;

  private String email; // Not indexed

  /**
   * Returns the employee's full name, the value behind the "name" index.
   *
   * @return the full name
   */
  public String getName() {
    return fullName;
  }

  /**
   * Returns the salary, the value behind the "salary" index.
   *
   * @return the salary
   */
  public int getSalary() {
    return salary;
  }

  // Getter methods can also be indexed
  @KVIndex("displayName")
  public String getDisplayName() {
    return fullName + " (" + employeeId + ")";
  }
}

Annotation Parameters:
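- value: Index name (default "__main__", the natural index)
- parent: Parent index name for hierarchical relationships (default "", meaning no parent)
- copy: Whether to copy the object's data into the index for faster reads (default false)

Index Requirements:
- Every stored type must have a natural index: a field or method annotated with @KVIndex("__main__")
- Index names must not start with an underscore; "__main__" is the only exception

KVStoreView provides a fluent interface for configuring data iteration with filtering, sorting, and pagination.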
/**
 * Fluent description of how to iterate over one stored type: which index to walk, in which
 * direction, between which index values, under which parent value, and how many matching
 * entries to skip and return.
 *
 * <p>Each configuration method updates this view and returns {@code this}, so calls can be
 * chained. Concrete stores supply {@link #iterator()} and {@link #closeableIterator()}.
 *
 * @param <T> the stored type being iterated
 */
public abstract class KVStoreView<T> implements Iterable<T> {

  // Configuration is package-private so store implementations in the same package can read it.

  /** {@code true} to walk the index in ascending order. */
  boolean ascending = true;

  /** Index to iterate; defaults to the natural index. */
  String index = "__main__";

  /** Index value to start iterating at (inclusive), or {@code null} for no start bound. */
  Object first = null;

  /** Index value to stop iterating at (inclusive), or {@code null} for no stop bound. */
  Object last = null;

  /** Parent-index value that child-index iteration is restricted to, or {@code null}. */
  Object parent = null;

  /** Number of matching entries to skip before returning any. */
  long skip = 0L;

  /** Maximum number of entries to return. */
  long max = Long.MAX_VALUE;

  /**
   * Selects the index to iterate over, which also determines the iteration order.
   *
   * @param name an index name declared with {@code @KVIndex}
   * @return this view
   * @throws NullPointerException if {@code name} is null
   */
  public KVStoreView<T> index(String name) {
    this.index = Objects.requireNonNull(name, "name");
    return this;
  }

  /**
   * Sets the parent-index value used when iterating a child index; only entries under that
   * parent value are returned.
   *
   * @param value the parent index value
   * @return this view
   */
  public KVStoreView<T> parent(Object value) {
    this.parent = value;
    return this;
  }

  /**
   * Starts iteration at the given value of the selected index (inclusive).
   *
   * @param value the index value to start at
   * @return this view
   */
  public KVStoreView<T> first(Object value) {
    this.first = value;
    return this;
  }

  /**
   * Stops iteration at the given value of the selected index (inclusive).
   *
   * @param value the index value to stop at
   * @return this view
   */
  public KVStoreView<T> last(Object value) {
    this.last = value;
    return this;
  }

  /**
   * Limits how many entries are returned.
   *
   * @param max the maximum number of entries; must be positive
   * @return this view
   * @throws IllegalArgumentException if {@code max} is not positive
   */
  public KVStoreView<T> max(long max) {
    if (max <= 0L) {
      throw new IllegalArgumentException("max must be positive.");
    }
    this.max = max;
    return this;
  }

  /**
   * Skips the first {@code n} matching entries.
   *
   * @param n the number of entries to skip
   * @return this view
   */
  public KVStoreView<T> skip(long n) {
    this.skip = n;
    return this;
  }

  /**
   * Flips the iteration direction. Because this toggles, calling it twice restores the
   * original order.
   *
   * @return this view
   */
  public KVStoreView<T> reverse() {
    ascending = !ascending;
    return this;
  }

  /**
   * Returns an iterator over the configured entries that must be closed when done; use it in
   * a try-with-resources statement.
   *
   * @return a closeable iterator
   * @throws Exception if the underlying store cannot be read
   */
  public abstract KVStoreIterator<T> closeableIterator() throws Exception;

  /**
   * Returns a plain iterator over the configured entries, as used by for-each loops.
   *
   * @return an iterator
   */
  @Override
  public abstract Iterator<T> iterator();
}

Basic Querying:
// Get all employees
for (Employee emp : store.view(Employee.class)) {
System.out.println(emp.getName());
}
// Get employees sorted by name
for (Employee emp : store.view(Employee.class).index("name")) {
System.out.println(emp.getName()); // Alphabetical order
}
// Get employees in reverse salary order
for (Employee emp : store.view(Employee.class).index("salary").reverse()) {
System.out.println(emp.getName() + ": $" + emp.getSalary());
}Range Queries:
// Employees with names from "Johnson" to "Smith"
for (Employee emp : store.view(Employee.class)
.index("name")
.first("Johnson")
.last("Smith")) {
System.out.println(emp.getName());
}
// Employees with salaries between 50000 and 100000
for (Employee emp : store.view(Employee.class)
.index("salary")
.first(50000)
.last(100000)) {
System.out.println(emp.getName() + ": $" + emp.getSalary());
}Pagination:
// Second page (employees 21-40) of a name-sorted listing: skip one page, then keep one page
long pageSize = 20L;
KVStoreView<Employee> secondPage = store.view(Employee.class)
    .index("name")
    .skip(pageSize)
    .max(pageSize);
for (Employee e : secondPage) {
  System.out.println(e.getName());
}

Parent-child index relationships enable efficient filtering of related data.
Setup:
/**
 * Example type with a "project" parent index and two child indices ("assignee" and "status").
 * Views over a child index are narrowed to one project with {@code parent(...)}.
 */
public class Task {
  @KVIndex("__main__")
  private String taskId;

  @KVIndex("project") // Parent index
  private String projectId;

  @KVIndex(value = "assignee", parent = "project") // Child index
  private String assignedTo;

  @KVIndex(value = "status", parent = "project") // Another child index
  private String status;

  /**
   * Returns the task's natural key, which the querying examples print.
   *
   * @return the task id
   */
  public String getTaskId() {
    return taskId;
  }
}

Hierarchical Querying:
// Get all tasks in project "PROJ-123"
for (Task task : store.view(Task.class)
.index("project")
.first("PROJ-123")
.last("PROJ-123")) {
System.out.println(task.getTaskId());
}
// Get tasks assigned to "alice" in project "PROJ-123"
for (Task task : store.view(Task.class)
.index("assignee")
.parent("PROJ-123") // Filter by parent project
.first("alice")
.last("alice")) {
System.out.println(task.getTaskId());
}
// Get completed tasks in project "PROJ-123"
for (Task task : store.view(Task.class)
.index("status")
.parent("PROJ-123")
.first("completed")
.last("completed")) {
System.out.println(task.getTaskId());
}KVStoreIterator provides additional capabilities beyond standard Java iteration.
/**
 * Iterator over a store view that adds batch reads and skipping. It is {@link Closeable}, so
 * use it in a try-with-resources statement.
 *
 * @param <T> the stored type being iterated
 */
public interface KVStoreIterator<T> extends Iterator<T>, Closeable {

  @Override
  boolean hasNext();

  @Override
  T next();

  /**
   * Reads up to {@code max} further elements in one call; fewer are returned when the
   * iteration runs out first.
   *
   * @param max maximum number of elements to return
   * @return the next elements, in iteration order
   */
  List<T> next(int max);

  /**
   * Advances past up to {@code n} elements without returning them.
   *
   * @param n number of elements to skip
   * @return whether elements remain after skipping
   */
  boolean skip(long n);

  /**
   * Releases the iterator's resources. Narrowed from {@link Closeable#close()} to throw no
   * checked exception, so try-with-resources needs no {@code catch (IOException)}.
   */
  @Override
  void close();
}

Batch Processing:
// Reads one department in fixed-size batches; try-with-resources closes the iterator when the block exits.
try (KVStoreIterator<Employee> iter = store.view(Employee.class)
.index("department")
.first("Engineering")
.last("Engineering")
.closeableIterator()) {
// Process in batches of 100; next(100) returns at most 100 elements, so the last batch may be smaller
while (iter.hasNext()) {
List<Employee> batch = iter.next(100);
processBatch(batch); // processBatch stands for application code, assumed to be defined by the caller
}
}

Dynamic Skipping:
// Walks the salary index from highest to lowest, reading and skipping ranks explicitly.
try (KVStoreIterator<Employee> iter = store.view(Employee.class)
.index("salary")
.reverse()
.closeableIterator()) {
// Get top 10 earners (ranks 1-10)
List<Employee> top10 = iter.next(10);
// Skip next 40 employees (ranks 11-50); skip's boolean result is ignored here
iter.skip(40);
// Get employees ranked 51-60
List<Employee> ranked51to60 = iter.next(10);
}

Reference Indices (default):
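A reference index stores only a reference to each object. That saves space, but reads through it are slower because of the extra level of indirection.

Copy Indices:
A copy index stores a copy of each object's data in the index itself. Reads avoid the indirection, at the cost of extra space.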
// Example contrasting a reference index ("category") with a copy index ("price").
public class Product {
@KVIndex("__main__")
private String productId;
@KVIndex("category") // Reference index - saves space (stores a reference, not the object's data)
private String category;
@KVIndex(value = "price", copy = true) // Copy index - faster reads (copies the object's data into the index)
private double price;
// Not indexed. Because "price" is a copy index, which copies the object's data rather than just
// the price, this large field is duplicated in the price index as well.
private String description;
}

Write Performance:
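Every index adds work to each write: besides storing the object, the store must also update each index defined for the type. A copy index also writes another copy of the object's data, so index only the fields you actually query by.

Storage Overhead:
A reference index adds only a reference per object. Each copy index adds a full copy of every object's data.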
KVStore supports arrays as index keys through automatic wrapper conversion:
/**
 * Example type indexed by array-valued fields; arrays are made comparable so they can serve as
 * index keys.
 */
public class Document {
  @KVIndex("__main__")
  private String documentId;

  @KVIndex("tags") // Array index
  private String[] tags;

  @KVIndex("coordinates") // Numeric array index
  private int[] coordinates;

  /** Creates an empty document; kept because declaring any constructor removes the implicit one. */
  public Document() {
  }

  /**
   * Creates a document with its natural key and array-valued index fields.
   *
   * @param documentId natural key
   * @param tags values for the "tags" index; may be null
   * @param coordinates values for the "coordinates" index; may be null
   */
  public Document(String documentId, String[] tags, int[] coordinates) {
    this.documentId = documentId;
    // Copy the arrays so later changes by the caller cannot silently alter indexed values.
    this.tags = tags == null ? null : tags.clone();
    this.coordinates = coordinates == null ? null : coordinates.clone();
  }
}
// Write a document whose index values are arrays, then iterate over the "tags" array index.
// NOTE(review): requires Document to declare a (String, String[], int[]) constructor.
Document doc = new Document("doc1", new String[]{"java", "database"}, new int[]{10, 20});
store.write(doc);
// Arrays are automatically made comparable for indexing
for (Document d : store.view(Document.class).index("tags")) {
// Results sorted by array content
}

Common Index Errors:
// Missing natural index
public class BadClass {
  // ERROR: No @KVIndex("__main__") field or method, so the type has no natural index
  private String id;
}

// Invalid index name
public class AnotherBadClass {
  @KVIndex("__main__") // Natural index present, so this example shows only the naming error
  private String id;

  @KVIndex("_invalid") // ERROR: Cannot start with underscore (only "__main__" may)
  private String field;
}

// Circular parent relationship
public class CircularClass {
  @KVIndex("__main__") // Natural index present, so this example shows only the parent error
  private String id;

  @KVIndex(value = "parent", parent = "child") // ERROR: Creates cycle
  private String parentField;

  @KVIndex(value = "child", parent = "parent")
  private String childField;
}

Exception Handling:
try {
  KVStoreView<Employee> byMissingIndex = store.view(Employee.class).index("nonexistent");
  for (Employee emp : byMissingIndex) {
    // Process employees
  }
} catch (Exception failure) {
  // An invalid index name, or any other query error, ends up here
  System.err.println("Query failed: " + failure.getMessage());
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-kvstore_2-13