tessl install tessl/maven-org-apache-spark--spark-kvstore_2-13@3.5.0

Local key/value store abstraction for Apache Spark with thread-safe operations, automatic serialization, and indexing capabilities.

The KVStore indexing system uses annotations to automatically create indices on object fields and methods, enabling efficient sorting, filtering, and hierarchical queries without loading all data into memory.
import org.apache.spark.util.kvstore.KVIndex;
import org.apache.spark.util.kvstore.KVStoreView;
import org.apache.spark.util.kvstore.KVStoreIterator;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Iterator;
import java.util.List;

The @KVIndex annotation marks fields or methods for automatic indexing when objects are stored.
/**
 * Marks a field or method whose value is indexed automatically when an object is stored.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
public @interface KVIndex {
  /** Name of the index; the default {@code "__main__"} denotes the natural index. */
  String value() default "__main__";

  /** Name of the parent index for hierarchical relationships; defaults to the empty string. */
  String parent() default "";

  /** Whether to copy data to the index for faster reads; defaults to {@code false}. */
  boolean copy() default false;
}

Usage Example:
/**
 * Example entity showing the different ways {@code @KVIndex} can be applied:
 * a natural index, secondary indices, a child index, a copy index, and an indexed getter.
 */
public class Employee {
  @KVIndex("__main__") // Natural index - required for all types
  private String employeeId;

  @KVIndex("name") // Secondary index for sorting by name
  private String fullName;

  @KVIndex("department") // Index for filtering by department
  private String department;

  @KVIndex(value = "location", parent = "department") // Hierarchical index
  private String officeLocation;

  @KVIndex(value = "salary", copy = true) // Copy index for faster reads
  private int salary;

  private String email; // Not indexed

  /**
   * Returns the employee's full name.
   *
   * <p>Provided because the query examples print {@code emp.getName()}.
   *
   * @return the value backing the {@code "name"} index
   */
  public String getName() {
    return fullName;
  }

  /**
   * Returns the employee's salary.
   *
   * <p>Provided because the query examples print {@code emp.getSalary()}.
   *
   * @return the value backing the {@code "salary"} index
   */
  public int getSalary() {
    return salary;
  }

  // Getter methods can also be indexed
  /**
   * Builds a display label from the full name and the employee id.
   *
   * @return {@code fullName + " (" + employeeId + ")"}
   */
  @KVIndex("displayName")
  public String getDisplayName() {
    return fullName + " (" + employeeId + ")";
  }
}

Annotation Parameters:
- value: Index name (default "__main__" for the natural index)
- parent: Parent index name for hierarchical relationships
- copy: Whether to copy data to the index for faster reads (default false)

Index Requirements:
- Every stored type must define a natural index ("__main__")

KVStoreView provides a fluent interface for configuring data iteration with filtering, sorting, and pagination.
/**
 * Fluent configuration of an iteration over stored objects of type {@code T}.
 * Signature summary only; method bodies are not shown here.
 */
public abstract class KVStoreView<T> implements Iterable<T> {
  /** Selects the index, by name, that determines iteration order. */
  public KVStoreView<T> index(String name);

  /** Restricts a child index to entries whose parent index has the given value. */
  public KVStoreView<T> parent(Object value);

  /** Sets the index value at which iteration starts. */
  public KVStoreView<T> first(Object value);

  /** Sets the index value at which iteration stops. */
  public KVStoreView<T> last(Object value);

  /** Limits the number of elements returned. */
  public KVStoreView<T> max(long max);

  /** Skips the first {@code n} elements. */
  public KVStoreView<T> skip(long n);

  /** Reverses the iteration order. */
  public KVStoreView<T> reverse();

  /** Returns an iterator that should be closed after use, e.g. with try-with-resources. */
  public KVStoreIterator<T> closeableIterator() throws Exception;

  /** Returns a standard iterator, which allows use in for-each loops. */
  public Iterator<T> iterator();
}

Basic Querying:
// NOTE(review): these snippets assume `store` is an open KVStore and that Employee
// exposes getName()/getSalary() accessors — confirm against the entity definition.

// Get all employees
for (Employee emp : store.view(Employee.class)) {
  System.out.println(emp.getName());
}

