CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-psycopg2

Python-PostgreSQL Database Adapter

Pending
Overview
Eval results
Files

docs/replication.md

PostgreSQL Replication

Logical and physical replication support for PostgreSQL streaming replication, including replication slot management, message handling, and real-time data streaming for database replication and change data capture.

Capabilities

Replication Connection Types

Specialized connection classes for different replication modes.

class LogicalReplicationConnection(connection):
    """Logical replication connection.

    Pass it as ``connection_factory`` to ``psycopg2.connect()`` together with
    a ``database``; ``cursor()`` on it gives the replication cursor used in
    the examples.
    """
    
    def __init__(self, *args, **kwargs):
        """Initialize logical replication connection.

        Receives the arguments ``psycopg2.connect()`` passes to its
        ``connection_factory``.
        """

class PhysicalReplicationConnection(connection):
    """Physical replication connection.

    Pass it as ``connection_factory`` to ``psycopg2.connect()``; the examples
    connect without naming a database. ``cursor()`` on it gives the
    replication cursor used in the examples.
    """
    
    def __init__(self, *args, **kwargs):
        """Initialize physical replication connection.

        Receives the arguments ``psycopg2.connect()`` passes to its
        ``connection_factory``.
        """

Usage Example:

import psycopg2
from psycopg2.extras import LogicalReplicationConnection, PhysicalReplicationConnection

# Login settings shared by both replication connections
replication_login = {
    "host": "localhost",
    "port": 5432,
    "user": "replication_user",
    "password": "password",
}

# Logical replication connection: logical decoding works within one
# database, so a database is named here.
logical_conn = psycopg2.connect(
    database="mydb",
    connection_factory=LogicalReplicationConnection,
    **replication_login,
)

# Physical replication connection: no database is given.
physical_conn = psycopg2.connect(
    connection_factory=PhysicalReplicationConnection,
    **replication_login,
)

Replication Cursor

Specialized cursor for replication operations with slot management and streaming capabilities.

class ReplicationCursor(cursor):
    """Cursor for replication connections.

    Returned by ``conn.cursor()`` on a LogicalReplicationConnection or a
    PhysicalReplicationConnection.
    """

    def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """
        Create a replication slot (CREATE_REPLICATION_SLOT).

        Parameters:
        - slot_name (str): Name for the replication slot
        - slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL;
          defaults to the replication type of the connection
        - output_plugin (str, optional): Output plugin name; required for
          logical slots and not allowed for physical slots

        Returns:
        None. The method only executes the command; the row the server sends
        back (slot name, consistent point, ...) can be read from the cursor
        afterwards, e.g. with fetchone().

        Raises:
        - psycopg2.ProgrammingError: output_plugin missing for a logical slot,
          given for a physical slot, or slot_type not recognized
        - psycopg2.Error: the server rejected the command, e.g. because a slot
          with this name already exists
        """

    def drop_replication_slot(self, slot_name):
        """
        Drop a replication slot (DROP_REPLICATION_SLOT).

        Parameters:
        - slot_name (str): Name of slot to drop

        Note: libpq refuses new commands on a connection that is streaming
        (after start_replication()), so drop slots over a separate connection.
        """

    def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
                          timeline=0, options=None, decode=False, status_interval=10):
        """
        Start the replication stream (START_REPLICATION).

        Parameters:
        - slot_name (str, optional): Replication slot name; required for
          logical replication
        - slot_type (int, optional): REPLICATION_LOGICAL or REPLICATION_PHYSICAL;
          defaults to the replication type of the connection
        - start_lsn (int or str): Starting LSN, as an integer or an 'XXX/XXX'
          string
        - timeline (int): Timeline ID for physical replication; 0 (the default)
          means the server's current timeline
        - options (dict, optional): Output plugin options (logical only)
        - decode (bool): Decode payloads to text (str) instead of bytes
        - status_interval (int): Seconds between the status (feedback)
          messages the library sends to the server automatically
        """

    def read_replication_message(self):
        """
        Read the next replication message without blocking.

        Returns:
        ReplicationMessage, or None when no complete message is buffered.
        Call repeatedly until None, then wait for the cursor to become
        readable (e.g. select.select([cur], [], [], timeout)) before calling
        again. This call also sends the periodic status_interval feedback.
        """

    def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False):
        """
        Report replication progress to the server.

        Parameters:
        - write_lsn (int): Position up to which data has been written
        - flush_lsn (int): Position up to which data has been durably stored;
          the server may discard WAL before it
        - apply_lsn (int): Position up to which data has been applied
        - reply (bool): Ask the server to reply immediately with a keepalive
        - force (bool): Send the feedback now regardless of status_interval
          (not available in older psycopg2 releases)

        Unless reply or force is set, the positions are only recorded and are
        sent by the library when status_interval elapses.
        """

    def consume_stream(self, consume, keepalive_interval=None):
        """
        Read messages in an endless loop, calling consume(msg) for each one.

        Raise StopReplication from consume to leave the loop. Waiting for data
        and periodic feedback are handled internally.

        Parameters:
        - consume (callable): Called with each ReplicationMessage
        - keepalive_interval (float, optional): Legacy feedback interval;
          prefer status_interval of start_replication()
        """

    def fileno(self):
        """
        Get file descriptor for connection.

        Lets the cursor be passed directly to select.select().

        Returns:
        int: File descriptor
        """

