The official Neo4j driver for JavaScript applications, enabling connection to and interaction with Neo4j graph databases.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
RxJS-based reactive API for streaming query results and handling large datasets with backpressure support.
Reactive sessions using RxJS observables for non-blocking, streaming query execution.
interface RxSession {
/**
* Run a Cypher query and return reactive result
* @param query - The Cypher query string
* @param parameters - Query parameters as key-value pairs
* @returns RxResult for streaming records
*/
run(query: string, parameters?: Parameters): RxResult;
/**
* Begin a reactive transaction
* @param config - Optional transaction configuration
* @returns Observable of RxTransaction
*/
beginTransaction(config?: TransactionConfig): Observable<RxTransaction>;
/**
* Execute a read transaction reactively with retry logic
* @param work - Function returning observable of results
* @returns Observable of work function results
*/
executeRead<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;
/**
* Execute a write transaction reactively with retry logic
* @param work - Function returning observable of results
* @returns Observable of work function results
*/
executeWrite<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;
/** Close the reactive session */
close(): Observable<void>;
}Usage Examples:
import { driver, auth } from "neo4j-driver";
import { map, take, toArray } from "rxjs/operators";
const neo4jDriver = driver("neo4j://localhost:7687", auth.basic("neo4j", "password"));
const rxSession = neo4jDriver.rxSession();
// Basic reactive query
rxSession.run("MATCH (p:Person) RETURN p.name AS name LIMIT 10")
.records()
.pipe(
map(record => record.get("name")),
take(5)
)
.subscribe({
next: name => console.log(`Person: ${name}`),
complete: () => console.log("Query completed"),
error: err => console.error("Error:", err)
});
// Reactive transaction
rxSession.executeRead(tx =>
tx.run("MATCH (p:Person) WHERE p.age > $age RETURN p", { age: 25 })
.records()
.pipe(
map(record => record.get("p").properties),
toArray()
)
).subscribe({
next: people => console.log(`Found ${people.length} people`),
error: err => console.error("Transaction failed:", err)
});
// Close session
rxSession.close().subscribe(() => console.log("Session closed"));Streaming result interface for processing large datasets efficiently.
interface RxResult {
/**
* Get result column keys
* @returns Observable emitting array of column names
*/
keys(): Observable<string[]>;
/**
* Stream result records
* @returns Observable emitting individual records
*/
records(): Observable<Record>;
/**
* Get result summary after consuming all records
* @returns Observable emitting result summary
*/
consume(): Observable<ResultSummary>;
}Usage Examples:
// Stream large result set
const rxResult = rxSession.run(`
MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
RETURN p.name AS person, c.name AS company
`);
// Process records as they arrive
rxResult.records()
.pipe(
map(record => ({
person: record.get("person"),
company: record.get("company")
})),
// Process in batches of 100
bufferCount(100)
)
.subscribe({
next: batch => {
console.log(`Processing batch of ${batch.length} records`);
// Process batch...
},
complete: () => {
// Get summary after all records processed
rxResult.consume().subscribe(summary => {
console.log(`Processed ${summary.counters.nodesReturned} records`);
});
}
});
// Get keys first, then stream records
rxResult.keys().subscribe(keys => {
console.log(`Result columns: ${keys.join(", ")}`);
rxResult.records()
.pipe(take(10))
.subscribe(record => {
keys.forEach(key => {
console.log(`${key}: ${record.get(key)}`);
});
});
});Explicit reactive transaction management with observable-based control flow.
interface RxTransaction {
/**
* Run a query within the reactive transaction
* @param query - The Cypher query string
* @param parameters - Query parameters as key-value pairs
* @returns RxResult for streaming records
*/
run(query: string, parameters?: Parameters): RxResult;
/** Commit the reactive transaction */
commit(): Observable<void>;
/** Rollback the reactive transaction */
rollback(): Observable<void>;
/** Check if transaction is open */
isOpen(): boolean;
}Usage Examples:
import { mergeMap, catchError, EMPTY } from "rxjs";
// Complex reactive transaction
rxSession.beginTransaction()
.pipe(
mergeMap(tx => {
// Execute multiple queries in sequence
const createUser = tx.run(`
CREATE (u:User {id: randomUUID(), email: $email})
RETURN u.id AS userId
`, { email: "user@example.com" });
return createUser.records().pipe(
mergeMap(record => {
const userId = record.get("userId");
// Create user profile
return tx.run(`
MATCH (u:User {id: $userId})
CREATE (u)-[:HAS_PROFILE]->(p:Profile {name: $name})
`, { userId, name: "John Doe" }).consume();
}),
mergeMap(() => tx.commit()),
catchError(error => {
console.error("Transaction failed:", error);
return tx.rollback();
})
);
})
)
.subscribe({
next: () => console.log("User created successfully"),
error: err => console.error("Failed to create user:", err)
});Managed reactive transactions with automatic retry logic.
interface RxManagedTransaction {
/**
* Run a query within the managed reactive transaction
* @param query - The Cypher query string
* @param parameters - Query parameters as key-value pairs
* @returns RxResult for streaming records
*/
run(query: string, parameters?: Parameters): RxResult;
}Usage Examples:
import { mergeMap, map, toArray, retry } from "rxjs/operators";
// Reactive read transaction with retry
rxSession.executeRead(tx =>
tx.run(`
MATCH (p:Product)
WHERE p.category = $category
RETURN p
ORDER BY p.price DESC
`, { category: "electronics" })
.records()
.pipe(
map(record => record.get("p").properties),
toArray()
)
).pipe(
retry(3) // Additional retry on top of built-in retry
).subscribe({
next: products => {
console.log(`Found ${products.length} electronics`);
products.forEach(product => {
console.log(`${product.name}: $${product.price}`);
});
},
error: err => console.error("Failed to fetch products:", err)
});
// Reactive write transaction
rxSession.executeWrite(tx => {
const orders = [
{ customerId: "cust-1", total: 99.99 },
{ customerId: "cust-2", total: 149.50 }
];
return from(orders).pipe(
mergeMap(order =>
tx.run(`
MATCH (c:Customer {id: $customerId})
CREATE (c)-[:PLACED]->(o:Order {
id: randomUUID(),
total: $total,
createdAt: datetime()
})
RETURN o.id AS orderId
`, order).records()
),
map(record => record.get("orderId")),
toArray()
);
}).subscribe({
next: orderIds => {
console.log(`Created orders: ${orderIds.join(", ")}`);
},
error: err => console.error("Failed to create orders:", err)
});Reactive programming provides natural backpressure handling for large datasets.
Usage Examples:
import { concatMap, delay, bufferTime } from "rxjs/operators";
// Process large dataset with controlled throughput
rxSession.run("MATCH (n) RETURN n") // Large dataset
.records()
.pipe(
// Process records with delay to control rate
concatMap(record =>
of(record).pipe(delay(10)) // 10ms delay between records
),
// Buffer records by time window
bufferTime(1000), // Collect records for 1 second
// Process each batch
mergeMap(batch => {
console.log(`Processing batch of ${batch.length} records`);
return of(batch); // Process batch
})
)
.subscribe({
next: batch => {
// Handle processed batch
},
complete: () => console.log("Large dataset processing completed")
});
// Memory-efficient streaming with window operations
rxSession.run("MATCH (p:Person) RETURN p ORDER BY p.createdAt")
.records()
.pipe(
// Process in sliding window of 1000 records
windowCount(1000),
mergeMap(window =>
window.pipe(
map(record => record.get("p").properties),
toArray(),
// Process each window
tap(batch => {
console.log(`Processing window of ${batch.length} people`);
// Perform batch processing...
})
)
)
)
.subscribe({
complete: () => console.log("Windowed processing completed")
});Comprehensive error handling patterns for reactive Neo4j operations.
import { catchError, retry, retryWhen, delay, scan, throwError } from "rxjs/operators";
// Retry with exponential backoff
rxSession.executeRead(tx =>
tx.run("MATCH (n:Node) RETURN count(n) AS count")
.records()
).pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 3) {
throw error;
}
console.log(`Retry attempt ${retryCount + 1}`);
return retryCount + 1;
}, 0),
delay(1000) // 1 second delay between retries
)
),
catchError(error => {
console.error("Final error after retries:", error);
return throwError(() => error);
})
).subscribe({
next: record => console.log(`Count: ${record.get("count")}`),
error: err => console.error("Query failed:", err)
});
// Graceful degradation
rxSession.run("MATCH (p:Person) RETURN p")
.records()
.pipe(
catchError(error => {
console.warn("Primary query failed, using fallback");
// Fallback to simpler query
return rxSession.run("RETURN 'No data available' AS message")
.records();
})
)
.subscribe({
next: record => console.log(record.get(0)),
error: err => console.error("Fallback also failed:", err)
});