tessl install tessl/maven-org-apache-spark--spark-kvstore_2-13@3.5.0

Local key/value store abstraction for Apache Spark with thread-safe operations, automatic serialization, and indexing capabilities.
The KVStore serialization system uses Jackson-based JSON serialization with GZIP compression to convert Java objects to and from byte arrays for persistent storage. It honors standard Jackson annotations, and subclasses can customize the underlying Jackson ObjectMapper.
import org.apache.spark.util.kvstore.KVStoreSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.NoSuchElementException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

The core serialization class that handles object conversion with automatic compression and type-safe deserialization.
/**
 * Converts objects to and from the byte arrays stored by persistent KVStore implementations.
 *
 * <p>Strings are encoded directly as UTF-8 bytes; every other object is written as JSON by
 * {@link #mapper} and then GZIP-compressed.
 */
public class KVStoreSerializer {
/** Jackson mapper used for JSON conversion; subclasses can reconfigure it after calling super(). */
protected final ObjectMapper mapper;
/** Creates a serializer; the final {@link #mapper} field is initialized here. */
public KVStoreSerializer();
/**
 * Encodes an object for storage.
 *
 * @param o the object to encode; a {@code String} becomes its UTF-8 bytes, anything else
 *     becomes GZIP-compressed JSON
 * @return the encoded bytes
 * @throws Exception if JSON generation or compression fails
 */
public byte[] serialize(Object o) throws Exception;
/**
 * Decodes bytes produced by {@link #serialize(Object)}.
 *
 * @param data the stored bytes
 * @param klass the type to decode into; use {@code String.class} for data that was
 *     serialized from a {@code String}
 * @param <T> the decoded type
 * @return the decoded object
 * @throws Exception if decompression or JSON parsing fails
 */
public <T> T deserialize(byte[] data, Class<T> klass) throws Exception;
}
Basic Usage:
// Persistent stores normally build their own serializer; one is created here for illustration.
KVStoreSerializer kvSerializer = new KVStoreSerializer();

// Object -> GZIP-compressed JSON bytes.
User alice = new User("user123", "Alice", "Engineering");
byte[] encoded = kvSerializer.serialize(alice);

// Bytes -> object; the class argument tells the serializer what to build.
User decoded = kvSerializer.deserialize(encoded, User.class);
All non-string objects are automatically compressed using GZIP to reduce storage space.
Compression Behavior:
/** Demonstrates the two encodings produced by KVStoreSerializer. */
public class DataExample {
    public static void main(String[] args) throws Exception {
        KVStoreSerializer codec = new KVStoreSerializer();

        // A String bypasses JSON and GZIP: its stored form is just its UTF-8 encoding.
        String greeting = "Hello, World!";
        byte[] greetingBytes = codec.serialize(greeting);

        // Any other object is stored as GZIP(JSON(object)).
        ComplexObject payload = new ComplexObject(/* ... */);
        byte[] payloadBytes = codec.serialize(payload);

        // deserialize() handles both encodings.
        String greetingBack = codec.deserialize(greetingBytes, String.class);
        ComplexObject payloadBack = codec.deserialize(payloadBytes, ComplexObject.class);
    }
}
The serializer uses Jackson ObjectMapper with full support for Jackson annotations and features.
Supported Jackson Annotations:
/**
 * Example entity whose JSON layout is controlled by Jackson annotations.
 *
 * <p>Jackson only sees a non-public field when it carries a property-defining annotation
 * (such as {@code @JsonProperty} or {@code @JsonFormat}) or is exposed through public
 * accessors. {@code @KVIndex} registers a store index but does not make a field serializable.
 */
public class AnnotatedUser {
    // Natural key; stored in the JSON as "user_id".
    @JsonProperty("user_id")
    @KVIndex("__main__")
    private String userId;

    @JsonProperty("full_name")
    @KVIndex("name")
    private String name;

    @JsonIgnore
    private String internalField; // Won't be serialized

    // Stored as text with second precision; milliseconds are not kept.
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createdAt;

