or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.mddriver-management.mderror-handling.mdgraph-types.mdindex.mdreactive-programming.mdsession-operations.mdtemporal-types.mdtransaction-management.md

reactive-programming.mddocs/

0

# Reactive Programming

1

2

RxJS-based reactive API for streaming query results and handling large datasets with backpressure support.

3

4

## Capabilities

5

6

### Reactive Sessions

7

8

Reactive sessions using RxJS observables for non-blocking, streaming query execution.

9

10

```typescript { .api }

11

interface RxSession {

12

/**

13

* Run a Cypher query and return reactive result

14

* @param query - The Cypher query string

15

* @param parameters - Query parameters as key-value pairs

16

* @returns RxResult for streaming records

17

*/

18

run(query: string, parameters?: Parameters): RxResult;

19

20

/**

21

* Begin a reactive transaction

22

* @param config - Optional transaction configuration

23

* @returns Observable of RxTransaction

24

*/

25

beginTransaction(config?: TransactionConfig): Observable<RxTransaction>;

26

27

/**

28

* Execute a read transaction reactively with retry logic

29

* @param work - Function returning observable of results

30

* @returns Observable of work function results

31

*/

32

executeRead<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;

33

34

/**

35

* Execute a write transaction reactively with retry logic

36

* @param work - Function returning observable of results

37

* @returns Observable of work function results

38

*/

39

executeWrite<T>(work: (tx: RxManagedTransaction) => Observable<T>): Observable<T>;

40

41

/** Close the reactive session */

42

close(): Observable<void>;

43

}

44

```

45

46

**Usage Examples:**

47

48

```typescript

49

import { driver, auth } from "neo4j-driver";

50

import { map, take, toArray } from "rxjs/operators";

51

52

const neo4jDriver = driver("neo4j://localhost:7687", auth.basic("neo4j", "password"));

53

const rxSession = neo4jDriver.rxSession();

54

55

// Basic reactive query

56

rxSession.run("MATCH (p:Person) RETURN p.name AS name LIMIT 10")

57

.records()

58

.pipe(

59

map(record => record.get("name")),

60

take(5)

61

)

62

.subscribe({

63

next: name => console.log(`Person: ${name}`),

64

complete: () => console.log("Query completed"),

65

error: err => console.error("Error:", err)

66

});

67

68

// Reactive transaction

69

rxSession.executeRead(tx =>

70

tx.run("MATCH (p:Person) WHERE p.age > $age RETURN p", { age: 25 })

71

.records()

72

.pipe(

73

map(record => record.get("p").properties),

74

toArray()

75

)

76

).subscribe({

77

next: people => console.log(`Found ${people.length} people`),

78

error: err => console.error("Transaction failed:", err)

79

});

80

81

// Close session

82

rxSession.close().subscribe(() => console.log("Session closed"));

83

```

84

85

### Reactive Results

86

87

Streaming result interface for processing large datasets efficiently.

88

89

```typescript { .api }

90

interface RxResult {

91

/**

92

* Get result column keys

93

* @returns Observable emitting array of column names

94

*/

95

keys(): Observable<string[]>;

96

97

/**

98

* Stream result records

99

* @returns Observable emitting individual records

100

*/

101

records(): Observable<Record>;

102

103

/**

104

* Get result summary after consuming all records

105

* @returns Observable emitting result summary

106

*/

107

consume(): Observable<ResultSummary>;

108

}

109

```

110

111

**Usage Examples:**

112

113

```typescript

114

// Stream large result set

115

const rxResult = rxSession.run(`

116

MATCH (p:Person)-[:WORKS_FOR]->(c:Company)

117

RETURN p.name AS person, c.name AS company

118

