0
# Cluster Management
1
2
PostgreSQL cluster and installation management tools for controlling local PostgreSQL instances, configuration, and maintenance operations.
3
4
## Capabilities
5
6
### Cluster Interface
7
8
Interface for managing PostgreSQL cluster instances including startup, shutdown, and configuration.
9
10
```python { .api }
11
class Cluster:
    """
    PostgreSQL cluster management interface for local cluster control.

    Lifecycle operations are instance methods; the data directory,
    configuration and log file location are exposed as read-only properties.
    """

    def start(self):
        """
        Start the PostgreSQL cluster.

        Raises:
            ClusterError: If cluster cannot be started
        """

    def stop(self):
        """
        Stop the PostgreSQL cluster gracefully.

        Raises:
            ClusterError: If cluster cannot be stopped
        """

    def restart(self):
        """
        Restart the PostgreSQL cluster.

        Raises:
            ClusterError: If cluster cannot be restarted
        """

    def initialize(self):
        """
        Initialize a new PostgreSQL cluster (initdb).

        Raises:
            ClusterError: If cluster initialization fails
        """

    def status(self) -> dict:
        """
        Get cluster status information.

        Returns:
            dict: Cluster status including state, PID, uptime
        """

    def reload(self):
        """
        Reload cluster configuration without restart.

        Raises:
            ClusterError: If configuration reload fails
        """

    @property
    def data_directory(self) -> str:
        """
        Get cluster data directory path.

        Returns:
            str: Path to PostgreSQL data directory
        """

    @property
    def configuration(self) -> dict:
        """
        Get cluster configuration settings.

        Returns:
            dict: Configuration parameters and values
        """

    @property
    def log_file(self) -> str:
        """
        Get path to cluster log file.

        Returns:
            str: Path to PostgreSQL log file
        """
90
```
91
92
### Installation Interface
93
94
Interface for PostgreSQL installation detection and information retrieval.
95
96
```python { .api }
97
class Installation:
    """
    PostgreSQL installation information and utilities.

    Every accessor is an instance method taking no arguments besides `self`.
    """

    def version(self) -> dict:
        """
        Get PostgreSQL version information.

        Returns:
            dict: Version details including major, minor, patch versions
        """

    def bin_directory(self) -> str:
        """
        Get PostgreSQL binary directory path.

        Returns:
            str: Path to PostgreSQL binaries (pg_ctl, psql, etc.)
        """

    def lib_directory(self) -> str:
        """
        Get PostgreSQL library directory path.

        Returns:
            str: Path to PostgreSQL libraries
        """

    def share_directory(self) -> str:
        """
        Get PostgreSQL shared data directory path.

        Returns:
            str: Path to PostgreSQL shared files
        """

    def config_directory(self) -> str:
        """
        Get default configuration directory path.

        Returns:
            str: Path to default configuration files
        """

    def find_clusters(self) -> list:
        """
        Find existing PostgreSQL clusters on the system.

        Returns:
            list: List of cluster data directories found
        """
149
```
150
151
### Cluster Creation and Configuration
152
153
Functions for creating and configuring new PostgreSQL clusters.
154
155
```python { .api }
156
def create_cluster(data_directory: str, **options) -> "Cluster":
    """
    Create a new PostgreSQL cluster.

    Parameters:
        - data_directory (str): Path for cluster data directory
        - **options: Initialization options (encoding, locale, auth_method, etc.),
          passed as keyword arguments

    Returns:
        Cluster: New cluster instance

    Raises:
        ClusterError: If cluster creation fails
    """
170
171
def configure_cluster(cluster: "Cluster", settings: dict):
    """
    Configure cluster settings.

    Parameters:
        - cluster (Cluster): Cluster instance to configure
        - settings (dict): Configuration settings to apply

    Raises:
        ClusterError: If configuration fails
    """
182
```
183
184
### Cluster Utilities
185
186
Utility functions for cluster maintenance and operations.
187
188
```python { .api }
189
def backup_cluster(cluster: "Cluster", backup_path: str):
    """
    Create cluster backup using pg_basebackup.

    Parameters:
        - cluster (Cluster): Cluster to backup
        - backup_path (str): Destination path for backup

    Raises:
        ClusterError: If backup fails
    """
200
201
def restore_cluster(backup_path: str, data_directory: str) -> "Cluster":
    """
    Restore cluster from backup.

    Parameters:
        - backup_path (str): Path to backup files
        - data_directory (str): Target data directory

    Returns:
        Cluster: Restored cluster instance

    Raises:
        ClusterError: If restore fails
    """
215
216
def vacuum_cluster(cluster: "Cluster", database=None):
    """
    Perform vacuum maintenance on cluster.

    Parameters:
        - cluster (Cluster): Cluster to vacuum
        - database (str, optional): Specific database to vacuum (all if None)

    Raises:
        ClusterError: If vacuum fails
    """
227
```
228
229
## Usage Examples
230
231
### Basic Cluster Management
232
233
```python
234
import postgresql.cluster as pg_cluster
import postgresql.installation as pg_install

# Find PostgreSQL installation
installation = pg_install.Installation()
print(f"PostgreSQL version: {installation.version()}")
print(f"Binary directory: {installation.bin_directory()}")

# Find existing clusters
clusters = installation.find_clusters()
print(f"Found {len(clusters)} existing clusters:")
for cluster_path in clusters:
    print(f"  {cluster_path}")

# Connect to existing cluster; everything below needs `cluster`, so it all
# lives inside this guard and is skipped when no cluster was found.
if clusters:
    cluster = pg_cluster.Cluster(clusters[0])

    # Check cluster status
    status = cluster.status()
    print(f"Cluster status: {status}")

    # Start cluster if not running
    if status['state'] != 'running':
        print("Starting cluster...")
        cluster.start()
        print("Cluster started successfully")

    # Get cluster information (both are properties, not methods)
    print(f"Data directory: {cluster.data_directory}")
    print(f"Log file: {cluster.log_file}")

    # Reload configuration
    cluster.reload()
    print("Configuration reloaded")
269
```
270
271
### Creating a New Cluster
272
273
```python
274
import postgresql.cluster as pg_cluster
import os

# Define cluster parameters
data_dir = "/path/to/new/cluster"
cluster_options = {
    'encoding': 'UTF8',
    'locale': 'en_US.UTF-8',
    'auth_method': 'md5',
    'username': 'postgres',
    # Placeholder only: load real credentials from a secret store.
    'password': 'secure_password',
}

try:
    # Create new cluster; options are forwarded as keyword arguments
    print(f"Creating cluster in {data_dir}...")
    cluster = pg_cluster.create_cluster(data_dir, **cluster_options)

    # Configure cluster settings
    settings = {
        'max_connections': 200,
        'shared_buffers': '256MB',
        'effective_cache_size': '1GB',
        'maintenance_work_mem': '64MB',
        'checkpoint_completion_target': 0.9,
        'wal_buffers': '16MB',
        'default_statistics_target': 100,
    }

    pg_cluster.configure_cluster(cluster, settings)
    print("Cluster configured with performance settings")

    # Start the cluster
    cluster.start()
    print("Cluster started successfully")

    # Verify cluster is running
    status = cluster.status()
    print(f"Cluster PID: {status.get('pid')}")
    print(f"Uptime: {status.get('uptime')}")

except pg_cluster.ClusterError as e:
    print(f"Cluster creation failed: {e}")
317
```
318
319
### Cluster Maintenance Operations
320
321
```python
322
from contextlib import closing
from datetime import datetime

import postgresql
import postgresql.cluster as pg_cluster

# Connect to cluster management
cluster = pg_cluster.Cluster("/path/to/cluster/data")

# Create backup; the timestamp keeps successive backups in separate directories
backup_dir = f"/backups/cluster_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

try:
    print("Creating cluster backup...")
    pg_cluster.backup_cluster(cluster, backup_dir)
    print(f"Backup created successfully: {backup_dir}")

    # Perform maintenance
    print("Performing cluster maintenance...")

    # Connect to cluster for database operations. closing() calls close()
    # on exit, so the connection is released even when a query raises.
    with closing(postgresql.open('pq://postgres@localhost/postgres')) as db:
        # Get database list; skip databases that refuse connections, since
        # each one is opened individually below.
        databases = db.query(
            "SELECT datname FROM pg_database "
            "WHERE datistemplate = false AND datallowconn"
        )

        for db_row in databases:
            db_name = db_row['datname']
            print(f"Vacuuming database: {db_name}")

            # Connect to specific database for vacuum
            with closing(postgresql.open(f'pq://postgres@localhost/{db_name}')) as db_conn:
                # Vacuum analyze
                db_conn.execute("VACUUM ANALYZE")

                # Get database statistics
                stats = db_conn.query("""
                    SELECT
                        pg_size_pretty(pg_database_size(current_database())) as size,
                        (SELECT count(*) FROM pg_stat_user_tables) as user_tables
                """)

                if stats:
                    print(f"  Size: {stats[0]['size']}, Tables: {stats[0]['user_tables']}")

    # Reload cluster configuration
    cluster.reload()
    print("Cluster maintenance completed")

except Exception as e:
    print(f"Maintenance failed: {e}")
376
```
377
378
### Cluster Configuration Management
379
380
```python
381
import os
from contextlib import closing

import postgresql
import postgresql.cluster as pg_cluster
import postgresql.configfile as pg_config

cluster = pg_cluster.Cluster("/path/to/cluster/data")

# Read current configuration
config_file = os.path.join(cluster.data_directory, "postgresql.conf")
config = pg_config.read_config(config_file)

print("Current configuration:")
for key, value in config.items():
    print(f"  {key} = {value}")

# Update configuration settings
new_settings = {
    'max_connections': 300,
    'shared_buffers': '512MB',
    'effective_cache_size': '2GB',
    'random_page_cost': 1.1,  # For SSD storage
    'seq_page_cost': 1.0,
    'log_min_duration_statement': 1000,  # Log slow queries
    'log_line_prefix': '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h ',
    'log_statement': 'mod',  # Log modifications
    # checkpoint_segments was removed in PostgreSQL 9.5, the same release
    # that added pg_settings.pending_restart (queried below). Its
    # replacement is max_wal_size; 64 segments * 3 * 16MB = 3GB.
    'max_wal_size': '3GB',
    'checkpoint_completion_target': 0.9,
}

try:
    # Apply new settings
    pg_cluster.configure_cluster(cluster, new_settings)

    # Reload configuration (some settings require restart)
    cluster.reload()
    print("Configuration updated and reloaded")

    # Check which settings require restart. closing() releases the
    # connection even if the query or the restart below raises.
    with closing(postgresql.open('pq://postgres@localhost/postgres')) as db:
        restart_required = db.query("""
            SELECT name, setting, pending_restart
            FROM pg_settings
            WHERE pending_restart = true
        """)

        if restart_required:
            print("Settings requiring restart:")
            for setting in restart_required:
                print(f"  {setting['name']}: {setting['setting']}")

            response = input("Restart cluster now? (y/n): ")
            if response.lower() == 'y':
                cluster.restart()
                print("Cluster restarted successfully")

except Exception as e:
    print(f"Configuration update failed: {e}")
440
```
441
442
### High Availability Cluster Setup
443
444
```python
445
import postgresql.cluster as pg_cluster
446
import postgresql
447
import os
448
import shutil
449
450
def setup_primary_secondary_clusters():
    """Set up primary-secondary cluster configuration.

    Creates and starts a primary, takes a base backup into the secondary's
    data directory, starts the secondary as a standby on its own port, and
    checks that a test row written on the primary is visible on the secondary.

    Returns:
        tuple: (primary, secondary) cluster objects, or (None, None) if any
        step raised.
    """
    import time

    primary_data = "/path/to/primary/data"
    secondary_data = "/path/to/secondary/data"

    # Both clusters run on localhost, so they need distinct ports.
    primary_port = 5432
    secondary_port = 5433
    primary_uri = f'pq://postgres:primary_password@localhost:{primary_port}/postgres'
    secondary_uri = f'pq://postgres:primary_password@localhost:{secondary_port}/postgres'

    try:
        # Create primary cluster. create_cluster takes its options as
        # keyword arguments, not as a positional dict.
        print("Creating primary cluster...")
        primary = pg_cluster.create_cluster(
            primary_data,
            encoding='UTF8',
            username='postgres',
            password='primary_password',
        )

        # Configure primary for replication
        # NOTE(review): wal_keep_segments was replaced by wal_keep_size in
        # PostgreSQL 13 - adjust for the server version actually targeted.
        primary_settings = {
            'port': primary_port,
            'wal_level': 'replica',
            'max_wal_senders': 3,
            'wal_keep_segments': 64,
            'synchronous_commit': 'on',
            'archive_mode': 'on',
            'archive_command': 'cp %p /path/to/archive/%f',
        }

        pg_cluster.configure_cluster(primary, primary_settings)

        # Start primary
        primary.start()
        print("Primary cluster started")

        # Create replication user
        db = postgresql.open(primary_uri)
        try:
            db.execute("""
                CREATE USER replicator WITH REPLICATION ENCRYPTED PASSWORD 'repl_password'
            """)
        finally:
            db.close()

        # Create base backup for secondary
        print("Creating base backup for secondary...")
        pg_cluster.backup_cluster(primary, secondary_data)

        # Configure secondary. The base backup carries the primary's
        # settings, so the port must be overridden before starting.
        secondary = pg_cluster.Cluster(secondary_data)
        pg_cluster.configure_cluster(secondary, {'port': secondary_port})

        # Create recovery configuration for secondary
        # NOTE(review): recovery.conf / standby_mode are only honoured by
        # PostgreSQL 11 and earlier; 12+ expects a standby.signal file with
        # primary_conninfo in postgresql.conf - confirm the target version.
        recovery_conf = os.path.join(secondary_data, "recovery.conf")
        with open(recovery_conf, 'w') as f:
            f.write(f"""
standby_mode = 'on'
primary_conninfo = 'host=localhost port={primary_port} user=replicator password=repl_password'
trigger_file = '/tmp/postgresql.trigger'
""")

        # Start secondary in standby mode
        secondary.start()
        print("Secondary cluster started in standby mode")

        # Verify replication
        print("Verifying replication...")

        # Connect to primary and create test data
        primary_db = postgresql.open(primary_uri)
        try:
            primary_db.execute("CREATE TABLE IF NOT EXISTS repl_test (id serial, data text)")
            primary_db.execute("INSERT INTO repl_test (data) VALUES ('test data')")

            # Wait a moment for replication
            time.sleep(2)

            # Connect to secondary and verify data
            secondary_db = postgresql.open(secondary_uri)
            try:
                result = secondary_db.query("SELECT * FROM repl_test")
            finally:
                secondary_db.close()
        finally:
            primary_db.close()

        if result:
            print(f"Replication verified: {len(result)} rows replicated")
        else:
            print("Replication verification failed")

        return primary, secondary

    except Exception as e:
        print(f"High availability setup failed: {e}")
        return None, None
537
538
def check_replication_lag():
    """Print each standby's replication state and lag as seen by the primary.

    Reads pg_stat_replication on the primary (port 5432). Any failure is
    reported on stdout and not raised.
    """
    try:
        primary_db = postgresql.open('pq://postgres:primary_password@localhost:5432/postgres')
        try:
            lag_query = primary_db.query("""
                SELECT
                    client_addr,
                    application_name,
                    state,
                    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) as lag
                FROM pg_stat_replication
            """)

            print("Replication status:")
            for row in lag_query:
                print(f"  Client: {row['client_addr']}, App: {row['application_name']}")
                print(f"  State: {row['state']}, Lag: {row['lag']}")
        finally:
            # Release the connection even when the query raises.
            primary_db.close()

    except Exception as e:
        print(f"Replication monitoring failed: {e}")


# Set up HA clusters
primary, secondary = setup_primary_secondary_clusters()

if primary and secondary:
    print("High availability cluster setup completed successfully")

    # Monitor replication lag
    check_replication_lag()
568
```
569
570
### Cluster Monitoring and Health Checks
571
572
```python
573
import postgresql.cluster as pg_cluster
574
import postgresql
575
import psutil
576
import time
577
578
def monitor_cluster_health(cluster):
    """Monitor cluster health and performance metrics.

    Prints the cluster state, connection counts, database sizes, queries
    active for more than five minutes, checkpoint statistics and, when the
    status includes a PID, the process's CPU and memory usage.

    Parameters:
        - cluster: Object whose status() returns a mapping with a 'state'
          key and, optionally, a 'pid' key

    Returns:
        bool: True if every check ran; False if the cluster is not running
        or any step raised
    """
    try:
        # Get cluster status
        status = cluster.status()
        print(f"Cluster Status: {status['state']}")

        if status['state'] != 'running':
            print("Cluster is not running!")
            return False

        # Connect to database for detailed monitoring
        db = postgresql.open('pq://postgres@localhost/postgres')
        try:
            # Database connection statistics
            conn_stats = db.query("""
                SELECT
                    count(*) as total_connections,
                    count(*) FILTER (WHERE state = 'active') as active_connections,
                    count(*) FILTER (WHERE state = 'idle') as idle_connections
                FROM pg_stat_activity
            """)

            if conn_stats:
                stats = conn_stats[0]
                print(f"Connections - Total: {stats['total_connections']}, "
                      f"Active: {stats['active_connections']}, Idle: {stats['idle_connections']}")

            # Database size information
            db_sizes = db.query("""
                SELECT
                    datname,
                    pg_size_pretty(pg_database_size(datname)) as size
                FROM pg_database
                WHERE datistemplate = false
                ORDER BY pg_database_size(datname) DESC
            """)

            print("Database sizes:")
            for db_info in db_sizes:
                print(f"  {db_info['datname']}: {db_info['size']}")

            # Long running queries
            long_queries = db.query("""
                SELECT
                    pid,
                    usename,
                    datname,
                    query_start,
                    state,
                    substring(query, 1, 100) as query_snippet
                FROM pg_stat_activity
                WHERE state = 'active'
                AND query_start < now() - interval '5 minutes'
                ORDER BY query_start
            """)

            if long_queries:
                print("Long running queries (>5 minutes):")
                for query in long_queries:
                    print(f"  PID {query['pid']}: {query['usename']}@{query['datname']}")
                    print(f"    Started: {query['query_start']}")
                    print(f"    Query: {query['query_snippet']}...")

            # Checkpoint and WAL statistics
            # NOTE(review): PostgreSQL 17 moved these counters to
            # pg_stat_checkpointer - confirm the target server version.
            checkpoint_stats = db.query("""
                SELECT
                    checkpoints_timed,
                    checkpoints_req,
                    checkpoint_write_time,
                    checkpoint_sync_time
                FROM pg_stat_bgwriter
            """)

            if checkpoint_stats:
                stats = checkpoint_stats[0]
                print(f"Checkpoints - Timed: {stats['checkpoints_timed']}, "
                      f"Requested: {stats['checkpoints_req']}")
                print(f"Checkpoint times - Write: {stats['checkpoint_write_time']}ms, "
                      f"Sync: {stats['checkpoint_sync_time']}ms")

            # System resource usage
            if status.get('pid'):
                try:
                    process = psutil.Process(status['pid'])
                    # Without an interval, the first cpu_percent() call on a
                    # new Process has no baseline and returns 0.0.
                    cpu_percent = process.cpu_percent(interval=0.1)
                    memory_info = process.memory_info()

                    print(f"System resources - CPU: {cpu_percent}%, "
                          f"Memory: {memory_info.rss / 1024 / 1024:.1f}MB")
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    print("Could not get system resource information")
        finally:
            # Release the connection even when one of the queries raises.
            db.close()

        return True

    except Exception as e:
        print(f"Health check failed: {e}")
        return False
678
679
# Monitor cluster periodically
cluster = pg_cluster.Cluster("/path/to/cluster/data")

check_cycles = 5       # Monitor for 5 cycles
check_interval = 60    # Wait 1 minute between checks

print("Starting cluster health monitoring...")
for i in range(check_cycles):
    print(f"\n--- Health Check {i+1} ---")
    health_ok = monitor_cluster_health(cluster)

    if not health_ok:
        print("Health check failed, attempting cluster restart...")
        try:
            cluster.restart()
            print("Cluster restarted successfully")
        except Exception as e:
            print(f"Cluster restart failed: {e}")
            break

    # No further check follows the last cycle, so do not wait after it.
    if i < check_cycles - 1:
        time.sleep(check_interval)

print("Health monitoring completed")
699
```