# Storage Backends

Apache Spark KVStore provides multiple storage backend implementations to suit different deployment scenarios, from development and testing to high-performance production environments. Each backend implements the same KVStore interface while offering different performance characteristics and resource requirements.

## Capabilities

### InMemoryStore

In-memory implementation that keeps all data deserialized in memory. Ideal for development, testing, and scenarios where data fits comfortably in memory.

```java { .api }
/**
 * Implementation of KVStore that keeps data deserialized in memory.
 * Does not persist data; suitable for testing and development.
 */
public class InMemoryStore implements KVStore {

    /**
     * Creates a new in-memory store instance.
     */
    public InMemoryStore();

    // Implements all KVStore interface methods
}
```

**Characteristics:**
- **Performance**: Fastest read/write operations (no serialization/deserialization)
- **Memory Usage**: High - stores all data in deserialized form
- **Persistence**: None - data is lost when process ends
- **Indexing**: Dynamic sorting on query (no persistent indices)
- **Use Cases**: Unit testing, development, temporary caching
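
The "dynamic sorting on query" point can be made concrete with a small, stdlib-only model: objects are kept as-is in a map keyed by their natural key, and a query ordered by another field copies the values and sorts them at call time. `SortOnQueryStore` and `Person` are hypothetical names, and this is a simplified illustration of the idea rather than Spark's `InMemoryStore` code. It shows the trade-off: writes maintain no secondary index, while each ordered query pays for a sort over all stored objects.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a sort-on-query in-memory store (not Spark's InMemoryStore).
class SortOnQueryStore<K, V> {
    private final Map<K, V> data = new LinkedHashMap<>();
    private final Function<V, K> naturalKey;

    SortOnQueryStore(Function<V, K> naturalKey) {
        this.naturalKey = naturalKey;
    }

    // A write is a plain map put keyed by the natural key: no secondary index is maintained.
    void write(V value) {
        data.put(naturalKey.apply(value), value);
    }

    V read(K key) {
        return data.get(key);
    }

    long count() {
        return data.size();
    }

    // An ordered query copies the current values and sorts them now, on every call.
    <I extends Comparable<I>> List<V> viewSortedBy(Function<V, I> index) {
        List<V> snapshot = new ArrayList<>(data.values());
        snapshot.sort(Comparator.comparing(index));
        return snapshot;
    }
}

// Hypothetical stored type: natural key "name", secondary field "age".
class Person {
    final String name;
    final int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
```

Writing an object whose natural key already exists replaces the earlier object, which mirrors how a key-value store treats the natural key.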

### LevelDB Backend

Production-ready persistent storage using Google's LevelDB. Provides excellent read performance and efficient storage with compression.

```java { .api }
/**
 * Implementation of KVStore that uses LevelDB as the underlying data store.
 * Provides persistent storage with compression and efficient indexing.
 */
public class LevelDB implements KVStore {

    /**
     * Store version for compatibility checking.
     */
    public static final long STORE_VERSION = 1L;

    /**
     * Key used to store version information.
     */
    public static final byte[] STORE_VERSION_KEY;

    /**
     * Creates a LevelDB store with the default serializer.
     * @param path Directory path where LevelDB files will be stored
     * @throws Exception if store creation fails
     */
    public LevelDB(File path) throws Exception;

    /**
     * Creates a LevelDB store with a custom serializer.
     * @param path Directory path where LevelDB files will be stored
     * @param serializer Custom serializer for data transformation
     * @throws Exception if store creation fails
     */
    public LevelDB(File path, KVStoreSerializer serializer) throws Exception;

    // Implements all KVStore interface methods
}
```
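
The `STORE_VERSION` and `STORE_VERSION_KEY` constants exist for a compatibility check when a store is opened: a fresh database records the current version, and an existing one whose recorded version differs is rejected (Spark's constructors report this with `UnsupportedStoreVersionException`). The sketch below models that check with a `Map` standing in for the database. `VersionedStore`, the `"__version__"` key string and the use of `IllegalStateException` are assumptions for illustration, not Spark's code.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical model of a store-version check on open; a Map stands in for the database.
class VersionedStore {
    static final long STORE_VERSION = 1L;
    // Illustrative key name. String keys are used because byte[] has identity equality in a Map.
    static final String STORE_VERSION_KEY = "__version__";

    final Map<String, byte[]> db;

    VersionedStore(Map<String, byte[]> db, long currentVersion) {
        this.db = db;
        byte[] stored = db.get(STORE_VERSION_KEY);
        if (stored == null) {
            // Fresh database: record the version this code writes.
            db.put(STORE_VERSION_KEY, encode(currentVersion));
        } else if (decode(stored) != currentVersion) {
            // Database written by an incompatible version: refuse to open it.
            throw new IllegalStateException(
                "Unsupported store version " + decode(stored) + ", expected " + currentVersion);
        }
    }

    static byte[] encode(long version) {
        return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

Failing fast on a mismatch keeps newer code from misreading records whose layout it does not understand. The usual recovery is to rebuild the store from its original source data.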

**Characteristics:**
- **Performance**: Fast reads, good write performance
- **Memory Usage**: Low - data lives on disk; memory holds only LevelDB's block cache and write buffer
- **Persistence**: Full persistence with write-ahead logging
- **Compression**: Built-in compression (Snappy by default)
- **Indexing**: Persistent indices for efficient range queries
- **Use Cases**: Production applications, moderate to high data volumes
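
Persistent indices make range queries efficient because LevelDB keeps keys sorted in unsigned byte order. An index can therefore be stored as extra keys of the form *index name, encoded value, natural key*, and a range query becomes one seek followed by a sequential scan. The sketch below shows this general technique with a `TreeMap` standing in for LevelDB. A `long` is encoded big-endian with its sign bit flipped, so byte order matches numeric order. The `OrderedIndex` class and the key layout are assumptions for illustration; Spark's actual key encoding may differ in its details.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical secondary index over a sorted, byte-keyed map (a stand-in for LevelDB).
class OrderedIndex {
    // Unsigned lexicographic order, like LevelDB's default bytewise comparator.
    private static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    private final NavigableMap<byte[], String> keys = new TreeMap<>(UNSIGNED);
    private final byte[] prefix;

    OrderedIndex(String indexName) {
        this.prefix = (indexName + "/").getBytes(StandardCharsets.UTF_8);
    }

    // prefix + big-endian long with the sign bit flipped + natural key.
    // Flipping the sign bit makes unsigned byte order agree with signed numeric order.
    private byte[] key(long value, String naturalKey) {
        byte[] nk = naturalKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(prefix.length + Long.BYTES + nk.length)
            .put(prefix)
            .putLong(value ^ Long.MIN_VALUE)
            .put(nk)
            .array();
    }

    // Appending the natural key lets several objects share one index value.
    void put(long indexValue, String naturalKey) {
        keys.put(key(indexValue, naturalKey), naturalKey);
    }

    // Natural keys whose index value lies in [from, to), in index order:
    // one seek to the lower bound, then an in-order scan up to the upper bound.
    List<String> range(long from, long to) {
        return new ArrayList<>(keys.subMap(key(from, ""), true, key(to, ""), false).values());
    }
}
```

Entries with equal index values come back ordered by natural key, because the natural key is the last component of each stored key.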

### RocksDB Backend

High-performance persistent storage using Facebook's RocksDB. Optimized for SSD storage and high-throughput scenarios.

```java { .api }
/**
 * Implementation of KVStore that uses RocksDB as the underlying data store.
 * Provides high-performance persistent storage optimized for modern hardware.
 */
public class RocksDB implements KVStore {

    /**
     * Store version for compatibility checking.
     */
    public static final long STORE_VERSION = 1L;

    /**
     * Key used to store version information.
     */
    public static final byte[] STORE_VERSION_KEY;

    /**
     * Creates a RocksDB store with the default serializer.
     * @param path Directory path where RocksDB files will be stored
     * @throws Exception if store creation fails
     */
    public RocksDB(File path) throws Exception;

    /**
     * Creates a RocksDB store with a custom serializer.
     * @param path Directory path where RocksDB files will be stored
     * @param serializer Custom serializer for data transformation
     * @throws Exception if store creation fails
     */
    public RocksDB(File path, KVStoreSerializer serializer) throws Exception;

    // Implements all KVStore interface methods
}
```

**Characteristics:**
- **Performance**: Highest throughput, optimized for SSDs
- **Memory Usage**: Moderate to high - data lives on disk, but RocksDB's block cache and memtables can use significant memory
- **Persistence**: Full persistence with write-ahead logging
- **Compression**: LZ4 for most levels, ZSTD for the bottommost level
- **Indexing**: Persistent indices, plus Bloom filters for faster point lookups and prefix seeks for index scans
- **Use Cases**: High-performance applications, large data volumes, SSD storage
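
A Bloom filter lets RocksDB skip reading a file's data blocks when the filter reports that a key is absent. That is why it speeds up point lookups, especially for keys that do not exist. The sketch below is a minimal, generic Bloom filter built on a `BitSet` with double hashing. `SimpleBloomFilter`, its sizing and its FNV-1a-based hash scheme are assumptions for illustration and unrelated to RocksDB's internal filter format. The key property holds regardless: it may return false positives, but never false negatives.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Minimal, generic Bloom filter (illustrative only; not RocksDB's implementation).
class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // FNV-1a over the key bytes, starting from the given seed.
    private static int fnv1a(byte[] data, int seed) {
        int h = seed;
        for (byte b : data) {
            h ^= (b & 0xFF);
            h *= 0x01000193;
        }
        return h;
    }

    // Double hashing: the i-th probe position is (h1 + i * h2) mod numBits.
    private int probe(byte[] key, int i) {
        int h1 = fnv1a(key, 0x811C9DC5);
        int h2 = fnv1a(key, 0x01000193) | 1; // odd, hence nonzero, step
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < numHashes; i++) {
            bits.set(probe(k, i));
        }
    }

    // false: definitely absent (skip the disk read). true: possibly present.
    boolean mightContain(String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(probe(k, i))) {
                return false;
            }
        }
        return true;
    }
}
```

More bits per key lower the false-positive rate at the cost of memory. That is the same trade-off a bits-per-key setting controls in storage engines.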

## Usage Examples

### Basic Store Creation

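```java
import org.apache.spark.util.kvstore.*;
import java.io.File;

// In-memory store for testing
KVStore memoryStore = new InMemoryStore();

// LevelDB for moderate production use (the constructor throws Exception)
KVStore levelStore = new LevelDB(new File("/data/app/leveldb"));

// RocksDB for high-performance scenarios (the constructor throws Exception)
KVStore rocksStore = new RocksDB(new File("/data/app/rocksdb"));

// Example stored type: each type needs exactly one natural index (@KVIndex without a
// name), and the persistent backends need a no-arg constructor to deserialize it
public class MyData {
    @KVIndex
    public String key;
    public String value;

    public MyData() {}

    public MyData(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// All stores implement the same interface, so this code works with any backend
public void processData(KVStore store) throws Exception {
    store.write(new MyData("key1", "value1"));
    MyData data = store.read(MyData.class, "key1"); // lookup by natural key
    long count = store.count(MyData.class);
}
```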

### Custom Serialization

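```java
import org.apache.spark.util.kvstore.*;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.File;
import java.time.Instant;

// Create a custom serializer by extending KVStoreSerializer
// (requires the jackson-datatype-jsr310 module on the classpath)
public class CustomKVStoreSerializer extends KVStoreSerializer {
    public CustomKVStoreSerializer() {
        super();
        // Configure the inherited (protected) ObjectMapper for custom needs
        mapper.registerModule(new JavaTimeModule());
    }
}

// Use with either persistent backend (InMemoryStore does not serialize objects)
KVStore store = new LevelDB(new File("/data/store"), new CustomKVStoreSerializer());
// or:
// KVStore store = new RocksDB(new File("/data/store"), new CustomKVStoreSerializer());

// Store objects with Java 8 time types
public class Event {
    @KVIndex
    public String id;

    // Serialized through JavaTimeModule but not indexed directly: the persistent
    // backends accept only strings, booleans, integral numbers and arrays of these
    // as index values, so an Instant index would be rejected
    public Instant timestamp;

    public Event() {} // required by Jackson for deserialization

    public Event(String id, Instant timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    // Index on epoch milliseconds instead; @JsonIgnore keeps it out of the JSON
    @JsonIgnore
    @KVIndex("timestamp")
    public long timestampMillis() {
        return timestamp.toEpochMilli();
    }
}

store.write(new Event("event1", Instant.now()));
```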

### Performance Optimization

```java
import org.apache.spark.util.kvstore.*;
import java.io.File;
import java.util.List;

// The LevelDB wrapper opens its database with fixed options: the constructors accept
// only a path and an optional serializer, and JVM system properties are not consulted
KVStore levelStore = new LevelDB(new File("/data/leveldb"));

// RocksDB is opened with preconfigured options, including:
// - Bloom filters for faster key lookups
// - LZ4 compression, with ZSTD for the bottommost level
// - Index compression disabled for faster access
// - A tuned index block restart interval
KVStore rocksStore = new RocksDB(new File("/data/rocksdb"));

// Bulk loading: KVStore.write() stores one object per call, so this loop issues one
// write per item (generateLargeDataset() is a placeholder for your own data source)
List<MyData> largeBatch = generateLargeDataset();
for (MyData item : largeBatch) {
    rocksStore.write(item);
}
```
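
The `KVStore` interface has no batch-write method: `write()` takes a single object. Where a storage layer does offer grouped commits, a common general-purpose pattern is to buffer items and flush them in fixed-size groups. The sketch below shows that buffer-and-flush pattern on its own. `BatchingWriter` and its `flushAction` callback are hypothetical and not part of the KVStore API; the callback stands in for whatever grouped commit the underlying storage provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer-and-flush helper: collects items and hands them to
// flushAction in groups of at most batchSize.
class BatchingWriter<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(int batchSize, Consumer<List<T>> flushAction) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    void write(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Passes a copy of the buffer so the callback may keep the list after clear().
    void flush() {
        if (!buffer.isEmpty()) {
            flushAction.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Flushes the final partial batch.
    @Override
    public void close() {
        flush();
    }
}
```

Making the writer `AutoCloseable` means a try-with-resources block cannot forget the last partial batch.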

### Store Selection Guidelines

```java
/**
 * Chooses a storage backend based on requirements.
 * dataSize is the expected number of stored objects; the thresholds are rules of thumb.
 */
public static KVStore chooseStore(String environment, long dataSize,
                                  boolean requiresHighThroughput, boolean hasSSDs,
                                  File dataPath) throws Exception {
    // Development and testing, or small data sets
    if (environment.equals("test") || dataSize < 100_000) {
        return new InMemoryStore();
    }

    // Production with moderate data volumes (fewer than ~10 million objects)
    if (dataSize < 10_000_000 && !requiresHighThroughput) {
        return new LevelDB(dataPath);
    }

    // High-performance production (large data, high throughput)
    if (hasSSDs && requiresHighThroughput) {
        return new RocksDB(dataPath);
    }

    // Default safe choice for production
    return new LevelDB(dataPath);
}
```

### Resource Management

```java
// KVStore extends Closeable, so try-with-resources closes the store automatically,
// even when an operation throws
try (KVStore store = new LevelDB(new File("/data/store"))) {
    store.write(new MyData("key1", "value1"));
    MyData result = store.read(MyData.class, "key1");
} catch (Exception e) {
    // Catches failures from opening, using, or closing the store
    logger.error("Store operation failed", e);
}

// Manual resource management
KVStore store = new RocksDB(new File("/data/rocksdb"));
try {
    processData(store);
} finally {
    store.close(); // Always close to release native resources and the database lock
}
```
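
When several stores are opened in one try-with-resources statement (for example, a source and a target store), Java closes them in reverse order of declaration. It also closes them before any `catch` clause runs, even if the body throws. The stdlib-only sketch below demonstrates this with a hypothetical `TrackedResource` standing in for a `KVStore`. It records the order in which `close()` is called.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a KVStore that records when it is closed.
class TrackedResource implements Closeable {
    private final String name;
    private final List<String> closeLog;

    TrackedResource(String name, List<String> closeLog) {
        this.name = name;
        this.closeLog = closeLog;
    }

    @Override
    public void close() {
        closeLog.add(name);
    }
}

class CloseOrderDemo {
    // Opens two resources, optionally fails in the body, and returns the event log.
    static List<String> run(boolean failInBody) {
        List<String> log = new ArrayList<>();
        try (TrackedResource source = new TrackedResource("source", log);
             TrackedResource target = new TrackedResource("target", log)) {
            if (failInBody) {
                throw new IllegalStateException("simulated failure");
            }
        } catch (IllegalStateException expected) {
            // Both resources have already been closed when this handler runs.
            log.add("handled");
        }
        return log;
    }
}
```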

### Migration Between Backends

```java
import org.apache.spark.util.kvstore.*;
import java.io.File;

/**
 * Utility to migrate data between different storage backends.
 */
public class StoreMigration {

    /**
     * Copies the store metadata (one object per store, read back as metadataType)
     * and every instance of the given types from source to target.
     */
    public static void migrate(KVStore source, KVStore target,
                               Class<?> metadataType, Class<?>... types) throws Exception {
        // Copy metadata; getMetadata returns null if none was set
        Object metadata = source.getMetadata(metadataType);
        if (metadata != null) {
            target.setMetadata(metadata);
        }

        // Copy all instances of each type. The closeable iterator releases the
        // underlying database iterator even if a write fails midway.
        for (Class<?> type : types) {
            try (KVStoreIterator<?> it = source.view(type).closeableIterator()) {
                while (it.hasNext()) {
                    target.write(it.next());
                }
            }
        }
    }
}

// Example: upgrade from LevelDB to RocksDB.
// AppMetadata, Employee, Task and Project stand for your own stored classes.
try (KVStore oldStore = new LevelDB(new File("/data/old"));
     KVStore newStore = new RocksDB(new File("/data/new"))) {
    StoreMigration.migrate(oldStore, newStore,
        AppMetadata.class, Employee.class, Task.class, Project.class);
}
```

### Monitoring and Diagnostics

```java
// Store statistics and health checks
public class StoreMonitoring {

    public static void checkStoreHealth(KVStore store, Class<?>... types)
            throws Exception {

        System.out.println("Store Health Check:");

        // Time the count() calls themselves as a simple responsiveness probe.
        // (getMetadata(String.class) is not a safe probe: InMemoryStore casts its
        // metadata to the requested class, which fails for other metadata types.)
        long totalNanos = 0;
        for (Class<?> type : types) {
            long start = System.nanoTime(); // monotonic clock, suited to durations
            long count = store.count(type);
            totalNanos += System.nanoTime() - start;
            System.out.println(type.getSimpleName() + ": " + count + " items");
        }
        long responseTimeMs = totalNanos / 1_000_000;

        System.out.println("Response time: " + responseTimeMs + "ms");

        if (responseTimeMs > 1000) {
            System.out.println("WARNING: Store is responding slowly");
        }
    }
}
```