Python-PostgreSQL Database Adapter
—
Logical and physical replication support for PostgreSQL streaming replication, including replication slot management, message handling, and real-time data streaming for database replication and change data capture.
Specialized connection classes for different replication modes.
class LogicalReplicationConnection(connection):
    """Connection class used to open a logical replication session.

    Hand it to ``psycopg2.connect()`` through the ``connection_factory``
    argument.
    """

    def __init__(self, *args, **kwargs):
        """Set up the logical replication connection from ``*args``/``**kwargs``."""
class PhysicalReplicationConnection(connection):
    """Connection class used to open a physical replication session.

    Hand it to ``psycopg2.connect()`` through the ``connection_factory``
    argument.
    """

    def __init__(self, *args, **kwargs):
        """Set up the physical replication connection from ``*args``/``**kwargs``."""


# Usage Example:
import psycopg2
from psycopg2.extras import LogicalReplicationConnection, PhysicalReplicationConnection
# Open a logical replication connection: the factory selects the
# replication mode, and a target database is named explicitly
logical_conn = psycopg2.connect(
    connection_factory=LogicalReplicationConnection,
    database="mydb",
    host="localhost",
    port=5432,
    user="replication_user",
    password="password"
)

# Open a physical replication connection: same server and credentials,
# but no database argument is passed
physical_conn = psycopg2.connect(
    connection_factory=PhysicalReplicationConnection,
    host="localhost",
    port=5432,
    user="replication_user",
    password="password"
)

# Specialized cursor for replication operations with slot management and streaming capabilities.
class ReplicationCursor(cursor):
    """Cursor for replication connections.

    Groups the replication-slot management calls with the calls used to
    start, read and acknowledge a replication stream.
    """

    def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """
        Create replication slot.

        Parameters:
        - slot_name (str): Name for the replication slot
        - slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL
        - output_plugin (str, optional): Output plugin name for logical replication

        Returns:
        tuple: (slot_name, consistent_point)

        NOTE(review): psycopg2's own documentation lists no return value for
        this method -- confirm whether the slot details have to be fetched
        from the cursor instead.
        """

    def drop_replication_slot(self, slot_name):
        """
        Drop replication slot.

        Parameters:
        - slot_name (str): Name of slot to drop
        """

    def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False, status_interval=10):
        """
        Start replication stream.

        Parameters:
        - slot_name (str, optional): Replication slot name
        - slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL
        - start_lsn (int): Starting LSN position
        - timeline (int): Timeline ID for physical replication
        - options (dict, optional): Plugin options for logical replication
        - decode (bool): Decode messages to text
        - status_interval (int): Status update interval in seconds
        """

    # psycopg2 names this method read_message(); there is no
    # read_replication_message() on the real cursor.
    def read_message(self):
        """
        Read next replication message without blocking.

        Returns:
        ReplicationMessage: Next message from stream, or None when no
        message is available at the moment
        """

    def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False):
        """
        Send replication feedback to server.

        Parameters:
        - write_lsn (int): Write LSN position
        - flush_lsn (int): Flush LSN position
        - apply_lsn (int): Apply LSN position
        - reply (bool): Request reply from server
        - force (bool): Send the feedback message regardless of status_interval
        """

    def fileno(self):
        """
        Get file descriptor for connection.

        Having fileno() lets the cursor itself be passed to select.select().

        Returns:
        int: File descriptor
        """


# Constants for replication types and operations.
REPLICATION_PHYSICAL: int  # Physical replication type
REPLICATION_LOGICAL: int  # Logical replication type

# Container for replication stream messages with metadata and payload.
class ReplicationMessage:
    """One message taken from the replication stream.

    Every piece of metadata, and the payload itself, is exposed as a
    read-only property.
    """

    @property
    def data_start(self):
        """LSN at which this message starts."""

    @property
    def wal_end(self):
        """LSN marking the end of the WAL record."""

    @property
    def send_time(self):
        """Time at which the message was sent."""

    @property
    def payload(self):
        """Payload data carried by the message."""

    @property
    def cursor(self):
        """Cursor on which the message was received."""


# Exception class for controlling replication flow.
class StopReplication(Exception):
    """Raised to break out of a replication loop."""


# Usage Example:
import psycopg2
import select
from psycopg2.extras import (
LogicalReplicationConnection,
REPLICATION_LOGICAL,
StopReplication
)
# Connect for logical replication
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    user="replication_user",
    password="password",
    database="postgres",  # Connect to postgres for slot management
    connection_factory=LogicalReplicationConnection
)

# Create replication cursor
cur = conn.cursor()

