An object mapper for the Neo4j graph database.
—
Database connection management, schema operations, transaction support, and administrative functions for maintaining Neo4j database state in neomodel. Includes both synchronous and asynchronous database interfaces.
Core database interface classes providing connection management and query execution.
class Database:
    """
    Main database interface for synchronous operations.

    Provides connection management, query execution, and transaction support.
    """

    def cypher_query(self, query: str, params: "dict | None" = None, resolve_objects: bool = False) -> tuple:
        """
        Execute a raw Cypher query.

        Args:
            query (str): Cypher query string
            params (dict, optional): Query parameters
            resolve_objects (bool): Whether to resolve results to neomodel objects

        Returns:
            tuple: (results, meta) - query results and metadata
        """

    def transaction(self) -> "Transaction":
        """
        Start a new database transaction.

        Returns:
            Transaction: Transaction context manager
        """

    def set_connection(self, url: str) -> None:
        """
        Set database connection URL.

        Args:
            url (str): Neo4j connection URL (bolt://...)
        """

    def close_connection(self) -> None:
        """Close current database connection."""
class AsyncDatabase:
    """
    Main database interface for asynchronous operations.

    Mirrors Database: ``cypher_query``, ``set_connection`` and
    ``close_connection`` are coroutines and must be awaited.
    ``transaction`` is a plain method so that its return value can be used
    directly as ``async with adb.transaction():``.
    """

    async def cypher_query(self, query, params=None, resolve_objects=False):
        """
        Asynchronously execute a raw Cypher query.

        Args:
            query (str): Cypher query string
            params (dict, optional): Query parameters
            resolve_objects (bool): Whether to resolve results to neomodel objects

        Returns:
            tuple: (results, meta) - query results and metadata
        """

    def transaction(self):
        """
        Start a new async database transaction.

        Deliberately not a coroutine: ``async with`` needs an object that
        provides ``__aenter__``/``__aexit__``, and a coroutine object has
        neither, so ``async with adb.transaction():`` would fail if this
        method were declared ``async def``.

        Returns:
            AsyncTransaction: Async transaction context manager
        """

    async def set_connection(self, url):
        """
        Asynchronously set database connection URL.

        Args:
            url (str): Neo4j connection URL (bolt://...)
        """

    async def close_connection(self):
        """Asynchronously close current database connection."""
# Global database instances
db: Database
adb: AsyncDatabase

Administrative functions for database schema management and maintenance.
def change_neo4j_password(new_password: str) -> bool:
    """
    Change the Neo4j database password.

    Args:
        new_password (str): New password for Neo4j user

    Returns:
        bool: True if password change was successful
    """
def clear_neo4j_database() -> bool:
    """
    Clear all nodes and relationships from the database.

    Warning: This permanently deletes all data in the database.

    Returns:
        bool: True if database was cleared successfully
    """
def drop_constraints() -> bool:
    """
    Drop all database constraints.

    Removes unique constraints, existence constraints, and other schema
    constraints.

    Returns:
        bool: True if constraints were dropped successfully
    """
def drop_indexes() -> bool:
    """
    Drop all database indexes.

    Removes all property indexes, fulltext indexes, and vector indexes.

    Returns:
        bool: True if indexes were dropped successfully
    """
def install_all_labels() -> bool:
    """
    Install constraints and indexes for all defined node models.

    Scans all StructuredNode subclasses and creates appropriate
    database constraints and indexes based on property definitions.

    Returns:
        bool: True if installation was successful
    """
def install_labels(*models) -> bool:
    """
    Install constraints and indexes for specific node models.

    Args:
        *models: StructuredNode classes to install labels for

    Returns:
        bool: True if installation was successful
    """
def remove_all_labels() -> bool:
    """
    Remove all constraints and indexes from the database.

    Equivalent to calling both drop_constraints() and drop_indexes().

    Returns:
        bool: True if removal was successful
    """