// Get employees sorted by name
for (Employee emp : store.view(Employee.class).index("name")) {
  System.out.println(emp.getName()); // Alphabetical order
}

// Get employees in reverse salary order
for (Employee emp : store.view(Employee.class).index("salary").reverse()) {
  System.out.println(emp.getName() + ": $" + emp.getSalary());
}

Range Queries:
// NOTE(review): first()/last() bounds are presumably inclusive — confirm against the
// KVStoreView Javadoc. Assumes `store` is an open KVStore.

// Employees with names from "Johnson" to "Smith"
for (Employee emp : store.view(Employee.class)
    .index("name")
    .first("Johnson")
    .last("Smith")) {
  System.out.println(emp.getName());
}

// Employees with salaries between 50000 and 100000
for (Employee emp : store.view(Employee.class)
    .index("salary")
    .first(50000)
    .last(100000)) {
  System.out.println(emp.getName() + ": $" + emp.getSalary());
}

Pagination:
// Get employees 21-40 sorted by name (for pagination):
// skip(20) drops the first page, max(20) caps the result at one page.
KVStoreView<Employee> page2 = store.view(Employee.class)
    .index("name")
    .skip(20)
    .max(20);
for (Employee emp : page2) {
  System.out.println(emp.getName());
}

Parent-child index relationships enable efficient filtering of related data.
Setup:
/**
 * Example entity with one parent index ({@code "project"}) and two child indices
 * ({@code "assignee"} and {@code "status"}) scoped to that parent.
 */
public class Task {
  @KVIndex("__main__")
  private String taskId;

  @KVIndex("project") // Parent index
  private String projectId;

  @KVIndex(value = "assignee", parent = "project") // Child index
  private String assignedTo;

  @KVIndex(value = "status", parent = "project") // Another child index
  private String status;

  /**
   * Returns the task's identifier.
   *
   * <p>Provided because the hierarchical query examples print {@code task.getTaskId()}.
   *
   * @return the value backing the natural index
   */
  public String getTaskId() {
    return taskId;
  }
}

Hierarchical Querying:
// NOTE(review): assumes `store` is an open KVStore and that Task exposes getTaskId().
// Using the same value for first() and last() narrows the view to a single index value.

// Get all tasks in project "PROJ-123"
for (Task task : store.view(Task.class)
    .index("project")
    .first("PROJ-123")
    .last("PROJ-123")) {
  System.out.println(task.getTaskId());
}

// Get tasks assigned to "alice" in project "PROJ-123"
for (Task task : store.view(Task.class)
    .index("assignee")
    .parent("PROJ-123") // Filter by parent project
    .first("alice")
    .last("alice")) {
  System.out.println(task.getTaskId());
}

// Get completed tasks in project "PROJ-123"
for (Task task : store.view(Task.class)
    .index("status")
    .parent("PROJ-123") // Same parent filter, different child index
    .first("completed")
    .last("completed")) {
  System.out.println(task.getTaskId());
}

KVStoreIterator provides additional capabilities beyond standard Java iteration.
/**
 * Iterator over stored objects that adds batch retrieval, skipping, and explicit closing
 * to the standard {@link Iterator} contract.
 */
public interface KVStoreIterator<T> extends Iterator<T>, Closeable {
  /** Returns whether another element is available. */
  boolean hasNext();

  /** Returns the next element. */
  T next();

  /** Returns a batch of elements; {@code max} bounds the batch size. */
  List<T> next(int max);

  /**
   * Skips {@code n} elements.
   * NOTE(review): the boolean presumably reports whether elements remain — confirm.
   */
  boolean skip(long n);

  /** Closes the iterator. */
  void close();
}

Batch Processing:
// try-with-resources closes the iterator when the block exits.
// NOTE(review): assumes `store` is an open KVStore and processBatch(...) is defined by the caller.
try (KVStoreIterator<Employee> iter = store.view(Employee.class)
    .index("department")
    .first("Engineering")
    .last("Engineering")
    .closeableIterator()) {
  // Process in batches of 100
  while (iter.hasNext()) {
    List<Employee> batch = iter.next(100);
    processBatch(batch);
  }
}

