# Advanced Features

Advanced PostgreSQL features including COPY operations, LISTEN/NOTIFY, advisory locks, and streaming results for high-performance database applications.

## Capabilities

### COPY Operations

High-performance bulk data import/export using PostgreSQL's COPY protocol for efficient data transfer.

```python { .api }
class CopyManager:
    """
    Manager for PostgreSQL COPY operations providing bulk data import/export.
    """

    def load_rows(statement, rows):
        """
        Load rows using COPY FROM for bulk insert operations.

        Parameters:
        - statement (str): COPY FROM statement
        - rows (iterable): Rows to insert (tuples or lists)

        Returns:
            int: Number of rows loaded

        Raises:
            CopyError: If COPY operation fails
        """

    def dump_rows(statement):
        """
        Dump rows using COPY TO for bulk export operations.

        Parameters:
        - statement (str): COPY TO statement

        Returns:
            Iterator: Iterator over exported row data

        Raises:
            CopyError: If COPY operation fails
        """

    def load_file(statement, file_path):
        """
        Load data from file using COPY FROM.

        Parameters:
        - statement (str): COPY FROM statement
        - file_path (str): Path to source file

        Returns:
            int: Number of rows loaded
        """

    def dump_file(statement, file_path):
        """
        Dump data to file using COPY TO.

        Parameters:
        - statement (str): COPY TO statement
        - file_path (str): Path to destination file

        Returns:
            int: Number of rows dumped
        """
```

### LISTEN/NOTIFY Support

Asynchronous notification system for real-time communication between database sessions.

```python { .api }
class NotificationManager:
    """
    Manager for PostgreSQL LISTEN/NOTIFY asynchronous messaging.
    """

    def listen(channel):
        """
        Start listening for notifications on a channel.

        Parameters:
        - channel (str): Channel name to listen on

        Raises:
            NotificationError: If listen operation fails
        """

    def unlisten(channel=None):
        """
        Stop listening for notifications.

        Parameters:
        - channel (str, optional): Specific channel to unlisten (all if None)

        Raises:
            NotificationError: If unlisten operation fails
        """

    def notify(channel, payload=None):
        """
        Send notification to a channel.

        Parameters:
        - channel (str): Channel name to notify
        - payload (str, optional): Optional message payload

        Raises:
            NotificationError: If notify operation fails
        """

    def get_notifications():
        """
        Get pending notifications (non-blocking).

        Returns:
            list: List of notification objects with channel, payload, pid
        """

    def wait_for_notification(timeout=None):
        """
        Wait for next notification (blocking).

        Parameters:
        - timeout (float, optional): Timeout in seconds (infinite if None)

        Returns:
            dict or None: Notification object or None if timeout
        """

class Notification:
    """Notification message from PostgreSQL."""

    @property
    def channel():
        """Channel name that received the notification."""

    @property
    def payload():
        """Optional payload data."""

    @property
    def pid():
        """Process ID of the notifying backend."""
```

### Advisory Locks

PostgreSQL advisory locks for application-level synchronization and coordination.

```python { .api }
class ALock:
    """
    Base class for PostgreSQL advisory locks.
    """

    def acquire(blocking=True):
        """
        Acquire the advisory lock.

        Parameters:
        - blocking (bool): Whether to block until lock is available

        Returns:
            bool: True if lock acquired, False if non-blocking and unavailable

        Raises:
            LockError: If lock acquisition fails
        """

    def release():
        """
        Release the advisory lock.

        Raises:
            LockError: If lock release fails
        """

    def __enter__():
        """Context manager entry - acquire lock."""

    def __exit__(exc_type, exc_val, exc_tb):
        """Context manager exit - release lock."""

    @property
    def is_held():
        """
        Check if lock is currently held by this session.

        Returns:
            bool: True if lock is held
        """

class ExclusiveLock(ALock):
    """
    Exclusive advisory lock - only one session can hold it.
    """

    def __init__(lock_id):
        """
        Create exclusive advisory lock.

        Parameters:
        - lock_id (int or tuple): Lock identifier (int or pair of ints)
        """

class ShareLock(ALock):
    """
    Shared advisory lock - multiple sessions can hold it simultaneously.
    """

    def __init__(lock_id):
        """
        Create shared advisory lock.

        Parameters:
        - lock_id (int or tuple): Lock identifier (int or pair of ints)
        """
```

