# PostgreSQL Replication

Support for PostgreSQL logical and physical streaming replication, including replication slot management, message handling, and real-time data streaming for database replication and change data capture.

## Capabilities

### Replication Connection Types

Specialized connection classes for different replication modes.

```python { .api }
class LogicalReplicationConnection(connection):
    """Logical replication connection."""

    def __init__(self, *args, **kwargs):
        """Initialize logical replication connection."""

class PhysicalReplicationConnection(connection):
    """Physical replication connection."""

    def __init__(self, *args, **kwargs):
        """Initialize physical replication connection."""
```

**Usage Example:**

```python
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, PhysicalReplicationConnection

# Logical replication connection
logical_conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="replication_user",
    password="password",
    database="mydb",
    connection_factory=LogicalReplicationConnection
)

# Physical replication connection
physical_conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="replication_user",
    password="password",
    connection_factory=PhysicalReplicationConnection
)
```

### Replication Cursor

Specialized cursor for replication operations with slot management and streaming capabilities.

```python { .api }
class ReplicationCursor(cursor):
    """Cursor for replication connections."""

    def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """
        Create replication slot.

        Parameters:
        - slot_name (str): Name for the replication slot
        - slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL
        - output_plugin (str, optional): Output plugin name (required for logical slots)

        Returns:
            None
        """

    def drop_replication_slot(self, slot_name):
        """
        Drop replication slot.

        Parameters:
        - slot_name (str): Name of slot to drop
        """

    def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False, status_interval=10):
        """
        Start replication stream.

        Parameters:
        - slot_name (str, optional): Replication slot name
        - slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL
        - start_lsn (int): Starting LSN position
        - timeline (int): Timeline ID for physical replication
        - options (dict, optional): Plugin options for logical replication
        - decode (bool): Decode message payloads to str instead of returning bytes
        - status_interval (int): Status update interval in seconds
        """

    def read_message(self):
        """
        Read the next replication message without blocking.

        Returns:
            ReplicationMessage or None: Next message from the stream,
            or None if no message is available yet
        """

    def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False):
        """
        Send replication feedback to server.

        Parameters:
        - write_lsn (int): LSN up to which data has been written
        - flush_lsn (int): LSN up to which data has been flushed to disk
        - apply_lsn (int): LSN up to which data has been applied
        - reply (bool): Request reply from server
        - force (bool): Send the feedback message regardless of status_interval
        """

    def fileno(self):
        """
        Get file descriptor for connection.

        Returns:
            int: File descriptor
        """
```

### Replication Constants

Constants for replication types and operations.

```python { .api }
REPLICATION_PHYSICAL: int  # Physical replication type
REPLICATION_LOGICAL: int   # Logical replication type
```

### Replication Message

Container for replication stream messages with metadata and payload.

```python { .api }
class ReplicationMessage:
    """Replication message class."""

    @property
    def data_start(self):
        """Start LSN of the message."""

    @property
    def wal_end(self):
        """End LSN of the WAL record."""

    @property
    def send_time(self):
        """Send time of the message."""

    @property
    def payload(self):
        """Message payload data."""

    @property
    def cursor(self):
        """Cursor that received the message."""
```
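
Because `data_start` and `wal_end` are plain integers, log output is easier to compare with server-side views when the value is rendered in PostgreSQL's `X/Y` hexadecimal notation. The following is a minimal sketch of that conversion; `format_lsn` is a hypothetical helper introduced here for illustration (it is not part of psycopg2), and it assumes the LSN is an unsigned 64-bit value.

```python
def format_lsn(lsn: int) -> str:
    """Render an integer LSN as PostgreSQL-style 'X/Y' text (hypothetical helper)."""
    if not 0 <= lsn < 2**64:
        raise ValueError(f"LSN out of range: {lsn}")
    # The high 32 bits go before the slash, the low 32 bits after it.
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


print(format_lsn(0x16B3748))        # 0/16B3748
print(format_lsn((1 << 32) + 255))  # 1/FF
```

A message handler could call `format_lsn(msg.data_start)` when printing positions, so they line up with the text values shown in `pg_replication_slots`.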

### Replication Control

Exception class for controlling replication flow.

```python { .api }
class StopReplication(Exception):
    """Exception to stop replication loop."""
```

**Usage Example:**

```python
import psycopg2
import select
from psycopg2.extras import (
    LogicalReplicationConnection,
    REPLICATION_LOGICAL,
    StopReplication
)

# Connect for logical replication
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="replication_user",
    password="password",
    database="postgres",  # Connect to postgres for slot management
    connection_factory=LogicalReplicationConnection
)

# Create replication cursor
cur = conn.cursor()

try:
    # Create logical replication slot
    slot_name = "test_slot"
    cur.create_replication_slot(
        slot_name,
        slot_type=REPLICATION_LOGICAL,
        output_plugin="test_decoding"
    )
    print(f"Created replication slot: {slot_name}")

    # Start replication
    cur.start_replication(
        slot_name=slot_name,
        decode=True,
        status_interval=10
    )

    # Message processing loop
    message_count = 0
    max_messages = 100

    def process_replication_stream():
        """Process replication messages."""
        global message_count

        # read_message() does not block; it returns None when nothing is ready
        msg = cur.read_message()
        if msg:
            print(f"LSN: {msg.data_start}, Time: {msg.send_time}")
            print(f"Payload: {msg.payload}")

            # Send feedback to server
            cur.send_feedback(flush_lsn=msg.data_start)

            message_count += 1
            if message_count >= max_messages:
                raise StopReplication("Processed enough messages")

    # Use select for non-blocking I/O
    while True:
        try:
            # Wait for data or timeout
            ready = select.select([cur], [], [], 1.0)
            if ready[0]:
                process_replication_stream()
            else:
                print("No messages, sending keepalive...")
                cur.send_feedback()

        except StopReplication as e:
            print(f"Stopping replication: {e}")
            break
        except psycopg2.Error as e:
            print(f"Replication error: {e}")
            break

finally:
    # Clean up. Dropping the slot can fail while the stream is still active
    # on this connection, so report the failure instead of hiding it.
    try:
        cur.drop_replication_slot(slot_name)
        print(f"Dropped replication slot: {slot_name}")
    except psycopg2.Error as e:
        print(f"Could not drop replication slot: {e}")

    conn.close()
```

### Logical Replication with Output Plugin

Advanced logical replication with custom output plugin options.

**Usage Example:**

```python
import json
import select

import psycopg2
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL

class LogicalReplicationConsumer:
    """Consumer for logical replication changes."""

    def __init__(self, connection_params, slot_name, plugin_name="wal2json"):
        self.connection_params = connection_params
        self.slot_name = slot_name
        self.plugin_name = plugin_name
        self.conn = None
        self.cur = None

    def connect(self):
        """Establish replication connection."""
        self.conn = psycopg2.connect(
            **self.connection_params,
            connection_factory=LogicalReplicationConnection
        )
        self.cur = self.conn.cursor()

    def create_slot(self, plugin_options=None):
        """Create replication slot with plugin."""
        try:
            self.cur.create_replication_slot(
                self.slot_name,
                slot_type=REPLICATION_LOGICAL,
                output_plugin=self.plugin_name
            )
            print(f"Created slot '{self.slot_name}' with plugin '{self.plugin_name}'")
        except psycopg2.Error as e:
            if "already exists" in str(e):
                print(f"Slot '{self.slot_name}' already exists")
            else:
                raise

    def start_consuming(self, start_lsn=0, plugin_options=None):
        """Start consuming replication messages."""
        # Format version 1 groups each transaction under a 'change' list,
        # which is the layout handle_change_message() below expects.
        options = plugin_options or {
            'include-xids': '0',
            'include-timestamp': '1',
            'include-schemas': '1',
            'include-types': '1',
            'format-version': '1'
        }

        self.cur.start_replication(
            slot_name=self.slot_name,
            start_lsn=start_lsn,
            options=options,
            decode=True
        )

        print(f"Started replication from LSN {start_lsn}")

    def process_messages(self, message_handler, max_messages=None):
        """Process replication messages with custom handler."""
        processed = 0

        try:
            while True:
                msg = self.cur.read_message()
                if msg:
                    try:
                        # Parse JSON payload (for wal2json plugin)
                        if self.plugin_name == "wal2json":
                            change_data = json.loads(msg.payload)
                        else:
                            change_data = msg.payload

                        # Call custom message handler
                        message_handler(msg, change_data)

                        # Send acknowledgment
                        self.cur.send_feedback(flush_lsn=msg.data_start)

                        processed += 1
                        if max_messages and processed >= max_messages:
                            break

                    except json.JSONDecodeError as e:
                        print(f"JSON decode error: {e}")
                        print(f"Raw payload: {msg.payload}")

                    except Exception as e:
                        print(f"Message processing error: {e}")
                        # Continue processing other messages

                else:
                    # Nothing to read yet: wait up to 1 second for data,
                    # then send a keepalive instead of spinning in a busy loop
                    ready, _, _ = select.select([self.cur], [], [], 1.0)
                    if not ready:
                        self.cur.send_feedback()

        except KeyboardInterrupt:
            print("\nReplication stopped by user")
        except Exception as e:
            print(f"Replication error: {e}")
            raise

    def close(self):
        """Close replication connection."""
        if self.conn:
            self.conn.close()

# Custom message handler
def handle_change_message(msg, change_data):
    """Handle individual change messages."""
    print(f"\n--- Change at LSN {msg.data_start} ---")
    print(f"Timestamp: {msg.send_time}")

    if isinstance(change_data, dict):
        # wal2json format
        if 'change' in change_data:
            for change in change_data['change']:
                table = f"{change['schema']}.{change['table']}"
                operation = change['kind']
                print(f"Table: {table}, Operation: {operation}")

                if 'columnnames' in change and 'columnvalues' in change:
                    columns = change['columnnames']
                    values = change['columnvalues']
                    data = dict(zip(columns, values))
                    print(f"Data: {data}")
    else:
        # Raw text format
        print(f"Raw change: {change_data}")

# Usage
consumer = LogicalReplicationConsumer(
    connection_params={
        'host': 'localhost',
        'port': 5432,
        'user': 'replication_user',
        'password': 'password',
        'database': 'postgres'
    },
    slot_name='app_changes',
    plugin_name='wal2json'
)

try:
    consumer.connect()
    consumer.create_slot()
    consumer.start_consuming()
    consumer.process_messages(handle_change_message, max_messages=50)
finally:
    consumer.close()
```
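
The handler above reads the wal2json layout in which each transaction carries a `change` list with `kind`, `columnnames`, and `columnvalues` keys. The sketch below shows that parsing step in isolation so it can be tried without a database. `flatten_wal2json_v1` and the sample payload are hypothetical and exist only for illustration; the sketch assumes the payload follows that layout, and it leaves `data` empty for changes that carry no column values (deletes, for example).

```python
import json


def flatten_wal2json_v1(payload: str) -> list:
    """Turn one wal2json (change-list layout) payload into flat records.

    Hypothetical helper: each record has 'table', 'operation' and 'data'.
    """
    doc = json.loads(payload)
    records = []
    for change in doc.get("change", []):
        names = change.get("columnnames", [])
        values = change.get("columnvalues", [])
        records.append({
            "table": f"{change['schema']}.{change['table']}",
            "operation": change["kind"],
            "data": dict(zip(names, values)),
        })
    return records


# Hand-written sample payload (an assumption, not captured from a server)
sample = json.dumps({
    "change": [{
        "kind": "insert",
        "schema": "public",
        "table": "users",
        "columnnames": ["id", "name"],
        "columnvalues": [1, "alice"],
    }]
})

for record in flatten_wal2json_v1(sample):
    print(record)
```

Keeping the parsing in a pure function like this makes it straightforward to unit-test the change handling separately from the streaming loop.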

### Physical Replication

Physical replication for WAL streaming and backup purposes.

**Usage Example:**

```python
import select

import psycopg2
from psycopg2.extras import PhysicalReplicationConnection, REPLICATION_PHYSICAL

# Physical replication connection
conn = psycopg2.connect(
    host="primary_server",
    port=5432,
    user="replication_user",
    password="password",
    connection_factory=PhysicalReplicationConnection
)

cur = conn.cursor()

try:
    # Create physical replication slot
    slot_name = "standby_slot"
    cur.create_replication_slot(slot_name, slot_type=REPLICATION_PHYSICAL)

    # Start physical replication
    cur.start_replication(
        slot_name=slot_name,
        start_lsn=0,
        timeline=1
    )

    # Process WAL records
    wal_records = 0
    max_records = 1000

    while wal_records < max_records:
        # read_message() does not block; it returns None when nothing is ready
        msg = cur.read_message()
        if msg:
            print(f"WAL record at LSN {msg.data_start}, size: {len(msg.payload)}")

            # In a real scenario, you would write WAL data to files
            # or stream it to a standby server

            # Send feedback
            cur.send_feedback(flush_lsn=msg.data_start)
            wal_records += 1
        else:
            # Wait up to 1 second for data, then send a keepalive
            ready, _, _ = select.select([cur], [], [], 1.0)
            if not ready:
                cur.send_feedback()

finally:
    # Clean up. Dropping the slot can fail while the stream is still active
    # on this connection, so report the failure instead of hiding it.
    try:
        cur.drop_replication_slot(slot_name)
    except psycopg2.Error as e:
        print(f"Could not drop replication slot: {e}")
    conn.close()
```
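
The example above leaves writing WAL data to files open. One small piece of that task is working out which segment file a given LSN belongs to. The sketch below shows the arithmetic under stated assumptions: `wal_segment_name` is a hypothetical helper (not a psycopg2 function), and it assumes the server uses the default 16 MB WAL segment size, which should be checked against the server's `wal_segment_size` setting.

```python
WAL_SEGMENT_SIZE = 16 * 1024 * 1024  # assumed default segment size (16 MB)


def wal_segment_name(lsn: int, timeline: int = 1,
                     segment_size: int = WAL_SEGMENT_SIZE) -> str:
    """Return the 24-character WAL segment file name containing `lsn`.

    Hypothetical helper: the name is three 8-digit hex fields, namely the
    timeline ID, the high part of the segment number, and the low part.
    """
    segments_per_xlogid = 0x100000000 // segment_size
    segment_number = lsn // segment_size
    high = segment_number // segments_per_xlogid
    low = segment_number % segments_per_xlogid
    return f"{timeline:08X}{high:08X}{low:08X}"


print(wal_segment_name(0x3000060, timeline=1))
# 000000010000000000000003
```

With a helper like this, a consumer could group incoming payloads by `wal_segment_name(msg.data_start, timeline)` before writing them out.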

### Replication Monitoring

Monitor replication lag and slot status.

**Usage Example:**

```python
import psycopg2
from datetime import datetime

def monitor_replication_slots(conn):
    """Monitor replication slot status."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT slot_name, slot_type, active, restart_lsn,
                   confirmed_flush_lsn,
                   pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_size
            FROM pg_replication_slots
        """)

        slots = cur.fetchall()
        print(f"\n--- Replication Slots Status ({datetime.now()}) ---")
        for slot in slots:
            print(f"Slot: {slot[0]}")
            print(f"  Type: {slot[1]}")
            print(f"  Active: {slot[2]}")
            print(f"  Restart LSN: {slot[3]}")
            print(f"  Confirmed Flush LSN: {slot[4]}")
            print(f"  Lag Size: {slot[5]}")
            print()

def monitor_replication_stats(conn):
    """Monitor replication statistics."""
    with conn.cursor() as cur:
        # sent_lsn - flush_lsn: WAL sent but not yet flushed by the replica
        # flush_lsn - replay_lsn: WAL flushed but not yet replayed
        cur.execute("""
            SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn,
                   replay_lsn, sync_state,
                   pg_size_pretty(pg_wal_lsn_diff(sent_lsn, flush_lsn)) as flush_lag_size,
                   pg_size_pretty(pg_wal_lsn_diff(flush_lsn, replay_lsn)) as replay_lag_size
            FROM pg_stat_replication
        """)

        replicas = cur.fetchall()
        print(f"\n--- Replication Statistics ({datetime.now()}) ---")
        for replica in replicas:
            print(f"Client: {replica[0]}")
            print(f"  State: {replica[1]}")
            print(f"  Sent LSN: {replica[2]}")
            print(f"  Write LSN: {replica[3]}")
            print(f"  Flush LSN: {replica[4]}")
            print(f"  Replay LSN: {replica[5]}")
            print(f"  Sync State: {replica[6]}")
            print(f"  Flush Lag: {replica[7]}")
            print(f"  Replay Lag: {replica[8]}")
            print()

# Monitoring usage
monitor_conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    password="password"
)

try:
    monitor_replication_slots(monitor_conn)
    monitor_replication_stats(monitor_conn)
finally:
    monitor_conn.close()
```
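
The queries above compute lag on the server with `pg_wal_lsn_diff`. When only the LSN values themselves are at hand, for instance from a log line, the same byte difference can be worked out client-side. This is a minimal sketch assuming the LSNs arrive as `X/Y` text; `parse_lsn` and `lag_bytes` are hypothetical names introduced here, not psycopg2 or PostgreSQL functions.

```python
def parse_lsn(text: str) -> int:
    """Convert PostgreSQL 'X/Y' LSN text to a 64-bit integer (hypothetical helper)."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(current_lsn: str, other_lsn: str) -> int:
    """Byte distance from other_lsn up to current_lsn."""
    return parse_lsn(current_lsn) - parse_lsn(other_lsn)


print(lag_bytes("0/3000060", "0/3000000"))  # 96
```

A monitoring script could compare the result against a threshold of its own choosing to flag slots that retain too much WAL.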

## Types

### Replication Connection Types

```python { .api }
class LogicalReplicationConnection(connection):
    """Connection for logical replication."""

class PhysicalReplicationConnection(connection):
    """Connection for physical replication."""
```

### Replication Constants

```python { .api }
REPLICATION_PHYSICAL: int  # Physical replication mode
REPLICATION_LOGICAL: int   # Logical replication mode
```

### Replication Cursor Interface

```python { .api }
class ReplicationCursor(cursor):
    """Specialized cursor for replication."""

    def create_replication_slot(self, slot_name: str, slot_type: int = None,
                                output_plugin: str = None) -> None:
        """Create replication slot."""

    def drop_replication_slot(self, slot_name: str) -> None:
        """Drop replication slot."""

    def start_replication(self, slot_name: str = None, slot_type: int = None,
                          start_lsn: int = 0, timeline: int = 0,
                          options: dict = None, decode: bool = False,
                          status_interval: int = 10) -> None:
        """Start replication stream."""

    def read_message(self) -> 'ReplicationMessage | None':
        """Read next replication message."""

    def send_feedback(self, write_lsn: int = 0, flush_lsn: int = 0,
                      apply_lsn: int = 0, reply: bool = False,
                      force: bool = False) -> None:
        """Send feedback to server."""

    def fileno(self) -> int:
        """Get file descriptor."""
```

### Replication Message Interface

```python { .api }
class ReplicationMessage:
    """Replication stream message."""

    data_start: int            # Start LSN of message
    wal_end: int               # End LSN of WAL record
    send_time: datetime        # Message send time
    payload: 'bytes | str'     # Message payload data (str when decode=True)
    cursor: ReplicationCursor  # Source cursor
```

### Control Exceptions

```python { .api }
class StopReplication(Exception):
    """Exception to stop replication loop."""
```