PostgreSQL driver and tools library providing PG-API and DB-API 2.0 interfaces for Python.

PostgreSQL cluster and installation management tools for controlling local PostgreSQL instances, configuration, and maintenance operations.
Interface for managing PostgreSQL cluster instances including startup, shutdown, and configuration.
class Cluster:
    """
    PostgreSQL cluster management interface for local cluster control.

    All control methods shell out to the local PostgreSQL instance and
    raise ClusterError on failure.
    """

    def start(self):
        """
        Start the PostgreSQL cluster.

        Raises:
            ClusterError: If cluster cannot be started
        """

    def stop(self):
        """
        Stop the PostgreSQL cluster gracefully.

        Raises:
            ClusterError: If cluster cannot be stopped
        """

    def restart(self):
        """
        Restart the PostgreSQL cluster.

        Raises:
            ClusterError: If cluster cannot be restarted
        """

    def initialize(self):
        """
        Initialize a new PostgreSQL cluster (initdb).

        Raises:
            ClusterError: If cluster initialization fails
        """

    def status(self):
        """
        Get cluster status information.

        Returns:
            dict: Cluster status including state, PID, uptime
        """

    def reload(self):
        """
        Reload cluster configuration without restart.

        Raises:
            ClusterError: If configuration reload fails
        """

    @property
    def data_directory(self):
        """
        Get cluster data directory path.

        Returns:
            str: Path to PostgreSQL data directory
        """

    @property
    def configuration(self):
        """
        Get cluster configuration settings.

        Returns:
            dict: Configuration parameters and values
        """

    @property
    def log_file(self):
        """
        Get path to cluster log file.

        Returns:
            str: Path to PostgreSQL log file
        """


# Interface for PostgreSQL installation detection and information retrieval.
class Installation:
    """
    PostgreSQL installation information and utilities.

    Locates an installed PostgreSQL distribution and reports its
    version, directory layout, and any existing clusters.
    """

    def version(self):
        """
        Get PostgreSQL version information.

        Returns:
            dict: Version details including major, minor, patch versions
        """

    def bin_directory(self):
        """
        Get PostgreSQL binary directory path.

        Returns:
            str: Path to PostgreSQL binaries (pg_ctl, psql, etc.)
        """

    def lib_directory(self):
        """
        Get PostgreSQL library directory path.

        Returns:
            str: Path to PostgreSQL libraries
        """

    def share_directory(self):
        """
        Get PostgreSQL shared data directory path.

        Returns:
            str: Path to PostgreSQL shared files
        """

    def config_directory(self):
        """
        Get default configuration directory path.

        Returns:
            str: Path to default configuration files
        """

    def find_clusters(self):
        """
        Find existing PostgreSQL clusters on the system.

        Returns:
            list: List of cluster data directories found
        """


# Functions for creating and configuring new PostgreSQL clusters.
def create_cluster(data_directory, **options):
    """
    Create a new PostgreSQL cluster.

    Parameters:
        data_directory (str): Path for cluster data directory
        **options: Initialization options (encoding, locale, auth_method, etc.)

    Returns:
        Cluster: New cluster instance

    Raises:
        ClusterError: If cluster creation fails
    """
def configure_cluster(cluster, settings):
    """
    Configure cluster settings.

    Parameters:
        cluster (Cluster): Cluster instance to configure
        settings (dict): Configuration settings to apply

    Raises:
        ClusterError: If configuration fails
    """


# Utility functions for cluster maintenance and operations.
def backup_cluster(cluster, backup_path):
    """
    Create cluster backup using pg_basebackup.

    Parameters:
        cluster (Cluster): Cluster to backup
        backup_path (str): Destination path for backup

    Raises:
        ClusterError: If backup fails
    """
def restore_cluster(backup_path, data_directory):
    """
    Restore cluster from backup.

    Parameters:
        backup_path (str): Path to backup files
        data_directory (str): Target data directory

    Returns:
        Cluster: Restored cluster instance

    Raises:
        ClusterError: If restore fails
    """
def vacuum_cluster(cluster, database=None):
    """
    Perform vacuum maintenance on cluster.

    Parameters:
        cluster (Cluster): Cluster to vacuum
        database (str, optional): Specific database to vacuum (all if None)

    Raises:
        ClusterError: If vacuum fails
    """


# --- Example: inspecting and controlling an existing cluster ---
import postgresql.cluster as pg_cluster
import postgresql.installation as pg_install

# Find PostgreSQL installation
installation = pg_install.Installation()
print(f"PostgreSQL version: {installation.version()}")
print(f"Binary directory: {installation.bin_directory()}")

