A local key/value store abstraction library for Apache Spark applications that provides automatic serialization, indexing, and key management features.
JSON-based object serialization with compression and custom ObjectMapper support. The serialization system handles converting Java objects to byte arrays for storage and retrieval.
Core serialization class that handles converting objects to and from byte arrays using Jackson JSON serialization with GZIP compression.
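The two value formats described above (plain UTF-8 for strings, GZIP-compressed JSON for everything else) can be approximated with the JDK alone. The sketch below is an illustration under assumptions, not the library's code: `StorageFormatSketch` and its methods are hypothetical names, and Jackson is left out, so the JSON text is supplied by hand.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of spark-kvstore: mimics the two value
// formats using only the JDK.
class StorageFormatSketch {

    // Plain strings: raw UTF-8 bytes, no compression.
    static byte[] encodeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Other values: JSON text (normally produced by Jackson) wrapped in GZIP.
    static byte[] encodeJson(String json) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reverse of encodeJson: gunzip, then decode the UTF-8 JSON text.
    static String decodeJson(byte[] data) {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"id\":\"p1\",\"name\":\"Alice\",\"age\":30}";
        byte[] stored = encodeJson(json);
        System.out.println("round trip ok: " + decodeJson(stored).equals(json));
        System.out.println("string bytes: " + encodeString("Hello").length);
    }
}
```

In the real serializer the JSON step is handled by the `mapper` field, which is why customizing that mapper changes what ends up in the store.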
/**
 * Serializer used to translate between app-defined types and the LevelDB store.
 *
 * The serializer is based on Jackson, so values are written as JSON. It also allows "naked strings"
 * and integers to be written as values directly, which will be written as UTF-8 strings.
 */
public class KVStoreSerializer {
    /**
     * Object mapper used to process app-specific types. If an application requires a specific
     * configuration of the mapper, it can subclass this serializer and add custom configuration
     * to this object.
     */
    protected final ObjectMapper mapper;

    /**
     * Creates a new serializer with default Jackson ObjectMapper configuration.
     */
    public KVStoreSerializer();
}

Default Behavior:
Convert Java objects to byte arrays for storage, with automatic compression for non-string values.
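Compression is not free for small values: the GZIP container adds a fixed header and trailer, so a very short JSON document can grow when compressed, while large repetitive data shrinks a great deal. The sketch below measures this with the JDK only; `GzipOverheadSketch` is a hypothetical helper, not part of the library.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of spark-kvstore.
class GzipOverheadSketch {

    // Returns the size in bytes of the input after GZIP compression.
    static int gzipSize(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    public static void main(String[] args) {
        byte[] tiny = "{}".getBytes(StandardCharsets.UTF_8);
        byte[] repetitive = new byte[1024 * 1024]; // 1 MiB of zeros
        System.out.println("tiny: " + tiny.length + " -> " + gzipSize(tiny) + " bytes");
        System.out.println("repetitive: " + repetitive.length + " -> " + gzipSize(repetitive) + " bytes");
    }
}
```

For stores dominated by tiny values, the per-value overhead may matter more than the compression gain.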
/**
* Serialize an object to bytes for storage.
* @param o - The object to serialize. Strings are stored as UTF-8, other objects as compressed JSON.
* @return Byte array representation of the object
* @throws Exception If serialization fails
*/
public final byte[] serialize(Object o) throws Exception;

Usage Examples:
KVStoreSerializer serializer = new KVStoreSerializer();
// Serialize a simple string (stored as UTF-8, no compression)
String text = "Hello, World!";
byte[] stringBytes = serializer.serialize(text);
// Serialize a complex object (stored as compressed JSON)
Person person = new Person("p1", "Alice", 30);
byte[] personBytes = serializer.serialize(person);
// Serialize numbers and primitives
Integer number = 42;
byte[] numberBytes = serializer.serialize(number);
// Serialize collections
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
byte[] listBytes = serializer.serialize(names);
// Serialize maps
Map<String, Integer> scores = new HashMap<>();
scores.put("Alice", 95);
scores.put("Bob", 87);
byte[] mapBytes = serializer.serialize(scores);

Convert byte arrays back to Java objects, with automatic decompression and type handling.
/**
* Deserialize bytes back to an object of the specified type.
* @param data - The byte array to deserialize
* @param klass - The target class type
* @return The deserialized object
* @throws Exception If deserialization fails
*/
public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception;

Usage Examples:
KVStoreSerializer serializer = new KVStoreSerializer();
// Deserialize string
byte[] stringBytes = /* ... */;
String text = serializer.deserialize(stringBytes, String.class);
// Deserialize complex object
byte[] personBytes = /* ... */;
Person person = serializer.deserialize(personBytes, Person.class);
// Deserialize numbers
byte[] numberBytes = /* ... */;
Integer number = serializer.deserialize(numberBytes, Integer.class);
// Deserialize collections. deserialize() takes a Class, not a Jackson TypeReference,
// so element types are not checked and the assignment is an unchecked conversion.
byte[] listBytes = /* ... */;
@SuppressWarnings("unchecked")
List<String> names = serializer.deserialize(listBytes, List.class);
// Deserialize maps (same caveat: key and value types are not checked)
byte[] mapBytes = /* ... */;
@SuppressWarnings("unchecked")
Map<String, Integer> scores = serializer.deserialize(mapBytes, Map.class);

Special handling for long values used internally by the store for counters and metadata.
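The class comment says integers are written directly as UTF-8 strings. One plausible reading is that a long is stored as its decimal text, and the sketch below assumes exactly that encoding. It is a standalone illustration with hypothetical names (`LongEncodingSketch`, `encode`, `decode`), not the library's implementation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of spark-kvstore. Assumes a long is stored
// as its decimal digits in UTF-8.
class LongEncodingSketch {

    static byte[] encode(long value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    static long decode(byte[] data) {
        return Long.parseLong(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] bytes = encode(12345L);
        System.out.println(bytes.length);  // 5: one byte per decimal digit
        System.out.println(decode(bytes)); // 12345
    }
}
```

Under this assumption the stored bytes stay human-readable, at the cost of a variable-length encoding.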
/**
* Serialize a long value to bytes.
* @param value - The long value to serialize
* @return Byte array representation of the long
*/
final byte[] serialize(long value);
/**
* Deserialize bytes back to a long value.
* @param data - The byte array to deserialize
* @return The long value
*/
final long deserializeLong(byte[] data);

Usage Examples:
// Note: both methods above are declared without `public`, so this snippet compiles
// only from code in the same package as KVStoreSerializer.
KVStoreSerializer serializer = new KVStoreSerializer();
// Serialize long values (used internally for counters)
long counter = 12345L;
byte[] counterBytes = serializer.serialize(counter);
// Deserialize long values
long retrievedCounter = serializer.deserializeLong(counterBytes);
System.out.println("Counter: " + retrievedCounter);

Create custom serializers by subclassing KVStoreSerializer to configure Jackson ObjectMapper behavior.
Usage Examples:
// Custom serializer with specific Jackson configuration
public class CustomKVStoreSerializer extends KVStoreSerializer {
    public CustomKVStoreSerializer() {
        super();
        // Configure the mapper for custom behavior
        mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Applies to java.util.Date and Calendar values
        mapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        // Add custom modules (JavaTimeModule comes from jackson-datatype-jsr310)
        mapper.registerModule(new JavaTimeModule());
        // Configure property naming strategy
        mapper.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
    }
}
// Use custom serializer with LevelDB
KVStoreSerializer customSerializer = new CustomKVStoreSerializer();
KVStore store = new LevelDB(new File("./custom-store"), customSerializer);
// Objects will be serialized using custom configuration
Person person = new Person("p1", "Alice", 30);
person.birthDate = LocalDateTime.now();
// Written with snake_case property names. birthDate is a LocalDateTime, so
// JavaTimeModule writes it in ISO-8601 form rather than the SimpleDateFormat pattern.
store.write(person);

The serializer supports all Jackson annotations for controlling serialization behavior.
Usage Examples:
import java.util.Date;
import com.fasterxml.jackson.annotation.*;
import org.apache.spark.util.kvstore.KVIndex;

public class AnnotatedPerson {
    @KVIndex
    @JsonProperty("person_id") // Custom JSON property name
    public String id;

    @KVIndex("name")
    @JsonProperty("full_name")
    public String name;

    @KVIndex("age")
    public int age;

    @JsonIgnore // Exclude from serialization
    public String internalNotes;

    @JsonFormat(pattern = "yyyy-MM-dd") // Custom date format
    public Date birthDate;

    @JsonInclude(JsonInclude.Include.NON_NULL) // Only include if not null
    public String nickname;

    // Jackson will use this constructor for deserialization
    @JsonCreator
    public AnnotatedPerson(
            @JsonProperty("person_id") String id,
            @JsonProperty("full_name") String name,
            @JsonProperty("age") int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
    // Getters and setters...
}

// The serializer will respect all Jackson annotations
KVStore store = new InMemoryStore();
AnnotatedPerson person = new AnnotatedPerson("p1", "Alice Johnson", 30);
person.nickname = "Al";
person.internalNotes = "VIP customer"; // Will be ignored during serialization
store.write(person); // Serialized using Jackson annotations
AnnotatedPerson retrieved = store.read(AnnotatedPerson.class, "p1");
// retrieved.internalNotes will be null (was ignored)

Handle serialization and deserialization errors appropriately.
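One failure worth anticipating is a format mismatch: bytes written as a plain string are not GZIP data, so reading them back as an object type should fail in the decompression step. The sketch below reproduces that situation with the JDK only. `FormatMismatchSketch` and its methods are hypothetical, and whether KVStoreSerializer surfaces the same exception type is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of spark-kvstore.
class FormatMismatchSketch {

    // Compresses text the way a non-string value would be stored.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Returns null when the bytes open as a GZIP stream, otherwise a
    // description of the failure raised while reading the header.
    static String gzipHeaderError(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return null;
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] asObject = gzip("{\"id\":\"p1\"}");
        byte[] asString = "Hello, World!".getBytes(StandardCharsets.UTF_8);
        System.out.println("object bytes: " + gzipHeaderError(asObject));
        System.out.println("string bytes: " + gzipHeaderError(asString));
    }
}
```

In practice this means the class passed when reading a value should match the kind of value that was written.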
Usage Examples:
KVStoreSerializer serializer = new KVStoreSerializer();
try {
    // Attempt to serialize an object
    ComplexObject obj = new ComplexObject();
    byte[] data = serializer.serialize(obj);
    // Attempt to deserialize
    ComplexObject retrieved = serializer.deserialize(data, ComplexObject.class);
} catch (JsonProcessingException e) {
    // Jackson serialization/deserialization error
    System.err.println("JSON processing failed: " + e.getMessage());
} catch (IOException e) {
    // GZIP compression/decompression error
    System.err.println("Compression failed: " + e.getMessage());
} catch (Exception e) {
    // Other serialization errors
    System.err.println("Serialization failed: " + e.getMessage());
}

String vs Object Serialization:
KVStoreSerializer serializer = new KVStoreSerializer();
// Strings are stored as UTF-8 (fast, no compression)
String simpleText = "Hello World";
byte[] stringData = serializer.serialize(simpleText); // Fast, small overhead
// Objects are JSON + GZIP compressed (slower, better space efficiency)
Person person = new Person("p1", "Alice", 30);
byte[] objectData = serializer.serialize(person); // Slower, compressed

Large Object Handling:
// For large objects, serialization cost increases
LargeDataObject largeObj = new LargeDataObject();
largeObj.data = new byte[1024 * 1024]; // 1MB of data
long start = System.currentTimeMillis();
byte[] serialized = serializer.serialize(largeObj);
long serializationTime = System.currentTimeMillis() - start;
System.out.println("Serialization took: " + serializationTime + "ms");
System.out.println("Original size: " + largeObj.data.length + " bytes");
System.out.println("Compressed size: " + serialized.length + " bytes");
System.out.println("Compression ratio: " + (1.0 - (double)serialized.length / largeObj.data.length));

The KVStoreSerializer is thread-safe and can be used concurrently.
Usage Examples:
// Single serializer instance can be used by multiple threads
KVStoreSerializer sharedSerializer = new KVStoreSerializer();
// Thread-safe concurrent usage
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
    final int index = i;
    executor.submit(() -> {
        try {
            Person person = new Person("p" + index, "Name" + index, 20 + index);
            byte[] data = sharedSerializer.serialize(person); // Thread-safe
            Person retrieved = sharedSerializer.deserialize(data, Person.class); // Thread-safe
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}
executor.shutdown();
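The thread-pool example above submits its tasks and shuts the pool down without waiting for results, and failures are only printed. A variant that blocks until every task has finished and counts failures might look like the sketch below. `ConcurrentRoundTripSketch` and `roundTrip` are hypothetical; `roundTrip` stands in for a serialize/deserialize pair on a shared serializer so the example runs with the JDK alone.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of spark-kvstore.
class ConcurrentRoundTripSketch {

    // Stand-in for serialize + deserialize on a shared, stateless serializer.
    static String roundTrip(String value) {
        byte[] data = value.getBytes(StandardCharsets.UTF_8);
        return new String(data, StandardCharsets.UTF_8);
    }

    // Runs `tasks` round trips on a fixed pool and returns how many threw
    // an exception or produced a value different from the input.
    static int countFailures(int tasks, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Boolean>> jobs = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                String input = "Name" + i;
                jobs.add(() -> roundTrip(input).equals(input));
            }
            int failures = 0;
            // invokeAll blocks until every submitted task has completed.
            for (Future<Boolean> result : executor.invokeAll(jobs)) {
                try {
                    if (!result.get()) {
                        failures++;
                    }
                } catch (ExecutionException e) {
                    failures++; // the task itself threw
                }
            }
            return failures;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("failures: " + countFailures(100, 10));
    }
}
```

Collecting results through futures keeps serialization errors visible to the caller instead of leaving them in a stack trace on a worker thread.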