CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-py-postgresql

PostgreSQL driver and tools library providing PG-API and DB-API 2.0 interfaces for Python.

Pending
Overview
Eval results
Files

docs/cluster-management.md

Cluster Management

PostgreSQL cluster and installation management tools for controlling local PostgreSQL instances, configuration, and maintenance operations.

Capabilities

Cluster Interface

Interface for managing PostgreSQL cluster instances including startup, shutdown, and configuration.

class Cluster:
    """
    PostgreSQL cluster management interface for local cluster control.

    This is an interface declaration: every member documents its contract
    and carries no implementation body. All members are instance members
    and therefore take ``self``.
    """

    def start(self):
        """
        Start the PostgreSQL cluster.

        Raises:
        ClusterError: If cluster cannot be started
        """

    def stop(self):
        """
        Stop the PostgreSQL cluster gracefully.

        Raises:
        ClusterError: If cluster cannot be stopped
        """

    def restart(self):
        """
        Restart the PostgreSQL cluster.

        Raises:
        ClusterError: If cluster cannot be restarted
        """

    def initialize(self):
        """
        Initialize a new PostgreSQL cluster (initdb).

        Raises:
        ClusterError: If cluster initialization fails
        """

    def status(self):
        """
        Get cluster status information.

        Returns:
        dict: Cluster status including state, PID, uptime
        """

    def reload(self):
        """
        Reload cluster configuration without restart.

        Raises:
        ClusterError: If configuration reload fails
        """

    @property
    def data_directory(self):
        """
        Get cluster data directory path.

        Returns:
        str: Path to PostgreSQL data directory
        """

    @property
    def configuration(self):
        """
        Get cluster configuration settings.

        Returns:
        dict: Configuration parameters and values
        """

    @property
    def log_file(self):
        """
        Get path to cluster log file.

        Returns:
        str: Path to PostgreSQL log file
        """

Installation Interface

Interface for PostgreSQL installation detection and information retrieval.

class Installation:
    """
    PostgreSQL installation information and utilities.

    This is an interface declaration: every method documents its contract
    and carries no implementation body. All methods are instance methods
    and therefore take ``self``.
    """

    def version(self):
        """
        Get PostgreSQL version information.

        Returns:
        dict: Version details including major, minor, patch versions
        """

    def bin_directory(self):
        """
        Get PostgreSQL binary directory path.

        Returns:
        str: Path to PostgreSQL binaries (pg_ctl, psql, etc.)
        """

    def lib_directory(self):
        """
        Get PostgreSQL library directory path.

        Returns:
        str: Path to PostgreSQL libraries
        """

    def share_directory(self):
        """
        Get PostgreSQL shared data directory path.

        Returns:
        str: Path to PostgreSQL shared files
        """

    def config_directory(self):
        """
        Get default configuration directory path.

        Returns:
        str: Path to default configuration files
        """

    def find_clusters(self):
        """
        Find existing PostgreSQL clusters on the system.

        Returns:
        list: List of cluster data directories found
        """

Cluster Creation and Configuration

Functions for creating and configuring new PostgreSQL clusters.

def create_cluster(data_directory: str, **options) -> "Cluster":
    """
    Create a new PostgreSQL cluster.

    Interface declaration only; there is no implementation body here.

    Parameters:
    - data_directory (str): Path for cluster data directory
    - **options: Initialization options (encoding, locale, auth_method, etc.),
      accepted as keyword arguments only; an options dict must be unpacked
      with ** at the call site

    Returns:
    Cluster: New cluster instance

    Raises:
    ClusterError: If cluster creation fails
    """

def configure_cluster(cluster: "Cluster", settings: dict) -> None:
    """
    Configure cluster settings.

    Interface declaration only; there is no implementation body here.

    Parameters:
    - cluster (Cluster): Cluster instance to configure
    - settings (dict): Configuration settings to apply, keyed by setting name

    Raises:
    ClusterError: If configuration fails
    """

Cluster Utilities

Utility functions for cluster maintenance and operations.

def backup_cluster(cluster: "Cluster", backup_path: str) -> None:
    """
    Create cluster backup using pg_basebackup.

    Interface declaration only; there is no implementation body here.

    Parameters:
    - cluster (Cluster): Cluster to backup
    - backup_path (str): Destination path for backup

    Raises:
    ClusterError: If backup fails
    """