Replication Constants

Constants for replication types and operations.

# Values for the slot_type argument of ReplicationCursor.create_replication_slot()
# and ReplicationCursor.start_replication().
REPLICATION_PHYSICAL: int  # Physical replication type
REPLICATION_LOGICAL: int   # Logical replication type

Replication Message

Container for replication stream messages with metadata and payload.

class ReplicationMessage:
    """Replication message class.

    One message from the replication stream, as returned by
    ReplicationCursor.read_replication_message().
    """
    
    @property
    def data_start(self):
        """Start LSN of the message.

        The examples acknowledge a message by passing this value as
        ``flush_lsn`` to ``send_feedback()``.
        """
    
    @property
    def wal_end(self):
        """End LSN of the WAL record.

        NOTE(review): in the streaming protocol this field is the server's
        current end of WAL when the message was sent, not necessarily the end
        of this message's data - confirm against the psycopg2 docs.
        """
    
    @property  
    def send_time(self):
        """Send time of the message (a ``datetime``)."""
    
    @property
    def payload(self):
        """Message payload data.

        Text (str) when replication was started with ``decode=True``,
        otherwise bytes.
        """
    
    @property
    def cursor(self):
        """Cursor that received the message."""

Replication Control

Exception class used to break out of a replication message loop.

class StopReplication(Exception):
    """Exception to stop replication loop.

    Raise it from message-processing code and catch it around the read loop
    to end streaming.
    """

Usage Example:

import psycopg2
import select
import time
from psycopg2.extras import (
    LogicalReplicationConnection,
    REPLICATION_LOGICAL,
    StopReplication
)

# Connection settings live in one place so the cleanup code below can open a
# second connection with exactly the same parameters.
conn_params = dict(
    host="localhost",
    port=5432,
    user="replication_user",
    password="password",
    # A logical slot belongs to the database it is created in and only decodes
    # changes made there, so connect to the database you want to watch.
    database="postgres",
    connection_factory=LogicalReplicationConnection,
)

# Connect for logical replication
conn = psycopg2.connect(**conn_params)

# Create replication cursor
cur = conn.cursor()

slot_name = "test_slot"
slot_created = False  # only drop the slot at the end if we created it
message_count = 0
max_messages = 100

try:
    # Create logical replication slot
    cur.create_replication_slot(
        slot_name,
        slot_type=REPLICATION_LOGICAL,
        output_plugin="test_decoding"
    )
    slot_created = True
    print(f"Created replication slot: {slot_name}")

    # Start replication
    cur.start_replication(
        slot_name=slot_name,
        decode=True,
        status_interval=10
    )

    while True:
        try:
            # read_replication_message() never blocks: it returns None once
            # the data already received is used up. Drain every buffered
            # message before waiting, otherwise they sit unprocessed.
            msg = cur.read_replication_message()
            if msg:
                print(f"LSN: {msg.data_start}, Time: {msg.send_time}")
                print(f"Payload: {msg.payload}")

                # Record progress; psycopg2 reports it to the server every
                # status_interval seconds.
                cur.send_feedback(flush_lsn=msg.data_start)

                message_count += 1
                if message_count >= max_messages:
                    raise StopReplication("Processed enough messages")
                continue

            # Nothing buffered: wait up to 1 s for the socket to be readable.
            # Waking up periodically lets read_replication_message() send its
            # status_interval feedback even while the stream is idle.
            ready, _, _ = select.select([cur], [], [], 1.0)
            if not ready:
                print("No messages, waiting...")

        except StopReplication as e:
            print(f"Stopping replication: {e}")
            break
        except psycopg2.Error as e:
            print(f"Replication error: {e}")
            break