    // @JsonInclude alone does not make a private field visible to Jackson; the public
    // accessors below do, and NON_NULL then omits the property while its value is null.
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private String optionalField;

    // Jackson will use this constructor during deserialization
    @JsonCreator
    public AnnotatedUser(@JsonProperty("user_id") String userId,
                         @JsonProperty("full_name") String name) {
        this.userId = userId;
        this.name = name;
        // When reading, this value is replaced by the stored "createdAt" property if present.
        this.createdAt = new Date();
    }

    /** @return the optional field, or null when it was never set */
    public String getOptionalField() {
        return optionalField;
    }

    /** @param optionalField value to store; null omits the property from the JSON */
    public void setOptionalField(String optionalField) {
        this.optionalField = optionalField;
    }
}
Usage with Annotations:
AnnotatedUser alice = new AnnotatedUser("user123", "Alice Johnson");
alice.setOptionalField("some value");

// The JSON written to the store follows the Jackson annotations (user_id, full_name, ...).
store.write(alice);

// Reading by natural key maps the stored JSON back through the same annotations.
AnnotatedUser loaded = store.read(AnnotatedUser.class, "user123");
Extend KVStoreSerializer to customize Jackson ObjectMapper configuration for specific requirements.
/** Skeleton showing where ObjectMapper customization hooks into a KVStoreSerializer subclass. */
public class CustomSerializer extends KVStoreSerializer {
    public CustomSerializer() {
        super();
        // The inherited protected mapper can be reconfigured once the base constructor has run.
        applyMapperSettings();
    }

    /** Placeholder for custom Jackson configuration applied to the inherited mapper. */
    private void applyMapperSettings() {
        // Custom Jackson configuration
    }
}
Custom Serializer Example:
/**
 * KVStoreSerializer with a customized Jackson configuration.
 *
 * <p>Every setting here changes the bytes written to the store or how they are read back, so
 * keep the configuration stable for as long as existing data must remain readable.
 */
public class CustomKVStoreSerializer extends KVStoreSerializer {

    /** Creates a serializer that writes compact JSON, which is what stored data should use. */
    public CustomKVStoreSerializer() {
        this(false);
    }

    /**
     * Creates a serializer, optionally pretty-printing its JSON.
     *
     * @param prettyPrint indent the generated JSON; intended for debugging only, because the
     *     extra whitespace is stored (compressed) with every record
     */
    public CustomKVStoreSerializer(boolean prettyPrint) {
        super();
        // Configure date format. The UTC offset (XXX) keeps stored dates unambiguous; without it
        // the text is interpreted in whatever default time zone the reading JVM has.
        mapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ssXXX"));
        // Handle unknown properties gracefully (e.g. records written by another class version)
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // For precision, read floating-point values into untyped targets (Object, Map, List) as
        // BigDecimal instead of double. Numbers are deliberately NOT written as strings: that
        // would make untyped numeric values come back as Strings.
        mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
        if (prettyPrint) {
            mapper.enable(SerializationFeature.INDENT_OUTPUT);
        }
        // Custom module for special types
        SimpleModule module = new SimpleModule();
        module.addSerializer(BigDecimal.class, new BigDecimalSerializer());
        module.addDeserializer(BigDecimal.class, new BigDecimalDeserializer());
        mapper.registerModule(module);
    }
}
// Persistent stores take the serializer as a constructor argument:
File dbPath = new File("/path/to/db");
KVStore store = new RocksDB(dbPath, new CustomKVStoreSerializer());
The serialization system maintains type safety through compile-time generics and runtime type checking.
Generic Type Handling:
public class Container<T> {
@KVIndex("__main__")
private String id;
private T content;
private List<T> items;
// Jackson needs type information for generic deserialization
@JsonIgnore
private Class<T> contentType;
@JsonCreator
public Container(@JsonProperty("id") String id) {
this.id = id;
}
// Helper methods for type-safe operations
public void setContent(T content, Class<T> type) {
this.content = content;
this.contentType = type;
}
}
// Usage: generics are checked by the compiler on the way in only.
Container<User> userContainer = new Container<>("container1");
userContainer.setContent(new User("user1", "Alice", "Engineering"), User.class);
store.write(userContainer);
// A class literal cannot carry type arguments (there is no Container<User>.class), so read()
// returns a raw Container and this assignment is an unchecked conversion. content is really a
// User only if Container records the element type in its JSON; otherwise Jackson restores it
// as a LinkedHashMap and the mistake surfaces later as a ClassCastException.
@SuppressWarnings("unchecked")
Container<User> retrieved = store.read(Container.class, "container1");
Polymorphic Types:
// The "type" property in the JSON records which subclass was written.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = Manager.class, name = "manager"),
    @JsonSubTypes.Type(value = Developer.class, name = "developer")
})
public abstract class Employee {
    // Non-public fields need @JsonProperty to be persisted by Jackson.
    @JsonProperty
    protected String employeeId;
    @JsonProperty
    protected String name;

