or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

examples

edge-cases.md
real-world-scenarios.md
index.md
tile.json

docs/examples/real-world-scenarios.md

Real-World Scenarios

Comprehensive usage examples for common FoundationDB patterns.

User Management System

Python Example

import fdb
import fdb.tuple as fdb_tuple

# Select API version 740 first, then open the database handle used by every
# function in this example.
fdb.api_version(740)
db = fdb.open()

# Create (or reuse) the ('app', 'users') directory; all user keys and the
# '_count' counter below are packed relative to it.
users_dir = fdb.directory.create_or_open(db, ('app', 'users'))

@fdb.transactional
def create_user(tr, user_id, name, email):
    """Create a new user and atomically bump the user counter.

    Args:
        tr: Transaction (or database) supplied through ``fdb.transactional``.
        user_id: Identifier used as the first tuple element of the user's keys.
        name: Display name, stored encoded under ``(user_id, 'name')``.
        email: Email address, stored encoded under ``(user_id, 'email')``.

    Raises:
        ValueError: If a ``name`` key is already stored for ``user_id``.
    """
    import struct

    user_subspace = users_dir.subspace((user_id,))
    name_key = user_subspace.pack(('name',))

    # Check existence with present() instead of truthiness: a user stored with
    # an empty name must still count as existing, otherwise it would be
    # overwritten and the counter incremented a second time.
    if tr[name_key].present():
        raise ValueError(f"User {user_id} already exists")

    # Create user
    tr[name_key] = name.encode()
    tr[user_subspace.pack(('email',))] = email.encode()

    # Increment the user count with an atomic add (little-endian signed
    # 64-bit delta), so no read of the current count is needed.
    counter_key = users_dir.pack(('_count',))
    tr.add(counter_key, struct.pack('<q', 1))

@fdb.transactional
def get_user(tr, user_id):
    """Return the stored fields of a user, or ``None`` if the user is unknown.

    Args:
        tr: Transaction (or database) supplied through ``fdb.transactional``.
        user_id: Identifier used as the first tuple element of the user's keys.

    Returns:
        A dict with ``'id'``, ``'name'`` and ``'email'`` keys, or ``None`` when
        no ``name`` key is stored for ``user_id``. ``'email'`` is ``None`` if
        the email key is missing.
    """
    user_subspace = users_dir.subspace((user_id,))

    name = tr[user_subspace.pack(('name',))]
    email = tr[user_subspace.pack(('email',))]

    # present() distinguishes "key missing" from "empty value stored";
    # plain truthiness would report a user with an empty name as nonexistent.
    if not name.present():
        return None

    return {
        'id': user_id,
        'name': name.decode(),
        # Guard the decode so a record without an email key does not crash.
        'email': email.decode() if email.present() else None,
    }

@fdb.transactional
def list_users(tr):
    """List all users stored under ``users_dir``.

    Scans every key in the directory, collects the distinct integer user ids
    found as the first tuple element, and loads each user once via
    ``get_user``. Keys whose first element is not an int (such as the
    ``'_count'`` counter) are skipped.

    Args:
        tr: Transaction (or database) supplied through ``fdb.transactional``.

    Returns:
        A list of user dicts as returned by ``get_user``, in key order of
        first appearance.
    """
    users = []
    seen_ids = set()  # O(1) duplicate check instead of rebuilding a list per key

    # Subspace.range() yields a slice, which cannot be tuple-unpacked; read its
    # bounds explicitly.
    key_range = users_dir.range()
    for kv in tr.get_range(key_range.start, key_range.stop):
        key_tuple = users_dir.unpack(kv.key)
        if not key_tuple or not isinstance(key_tuple[0], int):
            continue

        user_id = key_tuple[0]
        if user_id in seen_ids:
            continue
        seen_ids.add(user_id)

        user = get_user(tr, user_id)
        if user:
            users.append(user)
    return users

# Usage
# Passing `db` lets the transactional decorator manage the transaction.
# Note: create_user raises ValueError if the id already exists, so running
# this snippet a second time against the same database will fail here.
create_user(db, 1, 'Alice', 'alice@example.com')
create_user(db, 2, 'Bob', 'bob@example.com')
user = get_user(db, 1)
all_users = list_users(db)