### Streaming Results

Interfaces for streaming large result sets without loading all data into memory.

```python { .api }
class ResultStream:
    """
    Streaming interface for large query results.
    """

    def __iter__():
        """Iterate over result rows."""

    def __next__():
        """Get next result row."""

    def close():
        """Close the result stream."""

    @property
    def description():
        """
        Get column description information.

        Returns:
            list: Column metadata
        """

def stream_query(connection, query, *parameters):
    """
    Execute query and return streaming result interface.

    Parameters:
    - connection: Database connection
    - query (str): SQL query to execute
    - *parameters: Query parameters

    Returns:
        ResultStream: Streaming result interface
    """
```

### Connection Pooling Utilities

Utilities for managing connection pools and connection lifecycle.

```python { .api }
class ConnectionPool:
    """
    Connection pool for managing database connections.
    """

    def __init__(connector, min_size=1, max_size=10):
        """
        Create connection pool.

        Parameters:
        - connector: Connection factory
        - min_size (int): Minimum pool size
        - max_size (int): Maximum pool size
        """

    def get_connection():
        """
        Get connection from pool.

        Returns:
            Connection: Database connection from pool
        """

    def return_connection(connection):
        """
        Return connection to pool.

        Parameters:
        - connection: Connection to return
        """

    def close_all():
        """Close all connections in pool."""

    @property
    def size():
        """Current pool size."""

    @property
    def available():
        """Number of available connections."""
```

### Stored Procedure Interface

Interface for calling PostgreSQL stored procedures and functions with parameter binding and result handling.

```python { .api }
class StoredProcedure:
    """
    Interface for calling PostgreSQL stored procedures and functions.
    """

    def __call__(*args, **kw):
        """
        Execute the stored procedure with provided arguments.

        Parameters:
        - *args: Positional parameters for the procedure
        - **kw: Keyword parameters for the procedure

        Returns:
            Procedure result (varies by procedure type)

        Raises:
            ProcedureError: If procedure execution fails
        """

    @property
    def name():
        """
        Get procedure name.

        Returns:
            str: Fully qualified procedure name
        """

    @property
    def parameter_types():
        """
        Get parameter type information.

        Returns:
            List[int]: PostgreSQL type OIDs for parameters
        """

    @property
    def return_type():
        """
        Get return type information.

        Returns:
            int: PostgreSQL type OID for return value
        """
```

## Usage Examples

### COPY Operations for Bulk Data

```python
import postgresql
import postgresql.copyman as copy_manager
import csv

db = postgresql.open('pq://user:pass@localhost/mydb')

# Create test table
db.execute("""
    CREATE TABLE IF NOT EXISTS bulk_data (
        id SERIAL PRIMARY KEY,
        name TEXT,
        value NUMERIC,
        created_date DATE
    )
""")

# Bulk insert using COPY
def bulk_insert_with_copy():
    """Bulk insert data using COPY for high performance."""

    # Prepare data
    data_rows = []
    for i in range(10000):
        data_rows.append((f"Item {i}", i * 1.5, '2023-01-01'))

    # Use COPY for bulk insert
    copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV)"

    copy_mgr = copy_manager.CopyManager(db)
    rows_loaded = copy_mgr.load_rows(copy_stmt, data_rows)

    print(f"Loaded {rows_loaded} rows using COPY")

# Bulk export using COPY
def bulk_export_with_copy():
    """Export data using COPY for high performance."""

    copy_stmt = "COPY bulk_data TO STDOUT WITH (FORMAT CSV, HEADER)"

    copy_mgr = copy_manager.CopyManager(db)

    # Export to file
    copy_mgr.dump_file(copy_stmt, "/tmp/exported_data.csv")
    print("Data exported to /tmp/exported_data.csv")

    # Or stream export data
    row_count = 0
    for row_data in copy_mgr.dump_rows(copy_stmt):
        row_count += 1
        if row_count <= 5:  # Show first 5 rows
            print(f"Exported row: {row_data}")

    print(f"Total rows exported: {row_count}")

# Import from CSV file
def import_csv_file():
    """Import data from CSV file using COPY."""

    # Create CSV file
    with open('/tmp/import_data.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['name', 'value', 'created_date'])  # Header
        for i in range(5000):
            writer.writerow([f"CSV Item {i}", i * 2.0, '2023-06-01'])

    # Import using COPY
    copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV, HEADER)"

    copy_mgr = copy_manager.CopyManager(db)
    rows_loaded = copy_mgr.load_file(copy_stmt, '/tmp/import_data.csv')

    print(f"Imported {rows_loaded} rows from CSV file")

# Execute bulk operations
bulk_insert_with_copy()
bulk_export_with_copy()
import_csv_file()
```

