CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-psycopg2-binary

PostgreSQL database adapter for Python with thread-safe connection pooling and SQL operations

Pending
Overview
Eval results
Files

advanced-features.mddocs/

Advanced PostgreSQL Features

Large objects, server-side cursors, asynchronous operations, notifications (LISTEN/NOTIFY), replication support, and other PostgreSQL-specific advanced functionality. These features leverage PostgreSQL's unique capabilities for high-performance applications.

Capabilities

Large Objects (LOB)

PostgreSQL large object interface for handling binary data larger than typical field limits.

class lobject:
    """PostgreSQL large object interface."""
    
    def __init__(self, conn, oid=0, mode='r', new_oid=None, new_file=None):
        """
        Initialize large object.
        
        Parameters:
        - conn (connection): Database connection
        - oid (int): Existing large object OID (0 for new)
        - mode (str): Access mode ('r', 'w', 'rw', 'n')
        - new_oid (int, optional): OID for new object
        - new_file (str, optional): File to import
        """
    
    def read(self, size=-1):
        """
        Read data from large object.
        
        Parameters:
        - size (int): Bytes to read (-1 for all)
        
        Returns:
        bytes: Data read from object
        """
    
    def write(self, data):
        """
        Write data to large object.
        
        Parameters:
        - data (bytes): Data to write
        
        Returns:
        int: Number of bytes written
        """
    
    def seek(self, pos, whence=0):
        """
        Seek to position in large object.
        
        Parameters:
        - pos (int): Position to seek to
        - whence (int): Seek origin (0=start, 1=current, 2=end)
        
        Returns:
        int: New position
        """
    
    def tell(self):
        """
        Get current position.
        
        Returns:
        int: Current position in object
        """
    
    def truncate(self, size=0):
        """
        Truncate large object.
        
        Parameters:
        - size (int): New size
        """
    
    def close(self):
        """Close large object."""
    
    def export(self, filename):
        """
        Export large object to file.
        
        Parameters:
        - filename (str): Target filename
        """
    
    def unlink(self):
        """Delete large object from database."""
    
    @property
    def oid(self):
        """Large object OID."""
    
    @property
    def mode(self):
        """Access mode."""
    
    @property
    def closed(self):
        """Closed status."""

Usage examples:

# Create new large object
lobj = conn.lobject(mode='w')
lobj.write(b'Large binary data here...')
oid = lobj.oid
lobj.close()

# Store OID in table
cur.execute("INSERT INTO documents (name, data_oid) VALUES (%s, %s)", 
           ('document.pdf', oid))

# Read large object
cur.execute("SELECT data_oid FROM documents WHERE name = %s", ('document.pdf',))
oid = cur.fetchone()[0]

lobj = conn.lobject(oid, mode='r')
data = lobj.read()
lobj.close()

# Work with files
lobj = conn.lobject(mode='w', new_file='/path/to/large_file.bin')
stored_oid = lobj.oid

# Export large object
lobj = conn.lobject(oid, mode='r')
lobj.export('/path/to/exported_file.bin')
lobj.close()

Asynchronous Operations

Support for non-blocking database operations with polling and wait callbacks.

def set_wait_callback(f):
    """
    Set global wait callback for async operations.
    
    Parameters:
    - f (callable): Callback function for waiting
    """

def get_wait_callback():
    """
    Get current wait callback.
    
    Returns:
    callable/None: Current wait callback
    """

# Connection polling constants
POLL_OK: int        # Operation completed
POLL_READ: int      # Wait for socket read
POLL_WRITE: int     # Wait for socket write  
POLL_ERROR: int     # Error occurred

Usage examples:

import select
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE

# Create async connection
conn = psycopg2.connect(..., async_=True)

# Wait for connection to complete
def wait_for_connection(conn):
    while True:
        state = conn.poll()
        if state == POLL_OK:
            break
        elif state == POLL_READ:
            select.select([conn.fileno()], [], [])
        elif state == POLL_WRITE:
            select.select([], [conn.fileno()], [])
        else:
            raise Exception("Connection failed")

wait_for_connection(conn)

# Async query execution
cur = conn.cursor()
cur.execute("SELECT * FROM large_table")

# Poll for query completion
def wait_for_query(conn):
    while True:
        state = conn.poll()
        if state == POLL_OK:
            return
        elif state == POLL_READ:
            select.select([conn.fileno()], [], [])
        elif state == POLL_WRITE:
            select.select([], [conn.fileno()], [])

wait_for_query(conn)
results = cur.fetchall()

# Custom wait callback
def custom_wait_callback(conn):
    """Custom wait callback using select."""
    while True:
        state = conn.poll()
        if state == POLL_OK:
            break
        elif state == POLL_READ:
            select.select([conn.fileno()], [], [], 1.0)  # 1 second timeout
        elif state == POLL_WRITE:
            select.select([], [conn.fileno()], [], 1.0)

set_wait_callback(custom_wait_callback)