try:
    # Create logical replication slot
    slot_name = "test_slot"
    cur.create_replication_slot(
        slot_name,
        slot_type=REPLICATION_LOGICAL,
        output_plugin="test_decoding"
    )
    print(f"Created replication slot: {slot_name}")

    # Start replication
    cur.start_replication(
        slot_name=slot_name,
        decode=True,
        status_interval=10
    )

    # Message processing loop
    message_count = 0
    max_messages = 100

    def process_replication_stream():
        """Process every replication message that is currently available.

        Raises:
        StopReplication: once max_messages messages have been processed
        """
        # The counter lives at module level, so it has to be declared global
        global message_count

        # Drain the stream: several messages may already be buffered after a
        # single select() wake-up, and select() will not report them again.
        while True:
            # read_message() does not block; it returns None when nothing
            # more is available right now
            msg = cur.read_message()
            if not msg:
                return

            print(f"LSN: {msg.data_start}, Time: {msg.send_time}")
            print(f"Payload: {msg.payload}")

            # Send feedback to server
            cur.send_feedback(flush_lsn=msg.data_start)

            message_count += 1
            if message_count >= max_messages:
                raise StopReplication("Processed enough messages")

    # Use select for non-blocking I/O
    while True:
        try:
            # Wait for data or timeout
            ready = select.select([cur], [], [], 1.0)
            if ready[0]:
                process_replication_stream()
            else:
                print("No messages, sending keepalive...")
                cur.send_feedback()

        except StopReplication as e:
            print(f"Stopping replication: {e}")
            break
        except psycopg2.Error as e:
            print(f"Replication error: {e}")
            break

finally:
    # Clean up (best effort: a failed drop is reported, not raised)
    try:
        cur.drop_replication_slot(slot_name)
        print(f"Dropped replication slot: {slot_name}")
    except psycopg2.Error as e:
        print(f"Could not drop replication slot: {e}")

    conn.close()

# Advanced logical replication with custom output plugin options.
Usage Example:
import psycopg2
import json
from datetime import datetime
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL
class LogicalReplicationConsumer:
    """Consumer for logical replication changes.

    Typical call order: connect(), create_slot(), start_consuming(),
    process_messages(), and finally close().
    """

    def __init__(self, connection_params, slot_name, plugin_name="wal2json"):
        """Store the settings; nothing is opened until connect() is called.

        Parameters:
        - connection_params (dict): Keyword arguments for psycopg2.connect()
        - slot_name (str): Replication slot to create and stream from
        - plugin_name (str): Output plugin used when creating the slot
        """
        self.connection_params = connection_params
        self.slot_name = slot_name
        self.plugin_name = plugin_name
        self.conn = None
        self.cur = None

    def connect(self):
        """Establish replication connection."""
        self.conn = psycopg2.connect(
            **self.connection_params,
            connection_factory=LogicalReplicationConnection
        )
        self.cur = self.conn.cursor()

    def create_slot(self, plugin_options=None):
        """Create replication slot with plugin.

        An "already exists" error from the server is reported and ignored;
        any other psycopg2 error is re-raised.

        Parameters:
        - plugin_options: Accepted for compatibility; not used by this method
        """
        try:
            self.cur.create_replication_slot(
                self.slot_name,
                slot_type=REPLICATION_LOGICAL,
                output_plugin=self.plugin_name
            )
            print(f"Created slot '{self.slot_name}' with plugin '{self.plugin_name}'")

        except psycopg2.Error as e:
            if "already exists" in str(e):
                print(f"Slot '{self.slot_name}' already exists")
            else:
                raise

    def start_consuming(self, start_lsn=0, plugin_options=None):
        """Start consuming replication messages.

        Parameters:
        - start_lsn (int): LSN position to start streaming from
        - plugin_options (dict, optional): Options passed to the output
          plugin; a wal2json-oriented default set is used when omitted
        """
        options = plugin_options or {
            'include-xids': '0',
            'include-timestamp': '1',
            'include-schemas': '1',
            'include-types': '1',
            'format-version': '2'
        }

        self.cur.start_replication(
            slot_name=self.slot_name,
            start_lsn=start_lsn,
            options=options,
            decode=True
        )

        print(f"Started replication from LSN {start_lsn}")

    def process_messages(self, message_handler, max_messages=None,
                         keepalive_interval=10.0):
        """Process replication messages with custom handler.

        Parameters:
        - message_handler (callable): Called as message_handler(msg, change_data)
        - max_messages (int, optional): Stop after this many handled messages
        - keepalive_interval (float): Seconds to wait for new data before
          sending a keepalive feedback message

        A KeyboardInterrupt ends the loop quietly; any other error escaping
        the loop is printed and re-raised.
        """
        # Local import keeps this snippet self-contained
        import select

        processed = 0

        try:
            while True:
                # read_message() does not block; None means nothing pending
                msg = self.cur.read_message()

                if msg:
                    try:
                        # Parse JSON payload (for wal2json plugin)
                        if self.plugin_name == "wal2json":
                            change_data = json.loads(msg.payload)
                        else:
                            change_data = msg.payload

                        # Call custom message handler
                        message_handler(msg, change_data)

                        # Send acknowledgment
                        self.cur.send_feedback(flush_lsn=msg.data_start)

                        processed += 1
                        if max_messages and processed >= max_messages:
                            break

                    except json.JSONDecodeError as e:
                        print(f"JSON decode error: {e}")
                        print(f"Raw payload: {msg.payload}")
                    except Exception as e:
                        print(f"Message processing error: {e}")
                        # Continue processing other messages
                else:
                    # Nothing pending: wait on the socket instead of spinning,
                    # and send a keepalive only when the wait times out
                    readable, _, _ = select.select(
                        [self.cur], [], [], keepalive_interval
                    )
                    if not readable:
                        self.cur.send_feedback()

        except KeyboardInterrupt:
            print("\nReplication stopped by user")
        except Exception as e:
            print(f"Replication error: {e}")
            raise

    def close(self):
        """Close replication connection."""
        if self.conn:
            self.conn.close()
