or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

examples

edge-cases.md
real-world-scenarios.md
index.md
tile.json

docs/reference/async-operations.md

Async Operations

Non-blocking asynchronous iteration and CompletableFuture-based operations for high-performance concurrent database access.

Capabilities

Async Iteration

Iterate over range query results asynchronously without blocking.

/**
 * Asynchronous iterable over elements.
 * Provides lazy iteration with batch fetching.
 * 
 * Extends Iterable, so an instance can also be used directly in a
 * (blocking) for-each loop.
 * 
 * Type Parameters:
 * - T: Element type
 */
interface AsyncIterable<T> extends Iterable<T> {
    /**
     * Get async iterator for this iterable.
     * Narrows the return type of Iterable.iterator() to AsyncIterator.
     * 
     * Returns:
     * AsyncIterator<T> - Iterator over elements
     */
    @Override
    AsyncIterator<T> iterator();
    
    /**
     * Convert all elements to list asynchronously.
     * Fetches all remaining elements.
     * 
     * Returns:
     * CompletableFuture<List<T>> - Future completing with all elements
     */
    CompletableFuture<List<T>> asList();
}

/**
 * Iterator whose availability check can be awaited instead of blocked on.
 * Layers an asynchronous check on top of the standard Iterator contract.
 * 
 * Type Parameters:
 * - T: Element type
 */
interface AsyncIterator<T> extends Iterator<T> {
    /**
     * Non-blocking availability check.
     * May trigger a fetch of the next batch.
     * 
     * Returns:
     * CompletableFuture<Boolean> - Completes with true when another element exists
     */
    CompletableFuture<Boolean> onHasNext();

    /**
     * Blocking availability check.
     * Does not return until the answer is known.
     * 
     * Returns:
     * boolean - True when another element exists
     */
    @Override
    boolean hasNext();

    /**
     * Blocking retrieval of the next element.
     * Call only after hasNext() or onHasNext() has reported an element.
     * 
     * Returns:
     * T - Next element
     * 
     * Throws:
     * NoSuchElementException - When the iteration is exhausted
     */
    @Override
    T next();
}

/**
 * An AsyncIterator that holds resources and must be closed when finished.
 * 
 * Type Parameters:
 * - T: Element type
 */
interface CloseableAsyncIterator<T> extends AsyncIterator<T>, AutoCloseable {
    /**
     * Release the resources held by this iterator.
     * Any pending operations are cancelled.
     * Declared without a checked exception, unlike AutoCloseable.close().
     */
    @Override
    void close();
}

Usage examples:

import com.apple.foundationdb.*;
import com.apple.foundationdb.async.*;
import java.util.concurrent.CompletableFuture;

// Synchronous iteration (blocking)
db.read(tr -> {
    AsyncIterable<KeyValue> range = tr.getRange(
        "prefix:".getBytes(),
        "prefix;".getBytes()
    );
    
    // Blocking for-each loop
    for (KeyValue kv : range) {
        System.out.println(new String(kv.getKey()));
    }
    return null;
});

// Non-blocking (asynchronous) iteration
db.readAsync(tr -> {
    byte[] begin = "prefix:".getBytes();
    byte[] end = "prefix;".getBytes();
    AsyncIterable<KeyValue> results = tr.getRange(begin, end);

    // Hand the iterator to the async helper and return its future
    return processNextAsync(results.iterator());
});

// Async processing of every remaining element.
//
// Delegates to AsyncUtil.forEachRemaining rather than recursing through
// thenCompose(): when onHasNext() is already complete (elements of a batch
// that has been fetched), thenCompose() runs its callback synchronously, so
// a hand-written recursion nests one stack frame per element and can
// overflow the stack on large batches.
CompletableFuture<Void> processNextAsync(AsyncIterator<KeyValue> iter) {
    return AsyncUtil.forEachRemaining(iter, kv ->
        System.out.println(new String(kv.getKey())));
}

// Materialize the whole range as a list (fetches every element)
db.readAsync(tr -> {
    CompletableFuture<List<KeyValue>> allUsers = tr.getRange(
        "users:".getBytes(),
        "users;".getBytes()
    ).asList();

    return allUsers.thenAccept(users -> {
        System.out.println("Found " + users.size() + " users");
        users.forEach(user -> System.out.println(new String(user.getKey())));
    });
});

