Mock implementations and test utilities for Apache Pulsar components including ZooKeeper and BookKeeper mocking functionality
—
Enhanced testing components providing specialized test clients, in-memory statistics providers, and server testing infrastructure for comprehensive BookKeeper testing scenarios beyond basic mocking.
Enhanced BookKeeper client with access to internal components and specialized testing methods for integration testing scenarios.
class BookKeeperTestClient extends BookKeeper {
// Constructors
BookKeeperTestClient(ClientConfiguration conf) throws InterruptedException, BKException;
BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsProvider) throws InterruptedException, BKException;
BookKeeperTestClient(ClientConfiguration conf, ZooKeeper zkc) throws InterruptedException, BKException;
// Internal Component Access
ZooKeeper getZkHandle();
ClientConfiguration getConf();
BookieClient getBookieClient();
TestStatsProvider getTestStatsProvider();
// Testing Operations
void waitForReadOnlyBookie(BookieId id) throws Exception;
void waitForWritableBookie(BookieId id) throws Exception;
void readBookiesBlocking() throws BKException;
}In-memory statistics provider for testing with comprehensive metrics collection and analysis capabilities.
class TestStatsProvider implements StatsProvider {
// StatsProvider Interface
void start(Configuration conf);
void stop();
TestStatsLogger getStatsLogger(String scope);
String getStatsName(String... statsComponents);
// Access Methods
TestOpStatsLogger getOpStatsLogger(String path);
TestCounter getCounter(String path);
Gauge<? extends Number> getGauge(String path);
void forEachOpStatLogger(BiConsumer<String, TestOpStatsLogger> f);
void clear();
}In-memory counter for tracking numeric metrics during testing.
class TestCounter implements Counter {
// Counter Operations
void clear();
void inc();
void dec();
void addCount(long delta);
void addLatency(long eventLatency, TimeUnit unit);
// Value Access
Long get();
Long getMax();
}In-memory operation statistics logger for tracking operation latencies and success/failure rates.
class TestOpStatsLogger implements OpStatsLogger {
// Event Registration
void registerFailedEvent(long eventLatency, TimeUnit unit);
void registerSuccessfulEvent(long eventLatency, TimeUnit unit);
synchronized void registerSuccessfulValue(long value);
synchronized void registerFailedValue(long value);
// Statistics Access
OpStatsData toOpStatsData();
synchronized void clear();
synchronized double getSuccessAverage();
synchronized long getSuccessCount();
}In-memory hierarchical statistics logger for organizing metrics by scope.
class TestStatsLogger implements StatsLogger {
// Metric Creation
OpStatsLogger getOpStatsLogger(String name);
Counter getCounter(String name);
Gauge<? extends Number> getGauge(String name);
// Gauge Management
<T extends Number> void registerGauge(String name, Gauge<T> gauge);
<T extends Number> void unregisterGauge(String name, Gauge<T> gauge);
// Scope Management
StatsLogger scope(String name);
void removeScope(String name, StatsLogger statsLogger);
// Thread-Scoped Metrics
OpStatsLogger getThreadScopedOpStatsLogger(String name);
Counter getThreadScopedCounter(String name);
}interface StatsProvider {
void start(Configuration conf);
void stop();
StatsLogger getStatsLogger(String scope);
String getStatsName(String... statsComponents);
}
interface StatsLogger {
OpStatsLogger getOpStatsLogger(String name);
Counter getCounter(String name);
<T extends Number> Gauge<T> getGauge(String name);
<T extends Number> void registerGauge(String name, Gauge<T> gauge);
<T extends Number> void unregisterGauge(String name, Gauge<T> gauge);
StatsLogger scope(String name);
void removeScope(String name, StatsLogger statsLogger);
OpStatsLogger getThreadScopedOpStatsLogger(String name);
Counter getThreadScopedCounter(String name);
}
interface OpStatsLogger {
void registerFailedEvent(long eventLatency, TimeUnit unit);
void registerSuccessfulEvent(long eventLatency, TimeUnit unit);
void registerSuccessfulValue(long value);
void registerFailedValue(long value);
OpStatsData toOpStatsData();
void clear();
}
interface Counter {
void clear();
void inc();
void dec();
void addCount(long delta);
void addLatency(long eventLatency, TimeUnit unit);
Long get();
}
interface Gauge<T extends Number> {
T getDefaultValue();
T getSample();
}class ClientConfiguration extends AbstractConfiguration {
// BookKeeper client configuration
// (Part of BookKeeper API)
}
class BookieClient {
// Internal BookKeeper bookie client
// (Part of BookKeeper internal API)
}
class BookieId {
// Bookie identifier
// (Part of BookKeeper API)
}import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.TestStatsProvider;
import org.apache.bookkeeper.conf.ClientConfiguration;
// Create client configuration
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri("zk+null://localhost/ledgers");
// Create test client with custom statistics provider
TestStatsProvider statsProvider = new TestStatsProvider();
BookKeeperTestClient testClient = new BookKeeperTestClient(conf, statsProvider);
// Access internal components for testing
ZooKeeper zkHandle = testClient.getZkHandle();
BookieClient bookieClient = testClient.getBookieClient();
// Perform operations and collect statistics
LedgerHandle ledger = testClient.createLedger(DigestType.CRC32, "password".getBytes());
ledger.addEntry("test data".getBytes());
// Analyze collected statistics
TestStatsProvider.TestOpStatsLogger createStats = statsProvider.getOpStatsLogger("ledger.create");
System.out.println("Create operations: " + createStats.getSuccessCount());
testClient.close();TestStatsProvider statsProvider = new TestStatsProvider();
// Create loggers for different scopes
TestStatsProvider.TestStatsLogger rootLogger = statsProvider.getStatsLogger("bookkeeper");
TestStatsProvider.TestStatsLogger clientLogger = rootLogger.scope("client");
// Create metrics
TestStatsProvider.TestOpStatsLogger readOps = clientLogger.getOpStatsLogger("read");
TestStatsProvider.TestCounter totalRequests = clientLogger.getCounter("requests.total");
// Simulate operations
totalRequests.inc();
readOps.registerSuccessfulEvent(50, TimeUnit.MILLISECONDS);
totalRequests.inc();
readOps.registerSuccessfulEvent(75, TimeUnit.MILLISECONDS);
totalRequests.inc();
readOps.registerFailedEvent(100, TimeUnit.MILLISECONDS);
// Analyze results
System.out.println("Total requests: " + totalRequests.get());
System.out.println("Read success count: " + readOps.getSuccessCount());
System.out.println("Average read latency: " + readOps.getSuccessAverage() + "ms");
// Clear metrics for next test
statsProvider.clear();BookKeeperTestClient testClient = new BookKeeperTestClient(conf);
// Test bookie availability
BookieId bookieId = new BookieId("127.0.0.1", 3181);
// Wait for bookie to become read-only
Future<?> readOnlyFuture = testClient.waitForReadOnlyBookie(bookieId);
readOnlyFuture.get(30, TimeUnit.SECONDS);
// Wait for bookie to become writable again
Future<?> writableFuture = testClient.waitForWritableBookie(bookieId);
writableFuture.get(30, TimeUnit.SECONDS);
// Force reading bookie information
testClient.readBookiesBlocking();
testClient.close();TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsProvider.TestStatsLogger logger = statsProvider.getStatsLogger("test");
// Register custom gauge
AtomicLong queueSize = new AtomicLong(0);
Gauge<Long> queueGauge = new Gauge<Long>() {
public Long getDefaultValue() { return 0L; }
public Long getSample() { return queueSize.get(); }
};
logger.registerGauge("queue.size", queueGauge);
// Simulate queue operations
queueSize.set(10);
Gauge<? extends Number> retrievedGauge = logger.getGauge("queue.size");
System.out.println("Queue size: " + retrievedGauge.getSample());
// Clean up
logger.unregisterGauge("queue.size", queueGauge);public class MultiClientTest {
private List<BookKeeperTestClient> clients = new ArrayList<>();
private TestStatsProvider globalStats = new TestStatsProvider();
@Before
public void setUp() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri("zk+null://localhost/ledgers");
// Create multiple test clients
for (int i = 0; i < 3; i++) {
BookKeeperTestClient client = new BookKeeperTestClient(conf, globalStats);
clients.add(client);
}
}
@Test
public void testConcurrentOperations() throws Exception {
// Perform concurrent operations with different clients
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < clients.size(); i++) {
final int clientIndex = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
BookKeeperTestClient client = clients.get(clientIndex);
LedgerHandle ledger = client.createLedger(DigestType.CRC32, "password".getBytes());
for (int j = 0; j < 10; j++) {
ledger.addEntry(("data-" + clientIndex + "-" + j).getBytes());
}
ledger.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
futures.add(future);
}
// Wait for all operations to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
// Analyze global statistics
globalStats.forEachOpStatLogger((path, opStats) -> {
System.out.println(path + ": " + opStats.getSuccessCount() + " operations");
});
}
@After
public void tearDown() throws Exception {
for (BookKeeperTestClient client : clients) {
client.close();
}
}
}TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsProvider.TestStatsLogger logger = statsProvider.getStatsLogger("thread-test");
// Create thread-scoped metrics
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Future<?> future = executor.submit(() -> {
// Each thread gets its own scoped metrics
TestStatsProvider.TestOpStatsLogger threadOpStats = logger.getThreadScopedOpStatsLogger("operation");
TestStatsProvider.TestCounter threadCounter = (TestStatsProvider.TestCounter) logger.getThreadScopedCounter("requests");
// Perform operations
for (int j = 0; j < 5; j++) {
threadCounter.inc();
threadOpStats.registerSuccessfulEvent(j * 10, TimeUnit.MILLISECONDS);
}
});
futures.add(future);
}
// Wait for completion
for (Future<?> future : futures) {
future.get();
}
executor.shutdown();Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--testmocks