or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bookkeeper-client-mocking.mdbookkeeper-server-testing.mdbookkeeper-testing-utilities.mdindex.mdzookeeper-mocking.md

bookkeeper-client-mocking.mddocs/

0

# BookKeeper Client Mocking

1

2

In-memory BookKeeper client implementation providing full distributed ledger functionality for testing without requiring a BookKeeper cluster. Supports all ledger operations with configurable behavior injection and failure simulation.

3

4

## Capabilities

5

6

### PulsarMockReadHandleInterceptor

7

8

Interface for intercepting and modifying read operations on mock ledger handles, enabling advanced testing scenarios.

9

10

```java { .api }

11

interface PulsarMockReadHandleInterceptor {

12

CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry, LedgerEntries entries);

13

}

14

```

15

16

### PulsarMockBookKeeper

17

18

Main mock BookKeeper client that maintains all ledger data in memory while providing the complete BookKeeper API.

19

20

```java { .api }

21

class PulsarMockBookKeeper extends BookKeeper {

22

// Constructor

23

PulsarMockBookKeeper(OrderedExecutor orderedExecutor);

24

25

// Configuration

26

ClientConfiguration getConf();

27

OrderedExecutor getMainWorkerPool();

28

MetadataClientDriver getMetadataClientDriver();

29

30

// Static Methods

31

static Collection<BookieId> getMockEnsemble();

32

33

// Ledger Creation - Synchronous

34

LedgerHandle createLedger(DigestType digestType, byte[] passwd) throws BKException, InterruptedException;

35

LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;

36

LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;

37

void deleteLedger(long lId) throws InterruptedException, BKException;

38

39

// Ledger Creation - Asynchronous

40

void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx, Map<String, byte[]> properties);

41

void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx);

42

43

// Ledger Access

44

void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);

45

void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);

46

void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx);

47

48

// Builder Pattern APIs

49

OpenBuilder newOpenLedgerOp();

50

DeleteBuilder newDeleteLedgerOp();

51

52

// State Access

53

Set<Long> getLedgers();

54

Map<Long, PulsarMockLedgerHandle> getLedgerMap();

55

56

// Testing and Failure Injection

57

void delay(long millis);

58

void failNow(int rc);

59

void failAfter(int steps, int rc);

60

void addEntryFailAfter(int steps, int rc);

61

synchronized void returnEmptyLedgerAfter(int steps);

62

synchronized void addEntryDelay(long delay, TimeUnit unit);

63

synchronized void addEntryResponseDelay(long delay, TimeUnit unit);

64

65

// Interceptor Management

66

void setReadHandleInterceptor(PulsarMockReadHandleInterceptor readHandleInterceptor);

67

PulsarMockReadHandleInterceptor getReadHandleInterceptor();

68

69

// Resource Management

70

void close() throws InterruptedException, BKException;

71

void shutdown();

72

}

73

```

74

75

### PulsarMockLedgerHandle

76

77

Mock implementation of LedgerHandle providing in-memory ledger operations with full ReadHandle interface support.

78

79

```java { .api }

80

class PulsarMockLedgerHandle extends LedgerHandle {

81

// Constructor

82

PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd);

83

84

// State Access

85

long getId();

86

long getLastAddConfirmed();

87

long getLength();

88

boolean isClosed();

89

boolean isFenced();

90

91

// Entry Operations - Synchronous

92

long addEntry(byte[] data) throws InterruptedException, BKException;

93

94

// Entry Operations - Asynchronous

95

void asyncAddEntry(byte[] data, AddCallback cb, Object ctx);

96

void asyncAddEntry(byte[] data, int offset, int length, AddCallback cb, Object ctx);

97

void asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx);

98

99

// Read Operations - Asynchronous (LedgerHandle interface)

100

void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx);

101

102

// Read Operations - CompletableFuture (ReadHandle interface)

103

CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);

104

CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);

105

CompletableFuture<Long> readLastAddConfirmedAsync();

106

CompletableFuture<Long> tryReadLastAddConfirmedAsync();

107

CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);

108

109

// Lifecycle

110

void asyncClose(CloseCallback cb, Object ctx);

111

}

112

```

113

114

### PulsarMockReadHandle

115

116

Separate read-only handle implementation for reading existing ledgers.

117

118

```java { .api }

119

class PulsarMockReadHandle implements ReadHandle {

120

// Constructor

121

PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, long lastAddConfirmed);

122

123

// ReadHandle Interface

124

long getId();

125

CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);

126

CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);

127

CompletableFuture<Long> readLastAddConfirmedAsync();

128

CompletableFuture<Long> tryReadLastAddConfirmedAsync();

129

CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);

130

boolean isClosed();

131

CompletableFuture<Void> closeAsync();

132

}

133

```

134

135

## Types

136

137

### BookKeeper Callback Interfaces

138

139

```java { .api }

140

interface CreateCallback {

141

void createComplete(int rc, LedgerHandle lh, Object ctx);

142

}

143

144

interface OpenCallback {

145

void openComplete(int rc, LedgerHandle lh, Object ctx);

146

}

147

148

interface DeleteCallback {

149

void deleteComplete(int rc, Object ctx);

150

}

151

152

interface AddCallback {

153

void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);

154

}

155

156

interface ReadCallback {

157

void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx);

158

}

159

160

interface CloseCallback {

161

void closeComplete(int rc, LedgerHandle lh, Object ctx);

162

}

163

```

164

165

### BookKeeper Enums

166

167

```java { .api }

168

enum DigestType {

169

MAC, CRC32, CRC32C, DUMMY

170

}

171

```

172

173

### BookKeeper Builder Interfaces

174

175