// Using CloseableAsyncIterator
try (CloseableAsyncIterator<KeyValue> iter = 
         TenantManagement.listTenants(db, new byte[0], new byte[]{(byte)0xFF}, 1000)) {
    
    while (iter.hasNext()) {
        KeyValue kv = iter.next();
        System.out.println("Tenant: " + new String(kv.getKey()));
    }
} // Automatically closed

CompletableFuture Operations

All async operations return CompletableFuture for composability.

// Transaction operations return CompletableFuture
// (reference listing: each signature is prefixed with its declaring type,
// so these lines are not meant to compile as written)
CompletableFuture<byte[]> Transaction.get(byte[] key);
CompletableFuture<Void> Transaction.commit();
CompletableFuture<Long> Transaction.getReadVersion();
CompletableFuture<byte[]> Transaction.getVersionstamp();

// Database operations return CompletableFuture
// (the retryable function itself returns a future of the result type T)
<T> CompletableFuture<T> Database.runAsync(
    Function<? super Transaction, ? extends CompletableFuture<T>> retryable
);
<T> CompletableFuture<T> Database.readAsync(
    Function<? super ReadTransaction, ? extends CompletableFuture<T>> retryable
);

Usage examples:

// Chain async operations
db.runAsync(tr -> {
    return tr.get("key1".getBytes())
        .thenCompose(value1 -> {
            if (value1 != null) {
                tr.set("key2".getBytes(), value1);
            }
            return tr.get("key3".getBytes());
        })
        .thenAccept(value3 -> {
            // A missing key reads as null and set() rejects null values,
            // so guard this write the same way as the key1 -> key2 copy
            if (value3 != null) {
                tr.set("key4".getBytes(), value3);
            }
            // No explicit tr.commit(): runAsync commits the transaction
            // itself once the returned future completes
        });
});

// Parallel reads
db.readAsync(tr -> {
    // Issue all three reads before waiting on any of them
    CompletableFuture<byte[]> first = tr.get("key1".getBytes());
    CompletableFuture<byte[]> second = tr.get("key2".getBytes());
    CompletableFuture<byte[]> third = tr.get("key3".getBytes());

    CompletableFuture<Void> allDone = CompletableFuture.allOf(first, second, third);

    // By the time this callback runs every read has completed,
    // so join() only unwraps the value
    return allDone.thenApply(ignored ->
        processValues(first.join(), second.join(), third.join()));
});

// Error handling
db.runAsync(tr -> {
    return tr.get("key".getBytes())
        .thenApply(value -> {
            if (value == null) {
                throw new IllegalStateException("Key not found");
            }
            return value;
        })
        .exceptionally(ex -> {
            // An exception thrown by an earlier stage arrives wrapped in a
            // CompletionException; unwrap it to look at the real cause
            Throwable cause =
                (ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null)
                    ? ex.getCause()
                    : ex;

            if (!(cause instanceof IllegalStateException)) {
                // Only the "Key not found" case has a fallback. Everything
                // else (e.g. a database error) keeps propagating so that
                // runAsync can apply its retry logic
                throw new java.util.concurrent.CompletionException(cause);
            }

            System.err.println("Error: " + cause.getMessage());
            return new byte[0];
        })
        .thenAccept(value -> {
            tr.set("backup".getBytes(), value);
            // No explicit tr.commit(): runAsync commits the transaction
            // itself once the returned future completes
        });
});

// Timeout handling
db.readAsync(tr -> {
    return tr.get("key".getBytes())
        .orTimeout(5, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            if (ex instanceof TimeoutException) {
                System.err.println("Read timed out");
                // Fall back to "no value" on timeout only
                return null;
            }
            // Not a timeout: do not mask the failure as a missing value.
            // Rethrow so the error reaches readAsync and its retry logic
            if (ex instanceof java.util.concurrent.CompletionException) {
                throw (java.util.concurrent.CompletionException) ex;
            }
            throw new java.util.concurrent.CompletionException(ex);
        });
});

Async Range Processing

Process range queries asynchronously with backpressure control.

