0
# Serialization Framework
1
2
Pluggable serialization system with default FST-based implementations supporting custom serializers for both key-value stores and key-map stores, enabling efficient state persistence and cross-language compatibility.
3
4
## Capabilities
5
6
### Core Serialization Utility
7
8
Fast serialization utility using FST (Fast Serialization) library for efficient object-to-bytes conversion.
9
10
```java { .api }
11
/**
12
* FST serialization wrapper for efficient object serialization
13
*/
14
public class Serializer {
15
/**
16
* Serialize object to byte array using FST
17
* @param value Object to serialize
18
* @return Serialized byte array
19
*/
20
public static byte[] object2Bytes(Object value);
21
22
/**
23
* Deserialize byte array to object using FST
24
* @param buffer Byte array to deserialize
25
* @return Deserialized object
26
*/
27
public static Object bytes2Object(byte[] buffer);
28
}
29
```
30
31
**Usage Examples:**
32
33
```java
34
import io.ray.streaming.state.serialization.Serializer;
35
36
// Serialize different types of objects
37
String text = "Hello, Ray Streaming!";
38
byte[] textBytes = Serializer.object2Bytes(text);
39
40
Integer number = 42;
41
byte[] numberBytes = Serializer.object2Bytes(number);
42
43
List<String> list = Arrays.asList("item1", "item2", "item3");
44
byte[] listBytes = Serializer.object2Bytes(list);
45
46
// Deserialize objects back
47
String deserializedText = (String) Serializer.bytes2Object(textBytes);
48
Integer deserializedNumber = (Integer) Serializer.bytes2Object(numberBytes);
49
List<String> deserializedList = (List<String>) Serializer.bytes2Object(listBytes);
50
51
// Custom objects
52
class UserProfile {
53
public String name;
54
public int age;
55
public List<String> preferences;
56
}
57
58
UserProfile profile = new UserProfile();
59
profile.name = "Alice";
60
profile.age = 30;
61
profile.preferences = Arrays.asList("tech", "music");
62
63
byte[] profileBytes = Serializer.object2Bytes(profile);
64
UserProfile deserializedProfile = (UserProfile) Serializer.bytes2Object(profileBytes);
65
```
66
67
### Key-Value Store Serialization Interface
68
69
Interface defining serialization methods for key-value stores with separate key and value serialization support.
70
71
```java { .api }
72
/**
73
* Interface for key-value store serialization/deserialization
74
*/
75
public interface KeyValueStoreSerialization<K, V> {
76
/**
77
* Serialize key to byte array
78
* @param key Key object to serialize
79
* @return Serialized key bytes
80
*/
81
byte[] serializeKey(K key);
82
83
/**
84
* Serialize value to byte array
85
* @param value Value object to serialize
86
* @return Serialized value bytes
87
*/
88
byte[] serializeValue(V value);
89
90
/**
91
* Deserialize byte array to value object
92
* @param valueArray Serialized value bytes
93
* @return Deserialized value object
94
*/
95
V deserializeValue(byte[] valueArray);
96
}
97
```
98
99
### Key-Map Store Serialization Interface
100
101
Interface defining serialization methods for key-map stores supporting hierarchical key structures with sub-keys and sub-values.
102
103
```java { .api }
104
/**
105
* Interface for key-map store serialization/deserialization
106
*/
107
public interface KeyMapStoreSerializer<K, S, T> {
108
/**
109
* Serialize primary key to byte array
110
* @param key Primary key object to serialize
111
* @return Serialized key bytes
112
*/
113
byte[] serializeKey(K key);
114
115
/**
116
* Serialize sub-key to byte array
117
* @param uk Sub-key object to serialize
118
* @return Serialized sub-key bytes
119
*/
120
byte[] serializeUKey(S uk);
121
122
/**
123
* Serialize sub-value to byte array
124
* @param uv Sub-value object to serialize
125
* @return Serialized sub-value bytes
126
*/
127
byte[] serializeUValue(T uv);
128
129
/**
130
* Deserialize byte array to sub-key object
131
* @param ukArray Serialized sub-key bytes
132
* @return Deserialized sub-key object
133
*/
134
S deserializeUKey(byte[] ukArray);
135
136
/**
137
* Deserialize byte array to sub-value object
138
* @param uvArray Serialized sub-value bytes
139
* @return Deserialized sub-value object
140
*/
141
T deserializeUValue(byte[] uvArray);
142
}
143
```
144
145
### Default Serialization Implementations
146
147
The framework provides default implementations for common serialization scenarios.
148
149
#### Abstract Serialization Base Class
150
151
```java { .api }
152
/**
153
* Base class for serialization implementations
154
*/
155
public abstract class AbstractSerialization {
156
// Common serialization functionality and utilities
157
}
158
```
159
160
#### Default Key-Value Store Serialization
161
162
```java { .api }
163
/**
164
* Default implementation using FST serialization with key prefixing
165
*/
166
public class DefaultKeyValueStoreSerialization<K, V> extends AbstractSerialization implements KeyValueStoreSerialization<K, V> {
167
/**
168
* Serialize key with namespace prefix
169
* @param key Key to serialize
170
* @return Serialized key bytes with prefix
171
*/
172
public byte[] serializeKey(K key);
173
174
/**
175
* Serialize value using FST
176
* @param value Value to serialize
177
* @return Serialized value bytes
178
*/
179
public byte[] serializeValue(V value);
180
181
/**
182
* Deserialize value using FST
183
* @param valueArray Serialized value bytes
184
* @return Deserialized value object
185
*/
186
public V deserializeValue(byte[] valueArray);
187
}
188
```
189
190
#### Default Key-Map Store Serialization
191
192
```java { .api }
193
/**
194
* Default implementation for key-map store serialization
195
*/
196
public class DefaultKeyMapStoreSerializer<K, S, T> extends AbstractSerialization implements KeyMapStoreSerializer<K, S, T> {
197
/**
198
* Serialize primary key
199
* @param key Primary key to serialize
200
* @return Serialized key bytes
201
*/
202
public byte[] serializeKey(K key);
203
204
/**
205
* Serialize sub-key
206
* @param uk Sub-key to serialize
207
* @return Serialized sub-key bytes
208
*/
209
public byte[] serializeUKey(S uk);
210
211
/**
212
* Serialize sub-value
213
* @param uv Sub-value to serialize
214
* @return Serialized sub-value bytes
215
*/
216
public byte[] serializeUValue(T uv);
217
218
/**
219
* Deserialize sub-key
220
* @param ukArray Serialized sub-key bytes
221
* @return Deserialized sub-key object
222
*/
223
public S deserializeUKey(byte[] ukArray);
224
225
/**
226
* Deserialize sub-value
227
* @param uvArray Serialized sub-value bytes
228
* @return Deserialized sub-value object
229
*/
230
public T deserializeUValue(byte[] uvArray);
231
}
232
```
233
234
**Custom Serialization Implementation Examples:**
235
236
```java
237
// Example 1: JSON-based serialization for key-value stores
238
import com.fasterxml.jackson.databind.ObjectMapper;
239
240
public class JsonKeyValueSerialization<K, V> implements KeyValueStoreSerialization<K, V> {
241
private final ObjectMapper objectMapper = new ObjectMapper();
242
private final Class<K> keyClass;
243
private final Class<V> valueClass;
244
245
public JsonKeyValueSerialization(Class<K> keyClass, Class<V> valueClass) {
246
this.keyClass = keyClass;
247
this.valueClass = valueClass;
248
}
249
250
@Override
251
public byte[] serializeKey(K key) {
252
try {
253
return objectMapper.writeValueAsBytes(key);
254
} catch (Exception e) {
255
throw new RuntimeException("Key serialization failed", e);
256
}
257
}
258
259
@Override
260
public byte[] serializeValue(V value) {
261
try {
262
return objectMapper.writeValueAsBytes(value);
263
} catch (Exception e) {
264
throw new RuntimeException("Value serialization failed", e);
265
}
266
}
267
268
@Override
269
public V deserializeValue(byte[] valueArray) {
270
try {
271
return objectMapper.readValue(valueArray, valueClass);
272
} catch (Exception e) {
273
throw new RuntimeException("Value deserialization failed", e);
274
}
275
}
276
}
277
278
// Example 2: Protobuf-based serialization for key-map stores
279
public class ProtobufKeyMapSerializer implements KeyMapStoreSerializer<String, String, UserData> {
280
281
@Override
282
public byte[] serializeKey(String key) {
283
return key.getBytes(StandardCharsets.UTF_8);
284
}
285
286
@Override
287
public byte[] serializeUKey(String uk) {
288
return uk.getBytes(StandardCharsets.UTF_8);
289
}
290
291
@Override
292
public byte[] serializeUValue(UserData uv) {
293
// Assuming UserData has toProtobuf() method
294
return uv.toProtobuf().toByteArray();
295
}
296
297
@Override
298
public String deserializeUKey(byte[] ukArray) {
299
return new String(ukArray, StandardCharsets.UTF_8);
300
}
301
302
@Override
303
public UserData deserializeUValue(byte[] uvArray) {
304
try {
305
// Assuming UserData has fromProtobuf() method
306
return UserData.fromProtobuf(UserDataProto.parseFrom(uvArray));
307
} catch (Exception e) {
308
throw new RuntimeException("UserData deserialization failed", e);
309
}
310
}
311
}
312
```
313
314
### Integration with Storage Systems
315
316
The serialization framework integrates seamlessly with the storage layer:
317
318
```java
319
// Example: Using custom serialization with key-value stores
320
public class CustomSerializationExample {
321
322
public void demonstrateCustomSerialization() {
323
// Create backend with custom serialization
324
Map<String, String> config = new HashMap<>();
325
config.put("state.backend.type", "MEMORY");
326
AbstractStateBackend backend = StateBackendBuilder.buildStateBackend(config);
327
328
// Get key-value store
329
KeyValueStore<String, UserProfile> store = backend.getKeyValueStore("user-profiles");
330
331
// The store will use the configured serialization automatically
332
UserProfile profile = new UserProfile("Alice", 30, Arrays.asList("tech", "music"));
333
store.put("user123", profile);
334
335
UserProfile retrieved = store.get("user123");
336
System.out.println("Retrieved profile: " + retrieved.name);
337
}
338
339
// Example: Serialization with compression
340
public static class CompressedSerialization<K, V> implements KeyValueStoreSerialization<K, V> {
341
private final Deflater deflater = new Deflater();
342
private final Inflater inflater = new Inflater();
343
344
@Override
345
public byte[] serializeKey(K key) {
346
byte[] raw = Serializer.object2Bytes(key);
347
return compress(raw);
348
}
349
350
@Override
351
public byte[] serializeValue(V value) {
352
byte[] raw = Serializer.object2Bytes(value);
353
return compress(raw);
354
}
355
356
@Override
357
public V deserializeValue(byte[] valueArray) {
358
byte[] decompressed = decompress(valueArray);
359
return (V) Serializer.bytes2Object(decompressed);
360
}
361
362
private byte[] compress(byte[] data) {
363
deflater.setInput(data);
364
deflater.finish();
365
366
ByteArrayOutputStream baos = new ByteArrayOutputStream();
367
byte[] buffer = new byte[1024];
368
369
while (!deflater.finished()) {
370
int count = deflater.deflate(buffer);
371
baos.write(buffer, 0, count);
372
}
373
374
deflater.reset();
375
return baos.toByteArray();
376
}
377
378
private byte[] decompress(byte[] data) {
379
inflater.setInput(data);
380
381
ByteArrayOutputStream baos = new ByteArrayOutputStream();
382
byte[] buffer = new byte[1024];
383
384
try {
385
while (!inflater.finished()) {
386
int count = inflater.inflate(buffer);
387
baos.write(buffer, 0, count);
388
}
389
} catch (DataFormatException e) {
390
throw new RuntimeException("Decompression failed", e);
391
}
392
393
inflater.reset();
394
return baos.toByteArray();
395
}
396
}
397
}
398
```
399
400
### Performance and Best Practices
401
402
```java
403
// Best practices for serialization performance
404
public class SerializationBestPractices {
405
406
// 1. Reuse serialization instances when possible
407
private static final DefaultKeyValueStoreSerialization<String, Object> REUSABLE_SERIALIZER =
408
new DefaultKeyValueStoreSerialization<>();
409
410
// 2. Use appropriate serialization for data types
411
public void chooseAppropriateSerializer() {
412
// For simple types, FST is efficient
413
KeyValueStoreSerialization<String, String> stringSerializer = new DefaultKeyValueStoreSerialization<>();
414
415
// For complex nested objects, consider JSON or Protobuf
416
KeyValueStoreSerialization<String, ComplexObject> complexSerializer = new JsonKeyValueSerialization<>(String.class, ComplexObject.class);
417
418
// For high-frequency serialization, use binary formats
419
KeyValueStoreSerialization<String, byte[]> binarySerializer = new DefaultKeyValueStoreSerialization<>();
420
}
421
422
// 3. Handle serialization errors gracefully
423
public byte[] safeSerialize(Object value) {
424
try {
425
return Serializer.object2Bytes(value);
426
} catch (Exception e) {
427
// Log error and provide fallback
428
System.err.println("Serialization failed for value: " + value.getClass().getName());
429
430
// Return empty bytes or default serialization
431
return new byte[0];
432
}
433
}
434
435
// 4. Optimize for common use cases
436
public static class OptimizedStringMapSerializer implements KeyMapStoreSerializer<String, String, String> {
437
438
@Override
439
public byte[] serializeKey(String key) {
440
return key.getBytes(StandardCharsets.UTF_8);
441
}
442
443
@Override
444
public byte[] serializeUKey(String uk) {
445
return uk.getBytes(StandardCharsets.UTF_8);
446
}
447
448
@Override
449
public byte[] serializeUValue(String uv) {
450
return uv.getBytes(StandardCharsets.UTF_8);
451
}
452
453
@Override
454
public String deserializeUKey(byte[] ukArray) {
455
return new String(ukArray, StandardCharsets.UTF_8);
456
}
457
458
@Override
459
public String deserializeUValue(byte[] uvArray) {
460
return new String(uvArray, StandardCharsets.UTF_8);
461
}
462
}
463
}
464
```