Fast Python bindings for the UnQLite embedded NoSQL database.
High-level document store interface for managing JSON documents with querying, filtering, and schema support. Collections provide a NoSQL document database interface built on top of UnQLite's Jx9 scripting engine.
Create and manage JSON document collections.
def collection(self, name):
"""Create a wrapper for working with Jx9 collections.
Returns Collection object."""
...
class Collection:
def __init__(self, unqlite, name):
"""Initialize collection with database and collection name."""
...
def create(self):
"""Create the named collection. Returns True if created, False if exists."""
...
def drop(self):
"""Drop the collection and all associated records. Returns True if dropped."""
...
def exists(self):
"""Return boolean indicating whether the collection exists."""
...Usage Example:
db = unqlite.UnQLite('documents.db')
# Create collection wrapper
users = db.collection('users')
# Create collection if it doesn't exist
if not users.exists():
users.create()
print("Users collection created")
# Work with collection
print(f"Collection exists: {users.exists()}")
# Drop collection when done
# users.drop()Store, fetch, update, and delete JSON documents.
def store(self, record, return_id=True):
"""Create a new JSON document in the collection.
Returns record ID if return_id=True, otherwise returns boolean."""
...
def fetch(self, record_id):
"""Fetch the document associated with the given ID."""
...
def update(self, record_id, record):
"""Update the record identified by the given ID."""
...
def delete(self, record_id):
"""Delete the document associated with the given ID."""
...
def all(self):
"""Retrieve all records in the given collection."""
...Usage Example:
db = unqlite.UnQLite('blog.db')
posts = db.collection('posts')
if not posts.exists():
posts.create()
# Store new documents
post1_id = posts.store({
'title': 'Getting Started with UnQLite',
'content': 'UnQLite is a lightweight NoSQL database...',
'author': 'Alice',
'tags': ['database', 'nosql', 'tutorial'],
'published': True
})
print(f"Created post with ID: {post1_id}")
post2_id = posts.store({
'title': 'Advanced Database Techniques',
'content': 'Learn advanced patterns for database design...',
'author': 'Bob',
'tags': ['database', 'advanced'],
'published': False
})
# Fetch specific document
post = posts.fetch(post1_id)
print(f"Retrieved post: {post['title']}")
# Update document
posts.update(post1_id, {
'title': 'Getting Started with UnQLite (Updated)',
'content': post['content'] + ' Updated with new examples.',
'author': 'Alice',
'tags': ['database', 'nosql', 'tutorial', 'updated'],
'published': True
})
# Get all documents
all_posts = posts.all()
print(f"Total posts: {len(all_posts)}")
# Delete document
posts.delete(post2_id)Access documents using dictionary-like syntax.
def __getitem__(self, record_id):
"""Fetch document using collection[record_id] syntax."""
...
def __setitem__(self, record_id, record):
"""Update document using collection[record_id] = record syntax."""
...
def __delitem__(self, record_id):
"""Delete document using del collection[record_id] syntax."""
...
def __len__(self):
"""Return the number of records in the document collection."""
...Usage Example:
db = unqlite.UnQLite(':mem:')
products = db.collection('products')
products.create()
# Store initial document
product_id = products.store({
'name': 'Laptop',
'price': 999.99,
'category': 'electronics',
'in_stock': True
})
# Dictionary-style access
product = products[product_id]
print(f"Product: {product['name']}")
# Dictionary-style update
products[product_id] = {
'name': 'Gaming Laptop',
'price': 1299.99,
'category': 'electronics',
'in_stock': True,
'features': ['RGB lighting', 'High refresh rate']
}
# Check collection size
print(f"Products in collection: {len(products)}")
# Dictionary-style deletion
del products[product_id]
print(f"Products after deletion: {len(products)}")Filter documents using Python callback functions.
def filter(self, filter_fn):
"""Filter the records in the collection using the provided Python callback.
Returns list of matching documents."""
...Usage Example:
db = unqlite.UnQLite(':mem:')
employees = db.collection('employees')
employees.create()
# Add sample data
sample_employees = [
{'name': 'Alice', 'department': 'Engineering', 'salary': 85000, 'active': True},
{'name': 'Bob', 'department': 'Marketing', 'salary': 65000, 'active': True},
{'name': 'Charlie', 'department': 'Engineering', 'salary': 95000, 'active': False},
{'name': 'Diana', 'department': 'Sales', 'salary': 70000, 'active': True},
{'name': 'Eve', 'department': 'Engineering', 'salary': 90000, 'active': True}
]
for emp in sample_employees:
employees.store(emp)
# Filter by department
def is_engineer(employee):
return employee.get('department') == 'Engineering'
engineers = employees.filter(is_engineer)
print(f"Engineers: {len(engineers)}")
for eng in engineers:
print(f" {eng['name']}: ${eng['salary']}")
# Filter by salary and status
def high_earning_active(employee):
return employee.get('salary', 0) > 80000 and employee.get('active', False)
high_earners = employees.filter(high_earning_active)
print(f"High-earning active employees: {len(high_earners)}")
# Complex filter with multiple conditions
def senior_engineer(employee):
return (employee.get('department') == 'Engineering' and
employee.get('salary', 0) > 90000 and
employee.get('active', False))
senior_engineers = employees.filter(senior_engineer)
print(f"Senior engineers: {len(senior_engineers)}")Define and manage document schemas for data validation.
def set_schema(self, _schema=None, **kwargs):
"""Set collection schema for document validation."""
...
def get_schema(self):
"""Get the current collection schema."""
...Usage Example:
db = unqlite.UnQLite('structured.db')
users = db.collection('users')
users.create()
# Define schema
user_schema = {
'name': {'type': 'string', 'required': True},
'email': {'type': 'string', 'required': True},
'age': {'type': 'number', 'required': False},
'active': {'type': 'boolean', 'default': True}
}
# Set schema
users.set_schema(user_schema)
# Alternative syntax using kwargs
users.set_schema(
name={'type': 'string', 'required': True},
email={'type': 'string', 'required': True}
)
# Retrieve schema
current_schema = users.get_schema()
print(f"Current schema: {current_schema}")
# Store documents (schema validation depends on UnQLite/Jx9 implementation)
try:
user_id = users.store({
'name': 'Alice Johnson',
'email': 'alice@example.com',
'age': 30
})
print(f"User stored with ID: {user_id}")
except Exception as e:
print(f"Schema validation failed: {e}")Access collection metadata and status information.
def last_record_id(self):
"""Return the ID of the last document to be stored."""
...
def current_record_id(self):
"""Return the ID of the current JSON document."""
...
def creation_date(self):
"""Return collection creation date."""
...
def error_log(self):
"""Return collection error log."""
...Usage Example:
db = unqlite.UnQLite('metadata.db')
logs = db.collection('logs')
logs.create()
# Store some documents
log1_id = logs.store({'level': 'info', 'message': 'Application started'})
log2_id = logs.store({'level': 'warning', 'message': 'Low disk space'})
log3_id = logs.store({'level': 'error', 'message': 'Database connection failed'})
# Get metadata
print(f"Last record ID: {logs.last_record_id()}")
print(f"Current record ID: {logs.current_record_id()}")
print(f"Creation date: {logs.creation_date()}")
# Check for errors
error_log = logs.error_log()
if error_log:
print(f"Errors: {error_log}")Navigate through collection documents with cursor-like operations.
def fetch_current(self):
"""Return current document at cursor position."""
...
def reset_cursor(self):
"""Reset collection cursor to beginning."""
...Usage Example:
db = unqlite.UnQLite(':mem:')
news = db.collection('news')
news.create()
# Add articles
articles = [
{'headline': 'Tech News Update', 'category': 'technology'},
{'headline': 'Sports Results', 'category': 'sports'},
{'headline': 'Weather Forecast', 'category': 'weather'}
]
for article in articles:
news.store(article)
# Reset cursor and fetch current
news.reset_cursor()
current = news.fetch_current()
if current:
print(f"Current article: {current['headline']}")Iterate through all documents in a collection.
def iterator(self):
"""Return CollectionIterator object."""
...
def __iter__(self):
"""Return iterator for collection documents."""
...
class CollectionIterator:
def __iter__(self):
"""Setup iteration."""
...
def __next__(self):
"""Get next document. Raises StopIteration at end."""
...Usage Example:
db = unqlite.UnQLite(':mem:')
tasks = db.collection('tasks')
tasks.create()
# Add tasks
task_data = [
{'title': 'Write documentation', 'priority': 'high', 'completed': False},
{'title': 'Review code', 'priority': 'medium', 'completed': True},
{'title': 'Deploy application', 'priority': 'high', 'completed': False},
{'title': 'Update dependencies', 'priority': 'low', 'completed': False}
]
for task in task_data:
tasks.store(task)
# Iterate through all tasks
print("All tasks:")
for task in tasks:
status = "✓" if task['completed'] else "○"
print(f" {status} {task['title']} ({task['priority']})")
# Use explicit iterator
print("High-priority tasks:")
task_iter = tasks.iterator()
for task in task_iter:
if task['priority'] == 'high':
print(f" {task['title']}")Work with nested documents and complex data structures:
db = unqlite.UnQLite('complex.db')
orders = db.collection('orders')
orders.create()
# Complex nested document
order = {
'order_id': 'ORD-2024-001',
'customer': {
'name': 'Alice Johnson',
'email': 'alice@example.com',
'address': {
'street': '123 Main St',
'city': 'Anytown',
'zip': '12345'
}
},
'items': [
{'product': 'Laptop', 'quantity': 1, 'price': 999.99},
{'product': 'Mouse', 'quantity': 2, 'price': 29.99}
],
'total': 1059.97,
'status': 'pending',
'created_at': '2024-01-15T10:30:00Z'
}
order_id = orders.store(order)
# Filter by nested properties
def high_value_orders(order):
return order.get('total', 0) > 500
valuable_orders = orders.filter(high_value_orders)
print(f"High-value orders: {len(valuable_orders)}")Efficiently handle multiple documents:
db = unqlite.UnQLite('bulk.db')
inventory = db.collection('inventory')
inventory.create()
# Bulk insert
products = [
{'sku': 'LAPTOP-001', 'name': 'Gaming Laptop', 'stock': 25},
{'sku': 'MOUSE-001', 'name': 'Wireless Mouse', 'stock': 100},
{'sku': 'KEYBOARD-001', 'name': 'Mechanical Keyboard', 'stock': 50}
]
# Store all products
product_ids = []
for product in products:
product_id = inventory.store(product, return_id=True)
product_ids.append(product_id)
print(f"Stored {len(product_ids)} products")
# Bulk update using filter
def needs_restock(item):
return item.get('stock', 0) < 30
low_stock_items = inventory.filter(needs_restock)
print(f"Items needing restock: {len(low_stock_items)}")
# Update low stock items
for item in low_stock_items:
# Note: This requires knowing the record ID
# In practice, you'd need to track IDs or use custom logic
updated_item = item.copy()
updated_item['status'] = 'reorder_needed'
# inventory.update(item_id, updated_item)# Efficient: Simple filter conditions
def active_users(user):
return user.get('active', False)
# Less efficient: Complex calculations in filter
def complex_scoring(user):
score = sum(user.get('metrics', {}).values()) * 0.8
return score > calculate_threshold(user.get('category'))
# Better: Pre-calculate and store scores
def pre_calculated_filter(user):
return user.get('precalculated_score', 0) > user.get('threshold', 0)Install with Tessl CLI
npx tessl i tessl/pypi-unqlite