```java { .api }

176

interface OpenBuilder {

177

OpenBuilder withPassword(byte[] password);

178

OpenBuilder withDigestType(DigestType digestType);

179

OpenBuilder withRecovery(boolean recovery);

180

CompletableFuture<ReadHandle> execute();

181

}

182

183

interface DeleteBuilder {

184

CompletableFuture<Void> execute();

185

}

186

```

187

188

### BookKeeper Exception Types

189

190

```java { .api }

191

class BKException extends Exception {

192

enum Code {

193

OK, ReadException, QuorumException, NoBookieAvailableException,

194

DigestNotInitializedException, DigestMatchException, NotEnoughBookiesException,

195

NoSuchLedgerExistsException, BookieHandleNotAvailableException,

196

ZKException, MetaStoreException, ClientClosedException,

197

LedgerRecoveryException, LedgerClosedException, WriteException,

198

NoSuchEntryException, IncorrectParameterException, InterruptedException,

199

ProtocolVersionException, MetadataVersionException, SecurityException,

200

IllegalOpException, InvalidCookieException, UnauthorizedAccessException,

201

UnavailableException, ReplicationException, LedgerFencedException,

202

TimeoutException, LedgerExistException

203

}

204

}

205

```

206

207

## Usage Examples

208

209

### Basic Ledger Operations

210

211

```java

212

import org.apache.bookkeeper.client.PulsarMockBookKeeper;

213

import org.apache.bookkeeper.client.LedgerHandle;

214

import org.apache.bookkeeper.client.api.DigestType;

215

import java.util.concurrent.Executors;

216

217

// Create mock BookKeeper

218

OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(4).name("test").build();

219

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

220

221

// Create ledger

222

LedgerHandle ledger = mockBk.createLedger(DigestType.CRC32, "password".getBytes());

223

224

// Add entries

225

long entryId1 = ledger.addEntry("entry1".getBytes());

226

long entryId2 = ledger.addEntry("entry2".getBytes());

227

228

// Read entries

229

Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed());

230

231

// Close ledger

232

ledger.close();

233

mockBk.close();

234

```

235

236

### Asynchronous Operations

237

238

```java

239

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

240

241

// Async create ledger

242

mockBk.asyncCreateLedger(3, 2, 2, DigestType.CRC32, "password".getBytes(),

243

(rc, lh, ctx) -> {

244

if (rc == BKException.Code.OK) {

245

// Async add entry

246

lh.asyncAddEntry("data".getBytes(), (rc2, lh2, entryId, ctx2) -> {

247

if (rc2 == BKException.Code.OK) {

248

System.out.println("Added entry: " + entryId);

249

}

250

}, null);

251

}

252

}, null);

253

254

// Async open existing ledger

255

mockBk.asyncOpenLedger(ledgerId, DigestType.CRC32, "password".getBytes(),

256

(rc, lh, ctx) -> {

257

if (rc == BKException.Code.OK) {

258

// Async read entries

259

lh.asyncReadEntries(0, lh.getLastAddConfirmed(), (rc2, lh2, seq, ctx2) -> {

260

while (seq.hasMoreElements()) {

261

LedgerEntry entry = seq.nextElement();

262

System.out.println("Entry: " + new String(entry.getEntry()));

263

}

264

}, null);

265

}

266

}, null);

267

```

268

269

### CompletableFuture API

270

271

```java

272

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

273

274

// Using builder pattern with CompletableFuture

275

CompletableFuture<ReadHandle> future = mockBk.newOpenLedgerOp()

276

.withLedgerId(ledgerId)

277

.withPassword("password".getBytes())

278

.withDigestType(DigestType.CRC32)

279

.execute();

280

281

future.thenCompose(readHandle -> {

282

// Read entries asynchronously

283

return readHandle.readAsync(0, -1);

284

}).thenAccept(entries -> {

285

for (LedgerEntry entry : entries) {

286

System.out.println("Entry: " + new String(entry.getEntry()));

287

}

288

}).exceptionally(throwable -> {

289

System.err.println("Error: " + throwable.getMessage());

290

return null;

291

});

292

```

293

294

### Failure Injection Testing

295

296

```java

297

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

298

299

// Inject failure after 3 operations

300

mockBk.failAfter(3, BKException.Code.WriteException.getValue());

301

302

// Inject delays

303

mockBk.addEntryDelay(100, TimeUnit.MILLISECONDS);

304

mockBk.addEntryResponseDelay(50, TimeUnit.MILLISECONDS);

305

306

// Operations will fail/delay as configured

307

try {

308

LedgerHandle ledger = mockBk.createLedger(DigestType.CRC32, "password".getBytes());

309

ledger.addEntry("entry1".getBytes()); // Works

310

ledger.addEntry("entry2".getBytes()); // Works

311

ledger.addEntry("entry3".getBytes()); // Fails with WriteException

312

} catch (BKException e) {

313

System.out.println("Expected failure: " + e.getCode());

314

}

315

```

316

317

### Read Interception

318

319

```java

320

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

321

322

// Set up read interceptor

323

mockBk.setReadHandleInterceptor((ledgerId, firstEntry, lastEntry, entries) -> {

324

// Modify read results for testing

325

System.out.println("Intercepting read for ledger " + ledgerId);

326

return CompletableFuture.completedFuture(entries);

327

});

328

329

// Reads will be intercepted

330

ReadHandle readHandle = mockBk.newOpenLedgerOp()

331

.withLedgerId(ledgerId)

332

.withPassword("password".getBytes())

333

.execute().get();

334

335

// This read will trigger the interceptor

336

LedgerEntries entries = readHandle.readAsync(0, -1).get();

337

```