Mock implementations and test utilities for Apache Pulsar components including ZooKeeper and BookKeeper mocking functionality
—
In-memory BookKeeper client implementation providing full distributed ledger functionality for testing without requiring a BookKeeper cluster. Supports all ledger operations with configurable behavior injection and failure simulation.
Interface for intercepting and modifying read operations on mock ledger handles, enabling advanced testing scenarios.
interface PulsarMockReadHandleInterceptor {
CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry, LedgerEntries entries);
}

Main mock BookKeeper client that maintains all ledger data in memory while providing the complete BookKeeper API.
class PulsarMockBookKeeper extends BookKeeper {
// Constructor
PulsarMockBookKeeper(OrderedExecutor orderedExecutor);
// Configuration
ClientConfiguration getConf();
OrderedExecutor getMainWorkerPool();
MetadataClientDriver getMetadataClientDriver();
// Static Methods
static Collection<BookieId> getMockEnsemble();
// Ledger Creation - Synchronous
LedgerHandle createLedger(DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
void deleteLedger(long lId) throws InterruptedException, BKException;
// Ledger Creation - Asynchronous
void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx, Map<String, byte[]> properties);
void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx);
// Ledger Access
void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);
void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);
void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx);
// Builder Pattern APIs
OpenBuilder newOpenLedgerOp();
DeleteBuilder newDeleteLedgerOp();
// State Access
Set<Long> getLedgers();
Map<Long, PulsarMockLedgerHandle> getLedgerMap();
// Testing and Failure Injection
void delay(long millis);
void failNow(int rc);
void failAfter(int steps, int rc);
void addEntryFailAfter(int steps, int rc);
synchronized void returnEmptyLedgerAfter(int steps);
synchronized void addEntryDelay(long delay, TimeUnit unit);
synchronized void addEntryResponseDelay(long delay, TimeUnit unit);
// Interceptor Management
void setReadHandleInterceptor(PulsarMockReadHandleInterceptor readHandleInterceptor);
PulsarMockReadHandleInterceptor getReadHandleInterceptor();
// Resource Management
void close() throws InterruptedException, BKException;
void shutdown();
}

Mock implementation of LedgerHandle providing in-memory ledger operations with full ReadHandle interface support.
class PulsarMockLedgerHandle extends LedgerHandle {
// Constructor
PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd);
// State Access
long getId();
long getLastAddConfirmed();
long getLength();
boolean isClosed();
boolean isFenced();
// Entry Operations - Synchronous
long addEntry(byte[] data) throws InterruptedException, BKException;
// Entry Operations - Asynchronous
void asyncAddEntry(byte[] data, AddCallback cb, Object ctx);
void asyncAddEntry(byte[] data, int offset, int length, AddCallback cb, Object ctx);
void asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx);
// Read Operations - Asynchronous (LedgerHandle interface)
void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx);
// Read Operations - CompletableFuture (ReadHandle interface)
CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);
CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);
CompletableFuture<Long> readLastAddConfirmedAsync();
CompletableFuture<Long> tryReadLastAddConfirmedAsync();
CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);
// Lifecycle
void asyncClose(CloseCallback cb, Object ctx);
}

Separate read-only handle implementation for reading existing ledgers.
class PulsarMockReadHandle implements ReadHandle {
    // Constructor
    PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, long lastAddConfirmed);
    // ReadHandle Interface
    long getId();
    CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);
    CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);
    CompletableFuture<Long> readLastAddConfirmedAsync();
    CompletableFuture<Long> tryReadLastAddConfirmedAsync();
    CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);
    boolean isClosed();
    CompletableFuture<Void> closeAsync();
}

// Callback interfaces for the callback-style asynchronous API.
// In each of them `rc` is an int result code and `ctx` is the context object
// handed to the asynchronous call; the usage examples compare `rc` against
// BKException.Code.OK.
interface CreateCallback {
    void createComplete(int rc, LedgerHandle lh, Object ctx);
}

interface OpenCallback {
    void openComplete(int rc, LedgerHandle lh, Object ctx);
}

interface DeleteCallback {
    void deleteComplete(int rc, Object ctx);
}

interface AddCallback {
    void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
}

interface ReadCallback {
    void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx);
}

interface CloseCallback {
    void closeComplete(int rc, LedgerHandle lh, Object ctx);
}

enum DigestType {
    MAC, CRC32, CRC32C, DUMMY
}

// Builder returned by PulsarMockBookKeeper.newOpenLedgerOp().
interface OpenBuilder {
    // Selects the ledger to open; the builder examples call this before execute().
    OpenBuilder withLedgerId(long ledgerId);
    OpenBuilder withPassword(byte[] password);
    OpenBuilder withDigestType(DigestType digestType);
    OpenBuilder withRecovery(boolean recovery);
    CompletableFuture<ReadHandle> execute();
}

// Builder returned by PulsarMockBookKeeper.newDeleteLedgerOp().
interface DeleteBuilder {
    // Selects the ledger to delete.
    DeleteBuilder withLedgerId(long ledgerId);
    CompletableFuture<Void> execute();
}

