Watches, versionstamps, blob granules, locality queries, and other advanced functionality for sophisticated FoundationDB applications.
Monitor keys for changes with efficient server-side watches.
/**
* Create watch on a key that completes when key changes.
* Watch is set at transaction's read version and triggers on any change.
*
* Parameters:
* - key: byte[] - Key to watch
*
* Returns:
* CompletableFuture<Void> - Future that completes when key changes
*/
CompletableFuture<Void> Transaction.watch(byte[] key);Usage examples:
import com.apple.foundationdb.*;
// Simple watch
db.run(tr -> {
    byte[] key = "config:refresh_needed".getBytes();
    // Read current value
    byte[] value = tr.get(key).join();
    // Create watch; it is registered when the transaction commits
    CompletableFuture<Void> watch = tr.watch(key);
    // Do not call tr.commit() here: db.run() commits after this function
    // returns, and a second commit on the same transaction fails.
    return watch;
}).thenAccept(v -> {
    System.out.println("Config changed!");
    // Reload configuration
});
// Watch with timeout
CompletableFuture<Void> watch = db.run(tr -> {
    byte[] key = "status".getBytes();
    // db.run() commits when this function returns, which registers the watch;
    // committing manually here would make that automatic commit fail.
    return tr.watch(key);
});
watch.orTimeout(30, TimeUnit.SECONDS)
    .thenAccept(v -> System.out.println("Status changed"))
    .exceptionally(ex -> {
        // The failure reaches this stage wrapped in a CompletionException
        if (ex.getCause() instanceof TimeoutException) {
            System.out.println("Watch timed out");
        }
        return null;
    });
// Watch loop
/**
 * Passes the current value of key to handler, then re-runs itself each time the key changes.
 *
 * Parameters:
 * - db: Database - Database to read from
 * - key: byte[] - Key to observe
 * - handler: Consumer<byte[]> - Receives the value read in each iteration (null if absent)
 */
