CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-pulsar--testmocks

Mock implementations and test utilities for Apache Pulsar components including ZooKeeper and BookKeeper mocking functionality

Pending
Overview
Eval results
Files

docs/bookkeeper-client-mocking.md

BookKeeper Client Mocking

In-memory BookKeeper client implementation providing full distributed ledger functionality for testing without requiring a BookKeeper cluster. Supports all ledger operations with configurable behavior injection and failure simulation.

Capabilities

PulsarMockReadHandleInterceptor

Interface for intercepting and modifying read operations on mock ledger handles, enabling advanced testing scenarios.

/**
 * Hook for intercepting reads performed on mock ledger handles.
 * Installed on the mock client through {@code setReadHandleInterceptor}.
 */
interface PulsarMockReadHandleInterceptor {
    /**
     * Invoked for a read with the ledger id, the requested entry range and the entries read.
     *
     * @param ledgerId   id of the ledger being read
     * @param firstEntry first entry id of the requested range
     * @param lastEntry  last entry id of the requested range
     * @param entries    entries produced by the read
     * @return future holding the entries to use as the read result; an implementation may
     *         simply wrap {@code entries} in an already-completed future
     */
    CompletableFuture<LedgerEntries> interceptReadAsync(long ledgerId, long firstEntry, long lastEntry, LedgerEntries entries);
}

PulsarMockBookKeeper

Main mock BookKeeper client that maintains all ledger data in memory while providing the complete BookKeeper API.

/**
 * API summary of the mock BookKeeper client: keeps ledger data in memory and exposes
 * the BookKeeper client API, plus test-only hooks for failure and delay injection.
 * This is a signature listing, not an implementation.
 */
class PulsarMockBookKeeper extends BookKeeper {
    // Constructor
    PulsarMockBookKeeper(OrderedExecutor orderedExecutor);
    
    // Configuration
    ClientConfiguration getConf();
    OrderedExecutor getMainWorkerPool();
    MetadataClientDriver getMetadataClientDriver();
    
    // Static Methods
    static Collection<BookieId> getMockEnsemble();
    
    // Ledger Creation - Synchronous
    LedgerHandle createLedger(DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
    LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
    LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd) throws BKException, InterruptedException;
    void deleteLedger(long lId) throws InterruptedException, BKException;
    
    // Ledger Creation - Asynchronous
    // Note: the three-size overload takes a trailing properties map; the two-size overload does not.
    void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx, Map<String, byte[]> properties);
    void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb, Object ctx);
    
    // Ledger Access
    void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);
    void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx);
    void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx);
    
    // Builder Pattern APIs
    OpenBuilder newOpenLedgerOp();
    DeleteBuilder newDeleteLedgerOp();
    
    // State Access
    Set<Long> getLedgers();
    Map<Long, PulsarMockLedgerHandle> getLedgerMap();
    
    // Testing and Failure Injection
    // The rc arguments are BookKeeper result codes (the usage examples pass BKException.Code values).
    // NOTE(review): which operations count as a "step" for failAfter vs. addEntryFailAfter is not
    // shown in this listing; confirm against the mock's implementation.
    void delay(long millis);
    void failNow(int rc);
    void failAfter(int steps, int rc);
    void addEntryFailAfter(int steps, int rc);
    synchronized void returnEmptyLedgerAfter(int steps);
    synchronized void addEntryDelay(long delay, TimeUnit unit);
    synchronized void addEntryResponseDelay(long delay, TimeUnit unit);
    
    // Interceptor Management
    void setReadHandleInterceptor(PulsarMockReadHandleInterceptor readHandleInterceptor);
    PulsarMockReadHandleInterceptor getReadHandleInterceptor();
    
    // Resource Management
    void close() throws InterruptedException, BKException;
    void shutdown();
}

PulsarMockLedgerHandle

Mock implementation of LedgerHandle providing in-memory ledger operations with full ReadHandle interface support.

/**
 * API summary of the mock ledger handle: an in-memory LedgerHandle that also offers the
 * CompletableFuture-based read methods. This is a signature listing, not an implementation.
 */
