"""Comprehensive usage examples for common FoundationDB patterns."""
import fdb
import fdb.tuple as fdb_tuple

fdb.api_version(740)  # must be selected before any other fdb call
db = fdb.open()  # no cluster-file argument: connect using the default

# Create directory structure
# The directory layer maps the ('app', 'users') path to a short key prefix.
users_dir = fdb.directory.create_or_open(db, ('app', 'users'))
@fdb.transactional
def create_user(tr, user_id, name, email):
    """Create a new user and atomically bump the global user counter.

    Args:
        tr: transaction (supplied automatically by @fdb.transactional
            when called with a Database).
        user_id: unique identifier; becomes the user's subspace key.
        name: display name, stored UTF-8 encoded.
        email: email address, stored UTF-8 encoded.

    Raises:
        ValueError: if a user with ``user_id`` already exists.
    """
    import struct  # hoisted to the top of the function, out of the write path

    user_subspace = users_dir.subspace((user_id,))
    # Pack the 'name' key once; it is used for both the existence check
    # and the write below.
    name_key = user_subspace.pack(('name',))

    # A present 'name' key means the user was already created.
    if tr[name_key]:
        raise ValueError(f"User {user_id} already exists")

    tr[name_key] = name.encode()
    tr[user_subspace.pack(('email',))] = email.encode()

    # Atomic add: increments without a read, so concurrent creates do not
    # conflict on the shared counter key.
    counter_key = users_dir.pack(('_count',))
    tr.add(counter_key, struct.pack('<q', 1))
@fdb.transactional
def get_user(tr, user_id):
    """Fetch a stored user's fields; return None when the user is absent."""
    subspace = users_dir.subspace((user_id,))
    name_value = tr[subspace.pack(('name',))]
    email_value = tr[subspace.pack(('email',))]

    # Absence of the 'name' key is treated as "user does not exist".
    if not name_value:
        return None

    return {
        'id': user_id,
        'name': name_value.decode(),
        'email': email_value.decode(),
    }
@fdb.transactional
def list_users(tr):
    """Return a list of user dicts for every user stored under users_dir.

    Iterates every key in the users directory; each distinct integer
    first-tuple-element is treated as a user id and fetched via get_user.
    """
    users = []
    # Track ids already fetched with a set: O(1) membership instead of
    # rescanning the accumulated `users` list for every key (was O(n^2)).
    seen_ids = set()
    begin, end = users_dir.range()
    for kv in tr.get_range(begin, end):
        key_tuple = users_dir.unpack(kv.key)
        # User keys look like (user_id, field); skip non-user entries
        # such as the ('_count',) counter key (a string, not an int).
        if key_tuple and isinstance(key_tuple[0], int):
            user_id = key_tuple[0]
            if user_id not in seen_ids:
                seen_ids.add(user_id)
                user = get_user(tr, user_id)
                if user:
                    users.append(user)
    return users
# Usage
# Passing the Database as the first argument makes @fdb.transactional
# create, retry, and commit the transaction automatically.
create_user(db, 1, 'Alice', 'alice@example.com')
create_user(db, 2, 'Bob', 'bob@example.com')
user = get_user(db, 1)
all_users = list_users(db)

import fdb
import struct

fdb.api_version(740)
db = fdb.open()
@fdb.transactional
def increment_counter(tr, counter_key, amount=1):
    """Atomically add ``amount`` to the little-endian i64 at ``counter_key``."""
    # Encode the delta once, then apply it with the ADD atomic op:
    # no read happens, so there is no read-modify-write conflict.
    delta = struct.pack('<q', amount)
    tr.add(counter_key, delta)
@fdb.transactional
def get_counter(tr, counter_key):
    """Read the counter at ``counter_key``; a missing key counts as zero."""
    raw = tr[counter_key]
    if not raw:
        return 0
    (count,) = struct.unpack('<q', raw)
    return count
# Usage
counter_key = b'global:counter'
# Two atomic increments; they commute, so they never conflict.
increment_counter(db, counter_key, 5)
increment_counter(db, counter_key, 3)
count = get_counter(db, counter_key)  # 8

import fdb
import fdb.tuple as fdb_tuple

