tessl/npm-neo4j-driver

The official Neo4j driver for JavaScript applications, enabling connection to and interaction with Neo4j graph databases.

Reactive Programming

RxJS-based reactive API for streaming query results and handling large datasets with backpressure support.

Capabilities

Reactive Sessions

Reactive sessions using RxJS observables for non-blocking, streaming query execution.

interface RxSession {
  /**
   * Run a Cypher query and return reactive result
   * @param query - The Cypher query string
   * @param parameters - Query parameters as key-value pairs
   * @returns RxResult for streaming records
   */
  run(query: string, parameters?: Parameters): RxResult;
  
  /**
   * Begin a reactive transaction
   * @param config - Optional transaction configuration
   * @returns Observable of RxTransaction
   */
  beginTransaction(config?: TransactionConfig): Observable<RxTransaction>;
  
  /**
   * Execute a read transaction reactively with retry logic
   * @param work - Function returning observable of results
   * @returns Observable of work function results
   */
  executeRead<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;
  
  /**
   * Execute a write transaction reactively with retry logic
   * @param work - Function returning observable of results
   * @returns Observable of work function results
   */
  executeWrite<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;
  
  /** Close the reactive session */
  close(): Observable<void>;
}

Usage Examples:

import { driver, auth } from "neo4j-driver";
import { map, take, toArray } from "rxjs/operators";

const neo4jDriver = driver("neo4j://localhost:7687", auth.basic("neo4j", "password"));
const rxSession = neo4jDriver.rxSession();

// Basic reactive query
rxSession.run("MATCH (p:Person) RETURN p.name AS name LIMIT 10")
  .records()
  .pipe(
    map(record => record.get("name")),
    take(5)
  )
  .subscribe({
    next: name => console.log(`Person: ${name}`),
    complete: () => console.log("Query completed"),
    error: err => console.error("Error:", err)
  });

// Reactive transaction
rxSession.executeRead(tx => 
  tx.run("MATCH (p:Person) WHERE p.age > $age RETURN p", { age: 25 })
    .records()
    .pipe(
      map(record => record.get("p").properties),
      toArray()
    )
).subscribe({
  next: people => console.log(`Found ${people.length} people`),
  error: err => console.error("Transaction failed:", err)
});

// Close session (close() completes without emitting a value, so log in `complete`)
rxSession.close().subscribe({ complete: () => console.log("Session closed") });

Reactive Results

Streaming result interface for processing large datasets efficiently.

interface RxResult {
  /**
   * Get result column keys
   * @returns Observable emitting array of column names
   */
  keys(): Observable<string[]>;
  
  /**
   * Stream result records
   * @returns Observable emitting individual records
   */
  records(): Observable<Record>;
  
  /**
   * Get result summary after consuming all records
   * @returns Observable emitting result summary
   */
  consume(): Observable<ResultSummary>;
}

Usage Examples:

import { bufferCount, map } from "rxjs/operators";

// Stream large result set
const rxResult = rxSession.run(`
  MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
  RETURN p.name AS person, c.name AS company
`);

// Running total of records seen by the subscriber
let processed = 0;

// Process records as they arrive
rxResult.records()
  .pipe(
    map(record => ({
      person: record.get("person"),
      company: record.get("company")
    })),
    // Process in batches of 100 (the final batch may be smaller)
    bufferCount(100)
  )
  .subscribe({
    next: batch => {
      processed += batch.length;
      console.log(`Processing batch of ${batch.length} records`);
      // Process batch...
    },
    complete: () => {
      // Get summary after all records processed
      rxResult.consume().subscribe(summary => {
        console.log(`Processed ${processed} records for query: ${summary.query.text}`);
      });
    }
  });

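// Get keys first, then stream records.
// An RxResult's records() stream can be subscribed to only once,
// so this example runs its own query instead of reusing rxResult.
const keyedResult = rxSession.run("MATCH (p:Person) RETURN p.name AS name LIMIT 10");

keyedResult.keys().subscribe(keys => {
  console.log(`Result columns: ${keys.join(", ")}`);
  
  keyedResult.records()
    .pipe(take(10))
    .subscribe(record => {
      keys.forEach(key => {
        console.log(`${key}: ${record.get(key)}`);
      });
    });
});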
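
The keys-then-records pattern above can be factored into a small helper. The following is a minimal sketch, not part of the driver: `recordToObject` is a hypothetical name, and it assumes only the `get(key)` accessor used in the examples above, so it is demonstrated with a stand-in object rather than a real driver record.

```javascript
// Hypothetical helper: build a plain object from a record-like value.
// Assumes only that `record.get(key)` returns the value for a column,
// as in the examples above.
function recordToObject(record, keys) {
  const row = {};
  for (const key of keys) {
    row[key] = record.get(key);
  }
  return row;
}

// Stand-in for a driver record, used only to demonstrate the helper.
const fakeRecord = {
  get: key => ({ person: "Alice", company: "Acme" })[key]
};

const row = recordToObject(fakeRecord, ["person", "company"]);
console.log(row); // { person: 'Alice', company: 'Acme' }
```

With a real result this could be applied as `map(record => recordToObject(record, keys))`. The driver's `Record` also provides a built-in `toObject()` method that serves the same purpose.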

Reactive Transactions

Explicit reactive transaction management with observable-based control flow.

interface RxTransaction {
  /**
   * Run a query within the reactive transaction
   * @param query - The Cypher query string
   * @param parameters - Query parameters as key-value pairs
   * @returns RxResult for streaming records
   */
  run(query: string, parameters?: Parameters): RxResult;
  
  /** Commit the reactive transaction */
  commit(): Observable<void>;
  
  /** Rollback the reactive transaction */
  rollback(): Observable<void>;
  
  /** Check if transaction is open */
  isOpen(): boolean;
}

Usage Examples:

import { throwError } from "rxjs";
import { mergeMap, concatMap, catchError, concatWith } from "rxjs/operators";

// Complex reactive transaction
rxSession.beginTransaction()
  .pipe(
    mergeMap(tx => {
      // Create the user first
      const createUser = tx.run(`
        CREATE (u:User {id: randomUUID(), email: $email})
        RETURN u.id AS userId
      `, { email: "user@example.com" });
      
      return createUser.records().pipe(
        // Then create the profile for the returned user id
        concatMap(record => {
          const userId = record.get("userId");
          
          // Create user profile
          return tx.run(`
            MATCH (u:User {id: $userId})
            CREATE (u)-[:HAS_PROFILE]->(p:Profile {name: $name})
          `, { userId, name: "John Doe" }).consume();
        }),
        // Commit once both queries are done; commit() completes without emitting
        concatWith(tx.commit()),
        catchError(error => {
          console.error("Transaction failed:", error);
          // Roll back, then re-raise so the subscriber sees the failure
          return tx.rollback().pipe(concatWith(throwError(() => error)));
        })
      );
    })
  )
  .subscribe({
    complete: () => console.log("User created successfully"),
    error: err => console.error("Failed to create user:", err)
  });

Reactive Managed Transactions

Managed reactive transactions with automatic retry logic.

interface RxManagedTransaction {
  /**
   * Run a query within the managed reactive transaction
   * @param query - The Cypher query string
   * @param parameters - Query parameters as key-value pairs
   * @returns RxResult for streaming records
   */
  run(query: string, parameters?: Parameters): RxResult;
}

Usage Examples:

import { from } from "rxjs";
import { mergeMap, map, toArray, retry } from "rxjs/operators";

// Reactive read transaction with retry
rxSession.executeRead(tx => 
  tx.run(`
    MATCH (p:Product)
    WHERE p.category = $category
    RETURN p
    ORDER BY p.price DESC
  `, { category: "electronics" })
    .records()
    .pipe(
      map(record => record.get("p").properties),
      toArray()
    )
).pipe(
  retry(3) // Additional retry on top of built-in retry
).subscribe({
  next: products => {
    console.log(`Found ${products.length} electronics`);
    products.forEach(product => {
      console.log(`${product.name}: $${product.price}`);
    });
  },
  error: err => console.error("Failed to fetch products:", err)
});

// Reactive write transaction
rxSession.executeWrite(tx => {
  const orders = [
    { customerId: "cust-1", total: 99.99 },
    { customerId: "cust-2", total: 149.50 }
  ];
  
  return from(orders).pipe(
    mergeMap(order => 
      tx.run(`
        MATCH (c:Customer {id: $customerId})
        CREATE (c)-[:PLACED]->(o:Order {
          id: randomUUID(),
          total: $total,
          createdAt: datetime()
        })
        RETURN o.id AS orderId
      `, order).records()
    ),
    map(record => record.get("orderId")),
    toArray()
  );
}).subscribe({
  next: orderIds => {
    console.log(`Created orders: ${orderIds.join(", ")}`);
  },
  error: err => console.error("Failed to create orders:", err)
});

Backpressure and Flow Control

RxJS operators let you pace, batch, and window records as they stream in, so a large result set can be processed incrementally rather than collected in full first.

Usage Examples:

import { of } from "rxjs";
import { concatMap, delay, bufferTime, mergeMap, windowCount, map, toArray, tap } from "rxjs/operators";

// Process large dataset with controlled throughput
rxSession.run("MATCH (n) RETURN n") // Large dataset
  .records()
  .pipe(
    // Process records with delay to control rate
    concatMap(record => 
      of(record).pipe(delay(10)) // 10ms delay between records
    ),
    // Buffer records by time window
    bufferTime(1000), // Collect records for 1 second
    // Process each batch
    mergeMap(batch => {
      console.log(`Processing batch of ${batch.length} records`);
      return of(batch); // Process batch
    })
  )
  .subscribe({
    next: batch => {
      // Handle processed batch
    },
    complete: () => console.log("Large dataset processing completed")
  });

// Memory-efficient streaming with window operations
rxSession.run("MATCH (p:Person) RETURN p ORDER BY p.createdAt")
  .records()
  .pipe(
    // Split the stream into consecutive (non-overlapping) windows of 1000 records
    windowCount(1000),
    mergeMap(window => 
      window.pipe(
        map(record => record.get("p").properties),
        toArray(),
        // Process each window
        tap(batch => {
          console.log(`Processing window of ${batch.length} people`);
          // Perform batch processing...
        })
      )
    )
  )
  .subscribe({
    complete: () => console.log("Windowed processing completed")
  });
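
The difference between consecutive and overlapping windows is easy to miss. `windowCount(1000)` as used above produces consecutive windows; RxJS's `windowCount` also accepts a second argument (`startWindowEvery`) for overlapping ones. The sketch below illustrates the two shapes on a plain array. `countWindows` is a hypothetical helper written for this illustration, not a reimplementation of the RxJS operator, and its handling of trailing partial windows is an assumption of the sketch.

```javascript
// Plain-array illustration of count-based windowing (hypothetical helper,
// not part of RxJS or the driver). `step` controls how far each window's
// start moves: step === size gives consecutive windows, step < size overlaps.
function countWindows(items, size, step = size) {
  if (size < 1 || step < 1) {
    throw new RangeError("size and step must be at least 1");
  }
  const windows = [];
  for (let start = 0; start < items.length; start += step) {
    windows.push(items.slice(start, start + size));
  }
  return windows;
}

const ids = [1, 2, 3, 4, 5];

// Consecutive windows: every item appears in exactly one window.
const tumbling = countWindows(ids, 2);
console.log(tumbling); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]

// Overlapping windows: each window starts two items after the previous one.
const sliding = countWindows(ids, 3, 2);
console.log(sliding); // [ [ 1, 2, 3 ], [ 3, 4, 5 ], [ 5 ] ]
```

Consecutive windows suit batch processing where each record should be handled once; overlapping windows are mainly useful for rolling calculations over neighbouring records.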

Error Handling in Reactive Streams

Comprehensive error handling patterns for reactive Neo4j operations.

import { throwError } from "rxjs";
import { catchError, retryWhen, delay, scan } from "rxjs/operators";

// Retry up to 3 times with a fixed 1-second delay between attempts
rxSession.executeRead(tx => 
  tx.run("MATCH (n:Node) RETURN count(n) AS count")
    .records()
).pipe(
  retryWhen(errors => 
    errors.pipe(
      scan((retryCount, error) => {
        if (retryCount >= 3) {
          throw error;
        }
        console.log(`Retry attempt ${retryCount + 1}`);
        return retryCount + 1;
      }, 0),
      delay(1000) // 1 second delay between retries
    )
  ),
  catchError(error => {
    console.error("Final error after retries:", error);
    return throwError(() => error);
  })
).subscribe({
  next: record => console.log(`Count: ${record.get("count")}`),
  error: err => console.error("Query failed:", err)
});

// Graceful degradation
rxSession.run("MATCH (p:Person) RETURN p")
  .records()
  .pipe(
    catchError(error => {
      console.warn("Primary query failed, using fallback");
      // Fallback to simpler query
      return rxSession.run("RETURN 'No data available' AS message")
        .records();
    })
  )
  .subscribe({
    next: record => console.log(record.get(0)),
    error: err => console.error("Fallback also failed:", err)
  });
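
The retry example above waits a fixed second between attempts. If exponential backoff is wanted instead, the delay can be computed per attempt. The following is a minimal sketch under stated assumptions: `backoffDelay` is a hypothetical helper, and the 1-second base and 30-second cap are illustrative defaults, not values taken from the driver.

```javascript
// Hypothetical helper: delay in milliseconds before retry number `attempt`
// (1-based). Doubles the base delay on each attempt and caps it at `maxMs`.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  const exponential = baseMs * 2 ** (attempt - 1);
  return Math.min(exponential, maxMs);
}

const delays = [1, 2, 3, 4, 5, 6].map(attempt => backoffDelay(attempt));
console.log(delays); // [ 1000, 2000, 4000, 8000, 16000, 30000 ]
```

In RxJS 7 this should slot into the configuration form of the `retry` operator, for example `retry({ count: 3, delay: (error, retryCount) => timer(backoffDelay(retryCount)) })`, with `timer` imported from `"rxjs"`.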
