Universal Python binding for the LMDB 'Lightning' Database
—
LMDB environments can contain multiple named databases for data organization and isolation. This enables logical separation of different data types while maintaining ACID transaction guarantees across all databases within a single environment.
Configure the environment to support multiple databases, then create named databases with specialized options.
def open(path: str, max_dbs: int = 0, **kwargs) -> Environment:
    """
    Open an environment with multi-database support.

    Parameters:
    - path: Environment path
    - max_dbs: Maximum number of named databases (0 = default database only)
    - **kwargs: Other environment options

    Returns:
    Environment instance

    Note:
    max_dbs must be set > 0 to use named databases
    """
class Environment:
    def open_db(self, key: bytes = None, txn=None, reverse_key: bool = False,
                dupsort: bool = False, create: bool = True, integerkey: bool = False,
                integerdup: bool = False, dupfixed: bool = False) -> _Database:
        """
        Open or create a named database within the environment.

        Parameters:
        - key: Database name (None for default database)
        - txn: Transaction to use (temporary read-write transaction created if None)
        - reverse_key: Keys stored in reverse byte order
        - dupsort: Allow and sort duplicate keys
        - create: Create database if it doesn't exist
        - integerkey: Keys are native byte-order integers
        - integerdup: Duplicate values are native integers (requires dupsort)
        - dupfixed: All duplicate values same size (requires dupsort)

        Returns:
        Database handle for use in transactions

        Raises:
        Error if max_dbs=0 and key is not None
        """

Work with database handles across transactions and manage database metadata.
class _Database:
    """
    Database handle representing a named database within an LMDB environment.

    Database handles are returned by Environment.open_db() and used to specify
    which database to operate on in transactions and cursors. Database handles
    are lightweight objects that can be safely passed between transactions.

    Note:
    Database handles remain valid until the environment is closed.
    """

    def flags(self, *args) -> dict:
        """
        Get database configuration flags.

        Returns:
        Dictionary containing database flags:
        - reverse_key: Keys stored in reverse order
        - dupsort: Duplicate keys allowed and sorted
        - integerkey: Keys are native integers
        - integerdup: Duplicate values are native integers
        - dupfixed: All duplicate values are same size
        """

Perform atomic operations across multiple databases within single transactions.
class Transaction:
    def get(self, key: bytes, default=None, db=None) -> bytes:
        """
        Get a value from a specific database.

        Parameters:
        - key: Key to retrieve
        - default: Default value if key not found
        - db: Database handle (uses transaction default if None)
        """

    def put(self, key: bytes, value: bytes, db=None, **kwargs) -> bool:
        """
        Store a key-value pair in a specific database.

        Parameters:
        - key: Key bytes
        - value: Value bytes
        - db: Database handle (uses transaction default if None)
        - **kwargs: Additional put options
        """

    def delete(self, key: bytes, value: bytes = b'', db=None) -> bool:
        """
        Delete a key from a specific database.

        Parameters:
        - key: Key to delete
        - value: Specific value for duplicate keys
        - db: Database handle (uses transaction default if None)
        """

    def cursor(self, db=None) -> Cursor:
        """
        Create a cursor for a specific database.

        Parameters:
        - db: Database handle (uses transaction default if None)
        """

    def stat(self, db) -> dict:
        """
        Get statistics for a specific database.

        Parameters:
        - db: Database handle

        Returns:
        Database statistics dictionary
        """

    def drop(self, db, delete: bool = True) -> None:
        """
        Empty or delete a specific database.

        Parameters:
        - db: Database handle
        - delete: If True delete database, if False just empty it

        Note:
        Cannot delete default database (key=None), only empty it
        """

import lmdb
# Reserve room for up to ten named databases in this environment.
env = lmdb.open('/path/to/database', max_dbs=10)

# One named database per kind of record; the handles are opened in the
# same order as the names are listed.
users_db, products_db, orders_db, settings_db = (
    env.open_db(name)
    for name in (b'users', b'products', b'orders', b'settings')
)