`);

119

120

// Process records as they arrive

121

rxResult.records()

122

.pipe(

123

map(record => ({

124

person: record.get("person"),

125

company: record.get("company")

126

})),

127

// Process in batches of 100

128

bufferCount(100)

129

)

130

.subscribe({

131

next: batch => {

132

console.log(`Processing batch of ${batch.length} records`);

133

// Process batch...

134

},

135

complete: () => {

136

// Get summary after all records processed

137

rxResult.consume().subscribe(summary => {

138

console.log(`Processed ${summary.counters.nodesReturned} records`);

139

});

140

}

141

});

142

143

// Get keys first, then stream records

144

rxResult.keys().subscribe(keys => {

145

console.log(`Result columns: ${keys.join(", ")}`);

146

147

rxResult.records()

148

.pipe(take(10))

149

.subscribe(record => {

150

keys.forEach(key => {

151

console.log(`${key}: ${record.get(key)}`);

152

});

153

});

154

});

155

```

156

157

### Reactive Transactions

158

159

Explicit reactive transaction management with observable-based control flow.

160

161

```typescript { .api }

162

interface RxTransaction {

163

/**

164

* Run a query within the reactive transaction

165

* @param query - The Cypher query string

166

* @param parameters - Query parameters as key-value pairs

167

* @returns RxResult for streaming records

168

*/

169

run(query: string, parameters?: Parameters): RxResult;

170

171

/** Commit the reactive transaction */

172

commit(): Observable<void>;

173

174

/** Rollback the reactive transaction */

175

rollback(): Observable<void>;

176

177

/** Check if transaction is open */

178

isOpen(): boolean;

179

}

180

```

181

182

**Usage Examples:**

183

184

```typescript

185

import { mergeMap, catchError, EMPTY } from "rxjs";

186

187

// Complex reactive transaction

188

rxSession.beginTransaction()

189

.pipe(

190

mergeMap(tx => {

191

// Execute multiple queries in sequence

192

const createUser = tx.run(`

193

CREATE (u:User {id: randomUUID(), email: $email})

194

RETURN u.id AS userId

195

`, { email: "user@example.com" });

196

197

return createUser.records().pipe(

198

mergeMap(record => {

199

const userId = record.get("userId");

200

201

// Create user profile

202

return tx.run(`

203

MATCH (u:User {id: $userId})

204

CREATE (u)-[:HAS_PROFILE]->(p:Profile {name: $name})

205

`, { userId, name: "John Doe" }).consume();

206

}),

207

mergeMap(() => tx.commit()),

208

catchError(error => {

209

console.error("Transaction failed:", error);

210

return tx.rollback();

211

})

212

);

213

})

214

)

215

.subscribe({

216

next: () => console.log("User created successfully"),

217

error: err => console.error("Failed to create user:", err)

218

});

219

```

220

221

### Reactive Managed Transactions

222

223

Managed reactive transactions with automatic retry logic.

224

225

```typescript { .api }

226

interface RxManagedTransaction {

227

/**

228

* Run a query within the managed reactive transaction

229

* @param query - The Cypher query string

230

* @param parameters - Query parameters as key-value pairs

231

* @returns RxResult for streaming records

232

*/

233

run(query: string, parameters?: Parameters): RxResult;

234

}

235

```

236

237

**Usage Examples:**

238

239

```typescript

240

import { mergeMap, map, toArray, retry } from "rxjs/operators";

241

242

// Reactive read transaction with retry

243

rxSession.executeRead(tx =>

244

tx.run(`

245

MATCH (p:Product)

246

WHERE p.category = $category

247

RETURN p

248

ORDER BY p.price DESC

249

`, { category: "electronics" })

250

.records()

251

.pipe(

252

map(record => record.get("p").properties),

253

toArray()

254

)

255

).pipe(

256

retry(3) // Additional retry on top of built-in retry

257

).subscribe({

258

next: products => {

259

console.log(`Found ${products.length} electronics`);

260

products.forEach(product => {

261

console.log(`${product.name}: $${product.price}`);

262

});

263

},

264

error: err => console.error("Failed to fetch products:", err)

265

});

266

267

// Reactive write transaction

268

rxSession.executeWrite(tx => {

269

const orders = [

270

{ customerId: "cust-1", total: 99.99 },

271

{ customerId: "cust-2", total: 149.50 }

272

];

273

274

return from(orders).pipe(

275

mergeMap(order =>

276

tx.run(`

