A shaded JAR library that bundles Apache Curator dependencies for ZooKeeper coordination capabilities within Apache Flink's distributed architecture.
—
Distributed locking capabilities for coordinating access to shared resources across multiple processes and JVMs. Provides various locking mechanisms including mutexes, read-write locks, and semaphores with support for revocation and automatic cleanup.
Base interface for all distributed locks providing consistent acquire/release semantics.
/**
 * Common contract shared by every distributed lock that coordinates across processes.
 */
public interface InterProcessLock {

    /**
     * Acquires the lock, blocking until it becomes available.
     *
     * @throws Exception if the lock cannot be acquired
     */
    void acquire() throws Exception;

    /**
     * Attempts to acquire the lock, waiting no longer than the given time period.
     *
     * @param time maximum time to wait for the lock
     * @param unit unit in which {@code time} is expressed
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     * @throws Exception if an error occurs during acquisition
     */
    boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Releases the lock.
     *
     * @throws Exception if the lock cannot be released
     */
    void release() throws Exception;
}

Re-entrant mutex implementation that works across JVMs. The same thread can acquire the lock multiple times.
/**
 * Re-entrant mutex that works across JVMs using ZooKeeper.
 */
public class InterProcessMutex implements InterProcessLock {

    /**
     * Creates a new InterProcessMutex.
     *
     * @param client the curator client
     * @param path the path to use for the lock
     */
    public InterProcessMutex(CuratorFramework client, String path);

    /**
     * Acquires the lock, blocking until it becomes available.
     *
     * @throws Exception if the lock cannot be acquired
     */
    public void acquire() throws Exception;

    /**
     * Attempts to acquire the lock, waiting no longer than the given time period.
     *
     * @param time maximum time to wait
     * @param unit time unit of {@code time}
     * @return {@code true} if acquired, {@code false} if timed out
     * @throws Exception if an error occurs during acquisition
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Releases the lock.
     *
     * @throws Exception if the lock cannot be released
     */
    public void release() throws Exception;

    /**
     * Reports whether this mutex is currently held by a thread in this process (JVM).
     * As the method name indicates, the check is process-wide; it is not limited to
     * the calling thread.
     *
     * @return {@code true} if a thread in this JVM holds the mutex
     */
    public boolean isAcquiredInThisProcess();
}

Usage Example:
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

CuratorFramework client = // ... initialize client
InterProcessMutex lock = new InterProcessMutex(client, "/app/locks/resource");

// Enter the try/finally only once the lock is actually held. Calling release()
// on a lock this thread does not own is an error, so it must not run on the
// timeout path.
if (lock.acquire(10, TimeUnit.SECONDS)) {
    try {
        // Critical section - only one process can execute this
        System.out.println("Processing shared resource");
        // ... do work
    } finally {
        lock.release(); // Always release in a finally block once acquired
    }
} else {
    System.out.println("Could not acquire lock within 10 seconds");
}

Non-reentrant mutex implementation. Unlike InterProcessMutex, the same thread cannot acquire this lock multiple times.
/**
 * Mutex that works across JVMs and, unlike a re-entrant mutex, cannot be
 * re-acquired by a thread that already holds it.
 */
public class InterProcessSemaphoreMutex implements InterProcessLock {

    /**
     * Creates a new InterProcessSemaphoreMutex.
     *
     * @param client the curator client
     * @param path the path to use for the lock
     */
    public InterProcessSemaphoreMutex(CuratorFramework client, String path);

    /** Acquires the lock, blocking until it becomes available. */
    public void acquire() throws Exception;

    /**
     * Attempts to acquire the lock within the given time period.
     *
     * @param time maximum time to wait
     * @param unit time unit of {@code time}
     * @return {@code true} if the lock was acquired, {@code false} otherwise
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /** Releases the lock. */
    public void release() throws Exception;
}

Read-write lock implementation allowing multiple readers or one writer across JVMs.
/**
 * Re-entrant read/write mutex that works across JVMs.
 */
public class InterProcessReadWriteLock {

    /**
     * Creates a new InterProcessReadWriteLock.
     *
     * @param client the curator client
     * @param lockPath the path to use for the lock
     */
    public InterProcessReadWriteLock(CuratorFramework client, String lockPath);

    /**
     * Returns the read half of this lock.
     *
     * @return read lock instance
     */
    public InterProcessLock readLock();

    /**
     * Returns the write half of this lock.
     *
     * @return write lock instance
     */
    public InterProcessLock writeLock();

    /**
     * Read lock component of the enclosing read/write lock.
     */
    public static class ReadLock implements InterProcessLock {
        public void acquire() throws Exception;

        public boolean acquire(long time, TimeUnit unit) throws Exception;

        public void release() throws Exception;
    }

    /**
     * Write lock component of the enclosing read/write lock.
     */
    public static class WriteLock implements InterProcessLock {
        public void acquire() throws Exception;

        public boolean acquire(long time, TimeUnit unit) throws Exception;

        public void release() throws Exception;
    }
}

Usage Example:
InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/app/locks/data");

// Shared access: several readers may hold the read lock at the same time
InterProcessLock reader = rwLock.readLock();
InterProcessLock writer = rwLock.writeLock();
reader.acquire();
try {
    // Read shared data
    System.out.println("Reading data...");
} finally {
    reader.release();
}

// Exclusive access: a single writer, and no readers, may hold the write lock
writer.acquire();
try {
    // Write shared data
    System.out.println("Writing data...");
} finally {
    writer.release();
}