# Calling open_db() without a name (or with None) yields the default database.
default_db = env.open_db()

print("Databases created successfully")
env.close()

import lmdb
import json
import time  # needed for the timestamp stored in each transfer record

env = lmdb.open('/path/to/database', max_dbs=5)
users_db = env.open_db(b'users')
accounts_db = env.open_db(b'accounts')
transactions_db = env.open_db(b'transactions')


def transfer_money(from_user: str, to_user: str, amount: float, tx_id: str):
    """
    Atomically move `amount` from one account to another and log the transfer.

    Both balance updates and the log record are written inside one write
    transaction, so the accounts and transactions databases change together.

    Parameters:
    - from_user: Key of the account to debit
    - to_user: Key of the account to credit
    - amount: Amount to move
    - tx_id: Key under which the transfer record is stored

    Raises:
    ValueError if both accounts are the same or the source balance is too low
    """
    # Debiting and crediting the same key would write the credited copy last,
    # overwriting the debit and creating money.
    if from_user == to_user:
        raise ValueError("Cannot transfer to the same account")

    from_key = from_user.encode()
    to_key = to_user.encode()

    with env.begin(write=True) as txn:
        # Get current balances; a missing account counts as an empty one.
        from_account = json.loads(txn.get(from_key, db=accounts_db) or b'{"balance": 0}')
        to_account = json.loads(txn.get(to_key, db=accounts_db) or b'{"balance": 0}')

        # Check sufficient funds before anything is written.
        if from_account['balance'] < amount:
            raise ValueError("Insufficient funds")

        # Update balances
        from_account['balance'] -= amount
        to_account['balance'] += amount

        # Store updated balances
        txn.put(from_key, json.dumps(from_account).encode(), db=accounts_db)
        txn.put(to_key, json.dumps(to_account).encode(), db=accounts_db)

        # Log transaction
        tx_record = {
            'id': tx_id,
            'from': from_user,
            'to': to_user,
            'amount': amount,
            'timestamp': time.time(),
        }
        txn.put(tx_id.encode(), json.dumps(tx_record).encode(), db=transactions_db)

    # Reported only after the with-block has finished, i.e. after the commit.
    print(f"Transfer completed: {from_user} -> {to_user}, ${amount}")


# Create users and initial accounts
with env.begin(write=True) as txn:
    txn.put(b'alice', b'{"name": "Alice", "email": "alice@example.com"}', db=users_db)
    txn.put(b'bob', b'{"name": "Bob", "email": "bob@example.com"}', db=users_db)
    txn.put(b'alice', b'{"balance": 1000.0}', db=accounts_db)
    txn.put(b'bob', b'{"balance": 500.0}', db=accounts_db)

# Perform transfer
transfer_money('alice', 'bob', 100.0, 'tx001')
env.close()

import lmdb
import struct

env = lmdb.open('/path/to/database', max_dbs=5)

# Create databases with different configurations
users_db = env.open_db(b'users')  # Standard key-value
tags_db = env.open_db(b'tags', dupsort=True)  # Allow duplicate keys
counters_db = env.open_db(b'counters', integerkey=True)  # Integer keys
# reverse_key compares keys from their last byte to their first; it does not
# simply invert the iteration order.
reverse_db = env.open_db(b'reverse', reverse_key=True)

# Store data using different database configurations
with env.begin(write=True) as txn:
    # Standard database
    txn.put(b'user1', b'Alice', db=users_db)
    txn.put(b'user2', b'Bob', db=users_db)

    # Database with duplicate keys (tags for posts)
    txn.put(b'post1', b'python', db=tags_db)
    txn.put(b'post1', b'tutorial', db=tags_db)  # Same key, different value
    txn.put(b'post1', b'beginner', db=tags_db)

    # Integer key database: keys are native-byte-order unsigned ints, all of
    # the same width, hence the 'I' format.
    key1 = struct.pack('I', 1)
    key2 = struct.pack('I', 100)
    key3 = struct.pack('I', 50)
    txn.put(key1, b'counter_one', db=counters_db)
    txn.put(key2, b'counter_hundred', db=counters_db)
    txn.put(key3, b'counter_fifty', db=counters_db)

    # Reverse-key database
    txn.put(b'zebra', b'last_animal', db=reverse_db)
    txn.put(b'apple', b'first_fruit', db=reverse_db)
    txn.put(b'banana', b'second_fruit', db=reverse_db)

