CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-neo4j

Neo4j Bolt driver for Python providing database connectivity and query execution

Pending
Overview
Eval results
Files

transactions-results.mddocs/

Transactions & Results

Transaction management and result handling supporting auto-commit transactions, explicit transactions with full ACID properties, and flexible result consumption patterns. The system provides both streaming and eager result consumption with comprehensive metadata access.

Capabilities

Transaction Classes

Explicit transaction management providing full control over transaction lifecycle with commit and rollback capabilities.

class Transaction:
    def run(
        self,
        query: str,
        parameters: dict = None,
        **kwparameters
    ) -> Result:
        """
        Run a Cypher query within the transaction.
        
        Parameters:
        - query: Cypher query string
        - parameters: Query parameters dictionary  
        - **kwparameters: Additional parameters as keyword arguments
        
        Returns:
        Result stream for the query execution
        """
    
    def commit(self) -> None:
        """
        Commit the transaction, making all changes permanent.
        After commit, the transaction cannot be used further.
        """
    
    def rollback(self) -> None:
        """
        Roll back the transaction, discarding all changes.
        After rollback, the transaction cannot be used further.
        """
    
    def close(self) -> None:
        """
        Close the transaction.
        If not committed, changes will be rolled back.
        """
class AsyncTransaction:
    async def run(
        self,
        query: str,
        parameters: dict = None,
        **kwparameters
    ) -> AsyncResult:
        """
        Run a Cypher query within the async transaction.
        
        Parameters:
        - query: Cypher query string
        - parameters: Query parameters dictionary
        - **kwparameters: Additional parameters as keyword arguments
        
        Returns:
        AsyncResult stream for the query execution
        """
    
    async def commit(self) -> None:
        """
        Asynchronously commit the transaction.
        """
    
    async def rollback(self) -> None:
        """
        Asynchronously roll back the transaction.
        """
    
    async def close(self) -> None:
        """
        Asynchronously close the transaction.
        """

Example transaction usage:

from neo4j import GraphDatabase, basic_auth

driver = GraphDatabase.driver("bolt://localhost:7687", auth=basic_auth("neo4j", "password"))

# Manual transaction control
with driver.session() as session:
    tx = session.begin_transaction()
    try:
        # Execute multiple operations in transaction
        tx.run("CREATE (n:Person {name: $name})", name="Alice")
        tx.run("CREATE (n:Person {name: $name})", name="Bob")
        
        # Create relationship between them
        result = tx.run("""
            MATCH (a:Person {name: $name1}), (b:Person {name: $name2})
            CREATE (a)-[:KNOWS]->(b)
            RETURN a.name, b.name
        """, name1="Alice", name2="Bob")
        
        # Process results
        for record in result:
            print(f"{record['a.name']} knows {record['b.name']}")
        
        # Commit all changes
        tx.commit()
        print("Transaction committed successfully")
        
    except Exception as e:
        # Rollback on error
        tx.rollback()
        print(f"Transaction rolled back due to error: {e}")
        raise

Result Classes

Result handling providing flexible consumption patterns for query results with both streaming and eager loading capabilities.

class Result:
    def keys(self) -> list[str]:
        """
        Get the column keys/names from the query result.
        
        Returns:
        List of column names as strings
        """
    
    def consume(self) -> ResultSummary:
        """
        Consume all remaining records and return execution summary.
        After calling consume(), no more records can be retrieved.
        
        Returns:
        ResultSummary containing execution metadata
        """
    
    def single(self, strict: bool = True) -> Record | None:
        """
        Return the single record from the result.
        
        Parameters:
        - strict: If True, raise exception if not exactly one record
        
        Returns:
        Single Record if exactly one exists, None if zero (when strict=False)
        
        Raises:
        Exception if zero records (when strict=True) or multiple records
        """
    
    def peek(self) -> Record:
        """
        Return the next record without consuming it from the stream.
        
        Returns:
        Next Record in the stream (can be retrieved again)
        
        Raises:
        StopIteration if no more records available
        """
    
    def data(self, *args) -> list[dict]:
        """
        Return all records as a list of dictionaries.
        
        Parameters:
        - *args: Optional column names to include (all if not specified)
        
        Returns:
        List of dictionaries representing records
        """
    
    def value(self, item: int | str = 0, default=None):
        """
        Return a single column from a single record.
        
        Parameters:
        - item: Column index or name to retrieve
        - default: Value to return if not found
        
        Returns:
        Single value from the specified column
        """
    
    def values(self, *items) -> list:
        """
        Return specified columns from all records as lists.
        
        Parameters:
        - *items: Column indices or names to retrieve
        
        Returns:
        List of column values for each record
        """
    
    def to_df(self, expand: bool = False) -> pandas.DataFrame:
        """
        Convert result to pandas DataFrame.
        
        Parameters:
        - expand: Whether to expand nested structures
        
        Returns:
        pandas DataFrame containing all records
        
        Requires:
        pandas package to be installed
        """
    
    def to_eager_result(self) -> EagerResult:
        """
        Convert to an EagerResult containing all records in memory.
        
        Returns:
        EagerResult with records, summary, and keys
        """
    
    def __iter__(self) -> Iterator[Record]:
        """Iterator interface for consuming records one by one."""
    
    def __next__(self) -> Record:
        """Get next record from the stream."""