Notifications (LISTEN/NOTIFY)

PostgreSQL's asynchronous notification system for inter-process communication.

class Notify:
    """PostgreSQL notification message."""
    
    @property
    def channel(self):
        """Notification channel name."""
    
    @property
    def payload(self):
        """Notification payload data."""
    
    @property
    def pid(self):
        """Process ID that sent notification."""

# Connection notification methods
class connection:
    def notifies(self):
        """
        Get pending notifications.
        
        Returns:
        list: List of Notify objects
        """

Usage examples:

# Setup listener connection
listener_conn = psycopg2.connect(...)
listener_conn.autocommit = True

# Listen for notifications
cur = listener_conn.cursor()
cur.execute("LISTEN order_updates")
cur.execute("LISTEN inventory_changes")

# Setup notifier connection  
notifier_conn = psycopg2.connect(...)
notifier_conn.autocommit = True

# Send notification
notifier_cur = notifier_conn.cursor()
notifier_cur.execute("NOTIFY order_updates, 'Order 12345 shipped'")

# Check for notifications (polling)
listener_conn.poll()
notifies = listener_conn.notifies()
for notify in notifies:
    print(f"Channel: {notify.channel}")
    print(f"Payload: {notify.payload}") 
    print(f"From PID: {notify.pid}")

# Async notification handling
def notification_handler():
    while True:
        # Wait for data
        select.select([listener_conn], [], [])
        
        # Poll connection
        listener_conn.poll()
        
        # Process notifications
        while listener_conn.notifies():
            notify = listener_conn.notifies().pop(0)
            handle_notification(notify)

def handle_notification(notify):
    if notify.channel == 'order_updates':
        process_order_update(notify.payload)
    elif notify.channel == 'inventory_changes':
        process_inventory_change(notify.payload)

Replication Support

Support for PostgreSQL streaming replication (physical and logical).

class ReplicationConnection(connection):
    """Connection for replication operations."""
    
    def __init__(self, *args, **kwargs):
        """Initialize replication connection."""

class ReplicationCursor(cursor):
    """Cursor for replication operations."""
    
    def start_replication(self, slot_name=None, decode=False, start_lsn=None, 
                         timeline=None, options=None):
        """
        Start replication stream.
        
        Parameters:
        - slot_name (str, optional): Replication slot name
        - decode (bool): Logical decoding mode
        - start_lsn (str, optional): Starting LSN
        - timeline (int, optional): Timeline ID
        - options (dict, optional): Additional options
        """
    
    def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False):
        """
        Send replication feedback.
        
        Parameters:
        - write_lsn (int): Write LSN
        - flush_lsn (int): Flush LSN  
        - apply_lsn (int): Apply LSN
        - reply (bool): Request reply
        """
    
    def create_replication_slot(self, slot_name, output_plugin=None):
        """
        Create replication slot.
        
        Parameters:
        - slot_name (str): Slot name
        - output_plugin (str, optional): Output plugin for logical slots
        """
    
    def drop_replication_slot(self, slot_name):
        """
        Drop replication slot.
        
        Parameters:
        - slot_name (str): Slot name to drop
        """

class ReplicationMessage:
    """Replication stream message."""
    
    @property
    def data_start(self):
        """Message data start LSN."""
    
    @property  
    def wal_end(self):
        """WAL end LSN."""
    
    @property
    def send_time(self):
        """Message send time."""
    
    @property
    def payload(self):
        """Message payload data."""

# Replication constants
REPLICATION_PHYSICAL: int  # Physical replication mode
REPLICATION_LOGICAL: int   # Logical replication mode

Usage examples:

from psycopg2.extras import (
    ReplicationConnection, 
    REPLICATION_PHYSICAL,
    REPLICATION_LOGICAL
)

# Physical replication
repl_conn = psycopg2.connect(
    host='master-server',
    user='replication_user',
    connection_factory=ReplicationConnection
)

cur = repl_conn.cursor()

# Create physical replication slot
cur.create_replication_slot('physical_slot')

# Start physical replication
cur.start_replication(slot_name='physical_slot')

# Process replication messages
def consume_stream():
    for msg in cur:
        # Process WAL data
        process_wal_message(msg)
        
        # Send feedback periodically
        cur.send_feedback(flush_lsn=msg.data_start)

# Logical replication
logical_conn = psycopg2.connect(
    connection_factory=ReplicationConnection,
    ...
)

logical_cur = logical_conn.cursor()

# Create logical replication slot
logical_cur.create_replication_slot('logical_slot', 'test_decoding')

# Start logical replication
logical_cur.start_replication(
    slot_name='logical_slot',
    decode=True,
    options={'include-xids': 0, 'skip-empty-xacts': 1}
)

# Process logical changes
for msg in logical_cur:
    change_data = msg.payload.decode('utf-8')
    process_logical_change(change_data)

Connection Information and Diagnostics

Access to connection state and PostgreSQL server information.