class PulsarMockLedgerHandle extends LedgerHandle {
    // Constructor
    PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd);
    
    // State Access
    long getId();
    long getLastAddConfirmed();
    long getLength();
    boolean isClosed();
    boolean isFenced();
    
    // Entry Operations - Synchronous
    long addEntry(byte[] data) throws InterruptedException, BKException;
    
    // Entry Operations - Asynchronous
    // Completion is reported through AddCallback.addComplete together with the caller-supplied ctx.
    void asyncAddEntry(byte[] data, AddCallback cb, Object ctx);
    void asyncAddEntry(byte[] data, int offset, int length, AddCallback cb, Object ctx);
    void asyncAddEntry(ByteBuf data, AddCallback cb, Object ctx);
    
    // Read Operations - Asynchronous (LedgerHandle interface)
    void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx);
    
    // Read Operations - CompletableFuture (ReadHandle interface)
    CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);
    CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);
    CompletableFuture<Long> readLastAddConfirmedAsync();
    CompletableFuture<Long> tryReadLastAddConfirmedAsync();
    CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);
    
    // Lifecycle
    void asyncClose(CloseCallback cb, Object ctx);
}

PulsarMockReadHandle

Separate read-only handle implementation for reading existing ledgers.

/**
 * API summary of the read-only mock handle used for reading existing ledgers.
 * This is a signature listing, not an implementation.
 */
class PulsarMockReadHandle implements ReadHandle {
    // Constructor
    PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, long lastAddConfirmed);
    
    // ReadHandle Interface
    // All read and close operations report their outcome through the returned future.
    long getId();
    CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry);
    CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry);
    CompletableFuture<Long> readLastAddConfirmedAsync();
    CompletableFuture<Long> tryReadLastAddConfirmedAsync();
    CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, long timeOutInMillis, boolean parallel);
    boolean isClosed();
    CompletableFuture<Void> closeAsync();
}

Types

BookKeeper Callback Interfaces

/** Completion callback accepted by the asyncCreateLedger overloads. */
interface CreateCallback {
    /**
     * @param rc  result code; callers compare it with BKException.Code.OK to detect success
     * @param lh  handle of the created ledger
     * @param ctx context object that was passed to the asynchronous call
     */
    void createComplete(int rc, LedgerHandle lh, Object ctx);
}

/** Completion callback accepted by asyncOpenLedger and asyncOpenLedgerNoRecovery. */
interface OpenCallback {
    /**
     * @param rc  result code; callers compare it with BKException.Code.OK to detect success
     * @param lh  handle of the opened ledger
     * @param ctx context object that was passed to the asynchronous call
     */
    void openComplete(int rc, LedgerHandle lh, Object ctx);
}

/** Completion callback accepted by asyncDeleteLedger. */
interface DeleteCallback {
    /**
     * @param rc  result code of the delete operation
     * @param ctx context object that was passed to the asynchronous call
     */
    void deleteComplete(int rc, Object ctx);
}

/** Completion callback accepted by the asyncAddEntry overloads. */
interface AddCallback {
    /**
     * @param rc      result code; callers compare it with BKException.Code.OK to detect success
     * @param lh      ledger handle the add was issued on
     * @param entryId id of the added entry
     * @param ctx     context object that was passed to the asynchronous call
     */
    void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx);
}

/** Completion callback accepted by asyncReadEntries. */
interface ReadCallback {
    /**
     * @param rc  result code of the read
     * @param lh  ledger handle the read was issued on
     * @param seq enumeration over the entries that were read
     * @param ctx context object that was passed to the asynchronous call
     */
    void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx);
}

/** Completion callback accepted by asyncClose. */
interface CloseCallback {
    /**
     * @param rc  result code of the close operation
     * @param lh  ledger handle that was closed
     * @param ctx context object that was passed to the asynchronous call
     */
    void closeComplete(int rc, LedgerHandle lh, Object ctx);
}

BookKeeper Enums

/** Digest type passed when creating or opening a ledger (see the createLedger / asyncOpenLedger signatures). */
enum DigestType {
    MAC, CRC32, CRC32C, DUMMY
}

BookKeeper Builder Interfaces

/**
 * Fluent builder returned by {@code newOpenLedgerOp()} for opening an existing ledger.
 * Each {@code with...} method returns the builder so calls can be chained; {@code execute()}
 * starts the open and reports the outcome through the returned future.
 */