    /** Used by Jackson, which fills the fields after construction. */
    protected Employee() {
    }

    protected Employee(String employeeId, String name) {
        this.employeeId = employeeId;
        this.name = name;
    }
}

// KVStore reads index metadata from the members declared on the class being stored, so each
// concrete subclass declares its @KVIndex accessors itself.
public class Manager extends Employee {
    @JsonProperty
    private List<String> teamMembers;

    public Manager() {
    }

    public Manager(String employeeId, String name, List<String> teamMembers) {
        super(employeeId, name);
        this.teamMembers = teamMembers;
    }

    @KVIndex("__main__")
    public String getEmployeeId() {
        return employeeId;
    }

    @KVIndex("name")
    public String getName() {
        return name;
    }
}

public class Developer extends Employee {
    @JsonProperty
    private List<String> skills;

    public Developer() {
    }

    public Developer(String employeeId, String name, List<String> skills) {
        super(employeeId, name);
        this.skills = skills;
    }

    @KVIndex("__main__")
    public String getEmployeeId() {
        return employeeId;
    }

    @KVIndex("name")
    public String getName() {
        return name;
    }
}

Employee emp = new Developer("dev1", "Alice", Arrays.asList("Java", "Python"));
store.write(emp);
// Entries are filed under the object's concrete class, so read with Developer.class;
// store.read(Employee.class, "dev1") would not find this entry.
Employee retrieved = store.read(Developer.class, "dev1");
// The "type" property also lets Jackson restore the right subclass wherever an Employee is
// nested inside another stored object (for example a List<Employee> field).
Custom Serializers for Complex Types:
/** Writes a {@code Location} as a JSON object with the short keys lat, lng and addr. */
public class LocationSerializer extends JsonSerializer<Location> {
    @Override
    public void serialize(Location value, JsonGenerator out, SerializerProvider provider)
            throws IOException {
        out.writeStartObject();
        out.writeNumberField("lat", value.getLatitude());
        out.writeNumberField("lng", value.getLongitude());
        out.writeStringField("addr", value.getAddress());
        out.writeEndObject();
    }
}
/** Reads the {lat, lng, addr} object produced by the matching Location serializer. */
public class LocationDeserializer extends JsonDeserializer<Location> {
    /**
     * Builds a {@code Location} from its JSON object.
     *
     * <p>A missing or JSON-null {@code addr} becomes a Java {@code null}; calling
     * {@code asText()} on it directly would turn a stored null into the string "null".
     *
     * @throws IOException on malformed input, including a missing {@code lat}/{@code lng}
     *     (reported as a Jackson input mismatch rather than a NullPointerException)
     */
    @Override
    public Location deserialize(JsonParser p, DeserializationContext ctxt)
            throws IOException {
        JsonNode node = p.getCodec().readTree(p);
        double lat = requiredField(node, "lat", ctxt).asDouble();
        double lng = requiredField(node, "lng", ctxt).asDouble();
        JsonNode addrNode = node.get("addr");
        String addr = (addrNode == null || addrNode.isNull()) ? null : addrNode.asText();
        return new Location(lat, lng, addr);
    }

