Mock implementations and test utilities for Apache Pulsar components including ZooKeeper and BookKeeper mocking functionality
—
Complete in-memory ZooKeeper implementation for testing coordination service interactions without requiring an external ZooKeeper cluster. Provides full API compatibility with configurable failure injection and session management.
Enumeration of ZooKeeper operations for testing and failure injection.
enum Op {
CREATE, GET, SET, GET_CHILDREN, DELETE, EXISTS, SYNC
}

Main mock ZooKeeper implementation that extends the standard ZooKeeper client with in-memory data storage and testing controls.
class MockZooKeeper extends ZooKeeper {
// Static Factory Methods
static MockZooKeeper newInstance();
static MockZooKeeper newInstance(int readOpDelayMs);
// Configuration Methods
int getSessionTimeout();
void setSessionTimeout(int sessionTimeout);
void setSessionId(long id);
void overrideSessionId(long sessionId);
void removeSessionIdOverride();
// State and Session Management
States getState();
void register(Watcher watcher);
long getSessionId();
// Node Operations - Synchronous
String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException;
byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException;
List<String> getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException;
List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException;
Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException;
Stat exists(String path, boolean watch) throws KeeperException, InterruptedException;
Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException;
void delete(String path, int version) throws KeeperException, InterruptedException;
List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws InterruptedException, KeeperException;
// Node Operations - Asynchronous
void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx);
void getData(String path, Watcher watcher, DataCallback cb, Object ctx);
void getData(String path, boolean watch, DataCallback cb, Object ctx);
void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx);
void getChildren(String path, boolean watcher, Children2Callback cb, Object ctx);
void exists(String path, boolean watch, StatCallback cb, Object ctx);
void exists(String path, Watcher watcher, StatCallback cb, Object ctx);
void sync(String path, VoidCallback cb, Object ctx);
void setData(String path, byte[] data, int version, StatCallback cb, Object ctx);
void delete(String path, int version, VoidCallback cb, Object ctx);
void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx);
// Watch Management
void addWatch(String basePath, Watcher watcher, AddWatchMode mode);
void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx);
// Testing and Failure Injection
void failConditional(KeeperException.Code rc, BiPredicate<Op, String> predicate);
void delay(long millis, BiPredicate<Op, String> predicate);
void setAlwaysFail(KeeperException.Code rc);
void unsetAlwaysFail();
void deleteEphemeralNodes(long sessionId);
void deleteWatchers(long sessionId);
// Resource Management
synchronized void increaseRefCount();
synchronized MockZooKeeper registerCloseable(AutoCloseable closeable);
synchronized void close() throws InterruptedException;
void shutdown() throws InterruptedException;
}

Session-based wrapper that provides isolated ZooKeeper instances for multi-client testing scenarios with automatic session management.
class MockZooKeeperSession extends ZooKeeper {
// Static Factory Methods
static MockZooKeeperSession newInstance(MockZooKeeper mockZooKeeper);
static MockZooKeeperSession newInstance(MockZooKeeper mockZooKeeper, boolean closeMockZooKeeperOnClose);
// Configuration Methods
int getSessionTimeout();
void setSessionTimeout(int sessionTimeout);
void setSessionId(long id);
long getSessionId();
// State Management
States getState();
void register(Watcher watcher);
// Node Operations - Synchronous (delegates to underlying MockZooKeeper)
String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException;
byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException;
List<String> getChildren(String path, Watcher watcher) throws KeeperException, InterruptedException;
List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException;
Stat exists(String path, boolean watch) throws KeeperException, InterruptedException;
Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException;
Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException;
void delete(String path, int version) throws InterruptedException, KeeperException;
List<OpResult> multi(Iterable<org.apache.zookeeper.Op> ops) throws InterruptedException, KeeperException;
// Node Operations - Asynchronous (delegates to underlying MockZooKeeper)
void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsyncCallback.StringCallback cb, Object ctx);
void getData(String path, boolean watch, DataCallback cb, Object ctx);
void getData(String path, Watcher watcher, DataCallback cb, Object ctx);
void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx);
void getChildren(String path, boolean watcher, Children2Callback cb, Object ctx);
void exists(String path, boolean watch, StatCallback cb, Object ctx);
void exists(String path, Watcher watcher, StatCallback cb, Object ctx);
void sync(String path, VoidCallback cb, Object ctx);
void setData(String path, byte[] data, int version, StatCallback cb, Object ctx);
void delete(String path, int version, VoidCallback cb, Object ctx);
void multi(Iterable<org.apache.zookeeper.Op> ops, AsyncCallback.MultiCallback cb, Object ctx);
// Watch Management
void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx);
void addWatch(String basePath, Watcher watcher, AddWatchMode mode) throws KeeperException, InterruptedException;
void addWatch(String basePath, AddWatchMode mode) throws KeeperException, InterruptedException;
void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx);
// Testing Methods
void failConditional(KeeperException.Code rc, BiPredicate<MockZooKeeper.Op, String> predicate);
void setAlwaysFail(KeeperException.Code rc);
void unsetAlwaysFail();
// Resource Management
void close() throws InterruptedException;
void shutdown() throws InterruptedException;
}