finally:
    # libpq refuses new commands on a connection that is streaming (COPY BOTH
    # mode), so the slot cannot be dropped over `conn`. Close it first, then
    # drop the slot over a fresh connection.
    conn.close()

    if slot_created:
        last_error = None
        for _ in range(5):
            try:
                drop_conn = psycopg2.connect(**conn_params)
                try:
                    drop_conn.cursor().drop_replication_slot(slot_name)
                finally:
                    drop_conn.close()
            except psycopg2.Error as e:
                # The server may still report the slot as active for a moment
                # after the streaming connection closed; retry briefly.
                last_error = e
                time.sleep(0.1)
            else:
                print(f"Dropped replication slot: {slot_name}")
                break
        else:
            # A leftover slot makes the server retain WAL, so report it.
            print(f"Could not drop replication slot {slot_name}: {last_error}")

Logical Replication with Output Plugin

Logical replication with a configurable output plugin (wal2json by default, which must be installed on the PostgreSQL server) and custom plugin options.

Usage Example:

import psycopg2
import json
from datetime import datetime
from psycopg2.extras import LogicalReplicationConnection, REPLICATION_LOGICAL

class LogicalReplicationConsumer:
    """Consumer for logical replication changes.

    Typical use: connect(), create_slot(), start_consuming(), then
    process_messages(handler); call close() when done.
    """

    # Seconds process_messages() waits on the connection socket when no
    # message is buffered. The periodic wake-up lets
    # read_replication_message() send its status feedback while idle.
    poll_timeout = 1.0

    def __init__(self, connection_params, slot_name, plugin_name="wal2json"):
        """
        Parameters:
        - connection_params (dict): Keyword arguments for psycopg2.connect()
        - slot_name (str): Replication slot to create and stream from
        - plugin_name (str): Logical decoding output plugin for the slot
        """
        self.connection_params = connection_params
        self.slot_name = slot_name
        self.plugin_name = plugin_name
        self.conn = None
        self.cur = None

    def connect(self):
        """Open the logical replication connection and its cursor."""
        self.conn = psycopg2.connect(
            **self.connection_params,
            connection_factory=LogicalReplicationConnection
        )
        self.cur = self.conn.cursor()

    def create_slot(self, plugin_options=None):
        """Create the replication slot using ``self.plugin_name``.

        If a slot with this name already exists, that is reported and the
        existing slot is reused.

        Parameters:
        - plugin_options: Accepted for compatibility but unused; output plugin
          options are passed to start_consuming() instead.

        Raises:
        - psycopg2.Error: any failure other than "slot already exists"
        """
        from psycopg2 import errorcodes

        try:
            self.cur.create_replication_slot(
                self.slot_name,
                slot_type=REPLICATION_LOGICAL,
                output_plugin=self.plugin_name
            )
        except psycopg2.Error as e:
            # Compare the SQLSTATE rather than the message text, which the
            # server may translate depending on its lc_messages setting.
            if e.pgcode == errorcodes.DUPLICATE_OBJECT:
                print(f"Slot '{self.slot_name}' already exists")
            else:
                raise
        else:
            print(f"Created slot '{self.slot_name}' with plugin '{self.plugin_name}'")

    def start_consuming(self, start_lsn=0, plugin_options=None):
        """Start streaming changes from the slot.

        Parameters:
        - start_lsn: LSN to start from (0 by default)
        - plugin_options (dict, optional): Output plugin options. When empty
          and the plugin is wal2json, wal2json defaults are used
          (format-version 2 with timestamps, schemas and types, without
          xids). Other plugins get no options by default, because plugins
          such as test_decoding reject options they do not know.
        """
        options = plugin_options
        if not options and self.plugin_name == "wal2json":
            options = {
                'include-xids': '0',
                'include-timestamp': '1',
                'include-schemas': '1',
                'include-types': '1',
                'format-version': '2'
            }

        self.cur.start_replication(
            slot_name=self.slot_name,
            start_lsn=start_lsn,
            options=options,
            decode=True
        )

        print(f"Started replication from LSN {start_lsn}")

    def process_messages(self, message_handler, max_messages=None):
        """Feed replication messages to ``message_handler`` until stopped.

        Each payload is JSON-decoded when the plugin is wal2json and passed
        as ``message_handler(msg, change_data)``. The message is then
        acknowledged with send_feedback(). Decode errors and handler errors
        are printed, and that message is not acknowledged itself.

        Parameters:
        - message_handler: Callable taking (msg, change_data)
        - max_messages (int, optional): Stop after this many successfully
          handled messages; None or 0 means no limit

        Stops quietly on KeyboardInterrupt; other errors are printed and
        re-raised.
        """
        import select  # used only here, to wait for data without spinning

        processed = 0

        try:
            while True:
                msg = self.cur.read_replication_message()
                if not msg:
                    # read_replication_message() does not block; wait for
                    # the socket instead of busy-looping.
                    select.select([self.cur], [], [], self.poll_timeout)
                    continue

                try:
                    # Parse JSON payload (for wal2json plugin)
                    if self.plugin_name == "wal2json":
                        change_data = json.loads(msg.payload)
                    else:
                        change_data = msg.payload

                    # Call custom message handler
                    message_handler(msg, change_data)

                    # Acknowledge so the server may discard WAL up to here
                    self.cur.send_feedback(flush_lsn=msg.data_start)

                    processed += 1
                    if max_messages and processed >= max_messages:
                        break

                except json.JSONDecodeError as e:
                    print(f"JSON decode error: {e}")
                    print(f"Raw payload: {msg.payload}")

                except Exception as e:
                    print(f"Message processing error: {e}")
                    # Continue processing other messages

        except KeyboardInterrupt:
            print("\nReplication stopped by user")
        except Exception as e:
            print(f"Replication error: {e}")
            raise

    def close(self):
        """Close the replication connection (and with it the cursor), if open."""
        if self.conn:
            self.conn.close()