### LISTEN/NOTIFY for Real-time Communication

```python
import postgresql
import postgresql.notifyman as notify
import threading
import time

# Set up two connections for demonstration
publisher = postgresql.open('pq://user:pass@localhost/mydb')
subscriber = postgresql.open('pq://user:pass@localhost/mydb')

def notification_publisher():
    """Publish notifications to channels."""

    notifier = notify.NotificationManager(publisher)

    for i in range(10):
        # Send notifications to different channels
        notifier.notify('events', f'Event {i}: Something happened')
        notifier.notify('alerts', f'Alert {i}: Check system status')

        print(f"Published notification {i}")
        time.sleep(2)

    # Send termination signal
    notifier.notify('events', 'TERMINATE')

def notification_subscriber():
    """Subscribe to notifications and process them."""

    listener = notify.NotificationManager(subscriber)

    # Listen to multiple channels
    listener.listen('events')
    listener.listen('alerts')

    print("Listening for notifications...")

    while True:
        # Wait for notifications (blocking)
        notification = listener.wait_for_notification(timeout=30)

        if notification:
            channel = notification.channel
            payload = notification.payload
            sender_pid = notification.pid

            print(f"Received on '{channel}': {payload} (from PID {sender_pid})")

            # Check for termination signal
            if payload == 'TERMINATE':
                print("Termination signal received, stopping listener")
                break
        else:
            print("Notification timeout")
            break

    # Clean up
    listener.unlisten()  # Unlisten from all channels

# Run publisher and subscriber in separate threads
subscriber_thread = threading.Thread(target=notification_subscriber)
publisher_thread = threading.Thread(target=notification_publisher)

subscriber_thread.start()
time.sleep(1)  # Let subscriber start first
publisher_thread.start()

# Wait for both threads to complete
subscriber_thread.join()
publisher_thread.join()

publisher.close()
subscriber.close()
```

### Advisory Locks for Coordination

