# Reactive Programming

RxJS-based reactive API for streaming query results and handling large datasets with backpressure support.

## Capabilities

### Reactive Sessions

Reactive sessions using RxJS observables for non-blocking, streaming query execution.

```typescript { .api }
/**
 * Reactive session: runs queries and transactions and exposes their
 * results through RxJS observables.
 */
interface RxSession {
  /**
   * Run a Cypher query and return a reactive result.
   * @param query - The Cypher query string
   * @param parameters - Query parameters as key-value pairs
   * @returns RxResult for streaming records
   */
  run(query: string, parameters?: Parameters): RxResult;

  /**
   * Begin a reactive transaction.
   * @param config - Optional transaction configuration
   * @returns Observable of RxTransaction
   */
  beginTransaction(config?: TransactionConfig): Observable<RxTransaction>;

  /**
   * Execute a read transaction reactively with retry logic.
   * @param work - Function returning observable of results
   * @returns Observable of work function results
   */
  executeRead<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;

  /**
   * Execute a write transaction reactively with retry logic.
   * @param work - Function returning observable of results
   * @returns Observable of work function results
   */
  executeWrite<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;

  /** Close the reactive session. */
  close(): Observable<void>;
}
```

**Usage Examples:**

```typescript
import { driver, auth } from "neo4j-driver";
import { map, take, toArray } from "rxjs/operators";

const neo4jDriver = driver("neo4j://localhost:7687", auth.basic("neo4j", "password"));
const rxSession = neo4jDriver.rxSession();

// Basic reactive query
rxSession.run("MATCH (p:Person) RETURN p.name AS name LIMIT 10")
  .records()
  .pipe(
    map(record => record.get("name")),
    // Stop after the first five names even though the query may return ten
    take(5)
  )
  .subscribe({
    next: name => console.log(`Person: ${name}`),
    complete: () => console.log("Query completed"),
    error: err => console.error("Error:", err)
  });

// Reactive transaction
rxSession.executeRead(tx =>
  tx.run("MATCH (p:Person) WHERE p.age > $age RETURN p", { age: 25 })
    .records()
    .pipe(
      map(record => record.get("p").properties),
      // Collect every person into a single array emission
      toArray()
    )
).subscribe({
  next: people => console.log(`Found ${people.length} people`),
  error: err => console.error("Transaction failed:", err)
});

// Close session
// NOTE: shown inline for brevity. In real code, subscribe to close() only after
// the work above has completed (for example by sequencing the streams with concat).
rxSession.close().subscribe(() => console.log("Session closed"));
```

### Reactive Results

Streaming result interface for processing large datasets efficiently.

```typescript { .api }
/**
 * Reactive query result: column keys, records and the result summary are
 * each exposed as an observable.
 */
interface RxResult {
  /**
   * Get result column keys.
   * @returns Observable emitting array of column names
   */
  keys(): Observable<string[]>;

  /**
   * Stream result records.
   * @returns Observable emitting individual records
   */
  records(): Observable<Record>;

  /**
   * Get result summary after consuming all records.
   * @returns Observable emitting result summary
   */
  consume(): Observable<ResultSummary>;
}
```

**Usage Examples:**

```typescript
import { bufferCount, map, take } from "rxjs/operators";

// Stream large result set
const rxResult = rxSession.run(`
  MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
  RETURN p.name AS person, c.name AS company
`);

// Count records locally: the result summary has no "records returned" counter
let processed = 0;

// Process records as they arrive
rxResult.records()
  .pipe(
    map(record => ({
      person: record.get("person"),
      company: record.get("company")
    })),
    // Process in batches of 100
    bufferCount(100)
  )
  .subscribe({
    next: batch => {
      processed += batch.length;
      console.log(`Processing batch of ${batch.length} records`);
      // Process batch...
    },
    complete: () => {
      // Get summary after all records processed
      rxResult.consume().subscribe(summary => {
        console.log(`Processed ${processed} records (query type: ${summary.queryType})`);
      });
    },
    error: err => console.error("Streaming failed:", err)
  });

// Get keys first, then stream records.
// The records of an RxResult can be streamed only once, so this variant runs
// its own query instead of re-subscribing to rxResult.records().
const keyedResult = rxSession.run(`
  MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
  RETURN p.name AS person, c.name AS company
