or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.md · client-operations.md · configuration-management.md · index.md · message-protocol.md · security-authentication.md · server-operations.md · shuffle-database.md · transport-context.md

docs/shuffle-database.md

# Shuffle Database

The shuffle database API provides key-value storage for shuffle data in Apache Spark. It supports LevelDB and RocksDB backends behind a common `DB` interface. It offers basic put/get/delete operations, closeable iterators over all entries, and store-format version management.

## Capabilities

### Database Interface

Core interface for key-value database operations used in shuffle data management.

```java { .api }

public interface DB extends Closeable {

12

/**

13

* Store a key-value pair in the database

14

* @param key - byte array representing the key

15

* @param value - byte array representing the value to store

16

* @throws IOException if the put operation fails

17

*/

18

void put(byte[] key, byte[] value) throws IOException;

19

20

/**

21

* Retrieve a value by its key from the database

22

* @param key - byte array representing the key to look up

23

* @return byte array containing the value, or null if key not found

24

* @throws IOException if the get operation fails

25

*/

26

byte[] get(byte[] key) throws IOException;

27

28

/**

29

* Delete a key-value pair from the database

30

* @param key - byte array representing the key to delete

31

* @throws IOException if the delete operation fails

32

*/

33

void delete(byte[] key) throws IOException;

34

35

/**

36

* Create an iterator for traversing all key-value pairs in the database

37

* @return DBIterator for iterating over database entries

38

*/

39

DBIterator iterator();

40

41

/**

42

* Close the database and release all associated resources

43

* @throws IOException if the close operation fails

44

*/

45

void close() throws IOException;

46

}

```

### Database Iterator

Iterator interface for traversing database entries. `DBIterator` extends `Closeable`, so every iterator must be closed (preferably with try-with-resources) to release its resources.

```java { .api }

/**
 * Iterator over the entries of a {@code DB}.
 *
 * <p>Each element is a {@code Map.Entry} holding the raw key and value bytes.
 * The iterator extends {@link Closeable}; always close it (e.g. with try-with-resources)
 * so its resources are released.
 */
public interface DBIterator extends Iterator<Map.Entry<byte[], byte[]>>, Closeable {

    /**
     * Reports whether more entries remain.
     *
     * @return {@code true} if {@link #next()} would return another entry
     */
    @Override
    boolean hasNext();

    /**
     * Returns the next key-value pair.
     *
     * @return the next entry
     * @throws java.util.NoSuchElementException if no more entries exist
     */
    @Override
    Map.Entry<byte[], byte[]> next();

    /**
     * Closes the iterator and releases associated resources.
     *
     * @throws IOException if the close operation fails
     */
    @Override
    void close() throws IOException;
}

```

## Database Backends

### Database Backend Enumeration

Enumeration of the supported database backend implementations.

```java { .api }

/**
 * Storage engines that can back the shuffle database.
 *
 * <p>Each constant carries a lower-case short name ({@code "leveldb"} / {@code "rocksdb"}).
 * The short name is used as the database file-name suffix and as the lookup key for
 * {@link #byName(String)}. The inherited {@link Enum#name()} still returns the constant
 * name (e.g. {@code "LEVELDB"}): {@code Enum.name()} is {@code final} and cannot be
 * overridden, so the short name is exposed through {@link #shortName()} instead.
 */
public enum DBBackend {
    LEVELDB("leveldb"),
    ROCKSDB("rocksdb");

    /** Lower-case identifier used in file names and by {@link #byName(String)}. */
    private final String shortName;

    DBBackend(String shortName) {
        this.shortName = shortName;
    }

    /**
     * Builds the database file name for this backend.
     *
     * @param prefix file-name prefix, e.g. {@code "shuffle-store"}
     * @return {@code prefix + "." + shortName()}, e.g. {@code "shuffle-store.rocksdb"}
     */
    public String fileName(String prefix) {
        return prefix + "." + shortName;
    }

    /**
     * Returns the lower-case backend identifier ({@code "leveldb"} or {@code "rocksdb"}).
     *
     * @return the backend's short name
     */
    public String shortName() {
        return shortName;
    }

    /**
     * Looks up a backend by its short name, ignoring case.
     *
     * @param value backend name such as {@code "leveldb"}, {@code "rocksdb"} or {@code "ROCKSDB"}
     * @return the matching backend
     * @throws IllegalArgumentException if {@code value} is null or does not name a backend
     */
    public static DBBackend byName(String value) {
        for (DBBackend backend : values()) {
            // equalsIgnoreCase compares char-by-char (no Locale involved) and is false for null
            if (backend.shortName.equalsIgnoreCase(value)) {
                return backend;
            }
        }
        throw new IllegalArgumentException("Unknown DB backend: " + value);
    }
}

```

### LevelDB Implementation

LevelDB-based implementation of `DB` for shuffle data storage.

```java { .api }

public class LevelDB implements DB {

136

/**

137

* Create a LevelDB database instance

138

* @param file - File path where the database should be stored

139

* @throws IOException if database creation or opening fails

140

*/

141

public LevelDB(File file) throws IOException;

142

143

@Override

144

public void put(byte[] key, byte[] value) throws IOException;

145

146

@Override

147

public byte[] get(byte[] key) throws IOException;

148

149

@Override

150

public void delete(byte[] key) throws IOException;

151

152

@Override

153

public DBIterator iterator();

154

155

@Override

156

public void close() throws IOException;

157

158

/**

159

* Get the database file location

160

* @return File representing the database location

161

*/

162

public File getFile();

163

164

/**

165

* Check if the database is closed

166

* @return boolean indicating if the database is closed

167

*/

168

public boolean isClosed();

169

}

```

### LevelDB Iterator

Iterator implementation for LevelDB databases, typically obtained from `LevelDB.iterator()`.

```java { .api }

public class LevelDBIterator implements DBIterator {

178

/**

179

* Create a LevelDB iterator (typically created through LevelDB.iterator())

180

* @param db - LevelDB instance to iterate over

181

*/

182

LevelDBIterator(LevelDB db);

183

184

@Override

185

public boolean hasNext();

186

187

@Override

188

public Map.Entry<byte[], byte[]> next();

189

190

@Override

191

public void close() throws IOException;

192

}

```

### RocksDB Implementation

RocksDB-based implementation of `DB` for shuffle data storage. In addition to the `DB` operations, it exposes manual compaction through `compactRange()`.

```java { .api }

public class RocksDB implements DB {

201

/**

202

* Create a RocksDB database instance

203

* @param file - File path where the database should be stored

204

* @throws IOException if database creation or opening fails

205

*/

206

public RocksDB(File file) throws IOException;

207

208

@Override

209

public void put(byte[] key, byte[] value) throws IOException;

210

211

@Override

212

public byte[] get(byte[] key) throws IOException;

213

214

@Override

215

public void delete(byte[] key) throws IOException;

216

217

@Override

218

public DBIterator iterator();

219

220

@Override

221

public void close() throws IOException;

222

223

/**

224

* Get the database file location

225

* @return File representing the database location

226

*/

227

public File getFile();

228

229

/**

230

* Check if the database is closed

231

* @return boolean indicating if the database is closed

232

*/

233

public boolean isClosed();

234

235

/**

236

* Perform a manual compaction of the database

237

* @throws IOException if compaction fails

238

*/

239

public void compactRange() throws IOException;

240

}

```

### RocksDB Iterator

Iterator implementation for RocksDB databases, typically obtained from `RocksDB.iterator()`.

```java { .api }

public class RocksDBIterator implements DBIterator {

249

/**

250

* Create a RocksDB iterator (typically created through RocksDB.iterator())

251

* @param db - RocksDB instance to iterate over

252

*/

253

RocksDBIterator(RocksDB db);

254

255

@Override

256

public boolean hasNext();

257

258

@Override

259

public Map.Entry<byte[], byte[]> next();

260

261

@Override

262

public void close() throws IOException;

263

}

```

## Version Management

### Store Version

`StoreVersion` identifies the store data format by a major and a minor version number. It provides a compatibility check and byte-level serialization, so that an existing database can be checked before it is reused.

```java { .api }

public class StoreVersion {

274

/**

275

* Current version of the store format

276

*/

277

public static final StoreVersion CURRENT = new StoreVersion(1, 0);

278

279

/**

280

* Create a store version

281

* @param major - Major version number

282

* @param minor - Minor version number

283

*/

284

public StoreVersion(int major, int minor);

285

286

/**

287

* Get the major version number

288

* @return int representing the major version

289

*/

290

public int major();

291

292

/**

293

* Get the minor version number

294

* @return int representing the minor version

295

*/

296

public int minor();

297

298

/**

299

* Check if this version is compatible with another version

300

* @param other - StoreVersion to check compatibility against

301

* @return boolean indicating if versions are compatible

302

*/

303

public boolean isCompatible(StoreVersion other);

304

305

/**

306

* Write the version to a byte array

307

* @return byte array containing the encoded version

308

*/

309

public byte[] toBytes();

310

311

/**

312

* Read a version from a byte array

313

* @param bytes - byte array containing the encoded version

314

* @return StoreVersion decoded from the bytes

315

* @throws IllegalArgumentException if bytes are invalid

316

*/

317

public static StoreVersion fromBytes(byte[] bytes);

318

319

@Override

320

public String toString();

321

322

@Override

323

public boolean equals(Object obj);

324

325

@Override

326

public int hashCode();

327

}

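```

## Database Providers

### Database Provider Interface

Base interface for database provider implementations. A provider is initialized with a database location and a `StoreVersion`. It hands out the `DB` instance and releases its resources on `close()`.

```java { .api }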

public interface DBProvider {

338

/**

339

* Initialize the database provider with configuration

340

* @param dbFile - File location for the database

341

* @param version - StoreVersion for the database format

342

* @throws IOException if initialization fails

343

*/

344

void init(File dbFile, StoreVersion version) throws IOException;

345

346

/**

347

* Get the database instance

348

* @return DB instance for database operations

349

* @throws IOException if database access fails

350

*/

351

DB getDB() throws IOException;

352

353

/**

354

* Close the database provider and cleanup resources

355

* @throws IOException if cleanup fails

356

*/

357

void close() throws IOException;

358

}

```

### LevelDB Provider

`DBProvider` implementation for LevelDB databases.

```java { .api }

public class LevelDBProvider implements DBProvider {

367

/**

368

* Create a LevelDB provider instance

369

*/

370

public LevelDBProvider();

371

372

@Override

373

public void init(File dbFile, StoreVersion version) throws IOException;

374

375

@Override

376

public DB getDB() throws IOException;

377

378

@Override

379

public void close() throws IOException;

380

381

/**

382

* Check if LevelDB is available on the system

383

* @return boolean indicating if LevelDB native libraries are available

384

*/

385

public static boolean isAvailable();

386

}

```

### RocksDB Provider

`DBProvider` implementation for RocksDB databases.

```java { .api }

public class RocksDBProvider implements DBProvider {

395

/**

396

* Create a RocksDB provider instance

397

*/

398

public RocksDBProvider();

399

400

@Override

401

public void init(File dbFile, StoreVersion version) throws IOException;

402

403

@Override

404

public DB getDB() throws IOException;

405

406

@Override

407

public void close() throws IOException;

408

409

/**

410

* Check if RocksDB is available on the system

411

* @return boolean indicating if RocksDB native libraries are available

412

*/

413

public static boolean isAvailable();

414

}

```

## Usage Examples

### Basic Database Operations

```java

import org.apache.spark.network.shuffledb.*;

423

import java.io.File;

424

425

// Create database directory

426

File dbDir = new File("shuffle-data");

427

dbDir.mkdirs();

428

429

// Create LevelDB instance

430

try (LevelDB levelDB = new LevelDB(dbDir)) {

431

// Store key-value pairs

432

String key1 = "shuffle-block-1";

433

String value1 = "block data content 1";

434

levelDB.put(key1.getBytes(), value1.getBytes());

435

436

String key2 = "shuffle-block-2";

437

String value2 = "block data content 2";

438

levelDB.put(key2.getBytes(), value2.getBytes());

439

440

// Retrieve values

441

byte[] retrievedValue1 = levelDB.get(key1.getBytes());

442

if (retrievedValue1 != null) {

443

System.out.println("Retrieved: " + new String(retrievedValue1));

444

}

445

446

// Check if key exists

447

byte[] nonExistentValue = levelDB.get("non-existent-key".getBytes());

448

System.out.println("Non-existent key result: " + (nonExistentValue == null ? "null" : "found"));

449

450

// Delete a key

451

levelDB.delete(key2.getBytes());

452

453

// Verify deletion

454

byte[] deletedValue = levelDB.get(key2.getBytes());

455

System.out.println("Deleted key result: " + (deletedValue == null ? "deleted" : "still exists"));

456

457

} catch (IOException e) {

458

System.err.println("Database operation failed: " + e.getMessage());

459

}

```

### Database Iteration

```java

import org.apache.spark.network.shuffledb.*;

466

import java.util.Map;

467

468

// Populate database with test data

469

try (RocksDB rocksDB = new RocksDB(new File("iteration-test"))) {

470

// Store multiple key-value pairs

471

for (int i = 0; i < 10; i++) {

472

String key = "key-" + String.format("%03d", i);

473

String value = "value-" + i;

474

rocksDB.put(key.getBytes(), value.getBytes());

475

}

476

477

// Iterate over all entries

478

System.out.println("Database contents:");

479

try (DBIterator iterator = rocksDB.iterator()) {

480

while (iterator.hasNext()) {

481

Map.Entry<byte[], byte[]> entry = iterator.next();

482

String key = new String(entry.getKey());

483

String value = new String(entry.getValue());

484

System.out.println(" " + key + " = " + value);

485

}

486

}

487

488

} catch (IOException e) {

489

System.err.println("Database iteration failed: " + e.getMessage());

490

}

```

### Backend Selection and Availability

```java

// Check backend availability

497

System.out.println("LevelDB available: " + LevelDBProvider.isAvailable());

498

System.out.println("RocksDB available: " + RocksDBProvider.isAvailable());

499

500

// Select backend based on availability and preference

501

DBBackend selectedBackend;

502

if (RocksDBProvider.isAvailable()) {

503

selectedBackend = DBBackend.ROCKSDB;

504

System.out.println("Using RocksDB backend");

505

} else if (LevelDBProvider.isAvailable()) {

506

selectedBackend = DBBackend.LEVELDB;

507

System.out.println("Using LevelDB backend");

508

} else {

509

throw new RuntimeException("No database backend available");

510

}

511

512

// Create database with selected backend

513

File dbFile = new File("shuffle-db");

514

String filename = selectedBackend.fileName("shuffle-store");

515

File fullDbPath = new File(dbFile, filename);

516

517

DB database;

518

switch (selectedBackend) {

519

case LEVELDB:

520

database = new LevelDB(fullDbPath);

521

break;

522

case ROCKSDB:

523

database = new RocksDB(fullDbPath);

524

break;

525

default:

526

throw new IllegalArgumentException("Unsupported backend: " + selectedBackend);

527

}

528

529

// Use database...

530

try {

531

database.put("test-key".getBytes(), "test-value".getBytes());

532

System.out.println("Database created and tested successfully");

533

} finally {

534

database.close();

535

}

```

### Database Provider Pattern

```java

import org.apache.spark.network.shuffledb.*;

542

543

// Create database provider

544

DBProvider provider;

545

if (RocksDBProvider.isAvailable()) {

546

provider = new RocksDBProvider();

547

} else {

548

provider = new LevelDBProvider();

549

}

550

551

try {

552

// Initialize provider with version

553

File dbLocation = new File("versioned-shuffle-db");

554

StoreVersion version = StoreVersion.CURRENT;

555

provider.init(dbLocation, version);

556

557

// Get database instance

558

DB database = provider.getDB();

559

560

// Use database for shuffle operations

561

String blockId = "shuffle_1_2_0";

562

byte[] blockData = "compressed shuffle block data".getBytes();

563

database.put(blockId.getBytes(), blockData);

564

565

// Retrieve and verify

566

byte[] retrievedData = database.get(blockId.getBytes());

567

System.out.println("Block stored and retrieved successfully: " + (retrievedData != null));

568

569

} catch (IOException e) {

570

System.err.println("Provider operation failed: " + e.getMessage());

571

} finally {

572

try {

573

provider.close();

574

} catch (IOException e) {

575

System.err.println("Provider cleanup failed: " + e.getMessage());

576

}

577

}

```

### Version Management

```java

// Work with store versions

584

StoreVersion currentVersion = StoreVersion.CURRENT;

585

System.out.println("Current version: " + currentVersion);

586

System.out.println("Major: " + currentVersion.major() + ", Minor: " + currentVersion.minor());

587

588

// Create custom version

589

StoreVersion customVersion = new StoreVersion(1, 1);

590

System.out.println("Custom version: " + customVersion);

591

592

// Check compatibility

593

boolean compatible = currentVersion.isCompatible(customVersion);

594

System.out.println("Versions compatible: " + compatible);

595

596

// Serialize version

597

byte[] versionBytes = currentVersion.toBytes();

598

System.out.println("Serialized version length: " + versionBytes.length + " bytes");

599

600

// Deserialize version

601

StoreVersion deserializedVersion = StoreVersion.fromBytes(versionBytes);

602

System.out.println("Deserialized version: " + deserializedVersion);

603

System.out.println("Versions equal: " + currentVersion.equals(deserializedVersion));

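```

### Batch Operations

The `DB` interface has no write-batch API. The example below issues many individual `put` and `get` calls in a loop and measures how long they take.

```java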

// Perform batch operations for better performance

610

try (RocksDB rocksDB = new RocksDB(new File("batch-operations"))) {

611

612

// Batch insert operation

613

System.out.println("Performing batch insert...");

614

long startTime = System.currentTimeMillis();

615

616

for (int partition = 0; partition < 100; partition++) {

617

for (int block = 0; block < 50; block++) {

618

String key = String.format("shuffle_%d_%d_%d", 1, partition, block);

619

String value = "block-data-" + partition + "-" + block;

620

rocksDB.put(key.getBytes(), value.getBytes());

621

}

622

}

623

624

long insertTime = System.currentTimeMillis() - startTime;

625

System.out.println("Batch insert completed in " + insertTime + "ms");

626

627

// Batch read operation

628

System.out.println("Performing batch read...");

629

startTime = System.currentTimeMillis();

630

631

int foundCount = 0;

632

for (int partition = 0; partition < 100; partition++) {

633

for (int block = 0; block < 50; block++) {

634

String key = String.format("shuffle_%d_%d_%d", 1, partition, block);

635

byte[] value = rocksDB.get(key.getBytes());

636

if (value != null) {

637

foundCount++;

638

}

639

}

640

}

641

642

long readTime = System.currentTimeMillis() - startTime;

643

System.out.println("Batch read completed in " + readTime + "ms");

644

System.out.println("Found " + foundCount + " entries");

645

646

// Manual compaction for RocksDB

647

if (rocksDB instanceof RocksDB) {

648

System.out.println("Performing manual compaction...");

649

rocksDB.compactRange();

650

System.out.println("Compaction completed");

651

}

652

653

} catch (IOException e) {

654

System.err.println("Batch operation failed: " + e.getMessage());

655

}

```

### Database Cleanup and Resource Management

```java

// Proper resource management pattern

662

public class ShuffleDataManager {

663

private DB database;

664

private final File dbLocation;

665

666

public ShuffleDataManager(File dbLocation, DBBackend backend) throws IOException {

667

this.dbLocation = dbLocation;

668

669

switch (backend) {

670

case LEVELDB:

671

this.database = new LevelDB(dbLocation);

672

break;

673

case ROCKSDB:

674

this.database = new RocksDB(dbLocation);

675

break;

676

default:

677

throw new IllegalArgumentException("Unsupported backend: " + backend);

678

}

679

}

680

681

public void storeShuffleBlock(String blockId, byte[] data) throws IOException {

682

database.put(blockId.getBytes(), data);

683

}

684

685

public byte[] getShuffleBlock(String blockId) throws IOException {

686

return database.get(blockId.getBytes());

687

}

688

689

public void deleteShuffleBlock(String blockId) throws IOException {

690

database.delete(blockId.getBytes());

691

}

692

693

public void cleanup() {

694

if (database != null) {

695

try {

696

database.close();

697

} catch (IOException e) {

698

System.err.println("Failed to close database: " + e.getMessage());

699

}

700

}

701

}

702

703

// For testing: cleanup database files

704

public void deleteDatabase() {

705

cleanup();

706

if (dbLocation.exists()) {

707

deleteRecursively(dbLocation);

708

}

709

}

710

711

private void deleteRecursively(File file) {

712

if (file.isDirectory()) {

713

File[] children = file.listFiles();

714

if (children != null) {

715

for (File child : children) {

716

deleteRecursively(child);

717

}

718

}

719

}

720

file.delete();

721

}

722

}

723

724

// Usage

725

try {

726

ShuffleDataManager manager = new ShuffleDataManager(

727

new File("managed-shuffle-db"),

728

DBBackend.ROCKSDB

729

);

730

731

// Store shuffle data

732

manager.storeShuffleBlock("block-1", "shuffle data 1".getBytes());

733

manager.storeShuffleBlock("block-2", "shuffle data 2".getBytes());

734

735

// Retrieve shuffle data

736

byte[] block1Data = manager.getShuffleBlock("block-1");

737

System.out.println("Retrieved block 1: " + new String(block1Data));

738

739

// Cleanup

740

manager.cleanup();

741

742

} catch (IOException e) {

743

System.err.println("Shuffle data manager failed: " + e.getMessage());

744

}

```

## Best Practices

### Performance Optimization

1. **Bulk Operations**: The `DB` interface has no write-batch or transaction API. Each `put`/`get`/`delete` is an individual call, so do not assume that a group of writes is applied atomically.
2. **Iterator Management**: Always close iterators (they are `Closeable`) to prevent resource leaks.
3. **Key Design**: Use consistent key naming schemes (for example `shuffle_1_2_0`) for better locality.
4. **Compaction**: Use manual compaction (`RocksDB.compactRange()`) for RocksDB in write-heavy scenarios.

### Resource Management

1. **Database Lifecycle**: Always close databases using try-with-resources or explicit `close()` calls in a `finally` block.
2. **Provider Pattern**: Use `DBProvider` for better abstraction and configuration management.
3. **Version Compatibility**: Check `StoreVersion.isCompatible(...)` when opening existing databases.
4. **Backend Selection**: Choose a backend based on performance requirements and on availability (`LevelDBProvider.isAvailable()` / `RocksDBProvider.isAvailable()`).
5. **Encoding**: Convert strings to keys and values with an explicit charset (e.g. `StandardCharsets.UTF_8`) so stored bytes do not depend on the JVM default.

### Error Handling

```java

// Robust error handling pattern

767

public void robustDatabaseOperation(DB database, String key, byte[] value) {

768

try {

769

database.put(key.getBytes(), value);

770

System.out.println("Successfully stored key: " + key);

771

} catch (IOException e) {

772

System.err.println("Failed to store key " + key + ": " + e.getMessage());

773

// Implement retry logic or fallback behavior

774

handleDatabaseError(e, key, value);

775

}

776

}

777

778

private void handleDatabaseError(IOException e, String key, byte[] value) {

779

// Log error details

780

System.err.println("Database error details: " + e.getClass().getSimpleName());

781

782

// Implement retry with exponential backoff

783

// Or write to backup storage

784

// Or queue for later processing

785

}

```