Non-blocking asynchronous iteration and CompletableFuture-based operations for high-performance concurrent database access.
Iterate over range query results asynchronously without blocking.
/**
 * Asynchronous iterable over elements.
 * Provides lazy iteration with batch fetching.
 * Extends Iterable so an instance can also be consumed by a blocking
 * for-each loop.
 *
 * Type Parameters:
 * - T: Element type
 */
interface AsyncIterable<T> extends Iterable<T> {
    /**
     * Get async iterator for this iterable.
     * Narrows the return type of Iterable.iterator() to the async-capable iterator.
     *
     * Returns:
     * AsyncIterator<T> - Iterator over elements
     */
    @Override
    AsyncIterator<T> iterator();

    /**
     * Convert all elements to list asynchronously.
     * Fetches all remaining elements.
     *
     * Returns:
     * CompletableFuture<List<T>> - Future completing with all elements
     */
    CompletableFuture<List<T>> asList();
}
/**
 * Iterator that can answer "is there another element?" without blocking.
 * Adds a future-returning check on top of the standard Iterator contract.
 *
 * Type Parameters:
 * - T: Element type
 */
interface AsyncIterator<T> extends Iterator<T> {
    /**
     * Non-blocking counterpart of hasNext().
     * Fetches the next batch if needed.
     *
     * Returns:
     * CompletableFuture<Boolean> - Completes with true when another element is available
     */
    CompletableFuture<Boolean> onHasNext();

    /**
     * Blocking availability check.
     * Does not return until the answer is known.
     *
     * Returns:
     * boolean - True if more elements available
     */
    @Override
    boolean hasNext();

    /**
     * Blocking retrieval of the next element.
     * Call hasNext() beforehand, or consult onHasNext().
     *
     * Returns:
     * T - Next element
     *
     * Throws:
     * NoSuchElementException - If no more elements
     */
    @Override
    T next();
}
/**
 * AsyncIterator that holds resources and can be closed to release them.
 *
 * Type Parameters:
 * - T: Element type
 */
interface CloseableAsyncIterator<T> extends AsyncIterator<T>, AutoCloseable {
    /**
     * Release the resources held by this iterator.
     * Pending operations are cancelled.
     */
    @Override
    void close();
}
Usage examples:
import com.apple.foundationdb.*;
import com.apple.foundationdb.async.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
// Synchronous iteration (blocking)
db.read(tr -> {
AsyncIterable<KeyValue> range = tr.getRange(
"prefix:".getBytes(),
"prefix;".getBytes()
);
// Blocking for-each loop
for (KeyValue kv : range) {
System.out.println(new String(kv.getKey()));
}
return null;
});
// Asynchronous iteration (non-blocking)
db.readAsync(tr -> {
    byte[] begin = "prefix:".getBytes();
    byte[] end = "prefix;".getBytes();
    // Hand the range's iterator to the async helper; its future is the result of the read
    return processNextAsync(tr.getRange(begin, end).iterator());
});
/**
 * Async processing of every remaining element: prints each element's key.
 *
 * Elements that are available without waiting are consumed in a plain loop.
 * The method only re-enters itself through a future callback when the next
 * onHasNext() result is not ready yet, so stack depth does not grow with the
 * number of buffered elements (a thenCompose callback on an already
 * completed future runs synchronously on the calling thread).
 *
 * Parameters:
 * - iter: AsyncIterator<KeyValue> - Iterator to drain
 *
 * Returns:
 * CompletableFuture<Void> - Completes when the iterator is exhausted, or
 * exceptionally if onHasNext(), next() or the processing fails
 */
