Complete reference for the Python binding of FoundationDB, a distributed transactional key-value database with ACID guarantees.
Package: fdb
Location: bindings/python/fdb/
Entry Point: fdb/__init__.py
Supported API Versions: 13-740
Current Version: 7.4.5
pip install foundationdb
import fdb
Before using any FoundationDB functionality, you must select an API version. This ensures compatibility between your application and the FoundationDB client library.
fdb.api_version(version: int) -> None
Select the FoundationDB API version. This must be the first call, before any other operation.
Parameters:
version: API version number (13-740 for version 7.4.5)
Raises:
RuntimeError: If the version is unsupported or a different API version has already been selected
Example:
import fdb
fdb.api_version(740)
fdb.get_api_version() -> int
Get the currently selected API version.
Returns: Selected API version number
fdb.is_api_version_selected() -> bool
Check if API version has been selected.
Returns: True if version selected, False otherwise
fdb.init(event_model: Optional[str] = None) -> None
Initialize the FoundationDB network thread.
Parameters:
event_model: Event model for async operations (optional, for advanced use)
Note: This is called automatically by fdb.open() if not already initialized. Explicit calls are only needed for advanced network configuration.
fdb.open(cluster_file: Optional[str] = None, event_model: Optional[str] = None) -> Database
Open a connection to the FoundationDB database.
Parameters:
cluster_file: Path to cluster file. If None, uses default location or FDB_CLUSTER_FILE environment variable
event_model: Event model for async operations (optional, for advanced use)
Returns: Database object for performing operations
Example:
import fdb
fdb.api_version(740)
# Open default database
db = fdb.open()
# Open with specific cluster file
db = fdb.open("/etc/foundationdb/fdb.cluster")
The Database class represents a connection to FoundationDB and provides methods for creating transactions and performing transactional operations.
Module: fdb.impl
Database.create_transaction() -> Transaction
Create a new transaction.
Returns: New Transaction object
Example:
tr = db.create_transaction()
try:
    tr.set(b'key', b'value')
    tr.commit().wait()
except fdb.FDBError as e:
    print(f"Error: {e}")
The Database class does not have an explicit transact() method. Instead, use the @fdb.transactional decorator (see Decorator section below) to create functions that automatically retry on retryable errors. The Database class provides direct transactional wrapper methods like get(), set(), clear(), etc. that handle transactions automatically.
Example with Direct Methods:
# Direct transactional methods
value = db.get(b'mykey') # Automatically wrapped in transaction
db.set(b'mykey', b'myvalue') # Automatically wrapped in transaction
# Or use @transactional decorator
@fdb.transactional
def increment_counter(tr, key):
    value = tr[key]
    if value:
        count = int(value) + 1
    else:
        count = 1
    tr[key] = str(count).encode()
    return count
# Call with Database as first argument
result = increment_counter(db, b'counter')
Database.get(key: bytes) -> Optional[bytes]
Get a value (transactional wrapper with automatic retry).
Parameters:
key: Key to read
Returns: Value bytes or None if key doesn't exist
Database.get_key(key_selector: KeySelector) -> bytes
Resolve a key selector (transactional wrapper).
Parameters:
key_selector: KeySelector object
Returns: Resolved key bytes
Database.get_range(
begin: Union[bytes, KeySelector],
end: Union[bytes, KeySelector],
limit: int = 0,
reverse: bool = False,
streaming_mode: int = StreamingMode.want_all
) -> List[KeyValue]
Read a range of key-value pairs (transactional wrapper).
Parameters:
begin: Start key or key selector (a key is inclusive; a selector starts the range at the key it resolves to)
end: End key or key selector (exclusive)
limit: Maximum number of results (0 = no limit)
reverse: Read in reverse order
streaming_mode: Streaming mode constant (default: StreamingMode.want_all)
Returns: List of KeyValue objects
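
The begin, end, limit, and reverse parameters can be illustrated without a running cluster. The sketch below is a plain-Python model over a sorted list, not the binding's implementation; simulate_get_range is a hypothetical helper introduced only for illustration.

```python
import bisect

def simulate_get_range(pairs, begin, end, limit=0, reverse=False):
    """Model a range read over (key, value) pairs sorted by key.

    Returns the pairs with begin <= key < end. limit=0 means no limit.
    With reverse=True, results are produced from the end of the range.
    """
    keys = [k for k, _ in pairs]
    lo = bisect.bisect_left(keys, begin)  # first key >= begin (inclusive)
    hi = bisect.bisect_left(keys, end)    # first key >= end (exclusive bound)
    selected = pairs[lo:hi]
    if reverse:
        selected = selected[::-1]
    if limit > 0:
        selected = selected[:limit]
    return selected

data = [(b'a', b'1'), (b'b', b'2'), (b'c', b'3'), (b'd', b'4')]
print(simulate_get_range(data, b'b', b'd'))
print(simulate_get_range(data, b'a', b'z', limit=2, reverse=True))
```

Note that a limit combined with reverse=True keeps the last keys of the range, not the first ones.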
Database.get_range_startswith(
prefix: bytes,
limit: int = 0,
reverse: bool = False,
streaming_mode: int = StreamingMode.want_all
) -> List[KeyValue]
Read all keys with a given prefix (transactional wrapper).
Parameters:
prefix: Key prefix
limit: Maximum number of results
reverse: Read in reverse order
streaming_mode: Streaming mode constant (default: StreamingMode.want_all)
Returns: List of KeyValue objects
Database.set(key: bytes, value: bytes) -> None
Set a key-value pair (transactional wrapper).
Parameters:
key: Key bytes
value: Value bytes
Database.clear(key: bytes) -> None
Delete a key (transactional wrapper).
Parameters:
key: Key to delete
Database.clear_range(begin: bytes, end: bytes) -> None
Delete a range of keys (transactional wrapper).
Parameters:
begin: Start key (inclusive)
end: End key (exclusive)
Database.clear_range_startswith(prefix: bytes) -> None
Delete all keys with a given prefix (transactional wrapper).
Parameters:
prefix: Key prefix
Database.open_tenant(name: bytes) -> Tenant
Open a tenant connection for multi-tenant operations.
Parameters:
name: Tenant name as bytes
Returns: Tenant object
Example:
tenant = db.open_tenant(b'tenant_name')
tenant.set(b'key', b'value')
Note: Locality operations are provided through the fdb.locality module, not as Database methods. See the Locality Module section below for details.
Database.get_and_watch(key: bytes) -> Tuple[Optional[bytes], Future]
Get a value and create a watch in a single operation (transactional wrapper).
Parameters:
key: Key to read and watch
Returns: Tuple of (value, watch_future) where value is the current value (or None) and watch_future triggers when key changes
Example:
value, watch = db.get_and_watch(b'config:reload')
print(f"Current value: {value}")
watch.wait() # Blocks until key changes
Database.set_and_watch(key: bytes, value: bytes) -> Future
Set a value and create a watch in a single operation (transactional wrapper).
Parameters:
key: Key to write and watch
value: Value to write
Returns: Future that triggers when key changes
Database.clear_and_watch(key: bytes) -> Future
Clear a key and create a watch in a single operation (transactional wrapper).
Parameters:
key: Key to clear and watch
Returns: Future that triggers when key changes
Database.get_client_status() -> Future
Get client status information including connection status and performance metrics.
Returns: Future that resolves to a JSON string containing client status data
Example:
import json
status_json = db.get_client_status().wait()
status = json.loads(status_json)
print(f"Connected: {status.get('connected', False)}")
Database.options -> DatabaseOptions
Database options object for configuration.
Example:
db.options.set_transaction_timeout(5000) # 5 second timeout
db.options.set_transaction_retry_limit(100)
The Tenant class provides a scoped view of the database for multi-tenant applications. All operations are isolated to the tenant's key space.
Module: fdb.impl
Tenant.create_transaction() -> Transaction
Create a new transaction in the tenant context.
Returns: New Transaction object scoped to this tenant
The Tenant class does not have an explicit transact() method. Instead, use the @fdb.transactional decorator to create functions that automatically retry on retryable errors. The Tenant class provides direct transactional wrapper methods like get(), set(), clear(), etc. that handle transactions automatically within the tenant's namespace.
Example:
# Direct transactional methods
value = tenant.get(b'mykey') # Automatically wrapped in transaction
tenant.set(b'mykey', b'myvalue') # Automatically wrapped in transaction
# Or use @transactional decorator
@fdb.transactional
def update_tenant_data(tr, key, value):
    tr[key] = value
    return tr[key]
# Call with Tenant as first argument
result = update_tenant_data(tenant, b'mykey', b'myvalue')
The Tenant class provides the same direct operation methods as Database:
Tenant.get(key: bytes) -> Optional[bytes]
Tenant.get_key(key_selector: KeySelector) -> bytes
Tenant.get_range(begin, end, limit=0, reverse=False, streaming_mode=StreamingMode.want_all) -> List[KeyValue]
Tenant.set(key: bytes, value: bytes) -> None
Tenant.clear(key: bytes) -> None
Tenant.clear_range(begin: bytes, end: bytes) -> None
All operations are scoped to the tenant's key space.
Tenant.get_id() -> FutureInt64
Get the unique numeric identifier for the tenant.
Returns: FutureInt64 that resolves to the tenant ID
Example:
tenant = db.open_tenant(b'tenant_name')
tenant_id = tenant.get_id().wait()
print(f"Tenant ID: {tenant_id}")
Tenant.list_blobbified_ranges(begin: bytes, end: bytes, limit: int) -> FutureKeyValueArray
List blobbified (blob storage) ranges within the tenant.
Parameters:
begin: Start key (inclusive)
end: End key (exclusive)
limit: Maximum number of ranges to return
Returns: FutureKeyValueArray that resolves to a list of key-value pairs representing blobbified ranges
Note: This is used with blob granule feature for storing historical data in blob storage. You must call .wait() on the returned future to get the actual list.
The Transaction class represents a single ACID transaction. Transactions provide serializable isolation by default.
Module: fdb.impl
Transaction.get(key: bytes) -> Future
Read a single key value.
Parameters:
key: Key to read
Returns: Future that resolves to value bytes or None if key doesn't exist
Example:
tr = db.create_transaction()
future = tr.get(b'mykey')
value = future.wait() # Block until result available
if value:
    print(f"Value: {value.decode()}")
Transaction.get_key(key_selector: KeySelector) -> Future
Resolve a key selector to an actual key.
Parameters:
key_selector: KeySelector object specifying selection criteria
Returns: Future that resolves to key bytes
Transaction.get_range(
begin: Union[bytes, KeySelector],
end: Union[bytes, KeySelector],
limit: int = 0,
reverse: bool = False,
streaming_mode: int = StreamingMode.iterator
) -> FDBRange
Read a range of key-value pairs.
Parameters:
begin: Start key or key selector (inclusive for keys)
end: End key or key selector (exclusive)
limit: Maximum number of results (0 = no limit)
reverse: Read in reverse order
streaming_mode: Streaming mode constant (see StreamingMode)
Returns: FDBRange iterator for lazy iteration
Example:
tr = db.create_transaction()
# Read all keys from 'user/' to 'user0'
for kv in tr.get_range(b'user/', b'user0'):
    print(f"{kv.key}: {kv.value}")
Transaction.get_range_startswith(
prefix: bytes,
limit: int = 0,
reverse: bool = False,
streaming_mode: int = StreamingMode.iterator
) -> FDBRange
Read all keys with a given prefix.
Parameters:
prefix: Key prefix
limit: Maximum number of results
reverse: Read in reverse order
streaming_mode: Streaming mode constant
Returns: FDBRange iterator
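
A prefix read is equivalent to a range read whose end key is the first key that no longer starts with the prefix, which is why b'user/' pairs with b'user0' in the examples ('/' is 0x2F and '0' is 0x30). The sketch below shows one way to compute that end key; prefix_range is a hypothetical helper, not a function of the binding.

```python
def prefix_range(prefix: bytes):
    """Return (begin, end) covering every key that starts with prefix."""
    # Trailing 0xff bytes cannot be incremented, so drop them first.
    stripped = prefix.rstrip(b'\xff')
    if not stripped:
        raise ValueError("prefix must contain a byte other than 0xff")
    # Increment the last remaining byte to get the exclusive end key.
    end = stripped[:-1] + bytes([stripped[-1] + 1])
    return prefix, end

print(prefix_range(b'user/'))
```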
Transaction.get_estimated_range_size_bytes(begin: bytes, end: bytes) -> Future
Estimate the storage size of a key range in bytes.
Parameters:
begin: Start key (inclusive)
end: End key (exclusive)
Returns: Future that resolves to estimated size in bytes (int)
Transaction.get_range_split_points(begin: bytes, end: bytes, chunk_size: int) -> Future
Get suggested split points for parallel processing of a range.
Parameters:
begin: Start key (inclusive)
end: End key (exclusive)
chunk_size: Target chunk size in bytes
Returns: Future that resolves to list of key bytes representing split points
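
Split points become useful once they are paired into sub-ranges that workers can read independently. A minimal sketch, assuming the returned list includes the begin and end keys of the requested range; the sample keys and the chunks_from_split_points helper are hypothetical.

```python
def chunks_from_split_points(points):
    """Turn [p0, p1, ..., pn] into [(p0, p1), (p1, p2), ..., (pn-1, pn)]."""
    return list(zip(points, points[1:]))

# Hypothetical split points, as if returned for the range b'a'..b'z'.
points = [b'a', b'h', b'q', b'z']
for begin, end in chunks_from_split_points(points):
    # Each (begin, end) pair could be read by a separate transaction.
    print(begin, end)
```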
Transaction.set(key: bytes, value: bytes) -> None
Write a key-value pair.
Parameters:
key: Key bytes (max 10,000 bytes)
value: Value bytes (max 100,000 bytes)
Example:
tr = db.create_transaction()
tr.set(b'user:123:name', b'Alice')
tr.set(b'user:123:email', b'alice@example.com')
tr.commit().wait()
Transaction.clear(key: bytes) -> None
Delete a key.
Parameters:
key: Key to delete
Transaction.clear_range(begin: bytes, end: bytes) -> None
Delete a range of keys.
Parameters:
begin: Start key (inclusive)
end: End key (exclusive)
Example:
# Delete all keys from 'temp/' to 'temp0'
tr.clear_range(b'temp/', b'temp0')
Transaction.clear_range_startswith(prefix: bytes) -> None
Delete all keys with a given prefix.
Parameters:
prefix: Key prefix
Atomic operations perform a read-modify-write on a single key without adding a read conflict range, so concurrent atomic updates to the same key do not conflict with each other.
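
To make the byte-level behavior concrete, the sketch below models what an atomic add does to a stored value. It is a plain-Python model, not the database's implementation, and simulate_add is a hypothetical helper. It assumes a missing or shorter existing value is zero-extended to the length of param, a longer one is truncated, and overflow wraps around.

```python
import struct

def simulate_add(existing, param):
    """Model little-endian addition of param to existing (None = missing key)."""
    width = len(param)
    # Pad or truncate the existing value to the width of param.
    current = (existing or b'')[:width].ljust(width, b'\x00')
    total = int.from_bytes(current, 'little') + int.from_bytes(param, 'little')
    # Wrap on overflow so that two's-complement negatives behave as subtraction.
    return (total % (1 << (8 * width))).to_bytes(width, 'little')

one = struct.pack('<q', 1)
value = simulate_add(None, one)   # missing key is treated as zero
value = simulate_add(value, one)
print(struct.unpack('<q', value)[0])
```

Packing a negative number with struct.pack('<q', -1) decrements the counter under the same rule.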
Transaction.add(key: bytes, param: bytes) -> None
Atomically add a little-endian integer to a value.
Parameters:
key: Key containing integer value
param: Little-endian integer to add
Example:
import struct
tr.add(b'counter', struct.pack('<q', 1)) # Add 1 to 64-bit counter
Transaction.bit_and(key: bytes, param: bytes) -> None
Transaction.bit_or(key: bytes, param: bytes) -> None
Transaction.bit_xor(key: bytes, param: bytes) -> None
Atomically perform bitwise operations.
Parameters:
key: Key containing byte value
param: Byte value for operation
Transaction.max(key: bytes, param: bytes) -> None
Transaction.min(key: bytes, param: bytes) -> None
Atomically set key to maximum or minimum of current and new value.
Parameters:
key: Key containing little-endian integer
param: Little-endian integer to compare
Transaction.byte_max(key: bytes, param: bytes) -> None
Transaction.byte_min(key: bytes, param: bytes) -> None
Atomically set key to maximum or minimum using bytewise comparison.
Parameters:
key: Key containing byte value
param: Byte value to compare
Transaction.set_versionstamped_key(key: bytes, param: bytes) -> None
Set a key-value pair where the key contains an incomplete versionstamp.
Parameters:
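key: Key bytes containing a 10-byte versionstamp placeholder; the final bytes of the key are a little-endian offset giving the placeholder's position (4 bytes at API version 520 and later, 2 bytes before that)
param: Value bytes
Note: At commit time the placeholder is replaced with the transaction's 10-byte versionstamp, which is derived from its commit version.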
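
The key layout can be assembled by hand when the tuple layer is not used. The following is a minimal sketch, assuming API version 520 or later, where the trailing offset is a 4-byte little-endian integer (earlier API versions used 2 bytes). build_versionstamped_key and the chosen placeholder bytes are illustrative assumptions, not part of the binding.

```python
import struct

# Placeholder contents are overwritten at commit time; 0xff is used here
# only as a recognizable filler.
PLACEHOLDER = b'\xff' * 10

def build_versionstamped_key(prefix: bytes, suffix: bytes = b'') -> bytes:
    """Build prefix + placeholder + suffix + 4-byte little-endian offset."""
    offset = len(prefix)  # position where the placeholder starts
    return prefix + PLACEHOLDER + suffix + struct.pack('<L', offset)

key = build_versionstamped_key(b'events/', b'/data')
print(len(key), key[-4:])
```

The result would be passed as the key argument of set_versionstamped_key; the trailing offset bytes are not part of the stored key.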
Transaction.set_versionstamped_value(key: bytes, param: bytes) -> None
Set a key-value pair where the value contains an incomplete versionstamp.
Parameters:
key: Key bytes
param: Value bytes containing a 10-byte versionstamp placeholder; at API version 520 and later the final 4 bytes of the value are a little-endian offset giving the placeholder's position
Transaction.compare_and_clear(key: bytes, param: bytes) -> None
Atomically clear a key if its value matches the parameter.
Parameters:
key: Key to potentially clear
param: Value to compare against
Transaction.commit() -> Future
Commit the transaction.
Returns: Future that completes when commit finishes
Example:
tr = db.create_transaction()
tr.set(b'key', b'value')
commit_future = tr.commit()
commit_future.wait() # Block until committed
Transaction.on_error(error: Union[FDBError, int]) -> Future
Handle an error with automatic retry logic.
Parameters:
error: FDBError exception or integer error code
Returns: Future that completes after the retry delay if the error is retryable; if it is not, waiting on the future raises the error again
Example:
tr = db.create_transaction()
while True:
    try:
        tr.set(b'key', b'value')
        tr.commit().wait()
        break
    except fdb.FDBError as e:
        tr.on_error(e).wait() # Wait for retry delay
Transaction.reset() -> None
Reset the transaction to its initial state. All operations are discarded.
Transaction.cancel() -> None
Cancel the transaction. The transaction cannot be used after cancellation.
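
The control flow of the on_error retry loop can be exercised without a cluster by substituting a stand-in transaction. Everything in this sketch (FakeConflict, FakeTransaction, run_with_retry) is hypothetical and only models the loop; the real on_error also applies a backoff delay and re-raises errors that are not retryable.

```python
class FakeConflict(Exception):
    """Stand-in for a retryable fdb.FDBError such as code 1020."""
    code = 1020

class FakeTransaction:
    """Hypothetical transaction whose commit fails a fixed number of times."""
    def __init__(self, failures):
        self.failures = failures
        self.writes = {}
        self.resets = 0

    def set(self, key, value):
        self.writes[key] = value

    def commit(self):
        if self.failures > 0:
            self.failures -= 1
            raise FakeConflict()

    def on_error(self, error):
        # Model only the reset: buffered writes are discarded before a retry.
        self.writes.clear()
        self.resets += 1

def run_with_retry(tr, body):
    """Run body(tr) and commit, retrying after each simulated conflict."""
    while True:
        try:
            result = body(tr)
            tr.commit()
            return result
        except FakeConflict as e:
            tr.on_error(e)

tr = FakeTransaction(failures=2)
run_with_retry(tr, lambda t: t.set(b'key', b'value'))
print(tr.resets, tr.writes)
```

Because the body runs again after every failure, it should be safe to execute more than once.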
Transaction.watch(key: bytes) -> Future
Create a watch that triggers when a key changes.
Parameters:
key: Key to watch
Returns: Future that completes when key is modified
Example:
watch_future = tr.watch(b'config:reload')
tr.commit().wait()
# Watch is now active
watch_future.wait() # Block until key changes
print("Config key was modified!")
Transaction.get_read_version() -> Future
Get the transaction's read version.
Returns: Future that resolves to read version (int64)
Transaction.set_read_version(version: int) -> None
Set the transaction's read version for snapshot reads.
Parameters:
version: Read version (int64)
Example:
# Read from a specific version
tr1 = db.create_transaction()
version = tr1.get_read_version().wait()
tr2 = db.create_transaction()
tr2.set_read_version(version) # Read at same version as tr1
value = tr2.get(b'key').wait()
Transaction.get_committed_version() -> int
Get the version at which the transaction was committed.
Returns: Committed version (int64)
Note: Must be called after successful commit.
Transaction.get_versionstamp() -> Future
Get the versionstamp for the transaction.
Returns: Future that resolves to 10-byte versionstamp
Note: Must be called before commit. The returned future becomes ready only after the transaction commits successfully.
Example:
tr = db.create_transaction()
tr.set(b'key', b'value')
vs_future = tr.get_versionstamp()
tr.commit().wait()
versionstamp = vs_future.wait() # 10-byte versionstamp
print(f"Versionstamp: {versionstamp.hex()}")
Transaction.get_approximate_size() -> Future
Get the approximate transaction size in bytes.
Returns: Future that resolves to size in bytes (int64)
Transaction.add_read_conflict_range(begin: bytes, end: bytes) -> None
Manually add a read conflict range.
Parameters:
begin: Start key (inclusive)
end: End key (exclusive)
Transaction.add_read_conflict_key(key: bytes) -> None
Manually add a read conflict for a single key.
Parameters:
key: Key to add as read conflict
Transaction.add_write_conflict_range(begin: bytes, end: bytes) -> None
Manually add a write conflict range.
Parameters:
begin: Start key (inclusive)
end: End key (exclusive)
Transaction.add_write_conflict_key(key: bytes) -> None
Manually add a write conflict for a single key.
Parameters:
key: Key to add as write conflict
Transaction.options -> TransactionOptions
Transaction options object for configuration.
Example:
tr = db.create_transaction()
tr.options.set_timeout(5000) # 5 second timeout
tr.options.set_retry_limit(10)
Transaction.snapshot -> TransactionRead
Get a snapshot view of the transaction for reads without conflicts.
Returns: TransactionRead interface with read-only operations
Example:
tr = db.create_transaction()
# Snapshot reads don't add read conflicts
value = tr.snapshot.get(b'key').wait()
tr.set(b'other_key', b'value')
tr.commit().wait()
Transaction.db -> Database
Get the parent database object.
Returns: Database object
The Transaction class supports Python dictionary-style operations:
transaction[key: bytes] -> bytes
transaction[key: bytes] = value: bytes
del transaction[key: bytes]
transaction[begin: bytes:end: bytes] -> FDBRange
del transaction[begin: bytes:end: bytes]
Example:
tr = db.create_transaction()
# Get value
value = tr[b'mykey']
# Set value
tr[b'mykey'] = b'myvalue'
# Delete key
del tr[b'mykey']
# Range read
for kv in tr[b'user/':b'user0']:
    print(kv.key, kv.value)
# Clear range
del tr[b'temp/':b'temp0']
The TransactionRead class provides a read-only interface to transaction operations, used primarily for snapshot reads.
Module: fdb.impl
The TransactionRead interface provides all read methods from Transaction but no write methods:
TransactionRead.get(key: bytes) -> Future
TransactionRead.get_key(key_selector: KeySelector) -> Future
TransactionRead.get_range(...) -> FDBRange
TransactionRead.get_range_startswith(...) -> FDBRange
TransactionRead.get_estimated_range_size_bytes(...) -> Future
TransactionRead.get_range_split_points(...) -> Future
The Future class represents an asynchronous operation result.
Module: fdb.impl
Future.wait() -> Any
Block until the future is ready and return the result.
Returns: Result value (type depends on operation)
Raises: FDBError if operation failed
Example:
future = tr.get(b'key')
value = future.wait() # Blocks until ready
Future.is_ready() -> bool
Check if the future is ready without blocking.
Returns: True if ready, False otherwise
Future.block_until_ready() -> None
Block until the future is ready without returning the result.
Future.cancel() -> None
Cancel the future operation.
Future.on_ready(callback: Callable) -> None
Register a callback to be called when the future completes.
Parameters:
callback: Function to call when ready (receives future as parameter)
Example:
def handle_result(future):
    try:
        value = future.wait()
        print(f"Got value: {value}")
    except fdb.FDBError as e:
        print(f"Error: {e}")
future = tr.get(b'key')
future.on_ready(handle_result)
@staticmethod
Future.wait_for_any(*futures) -> int
Wait for at least one of the given futures to become ready.
Parameters:
*futures: Variable number of Future objects to wait on
Returns: Index (0-based) in the parameter list of a ready future
Example:
# Start multiple concurrent reads
future1 = tr.get(b'key1')
future2 = tr.get(b'key2')
future3 = tr.get(b'key3')
# Wait for first one to complete
ready_index = fdb.Future.wait_for_any(future1, future2, future3)
print(f"Future {ready_index} completed first")
# Get the ready future's value
if ready_index == 0:
    value = future1.wait()
elif ready_index == 1:
    value = future2.wait()
else:
    value = future3.wait()
The FDBRange class provides lazy iteration over range query results.
Module: fdb.impl
FDBRange.__iter__() -> Iterator[KeyValue]
Iterate over key-value pairs in the range.
Returns: Iterator of KeyValue objects
Example:
for kv in tr.get_range(b'user/', b'user0', limit=100):
    print(f"Key: {kv.key}, Value: {kv.value}")
The KeyValue class represents a key-value pair returned from range queries.
Module: fdb.impl
KeyValue.key -> bytes
The key bytes.
KeyValue.value -> bytes
The value bytes.
Example:
for kv in tr.get_range(b'user/', b'user0'):
    user_id = kv.key.decode().split('/')[-1]
    user_name = kv.value.decode()
    print(f"User {user_id}: {user_name}")
The KeySelector class specifies a key selection with offset for range boundaries.
Module: fdb.impl
KeySelector(key: bytes, or_equal: bool, offset: int)
Create a key selector.
Parameters:
key: Reference key
or_equal: Include the reference key if it exists
offset: Offset from the reference key
KeySelector.first_greater_than(key: bytes) -> KeySelector
Create selector for first key strictly greater than the given key.
Parameters:
key: Reference key
Returns: KeySelector object
KeySelector.first_greater_or_equal(key: bytes) -> KeySelector
Create selector for first key greater than or equal to the given key.
Parameters:
key: Reference key
Returns: KeySelector object
KeySelector.last_less_than(key: bytes) -> KeySelector
Create selector for last key strictly less than the given key.
Parameters:
key: Reference key
Returns: KeySelector object
KeySelector.last_less_or_equal(key: bytes) -> KeySelector
Create selector for last key less than or equal to the given key.
Parameters:
key: Reference key
Returns: KeySelector object
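
The four constructors are shorthand for particular (or_equal, offset) combinations. The sketch below models how such a selector resolves against a sorted list of keys. It is an illustrative model, not the binding's code: resolve is a hypothetical function, and clamping to b'' and b'\xff' at the ends of the key space is a simplification.

```python
import bisect

def resolve(sorted_keys, key, or_equal, offset):
    """Resolve a (key, or_equal, offset) selector against sorted_keys."""
    # Start from the last key < key, or the last key <= key when or_equal.
    if or_equal:
        base = bisect.bisect_right(sorted_keys, key) - 1
    else:
        base = bisect.bisect_left(sorted_keys, key) - 1
    index = base + offset
    if index < 0:
        return b''        # before the first key
    if index >= len(sorted_keys):
        return b'\xff'    # past the last key
    return sorted_keys[index]

keys = [b'a', b'c', b'e']
print(resolve(keys, b'c', False, 1))  # first_greater_or_equal(b'c')
print(resolve(keys, b'c', True, 1))   # first_greater_than(b'c')
print(resolve(keys, b'c', False, 0))  # last_less_than(b'c')
print(resolve(keys, b'c', True, 0))   # last_less_or_equal(b'c')
```

Adding or subtracting an integer from a selector changes only the offset, so first_greater_than(k) - 1 resolves the same way as last_less_or_equal(k).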
KeySelector.__add__(offset: int) -> KeySelector
KeySelector.__sub__(offset: int) -> KeySelector
Adjust the key selector offset.
Parameters:
offset: Offset to add or subtract
Returns: New KeySelector with adjusted offset
Example:
# Get range from first key >= 'user/' to first key >= 'user0'
begin = fdb.KeySelector.first_greater_or_equal(b'user/')
end = fdb.KeySelector.first_greater_or_equal(b'user0')
for kv in tr.get_range(begin, end):
    print(kv.key, kv.value)
# Use offset adjustment
begin = fdb.KeySelector.first_greater_than(b'user/') - 1 # Includes 'user/' if exists
The FDBError class is the exception type for all FoundationDB errors.
Module: fdb.impl
FDBError.code -> int
The error code.
Example:
try:
    tr.commit().wait()
except fdb.FDBError as e:
    print(f"Error code: {e.code}")
    print(f"Description: {e.description}")
    if e.code == 1020: # not_committed
        print("Transaction not committed due to conflict")
FDBError.description -> str
Human-readable error description.
Options classes provide configuration methods for network, database, tenant, and transaction settings.
Module: fdb.options
NetworkOptions.set_*(...) -> None
Network-level configuration options. Available via fdb.options (must be set before opening database).
Common Methods:
set_trace_enable(path: str) - Enable tracing to directory
set_trace_roll_size(size: int) - Set trace file roll size
set_client_threads_per_version(count: int) - Set thread pool size
Example:
import fdb
fdb.api_version(740)
fdb.options.set_trace_enable('/var/log/fdb-trace')
db = fdb.open()
DatabaseOptions.set_*(...) -> None
Database-level configuration options.
Common Methods:
set_transaction_timeout(milliseconds: int) - Default transaction timeout
set_transaction_retry_limit(count: int) - Default retry limit
set_transaction_max_retry_delay(milliseconds: int) - Maximum retry delay
Example:
db.options.set_transaction_timeout(5000) # 5 second default timeout
TransactionOptions.set_*(...) -> None
Transaction-level configuration options.
Common Methods:
set_timeout(milliseconds: int) - Transaction timeout
set_retry_limit(count: int) - Retry limit
set_max_retry_delay(milliseconds: int) - Maximum retry delay
set_read_your_writes_disable() - Disable read-your-writes
set_read_ahead_disable() - Disable read-ahead
set_access_system_keys() - Enable access to system keys
set_priority_system_immediate() - Set system immediate priority
set_priority_batch() - Set batch priority
set_causal_read_risky() - Enable causal read risky mode
set_snapshot_ryw_enable() - Enable snapshot read-your-writes
Example:
tr = db.create_transaction()
tr.options.set_timeout(3000) # 3 second timeout
tr.options.set_priority_batch() # Low priority
The @transactional decorator provides automatic transaction retry logic.
@fdb.transactional
def function(tr: Transaction, *args, **kwargs) -> Any
Decorator that wraps a function to automatically handle transaction retry.
Parameters:
Transaction, Database, or Tenant as first parameter
Returns: Wrapped function that automatically retries on retryable errors
Example:
import fdb
fdb.api_version(740)
db = fdb.open()
@fdb.transactional
def transfer_money(tr, from_account, to_account, amount):
    """Transfer money between accounts."""
    # Read balances
    from_balance_bytes = tr[from_account]
    to_balance_bytes = tr[to_account]
    if not from_balance_bytes:
        raise ValueError("From account does not exist")
    if not to_balance_bytes:
        raise ValueError("To account does not exist")
    from_balance = int(from_balance_bytes)
    to_balance = int(to_balance_bytes)
    if from_balance < amount:
        raise ValueError("Insufficient funds")
    # Update balances
    tr[from_account] = str(from_balance - amount).encode()
    tr[to_account] = str(to_balance + amount).encode()
# Use with Database
transfer_money(db, b'account:alice', b'account:bob', 100)
# Use with Transaction
tr = db.create_transaction()
transfer_money(tr, b'account:alice', b'account:bob', 100)
tr.commit().wait()
The tuple layer provides encoding and decoding of Python tuples to byte strings, enabling structured keys.
Module: fdb.tuple
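
To make the encoding concrete, here is a deliberately reduced sketch of the tuple format covering only bytes, str, and non-negative integers below 2**64. mini_pack is a hypothetical stand-in written for illustration; fdb.tuple.pack handles many more types and should be used in real code.

```python
def mini_pack(items):
    """Encode a tuple of bytes, str, and small non-negative ints."""
    out = bytearray()
    for item in items:
        if isinstance(item, bytes):
            # 0x01 marks a byte string; embedded NULs are escaped as 00 FF.
            out += b'\x01' + item.replace(b'\x00', b'\x00\xff') + b'\x00'
        elif isinstance(item, str):
            # 0x02 marks a Unicode string, stored as UTF-8.
            data = item.encode('utf-8')
            out += b'\x02' + data.replace(b'\x00', b'\x00\xff') + b'\x00'
        elif isinstance(item, int) and not isinstance(item, bool) and 0 <= item < 2**64:
            # 0x14 is zero; 0x14 + n marks a positive integer of n big-endian bytes.
            n = (item.bit_length() + 7) // 8
            out += bytes([0x14 + n]) + item.to_bytes(n, 'big')
        else:
            raise TypeError(f"unsupported item: {item!r}")
    return bytes(out)

print(mini_pack(('users', 123, 'profile')))
# Byte order of the encodings follows the order of the tuples.
print(mini_pack((1,)) < mini_pack((2,)) < mini_pack((256,)))
```

The length-dependent type code for integers is what keeps byte order aligned with numeric order.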
fdb.tuple.pack(t: Tuple, prefix: Optional[bytes] = None) -> bytes
Encode a tuple to bytes with optional prefix.
Parameters:
t: Tuple of values to encode
prefix: Optional byte prefix to prepend (default: None)
Returns: Encoded byte string
Supported Types:
None
bytes
str (Unicode)
int
float
bool
uuid.UUID
Versionstamp
SingleFloat
Example:
import fdb.tuple as fdb_tuple
key = fdb_tuple.pack(('users', 123, 'profile'))
# b'\x02users\x00\x15{\x02profile\x00' (0x02 marks a Unicode string; bytes would use 0x01)
tr[key] = b'{"name": "Alice"}'
fdb.tuple.unpack(key: bytes, prefix_len: int = 0) -> Tuple
Decode bytes to a tuple, skipping an optional prefix.
Parameters:
key: Encoded byte string
prefix_len: Number of bytes to skip before decoding (int, not prefix bytes)
Returns: Decoded tuple
Example:
key = b'\x02users\x00\x15{\x02profile\x00'
items = fdb_tuple.unpack(key)
# ('users', 123, 'profile')
fdb.tuple.pack_with_versionstamp(t: Tuple, prefix: bytes = b'') -> bytes
Pack a tuple containing an incomplete Versionstamp.
Parameters:
t: Tuple containing a Versionstamp with tr_version=None
prefix: Byte prefix to prepend
Returns: Encoded byte string with incomplete versionstamp placeholder
Example:
import fdb.tuple as fdb_tuple
vs = fdb_tuple.Versionstamp() # Incomplete versionstamp
key = fdb_tuple.pack_with_versionstamp(('events', vs, 'data'))
tr.set_versionstamped_key(key, b'event_data')
fdb.tuple.range(t: Tuple = ()) -> slice
Get the key range that contains all keys with the given tuple prefix.
Parameters:
t: Tuple prefix
Returns: A slice whose start and stop are the begin and end keys of the range
Example:
import fdb.tuple as fdb_tuple
# Get all keys starting with ('users', 123)
key_range = fdb_tuple.range(('users', 123))
for kv in tr.get_range(key_range.start, key_range.stop):
    print(kv.key, kv.value)
fdb.tuple.has_incomplete_versionstamp(t: Tuple) -> bool
Check if a tuple contains an incomplete versionstamp.
Parameters:
t: Tuple to check
Returns: True if tuple contains an incomplete Versionstamp, False otherwise
Example:
import fdb.tuple as fdb_tuple
incomplete_vs = fdb_tuple.Versionstamp.incomplete()
t1 = ('users', incomplete_vs, 'data')
print(fdb_tuple.has_incomplete_versionstamp(t1)) # True
complete_vs = fdb_tuple.Versionstamp(tr_version=b'\x00'*10)
t2 = ('users', complete_vs, 'data')
print(fdb_tuple.has_incomplete_versionstamp(t2)) # False
fdb.tuple.compare(t1: Tuple, t2: Tuple) -> int
Compare two tuples using FoundationDB tuple ordering.
Parameters:
t1: First tuple
t2: Second tuple
Returns: Negative if t1 < t2, zero if t1 == t2, positive if t1 > t2
Example:
import fdb.tuple as fdb_tuple
result = fdb_tuple.compare(('a', 1), ('a', 2))
# result < 0 because ('a', 1) < ('a', 2)
result = fdb_tuple.compare(('b',), ('a', 999))
# result > 0 because ('b',) > ('a', 999)
class fdb.tuple.Versionstamp(tr_version: Optional[bytes] = None, user_version: int = 0)
Represents a versionstamp value for tuple encoding.
Parameters:
tr_version: Transaction version bytes (10 bytes) or None for incomplete
user_version: User-specified version (0-65535)
Properties:
tr_version: Transaction version bytes
user_version: User version integer
is_complete: Whether versionstamp is complete
Static Methods:
Versionstamp.incomplete(user_version: int = 0) -> Versionstamp
Create an incomplete versionstamp.
Parameters:
user_version: User version (0-65535)
Returns: Incomplete Versionstamp object
Versionstamp.from_bytes(data: bytes) -> Versionstamp
Create a versionstamp from raw bytes.
Parameters:
data: 12 bytes (10 bytes transaction version + 2 bytes user version)
Returns: Complete Versionstamp object
Methods:
Versionstamp.to_bytes() -> bytes
Convert versionstamp to bytes representation.
Returns: 12 bytes (10 bytes transaction version + 2 bytes user version)
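
The 12-byte layout can be inspected with the standard struct module. This is a minimal sketch, assuming the user version is stored as a big-endian unsigned 16-bit integer after the 10-byte transaction version, which matches the 0x00 0x2a suffix for 42 used in this reference. split_versionstamp and join_versionstamp are hypothetical helpers.

```python
import struct

# 10-byte transaction version followed by a big-endian 16-bit user version.
FORMAT = '>10sH'

def split_versionstamp(data: bytes):
    """Split 12 raw bytes into (tr_version, user_version)."""
    if len(data) != struct.calcsize(FORMAT):
        raise ValueError("a serialized versionstamp is exactly 12 bytes")
    return struct.unpack(FORMAT, data)

def join_versionstamp(tr_version: bytes, user_version: int) -> bytes:
    """Inverse of split_versionstamp."""
    return struct.pack(FORMAT, tr_version, user_version)

raw = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x00\x2a'
tr_version, user_version = split_versionstamp(raw)
print(tr_version.hex(), user_version)
```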
versionstamp_instance.completed(tr_version: bytes) -> Versionstamp
Create a completed versionstamp from an incomplete one (instance method).
Parameters:
tr_version: Transaction version bytes (10 bytes)
Returns: Complete Versionstamp object with the same user_version
Note: This is an instance method, not a static method. Must be called on an existing incomplete Versionstamp object.
Example:
import fdb.tuple as fdb_tuple
# Incomplete versionstamp for use in keys/values
incomplete_vs = fdb_tuple.Versionstamp.incomplete()
# Or: incomplete_vs = fdb_tuple.Versionstamp()
# Complete versionstamp for reading
complete_vs = fdb_tuple.Versionstamp(
    tr_version=b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09',
    user_version=42
)
# From bytes
vs_bytes = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x00\x2a'
vs = fdb_tuple.Versionstamp.from_bytes(vs_bytes)
# To bytes
raw = vs.to_bytes()
# Complete an incomplete versionstamp
incomplete = fdb_tuple.Versionstamp.incomplete(42)
completed = incomplete.completed(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09')
class fdb.tuple.SingleFloat(value: float)
Represents a single-precision (32-bit) float for tuple encoding.
Parameters:
value: Float value
Note: By default, Python floats are encoded as double-precision (64-bit). Use SingleFloat to explicitly encode as 32-bit.
Example:
import fdb.tuple as fdb_tuple
# Encode as 32-bit float
key = fdb_tuple.pack(('metrics', fdb_tuple.SingleFloat(3.14)))
The Subspace class provides key prefixing and tuple encoding for namespace isolation.
Module: fdb.subspace_impl
class fdb.Subspace(prefixTuple: Tuple = tuple(), rawPrefix: bytes = b"")
Create a subspace with tuple or raw byte prefix.
Parameters:
prefixTuple: Tuple to encode as prefix
rawPrefix: Raw byte prefix (placed before the encoded prefixTuple)
Example:
import fdb
import fdb.tuple as fdb_tuple
# Create subspace with tuple prefix
users_subspace = fdb.Subspace(prefixTuple=('users',))
# Create subspace with raw byte prefix
raw_subspace = fdb.Subspace(rawPrefix=b'\x01users\x00')
# Create subspace with both
combined_subspace = fdb.Subspace(prefixTuple=('app',), rawPrefix=b'\x00')
Subspace.key() -> bytes
Get the raw byte prefix of the subspace.
Returns: Prefix bytes
Subspace.pack(items: Tuple = ()) -> bytes
Pack a tuple with the subspace prefix.
Parameters:
items: Tuple to encode
Returns: Encoded byte string with subspace prefix
Example:
users = fdb.Subspace(('users',))
key = users.pack((123, 'name'))
# Equivalent to fdb.tuple.pack(('users', 123, 'name'))
tr[key] = b'Alice'
Subspace.unpack(key: bytes) -> Tuple
Unpack a key into a tuple, removing the subspace prefix.
Parameters:
key: Encoded key with subspace prefix
Returns: Decoded tuple without prefix
Raises: ValueError if key is not in subspace
Example:
users = fdb.Subspace(('users',))
key = users.pack((123, 'name'))
items = users.unpack(key)
# (123, 'name')
Subspace.pack_with_versionstamp(t: Tuple = ()) -> bytes
Pack a tuple containing an incomplete versionstamp with the subspace prefix.
Parameters:
t: Tuple containing a Versionstamp with tr_version=None
Returns: Encoded byte string with subspace prefix and incomplete versionstamp placeholder
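The idea of an incomplete versionstamp placeholder can be pictured with a small stand-alone model of the 12-byte versionstamp value. The layout (a 10-byte transaction version followed by a 2-byte big-endian user version) follows the from_bytes example in this reference. The 0xff filler for the unknown transaction version and the helper names are assumptions made for illustration, and the sketch does not reproduce the packed key that pack_with_versionstamp returns.

```python
import struct

TR_VERSION_LEN = 10                     # bytes reserved for the transaction version
PLACEHOLDER = b'\xff' * TR_VERSION_LEN  # assumed filler while the version is unknown


def encode_versionstamp(tr_version, user_version):
    """Return 12 bytes: tr_version (or the placeholder) + big-endian user_version."""
    if tr_version is None:
        tr_version = PLACEHOLDER
    if len(tr_version) != TR_VERSION_LEN:
        raise ValueError("tr_version must be exactly 10 bytes")
    return struct.pack('>10sH', tr_version, user_version)


def decode_versionstamp(raw):
    """Split 12 bytes back into (tr_version, user_version)."""
    return struct.unpack('>10sH', raw)


complete = encode_versionstamp(bytes(range(10)), 42)
incomplete = encode_versionstamp(None, 42)

print(complete.hex())
print(decode_versionstamp(complete))
print(incomplete.hex())
```

In this model the user version survives completion unchanged, while the first ten bytes are the part the database fills in at commit time.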
Example:
import fdb.tuple as fdb_tuple
events = fdb.Subspace(prefixTuple=('events',))
vs = fdb_tuple.Versionstamp.incomplete()
key = events.pack_with_versionstamp((vs, 'data'))
tr.set_versionstamped_key(key, b'event_data')
Subspace.range(items: Tuple = ()) -> slice
Get the key range for a tuple prefix within the subspace.
Parameters:
items: Tuple prefix (relative to subspace)
Returns: A slice object representing the range (can be used directly to index a transaction, as in tr[slice], or split into begin/end keys via its start and stop attributes)
Example:
users = fdb.Subspace(('users',))
# Get all keys for user 123 by indexing the transaction with the slice
for kv in tr[users.range((123,))]:
    field = users.unpack(kv.key)[-1]
    print(f"Field {field}: {kv.value}")
# Or take the begin and end keys from the slice and pass them to get_range
key_range = users.range((123,))
begin, end = key_range.start, key_range.stop
for kv in tr.get_range(begin, end):
    print(kv.key)
Subspace.contains(key: bytes) -> bool
Check if a key is within the subspace.
Parameters:
key: Key to check
Returns: True if key starts with subspace prefix
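The prefix rule that contains() and unpack() rely on can be modeled in plain Python. This is a minimal sketch over raw bytes: the function names and the b'users/' prefix are hypothetical, and the real unpack() additionally decodes the remaining bytes with the tuple layer.

```python
def contains(prefix: bytes, key: bytes) -> bool:
    """A key belongs to the subspace when it starts with the subspace prefix."""
    return key.startswith(prefix)


def strip_prefix(prefix: bytes, key: bytes) -> bytes:
    """Return the part of the key after the prefix, or fail for foreign keys."""
    if not contains(prefix, key):
        raise ValueError("key is not in subspace")
    return key[len(prefix):]


prefix = b'users/'  # illustrative prefix, not a real tuple encoding

print(contains(prefix, b'users/123'))      # True
print(contains(prefix, b'posts/1'))        # False
print(strip_prefix(prefix, b'users/123'))  # b'123'
```

Calling contains() before unpack() is a cheap way to skip keys from other subspaces instead of catching ValueError.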
Subspace.subspace(items: Tuple) -> Subspace
Create a nested subspace.
Parameters:
items: Tuple to append to current prefix
Returns: New Subspace object
Example:
users = fdb.Subspace(('users',))
user_123 = users.subspace((123,))
# These are equivalent:
key1 = users.pack((123, 'name'))
key2 = user_123.pack(('name',))
# key1 == key2
The directory layer provides hierarchical namespace management with automatic prefix allocation.
Module: fdb.directory_impl
class fdb.directory.DirectoryLayer(
    node_subspace: Optional[Subspace] = None,
    content_subspace: Optional[Subspace] = None,
    allow_manual_prefixes: bool = False
)
Create a directory layer instance.
Parameters:
node_subspace: Subspace for directory metadata (default: system subspace)
content_subspace: Subspace for directory content (default: empty prefix)
allow_manual_prefixes: Allow manually specified prefixes
DirectoryLayer.create_or_open(
    db_or_tr: Union[Database, Tenant, Transaction],
    path: Union[str, List[str]],
    layer: Optional[bytes] = None
) -> DirectorySubspace
Create or open a directory.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
path: Directory path (string or list of strings)
layer: Layer identifier for application-specific directory types (optional)
Returns: DirectorySubspace object
Note: The internal implementation supports additional parameters (prefix, allow_create, allow_open), but these are not exposed in the public API.
Example:
import fdb
fdb.api_version(740)
db = fdb.open()
# Create or open directory
users_dir = fdb.directory.create_or_open(db, ('app', 'users'))
# Use directory as subspace
tr = db.create_transaction()
key = users_dir.pack((123, 'name'))
tr[key] = b'Alice'
tr.commit().wait()
DirectoryLayer.create(
    db_or_tr: Union[Database, Tenant, Transaction],
    path: Union[str, List[str]],
    layer: bytes = b'',
    prefix: Optional[bytes] = None
) -> DirectorySubspace
Create a new directory.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
path: Directory path
layer: Layer identifier
prefix: Manual prefix
Returns: DirectorySubspace object
Raises: Error if directory already exists
DirectoryLayer.open(
    db_or_tr: Union[Database, Tenant, Transaction],
    path: Union[str, List[str]],
    layer: bytes = b''
) -> DirectorySubspace
Open an existing directory.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
path: Directory path
layer: Expected layer identifier
Returns: DirectorySubspace object
Raises: Error if directory doesn't exist or layer mismatch
DirectoryLayer.exists(
    db_or_tr: Union[Database, Tenant, Transaction],
    path: Union[str, List[str]] = ()
) -> bool
Check whether a directory exists at the given path.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
path: Directory path (empty tuple for root)
Returns: True if the directory exists, False otherwise
Note: To enumerate subdirectories, use list() rather than exists().
DirectoryLayer.list(
    db_or_tr: Union[Database, Tenant, Transaction],
    path: Union[str, List[str]] = ()
) -> List[str]
List subdirectories.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
path: Directory path (empty for root)
Returns: List of subdirectory names
Example:
# Create directory structure
fdb.directory.create_or_open(db, ('app', 'users'))
fdb.directory.create_or_open(db, ('app', 'posts'))
# List subdirectories
subdirs = fdb.directory.list(db, ('app',))
# ['posts', 'users'] (names come back in key order)
DirectoryLayer.move(
    db_or_tr: Union[Database, Tenant, Transaction],
    old_path: Union[str, List[str]],
    new_path: Union[str, List[str]]
) -> DirectorySubspace
Move a directory to a new path.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
old_path: Current directory path
new_path: New directory path
Returns: DirectorySubspace object at new path
Raises: Error if old path doesn't exist or new path already exists
DirectoryLayer.remove(
    db_or_tr: Union[Database, Tenant, Transaction],
    path: Union[str, List[str]]
) -> bool
Remove a directory and all its contents.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
path: Directory path
Returns: True if removed
Raises: Error if directory doesn't exist
DirectoryLayer.remove_if_exists(
    db_or_tr: Union[Database, Tenant, Transaction],
    path: Union[str, List[str]]
) -> bool
Remove a directory if it exists.
Parameters:
db_or_tr: Database, Tenant, or Transaction to use
path: Directory path
Returns: True if directory existed and was removed
class fdb.DirectorySubspace
A directory that is also a Subspace. Provides all Subspace methods plus directory-specific methods.
Inherits: Subspace, Directory
DirectorySubspace.get_layer() -> bytes
Get the layer identifier for the directory.
Returns: Layer bytes
DirectorySubspace.get_path() -> List[str]
Get the directory path.
Returns: List of path components
fdb.directory -> DirectoryLayer
Default directory layer instance using standard subspaces.
Example:
import fdb
fdb.api_version(740)
db = fdb.open()
# Use default directory instance
users = fdb.directory.create_or_open(db, ('users',))
posts = fdb.directory.create_or_open(db, ('posts',))
# Store data in directories
@fdb.transactional
def create_user(tr, user_id, name):
    key = users.pack((user_id,))
    tr[key] = name.encode()
create_user(db, 123, 'Alice')
The locality module provides information about physical data distribution.
Module: fdb.locality
fdb.locality.get_boundary_keys(
    db_or_tr: Union[Database, Transaction],
    begin: bytes,
    end: bytes
) -> Generator[bytes, None, None]
Get boundary keys that partition a range across servers.
Parameters:
db_or_tr: Database or Transaction to use
begin: Start key (inclusive)
end: End key (exclusive)
Returns: Generator that yields boundary key bytes
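Boundary keys are most useful once they are turned into contiguous key ranges that can be handed to workers. The helper below is a sketch of that step in plain Python. The name split_into_ranges and the sample keys are hypothetical, and the boundary list stands in for the materialized output of get_boundary_keys.

```python
from typing import Iterable, List, Tuple


def split_into_ranges(begin: bytes, end: bytes,
                      boundaries: Iterable[bytes]) -> List[Tuple[bytes, bytes]]:
    """Turn boundary keys into contiguous [start, stop) ranges covering [begin, end)."""
    # Keep only boundaries strictly inside the requested range, in key order.
    inner = sorted(k for k in set(boundaries) if begin < k < end)
    points = [begin] + inner + [end]
    # Pair each point with its successor to form half-open ranges.
    return list(zip(points, points[1:]))


# Hypothetical boundary keys standing in for real cluster output.
sample = [b'', b'g', b'p']
ranges = split_into_ranges(b'', b'\xff', sample)
print(ranges)  # [(b'', b'g'), (b'g', b'p'), (b'p', b'\xff')]
```

Adding begin and end as explicit end points means the first and last ranges are covered even when no boundary key coincides with them.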
Example:
import fdb.locality
# Materialize the generator so it can be measured and indexed
boundaries = list(fdb.locality.get_boundary_keys(db, b'', b'\xff'))
print(f"Database has {len(boundaries)} partitions")
# Process each range between consecutive boundary keys
# (process_range is a placeholder for application code)
for i in range(len(boundaries) - 1):
    begin = boundaries[i]
    end = boundaries[i + 1]
    # Process range [begin, end)
    process_range(db, begin, end)
fdb.locality.get_addresses_for_key(
    tr: Transaction,
    key: bytes
) -> FutureStringArray
Get the network addresses of servers storing a key.
Parameters:
tr: Transaction to use
key: Key to query
Returns: FutureStringArray that resolves to a list of server address strings
Note: This function is decorated with @transactional, so you can also pass a Database or Tenant as the first argument.
Example:
import fdb.locality
# Can be used with Database (transactional wrapper)
addresses = fdb.locality.get_addresses_for_key(db, b'mykey')
print(f"Key stored on servers: {addresses}")
# Or with explicit transaction
tr = db.create_transaction()
addresses_future = fdb.locality.get_addresses_for_key(tr, b'mykey')
addresses = addresses_future.wait()
print(f"Key stored on servers: {addresses}")
The tenant management module provides administrative functions for multi-tenancy.
Module: fdb.tenant_management
fdb.tenant_management.create_tenant(
    db_or_tr: Union[Database, Transaction],
    name: bytes
) -> None
Create a new tenant.
Parameters:
db_or_tr: Database or Transaction to use
name: Tenant name as bytes
Raises: Error if tenant already exists
Example:
import fdb
fdb.api_version(740)  # must be selected before opening the database
import fdb.tenant_management
db = fdb.open()
# Create tenant
fdb.tenant_management.create_tenant(db, b'tenant_a')
# Open tenant and use it
tenant = db.open_tenant(b'tenant_a')
tenant.set(b'key', b'value')
fdb.tenant_management.delete_tenant(
    db_or_tr: Union[Database, Transaction],
    name: bytes
) -> None
Delete a tenant.
Parameters:
db_or_tr: Database or Transaction to use
name: Tenant name as bytes
Raises: Error if tenant doesn't exist or is not empty
fdb.tenant_management.list_tenants(
    db_or_tr: Union[Database, Transaction],
    begin: bytes,
    end: bytes,
    limit: int
) -> FDBTenantList
List tenants in a range.
Parameters:
db_or_tr: Database or Transaction to use
begin: Start tenant name (inclusive)
end: End tenant name (exclusive)
limit: Maximum number of results (required)
Returns: FDBTenantList iterator that yields KeyValue objects where keys are tenant names
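Because end is exclusive, listing every tenant whose name starts with a given prefix needs an end key that sorts just past that prefix. An end such as prefix + b'\xff' leaves out any name that continues beyond the 0xff byte. The following is a minimal sketch of one way to compute a tight upper bound in plain Python. The helper name prefix_end is hypothetical and not part of fdb.tenant_management.

```python
def prefix_end(prefix: bytes) -> bytes:
    """Return the smallest key that sorts after every key starting with prefix."""
    stripped = prefix.rstrip(b'\xff')  # trailing 0xff bytes cannot be incremented
    if not stripped:
        raise ValueError("prefix must contain a byte other than 0xff")
    # Increment the last remaining byte and drop everything after it.
    return stripped[:-1] + bytes([stripped[-1] + 1])


end = prefix_end(b'app_')
print(end)                                      # b'app`'
print(b'app_\xff\x01' < end)                    # True: covered by the tight bound
print(b'app_\xff' <= b'app_\xff\x01')           # True: missed by an end of b'app_\xff'
```

The result can be passed as the end argument together with the prefix itself as begin.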
Example:
import fdb.tenant_management
# List all tenants
tenant_list = fdb.tenant_management.list_tenants(db, b'', b'\xff', 1000)
for kv in tenant_list:
    tenant_name = kv.key.decode()
    print(f"Tenant: {tenant_name}")
# List tenants with prefix
app_tenants = fdb.tenant_management.list_tenants(
    db,
    begin=b'app_',
    end=b'app_\xff',
    limit=100
)
Streaming modes control how range reads fetch data from the database.
Module: fdb
StreamingMode.iterator: int
Balanced iteration mode (the default). Transfers a small batch first and larger batches as iteration continues.
StreamingMode.want_all: int
For clients that intend to consume the entire range. Data is transferred as early as possible, in large batches.
StreamingMode.exact: int
Delivers exactly the specified row limit in a single batch. A limit must be set when using this mode.
StreamingMode.small: int
Transfers data in small batches, minimizing cost if iteration stops early.
StreamingMode.medium: int
Transfers data in batches sized between small and large.
StreamingMode.large: int
Transfers data in large batches, which is efficient under high concurrency but may waste bandwidth if iteration stops early.
StreamingMode.serial: int
Transfers data in batches large enough for a single client to get reasonable read bandwidth. Considerable bandwidth may be wasted if iteration stops early.
Example:
import fdb
# Default (iterator)
for kv in tr.get_range(b'user/', b'user0'):
    print(kv.key)
# Want all (the whole range will be consumed)
for kv in tr.get_range(b'config/', b'config0', streaming_mode=fdb.StreamingMode.want_all):
    print(kv.key)
# Exact (a known row limit delivered in one batch)
for kv in tr.get_range(b'user/', b'user0', limit=10, streaming_mode=fdb.StreamingMode.exact):
    print(kv.key)
Here's a comprehensive example demonstrating key Python API features:
import fdb
import fdb.tuple as fdb_tuple
# Initialize
fdb.api_version(740)
db = fdb.open()
# Create directory structure
users_dir = fdb.directory.create_or_open(db, ('app', 'users'))
posts_dir = fdb.directory.create_or_open(db, ('app', 'posts'))
# Transactional function with decorator
@fdb.transactional
def create_user(tr, user_id, name, email):
    """Create a new user."""
    user_subspace = users_dir.subspace((user_id,))
    # Check if user exists
    if tr[user_subspace.pack(('name',))].present():
        raise ValueError(f"User {user_id} already exists")
    # Create user
    tr[user_subspace.pack(('name',))] = name.encode()
    tr[user_subspace.pack(('email',))] = email.encode()
    # Increment user count atomically (8-byte little-endian integer)
    import struct
    counter_key = users_dir.pack(('_count',))
    tr.add(counter_key, struct.pack('<q', 1))
@fdb.transactional
def get_user(tr, user_id):
    """Get user information."""
    user_subspace = users_dir.subspace((user_id,))
    name = tr[user_subspace.pack(('name',))]
    email = tr[user_subspace.pack(('email',))]
    if not name.present():
        return None
    return {
        'id': user_id,
        'name': name.decode(),
        'email': email.decode()
    }
@fdb.transactional
def list_users(tr):
    """List all users."""
    users = []
    seen_ids = set()
    # Iterate over every key in the users directory
    key_range = users_dir.range()
    for kv in tr.get_range(key_range.start, key_range.stop):
        key_tuple = users_dir.unpack(kv.key)
        # Skip non-user keys such as ('_count',)
        if len(key_tuple) >= 1 and isinstance(key_tuple[0], int):
            user_id = key_tuple[0]
            if user_id not in seen_ids:
                seen_ids.add(user_id)
                user = get_user(tr, user_id)
                if user:
                    users.append(user)
    return users
@fdb.transactional
def watch_user_changes(tr, user_id):
    """Set up watch for user changes."""
    user_subspace = users_dir.subspace((user_id,))
    watch_key = user_subspace.pack(('name',))
    return tr.watch(watch_key)
# Use the API
try:
    # Create users
    create_user(db, 1, 'Alice', 'alice@example.com')
    create_user(db, 2, 'Bob', 'bob@example.com')
    # Get user
    user = get_user(db, 1)
    print(f"User: {user}")
    # List all users
    all_users = list_users(db)
    print(f"All users: {all_users}")
    # Set up watch
    tr = db.create_transaction()
    watch_future = watch_user_changes(tr, 1)
    tr.commit().wait()
    # Watch triggers when user 1's name changes
    print("Waiting for changes...")
    # watch_future.wait()  # Blocks until name changes
    # Range operations with slices (range() returns a slice, so index with it directly)
    tr = db.create_transaction()
    for kv in tr[users_dir.range((1,))]:
        field = users_dir.unpack(kv.key)[-1]
        value = kv.value.decode()
        print(f"  {field}: {value}")
    # Snapshot reads (no conflicts)
    tr = db.create_transaction()
    value = tr.snapshot.get(users_dir.pack((1, 'name'))).wait()
    print(f"Snapshot read: {value}")
except fdb.FDBError as e:
    print(f"FDB Error {e.code}: {e.description}")
except ValueError as e:
    print(f"Validation error: {e}")
Always use the @transactional decorator or db.transact() method for automatic retry:
# Good
@fdb.transactional
def my_operation(tr):
    tr.set(b'key', b'value')
my_operation(db)
# Avoid (manual retry required)
tr = db.create_transaction()
tr.set(b'key', b'value')
tr.commit().wait()
Use the tuple layer for structured keys:
# Good
import fdb.tuple as fdb_tuple
key = fdb_tuple.pack(('users', user_id, 'profile'))
# Avoid (manual encoding)
key = b'users:' + str(user_id).encode() + b':profile'
Use subspaces or directories for namespace isolation:
# Good
users_dir = fdb.directory.create_or_open(db, ('users',))
key = users_dir.pack((user_id,))
# Avoid (no isolation)
key = b'users:' + str(user_id).encode()
Use snapshot reads when you don't need conflict checking:
@fdb.transactional
def get_stats(tr):
    # Snapshot reads add no read conflict ranges
    count = tr.snapshot.get(b'user_count').wait()
    # Writes still add write conflict ranges
    tr.set(b'last_access', b'timestamp')
    return count
Handle both retryable and non-retryable errors:
@fdb.transactional
def my_operation(tr):
    try:
        tr.set(b'key', b'value')
    except fdb.FDBError as e:
        if e.code == 1020:  # not_committed
            # Transaction will retry automatically
            raise
        else:
            # Non-retryable error
            print(f"Fatal error: {e}")
            raise
The Python binding supports API versions 13-740. Key version milestones:
Always use the latest API version for new applications:
fdb.api_version(740)