Counter with Atomic Operations

Python Example

import fdb
import struct

# Select API version 740 first, then open the database handle used below.
fdb.api_version(740)
db = fdb.open()

@fdb.transactional
def increment_counter(tr, counter_key, amount=1):
    """Add ``amount`` to the counter stored at ``counter_key``.

    The delta is packed as a little-endian signed 64-bit integer and applied
    with the atomic ``add`` mutation, so the current value is never read.
    """
    delta = struct.pack('<q', amount)
    tr.add(counter_key, delta)

@fdb.transactional
def get_counter(tr, counter_key):
    """Return the counter at ``counter_key`` as an int.

    Returns 0 when the value read for the key is falsy; otherwise the value is
    decoded as a little-endian signed 64-bit integer.
    """
    raw = tr[counter_key]
    if not raw:
        return 0
    (count,) = struct.unpack('<q', raw)
    return count

# Usage
counter_key = b'global:counter'
increment_counter(db, counter_key, 5)
increment_counter(db, counter_key, 3)
count = get_counter(db, counter_key)  # 8 if the counter did not exist before

Event Logging with Versionstamps

Python Example

import fdb
import fdb.tuple as fdb_tuple

# Select API version 740 first, then open the database handle used below.
fdb.api_version(740)
db = fdb.open()

# Create (or reuse) the ('app', 'events') directory that holds the event log.
events_dir = fdb.directory.create_or_open(db, ('app', 'events'))

@fdb.transactional
def log_event(tr, event_type, data):
    """Log an event under a versionstamped key so events sort by commit order.

    Args:
        tr: Transaction supplied through ``fdb.transactional``.
        event_type: First tuple element of the event key.
        data: Event payload as ``str``; stored encoded.

    Returns:
        The future from ``tr.get_versionstamp()``. It is requested before the
        commit and can only be waited on after the transaction has committed.
    """
    # In the Python binding an incomplete versionstamp is created by calling
    # the constructor with no transaction version; the placeholder is filled
    # in by the database at commit time.
    vs = fdb_tuple.Versionstamp()

    # Pack key with versionstamp
    key = events_dir.pack_with_versionstamp((event_type, vs))

    # Set versionstamped key
    tr.set_versionstamped_key(key, data.encode())

    # Request the versionstamp now; the caller resolves it after commit.
    return tr.get_versionstamp()

# Usage
# An explicit transaction is passed in (rather than `db`) so that the commit
# happens here and the versionstamp future can be waited on afterwards.
tr = db.create_transaction()
vs_future = log_event(tr, 'user_action', 'User logged in')
tr.commit().wait()
versionstamp = vs_future.wait()

Multi-Tenant Application

Python Example

import fdb
import fdb.tenant_management

# Select API version 740 first, then open the database handle used below.
fdb.api_version(740)
db = fdb.open()

# Create tenant
# NOTE(review): presumably this fails if the tenant already exists, so the
# snippet may not be re-runnable as-is; confirm against the tenant API docs.
fdb.tenant_management.create_tenant(db, b'company_a')
fdb.tenant_management.create_tenant(db, b'company_b')

# Open tenant and use
# The tenant handles are passed to transactional functions in place of `db`.
tenant_a = db.open_tenant(b'company_a')
tenant_b = db.open_tenant(b'company_b')

# Each tenant has isolated namespace
@fdb.transactional
def set_tenant_config(tr, config_key, config_value):
    """Store ``config_value`` (a ``str``, written encoded) at ``config_key``."""
    encoded_value = config_value.encode()
    tr[config_key] = encoded_value

# Set config for tenant A
# The tenant handle is passed where a database would normally go.
set_tenant_config(tenant_a, b'config:theme', 'dark')

# Set config for tenant B (isolated)
# Same key bytes as above, written through a different tenant handle.
set_tenant_config(tenant_b, b'config:theme', 'light')

Watch-Based Coordination

Python Example

import fdb

# Select API version 740 first, then open the database handle used below.
fdb.api_version(740)
db = fdb.open()

