or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-cursors.md batch-operations.md connection-pooling.md connections-cursors.md error-handling.md index.md replication.md sql-composition.md timezone-support.md type-adaptation.md

docs/replication.md

0

# PostgreSQL Replication

1

2

Logical and physical replication support for PostgreSQL streaming replication, including replication slot management, message handling, and real-time data streaming for database replication and change data capture.

3

4

## Capabilities

5

6

### Replication Connection Types

7

8

Specialized connection classes for different replication modes.

9

10

```python { .api }

11

class LogicalReplicationConnection(connection):

12

"""Logical replication connection."""

13

14

def __init__(self, *args, **kwargs):

15

"""Initialize logical replication connection."""

16

17

class PhysicalReplicationConnection(connection):

18

"""Physical replication connection."""

19

20

def __init__(self, *args, **kwargs):

21

"""Initialize physical replication connection."""

22

```

23

24

**Usage Example:**

25

26

```python

27

import psycopg2

28

from psycopg2.extras import LogicalReplicationConnection, PhysicalReplicationConnection

29

30

# Logical replication connection

31

logical_conn = psycopg2.connect(

32

host="localhost",

33

port=5432,

34

user="replication_user",

35

password="password",

36

database="mydb",

37

connection_factory=LogicalReplicationConnection

38

)

39

40

# Physical replication connection

41

physical_conn = psycopg2.connect(

42

host="localhost",

43

port=5432,

44

user="replication_user",

45

password="password",

46

connection_factory=PhysicalReplicationConnection

47

)

48

```

49

50

### Replication Cursor

51

52

Specialized cursor for replication operations with slot management and streaming capabilities.

53

54

```python { .api }

55

class ReplicationCursor(cursor):

56

"""Cursor for replication connections."""

57

58

def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):

59

"""

60

Create replication slot.

61

62

Parameters:

63

- slot_name (str): Name for the replication slot

64

- slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL

65

- output_plugin (str, optional): Output plugin name for logical replication

66

67

Returns:

68

tuple: (slot_name, consistent_point)

69

"""

70

71

def drop_replication_slot(self, slot_name):

72

"""

73

Drop replication slot.

74

75

Parameters:

76

- slot_name (str): Name of slot to drop

77

"""

78

79

def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,

80

timeline=0, options=None, decode=False, status_interval=10):

81

"""

82

Start replication stream.

83

84

Parameters:

85

- slot_name (str, optional): Replication slot name

86

- slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL

87

- start_lsn (int): Starting LSN position

88

- timeline (int): Timeline ID for physical replication

89

- options (dict, optional): Plugin options for logical replication

90

- decode (bool): Decode messages to text

91

- status_interval (int): Status update interval in seconds

92

"""

93

94

def read_replication_message(self):

95

"""

96

Read next replication message.

97

98

Returns:

99

ReplicationMessage | None: Next message from the stream, or None if no message is currently available

100

"""

101

102

def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False):

103

"""

104

Send replication feedback to server.

105

106

Parameters:

107

- write_lsn (int): Write LSN position

108

- flush_lsn (int): Flush LSN position

109

- apply_lsn (int): Apply LSN position

110

- reply (bool): Request reply from server

- force (bool): Send the feedback message immediately instead of waiting for the next status interval

111

"""

112

113

def fileno(self):

114

"""

115

Get file descriptor for connection.

116

117

Returns:

118

int: File descriptor

119

"""

120

```

121

122

### Replication Constants

123

124

Constants for replication types and operations.

125

126

```python { .api }

127

REPLICATION_PHYSICAL: int # Physical replication type

128

REPLICATION_LOGICAL: int # Logical replication type

129

```

130

131

### Replication Message

132

133

Container for replication stream messages with metadata and payload.

134

135

```python { .api }

136

class ReplicationMessage:

137

"""Replication message class."""

138

139

@property

140

def data_start(self):

141

"""Start LSN of the message."""

142

143

@property

144

def wal_end(self):

145

"""End LSN of the WAL record."""

146

147

@property

148

def send_time(self):

149

"""Send time of the message."""

150

151

@property

152

def payload(self):

153

"""Message payload data."""

154

155

@property

156

def cursor(self):

157

"""Cursor that received the message."""

158

```

159

160

### Replication Control

161

162

Exception class for controlling replication flow.

163

164