interface OpenBuilder {
    /** Selects the ledger to open; the usage examples call this before {@code execute()}. */
    OpenBuilder withLedgerId(long ledgerId);
    /** Sets the password for the ledger. */
    OpenBuilder withPassword(byte[] password);
    /** Sets the digest type for the ledger. */
    OpenBuilder withDigestType(DigestType digestType);
    /** Chooses whether the open performs recovery (compare asyncOpenLedger vs. asyncOpenLedgerNoRecovery). */
    OpenBuilder withRecovery(boolean recovery);
    /** Starts the open operation; the future completes with a read handle for the ledger. */
    CompletableFuture<ReadHandle> execute();
}

/**
 * Fluent builder returned by {@code newDeleteLedgerOp()} for deleting a ledger.
 */
interface DeleteBuilder {
    /** Selects the ledger to delete; without it the operation has no target. */
    DeleteBuilder withLedgerId(long ledgerId);
    /** Starts the delete operation; the future completes when the operation finishes. */
    CompletableFuture<Void> execute();
}

BookKeeper Exception Types

/**
 * Exception type thrown by the synchronous ledger operations; {@code Code} names the result
 * codes that the callbacks receive as {@code int rc}.
 */
class BKException extends Exception {
    // NOTE(review): the usage examples in this document compare an int result code directly
    // with BKException.Code.OK (e.g. `rc == BKException.Code.OK`), which only compiles if the
    // codes are int constants rather than enum values. Confirm the declaration kind and the
    // exact set of names against the BookKeeper BKException.Code reference before relying on it.
    enum Code {
        OK, ReadException, QuorumException, NoBookieAvailableException,
        DigestNotInitializedException, DigestMatchException, NotEnoughBookiesException,
        NoSuchLedgerExistsException, BookieHandleNotAvailableException,
        ZKException, MetaStoreException, ClientClosedException,
        LedgerRecoveryException, LedgerClosedException, WriteException,
        NoSuchEntryException, IncorrectParameterException, InterruptedException,
        ProtocolVersionException, MetadataVersionException, SecurityException,
        IllegalOpException, InvalidCookieException, UnauthorizedAccessException,
        UnavailableException, ReplicationException, LedgerFencedException,
        TimeoutException, LedgerExistException
    }
}

Usage Examples

Basic Ledger Operations

import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.concurrent.Executors;

// Create mock BookKeeper; the caller owns the executor it is built on
OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(4).name("test").build();
PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

// Encode with an explicit charset so the example does not depend on the platform default
byte[] password = "password".getBytes(StandardCharsets.UTF_8);

// Create ledger (createLedger takes the client-side BookKeeper.DigestType imported above)
LedgerHandle ledger = mockBk.createLedger(DigestType.CRC32, password);

// Add entries
long entryId1 = ledger.addEntry("entry1".getBytes(StandardCharsets.UTF_8));
long entryId2 = ledger.addEntry("entry2".getBytes(StandardCharsets.UTF_8));

// Read entries from the first entry up to the last confirmed one
Enumeration<LedgerEntry> entries = ledger.readEntries(0, ledger.getLastAddConfirmed());

// Close ledger and client, then release the executor created above
ledger.close();
mockBk.close();
executor.shutdown();

Asynchronous Operations

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);
byte[] password = "password".getBytes(StandardCharsets.UTF_8);

// Async create ledger. The overload taking ensemble, write-quorum and ack-quorum sizes
// also requires a trailing properties map, so pass an empty one
// (requires java.util.Collections and java.nio.charset.StandardCharsets).
mockBk.asyncCreateLedger(3, 2, 2, DigestType.CRC32, password,
    (rc, lh, ctx) -> {
        if (rc == BKException.Code.OK) {
            // Async add entry
            lh.asyncAddEntry("data".getBytes(StandardCharsets.UTF_8), (rc2, lh2, entryId, ctx2) -> {
                if (rc2 == BKException.Code.OK) {
                    System.out.println("Added entry: " + entryId);
                }
            }, null);
        }
    }, null, Collections.emptyMap());

