
tessl/pypi-apache-airflow-providers-neo4j

Apache Airflow provider package for Neo4j graph database integration with hooks and operators

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/apache-airflow-providers-neo4j@3.10.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-neo4j@3.10.0

# Apache Airflow Neo4j Provider

An Apache Airflow provider package that enables integration with Neo4j graph databases. This provider offers hooks for database connections and operators for executing Cypher queries within Airflow workflows.

## Package Information

- **Package Name**: apache-airflow-providers-neo4j
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-neo4j`
- **Requires**: apache-airflow>=2.10.0, neo4j>=5.20.0

## Core Imports

```python
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
```

## Basic Usage

```python
from airflow import DAG
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator
from datetime import datetime

# Using the operator in a DAG
with DAG(
    "neo4j_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    # Execute a simple Cypher query
    query_task = Neo4jOperator(
        task_id="query_neo4j",
        neo4j_conn_id="neo4j_default",
        sql="MATCH (n:Person) RETURN n.name LIMIT 10",
    )

    # Execute parameterized query
    param_query_task = Neo4jOperator(
        task_id="param_query",
        neo4j_conn_id="neo4j_conn",
        sql="MATCH (p:Person {name: $name}) RETURN p",
        parameters={"name": "John Doe"},
    )

# Using the hook directly
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook

hook = Neo4jHook(conn_id="neo4j_default")
results = hook.run("MATCH (n) RETURN COUNT(n) as total_nodes")
print(f"Total nodes: {results[0]['total_nodes']}")
```

## Connection Configuration

### Connection Setup

Create a Neo4j connection in Airflow with these parameters:

- **Connection Type**: neo4j
- **Host**: Neo4j server hostname
- **Port**: Neo4j server port (default: 7687)
- **Login**: Username for authentication
- **Password**: Password for authentication
- **Schema**: Database name (optional, for multi-database setups)
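
The fields above can also be supplied without the Airflow UI. The following is a minimal sketch, assuming Airflow's JSON format for `AIRFLOW_CONN_<CONN_ID>` environment variables; the host, credentials, and the `neo4j_default` connection ID are placeholder values, not settings taken from this package.

```python
import json
import os

# Placeholder values; replace them with your own server details.
connection = {
    "conn_type": "neo4j",
    "host": "localhost",
    "port": 7687,
    "login": "neo4j",
    "password": "password",
    "schema": "neo4j",  # database name (optional)
}

# Assumption: Airflow resolves connection IDs from variables named
# AIRFLOW_CONN_<CONN_ID in upper case>.
env_name = "AIRFLOW_CONN_NEO4J_DEFAULT"
os.environ[env_name] = json.dumps(connection)

# Print a line that could be pasted into a shell profile or .env file.
print(f"{env_name}='{os.environ[env_name]}'")
```

Setting `os.environ` only affects the current process, so in practice the printed line would go into the environment of the scheduler and workers.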

### Connection Extras

Configure connection behavior through JSON extras:

```json
{
  "neo4j_scheme": false,
  "encrypted": false,
  "certs_self_signed": false,
  "certs_trusted_ca": false
}
```

## Capabilities

### Database Connection Management

Establishes and manages connections to Neo4j databases with support for various connection schemes, encryption options, and authentication methods.

```python { .api }
class Neo4jHook(BaseHook):
    """
    Hook for interacting with Neo4j databases.

    Parameters:
    - conn_id: str, connection ID (default: "neo4j_default")

    Attributes:
    - conn_name_attr: str = "neo4j_conn_id"
    - default_conn_name: str = "neo4j_default"
    - conn_type: str = "neo4j"
    - hook_name: str = "Neo4j"
    """

    def __init__(self, conn_id: str = "neo4j_default", *args, **kwargs) -> None: ...

    def get_conn(self) -> Driver:
        """Establish Neo4j database connection."""

    def get_uri(self, conn: Connection) -> str:
        """Build connection URI from connection configuration."""

    def run(self, query: str, parameters: dict[str, Any] | None = None) -> list[Any]:
        """Execute Neo4j query and return results."""
```

### Query Execution

Executes Cypher queries within Airflow tasks with support for parameterized queries and template variables.

```python { .api }
class Neo4jOperator(BaseOperator):
    """
    Operator for executing Cypher queries in Neo4j.

    Parameters:
    - sql: str, Cypher query to execute
    - neo4j_conn_id: str, connection ID (default: "neo4j_default")
    - parameters: dict[str, Any] | None, query parameters

    Attributes:
    - template_fields: Sequence[str] = ("sql", "parameters")
    """

    def __init__(
        self,
        *,
        sql: str,
        neo4j_conn_id: str = "neo4j_default",
        parameters: dict[str, Any] | None = None,
        **kwargs,
    ) -> None: ...

    def execute(self, context: Context) -> None:
        """Execute the Cypher query using Neo4jHook."""
```

## Connection URI Schemes

The provider supports multiple Neo4j connection schemes based on connection configuration:

```python { .api }
DEFAULT_NEO4J_PORT: int = 7687

# URI format examples:
# bolt://hostname:7687 (default)
# neo4j://hostname:7687 (routing enabled)
# bolt+ssc://hostname:7687 (self-signed certificates)
# bolt+s://hostname:7687 (trusted CA certificates)
# neo4j+ssc://hostname:7687 (routing + self-signed)
# neo4j+s://hostname:7687 (routing + trusted CA)
```
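
To make the mapping from connection extras to these URIs concrete, here is a small sketch inferred from the examples above. `build_neo4j_uri` is a hypothetical helper, not the provider's `get_uri` implementation, and the precedence of `certs_self_signed` over `certs_trusted_ca` when both are set is an assumption.

```python
from typing import Optional

DEFAULT_NEO4J_PORT = 7687


def build_neo4j_uri(host: str, port: Optional[int] = None, extras: Optional[dict] = None) -> str:
    """Hypothetical helper that reproduces the URI formats listed above."""
    extras = extras or {}

    # neo4j:// enables routing; bolt:// is the default scheme.
    scheme = "neo4j" if extras.get("neo4j_scheme", False) else "bolt"

    # Assumption: the self-signed flag wins if both certificate flags are set.
    if extras.get("certs_self_signed", False):
        suffix = "+ssc"
    elif extras.get("certs_trusted_ca", False):
        suffix = "+s"
    else:
        suffix = ""

    resolved_port = DEFAULT_NEO4J_PORT if port is None else port
    return f"{scheme}{suffix}://{host}:{resolved_port}"


print(build_neo4j_uri("hostname"))
print(build_neo4j_uri("hostname", extras={"neo4j_scheme": True, "certs_trusted_ca": True}))
```

The two calls print `bolt://hostname:7687` and `neo4j+s://hostname:7687`, matching the first and last entries in the list.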

## Types

```python { .api }
from typing import Any
from collections.abc import Sequence
from neo4j import Driver
from airflow.models import Connection

# Context type varies by Airflow version
try:
    from airflow.sdk.definitions.context import Context
except ImportError:
    from airflow.utils.context import Context

# The provider uses version compatibility handling
from airflow.providers.neo4j.version_compat import BaseOperator

# BaseHook varies by Airflow version
try:
    from airflow.sdk.bases.hook import BaseHook
except ImportError:
    from airflow.hooks.base import BaseHook
```

## Usage Examples

### Basic Query Execution

```python
from airflow.providers.neo4j.hooks.neo4j import Neo4jHook

hook = Neo4jHook(conn_id="neo4j_default")

# Simple node query
results = hook.run("MATCH (n:Person) RETURN n.name, n.age")
for record in results:
    print(f"Name: {record['n.name']}, Age: {record['n.age']}")

# Count query
count_result = hook.run("MATCH (n) RETURN COUNT(n) as node_count")
total_nodes = count_result[0]['node_count']
```
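
The examples above index each record by the returned column name. Assuming `hook.run` returns a list of dict-like records, as that indexing suggests, the sketch below shows the same handling on stand-in data so it runs without a database. `sample_results` and `names_by_age` are illustrative names, not part of the provider.

```python
# Stand-in for the value returned by
# hook.run("MATCH (n:Person) RETURN n.name, n.age")
sample_results = [
    {"n.name": "Alice", "n.age": 30},
    {"n.name": "Bob", "n.age": None},  # property missing on the node
    {"n.name": "Carol", "n.age": 41},
]


def names_by_age(records, min_age):
    """Return names of records whose age is known and at least min_age."""
    return [
        record["n.name"]
        for record in records
        if record.get("n.age") is not None and record["n.age"] >= min_age
    ]


print(names_by_age(sample_results, 35))  # ['Carol']
```

Checking for `None` before comparing avoids a `TypeError` when a node lacks the property.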

### Parameterized Queries

```python
# `hook` is a Neo4jHook instance, created as in Basic Usage

# Using parameters for safe query execution
query = "MATCH (p:Person {country: $country}) RETURN p.name"
parameters = {"country": "USA"}
results = hook.run(query, parameters)

# Creating nodes with parameters
create_query = """
CREATE (p:Person {
    name: $name,
    age: $age,
    email: $email
})
RETURN p
"""
create_params = {
    "name": "Alice Smith",
    "age": 30,
    "email": "alice@example.com",
}
hook.run(create_query, create_params)
```
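
When many nodes are created at once, a common Cypher pattern is to pass a list as a single parameter and expand it with `UNWIND`. The following is a sketch under that assumption, reusing the `Person` properties from the example above. `build_batch_params`, `BATCH_CREATE_QUERY`, and the sample people are hypothetical, and the final `hook.run` call is left commented out because it needs a live connection.

```python
from typing import Any

# One query creates a node per element of the $rows parameter.
BATCH_CREATE_QUERY = """
UNWIND $rows AS row
CREATE (p:Person {name: row.name, age: row.age, email: row.email})
RETURN count(p) AS created
"""

REQUIRED_KEYS = ("name", "age", "email")


def build_batch_params(people: list[dict[str, Any]]) -> dict[str, Any]:
    """Validate the input rows and wrap them in the single $rows parameter."""
    for index, person in enumerate(people):
        missing = [key for key in REQUIRED_KEYS if key not in person]
        if missing:
            raise ValueError(f"row {index} is missing {missing}")
    # Keep only the expected keys so stray fields never reach the query.
    return {"rows": [{key: person[key] for key in REQUIRED_KEYS} for person in people]}


params = build_batch_params(
    [
        {"name": "Alice Smith", "age": 30, "email": "alice@example.com"},
        {"name": "Bob Jones", "age": 45, "email": "bob@example.com"},
    ]
)
print(len(params["rows"]))  # 2

# With a configured connection (hook is a Neo4jHook instance):
# hook.run(BATCH_CREATE_QUERY, params)
```

Sending the whole batch as one parameter keeps values out of the query text, as in the single-node example, and avoids one round trip per node.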

### Template Variables in Operators

```python
from airflow.providers.neo4j.operators.neo4j import Neo4jOperator

# Using Airflow template variables
templated_operator = Neo4jOperator(
    task_id="daily_analysis",
    sql="""
    MATCH (event:Event)
    WHERE event.date = $analysis_date
    RETURN COUNT(event) as daily_count
    """,
    parameters={
        "analysis_date": "{{ ds }}",  # logical date rendered as YYYY-MM-DD
    },
)

# Multi-line templated queries
complex_query = """
MATCH (user:User)-[r:PURCHASED]->(product:Product)
WHERE r.date >= $start_date AND r.date <= $end_date
RETURN
    user.name,
    product.category,
    COUNT(r) as purchase_count,
    SUM(r.amount) as total_spent
ORDER BY total_spent DESC
LIMIT $limit
"""

analysis_operator = Neo4jOperator(
    task_id="purchase_analysis",
    sql=complex_query,
    parameters={
        "start_date": "{{ ds }}",
        # data_interval_end replaces the deprecated next_ds variable
        "end_date": "{{ data_interval_end | ds }}",
        "limit": 100,
    },
)
```

### Connection Configuration Examples

```python
# Standard bolt connection
standard_bolt_connection = {
    "conn_type": "neo4j",
    "host": "localhost",
    "port": 7687,
    "login": "neo4j",
    "password": "password",
    "schema": "neo4j",  # database name
}

# Secure connection with routing
secure_routing_connection = {
    "conn_type": "neo4j",
    "host": "neo4j.company.com",
    "port": 7687,
    "login": "app_user",
    "password": "secure_password",
    "extra": {
        "neo4j_scheme": True,  # Use neo4j:// scheme
        "certs_trusted_ca": True,  # Enable TLS with trusted CA
        "encrypted": True,  # Force encryption
    },
}
```