Advanced FoundationDB patterns and edge case handling.

import fdb

fdb.api_version(740)
db = fdb.open()

@fdb.transactional
def long_operation(tr):
    """Handle transaction timeout for long operations."""
    # Set a longer timeout (in milliseconds)
    tr.options.set_timeout(10000)  # 10 seconds
    # Break into smaller operations if needed
    for i in range(1000):
        tr[b'key' + str(i).encode()] = b'value'
        # Check the approximate size periodically
        if tr.get_approximate_size().wait() > 8000000:  # 8 MB
            # Transaction too large: raising a retryable error makes
            # @fdb.transactional restart the function with a new transaction
            raise fdb.FDBError(1020)  # not_committed: force retry

# Alternative: break the work into multiple transactions
def process_large_dataset(db, items):
    """Process a large dataset across multiple transactions."""
    batch_size = 100
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]

        @fdb.transactional
        def process_batch(tr):
            for item in batch:
                tr[item.key] = item.value

        process_batch(db)
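
For reference, a minimal way to exercise process_large_dataset; the Item type below is illustrative (anything with .key and .value bytes works), not part of the FoundationDB API:

from collections import namedtuple

Item = namedtuple('Item', ['key', 'value'])  # hypothetical container for this example
items = [Item(b'user:%d' % i, b'payload-%d' % i) for i in range(1000)]
process_large_dataset(db, items)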

import fdb

fdb.api_version(740)
db = fdb.open()

def idempotent_operation(db, key, value, operation_id):
    """Handle commit_unknown_result by making the operation idempotent."""
    @fdb.transactional
    def set_with_id(tr):
        # Check whether this operation already completed
        existing_op_id = tr.get(key + b':op_id').wait()
        if existing_op_id == operation_id.encode():
            return True  # Already completed
        # Perform the operation and record its id
        tr[key] = value
        tr[key + b':op_id'] = operation_id.encode()
        return False

    # Note: @fdb.transactional also retries commit_unknown_result (1021)
    # automatically; the in-transaction check above makes that retry safe.
    try:
        already_done = set_with_id(db)
        if already_done:
            print("Operation already completed")
    except fdb.FDBError as e:
        if e.code == 1021:  # commit_unknown_result
            # Check whether the operation succeeded
            @fdb.transactional
            def check_result(tr):
                return tr.get(key + b':op_id').wait() == operation_id.encode()

            if check_result(db):
                print("Operation succeeded")
            else:
                # Retry
                set_with_id(db)
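
A quick usage sketch, assuming the caller generates a unique operation id per logical write (uuid4 here is just one way to do that):

import uuid

op_id = str(uuid.uuid4())  # client-generated, unique per logical write
idempotent_operation(db, b'account:42:balance', b'100', op_id)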

import fdb
import fdb.tuple as fdb_tuple
fdb.api_version(740)
db = fdb.open()

def create_ordered_sequence(db, prefix, items):
    """Create an ordered sequence using versionstamped keys."""
    results = []
    for item in items:
        tr = db.create_transaction()
        # Create an incomplete versionstamp; the cluster fills it in at commit
        vs = fdb_tuple.Versionstamp.incomplete()
        key = fdb_tuple.pack_with_versionstamp((prefix, vs))
        # Set the versionstamped key (atomic operation)
        tr.set_versionstamped_key(key, item.encode())
        # Request the versionstamp future before committing
        vs_future = tr.get_versionstamp()
        try:
            tr.commit().wait()
            # The versionstamp becomes available after a successful commit
            versionstamp = vs_future.wait()
            results.append((versionstamp, item))
        except fdb.FDBError as e:
            if e.code == 1021:  # commit_unknown_result
                # Check whether the key was written with a versionstamp
                # and retry with a fresh transaction if needed
                pass
            else:
                raise
    return results
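
To read the sequence back in commit order, a range scan over the tuple prefix is enough, since versionstamped keys sort by commit version. A sketch, assuming prefix is a tuple-encodable value such as a string:

@fdb.transactional
def read_sequence(tr, prefix):
    entries = []
    r = fdb_tuple.range((prefix,))
    for kv in tr.get_range(r.start, r.stop):
        _, vs = fdb_tuple.unpack(kv.key)  # keys were packed as (prefix, versionstamp)
        entries.append((vs, kv.value.decode()))
    return entries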

import fdb

fdb.api_version(740)
db = fdb.open()

@fdb.transactional
def safe_range_read(tr, begin_key, end_key):
    """Handle empty ranges safely."""
    if begin_key >= end_key:
        return []  # Empty range
    results = []
    for kv in tr.get_range(begin_key, end_key, limit=1000):
        results.append((kv.key, kv.value))
    return results
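
Prefix scans are a common special case of range reads; a small sketch using the binding's get_range_startswith helper:

@fdb.transactional
def read_prefix(tr, prefix, limit=1000):
    """Read all key-value pairs whose keys start with the given prefix."""
    return [(kv.key, kv.value) for kv in tr.get_range_startswith(prefix, limit=limit)]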

import fdb

fdb.api_version(740)
db = fdb.open()

MAX_KEY_SIZE = 10000      # FoundationDB limit: keys up to 10,000 bytes
MAX_VALUE_SIZE = 100000   # FoundationDB limit: values up to 100,000 bytes

def safe_set(tr, key, value):
    """Set a key-value pair with size validation."""
    if len(key) > MAX_KEY_SIZE:
        raise ValueError(f"Key too large: {len(key)} > {MAX_KEY_SIZE}")
    if len(value) > MAX_VALUE_SIZE:
        raise ValueError(f"Value too large: {len(value)} > {MAX_VALUE_SIZE}")
    tr[key] = value

# For large values, split into chunks under the value-size limit
def set_large_value(tr, base_key, large_value, chunk_size=90000):
    """Split a large value into chunks."""
    chunks = [large_value[i:i + chunk_size]
              for i in range(0, len(large_value), chunk_size)]
    # Store chunk count
    tr[base_key + b':count'] = str(len(chunks)).encode()
    # Store chunks
    for i, chunk in enumerate(chunks):
        tr[base_key + b':chunk:' + str(i).encode()] = chunk
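
A matching reader that reassembles the chunks, sketched under the same key layout used by set_large_value:

@fdb.transactional
def get_large_value(tr, base_key):
    """Reassemble a value stored by set_large_value; returns None if absent."""
    count = tr.get(base_key + b':count').wait()
    if count is None:
        return None
    parts = []
    for i in range(int(count)):
        parts.append(tr.get(base_key + b':chunk:' + str(i).encode()).wait())
    return b''.join(parts)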

import fdb
import fdb.tenant_management
fdb.api_version(740)
db = fdb.open()

def get_or_create_tenant(db, tenant_name):
    """Get a tenant, creating it if it doesn't exist."""
    try:
        tenant = db.open_tenant(tenant_name)
        return tenant
    except fdb.FDBError as e:
        if e.code == 2131:  # tenant_not_found
            # Create the tenant, then open it
            fdb.tenant_management.create_tenant(db, tenant_name)
            return db.open_tenant(tenant_name)
        else:
            raise

# Usage
tenant = get_or_create_tenant(db, b'company_a')
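
A tenant handle can be passed to @fdb.transactional functions in place of the database, scoping all keys to that tenant; a brief sketch (key and value are illustrative):

@fdb.transactional
def set_in_tenant(tr, key, value):
    # The key lives in the tenant's keyspace, isolated from other tenants.
    tr[key] = value

set_in_tenant(tenant, b'config:theme', b'dark')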

import fdb

fdb.api_version(740)
db = fdb.open()

def safe_directory_operations(db, path):
    """Handle directory operations with error checking."""
    try:
        # Try to open the directory (the directory layer raises ValueError
        # if it does not exist)
        directory = fdb.directory.open(db, path)
        return directory
    except ValueError:
        # Directory doesn't exist; create it
        try:
            directory = fdb.directory.create(db, path)
            return directory
        except ValueError:
            # Race condition: directory created between open() and create().
            # fdb.directory.create_or_open() handles this case in one call.
            return fdb.directory.open(db, path)

# Handle directory removal with children
def remove_directory_safe(db, path):
    """Remove a directory only if it has no child directories."""
    @fdb.transactional
    def check_and_remove(tr):
        # Check whether the directory has children
        children = fdb.directory.list(tr, path)
        if children:
            raise ValueError(f"Directory {path} has children: {children}")
        # Remove the directory
        fdb.directory.remove(tr, path)

    try:
        check_and_remove(db)
    except ValueError as e:
print(f"Cannot remove: {e}")import fdb

import fdb

fdb.api_version(740)
db = fdb.open()

def watch_with_timeout(db, key, timeout_seconds=30):
    """Watch a key, giving up after a timeout."""
    import time

    tr = db.create_transaction()
    watch_future = tr.watch(key)
    tr.commit().wait()

    start_time = time.time()
    while not watch_future.is_ready():
        if time.time() - start_time > timeout_seconds:
            watch_future.cancel()
            raise TimeoutError("Watch timeout")
        time.sleep(0.1)
    watch_future.wait()
    return True
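
A typical usage sketch: block until the key changes (or the timeout fires), then read the new value:

@fdb.transactional
def read_key(tr, key):
    return tr.get(key).wait()

try:
    watch_with_timeout(db, b'config:flags', timeout_seconds=10)
    print("Key changed, new value:", read_key(db, b'config:flags'))
except TimeoutError:
    print("No change observed within the timeout")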

import fdb

fdb.api_version(740)
db = fdb.open()

def consistent_snapshot_read(db, key):
    """Read at a pinned read version using a snapshot read."""
    tr1 = db.create_transaction()
    read_version = tr1.get_read_version().wait()

    # Use the same read version in a second transaction
    tr2 = db.create_transaction()
    tr2.set_read_version(read_version)

    # Snapshot read (adds no read conflict range) at that version
    value = tr2.snapshot.get(key).wait()
    return value
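
The same version pinning extends to reading several keys at one consistent version; a short sketch:

def read_keys_at_version(db, keys, read_version):
    """Read several keys at a previously captured read version."""
    tr = db.create_transaction()
    tr.set_read_version(read_version)
    # Snapshot reads avoid adding read conflict ranges
    return {k: tr.snapshot.get(k).wait() for k in keys}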

import fdb

fdb.api_version(740)
db = fdb.open()

def robust_operation(db, operation_func, max_retries=10):
    """Robust operation with explicit retry handling."""
    for attempt in range(max_retries):
        try:
            return operation_func(db)
        except fdb.FDBError as e:
            if e.code == 1020:  # not_committed (conflict)
                if attempt < max_retries - 1:
                    continue  # Retry
                else:
                    raise
            elif e.code == 1021:  # commit_unknown_result
                # Check whether the operation succeeded
                # (implement an idempotent check, as above)
                raise
            else:
                # Non-retryable error
                raise

See the API Reference for detailed error codes and handling patterns.