class BKException extends Exception {
    // NOTE(review): the callbacks above receive `int rc` and the usage examples
    // compare it directly with BKException.Code.OK, so these names are presumably
    // int result codes rather than enum constants — confirm against the BookKeeper
    // BKException.Code reference before relying on enum semantics.
    enum Code {
        OK, ReadException, QuorumException, NoBookieAvailableException,
        DigestNotInitializedException, DigestMatchException, NotEnoughBookiesException,
        NoSuchLedgerExistsException, BookieHandleNotAvailableException,
        ZKException, MetaStoreException, ClientClosedException,
        LedgerRecoveryException, LedgerClosedException, WriteException,
        NoSuchEntryException, IncorrectParameterException, InterruptedException,
        ProtocolVersionException, MetadataVersionException, SecurityException,
        IllegalOpException, InvalidCookieException, UnauthorizedAccessException,
        UnavailableException, ReplicationException, LedgerFencedException,
        TimeoutException, LedgerExistException
    }
}

import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import java.util.concurrent.Executors;
// Mock client backed by a four-thread ordered executor named "test"
OrderedExecutor executor = OrderedExecutor.newBuilder()
        .numThreads(4)
        .name("test")
        .build();
PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

// New ledger using the CRC32 digest type and a password
byte[] passwd = "password".getBytes();
LedgerHandle handle = mockBk.createLedger(DigestType.CRC32, passwd);

// Append two entries, keeping the id returned for each
long firstId = handle.addEntry("entry1".getBytes());
long secondId = handle.addEntry("entry2".getBytes());

// Read back everything from entry 0 up to the last confirmed entry
long lastConfirmed = handle.getLastAddConfirmed();
Enumeration<LedgerEntry> readBack = handle.readEntries(0, lastConfirmed);

// Close the ledger handle first, then the mock client
handle.close();
mockBk.close();

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
// Async create ledger
mockBk.asyncCreateLedger(3, 2, 2, DigestType.CRC32, "password".getBytes(),
    (rc, lh, ctx) -> {
        if (rc == BKException.Code.OK) {
            // Async add entry
            lh.asyncAddEntry("data".getBytes(), (rc2, lh2, entryId, ctx2) -> {
                if (rc2 == BKException.Code.OK) {
                    System.out.println("Added entry: " + entryId);
                }
            }, null);
        }
    }, null);

// Async open existing ledger
mockBk.asyncOpenLedger(ledgerId, DigestType.CRC32, "password".getBytes(),
    (rc, lh, ctx) -> {
        if (rc == BKException.Code.OK) {
            // Async read entries
            lh.asyncReadEntries(0, lh.getLastAddConfirmed(), (rc2, lh2, seq, ctx2) -> {
                // Check the result code before touching seq, exactly as the add
                // callback does; a failed read should not be iterated.
                if (rc2 != BKException.Code.OK) {
                    System.err.println("Read failed with code: " + rc2);
                    return;
                }
                while (seq.hasMoreElements()) {
                    LedgerEntry entry = seq.nextElement();
                    System.out.println("Entry: " + new String(entry.getEntry()));
                }
            }, null);
        }
    }, null);

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
// Using builder pattern with CompletableFuture
CompletableFuture<ReadHandle> future = mockBk.newOpenLedgerOp()
    .withLedgerId(ledgerId)
    .withPassword("password".getBytes())
    .withDigestType(DigestType.CRC32)
    .execute();
future.thenCompose(readHandle -> {
    // Read every confirmed entry. The upper bound must be a real entry id:
    // passing -1 as lastEntry describes an empty range (first > last).
    return readHandle.readAsync(0, readHandle.getLastAddConfirmed());
}).thenAccept(entries -> {
    // LedgerEntries is closeable; close it once the entries have been consumed.
    try (LedgerEntries batch = entries) {
        for (LedgerEntry entry : batch) {
            // Entries obtained through ReadHandle expose their payload via getEntryBytes()
            System.out.println("Entry: " + new String(entry.getEntryBytes()));
        }
    }
}).exceptionally(throwable -> {
    System.err.println("Error: " + throwable.getMessage());
    return null;
});

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
// Inject failure after 3 operations.
// failAfter takes an int result code, and the Code constants are compared
// directly against int rc values in the callbacks, so pass the constant as-is.
mockBk.failAfter(3, BKException.Code.WriteException);
// Inject delays
mockBk.addEntryDelay(100, TimeUnit.MILLISECONDS);
mockBk.addEntryResponseDelay(50, TimeUnit.MILLISECONDS);
// Operations will fail/delay as configured
try {
    LedgerHandle ledger = mockBk.createLedger(DigestType.CRC32, "password".getBytes());
    ledger.addEntry("entry1".getBytes()); // Works
    ledger.addEntry("entry2".getBytes()); // Works
    ledger.addEntry("entry3".getBytes()); // Fails with WriteException
} catch (BKException e) {
    System.out.println("Expected failure: " + e.getCode());
}

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
// Set up read interceptor.
// The lambda parameters are named distinctly from the ledgerId variable used
// below, because a lambda parameter may not reuse the name of an enclosing local.
mockBk.setReadHandleInterceptor((interceptedLedgerId, firstEntry, lastEntry, readEntries) -> {
    // Modify read results for testing
    System.out.println("Intercepting read for ledger " + interceptedLedgerId);
    return CompletableFuture.completedFuture(readEntries);
});
// Reads will be intercepted
ReadHandle readHandle = mockBk.newOpenLedgerOp()
    .withLedgerId(ledgerId)
    .withPassword("password".getBytes())
    .execute().get();
// This read will trigger the interceptor. Read up to the last confirmed entry;
// passing -1 as lastEntry would describe an empty range.
LedgerEntries entries = readHandle.readAsync(0, readHandle.getLastAddConfirmed()).get();

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--testmocks