277

MATCH (c:Customer {id: $customerId})

278

CREATE (c)-[:PLACED]->(o:Order {

279

id: randomUUID(),

280

total: $total,

281

createdAt: datetime()

282

})

283

RETURN o.id AS orderId

284

`, order).records()

285

),

286

map(record => record.get("orderId")),

287

toArray()

288

);

289

}).subscribe({

290

next: orderIds => {

291

console.log(`Created orders: ${orderIds.join(", ")}`);

292

},

293

error: err => console.error("Failed to create orders:", err)

294

});

295

```

296

297

### Backpressure and Flow Control

298

299

Reactive programming provides natural backpressure handling for large datasets.

300

301

**Usage Examples:**

302

303

```typescript

304

import { concatMap, delay, bufferTime } from "rxjs/operators";

305

306

// Process large dataset with controlled throughput

307

rxSession.run("MATCH (n) RETURN n") // Large dataset

308

.records()

309

.pipe(

310

// Process records with delay to control rate

311

concatMap(record =>

312

of(record).pipe(delay(10)) // 10ms delay between records

313

),

314

// Buffer records by time window

315

bufferTime(1000), // Collect records for 1 second

316

// Process each batch

317

mergeMap(batch => {

318

console.log(`Processing batch of ${batch.length} records`);

319

return of(batch); // Process batch

320

})

321

)

322

.subscribe({

323

next: batch => {

324

// Handle processed batch

325

},

326

complete: () => console.log("Large dataset processing completed")

327

});

328

329

// Memory-efficient streaming with window operations

330

rxSession.run("MATCH (p:Person) RETURN p ORDER BY p.createdAt")

331

.records()

332

.pipe(

333

// Process in sliding window of 1000 records

334

windowCount(1000),

335

mergeMap(window =>

336

window.pipe(

337

map(record => record.get("p").properties),

338

toArray(),

339

// Process each window

340

tap(batch => {

341

console.log(`Processing window of ${batch.length} people`);

342

// Perform batch processing...

343

})

344

)

345

)

346

)

347

.subscribe({

348

complete: () => console.log("Windowed processing completed")

349

});

350

```

351

352

### Error Handling in Reactive Streams

353

354

Comprehensive error handling patterns for reactive Neo4j operations.

355

356

```typescript

357

import { catchError, retry, retryWhen, delay, scan, throwError } from "rxjs/operators";

358

359

// Retry with exponential backoff

360

rxSession.executeRead(tx =>

361

tx.run("MATCH (n:Node) RETURN count(n) AS count")

362

.records()

363

).pipe(

364

retryWhen(errors =>

365

errors.pipe(

366

scan((retryCount, error) => {

367

if (retryCount >= 3) {

368

throw error;

369

}

370

console.log(`Retry attempt ${retryCount + 1}`);

371

return retryCount + 1;

372

}, 0),

373

delay(1000) // 1 second delay between retries

374

)

375

),

376

catchError(error => {

377

console.error("Final error after retries:", error);

378

return throwError(() => error);

379

})

380

).subscribe({

381

next: record => console.log(`Count: ${record.get("count")}`),

382

error: err => console.error("Query failed:", err)

383

});

384

385

// Graceful degradation

386

rxSession.run("MATCH (p:Person) RETURN p")

387

.records()

388

.pipe(

389

catchError(error => {

390

console.warn("Primary query failed, using fallback");

391

// Fallback to simpler query

392

return rxSession.run("RETURN 'No data available' AS message")

393

.records();

394

})

395

)

396

.subscribe({

397

next: record => console.log(record.get(0)),

398

error: err => console.error("Fallback also failed:", err)

399

});

400

```