fdb.api_version(740)
db = fdb.open()
# Events live under the ('app', 'events') directory prefix.
events_dir = fdb.directory.create_or_open(db, ('app', 'events'))
@fdb.transactional
def log_event(tr, event_type, data):
    """Log event with versionstamp for ordering.

    The key contains an incomplete versionstamp that the cluster fills in
    at commit time, so committed events sort in commit order per event_type.

    Returns:
        A future that resolves to the transaction's versionstamp; it only
        becomes ready after the transaction commits.
    """
    # Create incomplete versionstamp
    vs = fdb_tuple.Versionstamp.incomplete()
    # Pack key with versionstamp
    key = events_dir.pack_with_versionstamp((event_type, vs))
    # Set versionstamped key (atomic op that splices in the real stamp)
    tr.set_versionstamped_key(key, data.encode())
    # Get versionstamp after commit
    return tr.get_versionstamp()
# Usage
# Manual transaction here: the versionstamp future returned by log_event
# can only be waited on after commit() succeeds.
tr = db.create_transaction()
vs_future = log_event(tr, 'user_action', 'User logged in')
tr.commit().wait()
versionstamp = vs_future.wait()

import fdb
import fdb.tenant_management

fdb.api_version(740)
db = fdb.open()
# Create tenant
# NOTE(review): create_tenant presumably raises if the tenant already
# exists — confirm before re-running this script.
fdb.tenant_management.create_tenant(db, b'company_a')
fdb.tenant_management.create_tenant(db, b'company_b')
# Open tenant and use
# A tenant handle is used like a Database below (passed as the first
# argument of a @fdb.transactional function).
tenant_a = db.open_tenant(b'company_a')
tenant_b = db.open_tenant(b'company_b')
# Each tenant has isolated namespace
@fdb.transactional
def set_tenant_config(tr, config_key, config_value):
    """Store a UTF-8 encoded config value under ``config_key``."""
    encoded = config_value.encode()
    tr[config_key] = encoded
# Set config for tenant A
set_tenant_config(tenant_a, b'config:theme', 'dark')
# Set config for tenant B (isolated)
set_tenant_config(tenant_b, b'config:theme', 'light')

import fdb
fdb.api_version(740)
db = fdb.open()
@fdb.transactional
def set_watch(tr, key):
    """Register a watch on ``key``; the returned future fires on change."""
    watch_future = tr.watch(key)
    return watch_future
# Set up watch
tr = db.create_transaction()
watch_future = set_watch(tr, b'config:reload')
tr.commit().wait()  # the watch only becomes active once this commits
# Watch is now active
# When key changes, watch_future.wait() will return
print("Waiting for config change...")
watch_future.wait()  # blocks until the watched key is modified
print("Config key was modified!")

import fdb
fdb.api_version(740)
db = fdb.open()
def process_range_parallel(db, begin_key, end_key, chunk_size_bytes):
    """Process a large key range as a series of independent chunk transactions.

    NOTE(review): despite the name, chunks are processed sequentially here;
    because each chunk runs in its own transaction, the loop body can be
    handed to a thread pool without further changes.

    Args:
        db: open FoundationDB database handle.
        begin_key: inclusive byte-string start of the range.
        end_key: exclusive byte-string end of the range.
        chunk_size_bytes: target chunk size passed to get_range_split_points.

    Returns:
        Flat list of process_item() results for every key-value pair.
    """
    tr = db.create_transaction()
    try:
        # Ask the cluster for shard-aligned split points so chunks are balanced.
        split_points = tr.get_range_split_points(
            begin_key, end_key, chunk_size_bytes
        ).wait()
    finally:
        # This helper transaction is read-only and never committed;
        # cancel it explicitly instead of leaving it dangling.
        tr.cancel()

    # Defined once, outside the loop (the original re-created a decorated
    # closure on every iteration); begin/end are explicit parameters.
    @fdb.transactional
    def _process_chunk(tr, begin, end):
        """Process one chunk in its own retryable transaction."""
        return [process_item(kv) for kv in tr.get_range(begin, end)]

    results = []
    # Adjacent split points delimit the chunks.
    for begin, end in zip(split_points, split_points[1:]):
        results.extend(_process_chunk(db, begin, end))
    return results
fdb.api_version(740)
db = fdb.open()
@fdb.transactional
def get_analytics(tr):
    """Count users and events using snapshot reads.

    Snapshot reads add no read-conflict ranges, so this transaction does
    not conflict with concurrent writers.
    """
    users_begin, users_end = users_dir.range()
    total_users = sum(1 for _ in tr.snapshot.get_range(users_begin, users_end))

    events_begin, events_end = events_dir.range()
    total_events = sum(1 for _ in tr.snapshot.get_range(events_begin, events_end))

    return {
        'total_users': total_users,
        'total_events': total_events,
    }
# Analytics won't conflict with writes
stats = get_analytics(db)

# See Edge Cases for advanced scenarios and error handling patterns.