def restore_cluster(backup_path: str, data_directory: str) -> "Cluster":
    """
    Restore cluster from backup.

    Interface declaration only; there is no implementation body here.

    Parameters:
    - backup_path (str): Path to backup files
    - data_directory (str): Target data directory

    Returns:
    Cluster: Restored cluster instance

    Raises:
    ClusterError: If restore fails
    """

def vacuum_cluster(cluster: "Cluster", database: "str | None" = None) -> None:
    """
    Perform vacuum maintenance on cluster.

    Interface declaration only; there is no implementation body here.

    Parameters:
    - cluster (Cluster): Cluster to vacuum
    - database (str, optional): Specific database to vacuum (all if None)

    Raises:
    ClusterError: If vacuum fails
    """

Usage Examples

Basic Cluster Management

import postgresql.cluster as pg_cluster
import postgresql.installation as pg_install

# Locate the PostgreSQL installation and report its version and binaries
installation = pg_install.Installation()
print(f"PostgreSQL version: {installation.version()}")
print(f"Binary directory: {installation.bin_directory()}")

# List the data directories of the clusters already present on the system
clusters = installation.find_clusters()
print(f"Found {len(clusters)} existing clusters:")
for cluster_path in clusters:
    print(f"  {cluster_path}")

# Manage the first discovered cluster; nothing happens when none were found
if clusters:
    first_cluster_path = clusters[0]
    cluster = pg_cluster.Cluster(first_cluster_path)

    # Report the current state
    status = cluster.status()
    print(f"Cluster status: {status}")

    # Bring the cluster up only when it is not already running
    already_running = status['state'] == 'running'
    if not already_running:
        print("Starting cluster...")
        cluster.start()
        print("Cluster started successfully")

    # Show where the cluster keeps its data and its log
    print(f"Data directory: {cluster.data_directory}")
    print(f"Log file: {cluster.log_file}")

    # Ask the server to re-read its configuration
    cluster.reload()
    print("Configuration reloaded")

Creating a New Cluster

import postgresql.cluster as pg_cluster
import os

# Where the new cluster lives and how it is initialized
data_dir = "/path/to/new/cluster"
cluster_options = dict(
    encoding='UTF8',
    locale='en_US.UTF-8',
    auth_method='md5',
    username='postgres',
    password='secure_password',
)

try:
    # Initialize the cluster on disk
    print(f"Creating cluster in {data_dir}...")
    cluster = pg_cluster.create_cluster(data_dir, **cluster_options)

    # Performance-oriented settings applied before the first start
    settings = dict(
        max_connections=200,
        shared_buffers='256MB',
        effective_cache_size='1GB',
        maintenance_work_mem='64MB',
        checkpoint_completion_target=0.9,
        wal_buffers='16MB',
        default_statistics_target=100,
    )

    pg_cluster.configure_cluster(cluster, settings)
    print("Cluster configured with performance settings")

    # Bring the cluster up
    cluster.start()
    print("Cluster started successfully")

    # Confirm it is running by reading back its status
    status = cluster.status()
    print(f"Cluster PID: {status.get('pid')}")
    print(f"Uptime: {status.get('uptime')}")

except pg_cluster.ClusterError as e:
    # Any step above reports failure through ClusterError
    print(f"Cluster creation failed: {e}")

Cluster Maintenance Operations

import postgresql.cluster as pg_cluster
import postgresql
from datetime import datetime

# Connect to cluster management
cluster = pg_cluster.Cluster("/path/to/cluster/data")

# Timestamped destination so repeated runs never target the same directory
backup_dir = f"/backups/cluster_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

