Apache Airflow provider package for Neo4j graph database integration with hooks and operators
npx @tessl/cli install tessl/pypi-apache-airflow-providers-neo4j@3.10.00
# Apache Airflow Neo4j Provider

An Apache Airflow provider package that enables integration with Neo4j graph databases. This provider offers hooks for database connections and operators for executing Cypher queries within Airflow workflows.

## Package Information

- **Package Name**: apache-airflow-providers-neo4j
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-neo4j`
- **Requires**: apache-airflow>=2.10.0, neo4j>=5.20.0

## Core Imports

```python
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

# Using the operator in a DAG
with DAG(
    "neo4j_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    # Execute a simple Cypher query
    query_task = Neo4jOperator(
        task_id="query_neo4j",
        neo4j_conn_id="neo4j_default",
        sql="MATCH (n:Person) RETURN n.name LIMIT 10",
    )

    # Execute a parameterized query
    param_query_task = Neo4jOperator(
        task_id="param_query",
        neo4j_conn_id="neo4j_conn",
        sql="MATCH (p:Person {name: $name}) RETURN p",
        parameters={"name": "John Doe"},
    )

# Using the hook directly (call this from task code, not at DAG parse time)
hook = Neo4jHook(conn_id="neo4j_default")
results = hook.run("MATCH (n) RETURN COUNT(n) as total_nodes")
print(f"Total nodes: {results[0]['total_nodes']}")
```

## Connection Configuration

### Connection Setup

Create a Neo4j connection in Airflow with these parameters:

- **Connection Type**: neo4j
- **Host**: Neo4j server hostname
- **Port**: Neo4j server port (default: 7687)
- **Login**: Username for authentication
- **Password**: Password for authentication
- **Schema**: Database name (optional, for multi-database setups)

### Connection Extras

Configure connection behavior through JSON extras:

```json
{
  "neo4j_scheme": false,
  "encrypted": false,
  "certs_self_signed": false,
  "certs_trusted_ca": false
}
```

## Capabilities

### Database Connection Management

Establishes and manages connections to Neo4j databases with support for various connection schemes, encryption options, and authentication methods.

```python { .api }
class Neo4jHook(BaseHook):
    """
    Hook for interacting with Neo4j databases.

    Parameters:
    - conn_id: str, connection ID (default: "neo4j_default")

    Attributes:
    - conn_name_attr: str = "neo4j_conn_id"
    - default_conn_name: str = "neo4j_default"
    - conn_type: str = "neo4j"
    - hook_name: str = "Neo4j"
    """

    def __init__(self, conn_id: str = "neo4j_default", *args, **kwargs) -> None: ...

    def get_conn(self) -> Driver:
        """Establish Neo4j database connection."""

    def get_uri(self, conn: Connection) -> str:
        """Build connection URI from connection configuration."""

    def run(self, query: str, parameters: dict[str, Any] | None = None) -> list[Any]:
        """Execute Neo4j query and return results."""
```

### Query Execution

Executes Cypher queries within Airflow tasks with support for parameterized queries and template variables.

```python { .api }
class Neo4jOperator(BaseOperator):
    """
    Operator for executing Cypher queries in Neo4j.

    Parameters:
    - sql: str, Cypher query to execute
    - neo4j_conn_id: str, connection ID (default: "neo4j_default")
    - parameters: dict[str, Any] | None, query parameters

    Attributes:
    - template_fields: Sequence[str] = ("sql", "parameters")
    """

    def __init__(
        self,
        *,
        sql: str,
        neo4j_conn_id: str = "neo4j_default",
        parameters: dict[str, Any] | None = None,
        **kwargs,
    ) -> None: ...

    def execute(self, context: Context) -> None:
        """Execute the Cypher query using Neo4jHook."""
```

## Connection URI Schemes

The provider supports multiple Neo4j connection schemes based on connection configuration:

```python { .api }
DEFAULT_NEO4J_PORT: int = 7687

# URI format examples:
# bolt://hostname:7687 (default)
# neo4j://hostname:7687 (routing enabled)
# bolt+ssc://hostname:7687 (self-signed certificates)
# bolt+s://hostname:7687 (trusted CA certificates)
# neo4j+ssc://hostname:7687 (routing + self-signed)
# neo4j+s://hostname:7687 (routing + trusted CA)
```
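
To make the mapping from connection extras to URI schemes concrete, here is a minimal standalone sketch. The function name `build_neo4j_uri` is hypothetical and is not part of the provider; the precedence of `certs_self_signed` over `certs_trusted_ca` when both are set is an assumption, chosen so that the output matches the URI formats listed above.

```python
from __future__ import annotations

DEFAULT_NEO4J_PORT = 7687


def build_neo4j_uri(host: str, port: int | None = None, extras: dict | None = None) -> str:
    """Hypothetical helper: derive a Neo4j URI from host, port, and connection extras."""
    extras = extras or {}

    # "neo4j_scheme" switches from the direct bolt scheme to the routing scheme.
    scheme = "neo4j" if extras.get("neo4j_scheme", False) else "bolt"

    # Assumption: self-signed certificates take precedence over trusted CA.
    if extras.get("certs_self_signed", False):
        suffix = "+ssc"
    elif extras.get("certs_trusted_ca", False):
        suffix = "+s"
    else:
        suffix = ""

    # Fall back to the default port when none is configured.
    resolved_port = DEFAULT_NEO4J_PORT if port is None else port
    return f"{scheme}{suffix}://{host}:{resolved_port}"


print(build_neo4j_uri("hostname"))
print(build_neo4j_uri("hostname", extras={"neo4j_scheme": True, "certs_trusted_ca": True}))
```

In real DAG code, prefer `Neo4jHook.get_uri`, which receives the Airflow `Connection` object directly.
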
## Types

```python { .api }
from typing import Any
from collections.abc import Sequence
from neo4j import Driver
from airflow.models import Connection

# Context type varies by Airflow version
try:
    from airflow.sdk.definitions.context import Context
except ImportError:
    from airflow.utils.context import Context

# The provider uses version compatibility handling
from airflow.providers.neo4j.version_compat import BaseOperator

# BaseHook varies by Airflow version
try:
    from airflow.sdk.bases.hook import BaseHook
except ImportError:
    from airflow.hooks.base import BaseHook
```

## Usage Examples

### Basic Query Execution

```python
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook

hook = Neo4jHook(conn_id="neo4j_default")

# Simple node query
results = hook.run("MATCH (n:Person) RETURN n.name, n.age")
for record in results:
    print(f"Name: {record['n.name']}, Age: {record['n.age']}")

# Count query
count_result = hook.run("MATCH (n) RETURN COUNT(n) as node_count")
total_nodes = count_result[0]['node_count']
```

### Parameterized Queries

```python
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook

hook = Neo4jHook(conn_id="neo4j_default")

# Using parameters for safe query execution
query = "MATCH (p:Person {country: $country}) RETURN p.name"
parameters = {"country": "USA"}
results = hook.run(query, parameters)

# Creating nodes with parameters
create_query = """
CREATE (p:Person {
    name: $name,
    age: $age,
    email: $email
})
RETURN p
"""
create_params = {
    "name": "Alice Smith",
    "age": 30,
    "email": "alice@example.com",
}
hook.run(create_query, create_params)
```
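
When many nodes need to be created, one common Cypher pattern is to pass all rows as a single list parameter and expand it with `UNWIND`, instead of issuing one query per node. The sketch below is illustrative only: `build_batch_create` is a hypothetical helper, not part of the provider, and the sample rows are made up. The label is validated and interpolated into the query text because this sketch assumes labels cannot be supplied as query parameters.

```python
from __future__ import annotations

from typing import Any


def build_batch_create(label: str, rows: list[dict[str, Any]]) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: build one UNWIND query that creates a node per row."""
    # The label is placed into the query text, so restrict it to a plain identifier.
    if not label.isidentifier():
        raise ValueError(f"Invalid label: {label!r}")

    query = (
        "UNWIND $rows AS row "
        f"CREATE (n:{label}) "
        "SET n = row "
        "RETURN count(n) AS created"
    )
    # Row values still travel as parameters, never as query text.
    return query, {"rows": rows}


# Made-up sample data for illustration
people = [
    {"name": "Alice Smith", "age": 30},
    {"name": "Bob Jones", "age": 41},
]
query, params = build_batch_create("Person", people)
print(query)

# With a configured connection, the pair can be passed to the hook:
# hook.run(query, params)
```
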
### Template Variables in Operators

```python
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

# Using Airflow template variables
templated_operator = Neo4jOperator(
    task_id="daily_analysis",
    sql="""
    MATCH (event:Event)
    WHERE event.date = $analysis_date
    RETURN COUNT(event) as daily_count
    """,
    parameters={
        "analysis_date": "{{ ds }}",  # Logical date as YYYY-MM-DD
    },
)

# Multi-line templated queries
complex_query = """
MATCH (user:User)-[r:PURCHASED]->(product:Product)
WHERE r.date >= $start_date AND r.date <= $end_date
RETURN
    user.name,
    product.category,
    COUNT(r) as purchase_count,
    SUM(r.amount) as total_spent
ORDER BY total_spent DESC
LIMIT $limit
"""

analysis_operator = Neo4jOperator(
    task_id="purchase_analysis",
    sql=complex_query,
    parameters={
        "start_date": "{{ ds }}",
        # `next_ds` is deprecated; use the data interval end instead
        "end_date": "{{ data_interval_end | ds }}",
        "limit": 100,
    },
)
```

### Connection Configuration Examples

```python
# Standard bolt connection
standard_bolt_connection = {
    "conn_type": "neo4j",
    "host": "localhost",
    "port": 7687,
    "login": "neo4j",
    "password": "password",
    "schema": "neo4j",  # database name
}

# Secure connection with routing
secure_routing_connection = {
    "conn_type": "neo4j",
    "host": "neo4j.company.com",
    "port": 7687,
    "login": "app_user",
    "password": "secure_password",
    "extra": {
        "neo4j_scheme": True,  # Use neo4j:// scheme
        "certs_trusted_ca": True,  # Enable TLS with trusted CA
        "encrypted": True,  # Force encryption
    },
}
```
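
One way to hand such a configuration to Airflow is a JSON-serialized connection in an `AIRFLOW_CONN_<CONN_ID>` environment variable. The following is a minimal sketch using only the standard library; the helper name `connection_env_var` is hypothetical and the credentials are placeholders.

```python
import json


def connection_env_var(conn_id: str, config: dict) -> tuple[str, str]:
    """Hypothetical helper: return (variable name, JSON value) for a connection."""
    # Airflow looks up connections in variables named AIRFLOW_CONN_<CONN_ID>.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    return name, json.dumps(config)


# Placeholder values for illustration
secure_config = {
    "conn_type": "neo4j",
    "host": "neo4j.company.com",
    "port": 7687,
    "login": "app_user",
    "password": "secure_password",
    "extra": {"neo4j_scheme": True, "certs_trusted_ca": True},
}

name, value = connection_env_var("neo4j_default", secure_config)
print(f"export {name}='{value}'")
```

For production credentials, a secrets backend is usually a better place for the password than a shell export.
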