interface StringCallback {
void processResult(int rc, String path, Object ctx, String name);
}
interface DataCallback {
void processResult(int rc, String path, Object ctx, byte[] data, Stat stat);
}
interface ChildrenCallback {
void processResult(int rc, String path, Object ctx, List<String> children);
}
interface Children2Callback {
void processResult(int rc, String path, Object ctx, List<String> children, Stat stat);
}
interface StatCallback {
void processResult(int rc, String path, Object ctx, Stat stat);
}
interface VoidCallback {
void processResult(int rc, String path, Object ctx);
}

enum CreateMode {
PERSISTENT, PERSISTENT_SEQUENTIAL, EPHEMERAL, EPHEMERAL_SEQUENTIAL,
CONTAINER, PERSISTENT_WITH_TTL, PERSISTENT_SEQUENTIAL_WITH_TTL
}
enum AddWatchMode {
PERSISTENT, PERSISTENT_RECURSIVE
}

class KeeperException extends Exception {
enum Code {
OK, SYSTEMERROR, RUNTIMEINCONSISTENCY, DATAINCONSISTENCY, CONNECTIONLOSS,
MARSHALLINGERROR, UNIMPLEMENTED, OPERATIONTIMEOUT, BADARGUMENTS,
APIERROR, NONODE, NOAUTH, BADVERSION, NOCHILDRENFOREPHEMERALS,
NODEEXISTS, NOTEMPTY, SESSIONEXPIRED, INVALIDCALLBACK, INVALIDACL,
AUTHFAILED, CLOSING, NOTHING, SESSIONMOVED, NOTREADONLY, EPHEMERALONLOCALSESSION,
NOWATCHER, RECONFIGDISABLED, REQUESTTIMEOUT, QUOTAEXCEEDED, THROTTLEDOP
}
}

import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
// Create mock instance
MockZooKeeper mockZk = MockZooKeeper.newInstance();
// Create persistent node
String path = mockZk.create("/config", "initial".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Read data
byte[] data = mockZk.getData("/config", null, null);
// Update data
mockZk.setData("/config", "updated".getBytes(), -1);
// List children
List<String> children = mockZk.getChildren("/", null);
// Clean up
mockZk.close();

import org.apache.zookeeper.AsyncCallback;
MockZooKeeper mockZk = MockZooKeeper.newInstance();
// Async create with callback
mockZk.create("/async", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("Created: " + name);
}
}, null);
// Async read with callback
mockZk.getData("/async", null, (rc, path, ctx, data, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
System.out.println("Data: " + new String(data));
}
}, null);

import org.apache.zookeeper.MockZooKeeperSession;
// Create shared mock ZooKeeper instance
MockZooKeeper sharedMock = MockZooKeeper.newInstance();
// Create isolated session clients
MockZooKeeperSession client1 = MockZooKeeperSession.newInstance(sharedMock);
MockZooKeeperSession client2 = MockZooKeeperSession.newInstance(sharedMock);
// Each client has its own session but shares the same data
client1.create("/shared", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
byte[] data = client2.getData("/shared", null, null); // Can read data created by client1
// Clean up - closing a session does not close the shared mock, so close the shared mock explicitly afterwards
client1.close();
client2.close();
sharedMock.close();

MockZooKeeper mockZk = MockZooKeeper.newInstance();
// Make every operation fail with the given error code (demonstrated here with create)
mockZk.setAlwaysFail(KeeperException.Code.CONNECTIONLOSS);
try {
mockZk.create("/test", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
// Will throw CONNECTIONLOSS exception
}
// Remove failure injection
mockZk.unsetAlwaysFail();
// Conditional failure - fail only GET operations on specific paths
mockZk.failConditional(KeeperException.Code.NONODE,
(op, path) -> op == MockZooKeeper.Op.GET && path.startsWith("/test"));
// Add delays to operations
mockZk.delay(1000, (op, path) -> path.startsWith("/slow"));

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
MockZooKeeper mockZk = MockZooKeeper.newInstance();
// Set up watcher
Watcher watcher = new Watcher() {
public void process(WatchedEvent event) {
System.out.println("Event: " + event.getType() + " on " + event.getPath());
}
};
// Watch for node changes
mockZk.exists("/watched", watcher);
// Create node - triggers watcher
mockZk.create("/watched", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// Add persistent watcher
mockZk.addWatch("/persistent", watcher, AddWatchMode.PERSISTENT);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-pulsar--testmocks