// Process range with async pipeline
db.readAsync(tr -> {
    AsyncIterable<KeyValue> range = tr.getRange(
        "items:".getBytes(),
        "items;".getBytes(),
        1000,  // Limit
        false,
        StreamingMode.ITERATOR
    );
    
    return processRangeAsync(range.iterator(), 0);
});

// Walks the iterator one element at a time, finishing the async work for
// each element before requesting the next. Completes with the number of
// elements processed (count plus everything consumed by this call).
CompletableFuture<Integer> processRangeAsync(
        AsyncIterator<KeyValue> iter, int count) {

    return iter.onHasNext().thenCompose(more -> {
        if (more) {
            KeyValue current = iter.next();

            // Recurse only after this item's processing has completed
            return processItemAsync(current)
                .thenCompose(done -> processRangeAsync(iter, count + 1));
        }
        return CompletableFuture.completedFuture(count);
    });
}

// Stand-in for real asynchronous work: prints the key of the given entry
// from a task submitted via CompletableFuture.runAsync.
CompletableFuture<Void> processItemAsync(KeyValue kv) {
    Runnable work = () ->
        System.out.println("Processing: " + new String(kv.getKey()));
    return CompletableFuture.runAsync(work);
}

Batched Async Processing

Process large ranges in batches for better performance.

// Batch processing with async iteration
db.readAsync(tr -> {
    AsyncIterable<KeyValue> range = tr.getRange(
        "data:".getBytes(),
        "data;".getBytes()
    );
    
    return processBatchesAsync(range.iterator(), 100);
});

// Repeatedly collects up to batchSize elements and processes them,
// stopping when a collected batch comes back empty.
CompletableFuture<Void> processBatchesAsync(
        AsyncIterator<KeyValue> iter, int batchSize) {

    CompletableFuture<List<KeyValue>> nextBatch =
        collectBatch(iter, batchSize, new ArrayList<>());

    return nextBatch.thenCompose(items -> {
        if (!items.isEmpty()) {
            // Finish this batch, then move on to the following one
            return processBatch(items)
                .thenCompose(ignored -> processBatchesAsync(iter, batchSize));
        }
        // An empty batch means the iterator is exhausted
        return CompletableFuture.completedFuture(null);
    });
}

// Appends elements from iter to batch until it holds batchSize elements or
// the iterator runs out, then completes with that same (mutated) list.
CompletableFuture<List<KeyValue>> collectBatch(
        AsyncIterator<KeyValue> iter,
        int batchSize,
        List<KeyValue> batch) {

    boolean full = batch.size() >= batchSize;
    if (full) {
        return CompletableFuture.completedFuture(batch);
    }

    return iter.onHasNext().thenCompose(more -> {
        if (more) {
            batch.add(iter.next());
            return collectBatch(iter, batchSize, batch);
        }
        // Iterator exhausted: hand back whatever was gathered so far
        return CompletableFuture.completedFuture(batch);
    });
}

// Starts one async task per element of the batch and returns a future that
// completes once every task has finished.
CompletableFuture<Void> processBatch(List<KeyValue> batch) {
    System.out.println("Processing batch of " + batch.size() + " items");

    // One task per item, all started before any is awaited
    CompletableFuture<?>[] pending = new CompletableFuture<?>[batch.size()];
    int slot = 0;
    for (KeyValue kv : batch) {
        pending[slot++] = CompletableFuture.runAsync(() -> {
            // Process item
        });
    }

    return CompletableFuture.allOf(pending);
}

Custom Executor Usage

Control thread execution with custom executors.

// Create custom executor
Executor customExecutor = Executors.newFixedThreadPool(20);

// Use with database operations
Database db = fdb.open(null, customExecutor);

// All async callbacks run on custom executor
db.runAsync(tr -> {
    return tr.get("key".getBytes())
        .thenApply(value -> {
            // Runs on customExecutor thread
            return processValue(value);
        })
        .thenAccept(result -> {
            tr.set("result".getBytes(), result);
            // No explicit tr.commit(): runAsync commits the transaction
            // itself once the returned future completes
        });
});

// Use different executor for specific operations
Executor ioExecutor = Executors.newCachedThreadPool();