CompletableFuture<Void> processNextAsync(AsyncIterator<KeyValue> iter) {
    return iter.onHasNext().thenCompose(hasNext -> {
        CompletableFuture<Boolean> more = CompletableFuture.completedFuture(hasNext);
        // Drain everything whose availability is already known
        while (more.isDone() && !more.isCompletedExceptionally() && more.join()) {
            KeyValue kv = iter.next();
            System.out.println(new String(kv.getKey()));
            more = iter.onHasNext();
        }
        // Here `more` is pending, failed, or complete with false
        return more.thenCompose(again -> {
            if (!again) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            // Next batch has arrived: continue from the completing thread's fresh stack
            return processNextAsync(iter);
        });
    });
}
// Convert to list (fetches all)
db.readAsync(tr -> {
    CompletableFuture<List<KeyValue>> allUsers =
        tr.getRange("users:".getBytes(), "users;".getBytes()).asList();
    // Runs once the whole range has been collected
    return allUsers.thenAccept(users -> {
        System.out.println("Found " + users.size() + " users");
        users.forEach(user -> System.out.println(new String(user.getKey())));
    });
});
// Using CloseableAsyncIterator
// try-with-resources closes the iterator automatically when the block exits
try (CloseableAsyncIterator<KeyValue> tenants =
        TenantManagement.listTenants(db, new byte[0], new byte[]{(byte)0xFF}, 1000)) {
    while (tenants.hasNext()) {
        byte[] tenantKey = tenants.next().getKey();
        System.out.println("Tenant: " + new String(tenantKey));
    }
}
All async operations return CompletableFuture for composability.
// Transaction operations return CompletableFuture
CompletableFuture<byte[]> Transaction.get(byte[] key);
CompletableFuture<Void> Transaction.commit();
CompletableFuture<Long> Transaction.getReadVersion();
CompletableFuture<byte[]> Transaction.getVersionstamp();
// Database operations return CompletableFuture
<T> CompletableFuture<T> Database.runAsync(
Function<? super Transaction, ? extends CompletableFuture<T>> retryable
);
<T> CompletableFuture<T> Database.readAsync(
Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable
);
Usage examples:
// Chain async operations
// runAsync commits the transaction itself after the returned future completes,
// so the chain only reads and writes and never calls commit().
db.runAsync(tr -> {
    return tr.get("key1".getBytes())
        .thenCompose(value1 -> {
            // A missing key is reported as null; only copy a value that exists
            if (value1 != null) {
                tr.set("key2".getBytes(), value1);
            }
            return tr.get("key3".getBytes());
        })
        .thenAccept(value3 -> {
            // Same guard as for key1: never pass a null value to set()
            if (value3 != null) {
                tr.set("key4".getBytes(), value3);
            }
        });
});
// Parallel reads
db.readAsync(tr -> {
    // Issue all three reads before waiting on any of them
    CompletableFuture<byte[]> first = tr.get("key1".getBytes());
    CompletableFuture<byte[]> second = tr.get("key2".getBytes());
    CompletableFuture<byte[]> third = tr.get("key3".getBytes());
    CompletableFuture<Void> allDone = CompletableFuture.allOf(first, second, third);
    // join() returns immediately here: allDone completes only after every read has
    return allDone.thenApply(ignored ->
        processValues(first.join(), second.join(), third.join()));
});
// Error handling
// Only the application-level "Key not found" failure is recovered from. Every
// other failure keeps propagating so that runAsync (and its caller) still sees it.
// The transaction is committed by runAsync itself; no explicit commit() is needed.
db.runAsync(tr -> {
    return tr.get("key".getBytes())
        .thenApply(value -> {
            if (value == null) {
                throw new IllegalStateException("Key not found");
            }
            return value;
        })
        .exceptionally(ex -> {
            // A failure from an earlier stage arrives wrapped in CompletionException
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause()
                : ex;
            if (cause instanceof IllegalStateException) {
                System.err.println("Error: " + cause.getMessage());
                // Fall back to an empty value for the missing key
                return new byte[0];
            }
            // Not ours to handle: rethrow unchanged
            throw (ex instanceof RuntimeException)
                ? (RuntimeException) ex
                : new CompletionException(ex);
        })
        .thenAccept(value -> tr.set("backup".getBytes(), value));
});
// Timeout handling
// A timeout is reported and mapped to a null result; any other failure is
// rethrown so it is not silently turned into "no value".
db.readAsync(tr -> {
    return tr.get("key".getBytes())
        .orTimeout(5, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            // Unwrap in case the failure was relayed through another stage
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause()
                : ex;
            if (cause instanceof TimeoutException) {
                System.err.println("Read timed out");
                return null;
            }
            throw (ex instanceof RuntimeException)
                ? (RuntimeException) ex
                : new CompletionException(ex);
        });
});
Process range queries asynchronously with backpressure control.
// Process range with async pipeline
db.readAsync(tr -> {
AsyncIterable<KeyValue> range = tr.getRange(
"items:".getBytes(),
"items;".getBytes(),
1000, // Limit
false,
StreamingMode.ITERATOR
);
return processRangeAsync(range.iterator(), 0);
});
/**
 * Processes the remaining elements one at a time and counts them.
 * The next element is only requested after processItemAsync() has finished
 * for the current one.
 *
 * Parameters:
 * - iter: AsyncIterator<KeyValue> - Source of elements
 * - count: int - Number of elements processed so far
 *
 * Returns:
 * CompletableFuture<Integer> - Completes with the total number of processed elements
 */