Classes and functions for managing database transactions.
class Transaction:
    """
    Database transaction context manager for synchronous operations.

    Provides commit and rollback functionality for grouped operations.
    """

    def __enter__(self) -> "Transaction":
        """Enter transaction context and return this transaction object."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit transaction context with automatic commit/rollback.

        Args:
            exc_type: Exception class raised inside the block, if any
            exc_val: Exception instance raised inside the block, if any
            exc_tb: Traceback of that exception, if any
        """

    def commit(self) -> None:
        """Commit the current transaction."""

    def rollback(self) -> None:
        """Rollback the current transaction."""
class AsyncTransaction:
    """
    Database transaction context manager for asynchronous operations.
    """

    async def __aenter__(self) -> "AsyncTransaction":
        """Enter async transaction context and return this transaction object."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async transaction context with automatic commit/rollback.

        Args:
            exc_type: Exception class raised inside the block, if any
            exc_val: Exception instance raised inside the block, if any
            exc_tb: Traceback of that exception, if any
        """

    async def commit(self) -> None:
        """Asynchronously commit the current transaction."""

    async def rollback(self) -> None:
        """Asynchronously rollback the current transaction."""

from neomodel import config, db
# Set database connection
# NOTE(review): credentials are embedded in the URL for illustration only;
# presumably real deployments load them from the environment — confirm.
config.DATABASE_URL = 'bolt://neo4j:password@localhost:7687'
# Alternative: set specific database name
config.DATABASE_NAME = 'myapp'
# Use custom driver settings
config.DRIVER = {
    'max_connection_lifetime': 1000,
    'max_connection_pool_size': 50,
    'connection_acquisition_timeout': 60
}
# Test connection: any failure is reported on stdout instead of propagating
try:
    db.set_connection(config.DATABASE_URL)
    print("Connected to Neo4j successfully")
except Exception as e:
    print(f"Connection failed: {e}")

from neomodel import db
# Execute raw Cypher query; $age is bound from the params dict
query = "MATCH (n:Person) WHERE n.age > $age RETURN n.name, n.age"
results, meta = db.cypher_query(query, {'age': 30})
print(f"Found {len(results)} results")
# Each result row holds one value per RETURN column
for name, age in results:
    print(f"{name}: {age} years old")

# Query with object resolution
query = "MATCH (n:Person) RETURN n"
results, meta = db.cypher_query(query, resolve_objects=True)
# Results are still rows; with a single `RETURN n` column the resolved
# Person instance is the only element of each row, so unpack it before use.
for (person,) in results:
    print(f"Person: {person.name}")

from neomodel import db, StructuredNode, StringProperty
class Person(StructuredNode):
    """Example node model with a single required ``name`` string property."""
    name = StringProperty(required=True)
# Synchronous transaction: both saves run inside one transaction context
with db.transaction():
    person1 = Person(name="Alice").save()
    person2 = Person(name="Bob").save()
# Both saves committed together

# Manual transaction control: commit on success, roll back on any error
# NOTE(review): released neomodel versions appear to expose `db.transaction`
# as a property (`with db.transaction:`) with manual control through
# `db.begin()` / `db.commit()` / `db.rollback()` — confirm the call form
# against the installed version.
tx = db.transaction()
try:
    person = Person(name="Charlie").save()
    # Do more operations...
    tx.commit()
except Exception as e:
    # The error is reported on stdout and not re-raised
    tx.rollback()
    print(f"Transaction failed: {e}")

from neomodel import adb, AsyncStructuredNode, StringProperty
class AsyncPerson(AsyncStructuredNode):
    """Example async node model with a single required ``name`` string property."""
    name = StringProperty(required=True)
async def database_operations():
    """
    Count Person nodes, then create two AsyncPerson nodes in one transaction.

    Returns:
        The value in the first column of the first row of the count query.
    """
    # Run the counting query first; only the row data is needed afterwards
    count_query = "MATCH (n:Person) RETURN count(n) as total"
    rows, _meta = await adb.cypher_query(count_query)
    first_row = rows[0]
    total_people = first_row[0]

    # Create both people inside a single async transaction
    async with adb.transaction():
        for first_name in ("Dave", "Eve"):
            await AsyncPerson(name=first_name).save()
    # Both saves committed together

    return total_people


from neomodel import (
    install_all_labels, install_labels, drop_constraints,
    drop_indexes, clear_neo4j_database
)
# Install constraints and indexes for all models
install_all_labels()