db.readAsync(tr -> {
    return tr.get("key".getBytes())
        .thenApplyAsync(value -> {
            // Runs on ioExecutor
            return performIoOperation(value);
        }, ioExecutor)
        .thenApplyAsync(result -> {
            // Back on database executor. The hop must be explicit: a plain
            // thenApply() is not guaranteed to leave the ioExecutor thread
            // that completed the previous stage
            return result;
        }, tr.getExecutor());
});

Async Utilities

Utility functions for async operations.

/**
 * Safely apply an asynchronous function to an argument.
 * If the function throws instead of returning a future, the exception is
 * delivered through a failed future rather than thrown to the caller.
 * 
 * Type Parameters:
 * - I: Argument type
 * - O: Result type
 * 
 * Parameters:
 * - func: Function<I, ? extends CompletableFuture<O>> - Async function to apply
 * - value: I - Argument to function
 * 
 * Returns:
 * CompletableFuture<O> - Future with result or exception
 */
static <I, O> CompletableFuture<O> AsyncUtil.applySafely(
    Function<I, ? extends CompletableFuture<O>> func, I value
);

/**
 * While loop for async conditions.
 * Evaluates the condition repeatedly and continues for as long as the
 * future it supplies completes with true.
 * 
 * Parameters:
 * - condition: Supplier<CompletableFuture<Boolean>> - Async condition
 * - e: Executor - Executor for loop execution
 * 
 * Returns:
 * CompletableFuture<Void> - Completes when condition becomes false
 */
static CompletableFuture<Void> AsyncUtil.whileTrue(
    Supplier<CompletableFuture<Boolean>> condition, Executor e
);

Types

// Extends Iterable so instances also work in a blocking for-each loop
interface AsyncIterable<T> extends Iterable<T> {
    @Override
    AsyncIterator<T> iterator();
    CompletableFuture<List<T>> asList();
}

// Standard Iterator plus an awaitable availability check
interface AsyncIterator<T> extends Iterator<T> {
    // Asynchronous counterpart of hasNext()
    CompletableFuture<Boolean> onHasNext();

    @Override
    boolean hasNext();

    @Override
    T next();
}

// AsyncIterator that must be closed; close() declares no checked exception
interface CloseableAsyncIterator<T> extends AsyncIterator<T>, AutoCloseable {
    @Override
    void close();
}

// Static async helpers (signature listing only; bodies omitted)
class AsyncUtil {
    // func is itself asynchronous: it returns a future, and an exception
    // thrown by func is delivered through a failed future
    static <I, O> CompletableFuture<O> applySafely(
        Function<I, ? extends CompletableFuture<O>> func, I value
    );
    static CompletableFuture<Void> whileTrue(
        Supplier<CompletableFuture<Boolean>> condition, Executor e
    );
}

Important Notes

Async Iteration Benefits

  • Non-blocking range queries
  • Memory efficient (batch fetching)
  • Better throughput for large ranges
  • Enables concurrent processing

AsyncIterator Behavior

  • hasNext() blocks until result available
  • onHasNext() returns immediately with future
  • Call next() only after hasNext() or onHasNext() reports an element; otherwise it throws NoSuchElementException
  • Iterator fetches batches automatically

CompletableFuture Best Practices

  • Use thenCompose() for chaining async operations
  • Use thenApply() for transforming results
  • Use thenAccept() for side effects
  • Handle exceptions with exceptionally()

Executor Selection

  • FDB.DEFAULT_EXECUTOR: Default daemon thread pool
  • Custom executor: Control thread behavior
  • Async callbacks run on specified executor
  • Don't shut down executor while DB active

Backpressure Control

  • AsyncIterator provides natural backpressure
  • Batch processing prevents memory exhaustion
  • onHasNext() controls fetch rate
  • asList() fetches all (use cautiously)

Resource Management

  • Close CloseableAsyncIterator when done
  • Closing cancels pending fetches
  • Use try-with-resources for safety
  • Don't leak iterators

Performance Tips

  • Use WANT_ALL streaming mode for small ranges
  • Use ITERATOR mode for large ranges
  • Batch processing improves throughput
  • Parallel reads with CompletableFuture.allOf()

Thread Safety

  • AsyncIterator is NOT thread-safe
  • Each thread needs own iterator
  • CompletableFuture itself is thread-safe, but callbacks must still guard any shared mutable state they touch
  • Executor controls callback threads