try:
    print("Creating cluster backup...")
    pg_cluster.backup_cluster(cluster, backup_dir)
    print(f"Backup created successfully: {backup_dir}")

    # Perform maintenance
    print("Performing cluster maintenance...")

    # Connect to cluster for database operations
    db = postgresql.open('pq://postgres@localhost/postgres')
    try:
        # Skip templates and databases that refuse connections; opening one
        # of those would raise and abort maintenance for every database.
        databases = db.query(
            "SELECT datname FROM pg_database "
            "WHERE datistemplate = false AND datallowconn"
        )

        for db_row in databases:
            db_name = db_row['datname']
            print(f"Vacuuming database: {db_name}")

            # VACUUM acts on the connected database, so open one connection
            # per database.
            db_conn = postgresql.open(f'pq://postgres@localhost/{db_name}')
            try:
                # Vacuum analyze
                db_conn.execute("VACUUM ANALYZE")

                # Get database statistics
                stats = db_conn.query("""
                    SELECT 
                        pg_size_pretty(pg_database_size(current_database())) as size,
                        (SELECT count(*) FROM pg_stat_user_tables) as user_tables
                """)

                if stats:
                    print(f"  Size: {stats[0]['size']}, Tables: {stats[0]['user_tables']}")
            finally:
                # Release the per-database connection even if the vacuum or
                # the statistics query fails.
                db_conn.close()
    finally:
        # Release the catalog connection on both success and failure.
        db.close()

    # Reload cluster configuration
    cluster.reload()
    print("Cluster maintenance completed")

except Exception as e:
    print(f"Maintenance failed: {e}")

Cluster Configuration Management

import os

import postgresql
import postgresql.cluster as pg_cluster
import postgresql.configfile as pg_config

cluster = pg_cluster.Cluster("/path/to/cluster/data")

# Read current configuration
config_file = os.path.join(cluster.data_directory, "postgresql.conf")
config = pg_config.read_config(config_file)

print("Current configuration:")
for key, value in config.items():
    print(f"  {key} = {value}")

# Update configuration settings
new_settings = {
    'max_connections': 300,
    'shared_buffers': '512MB',
    'effective_cache_size': '2GB',
    'random_page_cost': 1.1,  # For SSD storage
    'seq_page_cost': 1.0,
    'log_min_duration_statement': 1000,  # Log slow queries
    'log_line_prefix': '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h ',
    'log_statement': 'mod',  # Log modifications
    # checkpoint_segments was removed in PostgreSQL 9.5, the same release
    # that added the pg_settings.pending_restart column queried below.
    # Its documented replacement is max_wal_size = 3 * checkpoint_segments * 16MB,
    # which for the former value of 64 is 3GB.
    'max_wal_size': '3GB',
    'checkpoint_completion_target': 0.9
}

try:
    # Apply new settings
    pg_cluster.configure_cluster(cluster, new_settings)

    # Reload configuration (some settings require restart)
    cluster.reload()
    print("Configuration updated and reloaded")

    # Check which settings require restart
    db = postgresql.open('pq://postgres@localhost/postgres')
    try:
        # Materialize the rows so they can be used after the connection closes
        restart_required = list(db.query("""
            SELECT name, setting, pending_restart 
            FROM pg_settings 
            WHERE pending_restart = true
        """))
    finally:
        # Close before any restart: a restart would sever an open connection
        db.close()

    if restart_required:
        print("Settings requiring restart:")
        for setting in restart_required:
            print(f"  {setting['name']}: {setting['setting']}")

        response = input("Restart cluster now? (y/n): ")
        if response.lower() == 'y':
            cluster.restart()
            print("Cluster restarted successfully")

except Exception as e:
    print(f"Configuration update failed: {e}")

High Availability Cluster Setup

import postgresql.cluster as pg_cluster
import postgresql
import os
import shutil

