or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/pypi-apache-airflow-providers-neo4j

Apache Airflow provider package for Neo4j graph database integration with hooks and operators

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/apache-airflow-providers-neo4j@3.10.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-neo4j@3.10.0

docs/index.md

Apache Airflow Neo4j Provider

An Apache Airflow provider package that enables integration with Neo4j graph databases. This provider offers hooks for database connections and operators for executing Cypher queries within Airflow workflows.

Package Information

  • Package Name: apache-airflow-providers-neo4j
  • Language: Python
  • Installation: pip install apache-airflow-providers-neo4j
  • Requires: apache-airflow>=2.10.0, neo4j>=5.20.0

Core Imports

from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

Basic Usage

from airflow import DAG
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
from datetime import datetime

# Using the operator in a DAG
with DAG(
    "neo4j_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    
    # Execute a simple Cypher query
    query_task = Neo4jOperator(
        task_id="query_neo4j",
        neo4j_conn_id="neo4j_default",
        sql="MATCH (n:Person) RETURN n.name LIMIT 10"
    )
    
    # Execute parameterized query
    param_query_task = Neo4jOperator(
        task_id="param_query",
        neo4j_conn_id="neo4j_conn",
        sql="MATCH (p:Person {name: $name}) RETURN p",
        parameters={"name": "John Doe"}
    )

# Using the hook directly
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook

hook = Neo4jHook(conn_id="neo4j_default")
results = hook.run("MATCH (n) RETURN COUNT(n) as total_nodes")
print(f"Total nodes: {results[0]['total_nodes']}")

Connection Configuration

Connection Setup

Create a Neo4j connection in Airflow with these parameters:

  • Connection Type: neo4j
  • Host: Neo4j server hostname
  • Port: Neo4j server port (default: 7687)
  • Login: Username for authentication
  • Password: Password for authentication
  • Schema: Database name (optional, for multi-database setups)

Connection Extras

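Configure connection behavior through the connection's JSON extras. Each key is a boolean flag: `neo4j_scheme` selects the `neo4j://` (routing) scheme instead of `bolt://`, `certs_self_signed` selects the `+ssc` URI variant, `certs_trusted_ca` selects the `+s` URI variant, and `encrypted` requests an encrypted connection. The example below leaves every option turned off: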

{
  "neo4j_scheme": false,
  "encrypted": false,
  "certs_self_signed": false,
  "certs_trusted_ca": false
}

Capabilities

Database Connection Management

Establishes and manages connections to Neo4j databases with support for various connection schemes, encryption options, and authentication methods.

class Neo4jHook(BaseHook):
    """
    Hook for interacting with Neo4j databases.
    
    Parameters:
    - conn_id: str, connection ID (default: "neo4j_default")
    
    Attributes:
    - conn_name_attr: str = "neo4j_conn_id"
    - default_conn_name: str = "neo4j_default"
    - conn_type: str = "neo4j"
    - hook_name: str = "Neo4j"
    """
    
    def __init__(self, conn_id: str = "neo4j_default", *args, **kwargs) -> None: ...
    
    def get_conn(self) -> Driver:
        """Establish Neo4j database connection."""
    
    def get_uri(self, conn: Connection) -> str:
        """Build connection URI from connection configuration."""
    
    def run(self, query: str, parameters: dict[str, Any] | None = None) -> list[Any]:
        """Execute Neo4j query and return results."""

Query Execution

Executes Cypher queries within Airflow tasks with support for parameterized queries and template variables.

class Neo4jOperator(BaseOperator):
    """
    Operator for executing Cypher queries in Neo4j.
    
    Parameters:
    - sql: str, Cypher query to execute
    - neo4j_conn_id: str, connection ID (default: "neo4j_default")
    - parameters: dict[str, Any] | None, query parameters
    
    Attributes:
    - template_fields: Sequence[str] = ("sql", "parameters")
    """
    
    def __init__(
        self,
        *,
        sql: str,
        neo4j_conn_id: str = "neo4j_default",
        parameters: dict[str, Any] | None = None,
        **kwargs,
    ) -> None: ...
    
    def execute(self, context: Context) -> None:
        """Execute the Cypher query using Neo4jHook."""

Connection URI Schemes

The provider supports multiple Neo4j connection schemes based on connection configuration:

DEFAULT_NEO4J_PORT: int = 7687

# URI format examples:
# bolt://hostname:7687 (default)
# neo4j://hostname:7687 (routing enabled)
# bolt+ssc://hostname:7687 (self-signed certificates)
# bolt+s://hostname:7687 (trusted CA certificates)
# neo4j+ssc://hostname:7687 (routing + self-signed)
# neo4j+s://hostname:7687 (routing + trusted CA)

Types

from typing import Any
from collections.abc import Sequence
from neo4j import Driver
from airflow.models import Connection

# Context type varies by Airflow version
try:
    from airflow.sdk.definitions.context import Context
except ImportError:
    from airflow.utils.context import Context

# BaseOperator is imported through the provider's version_compat module rather than directly from Airflow
from airflow.providers.neo4j.version_compat import BaseOperator

# BaseHook varies by Airflow version  
try:
    from airflow.sdk.bases.hook import BaseHook
except ImportError:
    from airflow.hooks.base import BaseHook

Usage Examples

Basic Query Execution

# Simple node query
results = hook.run("MATCH (n:Person) RETURN n.name, n.age")
for record in results:
    print(f"Name: {record['n.name']}, Age: {record['n.age']}")

# Count query
count_result = hook.run("MATCH (n) RETURN COUNT(n) as node_count")
total_nodes = count_result[0]['node_count']

Parameterized Queries

# Using parameters for safe query execution
query = "MATCH (p:Person {country: $country}) RETURN p.name"
parameters = {"country": "USA"}
results = hook.run(query, parameters)

# Creating nodes with parameters
create_query = """
CREATE (p:Person {
    name: $name,
    age: $age,
    email: $email
})
RETURN p
"""
create_params = {
    "name": "Alice Smith",
    "age": 30,
    "email": "alice@example.com"
}
hook.run(create_query, create_params)

Template Variables in Operators

# Using Airflow template variables
templated_operator = Neo4jOperator(
    task_id="daily_analysis",
    sql="""
    MATCH (event:Event)
    WHERE event.date = $analysis_date
    RETURN COUNT(event) as daily_count
    """,
    parameters={
        "analysis_date": "{{ ds }}"  # Airflow execution date
    }
)

# Multi-line templated queries
complex_query = """
MATCH (user:User)-[r:PURCHASED]->(product:Product)
WHERE r.date >= $start_date AND r.date <= $end_date
RETURN 
    user.name,
    product.category,
    COUNT(r) as purchase_count,
    SUM(r.amount) as total_spent
ORDER BY total_spent DESC
LIMIT $limit
"""

analysis_operator = Neo4jOperator(
    task_id="purchase_analysis",
    sql=complex_query,
    parameters={
        "start_date": "{{ ds }}",
        "end_date": "{{ next_ds }}",
        "limit": 100
    }
)

Connection Configuration Examples

# Standard bolt connection (no extras set, so the URI is bolt://localhost:7687)
{
    "conn_type": "neo4j",
    "host": "localhost", 
    "port": 7687,
    "login": "neo4j",
    "password": "password",
    "schema": "neo4j"  # database name
}

# Secure connection with routing (neo4j_scheme + certs_trusted_ca yields a neo4j+s:// URI)
{
    "conn_type": "neo4j",
    "host": "neo4j.company.com",
    "port": 7687,
    "login": "app_user",
    "password": "secure_password",
    "extra": {
        "neo4j_scheme": true,        # Use neo4j:// scheme
        "certs_trusted_ca": true,    # Enable TLS with trusted CA
        "encrypted": true            # Force encryption
    }
}