    /** Returns the named non-null child of {@code node}, or reports an input mismatch. */
    private static JsonNode requiredField(JsonNode node, String name, DeserializationContext ctxt)
            throws IOException {
        JsonNode value = node.get(name);
        if (value == null || value.isNull()) {
            return ctxt.reportInputMismatch(Location.class,
                    "Missing required field '%s' in Location", name);
        }
        return value;
    }
}
Storage Savings:
Performance Trade-offs:
Optimization Tips:
- Mark fields that do not need to be stored with `@JsonIgnore`.

Serialization Failures:
try {
    store.write(problematicObject);
} catch (JsonProcessingException e) {
    // Jackson serialization failed
    System.err.println("Cannot serialize object: " + e.getMessage());
} catch (IOException e) {
    // Other I/O failure, e.g. while GZIP-compressing the JSON
    System.err.println("Compression error: " + e.getMessage());
} catch (Exception e) {
    // KVStore.write declares `throws Exception`; failures from the underlying store land here
    System.err.println("Write failed: " + e);
}
try {
    User user = store.read(User.class, "user123");
} catch (NoSuchElementException e) {
    // Nothing is stored under this natural key for this class
    System.err.println("Not found: " + e.getMessage());
} catch (JsonMappingException e) {
    // JSON structure doesn't match class
    System.err.println("Cannot deserialize: " + e.getMessage());
} catch (JsonParseException e) {
    // Invalid JSON data
    System.err.println("Corrupted data: " + e.getMessage());
} catch (IOException e) {
    // GZIP decompression failed
    System.err.println("Decompression error: " + e.getMessage());
} catch (Exception e) {
    // KVStore.read declares `throws Exception`; failures from the underlying store land here
    System.err.println("Read failed: " + e);
}
/** Example of a stored type evolving across versions. */
public class User {
    // @KVIndex registers the store index but does not make a private field visible to Jackson;
    // without @JsonProperty (or public accessors) these fields would not be persisted at all.
    @KVIndex("__main__")
    @JsonProperty
    private String userId;

    @KVIndex("name")
    @JsonProperty
    private String name;

    // New field added in version 2. Records written earlier have no "department" property, so
    // Jackson leaves the initializer's value in place. The defaultValue attribute is metadata
    // only; Jackson does not apply it during deserialization.
    @JsonProperty(defaultValue = "unknown")
    private String department = "unknown";

    // Old field removed - kept and ignored so stored records that still contain "oldField"
    // are read without an unknown-property error.
    @JsonIgnore
    @Deprecated
    private String oldField;
}
Best Practices for Evolution:
// Excerpt, apparently the body of serialize(Object o):
// Strings are handled specially for efficiency
if (o instanceof String) {
return ((String) o).getBytes(UTF_8); // Direct UTF-8 encoding
} else {
// All other objects: JSON + GZIP
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
// Closing the GZIP stream at the end of the try block writes the GZIP trailer,
// which is why the buffer is only read after the try block.
try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
mapper.writeValue(out, o);
}
return bytes.toByteArray();
}
// Special handling for long values used internally
/**
 * Encodes a long as its decimal text in UTF-8, bypassing JSON and GZIP.
 *
 * @param value the number to encode
 * @return UTF-8 bytes of {@code String.valueOf(value)}
 */
final byte[] serialize(long value) {
return String.valueOf(value).getBytes(UTF_8);
}
/**
 * Inverse of {@code serialize(long)}: decodes UTF-8 decimal text back into a long.
 *
 * @param data bytes holding the decimal text
 * @return the parsed value
 * @throws NumberFormatException if the text is not a valid decimal long
 */
final long deserializeLong(byte[] data) {
return Long.parseLong(new String(data, UTF_8));
}
This optimization ensures efficient storage of numeric indices and internal metadata without JSON overhead.