CompletableFuture<Integer> processRangeAsync(
        AsyncIterator<KeyValue> iter, int count) {
    return iter.onHasNext().thenCompose(more -> {
        if (more) {
            KeyValue current = iter.next();
            CompletableFuture<Void> processed = processItemAsync(current);
            // Move on to the following element once this one is done
            return processed.thenCompose(ignored -> processRangeAsync(iter, count + 1));
        }
        // Exhausted: report how many elements were handled
        return CompletableFuture.completedFuture(count);
    });
}
/**
 * Simulates asynchronous processing of one element by printing its key
 * from a task started with CompletableFuture.runAsync.
 *
 * Parameters:
 * - kv: KeyValue - Element to process
 *
 * Returns:
 * CompletableFuture<Void> - Completes when the task has run
 */
CompletableFuture<Void> processItemAsync(KeyValue kv) {
    Runnable work = () -> System.out.println("Processing: " + new String(kv.getKey()));
    return CompletableFuture.runAsync(work);
}
Process large ranges in batches for better performance.
// Batch processing with async iteration
db.readAsync(tr -> {
AsyncIterable<KeyValue> range = tr.getRange(
"data:".getBytes(),
"data;".getBytes()
);
return processBatchesAsync(range.iterator(), 100);
});
/**
 * Repeatedly collects up to batchSize elements and processes them as one batch
 * until a collected batch comes back empty.
 *
 * Parameters:
 * - iter: AsyncIterator<KeyValue> - Source of elements
 * - batchSize: int - Maximum number of elements per batch
 *
 * Returns:
 * CompletableFuture<Void> - Completes after the last batch has been processed
 */
CompletableFuture<Void> processBatchesAsync(
        AsyncIterator<KeyValue> iter, int batchSize) {
    List<KeyValue> buffer = new ArrayList<>();
    return collectBatch(iter, batchSize, buffer).thenCompose(batch -> {
        if (!batch.isEmpty()) {
            // Process this batch, then start over with a fresh one
            return processBatch(batch)
                .thenCompose(ignored -> processBatchesAsync(iter, batchSize));
        }
        // Nothing collected: the iterator is exhausted
        return CompletableFuture.completedFuture(null);
    });
}
/**
 * Appends elements from iter to batch until it holds batchSize elements or
 * the iterator has no more.
 *
 * Parameters:
 * - iter: AsyncIterator<KeyValue> - Source of elements
 * - batchSize: int - Size at which collection stops
 * - batch: List<KeyValue> - Accumulator; mutated in place and returned
 *
 * Returns:
 * CompletableFuture<List<KeyValue>> - Completes with the same list passed as batch
 */
CompletableFuture<List<KeyValue>> collectBatch(
        AsyncIterator<KeyValue> iter,
        int batchSize,
        List<KeyValue> batch) {
    boolean full = batch.size() >= batchSize;
    if (full) {
        return CompletableFuture.completedFuture(batch);
    }
    return iter.onHasNext().thenCompose(more -> {
        if (more) {
            batch.add(iter.next());
            // Keep filling the same list
            return collectBatch(iter, batchSize, batch);
        }
        // Source ran dry before the batch filled up
        return CompletableFuture.completedFuture(batch);
    });
}
/**
 * Starts one asynchronous task per element of the batch and waits for all of them.
 *
 * Parameters:
 * - batch: List<KeyValue> - Elements to process
 *
 * Returns:
 * CompletableFuture<Void> - Completes when every per-item task has completed
 */