def setup_primary_secondary_clusters():
    """
    Set up primary-secondary cluster configuration.

    Creates and starts a primary cluster configured for replication, creates
    a replication user, takes a base backup into the secondary data
    directory, starts the secondary in standby mode and checks that a test
    row written on the primary can be read from the secondary.

    Returns:
    tuple: (primary, secondary) Cluster instances on success,
    (None, None) if any step raises
    """
    import time

    primary_data = "/path/to/primary/data"
    secondary_data = "/path/to/secondary/data"

    try:
        # Create primary cluster. create_cluster takes its options as keyword
        # arguments (**options), so they are not passed as a positional dict.
        print("Creating primary cluster...")
        primary = pg_cluster.create_cluster(
            primary_data,
            encoding='UTF8',
            username='postgres',
            password='primary_password',
        )

        # Configure primary for replication
        primary_settings = {
            'wal_level': 'replica',
            'max_wal_senders': 3,
            'wal_keep_segments': 64,
            'synchronous_commit': 'on',
            'archive_mode': 'on',
            'archive_command': 'cp %p /path/to/archive/%f'
        }

        pg_cluster.configure_cluster(primary, primary_settings)

        # Start primary
        primary.start()
        print("Primary cluster started")

        # Create replication user
        db = postgresql.open('pq://postgres:primary_password@localhost/postgres')
        try:
            db.execute("""
                CREATE USER replicator WITH REPLICATION ENCRYPTED PASSWORD 'repl_password'
            """)
        finally:
            # Release the connection even if CREATE USER fails
            db.close()

        # Create base backup for secondary
        print("Creating base backup for secondary...")
        pg_cluster.backup_cluster(primary, secondary_data)

        # Configure secondary
        secondary = pg_cluster.Cluster(secondary_data)

        # The verification step below connects to the secondary on port 5433,
        # and both clusters run on localhost, so the secondary must not reuse
        # the primary's port.
        pg_cluster.configure_cluster(secondary, {'port': 5433})

        # Create recovery configuration for secondary
        # NOTE(review): recovery.conf is only read by PostgreSQL 11 and
        # earlier -- confirm the target server version.
        recovery_conf = os.path.join(secondary_data, "recovery.conf")
        with open(recovery_conf, 'w') as f:
            f.write("""
standby_mode = 'on'
primary_conninfo = 'host=localhost port=5432 user=replicator password=repl_password'
trigger_file = '/tmp/postgresql.trigger'
""")

        # Start secondary in standby mode
        secondary.start()
        print("Secondary cluster started in standby mode")

        # Verify replication
        print("Verifying replication...")

        # Connect to primary and create test data
        primary_db = postgresql.open('pq://postgres:primary_password@localhost:5432/postgres')
        try:
            primary_db.execute("CREATE TABLE IF NOT EXISTS repl_test (id serial, data text)")
            primary_db.execute("INSERT INTO repl_test (data) VALUES ('test data')")

            # Wait a moment for replication
            time.sleep(2)

            # Connect to secondary and verify data
            secondary_db = postgresql.open('pq://postgres:primary_password@localhost:5433/postgres')
            try:
                result = secondary_db.query("SELECT * FROM repl_test")
            finally:
                secondary_db.close()
        finally:
            primary_db.close()

        if result:
            print(f"Replication verified: {len(result)} rows replicated")
        else:
            print("Replication verification failed")

        return primary, secondary

    except Exception as e:
        print(f"High availability setup failed: {e}")
        return None, None

# Set up HA clusters
primary, secondary = setup_primary_secondary_clusters()

if primary and secondary:
    print("High availability cluster setup completed successfully")

    # Monitor replication lag
    def check_replication_lag():
        """
        Print one status entry per row of pg_stat_replication on the primary.

        Failures are reported with a printed message rather than raised.
        """
        try:
            primary_db = postgresql.open('pq://postgres:primary_password@localhost:5432/postgres')
            try:
                lag_query = primary_db.query("""
                    SELECT 
                        client_addr,
                        application_name,
                        state,
                        pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) as lag
                    FROM pg_stat_replication
                """)

                print("Replication status:")
                for row in lag_query:
                    print(f"  Client: {row['client_addr']}, App: {row['application_name']}")
                    print(f"  State: {row['state']}, Lag: {row['lag']}")
            finally:
                # Release the connection even if the query or printing fails
                primary_db.close()

        except Exception as e:
            print(f"Replication monitoring failed: {e}")

    check_replication_lag()

Cluster Monitoring and Health Checks

import postgresql.cluster as pg_cluster
import postgresql
import psutil
import time