Container for managing multiple locks as a single entity. All locks must be acquired for the multi-lock to be considered acquired.
/**
 * Container that holds several locks and exposes them as one lock.
 */
public class InterProcessMultiLock implements InterProcessLock {

    /**
     * Creates a new InterProcessMultiLock.
     *
     * @param locks list of locks to manage together
     */
    public InterProcessMultiLock(List<InterProcessLock> locks);

    /**
     * Acquires every lock held by this multi-lock.
     */
    public void acquire() throws Exception;

    /**
     * Attempts to acquire every lock held by this multi-lock within the given time period.
     *
     * @param time maximum time to wait
     * @param unit time unit of {@code time}
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Releases every lock held by this multi-lock.
     */
    public void release() throws Exception;
}

Counting semaphore that works across JVMs, allowing a specified number of processes to acquire the semaphore.
/**
* Counting semaphore that works across JVMs
*/
public class InterProcessSemaphoreV2 {
/**
* Create a new InterProcessSemaphoreV2
* @param client the curator client
* @param mutexPath the path to use for the semaphore
* @param maxLeases maximum number of simultaneous leases
*/
public InterProcessSemaphoreV2(CuratorFramework client, String mutexPath, int maxLeases);
/**
* Acquire a lease from the semaphore, blocking until one becomes available
* @return the acquired lease
* @throws Exception if an error occurs during acquisition
*/
public Lease acquire() throws Exception;
/**
* Acquire a lease within the given time period
* @param time maximum time to wait for a lease
* @param unit time unit of the time argument
* @return the acquired lease, or null if no lease could be acquired in time
* @throws Exception if an error occurs during acquisition
*/
public Lease acquire(long time, TimeUnit unit) throws Exception;
/**
* Acquire multiple leases
* @param qty number of leases to acquire
* @return collection of acquired leases
* @throws Exception if an error occurs during acquisition
*/
public Collection<Lease> acquire(int qty) throws Exception;
/**
* Get current available leases
* NOTE(review): this method does not appear in upstream Curator's
* InterProcessSemaphoreV2 API - confirm it exists in the bundled version.
* @return number of available leases
*/
public int availablePermits() throws Exception;
/**
* Return a lease to the semaphore
* @param lease the lease to return
*/
public void returnLease(Lease lease) throws Exception;
/**
* Return multiple leases
* @param leases the leases to return
*/
public void returnAll(Collection<Lease> leases) throws Exception;
}
/**
 * Represents a lease obtained from a semaphore.
 *
 * <p>Declared as an interface: callers receive lease instances from the
 * semaphore's {@code acquire} methods rather than constructing them directly.
 */
public interface Lease extends Closeable {

    /**
     * Returns the data associated with this lease.
     *
     * @return the lease data
     * @throws Exception if the data cannot be read
     */
    byte[] getData() throws Exception;

    /**
     * Returns the node name for this lease.
     *
     * @return the node name
     */
    String getNodeName();

    /**
     * Closes this lease, returning it to the semaphore.
     *
     * @throws IOException if the lease cannot be closed
     */
    @Override
    void close() throws IOException;
}

Usage Example:
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/app/semaphore", 3);

// The timed acquire yields null when no lease was obtained, so check before use
Lease permit = semaphore.acquire(5, TimeUnit.SECONDS);
if (permit == null) {
    System.out.println("Could not acquire semaphore lease");
} else {
    try {
        // Only 3 processes can be here simultaneously
        System.out.println("Acquired semaphore lease");
        // ... do work
    } finally {
        semaphore.returnLease(permit);
    }
}

Utility class for try-with-resources lock acquisition pattern.
/**
 * Utility that lets a lock be used with try-with-resources.
 *
 * <p>NOTE(review): upstream Curator appears to declare this class as
 * {@code AutoCloseable} with {@code close() throws Exception} - confirm the
 * signatures below against the bundled version.
 */
public class Locker implements Closeable {

    /**
     * Creates a new Locker for the given lock.
     *
     * <p>NOTE(review): what happens when the lock is not obtained within the
     * timeout is not stated here; upstream Curator appears to throw a
     * {@code TimeoutException} - confirm.
     *
     * @param lock the lock to manage
     * @param time maximum time to wait for acquisition
     * @param unit time unit of {@code time}
     */
    public Locker(InterProcessLock lock, long time, TimeUnit unit) throws Exception;

    /**
     * Releases the managed lock.
     */
    public void close() throws IOException;
}

Usage Example:
InterProcessMutex lock = new InterProcessMutex(client, "/app/locks/resource");

// Constructing the Locker acquires the lock; leaving the try block releases it
try (Locker held = new Locker(lock, 10, TimeUnit.SECONDS)) {
    System.out.println("Lock acquired, doing work...");
    // ... do work
} // Lock is automatically released here

Interface for locks that can be revoked by external processes.
/**
 * Contract for locks that support revocation.
 *
 * @param <T> type passed to the revocation listener
 */
public interface Revocable<T> {

    /**
     * Makes the lock revocable, registering the given listener.
     *
     * @param listener listener to be called when revocation is requested
     */
    void makeRevocable(RevocationListener<T> listener);
}
/**
 * Callback interface notified of lock revocation requests.
 *
 * @param <T> type of the lock being revoked
 */
public interface RevocationListener<T> {

    /**
     * Invoked when revocation of a lock has been requested.
     *
     * @param forLock the lock being revoked
     */
    void revocationRequested(T forLock);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-shaded-curator