or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connections.md errors.md index.md large-objects.md notifications.md query-processing.md querying.md replication.md transactions.md types.md

docs/replication.md

0

# Replication

1

2

Logical replication functionality for real-time data streaming and change data capture from PostgreSQL databases.

3

4

## Capabilities

5

6

### Subscription Management

7

8

Subscribe to logical replication events to receive real-time data changes from PostgreSQL.

9

10

```javascript { .api }

11

/**
 * Subscribe to logical replication events.
 *
 * @param event - Event pattern in the form `[operation:][schema.]table[=primary_key]`,
 *   e.g. `'*'`, `'insert'`, `'*:users'`, `'delete:users'`, `'update:users=1'`.
 *   `operation` is one of `*`, `insert`, `update`, `delete` (default `*`);
 *   `schema` defaults to `public`.
 * @param callback - Function called for each replication event with the row and event info
 * @param onsubscribe - Optional callback invoked when the subscription is established
 * @param onerror - Optional error handler
 * @returns Promise resolving to subscription handle
 */
subscribe(
  event: string,
  callback: (row: Row | null, info: ReplicationEvent) => void,
  onsubscribe?: () => void,
  onerror?: (error: Error) => void
): Promise<SubscriptionHandle>;

25

```

26

27

**Usage Examples:**

28

29

```javascript

30

// Subscribe to all changes on a specific table
const subscription = await sql.subscribe(
  'users',
  (row, info) => {
    console.log(`User ${info.command}:`, row);
    handleUserChange(row, info);
  },
  () => console.log('Subscribed to user changes'),
  (error) => console.error('Subscription error:', error)
);

// Subscribe to specific operations.
// The operation comes first, separated from the table by a colon:
// '<operation>:<table>'. A dotted name such as 'products.insert' would be read
// as schema "products", table "insert".
await sql.subscribe('insert:products', (row, info) => {
  console.log('New product added:', row);
  invalidateProductCache();
});

await sql.subscribe('update:orders', (row, info) => {
  console.log('Order updated:', row);
  updateOrderDisplay(row);
});

// Subscribe to all events (wildcard)
await sql.subscribe('*', (row, info) => {
  console.log(`${info.relation.schema}.${info.relation.table} ${info.command}`);
  logDatabaseChange(info);
});

57

```

58

59

## Event Patterns

60

61

### Table-Specific Subscriptions

62

63

Subscribe to changes on specific tables with optional operation filtering.

64

65

```javascript { .api }

66

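// Pattern format: [operation:][schema.]table[=primary_key]
// '*'                         - All operations on all tables
// 'insert'                    - INSERT operations on any table
// '*:users'                   - All operations on public.users
// 'insert:users'              - Only INSERT operations on public.users
// 'update:users'              - Only UPDATE operations on public.users
// 'delete:users'              - Only DELETE operations on public.users
// '*:inventory.stock_levels'  - Schema-qualified table (schema defaults to public)
// 'update:users=1'            - Only UPDATEs of the users row whose primary key is 1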

73

```

74

75

**Usage Examples:**

76

77

```javascript

78

// Monitor user registration
await sql.subscribe('insert:users', (user, info) => {
  sendWelcomeEmail(user.email);
  updateUserCount();
});

// Track order status changes.
// info.old is only present when PostgreSQL sends the previous row image
// (for example with REPLICA IDENTITY FULL), hence the guard.
await sql.subscribe('update:orders', (order, info) => {
  if (info.old && info.old.status !== order.status) {
    notifyStatusChange(order.id, order.status);
  }
});

// Monitor product deletions
await sql.subscribe('delete:products', (product, info) => {
  removeFromSearchIndex(product.id);
  clearProductCache(product.id);
});

// Cross-schema monitoring: 'schema.table' with no operation prefix
// matches every operation on that table
await sql.subscribe('inventory.stock_levels', (stock, info) => {
  if (stock.quantity < stock.reorder_point) {
    createReorderAlert(stock.product_id);
  }
});

103

```

104

105

### Advanced Pattern Matching

106

107

Use sophisticated patterns to filter replication events.

108

109

```javascript

110

// Subscribe with key-based filtering.
// A '=<value>' suffix (e.g. 'update:users=1') narrows a subscription to the row
// with that primary key; it does not mean "the key changed". To react to
// primary-key changes (rare but possible), inspect info.key instead.
await sql.subscribe('update:users', (user, info) => {
  if (info.key) {
    handleUserIdChange(user, info.old);
  }
});

// Schema-specific subscriptions.
// Patterns name one concrete table, so a schema-wide wildcard such as
// 'public.*' never matches. Subscribe to everything and filter on the schema.
await sql.subscribe('*', (row, info) => {
  // All changes in public schema
  if (info.relation.schema === 'public') {
    auditPublicSchemaChange(row, info);
  }
});

await sql.subscribe('*', (row, info) => {
  // All changes in audit schema
  if (info.relation.schema === 'audit') {
    forwardToComplianceSystem(row, info);
  }
});

126

```

127

128

## ReplicationEvent Interface

129

130

Comprehensive information about each replication event.

131

132

```javascript { .api }

133

// Metadata passed as the second argument to every subscribe callback.
interface ReplicationEvent {
  // Which kind of change produced the event
  command: 'insert' | 'update' | 'delete';

  // The table the change belongs to
  relation: RelationInfo;

  // Set when the change involves the primary key
  key?: boolean;

  // Row contents before the change (UPDATE and DELETE operations)
  old?: Row | null;
}

// Identifies and describes the table a change was made to.
interface RelationInfo {
  // OID of the table
  oid: number;

  // Name of the schema containing the table
  schema: string;

  // Name of the table
  table: string;

  // One entry per column
  columns: ColumnInfo[];
}

// Describes a single column of a replicated table.
interface ColumnInfo {
  // Name of the column
  name: string;

  // OID of the column's PostgreSQL type
  type: number;

  // Type modifier of the column
  modifier: number;

  // True when the column belongs to the replica identity
  key: boolean;
}

174

```

175

176

**Usage Examples:**

177

178

```javascript

179

// Detailed event processing
await sql.subscribe('*', (row, info) => {
  const { command, relation, key, old } = info;

  console.log(`Operation: ${command}`);
  console.log(`Table: ${relation.schema}.${relation.table}`);
  console.log(`Key change: ${key ?? false}`);

  switch (command) {
    case 'insert':
      console.log('New row:', row);
      break;

    case 'update': {
      console.log('New row:', row);
      console.log('Old row:', old);

      // Compare specific fields. Without a previous row image there is nothing
      // to compare against, so skip rather than report every update as a change.
      if (old && old.name !== row?.name) {
        handleNameChange(row, old);
      }
      break;
    }

    case 'delete':
      // Fall back to the callback row when no separate old image is provided
      console.log('Deleted row:', old ?? row);
      break;
  }
});

208

```

209

210

## SubscriptionHandle Interface

211

212

Manage active replication subscriptions.

213

214

```javascript { .api }

215

interface SubscriptionHandle {
  /** Stop the subscription */
  unsubscribe(): Promise<void>;

  /**
   * Check if subscription is active.
   *
   * NOTE(review): the postgres.js README documents only `unsubscribe` on the
   * object returned by `sql.subscribe()`. Treat a missing `active` as
   * "still active" and confirm against the installed version before relying on it.
   */
  active?: boolean;
}

222

```

223

224

**Usage Examples:**

225

226

```javascript

227

// Store subscription references for cleanup
const subscriptions = [];

subscriptions.push(
  await sql.subscribe('users', handleUserChange)
);

subscriptions.push(
  await sql.subscribe('orders', handleOrderChange)
);

/**
 * Clean up all subscriptions.
 *
 * Every tracked subscription is unsubscribed unless it explicitly reports
 * `active === false`. A failure on one subscription is logged and does not
 * stop the remaining ones from being released.
 */
async function cleanup() {
  // Detach the list up front so a repeated call cannot unsubscribe twice
  const pending = subscriptions.splice(0);

  for (const subscription of pending) {
    if (subscription.active === false) {
      continue;
    }
    try {
      await subscription.unsubscribe();
    } catch (error) {
      console.error('Failed to unsubscribe:', error);
    }
  }
}

// Handle process shutdown.
// Installing a SIGTERM/SIGINT listener removes Node's default "exit" behaviour,
// so the process must be terminated explicitly once cleanup has finished.
for (const signal of ['SIGTERM', 'SIGINT']) {
  process.once(signal, () => {
    cleanup().finally(() => process.exit(0));
  });
}

251

```

252

253

## Real-time Data Synchronization

254

255

### Cache Invalidation

256

257

Automatically invalidate caches when data changes.

258

259

```javascript

260

class SmartCache {

261

constructor() {

262

this.cache = new Map();

263

this.setupReplication();

264

}

265

266

async setupReplication() {

267

// Invalidate user cache on changes

268

await sql.subscribe('users', (user, info) => {

269

const userId = user?.id || info.old?.id;

270

this.cache.delete(`user:${userId}`);

271

console.log(`Invalidated cache for user ${userId}`);

272

});

273

274

// Invalidate product cache

275

await sql.subscribe('products', (product, info) => {

276

const productId = product?.id || info.old?.id;

277

this.cache.delete(`product:${productId}`);

278

279

// Also invalidate category cache

280

const categoryId = product?.category_id || info.old?.category_id;

281

this.cache.delete(`category:${categoryId}`);

282

});

283

}

284

285

async getUser(id) {

286

const key = `user:${id}`;

287

if (this.cache.has(key)) {

288

return this.cache.get(key);

289

}

290

291

const user = await sql`SELECT * FROM users WHERE id = ${id}`;

292

this.cache.set(key, user[0]);

293

return user[0];

294

}

295

}

296

297

const smartCache = new SmartCache();

298

```

299

300

### Event Sourcing

301

302

Implement event sourcing patterns using replication events.

303

304

```javascript

305

class EventStore {

306

constructor() {

307

this.events = [];

308

this.setupEventCapture();

309

}

310

311

async setupEventCapture() {

312

await sql.subscribe('*', (row, info) => {

313

const event = {

314

id: generateEventId(),

315

timestamp: new Date(),

316

aggregate: `${info.relation.schema}.${info.relation.table}`,

317

aggregateId: this.extractId(row, info),

318

eventType: info.command,

319

data: row,

320

previousData: info.old,

321

metadata: {

322

relation: info.relation,

323

keyChange: info.key

324

}

325

};

326

327

this.events.push(event);

328

this.processEvent(event);

329

});

330

}

331

332

extractId(row, info) {

333

// Extract ID from current or old row

334

return row?.id || info.old?.id;

335

}

336

337

processEvent(event) {

338

// Process event for projections, notifications, etc.

339

console.log(`Event: ${event.eventType} on ${event.aggregate}`);

340

341

// Update read models

342

this.updateProjections(event);

343

344

// Send notifications

345

this.notifySubscribers(event);

346

}

347

348

updateProjections(event) {

349

// Update materialized views or denormalized data

350

switch (event.aggregate) {

351

case 'public.orders':

352

this.updateOrderSummary(event);

353

break;

354

case 'public.users':

355

this.updateUserStats(event);

356

break;

357

}

358

}

359

}

360

361

const eventStore = new EventStore();

362

```

363

364

### Multi-Service Synchronization

365

366

Keep multiple services synchronized with database changes.

367

368

```javascript

369

/**
 * Mirrors changes on the `users` and `products` tables into the search,
 * cache and analytics services.
 */
class ServiceSynchronizer {
  constructor() {
    this.services = {
      search: new SearchService(),
      cache: new CacheService(),
      analytics: new AnalyticsService()
    };

    // setupSynchronization() is async; log a failure instead of leaving an
    // unhandled promise rejection behind.
    this.setupSynchronization().catch((error) => {
      console.error('Service synchronization setup failed:', error);
    });
  }

  /** Register the replication subscriptions that drive synchronization. */
  async setupSynchronization() {
    // Synchronize user data across services
    await sql.subscribe('users', async (user, info) => {
      const { command } = info;
      // `??` keeps a legitimate id of 0
      const userId = user?.id ?? info.old?.id;

      try {
        switch (command) {
          case 'insert':
            await Promise.all([
              this.services.search.indexUser(user),
              this.services.cache.cacheUser(user),
              this.services.analytics.trackUserCreation(user)
            ]);
            break;

          case 'update':
            await Promise.all([
              this.services.search.updateUser(user),
              this.services.cache.updateUser(user),
              this.services.analytics.trackUserUpdate(user, info.old)
            ]);
            break;

          case 'delete':
            await Promise.all([
              this.services.search.removeUser(userId),
              this.services.cache.removeUser(userId),
              this.services.analytics.trackUserDeletion(info.old)
            ]);
            break;
        }
      } catch (error) {
        console.error(`Service sync error for user ${userId}:`, error);
        // Implement retry logic or dead letter queue
      }
    });

    // Product synchronization
    await sql.subscribe('products', async (product, info) => {
      // Same policy as the user handler: a failing downstream service is
      // logged here and must not reject out of the subscription callback.
      try {
        await this.syncProduct(product, info);
      } catch (error) {
        const productId = product?.id ?? info.old?.id;
        console.error(`Service sync error for product ${productId}:`, error);
        // Implement retry logic or dead letter queue
      }
    });
  }

  /**
   * Propagate a single product change to search, analytics and cache.
   * Rejects if any of the downstream calls fails.
   */
  async syncProduct(product, info) {
    const productId = product?.id ?? info.old?.id;

    // Update search index
    if (info.command === 'delete') {
      await this.services.search.removeProduct(productId);
    } else {
      await this.services.search.indexProduct(product);
    }

    // Update recommendations
    await this.services.analytics.updateRecommendations(product, info);

    // Clear related caches
    await this.services.cache.clearProductCaches(productId);
  }
}

const synchronizer = new ServiceSynchronizer();

443

```

444

445

## Configuration and Setup

446

447

### Prerequisites

448

449

Set up PostgreSQL for logical replication.

450

451

```sql

452

-- Enable logical replication in postgresql.conf
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4

-- Create publication for tables you want to replicate
CREATE PUBLICATION app_changes FOR ALL TABLES;

-- Or create publication for specific tables
CREATE PUBLICATION user_changes FOR TABLE users, profiles;

-- Replication slot
-- postgres.js creates its own temporary replication slot when sql.subscribe()
-- is first called, so no slot has to be created by hand. A slot that no
-- consumer reads from keeps PostgreSQL from recycling WAL and will eventually
-- fill the disk. Only create one manually if another consumer will read from
-- it, and drop it with pg_drop_replication_slot() once it is no longer used.
-- SELECT pg_create_logical_replication_slot('app_slot', 'pgoutput');

-- Grant replication permissions
ALTER USER myuser WITH REPLICATION;

468

```

469

470

### Connection Configuration

471

472

Configure postgres.js for replication access.

473

474

```javascript { .api }

475

// sql.subscribe() streams changes over a dedicated replication connection that
// postgres.js opens and manages itself, using a temporary replication slot.
// The client therefore only needs to know which publication to read; no slot
// name or replication flag is configured here, and pool settings such as `max`
// only affect regular queries.
// The connecting role must have replication privileges.
const sql = postgres(connectionConfig, {
  // Publication name (created with CREATE PUBLICATION).
  // The option is plural and defaults to 'alltables'.
  publications: 'app_changes',
});

490

```

491

492

**Usage Examples:**

493

494

```javascript

495

// Production replication setup

496

const replicationSql = postgres({

497

host: 'localhost',

498

database: 'myapp',

499

username: 'replication_user',

500

password: process.env.REPLICATION_PASSWORD,

501

replication: 'database',

502

publication: 'app_changes',

503

slot: 'myapp_slot',

504

max: 1,

505

idle_timeout: 0

506

});

507

508

// Start monitoring all changes

509

await replicationSql.subscribe('*', (row, info) => {

510

console.log(`Change detected: ${info.relation.table} ${info.command}`);

511

processChange(row, info);

512

});

513

```

514

515

### Error Handling and Resilience

516

517

Handle replication errors and connection issues.

518

519

```javascript

520

class ResilientReplication {

521

constructor(config) {

522

this.config = config;

523

this.subscriptions = new Map();

524

this.reconnectAttempts = 0;

525

this.maxReconnectAttempts = 10;

526

}

527

528

async connect() {

529

try {

530

this.sql = postgres(this.config);

531

await this.restoreSubscriptions();

532

this.reconnectAttempts = 0;

533

} catch (error) {

534

console.error('Replication connection failed:', error);

535

await this.scheduleReconnect();

536

}

537

}

538

539

async subscribe(pattern, handler) {

540

try {

541

const subscription = await this.sql.subscribe(

542

pattern,

543

handler,

544

() => console.log(`Subscribed to ${pattern}`),

545

(error) => this.handleSubscriptionError(pattern, error)

546

);

547

548

this.subscriptions.set(pattern, { handler, subscription });

549

return subscription;

550

} catch (error) {

551

console.error(`Subscription failed for ${pattern}:`, error);

552

throw error;

553

}

554

}

555

556

async handleSubscriptionError(pattern, error) {

557

console.error(`Subscription error for ${pattern}:`, error);

558

559

// Remove failed subscription

560

this.subscriptions.delete(pattern);

561

562

// Trigger reconnection

563

await this.scheduleReconnect();

564

}

565

566

async restoreSubscriptions() {

567

for (const [pattern, { handler }] of this.subscriptions) {

568

await this.subscribe(pattern, handler);

569

}

570

}

571

572

async scheduleReconnect() {

573

if (this.reconnectAttempts < this.maxReconnectAttempts) {

574

this.reconnectAttempts++;

575

const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);

576

577

setTimeout(() => {

578

console.log(`Reconnecting... (attempt ${this.reconnectAttempts})`);

579

this.connect();

580

}, delay);

581

} else {

582

console.error('Max reconnection attempts reached');

583

}

584

}

585

}

586

587

// Usage

588

const replication = new ResilientReplication({
  host: 'localhost',
  database: 'myapp',
  // This role needs the REPLICATION privilege
  username: 'replication_user',
  // Publication to stream; the option is plural and defaults to 'alltables'.
  // postgres.js creates the replication slot and connection itself.
  publications: 'app_changes'
});

// connect() must complete before subscribe() so a client exists
await replication.connect();
await replication.subscribe('users', handleUserChanges);

599

```