class ConnectionInfo:
    """Connection information and state."""
    
    @property
    def dbname(self):
        """Database name."""
    
    @property
    def user(self):
        """Connected user."""
    
    @property
    def password(self):
        """Connection password (masked)."""
    
    @property
    def host(self):
        """Server host."""
    
    @property
    def port(self):
        """Server port."""
    
    @property
    def options(self):
        """Connection options."""
    
    @property
    def dsn_parameters(self):
        """All DSN parameters as dict."""
    
    @property
    def status(self):
        """Connection status."""
    
    @property
    def transaction_status(self):
        """Transaction status."""
    
    @property
    def protocol_version(self):
        """Protocol version."""
    
    @property  
    def server_version(self):
        """Server version."""
    
    @property
    def error_message(self):
        """Last error message."""
    
    @property
    def backend_pid(self):
        """Backend process ID."""
    
    @property
    def needs_password(self):
        """Whether connection needs password."""
    
    @property
    def used_password(self):
        """Whether password was used."""
    
    @property
    def ssl_in_use(self):
        """Whether SSL is in use."""

class Diagnostics:
    """Error diagnostics information."""
    
    def __init__(self, exception):
        """Initialize from exception."""
    
    @property
    def severity(self):
        """Error severity."""
    
    @property
    def sqlstate(self):
        """SQL state code."""
    
    @property
    def message_primary(self):
        """Primary error message."""
    
    @property
    def message_detail(self):
        """Detailed error message."""
    
    @property
    def message_hint(self):
        """Error hint."""
    
    @property
    def statement_position(self):
        """Error position in statement."""
    
    @property
    def internal_position(self):
        """Internal statement position."""
    
    @property
    def internal_query(self):
        """Internal query causing error."""
    
    @property
    def context(self):
        """Error context."""
    
    @property
    def schema_name(self):
        """Schema name related to error."""
    
    @property
    def table_name(self):
        """Table name related to error."""
    
    @property
    def column_name(self):
        """Column name related to error."""
    
    @property
    def datatype_name(self):
        """Data type name related to error."""
    
    @property
    def constraint_name(self):
        """Constraint name related to error."""
    
    @property
    def source_file(self):
        """Source file where error occurred."""
    
    @property
    def source_line(self):
        """Source line where error occurred."""
    
    @property
    def source_function(self):
        """Source function where error occurred."""

Usage examples:

# Connection information
conn_info = conn.info
print(f"Database: {conn_info.dbname}")
print(f"User: {conn_info.user}")
print(f"Host: {conn_info.host}:{conn_info.port}")
print(f"Server version: {conn_info.server_version}")
print(f"Backend PID: {conn_info.backend_pid}")
print(f"SSL in use: {conn_info.ssl_in_use}")

# Detailed error diagnostics
try:
    cur.execute("INSERT INTO users (id, email) VALUES (1, 'invalid-email')")
except psycopg2.IntegrityError as e:
    diag = psycopg2.extensions.Diagnostics(e)
    print(f"Error: {diag.message_primary}")
    print(f"Detail: {diag.message_detail}")
    print(f"Hint: {diag.message_hint}")
    print(f"SQL State: {diag.sqlstate}")
    print(f"Table: {diag.table_name}")
    print(f"Constraint: {diag.constraint_name}")

Security Features

Password encryption and security-related functionality.

def encrypt_password(password, user, scope=None, algorithm=None):
    """
    Encrypt password for PostgreSQL authentication.
    
    Parameters:
    - password (str): Plain text password
    - user (str): Username
    - scope (connection, optional): Connection scope
    - algorithm (str, optional): Encryption algorithm
    
    Returns:
    str: Encrypted password string
    """

Usage examples:

# Encrypt password for storage
encrypted = psycopg2.extensions.encrypt_password('mypassword', 'myuser')
print(encrypted)  # 'md5...' or 'SCRAM-SHA-256$...'

# Use with connection
conn = psycopg2.connect(
    host='localhost',
    user='myuser',
    password=encrypted  # Can use pre-encrypted password
)

Types

Replication Types

ReplicationSlotInfo = {
    'slot_name': str,
    'consistent_point': str,
    'snapshot_name': str,
    'output_plugin': str
}

ReplicationOptions = {
    'include-xids': int,
    'skip-empty-xacts': int,
    'include-rewrites': int,
    'pretty-print': int
}

Large Object Constants

# Access modes
INV_READ: int = 0x40    # Read access
INV_WRITE: int = 0x80   # Write access  

# Seek origins
SEEK_SET: int = 0       # From beginning
SEEK_CUR: int = 1       # From current position
SEEK_END: int = 2       # From end

Install with Tessl CLI

npx tessl i tessl/pypi-psycopg2-binary

docs

advanced-features.md

connection-pooling.md

connections-cursors.md

cursors-rows.md

error-handling.md

index.md

sql-composition.md

types-adaptation.md

tile.json