`);

keyedResult.keys().subscribe(keys => {
  console.log(`Result columns: ${keys.join(", ")}`);

  keyedResult.records()
    .pipe(take(10))
    .subscribe(record => {
      keys.forEach(key => {
        console.log(`${key}: ${record.get(key)}`);
      });
    });
});
```

### Reactive Transactions

Explicit reactive transaction management with observable-based control flow.

```typescript { .api }
/**
 * Explicit reactive transaction: the caller runs queries and then commits
 * or rolls back through the returned observables.
 */
interface RxTransaction {
  /**
   * Run a query within the reactive transaction.
   * @param query - The Cypher query string
   * @param parameters - Query parameters as key-value pairs
   * @returns RxResult for streaming records
   */
  run(query: string, parameters?: Parameters): RxResult;

  /** Commit the reactive transaction. */
  commit(): Observable<void>;

  /** Rollback the reactive transaction. */
  rollback(): Observable<void>;

  /** Check if transaction is open. */
  isOpen(): boolean;
}
```

**Usage Examples:**

```typescript
import { concat, mergeMap, catchError, throwError } from "rxjs";

// Complex reactive transaction
rxSession.beginTransaction()
  .pipe(
    mergeMap(tx => {
      // Execute multiple queries in sequence
      const createUser = tx.run(`
        CREATE (u:User {id: randomUUID(), email: $email})
        RETURN u.id AS userId
      `, { email: "user@example.com" });

      return createUser.records().pipe(
        mergeMap(record => {
          const userId = record.get("userId");

          // Create user profile
          return tx.run(`
            MATCH (u:User {id: $userId})
            CREATE (u)-[:HAS_PROFILE]->(p:Profile {name: $name})
          `, { userId, name: "John Doe" }).consume();
        }),
        mergeMap(() => tx.commit()),
        catchError(error => {
          console.error("Transaction failed:", error);
          // Roll back, then re-raise the error so the subscriber is not
          // told that the user was created.
          return concat(tx.rollback(), throwError(() => error));
        })
      );
    })
  )
  .subscribe({
    // Report success on completion: it is only reached when nothing above
    // failed, and it does not depend on commit() emitting a value.
    complete: () => console.log("User created successfully"),
    error: err => console.error("Failed to create user:", err)
  });
```

### Reactive Managed Transactions

Managed reactive transactions with automatic retry logic.

```typescript { .api }
/**
 * Transaction handle passed to the work function of `executeRead` /
 * `executeWrite`; it can only run queries.
 */
interface RxManagedTransaction {
  /**
   * Run a query within the managed reactive transaction.
   * @param query - The Cypher query string
   * @param parameters - Query parameters as key-value pairs
   * @returns RxResult for streaming records
   */
  run(query: string, parameters?: Parameters): RxResult;
}
```

**Usage Examples:**

```typescript
import { from } from "rxjs";
import { concatMap, map, toArray, retry } from "rxjs/operators";

// Reactive read transaction with retry
rxSession.executeRead(tx =>
  tx.run(`
    MATCH (p:Product)
    WHERE p.category = $category
    RETURN p
    ORDER BY p.price DESC
  `, { category: "electronics" })
    .records()
    .pipe(
      map(record => record.get("p").properties),
      toArray()
    )
).pipe(
  retry(3) // Additional retry on top of built-in retry
).subscribe({
  next: products => {
    console.log(`Found ${products.length} electronics`);
    products.forEach(product => {
      console.log(`${product.name}: $${product.price}`);
    });
  },
  error: err => console.error("Failed to fetch products:", err)
});

// Reactive write transaction
rxSession.executeWrite(tx => {
  const orders = [
    { customerId: "cust-1", total: 99.99 },
    { customerId: "cust-2", total: 149.50 }
  ];

  return from(orders).pipe(
    // concatMap runs the inserts one at a time, so orderIds keep the
    // same order as the `orders` array.
    concatMap(order =>
      tx.run(`
        MATCH (c:Customer {id: $customerId})
        CREATE (c)-[:PLACED]->(o:Order {
          id: randomUUID(),
          total: $total,
          createdAt: datetime()
        })
        RETURN o.id AS orderId
      `, order).records()
    ),
    map(record => record.get("orderId")),
    toArray()
  );
}).subscribe({
  next: orderIds => {
    console.log(`Created orders: ${orderIds.join(", ")}`);
  },
  error: err => console.error("Failed to create orders:", err)
});
```

