# Shuffle Database

The shuffle database API provides key-value database functionality for shuffle data storage in Apache Spark. It supports both LevelDB and RocksDB backends, offering durable, high-performance storage for Spark's shuffle operations with atomic single-key operations, iteration over all entries, and store version management.

## Capabilities

### Database Interface

Core interface for the key-value database operations used in shuffle data management.

```java { .api }
public interface DB extends Closeable {
    /**
     * Store a key-value pair in the database.
     * @param key byte array representing the key
     * @param value byte array representing the value to store
     * @throws IOException if the put operation fails
     */
    void put(byte[] key, byte[] value) throws IOException;

    /**
     * Retrieve a value by its key from the database.
     * @param key byte array representing the key to look up
     * @return byte array containing the value, or null if the key is not found
     * @throws IOException if the get operation fails
     */
    byte[] get(byte[] key) throws IOException;

    /**
     * Delete a key-value pair from the database.
     * @param key byte array representing the key to delete
     * @throws IOException if the delete operation fails
     */
    void delete(byte[] key) throws IOException;

    /**
     * Create an iterator over all key-value pairs in the database.
     * @return DBIterator for iterating over database entries
     */
    DBIterator iterator();

    /**
     * Close the database and release all associated resources.
     * @throws IOException if the close operation fails
     */
    void close() throws IOException;
}
```
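
The interface is small enough that an in-memory stand-in is easy to write for unit tests. The sketch below is a hypothetical stub that mirrors the `DB` method signatures (it does not implement the Spark interface itself, so it compiles without Spark on the classpath) and backs them with a sorted map; it is illustrative only, not Spark's implementation.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in mirroring the DB interface's signatures.
public class InMemoryDB implements Closeable {
    // byte[] has identity-based equals/hashCode, so a sorted map with a
    // content-based comparator is used instead of a HashMap.
    private final TreeMap<byte[], byte[]> store = new TreeMap<>(Arrays::compare);
    private boolean closed = false;

    public void put(byte[] key, byte[] value) throws IOException {
        ensureOpen();
        store.put(key.clone(), value.clone()); // defensive copies
    }

    public byte[] get(byte[] key) throws IOException {
        ensureOpen();
        return store.get(key); // null when absent, matching the DB contract
    }

    public void delete(byte[] key) throws IOException {
        ensureOpen();
        store.remove(key);
    }

    public Iterator<Map.Entry<byte[], byte[]>> iterator() {
        return store.entrySet().iterator();
    }

    @Override
    public void close() {
        closed = true;
    }

    private void ensureOpen() throws IOException {
        if (closed) throw new IOException("database is closed");
    }

    public static void main(String[] args) throws IOException {
        InMemoryDB db = new InMemoryDB();
        db.put("shuffle-block-1".getBytes(), "data".getBytes());
        System.out.println(new String(db.get("shuffle-block-1".getBytes()))); // data
    }
}
```

The `TreeMap` also gives byte-wise key ordering for iteration, which matches the default comparator behavior of both LevelDB and RocksDB.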

### Database Iterator

Iterator interface for traversing database entries with proper resource management.

```java { .api }
public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {
    /**
     * Check whether more entries remain.
     * @return true if more entries exist
     */
    @Override
    boolean hasNext();

    /**
     * Get the next key-value pair from the iterator.
     * @return Map.Entry containing the next key-value pair
     * @throws NoSuchElementException if no more entries exist
     */
    @Override
    Map.Entry<byte[], byte[]> next();

    /**
     * Close the iterator and release associated resources.
     * @throws IOException if the close operation fails
     */
    @Override
    void close() throws IOException;
}
```

## Database Backends

### Database Backend Enumeration

Enumeration of the supported database backend implementations.

```java { .api }
public enum DBBackend {
    LEVELDB("leveldb"),
    ROCKSDB("rocksdb");

    private final String name;

    DBBackend(String name) {
        this.name = name;
    }

    /**
     * Generate a filename for the database with the given prefix.
     * @param prefix String prefix for the database filename
     * @return the complete filename, e.g. "prefix.rocksdb"
     */
    public String fileName(String prefix) {
        return prefix + "." + name;
    }

    /**
     * Get the backend name ("leveldb" or "rocksdb").
     * Note: this accessor cannot be named name(), because Enum.name() is final.
     * @return the backend name
     */
    public String getName() {
        return name;
    }

    /**
     * Look up a database backend by name.
     * @param value name of the backend ("leveldb" or "rocksdb")
     * @return the corresponding DBBackend
     * @throws IllegalArgumentException if the name is not recognized
     */
    public static DBBackend byName(String value) {
        for (DBBackend backend : values()) {
            if (backend.name.equals(value)) {
                return backend;
            }
        }
        throw new IllegalArgumentException("Unknown DB backend: " + value);
    }
}
```
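
Since the enum body is shown in full above, its behavior can be exercised standalone. The copy below is local to the example (with the name accessor renamed `getName()`, since `Enum.name()` is final and cannot be redeclared):

```java
public class BackendDemo {
    // Local copy of the documented enum, for standalone demonstration.
    enum DBBackend {
        LEVELDB("leveldb"),
        ROCKSDB("rocksdb");

        private final String name;

        DBBackend(String name) { this.name = name; }

        // Appends the backend suffix to a filename prefix.
        String fileName(String prefix) { return prefix + "." + name; }

        String getName() { return name; }

        static DBBackend byName(String value) {
            for (DBBackend backend : values()) {
                if (backend.name.equals(value)) return backend;
            }
            throw new IllegalArgumentException("Unknown DB backend: " + value);
        }
    }

    public static void main(String[] args) {
        DBBackend b = DBBackend.byName("rocksdb");
        System.out.println(b.fileName("shuffle-store")); // shuffle-store.rocksdb
    }
}
```

`fileName` is what gives the on-disk layout its `prefix.leveldb` / `prefix.rocksdb` convention, so the two backends can coexist in the same directory without colliding.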

### LevelDB Implementation

LevelDB-based database implementation for shuffle data storage.

```java { .api }
public class LevelDB implements DB {
    /**
     * Create a LevelDB database instance.
     * @param file file path where the database is stored
     * @throws IOException if creating or opening the database fails
     */
    public LevelDB(File file) throws IOException;

    @Override
    public void put(byte[] key, byte[] value) throws IOException;

    @Override
    public byte[] get(byte[] key) throws IOException;

    @Override
    public void delete(byte[] key) throws IOException;

    @Override
    public DBIterator iterator();

    @Override
    public void close() throws IOException;

    /**
     * Get the database file location.
     * @return File representing the database location
     */
    public File getFile();

    /**
     * Check whether the database is closed.
     * @return true if the database is closed
     */
    public boolean isClosed();
}
```

### LevelDB Iterator

Iterator implementation for LevelDB databases.

```java { .api }
public class LevelDBIterator implements DBIterator {
    /**
     * Create a LevelDB iterator (normally obtained via LevelDB.iterator()).
     * @param db LevelDB instance to iterate over
     */
    LevelDBIterator(LevelDB db);

    @Override
    public boolean hasNext();

    @Override
    public Map.Entry<byte[], byte[]> next();

    @Override
    public void close() throws IOException;
}
```

### RocksDB Implementation

RocksDB-based database implementation for shuffle data storage, with additional maintenance operations such as manual compaction.

```java { .api }
public class RocksDB implements DB {
    /**
     * Create a RocksDB database instance.
     * @param file file path where the database is stored
     * @throws IOException if creating or opening the database fails
     */
    public RocksDB(File file) throws IOException;

    @Override
    public void put(byte[] key, byte[] value) throws IOException;

    @Override
    public byte[] get(byte[] key) throws IOException;

    @Override
    public void delete(byte[] key) throws IOException;

    @Override
    public DBIterator iterator();

    @Override
    public void close() throws IOException;

    /**
     * Get the database file location.
     * @return File representing the database location
     */
    public File getFile();

    /**
     * Check whether the database is closed.
     * @return true if the database is closed
     */
    public boolean isClosed();

    /**
     * Perform a manual compaction of the database.
     * @throws IOException if compaction fails
     */
    public void compactRange() throws IOException;
}
```

### RocksDB Iterator

Iterator implementation for RocksDB databases.

```java { .api }
public class RocksDBIterator implements DBIterator {
    /**
     * Create a RocksDB iterator (normally obtained via RocksDB.iterator()).
     * @param db RocksDB instance to iterate over
     */
    RocksDBIterator(RocksDB db);

    @Override
    public boolean hasNext();

    @Override
    public Map.Entry<byte[], byte[]> next();

    @Override
    public void close() throws IOException;
}
```

## Version Management

### Store Version

Version management for shuffle store data, with backward-compatibility checks.

```java { .api }
public class StoreVersion {
    /**
     * Current version of the store format.
     */
    public static final StoreVersion CURRENT = new StoreVersion(1, 0);

    /**
     * Create a store version.
     * @param major major version number
     * @param minor minor version number
     */
    public StoreVersion(int major, int minor);

    /**
     * Get the major version number.
     */
    public int major();

    /**
     * Get the minor version number.
     */
    public int minor();

    /**
     * Check whether this version is compatible with another version.
     * @param other StoreVersion to check compatibility against
     * @return true if the versions are compatible
     */
    public boolean isCompatible(StoreVersion other);

    /**
     * Encode the version as a byte array.
     */
    public byte[] toBytes();

    /**
     * Decode a version from a byte array.
     * @param bytes byte array containing the encoded version
     * @throws IllegalArgumentException if the bytes are invalid
     */
    public static StoreVersion fromBytes(byte[] bytes);

    @Override
    public String toString();

    @Override
    public boolean equals(Object obj);

    @Override
    public int hashCode();
}
```
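
The listing above does not pin down the wire format of `toBytes()`/`fromBytes()` or the exact compatibility rule, so the sketch below assumes a simple big-endian `(major, minor)` encoding and a "same major version = compatible" policy purely for illustration; Spark's actual encoding and policy may differ.

```java
import java.nio.ByteBuffer;

public class VersionSketch {
    // Hypothetical 8-byte encoding: 4-byte major, then 4-byte minor (big-endian).
    static byte[] toBytes(int major, int minor) {
        return ByteBuffer.allocate(8).putInt(major).putInt(minor).array();
    }

    static int[] fromBytes(byte[] bytes) {
        if (bytes == null || bytes.length != 8) {
            throw new IllegalArgumentException("expected 8 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new int[] { buf.getInt(), buf.getInt() };
    }

    // Hypothetical policy: versions are compatible when major numbers match;
    // minor bumps are assumed backward compatible.
    static boolean isCompatible(int[] a, int[] b) {
        return a[0] == b[0];
    }

    public static void main(String[] args) {
        int[] v = fromBytes(toBytes(1, 0));
        System.out.println(v[0] + "." + v[1]); // 1.0
    }
}
```

Whatever the concrete format, the point of storing the version alongside the data is that a provider can refuse to open a store written by an incompatible format instead of silently misreading it.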

## Database Providers

### Database Provider Interface

Base interface for database provider implementations.

```java { .api }
public interface DBProvider {
    /**
     * Initialize the database provider.
     * @param dbFile file location for the database
     * @param version StoreVersion for the database format
     * @throws IOException if initialization fails
     */
    void init(File dbFile, StoreVersion version) throws IOException;

    /**
     * Get the database instance.
     * @return DB instance for database operations
     * @throws IOException if database access fails
     */
    DB getDB() throws IOException;

    /**
     * Close the database provider and clean up resources.
     * @throws IOException if cleanup fails
     */
    void close() throws IOException;
}
```

### LevelDB Provider

Provider implementation for LevelDB databases.

```java { .api }
public class LevelDBProvider implements DBProvider {
    /**
     * Create a LevelDB provider instance.
     */
    public LevelDBProvider();

    @Override
    public void init(File dbFile, StoreVersion version) throws IOException;

    @Override
    public DB getDB() throws IOException;

    @Override
    public void close() throws IOException;

    /**
     * Check whether LevelDB is available on this system.
     * @return true if the LevelDB native libraries are available
     */
    public static boolean isAvailable();
}
```

### RocksDB Provider

Provider implementation for RocksDB databases.

```java { .api }
public class RocksDBProvider implements DBProvider {
    /**
     * Create a RocksDB provider instance.
     */
    public RocksDBProvider();

    @Override
    public void init(File dbFile, StoreVersion version) throws IOException;

    @Override
    public DB getDB() throws IOException;

    @Override
    public void close() throws IOException;

    /**
     * Check whether RocksDB is available on this system.
     * @return true if the RocksDB native libraries are available
     */
    public static boolean isAvailable();
}
```

## Usage Examples

### Basic Database Operations

```java
import org.apache.spark.network.shuffledb.*;
import java.io.File;
import java.io.IOException;

// Create the database directory
File dbDir = new File("shuffle-data");
dbDir.mkdirs();

// Create a LevelDB instance (closed automatically by try-with-resources)
try (LevelDB levelDB = new LevelDB(dbDir)) {
    // Store key-value pairs
    String key1 = "shuffle-block-1";
    String value1 = "block data content 1";
    levelDB.put(key1.getBytes(), value1.getBytes());

    String key2 = "shuffle-block-2";
    String value2 = "block data content 2";
    levelDB.put(key2.getBytes(), value2.getBytes());

    // Retrieve a value
    byte[] retrievedValue1 = levelDB.get(key1.getBytes());
    if (retrievedValue1 != null) {
        System.out.println("Retrieved: " + new String(retrievedValue1));
    }

    // A missing key returns null rather than throwing
    byte[] nonExistentValue = levelDB.get("non-existent-key".getBytes());
    System.out.println("Non-existent key result: " + (nonExistentValue == null ? "null" : "found"));

    // Delete a key
    levelDB.delete(key2.getBytes());

    // Verify deletion
    byte[] deletedValue = levelDB.get(key2.getBytes());
    System.out.println("Deleted key result: " + (deletedValue == null ? "deleted" : "still exists"));
} catch (IOException e) {
    System.err.println("Database operation failed: " + e.getMessage());
}
```

### Database Iteration

```java
import org.apache.spark.network.shuffledb.*;
import java.io.File;
import java.io.IOException;
import java.util.Map;

// Populate a database with test data
try (RocksDB rocksDB = new RocksDB(new File("iteration-test"))) {
    // Store multiple key-value pairs; zero-padding keeps keys in numeric order
    for (int i = 0; i < 10; i++) {
        String key = "key-" + String.format("%03d", i);
        String value = "value-" + i;
        rocksDB.put(key.getBytes(), value.getBytes());
    }

    // Iterate over all entries; the iterator is itself Closeable
    System.out.println("Database contents:");
    try (DBIterator iterator = rocksDB.iterator()) {
        while (iterator.hasNext()) {
            Map.Entry<byte[], byte[]> entry = iterator.next();
            String key = new String(entry.getKey());
            String value = new String(entry.getValue());
            System.out.println("  " + key + " = " + value);
        }
    }
} catch (IOException e) {
    System.err.println("Database iteration failed: " + e.getMessage());
}
```

### Backend Selection and Availability

```java
// Check backend availability
System.out.println("LevelDB available: " + LevelDBProvider.isAvailable());
System.out.println("RocksDB available: " + RocksDBProvider.isAvailable());

// Select a backend based on availability, preferring RocksDB
DBBackend selectedBackend;
if (RocksDBProvider.isAvailable()) {
    selectedBackend = DBBackend.ROCKSDB;
    System.out.println("Using RocksDB backend");
} else if (LevelDBProvider.isAvailable()) {
    selectedBackend = DBBackend.LEVELDB;
    System.out.println("Using LevelDB backend");
} else {
    throw new RuntimeException("No database backend available");
}

// Build the database path using the backend's file-naming convention
File dbDir = new File("shuffle-db");
String filename = selectedBackend.fileName("shuffle-store");
File fullDbPath = new File(dbDir, filename);

DB database;
switch (selectedBackend) {
    case LEVELDB:
        database = new LevelDB(fullDbPath);
        break;
    case ROCKSDB:
        database = new RocksDB(fullDbPath);
        break;
    default:
        throw new IllegalArgumentException("Unsupported backend: " + selectedBackend);
}

// Use the database, closing it even if an operation fails
try {
    database.put("test-key".getBytes(), "test-value".getBytes());
    System.out.println("Database created and tested successfully");
} finally {
    database.close();
}
```

### Database Provider Pattern

```java
import org.apache.spark.network.shuffledb.*;
import java.io.File;
import java.io.IOException;

// Pick a provider based on availability
DBProvider provider;
if (RocksDBProvider.isAvailable()) {
    provider = new RocksDBProvider();
} else {
    provider = new LevelDBProvider();
}

try {
    // Initialize the provider with a store version
    File dbLocation = new File("versioned-shuffle-db");
    provider.init(dbLocation, StoreVersion.CURRENT);

    // Get the database instance
    DB database = provider.getDB();

    // Use the database for shuffle operations
    String blockId = "shuffle_1_2_0";
    byte[] blockData = "compressed shuffle block data".getBytes();
    database.put(blockId.getBytes(), blockData);

    // Retrieve and verify
    byte[] retrievedData = database.get(blockId.getBytes());
    System.out.println("Block stored and retrieved successfully: " + (retrievedData != null));
} catch (IOException e) {
    System.err.println("Provider operation failed: " + e.getMessage());
} finally {
    try {
        provider.close();
    } catch (IOException e) {
        System.err.println("Provider cleanup failed: " + e.getMessage());
    }
}
```

### Version Management

```java
// Inspect the current store version
StoreVersion currentVersion = StoreVersion.CURRENT;
System.out.println("Current version: " + currentVersion);
System.out.println("Major: " + currentVersion.major() + ", Minor: " + currentVersion.minor());

// Create a custom version
StoreVersion customVersion = new StoreVersion(1, 1);
System.out.println("Custom version: " + customVersion);

// Check compatibility
boolean compatible = currentVersion.isCompatible(customVersion);
System.out.println("Versions compatible: " + compatible);

// Serialize the version
byte[] versionBytes = currentVersion.toBytes();
System.out.println("Serialized version length: " + versionBytes.length + " bytes");

// Deserialize and compare
StoreVersion deserializedVersion = StoreVersion.fromBytes(versionBytes);
System.out.println("Deserialized version: " + deserializedVersion);
System.out.println("Versions equal: " + currentVersion.equals(deserializedVersion));
```

### Batch Operations

```java
// Bulk-load and bulk-read many entries
try (RocksDB rocksDB = new RocksDB(new File("batch-operations"))) {

    // Batch insert
    System.out.println("Performing batch insert...");
    long startTime = System.currentTimeMillis();

    for (int partition = 0; partition < 100; partition++) {
        for (int block = 0; block < 50; block++) {
            String key = String.format("shuffle_%d_%d_%d", 1, partition, block);
            String value = "block-data-" + partition + "-" + block;
            rocksDB.put(key.getBytes(), value.getBytes());
        }
    }

    long insertTime = System.currentTimeMillis() - startTime;
    System.out.println("Batch insert completed in " + insertTime + "ms");

    // Batch read
    System.out.println("Performing batch read...");
    startTime = System.currentTimeMillis();

    int foundCount = 0;
    for (int partition = 0; partition < 100; partition++) {
        for (int block = 0; block < 50; block++) {
            String key = String.format("shuffle_%d_%d_%d", 1, partition, block);
            if (rocksDB.get(key.getBytes()) != null) {
                foundCount++;
            }
        }
    }

    long readTime = System.currentTimeMillis() - startTime;
    System.out.println("Batch read completed in " + readTime + "ms");
    System.out.println("Found " + foundCount + " entries");

    // Manual compaction (RocksDB-specific; useful after heavy writes)
    System.out.println("Performing manual compaction...");
    rocksDB.compactRange();
    System.out.println("Compaction completed");

} catch (IOException e) {
    System.err.println("Batch operation failed: " + e.getMessage());
}
```

### Database Cleanup and Resource Management

```java
// Resource-management pattern for a long-lived shuffle database
public class ShuffleDataManager {
    private DB database;
    private final File dbLocation;

    public ShuffleDataManager(File dbLocation, DBBackend backend) throws IOException {
        this.dbLocation = dbLocation;

        switch (backend) {
            case LEVELDB:
                this.database = new LevelDB(dbLocation);
                break;
            case ROCKSDB:
                this.database = new RocksDB(dbLocation);
                break;
            default:
                throw new IllegalArgumentException("Unsupported backend: " + backend);
        }
    }

    public void storeShuffleBlock(String blockId, byte[] data) throws IOException {
        database.put(blockId.getBytes(), data);
    }

    public byte[] getShuffleBlock(String blockId) throws IOException {
        return database.get(blockId.getBytes());
    }

    public void deleteShuffleBlock(String blockId) throws IOException {
        database.delete(blockId.getBytes());
    }

    public void cleanup() {
        if (database != null) {
            try {
                database.close();
            } catch (IOException e) {
                System.err.println("Failed to close database: " + e.getMessage());
            }
        }
    }

    // For testing: remove the database files on disk
    public void deleteDatabase() {
        cleanup();
        if (dbLocation.exists()) {
            deleteRecursively(dbLocation);
        }
    }

    private void deleteRecursively(File file) {
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteRecursively(child);
                }
            }
        }
        file.delete();
    }
}

// Usage
try {
    ShuffleDataManager manager = new ShuffleDataManager(
        new File("managed-shuffle-db"),
        DBBackend.ROCKSDB
    );

    // Store shuffle data
    manager.storeShuffleBlock("block-1", "shuffle data 1".getBytes());
    manager.storeShuffleBlock("block-2", "shuffle data 2".getBytes());

    // Retrieve shuffle data
    byte[] block1Data = manager.getShuffleBlock("block-1");
    System.out.println("Retrieved block 1: " + new String(block1Data));

    // Cleanup
    manager.cleanup();
} catch (IOException e) {
    System.err.println("Shuffle data manager failed: " + e.getMessage());
}
```

## Best Practices

### Performance Optimization

1. **Batch Operations**: Group multiple put/get/delete operations together to amortize per-call overhead
2. **Iterator Management**: Always close iterators to prevent resource leaks
3. **Key Design**: Use consistent key naming schemes so related keys sort together
4. **Compaction**: Use manual compaction for RocksDB in write-heavy scenarios
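
The key-design point can be made concrete. Both backends order keys by raw byte comparison by default, so zero-padding numeric components (as the iteration example's `%03d` format does) keeps blocks of the same shuffle and partition adjacent on disk. The helper below is an illustrative sketch, not Spark's actual key format:

```java
import java.util.Arrays;

public class KeyLayout {
    // Zero-padded components sort numerically under byte-wise comparison;
    // unpadded ones do not ("10" sorts before "2" as a string).
    static String key(int shuffleId, int partition, int block) {
        return String.format("shuffle_%05d_%05d_%05d", shuffleId, partition, block);
    }

    public static void main(String[] args) {
        String[] keys = { key(1, 10, 0), key(1, 2, 0), key(1, 2, 7) };
        Arrays.sort(keys); // byte-wise order, like an LSM key comparator
        for (String k : keys) System.out.println(k);
        // shuffle_00001_00002_00000
        // shuffle_00001_00002_00007
        // shuffle_00001_00010_00000
    }
}
```

Keys laid out this way let a range scan (or a prefix check during iteration) visit all blocks of one partition without touching the rest of the store.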

### Resource Management

1. **Database Lifecycle**: Always close databases, using try-with-resources or explicit close() calls
2. **Provider Pattern**: Use DBProvider for better abstraction and configuration management
3. **Version Compatibility**: Check version compatibility when opening existing databases
4. **Backend Selection**: Choose a backend based on performance requirements and availability

### Error Handling

```java
// Robust error-handling pattern
public void robustDatabaseOperation(DB database, String key, byte[] value) {
    try {
        database.put(key.getBytes(), value);
        System.out.println("Successfully stored key: " + key);
    } catch (IOException e) {
        System.err.println("Failed to store key " + key + ": " + e.getMessage());
        // Delegate to retry logic or fallback behavior
        handleDatabaseError(e, key, value);
    }
}

private void handleDatabaseError(IOException e, String key, byte[] value) {
    // Log error details
    System.err.println("Database error details: " + e.getClass().getSimpleName());

    // Possible strategies: retry with exponential backoff,
    // write to backup storage, or queue for later processing
}
```