# Find existing clusters
clusters = installation.find_clusters()
print(f"Found {len(clusters)} existing clusters:")
for cluster_path in clusters:
    print(f" {cluster_path}")

# Connect to existing cluster
if clusters:
    cluster = pg_cluster.Cluster(clusters[0])

    # Check cluster status
    status = cluster.status()
    print(f"Cluster status: {status}")

    # Start cluster if not running
    if status['state'] != 'running':
        print("Starting cluster...")
        cluster.start()
        print("Cluster started successfully")

    # Get cluster information
    print(f"Data directory: {cluster.data_directory}")
    print(f"Log file: {cluster.log_file}")

    # Reload configuration
    cluster.reload()
    print("Configuration reloaded")

# --- Example: creating and configuring a new cluster ---
import postgresql.cluster as pg_cluster
import os

# Define cluster parameters
data_dir = "/path/to/new/cluster"
cluster_options = {
    'encoding': 'UTF8',
    'locale': 'en_US.UTF-8',
    'auth_method': 'md5',
    'username': 'postgres',
    'password': 'secure_password'
}

try:
    # Create new cluster
    print(f"Creating cluster in {data_dir}...")
    cluster = pg_cluster.create_cluster(data_dir, **cluster_options)

    # Configure cluster settings
    settings = {
        'max_connections': 200,
        'shared_buffers': '256MB',
        'effective_cache_size': '1GB',
        'maintenance_work_mem': '64MB',
        'checkpoint_completion_target': 0.9,
        'wal_buffers': '16MB',
        'default_statistics_target': 100
    }
    pg_cluster.configure_cluster(cluster, settings)
    print("Cluster configured with performance settings")

    # Start the cluster
    cluster.start()
    print("Cluster started successfully")

    # Verify cluster is running
    status = cluster.status()
    print(f"Cluster PID: {status.get('pid')}")
    print(f"Uptime: {status.get('uptime')}")
except pg_cluster.ClusterError as e:
    print(f"Cluster creation failed: {e}")

# --- Example: backup and maintenance operations ---
import postgresql.cluster as pg_cluster
import postgresql
from datetime import datetime

# Connect to cluster management
cluster = pg_cluster.Cluster("/path/to/cluster/data")

# Create backup (timestamped directory name)
backup_dir = f"/backups/cluster_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

try:
    print("Creating cluster backup...")
    pg_cluster.backup_cluster(cluster, backup_dir)
    print(f"Backup created successfully: {backup_dir}")

    # Perform maintenance
    print("Performing cluster maintenance...")

    # Connect to cluster for database operations
    db = postgresql.open(f'pq://postgres@localhost/postgres')

    # Get database list (skip template databases)
    databases = db.query("SELECT datname FROM pg_database WHERE datistemplate = false")
    for db_row in databases:
        db_name = db_row['datname']
        print(f"Vacuuming database: {db_name}")

        # Connect to specific database for vacuum
        db_conn = postgresql.open(f'pq://postgres@localhost/{db_name}')

        # Vacuum analyze
        db_conn.execute("VACUUM ANALYZE")

        # Get database statistics
        stats = db_conn.query("""
            SELECT
                pg_size_pretty(pg_database_size(current_database())) as size,
                (SELECT count(*) FROM pg_stat_user_tables) as user_tables
        """)
        if stats:
            print(f" Size: {stats[0]['size']}, Tables: {stats[0]['user_tables']}")
        db_conn.close()

    db.close()

    # Reload cluster configuration
    cluster.reload()
    print("Cluster maintenance completed")
except Exception as e:
    print(f"Maintenance failed: {e}")

# --- Example: configuration management ---
import postgresql.cluster as pg_cluster
import postgresql.configfile as pg_config
import os  # required by os.path.join below; missing from the original example

cluster = pg_cluster.Cluster("/path/to/cluster/data")

# Read current configuration
config_file = os.path.join(cluster.data_directory, "postgresql.conf")
config = pg_config.read_config(config_file)
print("Current configuration:")
for key, value in config.items():
    print(f" {key} = {value}")

# Update configuration settings
new_settings = {
    'max_connections': 300,
    'shared_buffers': '512MB',
    'effective_cache_size': '2GB',
    'random_page_cost': 1.1,  # For SSD storage
    'seq_page_cost': 1.0,
    'log_min_duration_statement': 1000,  # Log slow queries
    'log_line_prefix': '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h ',
    'log_statement': 'mod',  # Log modifications
    # NOTE(review): checkpoint_segments was removed in PostgreSQL 9.5
    # (replaced by max_wal_size) — confirm the targeted server version.
    'checkpoint_segments': 64,
    'checkpoint_completion_target': 0.9
}