```python { .api }

165

class StopReplication(Exception):

166

"""Exception to stop replication loop."""

167

```

168

169

**Usage Example:**

170

171

```python

172

import psycopg2

173

import select

174

from psycopg2.extras import (

175

LogicalReplicationConnection,

176

REPLICATION_LOGICAL,

177

StopReplication

178

)

179

180

# Connect for logical replication

181

conn = psycopg2.connect(

182

host="localhost",

183

port=5432,

184

user="replication_user",

185

password="password",

186

database="postgres", # Connect to postgres for slot management

187

connection_factory=LogicalReplicationConnection

188

)

189

190

# Create replication cursor

191

cur = conn.cursor()

192

193

try:

194

# Create logical replication slot

195

slot_name = "test_slot"

196

cur.create_replication_slot(

197

slot_name,

198

slot_type=REPLICATION_LOGICAL,

199

output_plugin="test_decoding"

200

)

201

print(f"Created replication slot: {slot_name}")

202

203

# Start replication

204

cur.start_replication(

205

slot_name=slot_name,

206

decode=True,

207

status_interval=10

208

)

209

210

# Message processing loop

211

message_count = 0

212

max_messages = 100

213

214

def process_replication_stream():

215

"""Process replication messages."""

216

global message_count

217

218

msg = cur.read_replication_message()

219

if msg:

220

print(f"LSN: {msg.data_start}, Time: {msg.send_time}")

221

print(f"Payload: {msg.payload}")

222

223

# Send feedback to server

224

cur.send_feedback(flush_lsn=msg.data_start)

225

226

message_count += 1

227

if message_count >= max_messages:

228

raise StopReplication("Processed enough messages")

229

230

# Use select for non-blocking I/O

231

while True:

232

try:

233

# Wait for data or timeout

234

ready = select.select([cur], [], [], 1.0)

235

if ready[0]:

236

process_replication_stream()

237

else:

238

print("No messages, sending keepalive...")

239

cur.send_feedback()

240

241

except StopReplication as e:

242

print(f"Stopping replication: {e}")

243

break

244

except psycopg2.Error as e:

245

print(f"Replication error: {e}")

246

break

247

248

finally:

249

# Clean up

250

try:

251

cur.drop_replication_slot(slot_name)

252

print(f"Dropped replication slot: {slot_name}")

253

except psycopg2.Error:

254

pass

255

256

conn.close()

257

```

258

259

### Logical Replication with Output Plugin

260

261

Advanced logical replication with custom output plugin options.

262

263

**Usage Example:**

264

265