# Custom message handler
def handle_change_message(msg, change_data):
    """Print one decoded replication message.

    Parameters:
    - msg: The replication message; its data_start and send_time are printed
    - change_data: The decoded payload. This is a dict for wal2json output
      (format-version 1 or 2) and the raw payload text otherwise.

    Supported wal2json layouts:
    - format-version 1: one object per transaction with a "change" list of
      {kind, schema, table, columnnames, columnvalues, oldkeys}
    - format-version 2: one object per change with "action" ("I"/"U"/"D",
      plus "B"/"C" for transaction begin/commit, ...), "schema", "table",
      "columns" and "identity" (lists of {name, type, value})
    """
    print(f"\n--- Change at LSN {msg.data_start} ---")
    print(f"Timestamp: {msg.send_time}")

    if not isinstance(change_data, dict):
        # Raw text format
        print(f"Raw change: {change_data}")
        return

    if 'change' in change_data:
        # wal2json format-version 1
        for change in change_data['change']:
            table = f"{change['schema']}.{change['table']}"
            print(f"Table: {table}, Operation: {change['kind']}")

            if 'columnnames' in change and 'columnvalues' in change:
                data = dict(zip(change['columnnames'], change['columnvalues']))
                print(f"Data: {data}")

            oldkeys = change.get('oldkeys')
            if oldkeys:
                identity = dict(zip(oldkeys.get('keynames', []),
                                    oldkeys.get('keyvalues', [])))
                print(f"Identity: {identity}")

    elif 'action' in change_data:
        # wal2json format-version 2
        action = change_data['action']
        operation = {'I': 'insert', 'U': 'update', 'D': 'delete'}.get(action)
        if operation is None:
            # Begin/commit and other non-row actions carry no table data
            print(f"Action: {action}")
            return

        table = f"{change_data['schema']}.{change_data['table']}"
        print(f"Table: {table}, Operation: {operation}")

        for key, label in (('columns', 'Data'), ('identity', 'Identity')):
            if key in change_data:
                values = {col['name']: col['value'] for col in change_data[key]}
                print(f"{label}: {values}")

# Usage
replication_params = {
    'host': 'localhost',
    'port': 5432,
    'user': 'replication_user',
    'password': 'password',
    'database': 'postgres'
}

consumer = LogicalReplicationConsumer(
    connection_params=replication_params,
    slot_name='app_changes',
    plugin_name='wal2json'
)

# close() runs even if connecting, slot creation or streaming fails part-way
try:
    consumer.connect()
    consumer.create_slot()
    consumer.start_consuming()
    consumer.process_messages(handle_change_message, max_messages=50)
finally:
    consumer.close()

Physical Replication

Physical replication for WAL streaming and backup purposes.

Usage Example:

import psycopg2
import select
import time
from psycopg2.extras import PhysicalReplicationConnection, REPLICATION_PHYSICAL

# Physical replication connection (no database is named). The parameters are
# kept in a dict so the cleanup code can reconnect with the same settings.
conn_params = dict(
    host="primary_server",
    port=5432,
    user="replication_user",
    password="password",
    connection_factory=PhysicalReplicationConnection,
)
conn = psycopg2.connect(**conn_params)

