0
# Advanced PostgreSQL Features
1
2
psycopg2 exposes PostgreSQL-specific advanced functionality: large objects, server-side cursors, asynchronous operations, notifications (LISTEN/NOTIFY), replication support, and more. These features leverage PostgreSQL's unique capabilities for high-performance applications.
3
4
## Capabilities
5
6
### Large Objects (LOB)
7
8
PostgreSQL large object interface for handling binary data larger than typical field limits.
9
10
```python { .api }
11
class lobject:
12
"""PostgreSQL large object interface."""
13
14
def __init__(self, conn, oid=0, mode='r', new_oid=None, new_file=None):
15
"""
16
Initialize large object.
17
18
Parameters:
19
- conn (connection): Database connection
20
- oid (int): Existing large object OID (0 for new)
21
- mode (str): Access mode ('r', 'w', 'rw', 'n')
22
- new_oid (int, optional): OID for new object
23
- new_file (str, optional): File to import
24
"""
25
26
def read(self, size=-1):
27
"""
28
Read data from large object.
29
30
Parameters:
31
- size (int): Bytes to read (-1 for all)
32
33
Returns:
34
bytes: Data read from object
35
"""
36
37
def write(self, data):
38
"""
39
Write data to large object.
40
41
Parameters:
42
- data (bytes): Data to write
43
44
Returns:
45
int: Number of bytes written
46
"""
47
48
def seek(self, pos, whence=0):
49
"""
50
Seek to position in large object.
51
52
Parameters:
53
- pos (int): Position to seek to
54
- whence (int): Seek origin (0=start, 1=current, 2=end)
55
56
Returns:
57
int: New position
58
"""
59
60
def tell(self):
61
"""
62
Get current position.
63
64
Returns:
65
int: Current position in object
66
"""
67
68
def truncate(self, size=0):
69
"""
70
Truncate large object.
71
72
Parameters:
73
- size (int): New size
74
"""
75
76
def close(self):
77
"""Close large object."""
78
79
def export(self, filename):
80
"""
81
Export large object to file.
82
83
Parameters:
84
- filename (str): Target filename
85
"""
86
87
def unlink(self):
88
"""Delete large object from database."""
89
90
@property
91
def oid(self):
92
"""Large object OID."""
93
94
@property
95
def mode(self):
96
"""Access mode."""
97
98
@property
99
def closed(self):
100
"""Closed status."""
101
```
102
103
Usage examples:
104
105
```python
106
# Create new large object
107
lobj = conn.lobject(mode='w')
108
lobj.write(b'Large binary data here...')
109
oid = lobj.oid
110
lobj.close()
111
112
# Store OID in table
113
cur.execute("INSERT INTO documents (name, data_oid) VALUES (%s, %s)",
114
('document.pdf', oid))
115
116
# Read large object
117
cur.execute("SELECT data_oid FROM documents WHERE name = %s", ('document.pdf',))
118
oid = cur.fetchone()[0]
119
120
lobj = conn.lobject(oid, mode='r')
121
data = lobj.read()
122
lobj.close()
123
124
# Work with files
125
lobj = conn.lobject(mode='w', new_file='/path/to/large_file.bin')
126
stored_oid = lobj.oid
127
128
# Export large object
129
lobj = conn.lobject(oid, mode='r')
130
lobj.export('/path/to/exported_file.bin')
131
lobj.close()
132
```
133
134
### Asynchronous Operations
135
136
Support for non-blocking database operations with polling and wait callbacks.
137
138
```python { .api }
139
def set_wait_callback(f):
140
"""
141
Set global wait callback for async operations.
142
143
Parameters:
144
- f (callable): Callback function for waiting
145
"""
146
147
def get_wait_callback():
148
"""
149
Get current wait callback.
150
151
Returns:
152
callable/None: Current wait callback
153
"""
154
155
# Connection polling constants
156
POLL_OK: int # Operation completed
157
POLL_READ: int # Wait for socket read
158
POLL_WRITE: int # Wait for socket write
159
POLL_ERROR: int # Error occurred
160
```
161
162
Usage examples:
163
164
```python
165
import select
166
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
167
168
# Create async connection
169
conn = psycopg2.connect(..., async_=True)
170
171
# Wait for connection to complete
172
def wait_for_connection(conn):
173
while True:
174
state = conn.poll()
175
if state == POLL_OK:
176
break
177
elif state == POLL_READ:
178
select.select([conn.fileno()], [], [])
179
elif state == POLL_WRITE:
180
select.select([], [conn.fileno()], [])
181
else:
182
raise Exception("Connection failed")
183
184
wait_for_connection(conn)
185
186
# Async query execution
187
cur = conn.cursor()
188
cur.execute("SELECT * FROM large_table")
189
190
# Poll for query completion
191
def wait_for_query(conn):
192
while True:
193
state = conn.poll()
194
if state == POLL_OK:
195
return
196
elif state == POLL_READ:
197
select.select([conn.fileno()], [], [])
198
elif state == POLL_WRITE:
199
select.select([], [conn.fileno()], [])
200
201
wait_for_query(conn)
202
results = cur.fetchall()
203
204
# Custom wait callback
205
def custom_wait_callback(conn):
206
"""Custom wait callback using select."""
207
while True:
208
state = conn.poll()
209
if state == POLL_OK:
210
break
211
elif state == POLL_READ:
212
select.select([conn.fileno()], [], [], 1.0) # 1 second timeout
213
elif state == POLL_WRITE:
214
select.select([], [conn.fileno()], [], 1.0)
215
216
set_wait_callback(custom_wait_callback)
217
```
218
219
### Notifications (LISTEN/NOTIFY)
220
221
PostgreSQL's asynchronous notification system for inter-process communication.
222
223
```python { .api }
224
class Notify:
225
"""PostgreSQL notification message."""
226
227
@property
228
def channel(self):
229
"""Notification channel name."""
230
231
@property
232
def payload(self):
233
"""Notification payload data."""
234
235
@property
236
def pid(self):
237
"""Process ID that sent notification."""
238
239
# Connection notification methods
240
class connection:
241
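@property
def notifies(self):
241
"""
242
Pending notifications. This is a list attribute, not a method: read it as `conn.notifies` after calling `conn.poll()`.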
244
245
Returns:
246
list: List of Notify objects
247
"""
248
```
249
250
Usage examples:
251
252
```python
253
# Setup listener connection
254
listener_conn = psycopg2.connect(...)
255
listener_conn.autocommit = True
256
257
# Listen for notifications
258
cur = listener_conn.cursor()
259
cur.execute("LISTEN order_updates")
260
cur.execute("LISTEN inventory_changes")
261
262
# Setup notifier connection
263
notifier_conn = psycopg2.connect(...)
264
notifier_conn.autocommit = True
265
266
# Send notification
267
notifier_cur = notifier_conn.cursor()
268
notifier_cur.execute("NOTIFY order_updates, 'Order 12345 shipped'")
269
270
# Check for notifications (polling)
271
listener_conn.poll()
272
notifies = listener_conn.notifies
273
for notify in notifies:
274
print(f"Channel: {notify.channel}")
275
print(f"Payload: {notify.payload}")
276
print(f"From PID: {notify.pid}")
277
278
# Async notification handling
279
def notification_handler():
280
while True:
281
# Wait for data
282
select.select([listener_conn], [], [])
283
284
# Poll connection
285
listener_conn.poll()
286
287
# Process notifications
288
while listener_conn.notifies:
288
notify = listener_conn.notifies.pop(0)
290
handle_notification(notify)
291
292
def handle_notification(notify):
293
if notify.channel == 'order_updates':
294
process_order_update(notify.payload)
295
elif notify.channel == 'inventory_changes':
296
process_inventory_change(notify.payload)
297
```
298
299
### Replication Support
300
301
Support for PostgreSQL streaming replication (physical and logical).
302
303
```python { .api }
304
class ReplicationConnection(connection):
305
"""Connection for replication operations."""
306
307
def __init__(self, *args, **kwargs):
308
"""Initialize replication connection."""
309
310
class ReplicationCursor(cursor):
311
"""Cursor for replication operations."""
312
313
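def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
314
timeline=0, options=None, decode=False, status_interval=10):
315
"""
316
Start replication stream.
317
318
Parameters:
319
- slot_name (str, optional): Replication slot name (required for logical replication)
- slot_type (int, optional): REPLICATION_PHYSICAL or REPLICATION_LOGICAL; defaults to the type of the connection
320
- decode (bool): If True, message payloads are converted to str using the connection encoding; otherwise they are bytes
321
- start_lsn (int or str, optional): Starting LSN, as an integer or an 'XXX/XXX' string
322
- timeline (int, optional): Timeline ID (physical replication only)
323
- options (dict, optional): Output plugin options (logical replication only)
- status_interval (float): Seconds between status messages sent automatically to the server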
324
"""
325
326
def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False):
327
"""
328
Send replication feedback.
329
330
Parameters:
331
- write_lsn (int): Write LSN
332
- flush_lsn (int): Flush LSN
333
- apply_lsn (int): Apply LSN
334
- reply (bool): Request reply
335
"""
336
337
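def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
338
"""
339
Create replication slot.
340
341
Parameters:
342
- slot_name (str): Slot name
- slot_type (int, optional): REPLICATION_PHYSICAL or REPLICATION_LOGICAL; defaults to the type of the connection
343
- output_plugin (str, optional): Output plugin name (required for logical slots, not allowed for physical slots)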
344
"""
345
346
def drop_replication_slot(self, slot_name):
347
"""
348
Drop replication slot.
349
350
Parameters:
351
- slot_name (str): Slot name to drop
352
"""
353
354
class ReplicationMessage:
355
"""Replication stream message."""
356
357
@property
358
def data_start(self):
359
"""Message data start LSN."""
360
361
@property
362
def wal_end(self):
363
"""WAL end LSN."""
364
365
@property
366
def send_time(self):
367
"""Message send time."""
368
369
@property
370
def payload(self):
371
"""Message payload data."""
372
373
# Replication constants
374
REPLICATION_PHYSICAL: int # Physical replication mode
375
REPLICATION_LOGICAL: int # Logical replication mode
376
```
377
378
Usage examples:
379
380
```python
381
from psycopg2.extras import (
382
ReplicationConnection,
383
REPLICATION_PHYSICAL,
384
REPLICATION_LOGICAL
385
)
386
387
# Physical replication
388
repl_conn = psycopg2.connect(
389
host='master-server',
390
user='replication_user',
391
connection_factory=ReplicationConnection
392
)
393
394
cur = repl_conn.cursor()
395
396
# Create physical replication slot
397
cur.create_replication_slot('physical_slot')
398
399
# Start physical replication
400
cur.start_replication(slot_name='physical_slot')
401
402
# Process replication messages
403
def consume_stream():
404
for msg in cur:
405
# Process WAL data
406
process_wal_message(msg)
407
408
# Send feedback periodically
409
cur.send_feedback(flush_lsn=msg.data_start)
410
411
# Logical replication
412
logical_conn = psycopg2.connect(
413
connection_factory=ReplicationConnection,
414
...
415
)
416
417
logical_cur = logical_conn.cursor()
418
419
# Create logical replication slot
420
logical_cur.create_replication_slot('logical_slot', output_plugin='test_decoding')
421
422
# Start logical replication
423
logical_cur.start_replication(
424
slot_name='logical_slot',
425
decode=True,
426
options={'include-xids': 0, 'skip-empty-xacts': 1}
427
)
428
429
# Process logical changes
430
for msg in logical_cur:
431
change_data = msg.payload  # already a str because decode=True was passed to start_replication()
432
process_logical_change(change_data)
433
```
434
435
### Connection Information and Diagnostics
436
437
Access to connection state and PostgreSQL server information.
438
439
```python { .api }
440
class ConnectionInfo:
441
"""Connection information and state."""
442
443
@property
444
def dbname(self):
445
"""Database name."""
446
447
@property
448
def user(self):
449
"""Connected user."""
450
451
@property
452
def password(self):
452
"""Connection password (returned as given, not masked)."""
454
455
@property
456
def host(self):
457
"""Server host."""
458
459
@property
460
def port(self):
461
"""Server port."""
462
463
@property
464
def options(self):
465
"""Connection options."""
466
467
@property
468
def dsn_parameters(self):
469
"""All DSN parameters as dict."""
470
471
@property
472
def status(self):
473
"""Connection status."""
474
475
@property
476
def transaction_status(self):
477
"""Transaction status."""
478
479
@property
480
def protocol_version(self):
481
"""Protocol version."""
482
483
@property
484
def server_version(self):
485
"""Server version."""
486
487
@property
488
def error_message(self):
489
"""Last error message."""
490
491
@property
492
def backend_pid(self):
493
"""Backend process ID."""
494
495
@property
496
def needs_password(self):
497
"""Whether connection needs password."""
498
499
@property
500
def used_password(self):
501
"""Whether password was used."""
502
503
@property
504
def ssl_in_use(self):
505
"""Whether SSL is in use."""
506
507
class Diagnostics:
508
"""Error diagnostics information."""
509
510
def __init__(self, exception):
511
"""Initialize from exception."""
512
513
@property
514
def severity(self):
515
"""Error severity."""
516
517
@property
518
def sqlstate(self):
519
"""SQL state code."""
520
521
@property
522
def message_primary(self):
523
"""Primary error message."""
524
525
@property
526
def message_detail(self):
527
"""Detailed error message."""
528
529
@property
530
def message_hint(self):
531
"""Error hint."""
532
533
@property
534
def statement_position(self):
535
"""Error position in statement."""
536
537
@property
538
def internal_position(self):
539
"""Internal statement position."""
540
541
@property
542
def internal_query(self):
543
"""Internal query causing error."""
544
545
@property
546
def context(self):
547
"""Error context."""
548
549
@property
550
def schema_name(self):
551
"""Schema name related to error."""
552
553
@property
554
def table_name(self):
555
"""Table name related to error."""
556
557
@property
558
def column_name(self):
559
"""Column name related to error."""
560
561
@property
562
def datatype_name(self):
563
"""Data type name related to error."""
564
565
@property
566
def constraint_name(self):
567
"""Constraint name related to error."""
568
569
@property
570
def source_file(self):
571
"""Source file where error occurred."""
572
573
@property
574
def source_line(self):
575
"""Source line where error occurred."""
576
577
@property
578
def source_function(self):
579
"""Source function where error occurred."""
580
```
581
582
Usage examples:
583
584
```python
585
# Connection information
586
conn_info = conn.info
587
print(f"Database: {conn_info.dbname}")
588
print(f"User: {conn_info.user}")
589
print(f"Host: {conn_info.host}:{conn_info.port}")
590
print(f"Server version: {conn_info.server_version}")
591
print(f"Backend PID: {conn_info.backend_pid}")
592
print(f"SSL in use: {conn_info.ssl_in_use}")
593
594
# Detailed error diagnostics
595
try:
596
cur.execute("INSERT INTO users (id, email) VALUES (1, 'invalid-email')")
597
except psycopg2.IntegrityError as e:
598
diag = psycopg2.extensions.Diagnostics(e)
599
print(f"Error: {diag.message_primary}")
600
print(f"Detail: {diag.message_detail}")
601
print(f"Hint: {diag.message_hint}")
602
print(f"SQL State: {diag.sqlstate}")
603
print(f"Table: {diag.table_name}")
604
print(f"Constraint: {diag.constraint_name}")
605
```
606
607
### Security Features
608
609
Password encryption and security-related functionality.
610
611
```python { .api }
612
def encrypt_password(password, user, scope=None, algorithm=None):
613
"""
614
Encrypt password for PostgreSQL authentication.
615
616
Parameters:
617
- password (str): Plain text password
618
- user (str): Username
619
- scope (connection, optional): Connection scope
620
- algorithm (str, optional): Encryption algorithm
621
622
Returns:
623
str: Encrypted password string
624
"""
625
```
626
627
Usage examples:
628
629
```python
630
# Encrypt password for storage
631
encrypted = psycopg2.extensions.encrypt_password('mypassword', 'myuser')
632
print(encrypted) # 'md5...' or 'SCRAM-SHA-256$...'
633
634
# Use the encrypted form to set a role's password without sending the clear text to the server
634
# (it cannot be passed as `password` to connect(), which expects the clear-text password)
635
cur.execute("ALTER ROLE myuser PASSWORD %s", (encrypted,))
640
```
641
642
## Types
643
644
### Replication Types
645
646
```python { .api }
647
ReplicationSlotInfo = {
648
'slot_name': str,
649
'consistent_point': str,
650
'snapshot_name': str,
651
'output_plugin': str
652
}
653
654
ReplicationOptions = {
655
'include-xids': int,
656
'skip-empty-xacts': int,
657
'include-rewrites': int,
658
'pretty-print': int
659
}
660
```
661
662
### Large Object Constants
663
664
```python { .api }
665
# Access modes
666
INV_READ: int = 0x40000   # Read access (libpq INV_READ)
667
INV_WRITE: int = 0x20000  # Write access (libpq INV_WRITE)
668
669
# Seek origins
670
SEEK_SET: int = 0 # From beginning
671
SEEK_CUR: int = 1 # From current position
672
SEEK_END: int = 2 # From end
673
```