@fdb.transactional
def set_watch(tr, key):
    """Register a watch on ``key`` and return the resulting future."""
    watch_future = tr.watch(key)
    return watch_future

# Set up watch
# An explicit transaction is used so the commit is performed here, after the
# watch has been registered inside set_watch.
tr = db.create_transaction()
watch_future = set_watch(tr, b'config:reload')
tr.commit().wait()

# Watch is now active
# When key changes, watch_future.wait() will return
# (this call blocks the current thread until then).
print("Waiting for config change...")
watch_future.wait()
print("Config key was modified!")

Batch Processing with Range Splits

Python Example

import fdb

# Select API version 740 first, then open the database handle used below.
fdb.api_version(740)
db = fdb.open()

def process_range_parallel(db, begin_key, end_key, chunk_size_bytes):
    """Process a large key range in chunks, one transaction per chunk.

    Requests split points for ``[begin_key, end_key)``, then reads each pair of
    adjacent split points in its own transaction and passes every key-value
    pair to ``process_item``.

    Despite the name, the chunks are processed sequentially here. Each chunk
    is independent, so callers wanting real parallelism can dispatch the
    per-chunk work to threads themselves.

    Args:
        db: Database handle used to create the transactions.
        begin_key: First key of the range.
        end_key: End key of the range.
        chunk_size_bytes: Chunk size passed to ``get_range_split_points``.

    Returns:
        A flat list with the result of ``process_item`` for every key-value
        pair, in chunk order.

    Note:
        ``process_item`` is not defined in this snippet; it must be provided
        by the surrounding module.
    """
    @fdb.transactional
    def process_chunk(tr, chunk_begin, chunk_end):
        # Defined once and parameterised by its bounds, rather than being
        # re-created and re-decorated on every loop iteration.
        return [process_item(kv) for kv in tr.get_range(chunk_begin, chunk_end)]

    tr = db.create_transaction()

    # Get split points
    split_points = tr.get_range_split_points(begin_key, end_key, chunk_size_bytes).wait()

    # Walk adjacent pairs of split points; fewer than two points yields no chunks.
    results = []
    for chunk_begin, chunk_end in zip(split_points, split_points[1:]):
        results.extend(process_chunk(db, chunk_begin, chunk_end))

    return results

Snapshot Reads for Analytics

Python Example

import fdb

# Select API version 740 first, then open the database handle used below.
# NOTE(review): get_analytics below also uses `users_dir` and `events_dir`,
# which this snippet does not define; they are created in the user-management
# and event-logging examples.
fdb.api_version(740)
db = fdb.open()

@fdb.transactional
def get_analytics(tr):
    """Count users and events using snapshot reads.

    Snapshot reads are used so that scanning both directories does not add
    read conflict ranges to the transaction.

    Relies on the module-level ``users_dir`` and ``events_dir`` directories.

    Args:
        tr: Transaction (or database) supplied through ``fdb.transactional``.

    Returns:
        A dict with ``'total_users'`` (number of distinct integer user ids in
        ``users_dir``) and ``'total_events'`` (number of keys in
        ``events_dir``).
    """
    # Subspace.range() yields a slice, which cannot be tuple-unpacked; read its
    # bounds explicitly.
    users_range = users_dir.range()
    events_range = events_dir.range()

    # Each user owns several keys (name, email) and the directory also holds
    # the '_count' counter, so count distinct integer ids instead of raw keys.
    user_ids = set()
    for kv in tr.snapshot.get_range(users_range.start, users_range.stop):
        key_tuple = users_dir.unpack(kv.key)
        if key_tuple and isinstance(key_tuple[0], int):
            user_ids.add(key_tuple[0])

    # Every event is a single key, so the key count is the event count.
    total_events = sum(
        1 for _ in tr.snapshot.get_range(events_range.start, events_range.stop)
    )

    return {
        'total_users': len(user_ids),
        'total_events': total_events
    }

# Analytics won't conflict with writes
# (every read inside get_analytics goes through tr.snapshot).
stats = get_analytics(db)

See Edge Cases for advanced scenarios and error handling patterns.