Describes: mavenpkg:maven/org.apache.spark/spark-kvstore_2.13@3.5.x


tessl/maven-org-apache-spark--spark-kvstore_2-13

tessl install tessl/maven-org-apache-spark--spark-kvstore_2-13@3.5.0

Local key/value store abstraction for Apache Spark with thread-safe operations, automatic serialization, and indexing capabilities


Serialization

The KVStore serialization system converts Java objects to and from byte arrays for the persistent stores. Strings are stored as plain UTF-8 bytes; every other object is written as JSON by a Jackson ObjectMapper and then GZIP-compressed. Standard Jackson annotations on your classes are honored, and subclasses can customize the mapper.

Imports

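import org.apache.spark.util.kvstore.KVIndex;
import org.apache.spark.util.kvstore.KVStore;
import org.apache.spark.util.kvstore.KVStoreSerializer;
import org.apache.spark.util.kvstore.RocksDB;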
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

Capabilities

KVStoreSerializer Class

The core serialization class that handles object conversion with automatic compression and type-safe deserialization.

public class KVStoreSerializer {
    protected final ObjectMapper mapper;
    
    public KVStoreSerializer();
    public byte[] serialize(Object o) throws Exception;
    public <T> T deserialize(byte[] data, Class<T> klass) throws Exception;
}

Basic Usage:

// Create serializer (usually done internally by persistent stores)
KVStoreSerializer serializer = new KVStoreSerializer();

// Serialize object to compressed bytes
User user = new User("user123", "Alice", "Engineering");
byte[] serializedData = serializer.serialize(user);

// Deserialize bytes back to object
User deserializedUser = serializer.deserialize(serializedData, User.class);

Automatic Data Compression

All non-string objects are automatically compressed using GZIP to reduce storage space.

Compression Behavior:

public class DataExample {
    public static void main(String[] args) throws Exception {
        KVStoreSerializer serializer = new KVStoreSerializer();
        
        // Strings are stored as UTF-8 bytes (no compression)
        String text = "Hello, World!";
        byte[] stringBytes = serializer.serialize(text);
        // Result: UTF-8 encoded bytes
        
        // Complex objects are JSON + GZIP compressed
        ComplexObject obj = new ComplexObject(/* ... */);
        byte[] compressedBytes = serializer.serialize(obj);
        // Result: GZIP(JSON(object))
        
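        // deserialize() picks the branch from the requested class, not from the bytes:
        // String.class decodes UTF-8, any other class is gunzipped and parsed as JSON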
        String restoredText = serializer.deserialize(stringBytes, String.class);
        ComplexObject restoredObj = serializer.deserialize(compressedBytes, ComplexObject.class);
    }
}
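Because deserialize() never inspects the bytes, the two stored formats can only be told apart from the data itself when debugging. A GZIP stream always begins with the magic bytes 0x1f 0x8b (RFC 1952), and output of String.getBytes(UTF_8) can never begin with that pair, since 0x8b is a UTF-8 continuation byte. The helper below is a hypothetical JDK-only sketch, not part of the KVStore API.

```java
import java.nio.charset.StandardCharsets;

public class GzipSniffer {
    // True if the data starts with the GZIP magic bytes 0x1f 0x8b (RFC 1952).
    // Valid UTF-8 never matches: 0x8b is a continuation byte and cannot
    // follow the single-byte character 0x1f.
    static boolean looksGzipped(byte[] data) {
        return data != null
            && data.length >= 2
            && (data[0] & 0xff) == 0x1f
            && (data[1] & 0xff) == 0x8b;
    }

    public static void main(String[] args) {
        // Bytes as produced by the String branch of serialize()
        byte[] stringValue = "Hello, World!".getBytes(StandardCharsets.UTF_8);
        // First bytes of GZIPOutputStream output: magic, then method 8 (deflate)
        byte[] gzipHeader = {0x1f, (byte) 0x8b, 0x08, 0x00};
        System.out.println(looksGzipped(stringValue)); // false
        System.out.println(looksGzipped(gzipHeader));  // true
    }
}
```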

Jackson Integration

The serializer delegates to a Jackson ObjectMapper created with default settings (new ObjectMapper()), so standard Jackson annotations on your classes are honored.

Supported Jackson Annotations:

public class AnnotatedUser {
    @JsonProperty("user_id")
    @KVIndex("__main__")
    private String userId;
    
    @JsonProperty("full_name")
    @KVIndex("name")
    private String name;
    
    @JsonIgnore
    private String internalField; // Won't be serialized
    
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createdAt;
    
    @JsonInclude(JsonInclude.Include.NON_NULL)
    private String optionalField;
    
    // Jackson will use this constructor during deserialization
    @JsonCreator
    public AnnotatedUser(@JsonProperty("user_id") String userId,
                        @JsonProperty("full_name") String name) {
        this.userId = userId;
        this.name = name;
        this.createdAt = new Date();
    }

    // Getters and setters (including setOptionalField) omitted for brevity
}

Usage with Annotations:

AnnotatedUser user = new AnnotatedUser("user123", "Alice Johnson");
user.setOptionalField("some value");

// write() serializes through the store's KVStoreSerializer, so the Jackson annotations apply
store.write(user);

// Retrieve with proper deserialization
AnnotatedUser retrieved = store.read(AnnotatedUser.class, "user123");
// The natural key comes from the Java field userId (via @KVIndex),
// independent of its JSON name "user_id"

Custom Serialization

Extend KVStoreSerializer to customize Jackson ObjectMapper configuration for specific requirements.

public class CustomSerializer extends KVStoreSerializer {
    public CustomSerializer() {
        super();
        // Access to protected ObjectMapper for customization
        configureMapper();
    }
    
    private void configureMapper() {
        // Custom Jackson configuration
    }
}

Custom Serializer Example:

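import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.File;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import org.apache.spark.util.kvstore.KVStore;
import org.apache.spark.util.kvstore.KVStoreSerializer;
import org.apache.spark.util.kvstore.RocksDB;

public class CustomKVStoreSerializer extends KVStoreSerializer {
    public CustomKVStoreSerializer() {
        super();
        
        // Configure date format
        mapper.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        
        // Ignore unknown properties instead of failing (helps when reading
        // data written by another version of a class)
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        
        // Write numeric values as JSON strings
        mapper.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, true);
        
        // Pretty-print JSON; only visible after decompression, and it makes
        // every stored value larger, so enable it for debugging only
        mapper.enable(SerializationFeature.INDENT_OUTPUT);
        
        // Custom module for special types; BigDecimalSerializer and
        // BigDecimalDeserializer are application classes (not shown)
        SimpleModule module = new SimpleModule();
        module.addSerializer(BigDecimal.class, new BigDecimalSerializer());
        module.addDeserializer(BigDecimal.class, new BigDecimalDeserializer());
        mapper.registerModule(module);
    }
}

// Use the custom serializer with a persistent store
// (the RocksDB constructor is declared to throw Exception)
KVStore store = new RocksDB(new File("/path/to/db"), new CustomKVStoreSerializer());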

Type Safety and Generics

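Deserialization is driven by the Class<T> passed to read() or deserialize(), so runtime type checking only covers the raw class. Generic type arguments are erased: Jackson sees a field declared as T as Object and, for a JSON object, restores it as a LinkedHashMap unless the concrete type is recorded in the data.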

Generic Type Handling:

public class Container<T> {
    @KVIndex("__main__")
    @JsonProperty("id")
    private String id;
    
    // T is erased at runtime, so embed the concrete class name in the JSON
    // (as an "@class" property) to let Jackson rebuild the right type
    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
    @JsonProperty("content")
    private T content;
    
    @JsonCreator
    public Container(@JsonProperty("id") String id) {
        this.id = id;
    }
    
    public T getContent() { return content; }
    public void setContent(T content) { this.content = content; }
}

// Usage
Container<User> userContainer = new Container<>("container1");
userContainer.setContent(new User("user1", "Alice", "Engineering"));

store.write(userContainer);

// read() only receives the raw Container.class, so this assignment is unchecked
@SuppressWarnings("unchecked")
Container<User> retrieved = store.read(Container.class, "container1");
User content = retrieved.getContent(); // a User, thanks to the embedded class name
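The JDK-only sketch below shows why Container.class alone cannot carry T: every parameterization of a generic class shares one runtime Class object, and reflection reports an unbounded T field as Object, which is what Jackson falls back to. Box is a hypothetical stand-in for Container.

```java
public class ErasureDemo {
    // Hypothetical stand-in for Container<T>: one type parameter, one T field
    static class Box<T> {
        T content;
    }

    // Declared type of Box.content as seen through reflection: the erasure of T
    static Class<?> runtimeContentType() {
        try {
            return Box.class.getDeclaredField("content").getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Box<String> strings = new Box<>();
        Box<Integer> numbers = new Box<>();
        // Both parameterizations share a single Class object at runtime
        System.out.println(strings.getClass() == numbers.getClass()); // true
        System.out.println(runtimeContentType().getName());          // java.lang.Object
    }
}
```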

Advanced Serialization Patterns

Polymorphic Types:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = Manager.class, name = "manager"),
    @JsonSubTypes.Type(value = Developer.class, name = "developer")
})
public abstract class Employee {
    @KVIndex("__main__")
    protected String employeeId;
    
    @KVIndex("name")
    protected String name;
}

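public class Manager extends Employee {
    private List<String> teamMembers;
    // Constructors (Jackson needs a no-arg or @JsonCreator one) and getters omitted
}

public class Developer extends Employee {
    private List<String> skills;
    // Constructors (Jackson needs a no-arg or @JsonCreator one) and getters omitted
}

// Jackson writes the "type" property, so deserializing through the base class
// restores the concrete subtype
KVStoreSerializer serializer = new KVStoreSerializer();
Employee emp = new Developer("dev1", "Alice", Arrays.asList("Java", "Python"));
byte[] data = serializer.serialize(emp);

Employee restored = serializer.deserialize(data, Employee.class);
// restored is actually a Developer instance

// In a KVStore, records are keyed by their concrete class and @KVIndex is only
// discovered on members declared by that class, so give Developer its own
// natural index and read it back with store.read(Developer.class, "dev1")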

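Custom Serializers for Complex Types:

The pair below handles a Location application type. Attach it with @JsonSerialize(using = LocationSerializer.class) and @JsonDeserialize(using = LocationDeserializer.class) on the Location class, or register both through a SimpleModule in a KVStoreSerializer subclass.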

public class LocationSerializer extends JsonSerializer<Location> {
    @Override
    public void serialize(Location location, JsonGenerator gen, SerializerProvider serializers) 
            throws IOException {
        gen.writeStartObject();  
        gen.writeNumberField("lat", location.getLatitude());
        gen.writeNumberField("lng", location.getLongitude());
        gen.writeStringField("addr", location.getAddress());
        gen.writeEndObject();
    }
}

public class LocationDeserializer extends JsonDeserializer<Location> {
    @Override
    public Location deserialize(JsonParser p, DeserializationContext ctxt) 
            throws IOException {
        JsonNode node = p.getCodec().readTree(p);
        double lat = node.get("lat").asDouble();
        double lng = node.get("lng").asDouble();
        String addr = node.get("addr").asText();
        return new Location(lat, lng, addr);
    }
}

Performance Characteristics

Compression Benefits

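Storage Savings (illustrative figures; actual ratios depend heavily on the data):

  • Text-heavy objects: 60-80% size reduction
  • Numeric data: 30-50% size reduction
  • Mixed content: 40-70% size reduction
  • Already compressed data: Minimal additional compression
  • Very small objects: Can grow, because GZIP adds an 18-byte header and trailer to every value

Performance Trade-offs (also approximate):

  • Write operations: ~10-20% slower due to compression
  • Read operations: ~5-15% slower due to decompression
  • Memory usage: Lower for data held in serialized form; InMemoryStore keeps live objects and does not serialize them
  • Disk I/O: Significantly reduced due to smaller data size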
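Since the figures above vary with the payload, measuring your own data is more reliable. The sketch below GZIP-compresses a JSON string with the JDK, the way the non-String branch of serialize() wraps Jackson's output, and reports the fraction of space saved. The sample payloads are made up for illustration; the tiny one shows a value growing once GZIP's fixed overhead is added.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionRatio {
    // GZIP-compresses raw bytes, like the non-String branch of serialize()
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(raw);
        } catch (IOException e) {
            // Not expected for in-memory streams; rethrow unchecked
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Fraction of space saved: 1 - compressed/original (negative if GZIP grew it)
    static double savings(String json) {
        byte[] raw = json.getBytes(StandardCharsets.UTF_8);
        return 1.0 - (double) gzip(raw).length / raw.length;
    }

    public static void main(String[] args) {
        // Hypothetical payloads; substitute JSON produced from your own classes
        String repetitive = "{\"depts\":[" + "\"Engineering\",".repeat(50) + "\"Engineering\"]}";
        String tiny = "{\"id\":\"u1\"}";
        System.out.printf("repetitive: %.1f%% saved%n", savings(repetitive) * 100);
        System.out.printf("tiny: %.1f%% saved%n", savings(tiny) * 100);
    }
}
```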

Serialization Performance

Optimization Tips:

  1. Minimize Nested Objects: Deep object graphs increase serialization time
  2. Exclude Non-Essential Fields: Mark them @JsonIgnore so they are never written
  3. Reuse Serializers: A configured ObjectMapper is thread-safe, so share one KVStoreSerializer across operations instead of creating one per call
  4. Custom Serializers: Implement for frequently serialized complex types
  5. Batch Operations: Serialize multiple objects together when possible

Error Handling

Common Serialization Errors

Serialization Failures:

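try {
    store.write(problematicObject);
} catch (JsonProcessingException e) {
    // Jackson could not serialize the object
    System.err.println("Cannot serialize object: " + e.getMessage());
} catch (IOException e) {
    // Writing the GZIP stream failed
    System.err.println("Compression error: " + e.getMessage());
} catch (Exception e) {
    // write() is declared to throw Exception; other store failures end up here
    System.err.println("Write failed: " + e.getMessage());
}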

Deserialization Failures:

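try {
    User user = store.read(User.class, "user123");
} catch (NoSuchElementException e) {
    // No record of this class with natural key "user123"
    System.err.println("Not found: " + e.getMessage());
} catch (JsonMappingException e) {
    // JSON structure doesn't match the class
    System.err.println("Cannot deserialize: " + e.getMessage());
} catch (JsonParseException e) {
    // Invalid JSON data
    System.err.println("Corrupted data: " + e.getMessage());
} catch (IOException e) {
    // GZIP decompression failed, e.g. java.util.zip.ZipException for non-GZIP bytes
    System.err.println("Decompression error: " + e.getMessage());
} catch (Exception e) {
    // read() is declared to throw Exception
    System.err.println("Read failed: " + e.getMessage());
}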
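One way to reach the decompression branch is handing deserialize() bytes that were never compressed, for instance a value from the String path read back as an object type. The JDK-only sketch below reproduces that step: GZIPInputStream rejects the missing magic header with java.util.zip.ZipException, a subclass of IOException, so the catch (IOException e) block above receives it. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;

public class NotGzipDemo {
    // Opens the bytes the way the non-String branch of deserialize() does.
    // Returns the ZipException message if the bytes are not valid GZIP, else null.
    static String gzipError(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            in.readAllBytes();
            return null;
        } catch (ZipException e) {
            // ZipException extends IOException
            return e.getMessage();
        } catch (IOException e) {
            // Other I/O failures, e.g. EOFException for input shorter than the header
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Bytes as written by the String branch of serialize()
        byte[] plain = "user123".getBytes(StandardCharsets.UTF_8);
        System.out.println(gzipError(plain)); // Not in GZIP format
    }
}
```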

Schema Evolution

Handling Class Changes:

public class User {
    @KVIndex("__main__")
    private String userId;
    
    @KVIndex("name")
    private String name;
    
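    // New field added in version 2. The field initializer supplies the default
    // when old JSON lacks the property; @JsonProperty(defaultValue = ...) only
    // documents the default and is not applied during deserialization
    @JsonProperty(defaultValue = "unknown")
    private String department = "unknown";
    
    // Retired field kept with @JsonIgnore: it is no longer written, and its
    // property in old JSON is skipped instead of failing deserialization
    @JsonIgnore
    @Deprecated
    private String oldField;

    // Getters, setters, and a no-arg constructor omitted for brevity
}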

Best Practices for Evolution:

  1. Add fields with defaults: New fields should have sensible default values
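  2. Retire fields with @JsonIgnore: Old data that still contains them keeps deserializing, whereas deleting the field outright fails under the default FAIL_ON_UNKNOWN_PROPERTIES setting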
  3. Version your data: Include version fields for major schema changes
  4. Test backwards compatibility: Ensure new code can read old data
  5. Migrate data: Provide migration utilities for breaking changes
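Practices 3 and 5 can be combined into a small upgrade step. The sketch below is hypothetical: it assumes records are first decoded into a generic Map (for example with Jackson's mapper.readValue(json, Map.class)), that a "version" key marks the schema version, and that version 2 is the User layout above with department added and oldField retired. It uses only JDK collections.

```java
import java.util.HashMap;
import java.util.Map;

public class UserRecordMigration {
    // Hypothetical schema version written by the current code
    static final int CURRENT_VERSION = 2;

    // Returns a copy of a decoded record upgraded to CURRENT_VERSION.
    // Records without a "version" key are treated as version 1.
    static Map<String, Object> migrate(Map<String, Object> record) {
        Map<String, Object> upgraded = new HashMap<>(record);
        int version = ((Number) upgraded.getOrDefault("version", 1)).intValue();
        if (version < 2) {
            // Version 2 added "department" (default "unknown") and retired "oldField"
            upgraded.putIfAbsent("department", "unknown");
            upgraded.remove("oldField");
        }
        upgraded.put("version", CURRENT_VERSION);
        return upgraded;
    }

    public static void main(String[] args) {
        Map<String, Object> v1 = new HashMap<>();
        v1.put("userId", "user123");
        v1.put("name", "Alice");
        v1.put("oldField", "legacy");
        // Prints the upgraded map (HashMap iteration order is unspecified)
        System.out.println(migrate(v1));
    }
}
```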

Internal Serialization Details

String Optimization

// Strings are handled specially for efficiency
if (o instanceof String) {
    return ((String) o).getBytes(UTF_8); // Direct UTF-8 encoding
} else {
    // All other objects: JSON + GZIP
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
        mapper.writeValue(out, o);
    }
    return bytes.toByteArray();
}
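The byte layout of both branches can be reproduced with JDK classes alone. In this sketch the JSON text is supplied as a ready-made string in place of mapper.writeValue (an assumption that keeps Jackson out of the example); the UTF-8 string path and the GZIP framing follow the code above. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ValueCodecSketch {
    // String branch: raw UTF-8 bytes, no compression
    static byte[] encodeString(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Object branch: GZIP over UTF-8 JSON text (Jackson produces the JSON in the real serializer)
    static byte[] encodeJson(String json) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Inverse of encodeJson: gunzip and decode the JSON text
    static String decodeJson(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"userId\":\"user123\",\"name\":\"Alice\"}";
        byte[] stored = encodeJson(json);
        System.out.println(decodeJson(stored).equals(json)); // true
        System.out.println(new String(encodeString("Alice"), StandardCharsets.UTF_8)); // Alice
    }
}
```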

Long Value Support

// Special handling for long values used internally
final byte[] serialize(long value) {
    return String.valueOf(value).getBytes(UTF_8);
}

final long deserializeLong(byte[] data) {
    return Long.parseLong(new String(data, UTF_8));
}
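To see what skipping JSON and GZIP saves, the sketch below reimplements the two package-private long helpers with the JDK (encodeLong and decodeLong are hypothetical names) and compares their output with the size of the same digits after GZIP, which adds an 18-byte header and trailer (RFC 1952) before any data.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class LongEncodingSketch {
    // Mirrors serialize(long): the decimal digits as UTF-8 text
    static byte[] encodeLong(long value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors deserializeLong(byte[])
    static long decodeLong(byte[] data) {
        return Long.parseLong(new String(data, StandardCharsets.UTF_8));
    }

    // Size the same bytes would occupy after GZIP compression
    static int gzippedSize(byte[] raw) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        byte[] plain = encodeLong(42L);
        System.out.println(plain.length);            // 2
        System.out.println(decodeLong(plain));       // 42
        System.out.println(gzippedSize(plain) > 18); // true
    }
}
```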

This path stores internal metadata, such as the store version and per-index entry counts, as plain decimal text with no JSON or GZIP overhead.