cur = conn.cursor()

slot_name = "standby_slot"
# Only drop the slot at the end if this script created it. A pre-existing
# slot with the same name may belong to a real standby.
slot_created = False

try:
    # Create physical replication slot
    cur.create_replication_slot(slot_name, slot_type=REPLICATION_PHYSICAL)
    slot_created = True

    # Start physical replication. `timeline` is left at its default (0), so
    # the server streams its current timeline; a hard-coded timeline ID goes
    # stale once the server has been promoted after a failover.
    # NOTE(review): start_lsn=0 asks for WAL from the very beginning; a real
    # client would start from a position the server still retains - confirm.
    cur.start_replication(
        slot_name=slot_name,
        start_lsn=0
    )

    # Process WAL records
    wal_records = 0
    max_records = 1000

    while wal_records < max_records:
        msg = cur.read_replication_message()
        if msg:
            print(f"WAL record at LSN {msg.data_start}, size: {len(msg.payload)}")

            # In real scenario, you would write WAL data to files
            # or stream to standby server

            # Send feedback
            cur.send_feedback(flush_lsn=msg.data_start)
            wal_records += 1
        else:
            # read_replication_message() does not block. Wait (at most 1 s)
            # for the socket instead of spinning; the periodic wake-up lets
            # psycopg2 send its status_interval feedback while idle.
            select.select([cur], [], [], 1.0)

finally:
    # libpq refuses new commands on a streaming (COPY BOTH) connection, so
    # close it first and drop the slot over a fresh connection.
    conn.close()

    if slot_created:
        last_error = None
        for _ in range(5):
            try:
                drop_conn = psycopg2.connect(**conn_params)
                try:
                    drop_conn.cursor().drop_replication_slot(slot_name)
                finally:
                    drop_conn.close()
            except psycopg2.Error as e:
                # The slot may still be reported active for a moment after
                # the streaming connection closed; retry briefly.
                last_error = e
                time.sleep(0.1)
            else:
                break
        else:
            # A leftover slot makes the server retain WAL, so report it.
            print(f"Could not drop replication slot {slot_name}: {last_error}")

Replication Monitoring

Monitor replication slot status and replication lag (measured in bytes of WAL) over an ordinary, non-replication connection.

Usage Example:

import psycopg2
import time
from datetime import datetime

def monitor_replication_slots(conn):
    """Print the status of every replication slot.

    Queries pg_replication_slots and, for each slot, prints its name, type,
    whether it is active, its restart and confirmed-flush LSNs, and a
    human-readable size of the WAL between the current WAL position and the
    slot's restart_lsn.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT slot_name, slot_type, active, restart_lsn, 
                   confirmed_flush_lsn, 
                   pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag_size
            FROM pg_replication_slots
        """)
        rows = cur.fetchall()

    # Labels for the columns after slot_name, in SELECT order
    labels = ("Type", "Active", "Restart LSN", "Confirmed Flush LSN", "Lag Size")

    print(f"\n--- Replication Slots Status ({datetime.now()}) ---")
    for name, *details in rows:
        print(f"Slot: {name}")
        for label, value in zip(labels, details):
            print(f"  {label}: {value}")
        print()

def monitor_replication_stats(conn):
    """Print per-client streaming statistics from pg_stat_replication.

    For each WAL sender, prints the client address, state, the sent/write/
    flush/replay LSNs, the sync state, and how much WAL (human-readable) is:
    - Write Lag: sent but not yet written by the client (sent_lsn - write_lsn)
    - Flush Lag: written but not yet flushed (write_lsn - flush_lsn)
    - Replay Lag: flushed but not yet replayed (flush_lsn - replay_lsn)

    These are byte distances, not the time-based write_lag/flush_lag/
    replay_lag columns of pg_stat_replication itself; hence the *_lag_size
    aliases.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT client_addr, state, sent_lsn, write_lsn, flush_lsn,
                   replay_lsn, sync_state,
                   pg_size_pretty(pg_wal_lsn_diff(sent_lsn, write_lsn)) AS write_lag_size,
                   pg_size_pretty(pg_wal_lsn_diff(write_lsn, flush_lsn)) AS flush_lag_size,
                   pg_size_pretty(pg_wal_lsn_diff(flush_lsn, replay_lsn)) AS replay_lag_size
            FROM pg_stat_replication
        """)
        replicas = cur.fetchall()

    print(f"\n--- Replication Statistics ({datetime.now()}) ---")
    for (client_addr, state, sent_lsn, write_lsn, flush_lsn, replay_lsn,
         sync_state, write_lag, flush_lag, replay_lag) in replicas:
        print(f"Client: {client_addr}")
        print(f"  State: {state}")
        print(f"  Sent LSN: {sent_lsn}")
        print(f"  Write LSN: {write_lsn}")
        print(f"  Flush LSN: {flush_lsn}")
        print(f"  Replay LSN: {replay_lsn}")
        print(f"  Sync State: {sync_state}")
        print(f"  Write Lag: {write_lag}")
        print(f"  Flush Lag: {flush_lag}")
        print(f"  Replay Lag: {replay_lag}")
        print()