class AsyncResult:
    async def keys(self) -> list[str]:
        """Get the column keys asynchronously."""
    
    async def consume(self) -> ResultSummary:
        """
        Asynchronously consume all records and return summary.
        
        Returns:
        ResultSummary containing execution metadata
        """
    
    async def single(self, strict: bool = True) -> Record | None:
        """
        Return the single record asynchronously.
        
        Parameters:
        - strict: If True, raise exception if not exactly one record
        
        Returns:
        Single Record or None
        """
    
    async def peek(self) -> Record:
        """
        Peek at next record asynchronously.
        
        Returns:
        Next Record without consuming it
        """
    
    async def data(self, *args) -> list[dict]:
        """
        Return all records as dictionaries asynchronously.
        
        Returns:
        List of dictionaries representing records
        """
    
    async def to_eager_result(self) -> EagerResult:
        """
        Convert to EagerResult asynchronously.
        
        Returns:
        EagerResult with all data loaded
        """
    
    def __aiter__(self) -> AsyncIterator[Record]:
        """Async iterator interface."""
    
    async def __anext__(self) -> Record:
        """Get next record asynchronously."""

Example result usage:

# Streaming consumption
with driver.session() as session:
    result = session.run("MATCH (n:Person) RETURN n.name AS name, n.age AS age")
    
    # Iterate over results one by one
    for record in result:
        print(f"Name: {record['name']}, Age: {record['age']}")
    
    # Alternative: consume all at once as dictionaries  
    result = session.run("MATCH (n:Person) RETURN n.name AS name")
    data = result.data()
    print(data)  # [{'name': 'Alice'}, {'name': 'Bob'}]
    
    # Get single result
    result = session.run("MATCH (n:Person {name: $name}) RETURN n", name="Alice")
    record = result.single()
    print(record["n"]["name"])
    
    # Get single value
    result = session.run("MATCH (n:Person) RETURN count(n)")
    count = result.value()
    print(f"Total persons: {count}")

EagerResult

In-memory result container providing immediate access to all query results and metadata.

class EagerResult:
    """
    NamedTuple containing complete query results in memory.
    """
    records: list[Record]
    """List of all records returned by the query."""
    
    summary: ResultSummary  
    """Query execution summary and metadata."""
    
    keys: list[str]
    """List of column names/keys."""

Example usage:

with driver.session() as session:
    result = session.run("MATCH (n:Person) RETURN n.name AS name")
    eager = result.to_eager_result()
    
    # Access all records
    for record in eager.records:
        print(record["name"])
    
    # Access summary information
    print(f"Query completed in {eager.summary.result_consumed_after}ms")
    
    # Access column keys
    print(f"Columns: {eager.keys}")

ResultSummary

Comprehensive metadata about query execution including performance statistics, database changes, and server information.

