PostgreSQL database adapter for Python with thread-safe connection pooling and SQL operations
—
Large objects, server-side cursors, asynchronous operations, notifications (LISTEN/NOTIFY), replication support, and other PostgreSQL-specific advanced functionality. These features leverage PostgreSQL's unique capabilities for high-performance applications.
PostgreSQL large object interface for handling binary data larger than typical field limits.
class lobject:
"""PostgreSQL large object interface."""
def __init__(self, conn, oid=0, mode='r', new_oid=None, new_file=None):
"""
Initialize large object.
Parameters:
- conn (connection): Database connection
- oid (int): Existing large object OID (0 for new)
- mode (str): Access mode ('r', 'w', 'rw', 'n')
- new_oid (int, optional): OID for new object
- new_file (str, optional): File to import
"""
def read(self, size=-1):
"""
Read data from large object.
Parameters:
- size (int): Bytes to read (-1 for all)
Returns:
bytes: Data read from object
"""
def write(self, data):
"""
Write data to large object.
Parameters:
- data (bytes): Data to write
Returns:
int: Number of bytes written
"""
def seek(self, pos, whence=0):
"""
Seek to position in large object.
Parameters:
- pos (int): Position to seek to
- whence (int): Seek origin (0=start, 1=current, 2=end)
Returns:
int: New position
"""
def tell(self):
"""
Get current position.
Returns:
int: Current position in object
"""
def truncate(self, size=0):
"""
Truncate large object.
Parameters:
- size (int): New size
"""
def close(self):
"""Close large object."""
def export(self, filename):
"""
Export large object to file.
Parameters:
- filename (str): Target filename
"""
def unlink(self):
"""Delete large object from database."""
@property
def oid(self):
"""Large object OID."""
@property
def mode(self):
"""Access mode."""
@property
def closed(self):
"""Closed status."""Usage examples:
# Create new large object
lobj = conn.lobject(mode='w')
lobj.write(b'Large binary data here...')
oid = lobj.oid
lobj.close()
# Store OID in table
cur.execute("INSERT INTO documents (name, data_oid) VALUES (%s, %s)",
('document.pdf', oid))
# Read large object
cur.execute("SELECT data_oid FROM documents WHERE name = %s", ('document.pdf',))
oid = cur.fetchone()[0]
lobj = conn.lobject(oid, mode='r')
data = lobj.read()
lobj.close()
# Work with files
lobj = conn.lobject(mode='w', new_file='/path/to/large_file.bin')
stored_oid = lobj.oid
# Export large object
lobj = conn.lobject(oid, mode='r')
lobj.export('/path/to/exported_file.bin')
lobj.close()Support for non-blocking database operations with polling and wait callbacks.
def set_wait_callback(f):
"""
Set global wait callback for async operations.
Parameters:
- f (callable): Callback function for waiting
"""
def get_wait_callback():
"""
Get current wait callback.
Returns:
callable/None: Current wait callback
"""
# Connection polling constants
POLL_OK: int # Operation completed
POLL_READ: int # Wait for socket read
POLL_WRITE: int # Wait for socket write
POLL_ERROR: int # Error occurredUsage examples:
import select
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
# Create async connection
conn = psycopg2.connect(..., async_=True)
# Wait for connection to complete
def wait_for_connection(conn):
while True:
state = conn.poll()
if state == POLL_OK:
break
elif state == POLL_READ:
select.select([conn.fileno()], [], [])
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [])
else:
raise Exception("Connection failed")
wait_for_connection(conn)
# Async query execution
cur = conn.cursor()
cur.execute("SELECT * FROM large_table")
# Poll for query completion
def wait_for_query(conn):
while True:
state = conn.poll()
if state == POLL_OK:
return
elif state == POLL_READ:
select.select([conn.fileno()], [], [])
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [])
wait_for_query(conn)
results = cur.fetchall()
# Custom wait callback
def custom_wait_callback(conn):
"""Custom wait callback using select."""
while True:
state = conn.poll()
if state == POLL_OK:
break
elif state == POLL_READ:
select.select([conn.fileno()], [], [], 1.0) # 1 second timeout
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [], 1.0)
set_wait_callback(custom_wait_callback)PostgreSQL's asynchronous notification system for inter-process communication.
class Notify:
"""PostgreSQL notification message."""
@property
def channel(self):
"""Notification channel name."""
@property
def payload(self):
"""Notification payload data."""
@property
def pid(self):
"""Process ID that sent notification."""
# Connection notification methods
class connection:
def notifies(self):
"""
Get pending notifications.
Returns:
list: List of Notify objects
"""Usage examples:
# Setup listener connection
listener_conn = psycopg2.connect(...)
listener_conn.autocommit = True
# Listen for notifications
cur = listener_conn.cursor()
cur.execute("LISTEN order_updates")
cur.execute("LISTEN inventory_changes")
# Setup notifier connection
notifier_conn = psycopg2.connect(...)
notifier_conn.autocommit = True
# Send notification
notifier_cur = notifier_conn.cursor()
notifier_cur.execute("NOTIFY order_updates, 'Order 12345 shipped'")
# Check for notifications (polling)
listener_conn.poll()
notifies = listener_conn.notifies()
for notify in notifies:
print(f"Channel: {notify.channel}")
print(f"Payload: {notify.payload}")
print(f"From PID: {notify.pid}")
# Async notification handling
def notification_handler():
while True:
# Wait for data
select.select([listener_conn], [], [])
# Poll connection
listener_conn.poll()
# Process notifications
while listener_conn.notifies():
notify = listener_conn.notifies().pop(0)
handle_notification(notify)
def handle_notification(notify):
if notify.channel == 'order_updates':
process_order_update(notify.payload)
elif notify.channel == 'inventory_changes':
process_inventory_change(notify.payload)Support for PostgreSQL streaming replication (physical and logical).
class ReplicationConnection(connection):
"""Connection for replication operations."""
def __init__(self, *args, **kwargs):
"""Initialize replication connection."""
class ReplicationCursor(cursor):
"""Cursor for replication operations."""
def start_replication(self, slot_name=None, decode=False, start_lsn=None,
timeline=None, options=None):
"""
Start replication stream.
Parameters:
- slot_name (str, optional): Replication slot name
- decode (bool): Logical decoding mode
- start_lsn (str, optional): Starting LSN
- timeline (int, optional): Timeline ID
- options (dict, optional): Additional options
"""
def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False):
"""
Send replication feedback.
Parameters:
- write_lsn (int): Write LSN
- flush_lsn (int): Flush LSN
- apply_lsn (int): Apply LSN
- reply (bool): Request reply
"""
def create_replication_slot(self, slot_name, output_plugin=None):
"""
Create replication slot.
Parameters:
- slot_name (str): Slot name
- output_plugin (str, optional): Output plugin for logical slots
"""
def drop_replication_slot(self, slot_name):
"""
Drop replication slot.
Parameters:
- slot_name (str): Slot name to drop
"""
class ReplicationMessage:
"""Replication stream message."""
@property
def data_start(self):
"""Message data start LSN."""
@property
def wal_end(self):
"""WAL end LSN."""
@property
def send_time(self):
"""Message send time."""
@property
def payload(self):
"""Message payload data."""
# Replication constants
REPLICATION_PHYSICAL: int # Physical replication mode
REPLICATION_LOGICAL: int # Logical replication modeUsage examples:
from psycopg2.extras import (
ReplicationConnection,
REPLICATION_PHYSICAL,
REPLICATION_LOGICAL
)
# Physical replication
repl_conn = psycopg2.connect(
host='master-server',
user='replication_user',
connection_factory=ReplicationConnection
)
cur = repl_conn.cursor()
# Create physical replication slot
cur.create_replication_slot('physical_slot')
# Start physical replication
cur.start_replication(slot_name='physical_slot')
# Process replication messages
def consume_stream():
for msg in cur:
# Process WAL data
process_wal_message(msg)
# Send feedback periodically
cur.send_feedback(flush_lsn=msg.data_start)
# Logical replication
logical_conn = psycopg2.connect(
connection_factory=ReplicationConnection,
...
)
logical_cur = logical_conn.cursor()
# Create logical replication slot
logical_cur.create_replication_slot('logical_slot', 'test_decoding')
# Start logical replication
logical_cur.start_replication(
slot_name='logical_slot',
decode=True,
options={'include-xids': 0, 'skip-empty-xacts': 1}
)
# Process logical changes
for msg in logical_cur:
change_data = msg.payload.decode('utf-8')
process_logical_change(change_data)Access to connection state and PostgreSQL server information.
class ConnectionInfo:
"""Connection information and state."""
@property
def dbname(self):
"""Database name."""
@property
def user(self):
"""Connected user."""
@property
def password(self):
"""Connection password (masked)."""
@property
def host(self):
"""Server host."""
@property
def port(self):
"""Server port."""
@property
def options(self):
"""Connection options."""
@property
def dsn_parameters(self):
"""All DSN parameters as dict."""
@property
def status(self):
"""Connection status."""
@property
def transaction_status(self):
"""Transaction status."""
@property
def protocol_version(self):
"""Protocol version."""
@property
def server_version(self):
"""Server version."""
@property
def error_message(self):
"""Last error message."""
@property
def backend_pid(self):
"""Backend process ID."""
@property
def needs_password(self):
"""Whether connection needs password."""
@property
def used_password(self):
"""Whether password was used."""
@property
def ssl_in_use(self):
"""Whether SSL is in use."""
class Diagnostics:
"""Error diagnostics information."""
def __init__(self, exception):
"""Initialize from exception."""
@property
def severity(self):
"""Error severity."""
@property
def sqlstate(self):
"""SQL state code."""
@property
def message_primary(self):
"""Primary error message."""
@property
def message_detail(self):
"""Detailed error message."""
@property
def message_hint(self):
"""Error hint."""
@property
def statement_position(self):
"""Error position in statement."""
@property
def internal_position(self):
"""Internal statement position."""
@property
def internal_query(self):
"""Internal query causing error."""
@property
def context(self):
"""Error context."""
@property
def schema_name(self):
"""Schema name related to error."""
@property
def table_name(self):
"""Table name related to error."""
@property
def column_name(self):
"""Column name related to error."""
@property
def datatype_name(self):
"""Data type name related to error."""
@property
def constraint_name(self):
"""Constraint name related to error."""
@property
def source_file(self):
"""Source file where error occurred."""
@property
def source_line(self):
"""Source line where error occurred."""
@property
def source_function(self):
"""Source function where error occurred."""Usage examples:
# Connection information
conn_info = conn.info
print(f"Database: {conn_info.dbname}")
print(f"User: {conn_info.user}")
print(f"Host: {conn_info.host}:{conn_info.port}")
print(f"Server version: {conn_info.server_version}")
print(f"Backend PID: {conn_info.backend_pid}")
print(f"SSL in use: {conn_info.ssl_in_use}")
# Detailed error diagnostics
try:
cur.execute("INSERT INTO users (id, email) VALUES (1, 'invalid-email')")
except psycopg2.IntegrityError as e:
diag = psycopg2.extensions.Diagnostics(e)
print(f"Error: {diag.message_primary}")
print(f"Detail: {diag.message_detail}")
print(f"Hint: {diag.message_hint}")
print(f"SQL State: {diag.sqlstate}")
print(f"Table: {diag.table_name}")
print(f"Constraint: {diag.constraint_name}")Password encryption and security-related functionality.
def encrypt_password(password, user, scope=None, algorithm=None):
"""
Encrypt password for PostgreSQL authentication.
Parameters:
- password (str): Plain text password
- user (str): Username
- scope (connection, optional): Connection scope
- algorithm (str, optional): Encryption algorithm
Returns:
str: Encrypted password string
"""Usage examples:
# Encrypt password for storage
encrypted = psycopg2.extensions.encrypt_password('mypassword', 'myuser')
print(encrypted) # 'md5...' or 'SCRAM-SHA-256$...'
# Use with connection
conn = psycopg2.connect(
host='localhost',
user='myuser',
password=encrypted # Can use pre-encrypted password
)ReplicationSlotInfo = {
'slot_name': str,
'consistent_point': str,
'snapshot_name': str,
'output_plugin': str
}
ReplicationOptions = {
'include-xids': int,
'skip-empty-xacts': int,
'include-rewrites': int,
'pretty-print': int
}# Access modes
INV_READ: int = 0x40 # Read access
INV_WRITE: int = 0x80 # Write access
# Seek origins
SEEK_SET: int = 0 # From beginning
SEEK_CUR: int = 1 # From current position
SEEK_END: int = 2 # From endInstall with Tessl CLI
npx tessl i tessl/pypi-psycopg2-binary