void watchLoop(Database db, byte[] key, Consumer<byte[]> handler) {
    db.runAsync(tr -> tr.get(key).thenApply(value -> {
        // Note: runs again if the transaction is retried
        handler.accept(value);
        // Only create the watch here. It is registered by the commit that
        // runAsync() performs, so waiting on it inside the transaction
        // would never complete.
        return tr.watch(key);
    }))
    // The transaction has committed; now wait for the watch itself
    .thenCompose(watch -> watch)
    .thenAccept(v -> {
        // Watch fired, restart loop
        watchLoop(db, key, handler);
    });
}
Manage transaction read versions for consistency control.
/**
* Get transaction's read version.
* Ensures transaction sees consistent snapshot.
*
* Returns:
* CompletableFuture<Long> - Read version
*/
CompletableFuture<Long> Transaction.getReadVersion();
/**
* Set transaction's read version explicitly.
* Useful for reading at specific version or coordinating reads.
*
* Parameters:
* - version: long - Version to read at
*/
void Transaction.setReadVersion(long version);Usage examples:
// Coordinate reads across transactions
long version = db.run(tr -> {
return tr.getReadVersion().join();
});
// Use same version in another transaction
db.read(tr -> {
tr.setReadVersion(version);
// Read at same version
byte[] value = tr.get("key".getBytes()).join();
return null;
});
// Consistent multi-transaction read
long readVersion = db.run(tr1 -> {
// First transaction
tr1.get("key1".getBytes()).join();
return tr1.getReadVersion().join();
});
db.read(tr2 -> {
tr2.setReadVersion(readVersion);
// Sees same snapshot as tr1
tr2.get("key2".getBytes()).join();
return null;
});Read without adding conflict ranges.
/**
* Get snapshot view of transaction.
* Snapshot reads don't add conflict ranges.
*
* Returns:
* ReadTransaction - Snapshot view
*/
ReadTransaction Transaction.snapshot();
/**
* Check if this is a snapshot transaction.
*
* Returns:
* boolean - True if snapshot
*/
boolean ReadTransaction.isSnapshot();Usage examples:
// Snapshot read (no conflicts)
db.run(tr -> {
ReadTransaction snapshot = tr.snapshot();
// These reads don't cause conflicts
byte[] value1 = snapshot.get("key1".getBytes()).join();
byte[] value2 = snapshot.get("key2".getBytes()).join();
// Regular write
tr.set("key3".getBytes(), "value".getBytes());
return null;
});
// Read for display without conflicts
db.run(tr -> {
// Write operations
tr.set("counter".getBytes(), "1".getBytes());
// Read for logging (snapshot)
for (KeyValue kv : tr.snapshot().getRange("log:".getBytes(), "log;".getBytes())) {
System.out.println("Log: " + new String(kv.getKey()));
}
return null;
});Manually manage read and write conflict ranges.
/**
* Add read conflict range.
* Other transactions writing to this range will conflict.
*
* Parameters:
* - keyBegin: byte[] - Start of range (inclusive)
* - keyEnd: byte[] - End of range (exclusive)
*/
void Transaction.addReadConflictRange(byte[] keyBegin, byte[] keyEnd);
/**
* Add read conflict for single key.
*
* Parameters:
* - key: byte[] - Key to add conflict for
*/
void Transaction.addReadConflictKey(byte[] key);
/**
* Add write conflict range.
* Other transactions reading or writing this range will conflict.
*
* Parameters:
* - keyBegin: byte[] - Start of range (inclusive)
* - keyEnd: byte[] - End of range (exclusive)
*/
void Transaction.addWriteConflictRange(byte[] keyBegin, byte[] keyEnd);
/**
* Add write conflict for single key.
*
* Parameters:
* - key: byte[] - Key to add conflict for
*/
void Transaction.addWriteConflictKey(byte[] key);
/**
* Conditionally add read conflict range (if not snapshot).
*
* Parameters:
* - keyBegin: byte[] - Start of range (inclusive)
* - keyEnd: byte[] - End of range (exclusive)
*
* Returns:
* boolean - True if conflict range was added
*/
boolean ReadTransaction.addReadConflictRangeIfNotSnapshot(
byte[] keyBegin, byte[] keyEnd
);
/**
* Conditionally add read conflict for key (if not snapshot).
*
* Parameters:
* - key: byte[] - Key to add conflict for
*
* Returns:
* boolean - True if conflict was added
*/
boolean ReadTransaction.addReadConflictKeyIfNotSnapshot(byte[] key);Usage examples:
// Reserve range for exclusive access
db.run(tr -> {
byte[] rangeStart = "users:1001:".getBytes();
byte[] rangeEnd = "users:1001;".getBytes();
// Add write conflict without writing
tr.addWriteConflictRange(rangeStart, rangeEnd);
// Other transactions will conflict even without actual writes
// Perform operations...
return null;
});
// Optimistic read (add conflict after validation)
db.run(tr -> {
byte[] key = "counter".getBytes();
// Read without automatic conflict (using snapshot)
byte[] value = tr.snapshot().get(key).join();
long count = value != null ? ByteBuffer.wrap(value).getLong() : 0;
if (count < 1000) {
// Validation passed, add conflict and write
tr.addReadConflictKey(key);
tr.set(key, ByteBuffer.allocate(8).putLong(count + 1).array());
}
return null;
});Use transaction commit version in keys or values.
/**
* Get versionstamp after successful commit.
* Returns 10-byte transaction version.
*
* Returns:
* CompletableFuture<byte[]> - Transaction versionstamp
*/
CompletableFuture<byte[]> Transaction.getVersionstamp();Usage examples:
import com.apple.foundationdb.tuple.*;
import java.nio.ByteBuffer;
// Versionstamped key for time-ordered log
CompletableFuture<byte[]> vstampFuture = db.run(tr -> {
    Tuple logKey = Tuple.from("log", Versionstamp.incomplete());
    byte[] key = logKey.packWithVersionstamp();
    tr.mutate(MutationType.SET_VERSIONSTAMPED_KEY,
        key, "log entry".getBytes());
    // Request the versionstamp BEFORE commit; the future completes only
    // after db.run() has committed the transaction. Do not commit manually.
    return tr.getVersionstamp();
});
// Get actual versionstamp used
byte[] vstamp = vstampFuture.join();
Versionstamp committed = Versionstamp.complete(vstamp);
System.out.println("Logged at: " + committed);
// Versionstamped value for versioning
db.run(tr -> {
byte[] key = "document:123".getBytes();
// Value includes versionstamp
Tuple value = Tuple.from("content", Versionstamp.incomplete());
byte[] valueBytes = value.packWithVersionstamp();
tr.mutate(MutationType.SET_VERSIONSTAMPED_VALUE, key, valueBytes);
return null;
});
// Read versionstamped keys
db.read(tr -> {
    Subspace logs = new Subspace(Tuple.from("log"));
    Range range = logs.range();
    for (KeyValue kv : tr.getRange(range)) {
        // unpack() strips the subspace prefix ("log"), so the versionstamp
        // is the first remaining tuple element
        Tuple key = logs.unpack(kv.getKey());
        Versionstamp vs = key.getVersionstamp(0);
        System.out.println("Entry at " + vs + ": " + new String(kv.getValue()));
    }
    return null;
});
// User versions for ordering within transaction
db.run(tr -> {
Subspace events = new Subspace(Tuple.from("events"));
for (int i = 0; i < 10; i++) {
Tuple key = Tuple.from("event", Versionstamp.incomplete(i));
byte[] keyBytes = events.packWithVersionstamp(key);
tr.mutate(MutationType.SET_VERSIONSTAMPED_KEY,
keyBytes, ("Event " + i).getBytes());
}
return null;
});Query data distribution and storage server locality information using LocalityUtil.
/**
* Get boundary keys for range splits.
* Returns keys marking boundaries between contiguous ranges stored on single servers.
* This is non-transactional and provides estimates.
*
* Parameters:
* - db: Database - Database to query
* - begin: byte[] - Start of range (inclusive)
* - end: byte[] - End of range (exclusive)
*
* Returns:
* CloseableAsyncIterator<byte[]> - Iterator over boundary keys
*/
static CloseableAsyncIterator<byte[]> LocalityUtil.getBoundaryKeys(Database db, byte[] begin, byte[] end);
/**
* Get boundary keys using transaction context.
* Non-transactional operation that uses transaction for executor and read version.
* Transaction options are not applied, but if read version already obtained, provides latency advantage.
*
* Parameters:
* - tr: Transaction - Transaction for database access
* - begin: byte[] - Start of range (inclusive)
* - end: byte[] - End of range (exclusive)
*
* Returns:
* CloseableAsyncIterator<byte[]> - Iterator over boundary keys
*/
static CloseableAsyncIterator<byte[]> LocalityUtil.getBoundaryKeys(Transaction tr, byte[] begin, byte[] end);
/**
* Get storage server addresses for a key.
* Returns public network addresses of servers storing the key.
* Ports not included unless setIncludePortInAddress() called on transaction.
*
* Parameters:
* - tr: Transaction - Transaction context
* - key: byte[] - Key to query
*
* Returns:
* CompletableFuture<String[]> - Array of address strings
*
* Throws:
* FDBException - locality_information_unavailable (1033) if unavailable
*/
static CompletableFuture<String[]> LocalityUtil.getAddressesForKey(Transaction tr, byte[] key);Usage examples:
import com.apple.foundationdb.*;
import com.apple.foundationdb.async.CloseableAsyncIterator;
// Get boundary keys for sharding/splitting data
byte[] scanBegin = "user:".getBytes();
byte[] scanEnd = "user;".getBytes();
try (CloseableAsyncIterator<byte[]> boundaries =
        LocalityUtil.getBoundaryKeys(db, scanBegin, scanEnd)) {
    List<byte[]> splits = new ArrayList<>();
    // Boundary keys lie inside the queried range, so include both endpoints.
    // Otherwise the data before the first boundary and after the last
    // boundary would never be processed.
    splits.add(scanBegin);
    int boundaryCount = 0;
    while (boundaries.hasNext()) {
        splits.add(boundaries.next());
        boundaryCount++;
    }
    splits.add(scanEnd);
    System.out.println("Found " + boundaryCount + " boundary keys");
    // Use boundaries to split processing across multiple workers
    for (int i = 0; i < splits.size() - 1; i++) {
        byte[] start = splits.get(i);
        byte[] end = splits.get(i + 1);
        // Process range [start, end) on separate worker
        // (the first range is empty when scanBegin is itself a boundary)
    }
}
// Get storage locations for a key
db.run(tr -> {
// Enable port numbers in addresses
tr.options().setIncludePortInAddress();
byte[] key = "important_data".getBytes();
String[] addresses = LocalityUtil.getAddressesForKey(tr, key).join();
System.out.println("Key stored on servers:");
for (String addr : addresses) {
System.out.println(" " + addr);
}
return null;
});
// Use boundary keys with transaction for latency optimization
db.run(tr -> {
// Get read version first
tr.getReadVersion().join();
// getBoundaryKeys can reuse this read version
try (CloseableAsyncIterator<byte[]> boundaries =
LocalityUtil.getBoundaryKeys(tr, start, end)) {
while (boundaries.hasNext()) {
byte[] boundary = boundaries.next();
// Process boundary
}
}
return null;
});Work with blob storage for cold data.
/**
* Get blob granule ranges intersecting query range.
*
* Parameters:
* - begin: byte[] - Start of query range (inclusive)
* - end: byte[] - End of query range (exclusive)
* - rowLimit: int - Maximum granule ranges to return
*
* Returns:
* CompletableFuture<KeyRangeArrayResult> - Blob granule boundaries
*/
CompletableFuture<KeyRangeArrayResult> ReadTransaction.getBlobGranuleRanges(
byte[] begin, byte[] end, int rowLimit
);Usage example:
// Query blob granule structure
db.read(tr -> {
    KeyRangeArrayResult granules = tr.getBlobGranuleRanges(
        "archive:".getBytes(),
        "archive;".getBytes(),
        100
    ).join();
    // KeyRangeArrayResult exposes its ranges through getKeyRanges()
    for (Range granule : granules.getKeyRanges()) {
        System.out.println("Granule: " +
            new String(granule.begin) + " to " +
            new String(granule.end));
    }
    return null;
});
Experimental feature for querying with secondary index joins.
/**
* Get mapped range results (experimental).
* Retrieves primary data with associated secondary lookups.
*
* Parameters:
* - begin: KeySelector - Start position
* - end: KeySelector - End position
* - mapper: byte[] - Mapper specification
* - limit: int - Maximum results
* - reverse: boolean - Reverse order
* - mode: StreamingMode - Fetching strategy
*
* Returns:
* AsyncIterable<MappedKeyValue> - Mapped results
*/
AsyncIterable<MappedKeyValue> ReadTransaction.getMappedRange(
KeySelector begin, KeySelector end, byte[] mapper,
int limit, boolean reverse, StreamingMode mode
);Access cluster metadata and configuration through special keys.
// Enable read/write access to system keys (keys beginning with 0xFF)
void TransactionOptions.setAccessSystemKeys();
// Enable relaxed special key space reads
void TransactionOptions.setSpecialKeySpaceRelaxed();
// Enable special key space writes
void TransactionOptions.setSpecialKeySpaceEnableWrites();Usage example:
// Read cluster configuration
db.read(tr -> {
    tr.options().setReadSystemKeys();
    // Read from special key space (0xFF 0xFF prefix).
    // Encode with ISO-8859-1 so each \u00FF becomes the single byte 0xFF;
    // UTF-8 (the usual default charset) would encode it as 0xC3 0xBF and
    // the key would not be in the special key space at all.
    byte[] configKey = "\u00FF\u00FF/configuration/"
        .getBytes(java.nio.charset.StandardCharsets.ISO_8859_1);
    byte[] config = tr.get(configKey).join();
    if (config != null) {
        System.out.println("Cluster config: " + new String(config));
    }
    return null;
});
Query transaction properties and state.
/**
* Get parent database for transaction.
*
* Returns:
* Database - Parent database
*/
Database Transaction.getDatabase();
/**
* Get committed version (only valid after successful commit).
*
* Returns:
* Long - Commit version; -1 for read-only transactions. Calling this before commit() has completed successfully is undefined.
*/
Long Transaction.getCommittedVersion();
/**
* Get approximate transaction size before commit.
*
* Returns:
* CompletableFuture<Long> - Approximate size in bytes
*/
CompletableFuture<Long> Transaction.getApproximateSize();Track FDB operation metrics.
/**
* Event tracking interface for instrumentation.
*/
interface EventKeeper {
/**
* Count events.
*
* Parameters:
* - event: Event - Event type
* - amt: long - Amount to count
*/
void count(Event event, long amt);
/**
* Increment event count by 1.
*
* Parameters:
* - event: Event - Event type
*/
void increment(Event event);
/**
* Record time duration.
*
* Parameters:
* - event: Event - Event type
* - nanos: long - Duration in nanoseconds
*/
void timeNanos(Event event, long nanos);
/**
* Record time with unit.
*
* Parameters:
* - event: Event - Event type
* - duration: long - Duration value
* - unit: TimeUnit - Time unit
*/
void time(Event event, long duration, TimeUnit unit);
/**
* Get event count.
*
* Parameters:
* - event: Event - Event type
*
* Returns:
* long - Total count
*/
long getCount(Event event);
/**
* Get total time for event.
*
* Parameters:
* - event: Event - Event type
*
* Returns:
* long - Total time in nanoseconds
*/
long getTimeNanos(Event event);
}
/**
* Predefined events for tracking.
*/
enum Events implements EventKeeper.Event {
JNI_CALL,
BYTES_FETCHED,
RANGE_QUERY_DIRECT_BUFFER_HIT,
RANGE_QUERY_DIRECT_BUFFER_MISS,
RANGE_QUERY_FETCHES,
RANGE_QUERY_RECORDS_FETCHED,
RANGE_QUERY_CHUNK_FAILED,
RANGE_QUERY_FETCH_TIME_NANOS
}
/**
* Map-based EventKeeper implementation.
*/
class MapEventKeeper implements EventKeeper {
MapEventKeeper();
}Usage example:
// Create event keeper
MapEventKeeper eventKeeper = new MapEventKeeper();
// Use with database
Database db = fdb.open(null, fdb.getExecutor(), eventKeeper);
// Operations are tracked
db.read(tr -> {
for (KeyValue kv : tr.getRange("data:".getBytes(), "data;".getBytes())) {
// Process...
}
return null;
});
// Query metrics
long jniCalls = eventKeeper.getCount(Events.JNI_CALL);
long bytesFetched = eventKeeper.getCount(Events.BYTES_FETCHED);
long fetchTimeNs = eventKeeper.getTimeNanos(Events.RANGE_QUERY_FETCH_TIME_NANOS);
System.out.println("JNI calls: " + jniCalls);
System.out.println("Bytes fetched: " + bytesFetched);
System.out.println("Fetch time: " + (fetchTimeNs / 1_000_000) + "ms");Enable DirectByteBuffer for large range queries.
// Enable DirectBuffer queries
void FDB.enableDirectBufferQuery(boolean enabled);
// Check if enabled
boolean FDB.isDirectBufferQueriesEnabled();
// Resize DirectBuffer pool
void FDB.resizeDirectBufferPool(int poolSize, int bufferSize);Usage example:
FDB fdb = FDB.selectAPIVersion(740);
// Enable for large range scans
fdb.enableDirectBufferQuery(true);
fdb.resizeDirectBufferPool(16, 1024 * 1024); // 16 buffers of 1MB each
Database db = fdb.open();Query storage locations of keys for advanced distributed query optimization and workload design.
/**
* Get boundary keys that mark start of single-server ranges.
* Non-transactional query returning estimated boundaries.
* Useful for parallel processing and workload distribution.
*
* Parameters:
* - db: Database - Database to query
* - begin: byte[] - Inclusive start of range
* - end: byte[] - Exclusive end of range
*
* Returns:
* CloseableAsyncIterator<byte[]> - Iterator of boundary keys
*/
static CloseableAsyncIterator<byte[]> LocalityUtil.getBoundaryKeys(Database db, byte[] begin, byte[] end);
/**
* Get boundary keys using transaction context.
* Reuses transaction's read version if already fetched for latency optimization.
* Transaction is not used directly - Database from transaction is queried.
*
* Parameters:
* - tr: Transaction - Transaction providing database context
* - begin: byte[] - Inclusive start of range
* - end: byte[] - Exclusive end of range
*
* Returns:
* CloseableAsyncIterator<byte[]> - Iterator of boundary keys
*/
static CloseableAsyncIterator<byte[]> LocalityUtil.getBoundaryKeys(Transaction tr, byte[] begin, byte[] end);
/**
* Get public network addresses of storage servers storing a key.
* Requires tr.options().setIncludePortInAddress() to include ports.
*
* Parameters:
* - tr: Transaction - Transaction for locality query
* - key: byte[] - Key to locate
*
* Returns:
* CompletableFuture<String[]> - Array of server addresses (host or host:port)
*
* Throws:
* FDBException - With code 1033 (locality_information_unavailable) if info not available
*/
static CompletableFuture<String[]> LocalityUtil.getAddressesForKey(Transaction tr, byte[] key);Usage examples:
import com.apple.foundationdb.*;
// Get boundary keys for parallel processing
byte[] scanBegin = "data:".getBytes();
byte[] scanEnd = "data;".getBytes();
try (CloseableAsyncIterator<byte[]> boundaries =
        LocalityUtil.getBoundaryKeys(db, scanBegin, scanEnd)) {
    // Boundary keys lie inside the queried range, so include both endpoints.
    // Otherwise [scanBegin, firstBoundary) and [lastBoundary, scanEnd) are skipped.
    List<byte[]> boundaryList = new ArrayList<>();
    boundaryList.add(scanBegin);
    while (boundaries.hasNext()) {
        boundaryList.add(boundaries.next());
    }
    boundaryList.add(scanEnd);
    // Process each range in parallel
    List<CompletableFuture<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < boundaryList.size() - 1; i++) {
        byte[] rangeBegin = boundaryList.get(i);
        byte[] rangeEnd = boundaryList.get(i + 1);
        tasks.add(CompletableFuture.runAsync(() -> {
            processRange(db, rangeBegin, rangeEnd);
        }));
    }
    CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
}
// Get server addresses for a key
db.run(tr -> {
byte[] key = "important_data".getBytes();
// Include port in addresses
tr.options().setIncludePortInAddress();
return LocalityUtil.getAddressesForKey(tr, key)
.thenAccept(addresses -> {
System.out.println("Key stored on servers:");
for (String addr : addresses) {
System.out.println(" " + addr);
}
});
});
// Partition workload by server boundaries
/**
 * Submits one scan task per boundary-delimited sub-range of [begin, end).
 *
 * Parameters:
 * - db: Database - Database to query
 * - begin: byte[] - Inclusive start of range
 * - end: byte[] - Exclusive end of range
 */
void parallelScan(Database db, byte[] begin, byte[] end) {
    try (CloseableAsyncIterator<byte[]> boundaries =
            LocalityUtil.getBoundaryKeys(db, begin, end)) {
        byte[] rangeStart = begin;
        while (boundaries.hasNext()) {
            byte[] rangeEnd = boundaries.next();
            // rangeStart is reassigned below, so it cannot be captured by a
            // lambda directly; copy it into an effectively final local first.
            byte[] taskStart = rangeStart;
            // Each range is stored on one server
            executorService.submit(() -> {
                scanRange(db, taskStart, rangeEnd);
            });
            rangeStart = rangeEnd;
        }
        // Process final range
        byte[] finalStart = rangeStart;
        executorService.submit(() -> {
            scanRange(db, finalStart, end);
        });
    }
}
interface EventKeeper {
interface Event {
String name();
boolean isTimeEvent();
}
void count(Event event, long amt);
void increment(Event event);
void timeNanos(Event event, long nanos);
void time(Event event, long duration, TimeUnit unit);
long getCount(Event event);
long getTimeNanos(Event event);
long getTime(Event event, TimeUnit unit);
}
class MapEventKeeper implements EventKeeper {
MapEventKeeper();
}