# Custom message handler
def handle_change_message(msg, change_data):
    """Handle individual change messages.

    Prints the message LSN and send time, followed by a summary of the
    change.

    Parameters:
    - msg: Replication message; its data_start and send_time are printed
    - change_data: Decoded payload. A dict is treated as wal2json output,
      either format-version 1 (a 'change' list) or format-version 2 (a
      single change with an 'action' key); anything else is printed raw
    """
    print(f"\n--- Change at LSN {msg.data_start} ---")
    print(f"Timestamp: {msg.send_time}")

    if not isinstance(change_data, dict):
        # Raw text format
        print(f"Raw change: {change_data}")
        return

    if 'change' in change_data:
        # wal2json format-version 1: one message lists all changes
        for change in change_data['change']:
            table = f"{change['schema']}.{change['table']}"
            operation = change['kind']

            print(f"Table: {table}, Operation: {operation}")

            if 'columnnames' in change and 'columnvalues' in change:
                data = dict(zip(change['columnnames'], change['columnvalues']))
                print(f"Data: {data}")

    elif 'action' in change_data:
        # wal2json format-version 2: one message per change; transaction
        # begin/commit messages carry an action but no table
        operation = change_data['action']

        if 'table' in change_data:
            table = f"{change_data.get('schema')}.{change_data['table']}"
            print(f"Table: {table}, Operation: {operation}")
        else:
            print(f"Operation: {operation}")

        if 'columns' in change_data:
            data = {col['name']: col.get('value') for col in change_data['columns']}
            print(f"Data: {data}")
# Usage: build the consumer, then run the connect / create / stream /
# process sequence, always closing the connection at the end
consumer = LogicalReplicationConsumer(
    connection_params=dict(
        host='localhost',
        port=5432,
        user='replication_user',
        password='password',
        database='postgres',
    ),
    slot_name='app_changes',
    plugin_name='wal2json',
)

try:
    consumer.connect()
    consumer.create_slot()
    consumer.start_consuming()
    # Stop after 50 handled messages
    consumer.process_messages(handle_change_message, max_messages=50)
finally:
    consumer.close()

# Physical replication for WAL streaming and backup purposes.
Usage Example:
import psycopg2
from psycopg2.extras import PhysicalReplicationConnection, REPLICATION_PHYSICAL
import select  # used below to wait for data instead of busy-polling

# Physical replication connection
conn = psycopg2.connect(
    host="primary_server",
    port=5432,
    user="replication_user",
    password="password",
    connection_factory=PhysicalReplicationConnection
)

cur = conn.cursor()

try:
    # Create physical replication slot
    slot_name = "standby_slot"
    cur.create_replication_slot(slot_name, slot_type=REPLICATION_PHYSICAL)

    # Start physical replication
    cur.start_replication(
        slot_name=slot_name,
        start_lsn=0,
        timeline=1
    )

    # Process WAL records
    wal_records = 0
    max_records = 1000

    while wal_records < max_records:
        # read_message() does not block; None means nothing pending
        msg = cur.read_message()
        if msg:
            print(f"WAL record at LSN {msg.data_start}, size: {len(msg.payload)}")

            # In real scenario, you would write WAL data to files
            # or stream to standby server

            # Send feedback
            cur.send_feedback(flush_lsn=msg.data_start)
            wal_records += 1
        else:
            # Nothing pending: wait on the socket instead of spinning, and
            # send a keepalive only when the wait times out
            if not any(select.select([cur], [], [], 10.0)):
                cur.send_feedback()