### Backpressure and Flow Control

Reactive programming provides natural backpressure handling for large datasets: the examples below throttle, buffer, and window records with RxJS operators.

**Usage Examples:**

```typescript
import { of } from "rxjs";
import {
  bufferTime,
  concatMap,
  delay,
  map,
  mergeMap,
  tap,
  toArray,
  windowCount
} from "rxjs/operators";

// Process large dataset with controlled throughput
rxSession.run("MATCH (n) RETURN n") // Large dataset
  .records()
  .pipe(
    // Process records with delay to control rate
    concatMap(record =>
      of(record).pipe(delay(10)) // 10ms delay between records
    ),
    // Buffer records by time window
    bufferTime(1000), // Collect records for 1 second
    // Process each batch
    mergeMap(batch => {
      console.log(`Processing batch of ${batch.length} records`);
      return of(batch); // Process batch
    })
  )
  .subscribe({
    next: batch => {
      // Handle processed batch
    },
    complete: () => console.log("Large dataset processing completed")
  });

// Memory-efficient streaming with window operations
rxSession.run("MATCH (p:Person) RETURN p ORDER BY p.createdAt")
  .records()
  .pipe(
    // Split the stream into consecutive (non-overlapping) windows of 1000 records
    windowCount(1000),
    mergeMap(window =>
      window.pipe(
        map(record => record.get("p").properties),
        toArray(),
        // Process each window
        tap(batch => {
          console.log(`Processing window of ${batch.length} people`);
          // Perform batch processing...
        })
      )
    )
  )
  .subscribe({
    complete: () => console.log("Windowed processing completed")
  });
```

### Error Handling in Reactive Streams

Error handling patterns for reactive Neo4j operations: retrying a failed transaction after a delay, and falling back to a simpler query.

```typescript
import { throwError, timer } from "rxjs";
import { catchError, delayWhen, retryWhen, scan } from "rxjs/operators";

// Retry with exponential backoff
rxSession.executeRead(tx =>
  tx.run("MATCH (n:Node) RETURN count(n) AS count")
    .records()
).pipe(
  retryWhen(errors =>
    errors.pipe(
      // Count failures; once three retries have been made, re-throw to give up
      scan((retryCount, error) => {
        if (retryCount >= 3) {
          throw error;
        }
        console.log(`Retry attempt ${retryCount + 1}`);
        return retryCount + 1;
      }, 0),
      // Back off exponentially: wait 1s, 2s, then 4s before successive retries
      delayWhen(attempt => timer(1000 * 2 ** (attempt - 1)))
    )
  ),
  catchError(error => {
    console.error("Final error after retries:", error);
    return throwError(() => error);
  })
).subscribe({
  next: record => console.log(`Count: ${record.get("count")}`),
  error: err => console.error("Query failed:", err)
});

// Graceful degradation
rxSession.run("MATCH (p:Person) RETURN p")
  .records()
  .pipe(
    catchError(error => {
      console.warn("Primary query failed, using fallback");
      // Fallback to simpler query
      return rxSession.run("RETURN 'No data available' AS message")
        .records();
    })
  )
  .subscribe({
    // Read by column index so the same handler works for both queries
    next: record => console.log(record.get(0)),
    error: err => console.error("Fallback also failed:", err)
  });
```