try:
    # Apply new settings
    pg_cluster.configure_cluster(cluster, new_settings)

    # Reload configuration (some settings require restart)
    cluster.reload()
    print("Configuration updated and reloaded")

    # Check which settings require restart
    import postgresql
    db = postgresql.open('pq://postgres@localhost/postgres')
    restart_required = db.query("""
        SELECT name, setting, pending_restart
        FROM pg_settings
        WHERE pending_restart = true
    """)
    if restart_required:
        print("Settings requiring restart:")
        for setting in restart_required:
            print(f" {setting['name']}: {setting['setting']}")
        response = input("Restart cluster now? (y/n): ")
        if response.lower() == 'y':
            cluster.restart()
            print("Cluster restarted successfully")
    db.close()
except Exception as e:
    print(f"Configuration update failed: {e}")

# --- Example: high-availability (primary/secondary) setup ---
import postgresql.cluster as pg_cluster
import postgresql
import os
import shutil


def setup_primary_secondary_clusters():
    """Set up primary-secondary cluster configuration.

    Returns:
        tuple: (primary, secondary) Cluster instances, or (None, None) on failure.
    """
    primary_data = "/path/to/primary/data"
    secondary_data = "/path/to/secondary/data"
    try:
        # Create primary cluster.
        # NOTE: create_cluster(data_directory, **options) takes initdb options
        # as keyword arguments; the original example passed a dict positionally,
        # which does not match the documented signature.
        print("Creating primary cluster...")
        primary = pg_cluster.create_cluster(
            primary_data,
            encoding='UTF8',
            username='postgres',
            password='primary_password',
        )

        # Configure primary for replication
        primary_settings = {
            'wal_level': 'replica',
            'max_wal_senders': 3,
            # NOTE(review): wal_keep_segments was replaced by wal_keep_size
            # in PostgreSQL 13 — confirm the targeted server version.
            'wal_keep_segments': 64,
            'synchronous_commit': 'on',
            'archive_mode': 'on',
            'archive_command': 'cp %p /path/to/archive/%f'
        }
        pg_cluster.configure_cluster(primary, primary_settings)

        # Start primary
        primary.start()
        print("Primary cluster started")

        # Create replication user
        db = postgresql.open('pq://postgres:primary_password@localhost/postgres')
        db.execute("""
            CREATE USER replicator WITH REPLICATION ENCRYPTED PASSWORD 'repl_password'
        """)
        db.close()

        # Create base backup for secondary
        print("Creating base backup for secondary...")
        pg_cluster.backup_cluster(primary, secondary_data)

        # Configure secondary
        secondary = pg_cluster.Cluster(secondary_data)

        # Create recovery configuration for secondary
        # (recovery.conf applies to PostgreSQL < 12; newer versions use
        # standby.signal plus postgresql.conf settings.)
        recovery_conf = os.path.join(secondary_data, "recovery.conf")
        with open(recovery_conf, 'w') as f:
            f.write(f"""
standby_mode = 'on'
primary_conninfo = 'host=localhost port=5432 user=replicator password=repl_password'
trigger_file = '/tmp/postgresql.trigger'
""")

        # Start secondary in standby mode
        secondary.start()
        print("Secondary cluster started in standby mode")

        # Verify replication
        print("Verifying replication...")

        # Connect to primary and create test data
        primary_db = postgresql.open('pq://postgres:primary_password@localhost:5432/postgres')
        primary_db.execute("CREATE TABLE IF NOT EXISTS repl_test (id serial, data text)")
        primary_db.execute("INSERT INTO repl_test (data) VALUES ('test data')")

        # Wait a moment for replication
        import time
        time.sleep(2)

        # Connect to secondary and verify data
        secondary_db = postgresql.open('pq://postgres:primary_password@localhost:5433/postgres')
        result = secondary_db.query("SELECT * FROM repl_test")
        if result:
            print(f"Replication verified: {len(result)} rows replicated")
        else:
            print("Replication verification failed")

        primary_db.close()
        secondary_db.close()
        return primary, secondary
    except Exception as e:
        print(f"High availability setup failed: {e}")
        return None, None


