Use when reading from or writing to Neo4j with Apache Spark or Databricks using the Neo4j Connector for Apache Spark (org.neo4j:neo4j-connector-apache-spark). Covers SparkSession setup, DataFrame reads via labels/Cypher/relationship scan, DataFrame writes with SaveMode, node.keys for MERGE, relationship write mapping, partition and batch tuning, PySpark and Scala examples, Databricks cluster config, Databricks secrets for credentials, Delta Lake to Neo4j pipelines. Does NOT handle Cypher authoring — use neo4j-cypher-skill. Does NOT handle the Python bolt driver — use neo4j-driver-python-skill. Does NOT handle GDS algorithms — use neo4j-gds-skill.
Related skills: neo4j-driver-python-skill, neo4j-cypher-skill, neo4j-gds-skill, neo4j-spring-data-skill

| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|---|---|---|---|---|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |
Maven artifact (Scala 2.12, Spark 3): org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3

Scala 2.13 variant: org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3

SparkSession setup (PySpark):

```python
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate())
```

Scala equivalent:

```scala
val spark = SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate()
```

On Databricks, attach the connector library org.neo4j:neo4j-connector-apache-spark_2.12 to the cluster (match the Scala version to the runtime) and set the connection in the cluster Spark config:

```
neo4j.url neo4j+s://xxxx.databases.neo4j.io
neo4j.authentication.type basic
neo4j.authentication.basic.username {{secrets/neo4j/username}}
neo4j.authentication.basic.password {{secrets/neo4j/password}}
```

Alternatively, read Databricks secrets in the notebook:

```python
# Store credentials once:
# databricks secrets create-scope --scope neo4j
# databricks secrets put --scope neo4j --key url
# databricks secrets put --scope neo4j --key username
# databricks secrets put --scope neo4j --key password
neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")
spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)| Option | Description | Default |
|---|---|---|
| neo4j.url | Bolt/Neo4j URI | — (required) |
| neo4j.authentication.type | none, basic, kerberos, bearer | basic |
| neo4j.authentication.basic.username | Username | driver default |
| neo4j.authentication.basic.password | Password | driver default |
| neo4j.authentication.bearer.token | Bearer token | — |
| neo4j.database | Target database | driver default |
| neo4j.access.mode | read or write | read |
| neo4j.encryption.enabled | TLS (ignored with +s/+ssc URI) | false |
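The same connection settings can also be passed per read or write call instead of session-wide config, which helps when a single job talks to more than one Neo4j instance. A minimal sketch, assuming the data source accepts these keys without the neo4j. prefix (verify against your connector version's documentation):

```python
# Per-call connection options; the prefix-less keys (url, authentication.basic.*)
# are an assumption to verify against the connector docs for your version.
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("url", "neo4j+s://xxxx.databases.neo4j.io")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "password")
    .option("labels", ":Person")
    .load())
```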
Three mutually exclusive read modes — use exactly one per .read() call.
Read by node labels:

```python
# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load())
df.printSchema()
df.show()
```

```scala
// Scala
val df = spark.read
.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load()
```

Multi-label filter (AND): .option("labels", ":Person:Employee")
Result includes <id> (internal Neo4j id) and <labels> columns.
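Since <id> and <labels> are not ordinary identifiers, it often pays to rename or drop them before joining or writing the result elsewhere. A small sketch, assuming the :Person nodes carry a name property (illustrative):

```python
# Rename the internal-id column and keep only the columns you need downstream.
# "name" is an illustrative property name; substitute your own.
people = (df
    .withColumnRenamed("<id>", "neo4j_internal_id")
    .select("neo4j_internal_id", "name"))
people.show()
```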
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) RETURN p.name AS actor, m.title AS movie, m.year AS year")
.load())
```

Use explicit RETURN aliases; they become the DataFrame column names. Do not add SKIP/LIMIT to the query (the connector handles pagination).
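Because the aliases become plain column names, the result behaves like any other DataFrame. A brief usage sketch on the query above:

```python
# Standard DataFrame operations on the aliased columns (actor, movie, year).
recent = df.filter("year >= 2000")
recent.groupBy("actor").count().orderBy("count", ascending=False).show(10)
```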
Relationship scan:

```python
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", ":Customer")
.option("relationship.target.labels", ":Product")
.load())
```

Result columns: <rel.id>, <rel.type>, <source.*>, <target.*>, plus relationship properties.
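The generated column names contain dots and angle brackets, so backtick-quote them in SQL-style expressions. A sketch assuming the default flattened layout and illustrative customer_id / product_id node properties:

```python
# Inspect the exact generated column names first, then flatten with backtick quoting.
df.printSchema()
flat = df.selectExpr(
    "`<rel.id>` AS rel_id",
    "`source.customer_id` AS customer_id",   # illustrative source-node property
    "`target.product_id` AS product_id",     # illustrative target-node property
)
```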
Partition and batch tuning on read:

```python
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Transaction")
.option("partitions", "10") # parallel partitions (default: 1)
.option("batch.size", "5000") # rows per partition batch (default: 5000)
.option("schema.flatten.limit", "100") # rows sampled for schema inference
.load())
```

Full read options reference: references/read-patterns.md
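A quick way to confirm how the read was actually partitioned, using plain Spark (no connector-specific options assumed):

```python
from pyspark.sql.functions import spark_partition_id

print(df.rdd.getNumPartitions())                               # should reflect the partitions option
df.groupBy(spark_partition_id().alias("pid")).count().show()   # row count per partition
```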
| SaveMode | Cypher | Requires |
|---|---|---|
| Append | CREATE | nothing extra |
| Overwrite | MERGE | node.keys (nodes) or *.node.keys (rels) |
| ErrorIfExists | CREATE + error if exists | — |
Always create uniqueness constraints on node.keys properties before writing in Overwrite mode.
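As a sketch of what that constraint looks like in current Cypher (label and property names are illustrative and match the :Person / name example below), run it in Neo4j, for example via cypher-shell or Browser, before the Spark job:

```python
# Illustrative uniqueness constraint backing an Overwrite write with node.keys = "name".
# Execute this Cypher in Neo4j (cypher-shell, Browser, or migration tooling) before writing.
PERSON_NAME_CONSTRAINT = """
CREATE CONSTRAINT person_name_unique IF NOT EXISTS
FOR (n:Person) REQUIRE n.name IS UNIQUE
"""
```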
Append mode node write:

```python
from pyspark.sql import Row
people = spark.createDataFrame([
    Row(name="Alice", age=30),
    Row(name="Bob", age=25),
])
(people.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("labels", ":Person")
.save())
```

Overwrite (MERGE) keyed on node.keys:

```python
(people.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Person")
.option("node.keys", "name") # comma-separated; df_col:node_prop if names differ
.save())
```

node.keys with rename: .option("node.keys", "df_col:node_property,id:personId")
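In labels mode the connector writes the DataFrame's columns as node properties, so drop anything you do not want persisted before the write. A small sketch with illustrative column names:

```python
# Keep only the columns that should become node properties (illustrative names).
to_write = people.select("name", "age")
(to_write.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Person")
    .option("node.keys", "name")
    .save())
```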
Scala equivalent:

```scala
import org.apache.spark.sql.SaveMode
peopleDF.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("labels", ":Person")
.option("node.keys", "name")
.save()
```

Use coalesce(1) before relationship writes to avoid deadlocks.
```python
rel_df = spark.createDataFrame([
{"cust_id": "C1", "prod_id": "P1", "qty": 3},
{"cust_id": "C2", "prod_id": "P2", "qty": 1},
])
(rel_df.coalesce(1)
.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("relationship", "BOUGHT")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":Customer")
.option("relationship.source.save.mode", "Match") # require existing nodes
.option("relationship.source.node.keys", "cust_id:id")
.option("relationship.target.labels", ":Product")
.option("relationship.target.save.mode", "Match")
.option("relationship.target.node.keys", "prod_id:id")
.option("relationship.properties", "qty:quantity")
.save())
```

relationship.source.save.mode / relationship.target.save.mode:
- Match — find existing nodes (fail if missing)
- Append — always CREATE new nodes
- Overwrite — MERGE nodes

Full write options reference: references/write-patterns.md
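When the endpoint nodes might not exist yet, switching the node save modes to Overwrite lets the connector MERGE them as part of the relationship write. A sketch reusing the BOUGHT example above, with illustrative key columns; the coalesce(1) advice still applies:

```python
# MERGE endpoint nodes on the fly instead of requiring them to exist (Match).
(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Overwrite")   # MERGE (:Customer {id: ...})
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Overwrite")   # MERGE (:Product {id: ...})
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())
```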
Delta Lake to Neo4j pipeline:

```python
# Read from Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")
# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")
# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Customer")
.option("node.keys", "customer_id")
.option("batch.size", "20000")
.save())
```

Pipeline pattern for relationships — load both node sets first, then write edges:
```python
# Step 1: ensure nodes exist
customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Customer").option("node.keys", "customer_id").save()
products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Product").option("node.keys", "product_id").save()
# Step 2: write relationships (single partition)
orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append") \
.option("relationship", "ORDERED") \
.option("relationship.save.strategy", "keys") \
.option("relationship.source.labels", ":Customer") \
.option("relationship.source.save.mode", "Match") \
.option("relationship.source.node.keys", "customer_id:customer_id") \
.option("relationship.target.labels", ":Product") \
.option("relationship.target.save.mode", "Match") \
.option("relationship.target.node.keys", "product_id:product_id") \
.save()
```

| Scenario | Recommendation |
|---|---|
| Node writes (no lock contention) | repartition(N) where N ≤ Neo4j CPU cores |
| Relationship writes (lock risk) | coalesce(1) — single partition |
| Large datasets | batch.size 10000–20000 (adjust to heap) |
| MERGE-heavy loads | Add uniqueness constraint on node.keys properties first |
```python
# Aggressive batch — monitor Neo4j heap; OOM risk above 50k
(big_df.repartition(8)
.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Event")
.option("node.keys", "event_id")
.option("batch.size", "20000")
.save())
```

| Error | Cause | Fix |
|---|---|---|
| ClassNotFoundException: org.neo4j.spark.DataSource | JAR not on classpath | Add spark.jars.packages or attach library |
| Deadlock on relationship write | Multiple partitions locking nodes | coalesce(1) before write |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | CREATE CONSTRAINT FOR (n:Label) REQUIRE n.prop IS UNIQUE |
| OOM on Neo4j side | batch.size too large | Reduce to 5000–10000; check heap |
| Schema shows all string columns | No APOC, schema not sampled | Raise schema.flatten.limit, or use query mode with explicit types |
| Access mode is read error on write | Session opened in read mode | Remove neo4j.access.mode or set it to write |
| Databricks Shared cluster fails | Unity Catalog shared mode unsupported | Switch to Single User access mode |
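For quick triage of the classpath and credential errors above, a one-row probe read is often enough; a sketch, assuming a trivial RETURN is accepted in query mode:

```python
# Smoke test: verifies the connector JAR, URL, and credentials without touching real data.
probe = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "RETURN 1 AS ok")
    .load())
probe.show()
```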
Checklist:

- Connector artifact matches the runtime Scala version (artifact name ends in _for_spark_3)
- node.keys set when using Overwrite mode
- Uniqueness constraints on node.keys properties before MERGE writes
- coalesce(1) applied before relationship writes
- batch.size sized to Neo4j heap (start 5000, tune up)
- query mode: no SKIP/LIMIT in Cypher (connector paginates internally)