# Monitoring usage: an ordinary (non-replication) connection is used to read
# the replication views.
monitor_conn = psycopg2.connect(
    host="localhost",
    database="postgres",
    user="postgres",
    password="password"
)

try:
    for report in (monitor_replication_slots, monitor_replication_stats):
        report(monitor_conn)
finally:
    monitor_conn.close()

Types

Replication Connection Types

class LogicalReplicationConnection(connection):
    """Connection for logical replication.

    Used as ``connection_factory`` in ``psycopg2.connect()``, together with a
    ``database``.
    """

class PhysicalReplicationConnection(connection):
    """Connection for physical replication.

    Used as ``connection_factory`` in ``psycopg2.connect()``; the examples
    connect without naming a database.
    """

Replication Constants

# Values accepted by the slot_type parameter of
# ReplicationCursor.create_replication_slot() and start_replication().
REPLICATION_PHYSICAL: int  # Physical replication mode
REPLICATION_LOGICAL: int   # Logical replication mode

Replication Cursor Interface

class ReplicationCursor(cursor):
    """Specialized cursor for replication."""

    def create_replication_slot(self, slot_name: str, slot_type: int = None,
                                output_plugin: str = None) -> None:
        """Create replication slot.

        Executes CREATE_REPLICATION_SLOT and returns None; the server's reply
        row can be fetched from the cursor afterwards. output_plugin is
        required for logical slots and not allowed for physical ones.
        """

    def drop_replication_slot(self, slot_name: str) -> None:
        """Drop replication slot.

        Not possible on a connection that is currently streaming; use a
        separate connection.
        """

    def start_replication(self, slot_name: str = None, slot_type: int = None,
                          start_lsn: 'int | str' = 0, timeline: int = 0,
                          options: dict = None, decode: bool = False,
                          status_interval: int = 10) -> None:
        """Start replication stream.

        start_lsn may be an integer or an 'XXX/XXX' string; timeline 0 means
        the server's current timeline; decode=True yields str payloads.
        """

    def read_replication_message(self) -> 'ReplicationMessage | None':
        """Read next replication message without blocking.

        Returns None when no complete message is buffered; then wait on the
        cursor (e.g. with select) before calling again.
        """

    def send_feedback(self, write_lsn: int = 0, flush_lsn: int = 0,
                      apply_lsn: int = 0, reply: bool = False,
                      force: bool = False) -> None:
        """Report write/flush/apply progress to the server.

        Without reply or force, the positions are only recorded and are sent
        when status_interval elapses. force is not available in older
        psycopg2 releases.
        """

    def consume_stream(self, consume: 'Callable[[ReplicationMessage], None]',
                       keepalive_interval: 'float | None' = None) -> None:
        """Call consume(msg) for every message until it raises StopReplication."""

    def fileno(self) -> int:
        """Get file descriptor (lets the cursor be passed to select())."""

Replication Message Interface

class ReplicationMessage:
    """Replication stream message.

    Returned by ReplicationCursor.read_replication_message().
    """
    
    data_start: int  # Start LSN of message
    wal_end: int     # End LSN of WAL record  
    send_time: 'datetime'  # Message send time
    payload: 'bytes | str'   # Message payload: str when started with decode=True, else bytes
    cursor: 'ReplicationCursor'  # Source cursor

Control Exceptions

class StopReplication(Exception):
    """Exception to stop replication loop.

    Raise it from message-processing code and catch it around the read loop
    to end streaming.
    """

Install with Tessl CLI

npx tessl i tessl/pypi-psycopg2

docs

advanced-cursors.md

batch-operations.md

connection-pooling.md

connections-cursors.md

error-handling.md

index.md

replication.md

sql-composition.md

timezone-support.md

type-adaptation.md

tile.json