CompletableFuture<Void> processBatch(List<KeyValue> batch) {
    System.out.println("Processing batch of " + batch.size() + " items");
    // Process all items in batch in parallel: one task per item
    CompletableFuture<?>[] tasks = new CompletableFuture<?>[batch.size()];
    int slot = 0;
    for (KeyValue kv : batch) {
        tasks[slot++] = CompletableFuture.runAsync(() -> {
            // Process item
        });
    }
    return CompletableFuture.allOf(tasks);
}
Control thread execution with custom executors.
// Create custom executor
Executor customExecutor = Executors.newFixedThreadPool(20);
// Use with database operations
Database db = fdb.open(null, customExecutor);
// All async callbacks run on custom executor.
// runAsync commits the transaction after the returned future completes, so the
// chain ends with the write and does not call commit() itself.
db.runAsync(tr -> {
    return tr.get("key".getBytes())
        .thenApply(value -> {
            // Runs on customExecutor thread
            return processValue(value);
        })
        .thenAccept(result -> tr.set("result".getBytes(), result));
});
// Use different executor for specific operations
Executor ioExecutor = Executors.newCachedThreadPool();
db.readAsync(tr -> {
    return tr.get("key".getBytes())
        .thenApplyAsync(value -> {
            // Runs on ioExecutor
            return performIoOperation(value);
        }, ioExecutor)
        // A plain thenApply would run on whichever thread completed the previous
        // stage (normally an ioExecutor thread), so hop back explicitly.
        .thenApplyAsync(result -> {
            // Back on the transaction's executor
            return result;
        }, tr.getExecutor());
});
Utility functions for async operations.
/**
 * Safely apply function to argument, wrapping exceptions.
 * A failure raised while calling the function is delivered through the
 * returned future instead of being thrown at the caller.
 *
 * Type Parameters:
 * - I: Input type of the function
 * - O: Result type carried by the returned future
 *
 * Parameters:
 * - func: Function<I, ? extends CompletableFuture<O>> - Function to apply
 * - arg: I - Argument to function
 *
 * Returns:
 * CompletableFuture<O> - Future with result or exception
 */
static <I, O> CompletableFuture<O> AsyncUtil.applySafely(
    Function<I, ? extends CompletableFuture<O>> func, I arg);
/**
 * While loop for async conditions.
 * Continues while condition returns true.
 *
 * Parameters:
 * - condition: Supplier<CompletableFuture<Boolean>> - Async condition
 * - e: Executor - Executor for loop execution
 *
 * Returns:
 * CompletableFuture<Void> - Completes when condition becomes false
 */
static CompletableFuture<Void> AsyncUtil.whileTrue(
    Supplier<CompletableFuture<Boolean>> condition, Executor e
);
// Type summary.
// AsyncIterable extends Iterable so it can be used in a blocking for-each loop.
interface AsyncIterable<T> extends Iterable<T> {
    // Iterator with both blocking and future-based access
    AsyncIterator<T> iterator();
    // Future completing with all elements
    CompletableFuture<List<T>> asList();
}
// Iterator offering a future-based availability check next to the blocking one
interface AsyncIterator<T> extends Iterator<T> {
    // Non-blocking: completes with true when another element is available
    CompletableFuture<Boolean> onHasNext();

    // Blocking availability check
    @Override
    boolean hasNext();

    // Blocking retrieval of the next element
    @Override
    T next();
}
// AsyncIterator that can be closed to release resources (usable in try-with-resources)
interface CloseableAsyncIterator<T> extends AsyncIterator<T>, AutoCloseable {
    // Closes the iterator; declared without a checked exception
    @Override
    void close();
}
// Static helpers for composing asynchronous operations
class AsyncUtil {
    // Applies func to arg; a failure is delivered through the returned future.
    // The argument type is tied to the function's input type so the call type-checks.
    static <I, O> CompletableFuture<O> applySafely(
        Function<I, ? extends CompletableFuture<O>> func, I arg);
    // Repeats condition while it completes with true; e is the executor for the loop
    static CompletableFuture<Void> whileTrue(
        Supplier<CompletableFuture<Boolean>> condition, Executor e
    );
}