
neo4j-spark-skill

Use when reading from or writing to Neo4j with Apache Spark or Databricks using the Neo4j Connector for Apache Spark (org.neo4j:neo4j-connector-apache-spark). Covers SparkSession setup, DataFrame reads via labels/Cypher/relationship scan, DataFrame writes with SaveMode, node.keys for MERGE, relationship write mapping, partition and batch tuning, PySpark and Scala examples, Databricks cluster config, Databricks secrets for credentials, Delta Lake to Neo4j pipelines. Does NOT handle Cypher authoring — use neo4j-cypher-skill. Does NOT handle the Python bolt driver — use neo4j-driver-python-skill. Does NOT handle GDS algorithms — use neo4j-gds-skill.


Neo4j Connector for Apache Spark

When to Use

  • Reading Neo4j nodes/relationships into Spark DataFrames
  • Writing Spark DataFrames to Neo4j as nodes or relationships
  • Databricks notebooks connecting to Neo4j
  • Delta Lake → Neo4j ingestion pipelines
  • Partitioned parallel reads from large Neo4j graphs

When NOT to Use

  • Python bolt driver / execute_query → neo4j-driver-python-skill
  • Cypher query writing → neo4j-cypher-skill
  • GDS graph algorithms → neo4j-gds-skill
  • Spring Boot + Neo4j → neo4j-spring-data-skill

Version Matrix

| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|-----------|-------|-------|--------------------|-------|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |

Maven artifact (Scala 2.12, Spark 3):

org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3

Scala 2.13 variant:

org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3

Setup

Standalone Spark (PySpark)

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("neo4j-app")
    .config("spark.jars.packages",
            "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
    .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "neo4j")
    .config("neo4j.authentication.basic.password", "password")
    .getOrCreate())

Standalone Spark (Scala)

val spark = SparkSession.builder
  .appName("neo4j-app")
  .config("spark.jars.packages",
    "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
  .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
  .config("neo4j.authentication.type", "basic")
  .config("neo4j.authentication.basic.username", "neo4j")
  .config("neo4j.authentication.basic.password", "password")
  .getOrCreate()

Databricks — Cluster Installation

  1. Cluster → Libraries → Install New → Maven
  2. Search: org.neo4j:neo4j-connector-apache-spark_2.12 (match the Scala version to the runtime)
  3. Cluster → Advanced Options → Spark tab, then add:
    neo4j.url neo4j+s://xxxx.databases.neo4j.io
    neo4j.authentication.type basic
    neo4j.authentication.basic.username {{secrets/neo4j/username}}
    neo4j.authentication.basic.password {{secrets/neo4j/password}}
  4. Use Single user access mode (Unity Catalog shared mode is not supported)

Databricks — Secrets (preferred over plaintext)

# Store credentials once:
# databricks secrets create-scope --scope neo4j
# databricks secrets put --scope neo4j --key url
# databricks secrets put --scope neo4j --key username
# databricks secrets put --scope neo4j --key password

neo4j_url  = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")

spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)

Key Configuration Options

| Option | Description | Default |
|--------|-------------|---------|
| neo4j.url | Bolt/Neo4j URI | (required) |
| neo4j.authentication.type | none, basic, kerberos, bearer | basic |
| neo4j.authentication.basic.username | Username | driver default |
| neo4j.authentication.basic.password | Password | driver default |
| neo4j.authentication.bearer.token | Bearer token | |
| neo4j.database | Target database | driver default |
| neo4j.access.mode | read or write | read |
| neo4j.encryption.enabled | TLS (ignored with +s/+ssc URI) | false |

Reading from Neo4j

Three mutually exclusive read modes — use exactly one per .read() call.

Label scan (nodes)

# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Person")
    .load())
df.printSchema()
df.show()

// Scala
val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("labels", ":Person")
  .load()

Multi-label filter (AND): .option("labels", ":Person:Employee")

Result includes <id> (internal Neo4j id) and <labels> columns.

Cypher query read

df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) RETURN p.name AS actor, m.title AS movie, m.year AS year")
    .load())

Use explicit RETURN aliases — they become DataFrame column names. No SKIP/LIMIT in query (connector handles pagination).

Relationship scan

df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("relationship", "BOUGHT")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.target.labels", ":Product")
    .load())

Result columns: <rel.id>, <rel.type>, <source.*>, <target.*>, plus relationship properties.

Read partition tuning

df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Transaction")
    .option("partitions", "10")        # parallel partitions (default: 1)
    .option("batch.size", "5000")      # rows per partition batch (default: 5000)
    .option("schema.flatten.limit", "100")  # rows sampled for schema inference
    .load())
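The partitions option parallelizes a scan by windowing the result set: the connector first counts the matching rows, then reads each window in its own Spark partition. A rough sketch of the windowing arithmetic (illustrative only; `partition_windows` is a hypothetical helper, not connector API):

```python
import math

def partition_windows(total_rows: int, partitions: int):
    """Return (skip, limit) windows covering total_rows across partitions."""
    per = math.ceil(total_rows / partitions)
    return [(i * per, per) for i in range(partitions) if i * per < total_rows]

print(partition_windows(100, 3))  # [(0, 34), (34, 34), (68, 34)]
```

Each (skip, limit) pair corresponds to one partition's page of rows, which is also why an explicit SKIP/LIMIT in query mode conflicts with the connector's own pagination.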

Full read options reference: references/read-patterns.md


Writing to Neo4j

SaveMode

| SaveMode | Cypher | Requires |
|----------|--------|----------|
| Append | CREATE | nothing extra |
| Overwrite | MERGE | node.keys (nodes) or *.node.keys (rels) |
| ErrorIfExists | CREATE + error if exists | |

Always create uniqueness constraints on the node.keys properties before writing in Overwrite mode, e.g. `CREATE CONSTRAINT person_name IF NOT EXISTS FOR (n:Person) REQUIRE n.name IS UNIQUE`.

Write nodes — Append (CREATE)

from pyspark.sql import Row

people = spark.createDataFrame([
    Row(name="Alice", age=30),
    Row(name="Bob", age=25),
])

(people.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save())

Write nodes — Overwrite (MERGE)

(people.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Person")
    .option("node.keys", "name")       # comma-separated; df_col:node_prop if names differ
    .save())

node.keys with rename: .option("node.keys", "df_col:node_property,id:personId")
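The node.keys string maps DataFrame columns to node properties, with an optional per-pair rename. A minimal sketch of how such a mapping string can be parsed (illustrative only; `parse_node_keys` is a hypothetical helper, not part of the connector):

```python
def parse_node_keys(option: str) -> dict:
    """Map each DataFrame column to its target node property.

    A bare name maps to itself; "df_col:node_prop" renames.
    """
    mapping = {}
    for pair in option.split(","):
        col, _, prop = pair.strip().partition(":")
        mapping[col] = prop or col  # no ":" means same name on both sides
    return mapping

print(parse_node_keys("df_col:node_property,id:personId"))
# {'df_col': 'node_property', 'id': 'personId'}
```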

Write nodes — Scala

import org.apache.spark.sql.SaveMode

peopleDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("labels", ":Person")
  .option("node.keys", "name")
  .save()

Write relationships

Use coalesce(1) before relationship writes to avoid deadlocks.

rel_df = spark.createDataFrame([
    {"cust_id": "C1", "prod_id": "P1", "qty": 3},
    {"cust_id": "C2", "prod_id": "P2", "qty": 1},
])

(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Match")          # require existing nodes
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())

relationship.source.save.mode / relationship.target.save.mode:

  • Match — find existing nodes (fail if missing)
  • Append — always CREATE new nodes
  • Overwrite — MERGE nodes
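The option set for a keys-strategy relationship write is large enough that assembling it in one place helps keep writes consistent. A sketch (the helper is hypothetical, but the option keys themselves match the names used in the examples above):

```python
def relationship_write_options(rel_type, source_labels, target_labels,
                               source_keys, target_keys, properties=None,
                               node_save_mode="Match"):
    """Build the option map for a 'keys'-strategy relationship write."""
    opts = {
        "relationship": rel_type,
        "relationship.save.strategy": "keys",
        "relationship.source.labels": source_labels,
        "relationship.source.save.mode": node_save_mode,
        "relationship.source.node.keys": source_keys,
        "relationship.target.labels": target_labels,
        "relationship.target.save.mode": node_save_mode,
        "relationship.target.node.keys": target_keys,
    }
    if properties:
        opts["relationship.properties"] = properties
    return opts

opts = relationship_write_options("BOUGHT", ":Customer", ":Product",
                                  "cust_id:id", "prod_id:id", "qty:quantity")
```

Apply the map with a loop, e.g. `for k, v in opts.items(): writer = writer.option(k, v)`.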

Full write options reference: references/write-patterns.md


Databricks — Delta Lake → Neo4j Pipeline

# Read from Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")

# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")

# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Customer")
    .option("node.keys", "customer_id")
    .option("batch.size", "20000")
    .save())

Pipeline pattern for relationships — load both node sets first, then write edges:

# Step 1: ensure nodes exist
customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
    .option("labels", ":Customer").option("node.keys", "customer_id").save()

products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
    .option("labels", ":Product").option("node.keys", "product_id").save()

# Step 2: write relationships (single partition)
orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append") \
    .option("relationship", "ORDERED") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", ":Customer") \
    .option("relationship.source.save.mode", "Match") \
    .option("relationship.source.node.keys", "customer_id:customer_id") \
    .option("relationship.target.labels", ":Product") \
    .option("relationship.target.save.mode", "Match") \
    .option("relationship.target.node.keys", "product_id:product_id") \
    .save()

Write Performance Tuning

| Scenario | Recommendation |
|----------|----------------|
| Node writes (no lock contention) | repartition(N) where N ≤ Neo4j CPU cores |
| Relationship writes (lock risk) | coalesce(1) (single partition) |
| Large datasets | batch.size 10000–20000 (adjust to heap) |
| MERGE-heavy loads | Add a uniqueness constraint on node.keys properties first |

# Aggressive batch — monitor Neo4j heap; OOM risk above 50k
(big_df.repartition(8)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Event")
    .option("node.keys", "event_id")
    .option("batch.size", "20000")
    .save())

Common Errors

| Error | Cause | Fix |
|-------|-------|-----|
| ClassNotFoundException: org.neo4j.spark.DataSource | JAR not on classpath | Add spark.jars.packages or attach the library |
| Deadlock on relationship write | Multiple partitions locking the same nodes | coalesce(1) before the write |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | CREATE CONSTRAINT FOR (n:Label) REQUIRE n.prop IS UNIQUE |
| OOM on the Neo4j side | batch.size too large | Reduce to 5000–10000; check heap |
| All columns read as string | No APOC; schema not sampled | Raise schema.flatten.limit, or use query mode with explicit types |
| "Access mode is read" error on write | Session opened in read mode | Remove neo4j.access.mode or set it to write |
| Databricks Shared cluster fails | Unity Catalog shared access mode unsupported | Switch to Single User access mode |

Checklist

  • Connector JAR version matches Spark version suffix (_for_spark_3)
  • Scala version in artifact matches cluster runtime (2.12 vs 2.13)
  • Credentials in Databricks secrets or env vars — not hardcoded
  • node.keys set when using Overwrite mode
  • Uniqueness constraint created on node.keys properties before MERGE writes
  • coalesce(1) applied before relationship writes
  • batch.size sized to Neo4j heap (start 5000, tune up)
  • Delta Lake → Neo4j: nodes written before relationships
  • query mode: no SKIP/LIMIT in Cypher (connector paginates internally)
  • Databricks: Single User access mode (not Shared)
Repository
neo4j-contrib/neo4j-skills