def monitor_cluster_health(cluster):
    """
    Monitor cluster health and performance metrics.

    Prints the cluster state, connection counts, database sizes, queries
    active for more than five minutes, checkpoint statistics and, when the
    status carries a PID, CPU and memory usage of that process.

    Parameters:
    - cluster: Object whose status() returns a dict with a 'state' key and
      optionally a 'pid' key

    Returns:
    bool: True when every check ran, False when the cluster is not running
    or any check raised
    """
    try:
        # Get cluster status
        status = cluster.status()
        print(f"Cluster Status: {status['state']}")

        if status['state'] != 'running':
            print("Cluster is not running!")
            return False

        # Connect to database for detailed monitoring
        db = postgresql.open('pq://postgres@localhost/postgres')
        try:
            # Database connection statistics
            conn_stats = db.query("""
                SELECT 
                    count(*) as total_connections,
                    count(*) FILTER (WHERE state = 'active') as active_connections,
                    count(*) FILTER (WHERE state = 'idle') as idle_connections
                FROM pg_stat_activity
            """)

            if conn_stats:
                stats = conn_stats[0]
                print(f"Connections - Total: {stats['total_connections']}, "
                      f"Active: {stats['active_connections']}, Idle: {stats['idle_connections']}")

            # Database size information
            db_sizes = db.query("""
                SELECT 
                    datname,
                    pg_size_pretty(pg_database_size(datname)) as size
                FROM pg_database 
                WHERE datistemplate = false
                ORDER BY pg_database_size(datname) DESC
            """)

            print("Database sizes:")
            for db_info in db_sizes:
                print(f"  {db_info['datname']}: {db_info['size']}")

            # Long running queries
            long_queries = db.query("""
                SELECT 
                    pid,
                    usename,
                    datname,
                    query_start,
                    state,
                    substring(query, 1, 100) as query_snippet
                FROM pg_stat_activity 
                WHERE state = 'active' 
                AND query_start < now() - interval '5 minutes'
                ORDER BY query_start
            """)

            if long_queries:
                print("Long running queries (>5 minutes):")
                for query in long_queries:
                    print(f"  PID {query['pid']}: {query['usename']}@{query['datname']}")
                    print(f"    Started: {query['query_start']}")
                    print(f"    Query: {query['query_snippet']}...")

            # Checkpoint and WAL statistics
            checkpoint_stats = db.query("""
                SELECT 
                    checkpoints_timed,
                    checkpoints_req,
                    checkpoint_write_time,
                    checkpoint_sync_time
                FROM pg_stat_bgwriter
            """)

            if checkpoint_stats:
                stats = checkpoint_stats[0]
                print(f"Checkpoints - Timed: {stats['checkpoints_timed']}, "
                      f"Requested: {stats['checkpoints_req']}")
                print(f"Checkpoint times - Write: {stats['checkpoint_write_time']}ms, "
                      f"Sync: {stats['checkpoint_sync_time']}ms")
        finally:
            # Release the connection even when one of the queries fails
            db.close()

        # System resource usage
        if status.get('pid'):
            try:
                process = psutil.Process(status['pid'])
                # A fresh Process has no previous sample, so a non-blocking
                # cpu_percent() call would always report 0.0; sample over a
                # short interval instead.
                cpu_percent = process.cpu_percent(interval=0.1)
                memory_info = process.memory_info()

                print(f"System resources - CPU: {cpu_percent}%, "
                      f"Memory: {memory_info.rss / 1024 / 1024:.1f}MB")
            except psutil.Error:
                # Covers NoSuchProcess and AccessDenied alike: failing to read
                # process statistics must not fail the health check, because
                # the caller restarts the cluster on failure.
                print("Could not get system resource information")

        return True

    except Exception as e:
        print(f"Health check failed: {e}")
        return False

# Cluster to watch
cluster = pg_cluster.Cluster("/path/to/cluster/data")

print("Starting cluster health monitoring...")
for i in range(5):  # Five monitoring cycles in total
    print(f"\n--- Health Check {i+1} ---")
    health_ok = monitor_cluster_health(cluster)

    if health_ok:
        # Healthy: just wait 1 minute before the next check
        time.sleep(60)
        continue

    # Unhealthy: try a restart, and stop monitoring if even that fails
    print("Health check failed, attempting cluster restart...")
    try:
        cluster.restart()
        print("Cluster restarted successfully")
    except Exception as e:
        print(f"Cluster restart failed: {e}")
        break

    time.sleep(60)  # Wait 1 minute between checks

print("Health monitoring completed")

Install with Tessl CLI

npx tessl i tessl/pypi-py-postgresql

docs

advanced-features.md

cluster-management.md

connection-management.md

dbapi-interface.md

exception-handling.md

index.md

query-execution.md

transaction-management.md

type-system.md

tile.json