// Async open existing ledger; ledgerId is the id of a previously created ledger (LedgerHandle.getId())
mockBk.asyncOpenLedger(ledgerId, DigestType.CRC32, password,
    (rc, lh, ctx) -> {
        if (rc == BKException.Code.OK) {
            // Async read entries
            lh.asyncReadEntries(0, lh.getLastAddConfirmed(), (rc2, lh2, seq, ctx2) -> {
                // Check the read result before touching seq, mirroring the checks on rc above
                if (rc2 != BKException.Code.OK) {
                    System.err.println("Read failed, rc=" + rc2);
                    return;
                }
                while (seq.hasMoreElements()) {
                    LedgerEntry entry = seq.nextElement();
                    System.out.println("Entry: " + new String(entry.getEntry(), StandardCharsets.UTF_8));
                }
            }, null);
        }
    }, null);

CompletableFuture API

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

// Using builder pattern with CompletableFuture.
// The builder API uses org.apache.bookkeeper.client.api.DigestType and
// org.apache.bookkeeper.client.api.LedgerEntry (not the callback-API classes).
CompletableFuture<ReadHandle> future = mockBk.newOpenLedgerOp()
    .withLedgerId(ledgerId)
    .withPassword("password".getBytes(StandardCharsets.UTF_8))
    .withDigestType(DigestType.CRC32)
    .execute();

future.thenCompose(readHandle -> {
    // Read every confirmed entry; a last-entry id of -1 would describe an empty range
    return readHandle.readAsync(0, readHandle.getLastAddConfirmed());
}).thenAccept(entries -> {
    // LedgerEntries is AutoCloseable: release it once the entries have been consumed
    try (LedgerEntries batch = entries) {
        for (LedgerEntry entry : batch) {
            System.out.println("Entry: " + new String(entry.getEntryBytes(), StandardCharsets.UTF_8));
        }
    }
}).exceptionally(throwable -> {
    System.err.println("Error: " + throwable.getMessage());
    return null;
});

Failure Injection Testing

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

// Inject failure after 3 operations. failAfter takes an int result code, and the
// BKException.Code values are used as ints throughout (e.g. rc == BKException.Code.OK).
// NOTE(review): confirm whether add-entry calls count against failAfter or only against
// addEntryFailAfter in the mock version you use; adjust the call below accordingly.
mockBk.failAfter(3, BKException.Code.WriteException);

// Inject delays
mockBk.addEntryDelay(100, TimeUnit.MILLISECONDS);
mockBk.addEntryResponseDelay(50, TimeUnit.MILLISECONDS);

// Operations will fail/delay as configured
try {
    LedgerHandle ledger = mockBk.createLedger(DigestType.CRC32, "password".getBytes(StandardCharsets.UTF_8));
    ledger.addEntry("entry1".getBytes(StandardCharsets.UTF_8)); // Works
    ledger.addEntry("entry2".getBytes(StandardCharsets.UTF_8)); // Works
    ledger.addEntry("entry3".getBytes(StandardCharsets.UTF_8)); // Fails with WriteException
} catch (BKException e) {
    System.out.println("Expected failure: " + e.getCode());
} catch (InterruptedException e) {
    // createLedger and addEntry also declare InterruptedException; restore the interrupt flag
    Thread.currentThread().interrupt();
}

Read Interception

PulsarMockBookKeeper mockBk = new PulsarMockBookKeeper(executor);

// Set up read interceptor
mockBk.setReadHandleInterceptor((ledgerId, firstEntry, lastEntry, entries) -> {
    // Modify read results for testing
    System.out.println("Intercepting read for ledger " + ledgerId);
    return CompletableFuture.completedFuture(entries);
});

// Reads will be intercepted
ReadHandle readHandle = mockBk.newOpenLedgerOp()
    .withLedgerId(ledgerId)
    .withPassword("password".getBytes(StandardCharsets.UTF_8))
    .execute().get();

// This read will trigger the interceptor. Read up to the last confirmed entry;
// a last-entry id of -1 would describe an empty range.
LedgerEntries entries = readHandle.readAsync(0, readHandle.getLastAddConfirmed()).get();

// LedgerEntries is AutoCloseable: release it when the test is done with the entries
entries.close();

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-pulsar--testmocks

docs

bookkeeper-client-mocking.md

bookkeeper-server-testing.md

bookkeeper-testing-utilities.md

index.md

zookeeper-mocking.md

tile.json