# Install for specific models only
from myapp.models import Person, Company
install_labels(Person, Company)

# Clean up schema
drop_constraints()  # Remove all constraints
drop_indexes()  # Remove all indexes

# Nuclear option - clear everything
# WARNING: This deletes all data! Do not copy this call into code that runs
# against a database whose contents you want to keep.
clear_neo4j_database()

from neomodel import change_neo4j_password, db
# Change database password; failures are reported on stdout, not re-raised
try:
    change_neo4j_password("new_secure_password")
    print("Password changed successfully")
except Exception as e:
    print(f"Password change failed: {e}")

# Check database status: print every row returned by dbms.components()
try:
    results, meta = db.cypher_query("CALL dbms.components()")
    for component in results:
        print(f"Component: {component}")
except Exception as e:
    print(f"Status check failed: {e}")

# Get database statistics: one row per distinct labels(n) value, largest first
stats_query = """
MATCH (n)
RETURN labels(n) as label, count(n) as count
ORDER BY count DESC
"""
# Unlike the two steps above, this query is not wrapped in try/except
results, meta = db.cypher_query(stats_query)
print("Node counts by label:")
for label, count in results:
    print(f" {label}: {count}")

from neomodel import db, config
# Multiple database connections
def switch_database(db_name):
    """
    Switch the global connection to a different database.

    The target URL is ``config.DATABASE_URL`` with its path component
    replaced by ``db_name``; scheme, credentials, host, port and any query
    string are kept.

    Args:
        db_name (str): Name of the database to connect to

    Returns:
        str: The original ``config.DATABASE_URL``, so the caller can switch back
    """
    from urllib.parse import urlsplit, urlunsplit

    original_url = config.DATABASE_URL
    # Replace only the path. Splitting the raw string on the last '/' breaks
    # for URLs without a path ('bolt://user:pw@host:7687' would collapse to
    # 'bolt:/'), because the last slash then belongs to the '//' separator.
    parts = urlsplit(original_url)
    new_url = urlunsplit(parts._replace(path=f"/{db_name}"))
    # Close current connection and connect to new database
    db.close_connection()
    db.set_connection(new_url)
    return original_url
# Usage
original = switch_database("testdb")
try:
    pass  # ... work with test database ...
finally:
    # Always restore the original connection, even if the work above raises.
    # Close first, mirroring what switch_database() does before reconnecting.
    db.close_connection()
    db.set_connection(original)  # Switch back

from neomodel import db
from neomodel.exceptions import (
DoesNotExist, UniqueProperty, ConstraintValidationFailed
)
def safe_database_operation():
    """
    Run a lookup query and report any failure on stdout instead of raising.

    An empty result is turned into a DoesNotExist error, which is caught and
    printed by the generic handler like any other failure.
    """
    # Bind `meta` before the try so the generic handler can always print it,
    # even when cypher_query itself raised before the assignment happened.
    meta = None
    try:
        # Potentially failing operation
        results, meta = db.cypher_query(
            "MATCH (n:Person {name: $name}) RETURN n",
            {'name': 'NonexistentPerson'}
        )
        if not results:
            raise DoesNotExist("Person not found")
    except UniqueProperty as e:
        # NOTE(review): neomodel appears to define UniqueProperty as a
        # subclass of ConstraintValidationFailed — if so, this more specific
        # handler must come first to be reachable; the order is harmless
        # otherwise.
        print(f"Unique constraint violated: {e}")
    except ConstraintValidationFailed as e:
        print(f"Constraint violation: {e}")
    except Exception as e:
        print(f"Database error: {e}")
        # Log error details
        print(f"Query meta: {meta}")


# Database result types
# (results, meta) pair: a list of result rows plus a metadata mapping
CypherResult = Tuple[List[List[Any]], Dict[str, Any]]
# Query parameters keyed by parameter name
QueryParams = Dict[str, Any]
# Transaction types
TransactionContext = Union[Transaction, AsyncTransaction]
# Connection types
DatabaseURL = str
DriverConfig = Dict[str, Any]

Install with Tessl CLI
npx tessl i tessl/pypi-neomodel