Configure and operate the Neo4j Connector for Kafka (sink + source) and the native Neo4j CDC API. Covers Cypher/Pattern/CUD sink strategies, CDC-based and query-based source, exactly-once semantics, DLQ error handling, Confluent Cloud managed connector, schema registry (Avro/JSON), and native db.cdc.query cursor-loop patterns (Neo4j 5.13+ Enterprise/Aura BC/VDC). Use when streaming Kafka events into Neo4j, streaming Neo4j changes to Kafka, or querying Neo4j change events without Kafka. Does NOT handle Cypher query authoring — use neo4j-cypher-skill. Does NOT handle bulk CSV/file import — use neo4j-import-skill. Does NOT handle GDS algorithms — use neo4j-gds-skill.
| Use case | Strategy |
|---|---|
| Custom transformation of Kafka payload → graph | Sink: Cypher |
| Mirror another Neo4j CDC source | Sink: CDC (schema or source-id sub-strategy) |
| Map Kafka JSON fields to graph nodes/rels with no code | Sink: Pattern |
| Consume pre-formatted CUD JSON messages | Sink: CUD |
| Stream all Neo4j changes to Kafka (real-time) | Source: CDC (Neo4j 5.13+ EE/Aura BC/VDC) |
| Stream specific query results on a schedule | Source: Query |
| Consume CDC events in-process, no Kafka | Native CDC API (db.cdc.query) |
```json
{
  "neo4j.uri": "neo4j+s://your-instance.databases.neo4j.io:7687",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
  "neo4j.database": "neo4j"
}
```

Authentication types: `BASIC` | `BEARER` | `KERBEROS` | `CUSTOM` | `NONE`
Never hardcode passwords — use a Kafka Connect secrets provider (`${file:...}` or `${env:...}`).
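A minimal sketch of wiring the standard `FileConfigProvider` so the `${file:...}` reference above resolves (the secrets file path and contents are illustrative):

```properties
# connect-worker.properties — register the file config provider
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# /opt/secrets.properties — the file referenced by ${file:/opt/secrets.properties:neo4j.password}
# neo4j.password=<your-password>
```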
Connector auto-prepends `UNWIND $events AS __value` — write the query against `__value`:
```json
{
  "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
  "topics": "person-creates,person-updates",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "secret",
  "neo4j.cypher.topic.person-creates": "MERGE (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.topic.person-updates": "MATCH (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.bind-value-as": "__value",
  "neo4j.cypher.bind-key-as": "__key",
  "neo4j.cypher.bind-header-as": "__header"
}
```
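Conceptually, for each polled batch the connector then executes something like the following (a sketch of the `UNWIND` prepending described above, not connector internals):

```cypher
UNWIND $events AS __value
MERGE (p:Person {id: __value.id})
SET p += __value.properties
```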
MERGE pattern — idempotent upsert:

```cypher
MERGE (p:Person {id: __value.id})
ON CREATE SET p.createdAt = datetime(), p += __value.properties
ON MATCH SET p.updatedAt = datetime(), p += __value.properties
```
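If messages may arrive without the key property (see `Cannot merge node using null` in Troubleshooting below), a null guard keeps one bad record from failing the batch. A sketch:

```cypher
WITH __value WHERE __value.id IS NOT NULL
MERGE (p:Person {id: __value.id})
SET p += __value.properties
```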
No Cypher needed — map message fields to graph via pattern syntax:

```json
{
  "neo4j.pattern.topic.users": "(:User{!userId, name, email})",
  "neo4j.pattern.topic.friendships": "(:User{!userId: from.userId})-[:KNOWS{since}]->(:User{!userId: to.userId})"
}
```
Pattern rules:

- `!prop` = key property (used for MERGE)
- `prop: field.path` = map from nested message field
- `*` = map all message fields
- `-prop` = exclude property (cannot mix with inclusions)
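For instance, with the `friendships` pattern above, a message value like this (hypothetical) merges both `User` nodes and the `KNOWS` relationship:

```json
{"from": {"userId": 42}, "to": {"userId": 7}, "since": "2024-05-01"}
```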
"neo4j.cdc.schema.topics": "neo4j-cdc-events"
}Or with source-id tracking (stores elementId as property):
Or with source-id tracking (stores elementId as a property):

```json
{
  "neo4j.cdc.source-id.topics": "neo4j-cdc-events",
  "neo4j.cdc.source-id.label-name": "SourceEvent",
  "neo4j.cdc.source-id.property-name": "sourceId"
}
```

Exactly-once semantics requires connector ≥ 5.3.0, Kafka broker EOS support, and a NODE KEY constraint.
Step 1 — Create constraint:

```cypher
CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS
FOR (n:__KafkaOffset)
REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY;
```

Step 2 — Add to connector config:
```json
{
  "neo4j.eos-offset-label": "__KafkaOffset"
}
```

Without EOS: the connector provides at-least-once delivery — write idempotent Cypher (MERGE, not CREATE).
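To check what the connector has committed, you can inspect the offset nodes directly (assuming the label and key properties from the constraint above):

```cypher
MATCH (n:__KafkaOffset)
RETURN n.strategy, n.topic, n.partition;
```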
DLQ error handling:

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "neo4j-dlq",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.replication.factor": "3"
}
```

`errors.tolerance=none` (default) stops on the first error. Use `all` + DLQ for production.
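To inspect dead-lettered records together with the error-context headers enabled above (standard Kafka console consumer; the broker address is illustrative):

```sh
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic neo4j-dlq --from-beginning --property print.headers=true
```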
Source: CDC strategy:

```json
{
  "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "secret",
  "neo4j.source-strategy": "CDC",
  "neo4j.start-from": "NOW",
  "neo4j.cdc.poll-interval": "1s",
  "neo4j.cdc.poll-duration": "5s",
  "neo4j.cdc.topic.person-creates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-creates.patterns.0.operation": "CREATE",
  "neo4j.cdc.topic.person-updates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-updates.patterns.0.operation": "UPDATE",
  "neo4j.cdc.topic.person-deletes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-deletes.patterns.0.operation": "DELETE"
}
```

`neo4j.start-from` options: `NOW` | `EARLIEST` | a specific cursor string
Multiple patterns per topic — indexed 0, 1, 2...:
```json
{
  "neo4j.cdc.topic.all-changes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.all-changes.patterns.1.pattern": "(:Organization)"
}
```

Cursor warning: after a DB restore from backup, CDC cursors are invalidated. Reconfigure `neo4j.start-from`.
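For example, to replay from the earliest available change after a restore (one of the documented `neo4j.start-from` options):

```json
{ "neo4j.start-from": "EARLIEST" }
```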
Source: Query strategy:

```json
{
  "neo4j.source-strategy": "QUERY",
  "neo4j.query": "MATCH (p:Person) WHERE p.updatedAt > $lastCheck RETURN p.id AS id, p.name AS name, p.updatedAt AS updatedAt",
  "neo4j.query.streaming-property": "updatedAt",
  "neo4j.query.topic": "person-changes",
  "neo4j.query.polling-interval": "5s",
  "neo4j.query.polling-duration": "10s"
}
```

`$lastCheck` is auto-injected by the connector. `neo4j.query.streaming-property` must be returned by the query and should be indexed.
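A minimal sketch of the supporting index (the index name is illustrative):

```cypher
CREATE INDEX person_updated_at IF NOT EXISTS
FOR (p:Person) ON (p.updatedAt);
```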
Native CDC API requires Neo4j 5.13+ Enterprise, AuraDB Business Critical, or AuraDB Virtual Dedicated Cloud.
Enable CDC first by setting the database's transaction log enrichment mode (`DIFF` or `FULL`):

```cypher
ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';
```

On Aura: CDC is available on the eligible tiers listed above.
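To verify enrichment is on (assuming the `options` column exposes `txLogEnrichment`, as in recent Neo4j 5 releases):

```cypher
SHOW DATABASES YIELD name, options
RETURN name, options.txLogEnrichment AS cdcMode;
```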
```cypher
// Get cursor for "right now" — start tracking from this point forward
CALL db.cdc.current() YIELD id RETURN id AS cursor;

// Get earliest available cursor (replay from history start)
CALL db.cdc.earliest() YIELD id RETURN id AS cursor;
```

Cursors are exclusive: `db.cdc.current()` does NOT include the transaction it points to.
```cypher
// All changes since cursor
CALL db.cdc.query($cursor, []) YIELD id, txId, seq, metadata, event
RETURN id, txId, seq, metadata, event
ORDER BY txId, seq;
```

Filtered — nodes with label Person, CREATE only:
```cypher
CALL db.cdc.query($cursor, [
  {select: 'n', labels: ['Person'], operation: 'c'}
]) YIELD id, txId, seq, event
RETURN id, event.state.after.properties AS newProps
ORDER BY txId, seq;
```

Filtered — specific relationship type with property change tracking:
```cypher
CALL db.cdc.query($cursor, [
  {select: 'r', type: 'KNOWS', changesTo: ['since', 'strength']}
]) YIELD id, txId, seq, event
RETURN id, event.state.before AS before, event.state.after AS after;
```
| Field | Values | Applies to |
|---|---|---|
| `select` | `'e'` (all), `'n'` (nodes), `'r'` (rels) | both |
| `operation` | `'c'` (create), `'u'` (update), `'d'` (delete) | both |
| `labels` | `['Label1','Label2']` (node must have ALL) | nodes |
| `type` | `'REL_TYPE'` | relationships |
| `elementId` | specific element ID string | both |
| `key` | `{propName: value}` (requires key constraint) | both |
| `changesTo` | `['prop1','prop2']` (AND — all must change) | both |
| `authenticatedUser` | username string | both |
| `executingUser` | username string | both |
| `txMetadata` | `{key: value}` | both |
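Note (our reading of the selector semantics, not stated in the table): fields within one selector combine with AND, while separate selectors in the list combine with OR. A sketch matching Person creates or deletes:

```cypher
CALL db.cdc.query($cursor, [
  {select: 'n', labels: ['Person'], operation: 'c'},
  {select: 'n', labels: ['Person'], operation: 'd'}
]) YIELD id, txId, seq, event
RETURN id, event ORDER BY txId, seq;
```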
Event structure returned by `db.cdc.query`:

```
{
  id: STRING,        // cursor for this event (use as next $cursor)
  txId: INTEGER,     // transaction ID
  seq: INTEGER,      // ordering within transaction
  metadata: {
    executingUser: STRING,
    authenticatedUser: STRING,
    captureMode: STRING,     // "DIFF" or "FULL"
    txStartTime: DATETIME,
    txCommitTime: DATETIME,
    txMetadata: MAP
  },
  event: {
    elementId: STRING,
    eventType: STRING,       // "n" or "r"
    operation: STRING,       // "c", "u", "d"
    labels: [STRING],        // nodes only
    type: STRING,            // relationships only
    keys: MAP,
    state: {
      before: { properties: MAP },  // null on CREATE
      after: { properties: MAP }    // null on DELETE
    }
  }
}
```

Python cursor-loop (consume `db.cdc.query` in-process, no Kafka):
```python
import time

from neo4j import GraphDatabase

driver = GraphDatabase.driver("neo4j+s://...", auth=("neo4j", "password"))

def poll_changes(cursor: str, selectors: list) -> tuple[list, str]:
    records, _, _ = driver.execute_query(
        "CALL db.cdc.query($cursor, $selectors) YIELD id, txId, seq, event "
        "RETURN id, txId, seq, event ORDER BY txId, seq",
        cursor=cursor, selectors=selectors,
        database_="neo4j",
    )
    events = [r.data() for r in records]
    # Advance cursor to the last event id; keep the current cursor if no events
    next_cursor = events[-1]["id"] if events else cursor
    return events, next_cursor

# Bootstrap: start tracking from "now"
with driver.session(database="neo4j") as s:
    cursor = s.run("CALL db.cdc.current() YIELD id RETURN id").single()["id"]

selectors = [{"select": "n", "labels": ["Person"]}]
while True:
    events, cursor = poll_changes(cursor, selectors)
    for e in events:
        print(e["event"]["operation"], e["event"]["elementId"])
    time.sleep(1)
```

Confluent Cloud hosts the Neo4j Sink connector as a fully managed service (no JAR upload needed).
Config differences vs self-managed:

- No `connector.class` field (the connector is selected in the UI/API)

Required Confluent Cloud fields:
```json
{
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "...",
  "kafka.api.secret": "...",
  "input.data.format": "JSON",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "..."
}
```

One strategy per topic — you cannot mix Cypher and Pattern on the same topic.
The source connector always generates messages with schema support — you must configure converters:

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://your-schema-registry",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://your-schema-registry"
}
```

For JSON Schema:
```json
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "https://..."
}
```

Sink converter must match source — an Avro sink cannot consume JSON Schema source messages.
| Error | Cause | Fix |
|---|---|---|
| `CDC is not enabled` | `txLogEnrichment` not set / unsupported tier | `ALTER DATABASE ... SET OPTION txLogEnrichment 'FULL'` or upgrade to EE/Aura BC/VDC |
| Invalid cursor after DB restore | Backup restore invalidates cursors | Reset `neo4j.start-from` to `NOW` or `EARLIEST` |
| `Cannot merge node using null` | Key property missing in message | Validate message schema; add null check in Cypher |
| Messages replayed after restart | No EOS configured | Add `neo4j.eos-offset-label` + NODE KEY constraint |
| Connector stops on bad message | `errors.tolerance=none` (default) | Set `errors.tolerance=all` + DLQ topic |
| `SchemaException` on sink | Converter mismatch source/sink | Match key/value converters on both ends |
| Empty events from `db.cdc.query` | Cursor points to current transaction | Use `db.cdc.earliest()` to replay; wait for new txns |
Production checklist:

- EOS configured (`neo4j.eos-offset-label` + NODE KEY constraint)
- `errors.tolerance=all` + DLQ configured for production sink
- `neo4j.query.streaming-property` indexed
- Cursor reset plan after restores (`neo4j.start-from`)