# Read and verify data
with env.begin() as txn:
    print("Standard database:")
    for key, value in txn.cursor(db=users_db):
        print(f" {key} = {value}")

    print("\nDuplicate keys database:")
    for key, value in txn.cursor(db=tags_db):
        print(f" {key} = {value}")

    print("\nInteger keys database (sorted by integer value):")
    for key, value in txn.cursor(db=counters_db):
        int_key = struct.unpack('I', key)[0]
        print(f" {int_key} = {value}")

    print("\nReverse order database:")
    for key, value in txn.cursor(db=reverse_db):
        print(f" {key} = {value}")

env.close()

import lmdb
env = lmdb.open('/path/to/database', max_dbs=3)
users_db = env.open_db(b'users')
posts_db = env.open_db(b'posts')
comments_db = env.open_db(b'comments')

# Add sample data
with env.begin(write=True) as txn:
    # Users
    for i in range(100):
        txn.put(f'user{i}'.encode(), f'User {i}'.encode(), db=users_db)

    # Posts
    for i in range(50):
        txn.put(f'post{i}'.encode(), f'Post {i} content'.encode(), db=posts_db)

    # Comments, spread evenly over the 50 posts
    for i in range(200):
        post_id = i % 50
        txn.put(f'comment{i}'.encode(), f'Comment on post{post_id}'.encode(), db=comments_db)

# Get statistics for each database
with env.begin() as txn:
    databases = [
        ('users', users_db),
        ('posts', posts_db),
        ('comments', comments_db),
    ]
    for name, db in databases:
        stats = txn.stat(db)
        print(f"\n{name.capitalize()} database statistics:")
        print(f" Entries: {stats['entries']}")
        print(f" Leaf pages: {stats['leaf_pages']}")
        print(f" Branch pages: {stats['branch_pages']}")
        print(f" Overflow pages: {stats['overflow_pages']}")
        print(f" Tree depth: {stats['depth']}")

# Empty a database
with env.begin(write=True) as txn:
    print(f"\nComments before drop: {txn.stat(comments_db)['entries']}")
    txn.drop(comments_db, delete=False)  # Empty but don't delete
    print(f"Comments after drop: {txn.stat(comments_db)['entries']}")

env.close()

import lmdb
import threading
import time
env = lmdb.open('/path/to/database', max_dbs=2)
db1 = env.open_db(b'database1')
db2 = env.open_db(b'database2')
def worker(worker_id: int, database):
"""Worker function that operates on specific database"""
with env.begin(write=True) as txn:
for i in range(10):
key = f'worker{worker_id}_item{i}'.encode()
value = f'data from worker {worker_id}'.encode()
txn.put(key, value, db=database)
time.sleep(0.1) # Simulate work
print(f"Worker {worker_id} completed")
# Create threads working on different databases
thread1 = threading.Thread(target=worker, args=(1, db1))
thread2 = threading.Thread(target=worker, args=(2, db2))
# Start concurrent operations on different databases
thread1.start()
thread2.start()
thread1.join()
thread2.join()
# Verify data isolation
with env.begin() as txn:
print("Database 1 contents:")
for key, value in txn.cursor(db=db1):
print(f" {key} = {value}")
print("\\nDatabase 2 contents:")
for key, value in txn.cursor(db=db2):
print(f" {key} = {value}")
env.close()Install with Tessl CLI
npx tessl i tessl/pypi-lmdb