Neo4j Bolt driver for Python providing database connectivity and query execution
—
Session management providing contexts for executing work with transaction control, bookmark handling for causal consistency, and support for both read and write operations. Sessions manage the logical connection to the database and handle transaction lifecycle, retry logic, and connection pooling.
Synchronous session providing the main interface for executing Cypher queries and managing transactions within a logical database connection context.
class Session:
def run(
self,
query: str,
parameters: dict = None,
**kwargs
) -> Result:
"""
Run a Cypher query in an auto-commit transaction.
Parameters:
- query: Cypher query string
- parameters: Query parameters dictionary
- **kwargs: Additional query configuration options
Returns:
Result stream for consuming query results
"""
def execute_read(
self,
work: Callable[[ManagedTransaction], Any],
*args,
**kwargs
) -> Any:
"""
Execute a read transaction function with automatic retry.
Parameters:
- work: Transaction function taking ManagedTransaction parameter
- *args: Arguments to pass to transaction function
- **kwargs: Keyword arguments for transaction function
Returns:
Return value of the transaction function
"""
def execute_write(
self,
work: Callable[[ManagedTransaction], Any],
*args,
**kwargs
) -> Any:
"""
Execute a write transaction function with automatic retry.
Parameters:
- work: Transaction function taking ManagedTransaction parameter
- *args: Arguments to pass to transaction function
- **kwargs: Keyword arguments for transaction function
Returns:
Return value of the transaction function
"""
def begin_transaction(
self,
*,
metadata: dict = None,
timeout: float = None
) -> Transaction:
"""
Begin an explicit transaction for manual control.
Parameters:
- metadata: Transaction metadata dictionary
- timeout: Transaction timeout in seconds
Returns:
Transaction object for manual transaction management
"""
def close(self) -> None:
"""
Close the session and release resources.
Should be called when session is no longer needed.
"""
@property
def last_bookmarks(self) -> list[Bookmark]:
"""
Get bookmarks from the last completed transaction.
Used for causal consistency across sessions.
Returns:
List of bookmark objects
"""Example usage:
from neo4j import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost:7687", auth=basic_auth("neo4j", "password"))
# Using session context manager (recommended)
with driver.session() as session:
# Auto-commit transaction
result = session.run("CREATE (n:Person {name: $name}) RETURN n", name="Alice")
# Read transaction with retry
def read_people(tx):
result = tx.run("MATCH (n:Person) RETURN n.name AS name")
return [record["name"] for record in result]
names = session.execute_read(read_people)
# Write transaction with retry
def create_person(tx, name):
return tx.run("CREATE (n:Person {name: $name}) RETURN id(n)", name=name).single().value()
person_id = session.execute_write(create_person, "Bob")
# Manual transaction control
tx = session.begin_transaction()
try:
tx.run("CREATE (n:Person {name: $name})", name="Charlie")
tx.commit()
except Exception:
tx.rollback()
raiseAsynchronous session providing async versions of all session operations for non-blocking database interactions.
class AsyncSession:
async def run(
self,
query: str,
parameters: dict = None,
**kwargs
) -> AsyncResult:
"""
Run a Cypher query in an async auto-commit transaction.
Parameters:
- query: Cypher query string
- parameters: Query parameters dictionary
- **kwargs: Additional query configuration options
Returns:
AsyncResult stream for consuming query results
"""
async def execute_read(
self,
work: Callable[[AsyncManagedTransaction], Awaitable[Any]],
*args,
**kwargs
) -> Any:
"""
Execute an async read transaction function with automatic retry.
Parameters:
- work: Async transaction function
- *args: Arguments to pass to transaction function
- **kwargs: Keyword arguments for transaction function
Returns:
Return value of the transaction function
"""
async def execute_write(
self,
work: Callable[[AsyncManagedTransaction], Awaitable[Any]],
*args,
**kwargs
) -> Any:
"""
Execute an async write transaction function with automatic retry.
Parameters:
- work: Async transaction function
- *args: Arguments to pass to transaction function
- **kwargs: Keyword arguments for transaction function
Returns:
Return value of the transaction function
"""
async def begin_transaction(
self,
*,
metadata: dict = None,
timeout: float = None
) -> AsyncTransaction:
"""
Begin an async explicit transaction.
Parameters:
- metadata: Transaction metadata dictionary
- timeout: Transaction timeout in seconds
Returns:
AsyncTransaction object for manual transaction management
"""
async def close(self) -> None:
"""
Asynchronously close the session and release resources.
"""
@property
def last_bookmarks(self) -> list[Bookmark]:
"""
Get bookmarks from the last completed transaction.
Returns:
List of bookmark objects for causal consistency
"""Example async usage:
import asyncio
from neo4j import AsyncGraphDatabase, basic_auth
async def example():
driver = AsyncGraphDatabase.driver("bolt://localhost:7687", auth=basic_auth("neo4j", "password"))
# Using async session context manager
async with driver.session() as session:
# Async auto-commit transaction
result = await session.run("CREATE (n:Person {name: $name}) RETURN n", name="Alice")
# Async read transaction
async def read_people(tx):
result = await tx.run("MATCH (n:Person) RETURN n.name AS name")
return [record["name"] async for record in result]
names = await session.execute_read(read_people)
# Async write transaction
async def create_person(tx, name):
result = await tx.run("CREATE (n:Person {name: $name}) RETURN id(n)", name=name)
record = await result.single()
return record.value()
person_id = await session.execute_write(create_person, "Bob")
# Manual async transaction
tx = await session.begin_transaction()
try:
await tx.run("CREATE (n:Person {name: $name})", name="Charlie")
await tx.commit()
except Exception:
await tx.rollback()
raise
await driver.close()
asyncio.run(example())Sessions can be configured with various options when created from a driver:
# Session with specific database
session = driver.session(database="mydb")
# Session with default access mode
session = driver.session(default_access_mode=neo4j.WRITE_ACCESS)
# Session with initial bookmarks for causal consistency
session = driver.session(bookmarks=[bookmark1, bookmark2])
# Session with custom bookmark manager
session = driver.session(bookmark_manager=custom_bookmark_manager)
# Session with session-level authentication
session = driver.session(auth=basic_auth("different_user", "password"))Transaction functions provide automatic retry logic for handling transient errors and are the recommended way to execute database operations:
def create_friendship(tx, person1, person2):
"""Transaction function that creates a friendship relationship."""
query = """
MATCH (p1:Person {name: $name1}), (p2:Person {name: $name2})
CREATE (p1)-[:FRIENDS_WITH]->(p2)
RETURN p1.name, p2.name
"""
result = tx.run(query, name1=person1, name2=person2)
record = result.single()
return record.values()
# Execute with automatic retry
with driver.session() as session:
result = session.execute_write(create_friendship, "Alice", "Bob")
print(f"Created friendship: {result}")Sessions support causal consistency through bookmarks, ensuring read operations can see the effects of previous write operations:
# Write in one session and capture bookmarks
with driver.session() as write_session:
write_session.run("CREATE (n:Person {name: 'Alice'})")
bookmarks = write_session.last_bookmarks
# Read from another session using bookmarks
with driver.session(bookmarks=bookmarks) as read_session:
result = read_session.run("MATCH (n:Person {name: 'Alice'}) RETURN n")
# This read will see the Alice node created aboveclass Bookmark:
"""
Represents a bookmark for causal consistency.
Bookmarks are opaque values returned by transactions.
"""
class ManagedTransaction:
"""
Transaction managed by transaction functions.
Cannot be committed or rolled back manually.
"""
def run(
self,
query: str,
parameters: dict = None,
**kwparameters
) -> Result:
"""Run Cypher within the managed transaction."""
class AsyncManagedTransaction:
"""
Async transaction managed by transaction functions.
Cannot be committed or rolled back manually.
"""
async def run(
self,
query: str,
parameters: dict = None,
**kwparameters
) -> AsyncResult:
"""Run Cypher within the async managed transaction."""Install with Tessl CLI
npx tessl i tessl/pypi-neo4j