0
# BookKeeper Client Mocking
1
2
In-memory BookKeeper client implementation providing full distributed ledger functionality for testing without requiring a BookKeeper cluster. Supports all ledger operations with configurable behavior injection and failure simulation.
3
4
## Capabilities
5
6
### PulsarMockReadHandleInterceptor
7
8
Interface for intercepting and modifying read operations on mock ledger handles, enabling advanced testing scenarios.
9
10
```java { .api }
11
interface PulsarMockReadHandleInterceptor {
12
CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry, LedgerEntries entries);
13
}
14
```
15
16
### PulsarMockBookKeeper
17
18
Main mock BookKeeper client that maintains all ledger data in memory while providing the complete BookKeeper API.
19
20
```java { .api }
21
class PulsarMockBookKeeper extends BookKeeper {
22
// Constructor
23
PulsarMockBookKeeper(OrderedExecutor orderedExecutor);
24
25
// Configuration
26
ClientConfiguration getConf();
27
OrderedExecutor getMainWorkerPool();
28
MetadataClientDriver getMetadataClientDriver();
29
30
// Static Methods
31
static Collection<BookieId> getMockEnsemble();
32
33
// Ledger Creation - Synchronous
34
LedgerHandle createLedger(DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
35
LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
36
LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
37
void deleteLedger(long lId) throws InterruptedException, BKException;
38
39
// Ledger Creation - Asynchronous
40
void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx, Map<String, byte[]> properties);
41
void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx);
42
43
// Ledger Access
44
void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);
45
void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);
46
void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx);
47
48
// Builder Pattern APIs
49
OpenBuilder newOpenLedgerOp();
50
DeleteBuilder newDeleteLedgerOp();
51
52
// State Access
53
Set<Long> getLedgers();
54
Map<Long, PulsarMockLedgerHandle> getLedgerMap();
55
56
// Testing and Failure Injection
57
void delay(long millis);
58
void failNow(int rc);
59
void failAfter(int steps, int rc);
60
void addEntryFailAfter(int steps, int rc);
61
synchronized void returnEmptyLedgerAfter(int steps);
62
synchronized void addEntryDelay(long delay, TimeUnit unit);
63
synchronized void addEntryResponseDelay(long delay, TimeUnit unit);
64
65
// Interceptor Management
66
void setReadHandleInterceptor(PulsarMockReadHandleInterceptor readHandleInterceptor);
67
PulsarMockReadHandleInterceptor getReadHandleInterceptor();
68
69
// Resource Management
70
void close() throws InterruptedException, BKException;
71
void shutdown();
72
}
73
```
74
75
### PulsarMockLedgerHandle
76
77
Mock implementation of LedgerHandle providing in-memory ledger operations with full ReadHandle interface support.
78
79
```java { .api }
80
class PulsarMockLedgerHandle extends LedgerHandle {
81
// Constructor
82
PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd);
83
84
// State Access
85
long getId();
86
long getLastAddConfirmed();
87
long getLength();
88
boolean isClosed();
89
boolean isFenced();
90
91
// Entry Operations - Synchronous
92
long addEntry(byte[] data) throws InterruptedException, BKException;
93
94
// Entry Operations - Asynchronous
95
void asyncAddEntry(byte[] data, AddCallback cb, Object ctx);
96
void asyncAddEntry(byte[] data, int offset, int length, AddCallback cb, Object ctx);
97
void asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx);
98
99
// Read Operations - Asynchronous (LedgerHandle interface)
100
void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx);
101
102
// Read Operations - CompletableFuture (ReadHandle interface)
103
CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);
104
CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);
105
CompletableFuture<Long> readLastAddConfirmedAsync();
106
CompletableFuture<Long> tryReadLastAddConfirmedAsync();
107
CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);
108
109
// Lifecycle
110
void asyncClose(CloseCallback cb, Object ctx);
111
}
112
```
113
114
### PulsarMockReadHandle
115
116
Separate read-only handle implementation for reading existing ledgers.
117
118
```java { .api }
119
class PulsarMockReadHandle implements ReadHandle {
120
// Constructor
121
PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, long lastAddConfirmed);
122
123
// ReadHandle Interface
124
long getId();
125
CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);
126
CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);
127
CompletableFuture<Long> readLastAddConfirmedAsync();
128
CompletableFuture<Long> tryReadLastAddConfirmedAsync();
129
CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);
130
boolean isClosed();
131
CompletableFuture<Void> closeAsync();
132
}
133
```
134
135
## Types
136
137
### BookKeeper Callback Interfaces
138
139
```java { .api }
140
interface CreateCallback {
141
void createComplete(int rc, LedgerHandle lh, Object ctx);
142
}
143
144
interface OpenCallback {
145
void openComplete(int rc, LedgerHandle lh, Object ctx);
146
}
147
148
interface DeleteCallback {
149
void deleteComplete(int rc, Object ctx);
150
}
151
152
interface AddCallback {
153
void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
154
}
155
156
interface ReadCallback {
157
void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx);
158
}
159
160
interface CloseCallback {
161
void closeComplete(int rc, LedgerHandle lh, Object ctx);
162
}
163
```
164
165
### BookKeeper Enums
166
167
```java { .api }
168
enum DigestType {
169
MAC, CRC32, CRC32C, DUMMY
170
}
171
```
172
173
### BookKeeper Builder Interfaces
174
175
```java { .api }
176
interface OpenBuilder {
177
OpenBuilder withPassword(byte[] password);
178
OpenBuilder withDigestType(DigestType digestType);
179
OpenBuilder withRecovery(boolean recovery);
180
CompletableFuture<ReadHandle> execute();
181
}
182
183
interface DeleteBuilder {
184
CompletableFuture<Void> execute();
185
}
186
```
187
188
### BookKeeper Exception Types
189
190
```java { .api }
191
class BKException extends Exception {
192
enum Code {
193
OK, ReadException, QuorumException, NoBookieAvailableException,
194
DigestNotInitializedException, DigestMatchException, NotEnoughBookiesException,
195
NoSuchLedgerExistsException, BookieHandleNotAvailableException,
196
ZKException, MetaStoreException, ClientClosedException,
197
LedgerRecoveryException, LedgerClosedException, WriteException,
198
NoSuchEntryException, IncorrectParameterException, InterruptedException,
199
ProtocolVersionException, MetadataVersionException, SecurityException,
200
IllegalOpException, InvalidCookieException, UnauthorizedAccessException,
201
UnavailableException, ReplicationException, LedgerFencedException,
202
TimeoutException, LedgerExistException
203
}
204
}
205
```
206
207
## Usage Examples
208
209
### Basic Ledger Operations
210
211
```java
212
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
213
import org.apache.bookkeeper.client.LedgerHandle;
214
import org.apache.bookkeeper.client.api.DigestType;
215
import java.util.concurrent.Executors;
216
217
// Create mock BookKeeper
218
OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(4).name("test").build();
219
PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
220
221
// Create ledger
222
LedgerHandle ledger = mockBk.createLedger(DigestType.CRC32, "password".getBytes());
223
224
// Add entries
225
long entryId1 = ledger.addEntry("entry1".getBytes());
226
long entryId2 = ledger.addEntry("entry2".getBytes());
227
228
// Read entries
229
Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed());
230
231
// Close ledger
232
ledger.close();
233
mockBk.close();
234
```
235
236
### Asynchronous Operations
237
238
```java
239
PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
240
241
// Async create ledger
242
mockBk.asyncCreateLedger(3, 2, 2, DigestType.CRC32, "password".getBytes(),
243
(rc, lh, ctx) -> {
244
if (rc == BKException.Code.OK) {
245
// Async add entry
246
lh.asyncAddEntry("data".getBytes(), (rc2, lh2, entryId, ctx2) -> {
247
if (rc2 == BKException.Code.OK) {
248
System.out.println("Added entry: " + entryId);
249
}
250
}, null);
251
}
252
}, null);
253
254
// Async open existing ledger
255
mockBk.asyncOpenLedger(ledgerId, DigestType.CRC32, "password".getBytes(),
256
(rc, lh, ctx) -> {
257
if (rc == BKException.Code.OK) {
258
// Async read entries
259
lh.asyncReadEntries(0, lh.getLastAddConfirmed(), (rc2, lh2, seq, ctx2) -> {
260
while (seq.hasMoreElements()) {
261
LedgerEntry entry = seq.nextElement();
262
System.out.println("Entry: " + new String(entry.getEntry()));
263
}
264
}, null);
265
}
266
}, null);
267
```
268
269
### CompletableFuture API
270
271
```java
272
PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
273
274
// Using builder pattern with CompletableFuture
275
CompletableFuture<ReadHandle> future = mockBk.newOpenLedgerOp()
276
.withLedgerId(ledgerId)
277
.withPassword("password".getBytes())
278
.withDigestType(DigestType.CRC32)
279
.execute();
280
281
future.thenCompose(readHandle -> {
282
// Read entries asynchronously
283
return readHandle.readAsync(0, -1);
284
}).thenAccept(entries -> {
285
for (LedgerEntry entry : entries) {
286
System.out.println("Entry: " + new String(entry.getEntry()));
287
}
288
}).exceptionally(throwable -> {
289
System.err.println("Error: " + throwable.getMessage());
290
return null;
291
});
292
```
293
294
### Failure Injection Testing
295
296
```java
297
PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
298
299
// Inject failure after 3 operations
300
mockBk.failAfter(3, BKException.Code.WriteException.getValue());
301
302
// Inject delays
303
mockBk.addEntryDelay(100, TimeUnit.MILLISECONDS);
304
mockBk.addEntryResponseDelay(50, TimeUnit.MILLISECONDS);
305
306
// Operations will fail/delay as configured
307
try {
308
LedgerHandle ledger = mockBk.createLedger(DigestType.CRC32, "password".getBytes());
309
ledger.addEntry("entry1".getBytes()); // Works
310
ledger.addEntry("entry2".getBytes()); // Works
311
ledger.addEntry("entry3".getBytes()); // Fails with WriteException
312
} catch (BKException e) {
313
System.out.println("Expected failure: " + e.getCode());
314
}
315
```
316
317
### Read Interception
318
319
```java
320
PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
321
322
// Set up read interceptor
323
mockBk.setReadHandleInterceptor((ledgerId, firstEntry, lastEntry, entries) -> {
324
// Modify read results for testing
325
System.out.println("Intercepting read for ledger " + ledgerId);
326
return CompletableFuture.completedFuture(entries);
327
});
328
329
// Reads will be intercepted
330
ReadHandle readHandle = mockBk.newOpenLedgerOp()
331
.withLedgerId(ledgerId)
332
.withPassword("password".getBytes())
333
.execute().get();
334
335
// This read will trigger the interceptor
336
LedgerEntries entries = readHandle.readAsync(0, -1).get();
337
```