class ResultSummary:
    @property
    def query(self) -> str:
        """The Cypher query that was executed."""
    
    @property
    def parameters(self) -> dict:
        """Parameters that were passed to the query."""
    
    @property
    def query_type(self) -> str:
        """Type of query (r for read, w for write, etc.)."""
    
    @property
    def plan(self) -> dict | None:
        """Query execution plan (if requested)."""
    
    @property
    def profile(self) -> dict | None:
        """Query execution profile (if requested)."""
    
    @property
    def notifications(self) -> list[SummaryNotification]:
        """Server notifications about the query."""
    
    @property
    def counters(self) -> SummaryCounters:
        """Statistics about database changes made."""
    
    @property
    def result_available_after(self) -> int:
        """Time in milliseconds until first record was available."""
    
    @property  
    def result_consumed_after(self) -> int:
        """Time in milliseconds to consume all records."""
    
    @property
    def server(self) -> Address:
        """Address of the server that executed the query."""
    
    @property
    def database(self) -> str:
        """Name of the database where query was executed."""

SummaryCounters

Statistics about database modifications made by write operations.

class SummaryCounters:
    @property
    def nodes_created(self) -> int:
        """Number of nodes created."""
    
    @property
    def nodes_deleted(self) -> int:
        """Number of nodes deleted."""
    
    @property
    def relationships_created(self) -> int:
        """Number of relationships created."""
    
    @property
    def relationships_deleted(self) -> int:
        """Number of relationships deleted."""
    
    @property
    def properties_set(self) -> int:
        """Number of properties set."""
    
    @property
    def labels_added(self) -> int:
        """Number of labels added to nodes."""
    
    @property
    def labels_removed(self) -> int:
        """Number of labels removed from nodes."""
    
    @property
    def indexes_added(self) -> int:
        """Number of indexes created."""
    
    @property
    def indexes_removed(self) -> int:
        """Number of indexes dropped."""
    
    @property
    def constraints_added(self) -> int:
        """Number of constraints created."""
    
    @property
    def constraints_removed(self) -> int:
        """Number of constraints dropped."""
    
    @property
    def system_updates(self) -> int:
        """Number of system updates performed."""

SummaryNotification

Server notifications providing information about query execution, warnings, and optimization opportunities.

class SummaryNotification:
    @property
    def code(self) -> str:
        """Notification code identifier."""
    
    @property
    def title(self) -> str:
        """Human-readable notification title."""
    
    @property
    def description(self) -> str:
        """Detailed description of the notification."""
    
    @property
    def severity(self) -> str:
        """Severity level (WARNING, INFORMATION, etc.)."""
    
    @property
    def category(self) -> str:
        """Notification category."""
    
    @property
    def position(self) -> SummaryInputPosition | None:
        """Position in query where notification applies."""

class SummaryInputPosition:
    @property
    def offset(self) -> int:
        """Character offset in the query."""
    
    @property
    def line(self) -> int:
        """Line number in the query."""
    
    @property
    def column(self) -> int:
        """Column number in the query."""

Usage Examples

Async Results

import asyncio
from neo4j import AsyncGraphDatabase, basic_auth

async def async_example():
    driver = AsyncGraphDatabase.driver("bolt://localhost:7687", auth=basic_auth("neo4j", "password"))
    
    async with driver.session() as session:
        result = await session.run("MATCH (n:Person) RETURN n.name AS name")
        
        # Async iteration
        async for record in result:
            print(record["name"])
        
        # Async single result
        result = await session.run("MATCH (n:Person {name: $name}) RETURN n", name="Alice")
        record = await result.single()
        
        # Async summary
        result = await session.run("CREATE (n:Person {name: $name})", name="Charlie")
        summary = await result.consume()
        print(f"Nodes created: {summary.counters.nodes_created}")
    
    await driver.close()

asyncio.run(async_example())

Performance Analysis

with driver.session() as session:
    result = session.run("EXPLAIN MATCH (n:Person) RETURN n")
    summary = result.consume()
    
    if summary.plan:
        print("Query execution plan:")
        print(summary.plan)
    
    print(f"Query type: {summary.query_type}")
    print(f"Time to first record: {summary.result_available_after}ms")
    print(f"Total execution time: {summary.result_consumed_after}ms")
    
    # Check for notifications
    for notification in summary.notifications:
        print(f"Notification: {notification.title} - {notification.description}")

Install with Tessl CLI

npx tessl i tessl/pypi-neo4j

docs

authentication.md

configuration.md

data-types.md

drivers.md

index.md

sessions.md

transactions-results.md

tile.json