Neo4j Bolt driver for Python providing database connectivity and query execution
—
Transaction management and result handling supporting auto-commit transactions, explicit transactions with full ACID properties, and flexible result consumption patterns. The system provides both streaming and eager result consumption with comprehensive metadata access.
Explicit transaction management providing full control over transaction lifecycle with commit and rollback capabilities.
class Transaction:
def run(
self,
query: str,
parameters: dict = None,
**kwparameters
) -> Result:
"""
Run a Cypher query within the transaction.
Parameters:
- query: Cypher query string
- parameters: Query parameters dictionary
- **kwparameters: Additional parameters as keyword arguments
Returns:
Result stream for the query execution
"""
def commit(self) -> None:
"""
Commit the transaction, making all changes permanent.
After commit, the transaction cannot be used further.
"""
def rollback(self) -> None:
"""
Roll back the transaction, discarding all changes.
After rollback, the transaction cannot be used further.
"""
def close(self) -> None:
"""
Close the transaction.
If not committed, changes will be rolled back.
"""class AsyncTransaction:
async def run(
self,
query: str,
parameters: dict = None,
**kwparameters
) -> AsyncResult:
"""
Run a Cypher query within the async transaction.
Parameters:
- query: Cypher query string
- parameters: Query parameters dictionary
- **kwparameters: Additional parameters as keyword arguments
Returns:
AsyncResult stream for the query execution
"""
async def commit(self) -> None:
"""
Asynchronously commit the transaction.
"""
async def rollback(self) -> None:
"""
Asynchronously roll back the transaction.
"""
async def close(self) -> None:
"""
Asynchronously close the transaction.
"""Example transaction usage:
from neo4j import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost:7687", auth=basic_auth("neo4j", "password"))
# Manual transaction control
with driver.session() as session:
tx = session.begin_transaction()
try:
# Execute multiple operations in transaction
tx.run("CREATE (n:Person {name: $name})", name="Alice")
tx.run("CREATE (n:Person {name: $name})", name="Bob")
# Create relationship between them
result = tx.run("""
MATCH (a:Person {name: $name1}), (b:Person {name: $name2})
CREATE (a)-[:KNOWS]->(b)
RETURN a.name, b.name
""", name1="Alice", name2="Bob")
# Process results
for record in result:
print(f"{record['a.name']} knows {record['b.name']}")
# Commit all changes
tx.commit()
print("Transaction committed successfully")
except Exception as e:
# Rollback on error
tx.rollback()
print(f"Transaction rolled back due to error: {e}")
raiseResult handling providing flexible consumption patterns for query results with both streaming and eager loading capabilities.
class Result:
def keys(self) -> list[str]:
"""
Get the column keys/names from the query result.
Returns:
List of column names as strings
"""
def consume(self) -> ResultSummary:
"""
Consume all remaining records and return execution summary.
After calling consume(), no more records can be retrieved.
Returns:
ResultSummary containing execution metadata
"""
def single(self, strict: bool = True) -> Record | None:
"""
Return the single record from the result.
Parameters:
- strict: If True, raise exception if not exactly one record
Returns:
Single Record if exactly one exists, None if zero (when strict=False)
Raises:
Exception if zero records (when strict=True) or multiple records
"""
def peek(self) -> Record:
"""
Return the next record without consuming it from the stream.
Returns:
Next Record in the stream (can be retrieved again)
Raises:
StopIteration if no more records available
"""
def data(self, *args) -> list[dict]:
"""
Return all records as a list of dictionaries.
Parameters:
- *args: Optional column names to include (all if not specified)
Returns:
List of dictionaries representing records
"""
def value(self, item: int | str = 0, default=None):
"""
Return a single column from a single record.
Parameters:
- item: Column index or name to retrieve
- default: Value to return if not found
Returns:
Single value from the specified column
"""
def values(self, *items) -> list:
"""
Return specified columns from all records as lists.
Parameters:
- *items: Column indices or names to retrieve
Returns:
List of column values for each record
"""
def to_df(self, expand: bool = False) -> pandas.DataFrame:
"""
Convert result to pandas DataFrame.
Parameters:
- expand: Whether to expand nested structures
Returns:
pandas DataFrame containing all records
Requires:
pandas package to be installed
"""
def to_eager_result(self) -> EagerResult:
"""
Convert to an EagerResult containing all records in memory.
Returns:
EagerResult with records, summary, and keys
"""
def __iter__(self) -> Iterator[Record]:
"""Iterator interface for consuming records one by one."""
def __next__(self) -> Record:
"""Get next record from the stream."""class AsyncResult:
async def keys(self) -> list[str]:
"""Get the column keys asynchronously."""
async def consume(self) -> ResultSummary:
"""
Asynchronously consume all records and return summary.
Returns:
ResultSummary containing execution metadata
"""
async def single(self, strict: bool = True) -> Record | None:
"""
Return the single record asynchronously.
Parameters:
- strict: If True, raise exception if not exactly one record
Returns:
Single Record or None
"""
async def peek(self) -> Record:
"""
Peek at next record asynchronously.
Returns:
Next Record without consuming it
"""
async def data(self, *args) -> list[dict]:
"""
Return all records as dictionaries asynchronously.
Returns:
List of dictionaries representing records
"""
async def to_eager_result(self) -> EagerResult:
"""
Convert to EagerResult asynchronously.
Returns:
EagerResult with all data loaded
"""
def __aiter__(self) -> AsyncIterator[Record]:
"""Async iterator interface."""
async def __anext__(self) -> Record:
"""Get next record asynchronously."""Example result usage:
# Streaming consumption
with driver.session() as session:
result = session.run("MATCH (n:Person) RETURN n.name AS name, n.age AS age")
# Iterate over results one by one
for record in result:
print(f"Name: {record['name']}, Age: {record['age']}")
# Alternative: consume all at once as dictionaries
result = session.run("MATCH (n:Person) RETURN n.name AS name")
data = result.data()
print(data) # [{'name': 'Alice'}, {'name': 'Bob'}]
# Get single result
result = session.run("MATCH (n:Person {name: $name}) RETURN n", name="Alice")
record = result.single()
print(record["n"]["name"])
# Get single value
result = session.run("MATCH (n:Person) RETURN count(n)")
count = result.value()
print(f"Total persons: {count}")In-memory result container providing immediate access to all query results and metadata.
class EagerResult:
"""
NamedTuple containing complete query results in memory.
"""
records: list[Record]
"""List of all records returned by the query."""
summary: ResultSummary
"""Query execution summary and metadata."""
keys: list[str]
"""List of column names/keys."""Example usage:
with driver.session() as session:
result = session.run("MATCH (n:Person) RETURN n.name AS name")
eager = result.to_eager_result()
# Access all records
for record in eager.records:
print(record["name"])
# Access summary information
print(f"Query completed in {eager.summary.result_consumed_after}ms")
# Access column keys
print(f"Columns: {eager.keys}")Comprehensive metadata about query execution including performance statistics, database changes, and server information.
class ResultSummary:
@property
def query(self) -> str:
"""The Cypher query that was executed."""
@property
def parameters(self) -> dict:
"""Parameters that were passed to the query."""
@property
def query_type(self) -> str:
"""Type of query (r for read, w for write, etc.)."""
@property
def plan(self) -> dict | None:
"""Query execution plan (if requested)."""
@property
def profile(self) -> dict | None:
"""Query execution profile (if requested)."""
@property
def notifications(self) -> list[SummaryNotification]:
"""Server notifications about the query."""
@property
def counters(self) -> SummaryCounters:
"""Statistics about database changes made."""
@property
def result_available_after(self) -> int:
"""Time in milliseconds until first record was available."""
@property
def result_consumed_after(self) -> int:
"""Time in milliseconds to consume all records."""
@property
def server(self) -> Address:
"""Address of the server that executed the query."""
@property
def database(self) -> str:
"""Name of the database where query was executed."""Statistics about database modifications made by write operations.
class SummaryCounters:
@property
def nodes_created(self) -> int:
"""Number of nodes created."""
@property
def nodes_deleted(self) -> int:
"""Number of nodes deleted."""
@property
def relationships_created(self) -> int:
"""Number of relationships created."""
@property
def relationships_deleted(self) -> int:
"""Number of relationships deleted."""
@property
def properties_set(self) -> int:
"""Number of properties set."""
@property
def labels_added(self) -> int:
"""Number of labels added to nodes."""
@property
def labels_removed(self) -> int:
"""Number of labels removed from nodes."""
@property
def indexes_added(self) -> int:
"""Number of indexes created."""
@property
def indexes_removed(self) -> int:
"""Number of indexes dropped."""
@property
def constraints_added(self) -> int:
"""Number of constraints created."""
@property
def constraints_removed(self) -> int:
"""Number of constraints dropped."""
@property
def system_updates(self) -> int:
"""Number of system updates performed."""Server notifications providing information about query execution, warnings, and optimization opportunities.
class SummaryNotification:
@property
def code(self) -> str:
"""Notification code identifier."""
@property
def title(self) -> str:
"""Human-readable notification title."""
@property
def description(self) -> str:
"""Detailed description of the notification."""
@property
def severity(self) -> str:
"""Severity level (WARNING, INFORMATION, etc.)."""
@property
def category(self) -> str:
"""Notification category."""
@property
def position(self) -> SummaryInputPosition | None:
"""Position in query where notification applies."""
class SummaryInputPosition:
@property
def offset(self) -> int:
"""Character offset in the query."""
@property
def line(self) -> int:
"""Line number in the query."""
@property
def column(self) -> int:
"""Column number in the query."""import asyncio
from neo4j import AsyncGraphDatabase, basic_auth
async def async_example():
driver = AsyncGraphDatabase.driver("bolt://localhost:7687", auth=basic_auth("neo4j", "password"))
async with driver.session() as session:
result = await session.run("MATCH (n:Person) RETURN n.name AS name")
# Async iteration
async for record in result:
print(record["name"])
# Async single result
result = await session.run("MATCH (n:Person {name: $name}) RETURN n", name="Alice")
record = await result.single()
# Async summary
result = await session.run("CREATE (n:Person {name: $name})", name="Charlie")
summary = await result.consume()
print(f"Nodes created: {summary.counters.nodes_created}")
await driver.close()
asyncio.run(async_example())with driver.session() as session:
result = session.run("EXPLAIN MATCH (n:Person) RETURN n")
summary = result.consume()
if summary.plan:
print("Query execution plan:")
print(summary.plan)
print(f"Query type: {summary.query_type}")
print(f"Time to first record: {summary.result_available_after}ms")
print(f"Total execution time: {summary.result_consumed_after}ms")
# Check for notifications
for notification in summary.notifications:
print(f"Notification: {notification.title} - {notification.description}")Install with Tessl CLI
npx tessl i tessl/pypi-neo4j