Dynamic Skipping:
// Iterates salaries from highest to lowest; the iterator is closed when the block exits.
// NOTE(review): assumes `store` is an open KVStore.
try (KVStoreIterator<Employee> iter = store.view(Employee.class)
    .index("salary")
    .reverse()
    .closeableIterator()) {
  // Get top 10 earners
  List<Employee> top10 = iter.next(10);
  // Skip next 40 employees (ranks 11-50)
  iter.skip(40);
  // Get employees ranked 51-60
  List<Employee> ranked51to60 = iter.next(10);
}

Reference Indices (default):
Copy Indices:
/**
 * Example entity contrasting a reference index ({@code "category"}) with a copy index
 * ({@code "price"}).
 */
public class Product {
  @KVIndex("__main__")
  private String productId;

  @KVIndex("category") // Reference index - saves space
  private String category;

  @KVIndex(value = "price", copy = true) // Copy index - faster reads
  private double price;

  // Large, unindexed field.
  // NOTE(review): Spark's KVIndex Javadoc describes copy = true as copying the instance's
  // data into the index, so this field is presumably duplicated in the "price" index
  // rather than excluded from it — confirm before relying on the space savings.
  private String description;
}

Write Performance:
Storage Overhead:
KVStore supports arrays as index keys through automatic wrapper conversion:
/**
 * Example entity whose secondary indices are backed by array-valued fields.
 */
public class Document {
  @KVIndex("__main__")
  private String documentId;

  @KVIndex("tags") // Array index
  private String[] tags;

  @KVIndex("coordinates") // Numeric array index
  private int[] coordinates;

  /**
   * Creates an empty document. Declared explicitly so that code relying on the
   * previously implicit no-argument constructor keeps compiling.
   */
  public Document() {
  }

  /**
   * Creates a document with its id and array-valued index fields, as used by the
   * accompanying write example.
   *
   * @param documentId value of the natural index
   * @param tags values for the {@code "tags"} index; copied so later changes to the
   *     caller's array do not alter the stored key
   * @param coordinates values for the {@code "coordinates"} index; copied for the same reason
   */
  public Document(String documentId, String[] tags, int[] coordinates) {
    this.documentId = documentId;
    this.tags = tags == null ? null : tags.clone();
    this.coordinates = coordinates == null ? null : coordinates.clone();
  }
}
// Store a document whose indexed fields are arrays
// NOTE(review): assumes `store` is an open KVStore and that Document declares a
// (String, String[], int[]) constructor.
Document doc = new Document("doc1", new String[]{"java", "database"}, new int[]{10, 20});
store.write(doc);

// Arrays are automatically made comparable for indexing
for (Document d : store.view(Document.class).index("tags")) {
  // Results sorted by array content
}

Common Index Errors:
// Missing natural index
/** Counter-example: no field or method is annotated as the natural index. */
public class BadClass {
  // ERROR: No @KVIndex("__main__") field
  private String id;
}
// Invalid index name
/**
 * Counter-example: an index name that begins with an underscore.
 * Note that this class also declares no {@code "__main__"} natural index.
 */
public class AnotherBadClass {
  @KVIndex("_invalid") // ERROR: Cannot start with underscore
  private String field;
}
// Circular parent relationship
/**
 * Counter-example: two indices that name each other as parent.
 * NOTE(review): Spark appears to reject this because a parent index may not itself be
 * a child index, rather than through explicit cycle detection — confirm the exact error.
 * This class also declares no {@code "__main__"} natural index.
 */
public class CircularClass {
  @KVIndex(value = "parent", parent = "child") // ERROR: Creates cycle
  private String parentField;

  @KVIndex(value = "child", parent = "parent")
  private String childField;
}

Exception Handling:
// Querying an index name the type does not declare ("nonexistent") to show error handling.
// NOTE(review): assumes `store` is an open KVStore; the concrete exception type thrown
// for an unknown index is not shown here, hence the broad catch.
try {
  for (Employee emp : store.view(Employee.class).index("nonexistent")) {
    // Process employees
  }
} catch (Exception e) {
  // Handle invalid index name or other query errors
  System.err.println("Query failed: " + e.getMessage());
}