finally:
    # Clean up (best effort: a failed drop is reported, not raised)
    try:
        cur.drop_replication_slot(slot_name)
    except psycopg2.Error as e:
        print(f"Could not drop replication slot: {e}")

    conn.close()

# Monitor replication lag and slot status.
Usage Example:
import psycopg2
import time
from datetime import datetime
def monitor_replication_slots(conn):
    """Print a status report for every row of pg_replication_slots.

    Parameters:
    - conn: Open database connection used to run the query
    """
    query = """
        SELECT slot_name, slot_type, active, restart_lsn,
               confirmed_flush_lsn,
               pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_size
        FROM pg_replication_slots
    """

    with conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()

    print(f"\n--- Replication Slots Status ({datetime.now()}) ---")
    # Column order follows the SELECT list above
    for name, kind, active, restart_lsn, confirmed_lsn, lag_size in rows:
        print(f"Slot: {name}")
        print(f" Type: {kind}")
        print(f" Active: {active}")
        print(f" Restart LSN: {restart_lsn}")
        print(f" Confirmed Flush LSN: {confirmed_lsn}")
        print(f" Lag Size: {lag_size}")
        print()
def monitor_replication_stats(conn):
    """Monitor replication statistics.

    Prints one report entry per row of pg_stat_replication.

    Parameters:
    - conn: Open database connection used to run the query
    """
    with conn.cursor() as cur:
        # Write lag is measured against write_lsn (what the replica has
        # written), replay lag against what it has flushed but not replayed.
        cur.execute("""
            SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn,
                   replay_lsn, sync_state,
                   pg_size_pretty(pg_wal_lsn_diff(sent_lsn, write_lsn)) as write_lag,
                   pg_size_pretty(pg_wal_lsn_diff(flush_lsn, replay_lsn)) as replay_lag
            FROM pg_stat_replication
        """)

        replicas = cur.fetchall()

        print(f"\n--- Replication Statistics ({datetime.now()}) ---")
        for replica in replicas:
            # Indexes follow the SELECT list above
            print(f"Client: {replica[0]}")
            print(f" State: {replica[1]}")
            print(f" Sent LSN: {replica[2]}")
            print(f" Write LSN: {replica[3]}")
            print(f" Flush LSN: {replica[4]}")
            print(f" Replay LSN: {replica[5]}")
            print(f" Sync State: {replica[6]}")
            print(f" Write Lag: {replica[7]}")
            print(f" Replay Lag: {replica[8]}")
            print()
# Monitoring usage
monitor_conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    password="password"
)

try:
    monitor_replication_slots(monitor_conn)
    monitor_replication_stats(monitor_conn)
finally:
    monitor_conn.close()


class LogicalReplicationConnection(connection):
    """Connection for logical replication."""


class PhysicalReplicationConnection(connection):
    """Connection for physical replication."""


REPLICATION_PHYSICAL: int  # Physical replication mode
REPLICATION_LOGICAL: int  # Logical replication mode


class ReplicationCursor(cursor):
    """Specialized cursor for replication."""

    # NOTE(review): psycopg2's own documentation lists no return value for
    # create_replication_slot -- confirm the annotated return type.
    def create_replication_slot(self, slot_name: str, slot_type: int = None,
                                output_plugin: str = None) -> tuple[str, str]:
        """Create replication slot."""

    def drop_replication_slot(self, slot_name: str) -> None:
        """Drop replication slot."""

    def start_replication(self, slot_name: str = None, slot_type: int = None,
                          start_lsn: int = 0, timeline: int = 0,
                          options: dict = None, decode: bool = False,
                          status_interval: int = 10) -> None:
        """Start replication stream."""

    # psycopg2 names this method read_message()
    def read_message(self) -> 'ReplicationMessage | None':
        """Read next replication message; None when nothing is available."""

    def send_feedback(self, write_lsn: int = 0, flush_lsn: int = 0,
                      apply_lsn: int = 0, reply: bool = False,
                      force: bool = False) -> None:
        """Send feedback to server."""

    def fileno(self) -> int:
        """Get file descriptor."""


class ReplicationMessage:
    """Replication stream message."""

    data_start: int  # Start LSN of message
    wal_end: int  # End LSN of WAL record
    send_time: datetime  # Message send time
    payload: 'bytes | str'  # Message payload data (text when decode=True)
    cursor: ReplicationCursor  # Source cursor


class StopReplication(Exception):
    """Exception to stop replication loop."""


# Install with Tessl CLI
npx tessl i tessl/pypi-psycopg2