# Set up HA clusters
primary, secondary = setup_primary_secondary_clusters()
if primary and secondary:
    print("High availability cluster setup completed successfully")

    # Monitor replication lag
    def check_replication_lag():
        try:
            primary_db = postgresql.open('pq://postgres:primary_password@localhost:5432/postgres')
            lag_query = primary_db.query("""
                SELECT
                    client_addr,
                    application_name,
                    state,
                    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) as lag
                FROM pg_stat_replication
            """)
            print("Replication status:")
            for row in lag_query:
                print(f" Client: {row['client_addr']}, App: {row['application_name']}")
                print(f" State: {row['state']}, Lag: {row['lag']}")
            primary_db.close()
        except Exception as e:
            print(f"Replication monitoring failed: {e}")

    check_replication_lag()

# --- Example: cluster health monitoring ---
import postgresql.cluster as pg_cluster
import postgresql
import psutil
import time


def monitor_cluster_health(cluster):
    """Monitor cluster health and performance metrics.

    Parameters:
        cluster (Cluster): Cluster instance to inspect.

    Returns:
        bool: True if the cluster is running and all checks completed.
    """
    try:
        # Get cluster status
        status = cluster.status()
        print(f"Cluster Status: {status['state']}")
        if status['state'] != 'running':
            print("Cluster is not running!")
            return False

        # Connect to database for detailed monitoring
        db = postgresql.open('pq://postgres@localhost/postgres')

        # Database connection statistics
        conn_stats = db.query("""
            SELECT
                count(*) as total_connections,
                count(*) FILTER (WHERE state = 'active') as active_connections,
                count(*) FILTER (WHERE state = 'idle') as idle_connections
            FROM pg_stat_activity
        """)
        if conn_stats:
            stats = conn_stats[0]
            print(f"Connections - Total: {stats['total_connections']}, "
                  f"Active: {stats['active_connections']}, Idle: {stats['idle_connections']}")

        # Database size information
        db_sizes = db.query("""
            SELECT
                datname,
                pg_size_pretty(pg_database_size(datname)) as size
            FROM pg_database
            WHERE datistemplate = false
            ORDER BY pg_database_size(datname) DESC
        """)
        print("Database sizes:")
        for db_info in db_sizes:
            print(f" {db_info['datname']}: {db_info['size']}")

        # Long running queries
        long_queries = db.query("""
            SELECT
                pid,
                usename,
                datname,
                query_start,
                state,
                substring(query, 1, 100) as query_snippet
            FROM pg_stat_activity
            WHERE state = 'active'
            AND query_start < now() - interval '5 minutes'
            ORDER BY query_start
        """)
        if long_queries:
            print("Long running queries (>5 minutes):")
            for query in long_queries:
                print(f" PID {query['pid']}: {query['usename']}@{query['datname']}")
                print(f" Started: {query['query_start']}")
                print(f" Query: {query['query_snippet']}...")

        # Checkpoint and WAL statistics
        checkpoint_stats = db.query("""
            SELECT
                checkpoints_timed,
                checkpoints_req,
                checkpoint_write_time,
                checkpoint_sync_time
            FROM pg_stat_bgwriter
        """)
        if checkpoint_stats:
            stats = checkpoint_stats[0]
            print(f"Checkpoints - Timed: {stats['checkpoints_timed']}, "
                  f"Requested: {stats['checkpoints_req']}")
            print(f"Checkpoint times - Write: {stats['checkpoint_write_time']}ms, "
                  f"Sync: {stats['checkpoint_sync_time']}ms")

        # System resource usage for the postmaster process
        if status.get('pid'):
            try:
                process = psutil.Process(status['pid'])
                cpu_percent = process.cpu_percent()
                memory_info = process.memory_info()
                print(f"System resources - CPU: {cpu_percent}%, "
                      f"Memory: {memory_info.rss / 1024 / 1024:.1f}MB")
            except psutil.NoSuchProcess:
                print("Could not get system resource information")

        db.close()
        return True
    except Exception as e:
        print(f"Health check failed: {e}")
        return False


# Monitor cluster periodically
cluster = pg_cluster.Cluster("/path/to/cluster/data")
print("Starting cluster health monitoring...")
for i in range(5):  # Monitor for 5 cycles
    print(f"\n--- Health Check {i+1} ---")
    health_ok = monitor_cluster_health(cluster)
    if not health_ok:
        print("Health check failed, attempting cluster restart...")
        try:
            cluster.restart()
            print("Cluster restarted successfully")
        except Exception as e:
            print(f"Cluster restart failed: {e}")
            break
    time.sleep(60)  # Wait 1 minute between checks
print("Health monitoring completed")

# Install with Tessl CLI
npx tessl i tessl/pypi-py-postgresql