```python

266

import psycopg2

267

import json

268

from datetime import datetime

269

from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL

270

271

class LogicalReplicationConsumer:

272

"""Consumer for logical replication changes."""

273

274

def __init__(self, connection_params, slot_name, plugin_name="wal2json"):

275

self.connection_params = connection_params

276

self.slot_name = slot_name

277

self.plugin_name = plugin_name

278

self.conn = None

279

self.cur = None

280

281

def connect(self):

282

"""Establish replication connection."""

283

self.conn = psycopg2.connect(

284

**self.connection_params,

285

connection_factory=LogicalReplicationConnection

286

)

287

self.cur = self.conn.cursor()

288

289

def create_slot(self, plugin_options=None):

290

"""Create replication slot with plugin."""

291

try:

292

self.cur.create_replication_slot(

293

self.slot_name,

294

slot_type=REPLICATION_LOGICAL,

295

output_plugin=self.plugin_name

296

)

297

print(f"Created slot '{self.slot_name}' with plugin '{self.plugin_name}'")

298

except psycopg2.Error as e:

299

if "already exists" in str(e):

300

print(f"Slot '{self.slot_name}' already exists")

301

else:

302

raise

303

304

def start_consuming(self, start_lsn=0, plugin_options=None):

305

"""Start consuming replication messages."""

306

options = plugin_options or {

307

'include-xids': '0',

308

'include-timestamp': '1',

309

'include-schemas': '1',

310

'include-types': '1',

311

'format-version': '1'  # handle_change_message below parses the version-1 layout ('change' list)

312

}

313

314

self.cur.start_replication(

315

slot_name=self.slot_name,

316

start_lsn=start_lsn,

317

options=options,

318

decode=True

319

)

320

321

print(f"Started replication from LSN {start_lsn}")

322

323

def process_messages(self, message_handler, max_messages=None):

324

"""Process replication messages with custom handler."""

325

processed = 0

326

327

try:

328

while True:

329

msg = self.cur.read_replication_message()

330

if msg:

331

try:

332

# Parse JSON payload (for wal2json plugin)

333

if self.plugin_name == "wal2json":

334

change_data = json.loads(msg.payload)

335

else:

336

change_data = msg.payload

337

338

# Call custom message handler

339

message_handler(msg, change_data)

340

341

# Send acknowledgment

342

self.cur.send_feedback(flush_lsn=msg.data_start)

343

344

processed += 1

345

if max_messages and processed >= max_messages:

346

break

347

348

except json.JSONDecodeError as e:

349

print(f"JSON decode error: {e}")

350

print(f"Raw payload: {msg.payload}")

351

352

except Exception as e:

353

print(f"Message processing error: {e}")

354

# Continue processing other messages

355

356

else:

357

# Send periodic keepalive

358

self.cur.send_feedback()

359

360

except KeyboardInterrupt:

361

print("\nReplication stopped by user")

362

except Exception as e:

363

print(f"Replication error: {e}")

364

raise

365

366

def close(self):

367

"""Close replication connection."""

368

if self.conn:

369

self.conn.close()

370

371

# Custom message handler

372

def handle_change_message(msg, change_data):

373

"""Handle individual change messages."""

374

print(f"\n--- Change at LSN {msg.data_start} ---")

375

print(f"Timestamp: {msg.send_time}")

376

377

if isinstance(change_data, dict):

378

# wal2json format

379

if 'change' in change_data:

380

for change in change_data['change']:

381

table = f"{change['schema']}.{change['table']}"

382

operation = change['kind']

383

print(f"Table: {table}, Operation: {operation}")

384

385

if 'columnnames' in change and 'columnvalues' in change:

386

columns = change['columnnames']

387

values = change['columnvalues']

388

data = dict(zip(columns, values))

389

print(f"Data: {data}")

390

else:

391

# Raw text format

392

print(f"Raw change: {change_data}")

393

394

# Usage

395

consumer = LogicalReplicationConsumer(

396

connection_params={

397

'host': 'localhost',

398

'port': 5432,

399

'user': 'replication_user',

400

'password': 'password',

401

'database': 'postgres'

402

},

403

slot_name='app_changes',

404

plugin_name='wal2json'

405

)

406

407

try:

408

consumer.connect()

409

consumer.create_slot()

410

consumer.start_consuming()

411

consumer.process_messages(handle_change_message, max_messages=50)

412

finally:

413

consumer.close()

414

```

415

416

### Physical Replication

417

418

Physical replication for WAL streaming and backup purposes.

419

420

**Usage Example:**

421

422

```python

423

import psycopg2

424

from psycopg2.extras import PhysicalReplicationConnection, REPLICATION_PHYSICAL

425

426

# Physical replication connection

427

conn = psycopg2.connect(

428

host="primary_server",

429

port=5432,

430

user="replication_user",

431

password="password",

432

connection_factory=PhysicalReplicationConnection

433

)

434

435

cur = conn.cursor()

436

437

try:

438

# Create physical replication slot

439

slot_name = "standby_slot"

440

cur.create_replication_slot(slot_name, slot_type=REPLICATION_PHYSICAL)

441

442

# Start physical replication

443

cur.start_replication(

444

slot_name=slot_name,

445

start_lsn=0,

446

timeline=1

447

)

448

449

# Process WAL records

450

wal_records = 0

451

max_records = 1000

452

453

while wal_records < max_records:

454

msg = cur.read_replication_message()

455

if msg:

456

print(f"WAL record at LSN {msg.data_start}, size: {len(msg.payload)}")

457

458

# In real scenario, you would write WAL data to files

459

# or stream to standby server

460

461

# Send feedback

462

cur.send_feedback(flush_lsn=msg.data_start)

463

wal_records += 1

464

else:

465

# Send keepalive

466

cur.send_feedback()

467

468

finally:

469

# Clean up

470

try:

471

cur.drop_replication_slot(slot_name)

472

except psycopg2.Error:

473

pass

474

conn.close()

475

```

476

477

### Replication Monitoring

478

479

Monitor replication lag and slot status.

480

481

**Usage Example:**

482

483

