Apache Airflow provider package for Neo4j graph database integration with hooks and operators
npx @tessl/cli install tessl/pypi-apache-airflow-providers-neo4j@3.10.0

An Apache Airflow provider package that enables integration with Neo4j graph databases. This provider offers hooks for database connections and operators for executing Cypher queries within Airflow workflows.
pip install apache-airflow-providers-neo4j

from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

from airflow import DAG
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
from datetime import datetime
# Using the operator in a DAG
with DAG(
"neo4j_example",
start_date=datetime(2024, 1, 1),
schedule=None,
catchup=False,
) as dag:
# Execute a simple Cypher query
query_task = Neo4jOperator(
task_id="query_neo4j",
neo4j_conn_id="neo4j_default",
sql="MATCH (n:Person) RETURN n.name LIMIT 10"
)
# Execute parameterized query
param_query_task = Neo4jOperator(
task_id="param_query",
neo4j_conn_id="neo4j_conn",
sql="MATCH (p:Person {name: $name}) RETURN p",
parameters={"name": "John Doe"}
)
# Using the hook directly
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
hook = Neo4jHook(conn_id="neo4j_default")
results = hook.run("MATCH (n) RETURN COUNT(n) as total_nodes")
print(f"Total nodes: {results[0]['total_nodes']}")

Create a Neo4j connection in Airflow with these parameters:
Configure connection behavior through JSON extras:
{
"neo4j_scheme": false,
"encrypted": false,
"certs_self_signed": false,
"certs_trusted_ca": false
}

Establishes and manages connections to Neo4j databases with support for various connection schemes, encryption options, and authentication methods.
class Neo4jHook(BaseHook):
"""
Hook for interacting with Neo4j databases.
Parameters:
- conn_id: str, connection ID (default: "neo4j_default")
Attributes:
- conn_name_attr: str = "neo4j_conn_id"
- default_conn_name: str = "neo4j_default"
- conn_type: str = "neo4j"
- hook_name: str = "Neo4j"
"""
def __init__(self, conn_id: str = "neo4j_default", *args, **kwargs) -> None: ...
def get_conn(self) -> Driver:
"""Establish Neo4j database connection."""
def get_uri(self, conn: Connection) -> str:
"""Build connection URI from connection configuration."""
def run(self, query: str, parameters: dict[str, Any] | None = None) -> list[Any]:
"""Execute Neo4j query and return results."""

Executes Cypher queries within Airflow tasks with support for parameterized queries and template variables.
class Neo4jOperator(BaseOperator):
"""
Operator for executing Cypher queries in Neo4j.
Parameters:
- sql: str, Cypher query to execute
- neo4j_conn_id: str, connection ID (default: "neo4j_default")
- parameters: dict[str, Any] | None, query parameters
Attributes:
- template_fields: Sequence[str] = ("sql", "parameters")
"""
def __init__(
self,
*,
sql: str,
neo4j_conn_id: str = "neo4j_default",
parameters: dict[str, Any] | None = None,
**kwargs,
) -> None: ...
def execute(self, context: Context) -> None:
"""Execute the Cypher query using Neo4jHook."""

The provider supports multiple Neo4j connection schemes based on connection configuration:
DEFAULT_NEO4J_PORT: int = 7687
# URI format examples:
# bolt://hostname:7687 (default)
# neo4j://hostname:7687 (routing enabled)
# bolt+ssc://hostname:7687 (self-signed certificates)
# bolt+s://hostname:7687 (trusted CA certificates)
# neo4j+ssc://hostname:7687 (routing + self-signed)
# neo4j+s://hostname:7687 (routing + trusted CA)

from typing import Any
from collections.abc import Sequence
from neo4j import Driver
from airflow.models import Connection
# Context type varies by Airflow version
try:
from airflow.sdk.definitions.context import Context
except ImportError:
from airflow.utils.context import Context
# The provider uses version compatibility handling
from airflow.providers.neo4j.version_compat import BaseOperator
# BaseHook varies by Airflow version
try:
from airflow.sdk.bases.hook import BaseHook
except ImportError:
from airflow.hooks.base import BaseHook

# Simple node query
results = hook.run("MATCH (n:Person) RETURN n.name, n.age")
for record in results:
print(f"Name: {record['n.name']}, Age: {record['n.age']}")
# Count query
count_result = hook.run("MATCH (n) RETURN COUNT(n) as node_count")
total_nodes = count_result[0]['node_count']

# Using parameters for safe query execution
query = "MATCH (p:Person {country: $country}) RETURN p.name"
parameters = {"country": "USA"}
results = hook.run(query, parameters)
# Creating nodes with parameters
create_query = """
CREATE (p:Person {
name: $name,
age: $age,
email: $email
})
RETURN p
"""
create_params = {
"name": "Alice Smith",
"age": 30,
"email": "alice@example.com"
}
hook.run(create_query, create_params)

# Using Airflow template variables
templated_operator = Neo4jOperator(
task_id="daily_analysis",
sql="""
MATCH (event:Event)
WHERE event.date = $analysis_date
RETURN COUNT(event) as daily_count
""",
parameters={
"analysis_date": "{{ ds }}" # Airflow execution date
}
)
# Multi-line templated queries
complex_query = """
MATCH (user:User)-[r:PURCHASED]->(product:Product)
WHERE r.date >= $start_date AND r.date <= $end_date
RETURN
user.name,
product.category,
COUNT(r) as purchase_count,
SUM(r.amount) as total_spent
ORDER BY total_spent DESC
LIMIT $limit
"""
analysis_operator = Neo4jOperator(
task_id="purchase_analysis",
sql=complex_query,
parameters={
"start_date": "{{ ds }}",
"end_date": "{{ next_ds }}",
"limit": 100
}
)

# Standard bolt connection
{
"conn_type": "neo4j",
"host": "localhost",
"port": 7687,
"login": "neo4j",
"password": "password",
"schema": "neo4j" # database name
}
# Secure connection with routing
{
"conn_type": "neo4j",
"host": "neo4j.company.com",
"port": 7687,
"login": "app_user",
"password": "secure_password",
"extra": {
"neo4j_scheme": true, # Use neo4j:// scheme
"certs_trusted_ca": true, # Enable TLS with trusted CA
"encrypted": true # Force encryption
}
}