```python
import postgresql
import postgresql.alock as advisory_locks
import threading
import time

db1 = postgresql.open('pq://user:pass@localhost/mydb')
db2 = postgresql.open('pq://user:pass@localhost/mydb')

def exclusive_lock_example():
    """Demonstrate exclusive advisory locks."""

    def worker(worker_id, connection):
        lock = advisory_locks.ExclusiveLock(12345)  # Lock ID
        lock.connection = connection

        print(f"Worker {worker_id}: Attempting to acquire exclusive lock")

        # Try to acquire lock (blocking)
        if lock.acquire():
            print(f"Worker {worker_id}: Acquired exclusive lock")

            # Simulate work
            time.sleep(3)

            print(f"Worker {worker_id}: Releasing exclusive lock")
            lock.release()
        else:
            print(f"Worker {worker_id}: Failed to acquire lock")

    # Start two workers competing for the same lock
    thread1 = threading.Thread(target=worker, args=(1, db1))
    thread2 = threading.Thread(target=worker, args=(2, db2))

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

def shared_lock_example():
    """Demonstrate shared advisory locks."""

    def reader(reader_id, connection):
        lock = advisory_locks.ShareLock(54321)  # Shared lock ID
        lock.connection = connection

        print(f"Reader {reader_id}: Acquiring shared lock")

        with lock:  # Context manager automatically acquires/releases
            print(f"Reader {reader_id}: Reading data (shared access)")
            time.sleep(2)
            print(f"Reader {reader_id}: Finished reading")

    def writer(connection):
        lock = advisory_locks.ExclusiveLock(54321)  # Same ID as readers
        lock.connection = connection

        print("Writer: Waiting for exclusive access")

        with lock:
            print("Writer: Writing data (exclusive access)")
            time.sleep(3)
            print("Writer: Finished writing")

    # Start multiple readers and one writer
    reader_threads = []
    for i in range(3):
        thread = threading.Thread(target=reader, args=(i+1, db1))
        reader_threads.append(thread)
        thread.start()

    time.sleep(1)  # Let readers start first

    writer_thread = threading.Thread(target=writer, args=(db2,))
    writer_thread.start()

    # Wait for all threads
    for thread in reader_threads:
        thread.join()
    writer_thread.join()

def distributed_counter_example():
    """Implement distributed counter using advisory locks."""

    # Create counter table
    db1.execute("""
        CREATE TABLE IF NOT EXISTS distributed_counter (
            name TEXT PRIMARY KEY,
            value INTEGER DEFAULT 0
        )
    """)

    # Initialize counter
    db1.execute("INSERT INTO distributed_counter (name, value) VALUES ('global', 0) ON CONFLICT (name) DO NOTHING")

    def increment_counter(worker_id, connection, increments):
        for i in range(increments):
            # Use lock to ensure atomic counter increment
            lock = advisory_locks.ExclusiveLock(99999)  # Counter lock ID
            lock.connection = connection

            with lock:
                # Read current value
                current = connection.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']

                # Increment
                new_value = current + 1
                connection.execute("UPDATE distributed_counter SET value = $1 WHERE name = 'global'", new_value)

                print(f"Worker {worker_id}: Incremented counter to {new_value}")

            time.sleep(0.1)  # Small delay between increments

    # Start multiple workers incrementing the counter
    workers = []
    for i in range(3):
        connection = postgresql.open('pq://user:pass@localhost/mydb')
        thread = threading.Thread(target=increment_counter, args=(i+1, connection, 5))
        workers.append((thread, connection))
        thread.start()

    # Wait for all workers
    for thread, connection in workers:
        thread.join()
        connection.close()

    # Check final counter value
    final_value = db1.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']
    print(f"Final counter value: {final_value}")

print("=== Exclusive Lock Example ===")
exclusive_lock_example()

print("\n=== Shared Lock Example ===")
shared_lock_example()

print("\n=== Distributed Counter Example ===")
distributed_counter_example()

db1.close()
db2.close()
```

### Streaming Large Result Sets