```python

484

import psycopg2

485

import time

486

from datetime import datetime

487

488

def monitor_replication_slots(conn):

489

"""Monitor replication slot status."""

490

with conn.cursor() as cur:

491

cur.execute("""

492

SELECT slot_name, slot_type, active, restart_lsn,

493

confirmed_flush_lsn,

494

pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_size

495

FROM pg_replication_slots

496

""")

497

498

slots = cur.fetchall()

499

print(f"\n--- Replication Slots Status ({datetime.now()}) ---")

500

for slot in slots:

501

print(f"Slot: {slot[0]}")

502

print(f" Type: {slot[1]}")

503

print(f" Active: {slot[2]}")

504

print(f" Restart LSN: {slot[3]}")

505

print(f" Confirmed Flush LSN: {slot[4]}")

506

print(f" Lag Size: {slot[5]}")

507

print()

508

509

def monitor_replication_stats(conn):

510

"""Monitor replication statistics."""

511

with conn.cursor() as cur:

512

cur.execute("""

513

SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn,

514

replay_lsn, sync_state,

515

pg_size_pretty(pg_wal_lsn_diff(sent_lsn, flush_lsn)) as write_lag,

516

pg_size_pretty(pg_wal_lsn_diff(flush_lsn, replay_lsn)) as replay_lag

517

FROM pg_stat_replication

518

""")

519

520

replicas = cur.fetchall()

521

print(f"\n--- Replication Statistics ({datetime.now()}) ---")

522

for replica in replicas:

523

print(f"Client: {replica[0]}")

524

print(f" State: {replica[1]}")

525

print(f" Sent LSN: {replica[2]}")

526

print(f" Write LSN: {replica[3]}")

527

print(f" Flush LSN: {replica[4]}")

528

print(f" Replay LSN: {replica[5]}")

529

print(f" Sync State: {replica[6]}")

530

print(f" Write Lag: {replica[7]}")

531

print(f" Replay Lag: {replica[8]}")

532

print()

533

534

# Monitoring usage

535

monitor_conn = psycopg2.connect(

536

host="localhost",

537

database="postgres",

538

user="postgres",

539

password="password"

540

)

541

542

try:

543

monitor_replication_slots(monitor_conn)

544

monitor_replication_stats(monitor_conn)

545

finally:

546

monitor_conn.close()

547

```

548

549

## Types

550

551

### Replication Connection Types

552

553

```python { .api }

554

class LogicalReplicationConnection(connection):

555

"""Connection for logical replication."""

556

557

class PhysicalReplicationConnection(connection):

558

"""Connection for physical replication."""

559

```

560

561

### Replication Constants

562

563

```python { .api }

564

REPLICATION_PHYSICAL: int # Physical replication mode

565

REPLICATION_LOGICAL: int # Logical replication mode

566

```

567

568

### Replication Cursor Interface

569

570

```python { .api }

571

class ReplicationCursor(cursor):

572

"""Specialized cursor for replication."""

573

574

def create_replication_slot(self, slot_name: str, slot_type: int = None,

575

output_plugin: str = None) -> tuple[str, str]:

576

"""Create replication slot."""

577

578

def drop_replication_slot(self, slot_name: str) -> None:

579

"""Drop replication slot."""

580

581

def start_replication(self, slot_name: str = None, slot_type: int = None,

582

start_lsn: int = 0, timeline: int = 0,

583

options: dict = None, decode: bool = False,

584

status_interval: int = 10) -> None:

585

"""Start replication stream."""

586

587

def read_replication_message(self) -> 'ReplicationMessage | None':

588

"""Read next replication message."""

589

590

def send_feedback(self, write_lsn: int = 0, flush_lsn: int = 0, apply_lsn: int = 0,

591

reply: bool = False, force: bool = False) -> None:

592

"""Send feedback to server."""

593

594

def fileno(self) -> int:

595

"""Get file descriptor."""

596

```

597

598

### Replication Message Interface

599

600

```python { .api }

601

class ReplicationMessage:

602

"""Replication stream message."""

603

604

data_start: int # Start LSN of message

605

wal_end: int # End LSN of WAL record

606

send_time: datetime # Message send time

607

payload: bytes # Message payload data

608

cursor: ReplicationCursor # Source cursor

609

```

610

611

### Control Exceptions

612

613

```python { .api }

614

class StopReplication(Exception):

615

"""Exception to stop replication loop."""

616

```