```python
import postgresql
import time

db = postgresql.open('pq://user:pass@localhost/mydb')

# Create large test dataset
db.execute("""
    CREATE TABLE IF NOT EXISTS large_dataset AS
    SELECT
        generate_series(1, 1000000) as id,
        'Item ' || generate_series(1, 1000000) as name,
        random() * 1000 as value,
        NOW() - (random() * interval '365 days') as created_at
""")

def stream_large_results():
    """Stream large result sets to avoid memory issues."""

    # Prepare streaming query
    query = db.prepare("""
        SELECT id, name, value, created_at
        FROM large_dataset
        WHERE value > $1
        ORDER BY value DESC
    """)

    print("Starting streaming query...")
    start_time = time.time()

    # Stream results instead of loading all into memory
    row_count = 0
    total_value = 0

    for row in query.rows(500):  # Stream rows where value > 500
        row_count += 1
        total_value += row['value']

        # Process row (show first 10)
        if row_count <= 10:
            print(f"Row {row_count}: ID={row['id']}, Name={row['name']}, Value={row['value']:.2f}")
        elif row_count % 10000 == 0:
            print(f"Processed {row_count} rows...")

    end_time = time.time()
    avg_value = total_value / row_count if row_count > 0 else 0

    print(f"Streaming complete:")
    print(f"  Rows processed: {row_count}")
    print(f"  Average value: {avg_value:.2f}")
    print(f"  Time taken: {end_time - start_time:.2f} seconds")

def chunk_processing():
    """Process large datasets in chunks."""

    query = db.prepare("SELECT * FROM large_dataset ORDER BY id")

    print("Starting chunk processing...")

    chunk_size = 0
    chunk_count = 0
    total_processed = 0

    # Process data in chunks
    for chunk in query.chunks():
        chunk_count += 1
        chunk_size = len(chunk)
        total_processed += chunk_size

        # Process chunk
        chunk_sum = sum(row['value'] for row in chunk)
        chunk_avg = chunk_sum / chunk_size

        print(f"Chunk {chunk_count}: {chunk_size} rows, avg value: {chunk_avg:.2f}")

        # Simulate processing time
        time.sleep(0.1)

        # Limit for demonstration
        if chunk_count >= 10:
            break

    print(f"Chunk processing complete - {total_processed} rows in {chunk_count} chunks")

def memory_efficient_aggregation():
    """Perform aggregations on large datasets without loading all data."""

    query = db.prepare("""
        SELECT
            EXTRACT(month FROM created_at) as month,
            id, value
        FROM large_dataset
        ORDER BY created_at
    """)

    # Track monthly statistics
    monthly_stats = {}

    print("Computing monthly statistics...")

    for row in query.rows():
        month = int(row['month'])
        value = row['value']

        if month not in monthly_stats:
            monthly_stats[month] = {'count': 0, 'sum': 0, 'min': float('inf'), 'max': float('-inf')}

        stats = monthly_stats[month]
        stats['count'] += 1
        stats['sum'] += value
        stats['min'] = min(stats['min'], value)
        stats['max'] = max(stats['max'], value)

    # Display results
    print("Monthly statistics:")
    for month in sorted(monthly_stats.keys()):
        stats = monthly_stats[month]
        avg = stats['sum'] / stats['count']
        print(f"  Month {month:2d}: {stats['count']:6d} rows, "
              f"avg: {avg:6.2f}, min: {stats['min']:6.2f}, max: {stats['max']:6.2f}")

# Run streaming examples
stream_large_results()
print("\n" + "="*50 + "\n")
chunk_processing()
print("\n" + "="*50 + "\n")
memory_efficient_aggregation()

db.close()
```

### Connection Pool Management

```python
import postgresql
import postgresql.pool as connection_pool
import threading
import time
import random

def connection_pool_example():
    """Demonstrate connection pooling for concurrent access."""

    # Create connection factory
    connector = postgresql.open('&pq://user:pass@localhost/mydb')

    # Create connection pool
    pool = connection_pool.ConnectionPool(
        connector,
        min_size=2,
        max_size=5
    )

    def worker(worker_id, num_operations):
        """Worker function that uses pooled connections."""

        for i in range(num_operations):
            # Get connection from pool
            conn = pool.get_connection()

            try:
                # Simulate database work
                result = conn.query("SELECT $1 as worker_id, $2 as operation, NOW() as timestamp",
                                    worker_id, i)

                print(f"Worker {worker_id}, Op {i}: {result[0]['timestamp']}")

                # Simulate processing time
                time.sleep(random.uniform(0.1, 0.5))

            finally:
                # Always return connection to pool
                pool.return_connection(conn)

            # Small delay between operations
            time.sleep(0.1)

    print(f"Starting connection pool with {pool.size} connections")

    # Start multiple workers
    workers = []
    for i in range(8):  # More workers than pool size
        thread = threading.Thread(target=worker, args=(i+1, 3))
        workers.append(thread)
        thread.start()

    # Monitor pool status
    def monitor_pool():
        for _ in range(10):
            print(f"Pool status - Size: {pool.size}, Available: {pool.available}")
            time.sleep(1)

    monitor_thread = threading.Thread(target=monitor_pool)
    monitor_thread.start()

    # Wait for all workers to complete
    for thread in workers:
        thread.join()

    monitor_thread.join()

    # Clean up
    pool.close_all()
    print("Connection pool closed")

# Run connection pool example
connection_pool_example()
```