CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-py-postgresql

PostgreSQL driver and tools library providing PG-API and DB-API 2.0 interfaces for Python.

Pending
Overview
Eval results
Files

docs/advanced-features.md

Advanced Features

Advanced PostgreSQL features including COPY operations, LISTEN/NOTIFY, advisory locks, and streaming results for high-performance database applications.

Capabilities

COPY Operations

High-performance bulk data import/export using PostgreSQL's COPY protocol for efficient data transfer.

class CopyManager:
    """
    Manager for PostgreSQL COPY operations providing bulk data import/export.

    Methods are interface stubs: they document the contract only and have
    no implementation here.
    """

    def load_rows(self, statement: str, rows) -> int:
        """
        Load rows using COPY FROM for bulk insert operations.

        Parameters:
        - statement (str): COPY FROM statement
        - rows (iterable): Rows to insert (tuples or lists)

        Returns:
        int: Number of rows loaded

        Raises:
        CopyError: If COPY operation fails
        """

    def dump_rows(self, statement: str):
        """
        Dump rows using COPY TO for bulk export operations.

        Parameters:
        - statement (str): COPY TO statement

        Returns:
        Iterator: Iterator over exported row data

        Raises:
        CopyError: If COPY operation fails
        """

    def load_file(self, statement: str, file_path: str) -> int:
        """
        Load data from file using COPY FROM.

        Parameters:
        - statement (str): COPY FROM statement
        - file_path (str): Path to source file

        Returns:
        int: Number of rows loaded
        """

    def dump_file(self, statement: str, file_path: str) -> int:
        """
        Dump data to file using COPY TO.

        Parameters:
        - statement (str): COPY TO statement
        - file_path (str): Path to destination file

        Returns:
        int: Number of rows dumped
        """

LISTEN/NOTIFY Support

Asynchronous notification system for real-time communication between database sessions.

class NotificationManager:
    """
    Manager for PostgreSQL LISTEN/NOTIFY asynchronous messaging.

    Methods are interface stubs: they document the contract only and have
    no implementation here.
    """

    def listen(self, channel: str):
        """
        Start listening for notifications on a channel.

        Parameters:
        - channel (str): Channel name to listen on

        Raises:
        NotificationError: If listen operation fails
        """

    def unlisten(self, channel=None):
        """
        Stop listening for notifications.

        Parameters:
        - channel (str, optional): Specific channel to unlisten (all if None)

        Raises:
        NotificationError: If unlisten operation fails
        """

    def notify(self, channel: str, payload=None):
        """
        Send notification to a channel.

        Parameters:
        - channel (str): Channel name to notify
        - payload (str, optional): Optional message payload

        Raises:
        NotificationError: If notify operation fails
        """

    def get_notifications(self) -> list:
        """
        Get pending notifications (non-blocking).

        Returns:
        list: List of notification objects with channel, payload, pid
        """

    def wait_for_notification(self, timeout=None):
        """
        Wait for next notification (blocking).

        Parameters:
        - timeout (float, optional): Timeout in seconds (infinite if None)

        Returns:
        dict or None: Notification object or None if timeout
        """

class Notification:
    """Notification message from PostgreSQL (interface stub)."""

    @property
    def channel(self):
        """Channel name that received the notification."""

    @property
    def payload(self):
        """Optional payload data."""

    @property
    def pid(self):
        """Process ID of the notifying backend."""

Advisory Locks

PostgreSQL advisory locks for application-level synchronization and coordination.

class ALock:
    """
    Base class for PostgreSQL advisory locks.

    Supports the context-manager protocol (acquire on entry, release on
    exit). Methods are interface stubs documenting the contract only.
    """

    def acquire(self, blocking: bool = True) -> bool:
        """
        Acquire the advisory lock.

        Parameters:
        - blocking (bool): Whether to block until lock is available

        Returns:
        bool: True if lock acquired, False if non-blocking and unavailable

        Raises:
        LockError: If lock acquisition fails
        """

    def release(self):
        """
        Release the advisory lock.

        Raises:
        LockError: If lock release fails
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release lock."""

    @property
    def is_held(self) -> bool:
        """
        Check if lock is currently held by this session.

        Returns:
        bool: True if lock is held
        """

class ExclusiveLock(ALock):
    """
    Exclusive advisory lock - only one session can hold it.
    """

    def __init__(self, lock_id):
        """
        Create exclusive advisory lock.

        Parameters:
        - lock_id (int or tuple): Lock identifier (int or pair of ints)
        """

class ShareLock(ALock):
    """
    Shared advisory lock - multiple sessions can hold it simultaneously.
    """

    def __init__(self, lock_id):
        """
        Create shared advisory lock.

        Parameters:
        - lock_id (int or tuple): Lock identifier (int or pair of ints)
        """

Streaming Results

Interfaces for streaming large result sets without loading all data into memory.

class ResultStream:
    """
    Streaming interface for large query results.

    Implements the iterator protocol so rows can be consumed one at a
    time without materializing the whole result set. Interface stub only.
    """

    def __iter__(self):
        """Iterate over result rows."""

    def __next__(self):
        """Get next result row."""

    def close(self):
        """Close the result stream."""

    @property
    def description(self) -> list:
        """
        Get column description information.

        Returns:
        list: Column metadata
        """

def stream_query(connection, query, *parameters):
    """
    Run *query* on *connection* and expose the result as a stream.

    Rows are delivered incrementally through the returned stream rather
    than being fetched into memory all at once.

    Parameters:
    - connection: Database connection
    - query (str): SQL query to execute
    - *parameters: Query parameters

    Returns:
    ResultStream: Streaming result interface
    """

Connection Pooling Utilities

Utilities for managing connection pools and connection lifecycle.

class ConnectionPool:
    """
    Connection pool for managing database connections.

    Methods are interface stubs: they document the contract only and have
    no implementation here.
    """

    def __init__(self, connector, min_size: int = 1, max_size: int = 10):
        """
        Create connection pool.

        Parameters:
        - connector: Connection factory
        - min_size (int): Minimum pool size
        - max_size (int): Maximum pool size
        """

    def get_connection(self):
        """
        Get connection from pool.

        Returns:
        Connection: Database connection from pool
        """

    def return_connection(self, connection):
        """
        Return connection to pool.

        Parameters:
        - connection: Connection to return
        """

    def close_all(self):
        """Close all connections in pool."""

    @property
    def size(self) -> int:
        """Current pool size."""

    @property
    def available(self) -> int:
        """Number of available connections."""

Stored Procedure Interface

Interface for calling PostgreSQL stored procedures and functions with parameter binding and result handling.

class StoredProcedure:
    """
    Interface for calling PostgreSQL stored procedures and functions.

    Instances are callable; calling one executes the underlying
    procedure. Interface stub only.
    """

    def __call__(self, *args, **kw):
        """
        Execute the stored procedure with provided arguments.

        Parameters:
        - *args: Positional parameters for the procedure
        - **kw: Keyword parameters for the procedure

        Returns:
        Procedure result (varies by procedure type)

        Raises:
        ProcedureError: If procedure execution fails
        """

    @property
    def name(self) -> str:
        """
        Get procedure name.

        Returns:
        str: Fully qualified procedure name
        """

    @property
    def parameter_types(self):
        """
        Get parameter type information.

        Returns:
        List[int]: PostgreSQL type OIDs for parameters
        """

    @property
    def return_type(self) -> int:
        """
        Get return type information.

        Returns:
        int: PostgreSQL type OID for return value
        """

Usage Examples

COPY Operations for Bulk Data

# Example: high-throughput bulk import/export via PostgreSQL's COPY protocol.
# NOTE(review): relies on py-postgresql's `copyman` module; the exact
# CopyManager call signatures are not visible here — confirm against the
# library documentation before running.
import postgresql
import postgresql.copyman as copy_manager
import csv

# Single shared connection used by all example functions below.
db = postgresql.open('pq://user:pass@localhost/mydb')

# Create test table
db.execute("""
    CREATE TABLE IF NOT EXISTS bulk_data (
        id SERIAL PRIMARY KEY,
        name TEXT,
        value NUMERIC,
        created_date DATE
    )
""")

# Bulk insert using COPY
def bulk_insert_with_copy():
    """Bulk insert data using COPY for high performance."""

    # Prepare data
    data_rows = []
    for i in range(10000):
        data_rows.append((f"Item {i}", i * 1.5, '2023-01-01'))

    # Use COPY for bulk insert
    # NOTE(review): the statement declares FORMAT CSV while rows are passed as
    # tuples — presumably CopyManager serializes them; verify with the library.
    copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV)"

    copy_mgr = copy_manager.CopyManager(db)
    rows_loaded = copy_mgr.load_rows(copy_stmt, data_rows)

    print(f"Loaded {rows_loaded} rows using COPY")

# Bulk export using COPY
def bulk_export_with_copy():
    """Export data using COPY for high performance."""

    copy_stmt = "COPY bulk_data TO STDOUT WITH (FORMAT CSV, HEADER)"

    copy_mgr = copy_manager.CopyManager(db)

    # Export to file
    copy_mgr.dump_file(copy_stmt, "/tmp/exported_data.csv")
    print("Data exported to /tmp/exported_data.csv")

    # Or stream export data
    row_count = 0
    for row_data in copy_mgr.dump_rows(copy_stmt):
        row_count += 1
        if row_count <= 5:  # Show first 5 rows
            print(f"Exported row: {row_data}")

    print(f"Total rows exported: {row_count}")

# Import from CSV file
def import_csv_file():
    """Import data from CSV file using COPY."""

    # Create CSV file
    with open('/tmp/import_data.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['name', 'value', 'created_date'])  # Header
        for i in range(5000):
            writer.writerow([f"CSV Item {i}", i * 2.0, '2023-06-01'])

    # Import using COPY
    copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV, HEADER)"

    copy_mgr = copy_manager.CopyManager(db)
    rows_loaded = copy_mgr.load_file(copy_stmt, '/tmp/import_data.csv')

    print(f"Imported {rows_loaded} rows from CSV file")

# Execute bulk operations
bulk_insert_with_copy()
bulk_export_with_copy()
import_csv_file()

LISTEN/NOTIFY for Real-time Communication

# Example: real-time messaging between sessions with LISTEN/NOTIFY.
# Each thread gets its own connection — PostgreSQL connections are not
# generally safe to share across threads.
import postgresql
import postgresql.notifyman as notify
import threading
import time

# Set up two connections for demonstration
publisher = postgresql.open('pq://user:pass@localhost/mydb')
subscriber = postgresql.open('pq://user:pass@localhost/mydb')

def notification_publisher():
    """Publish notifications to channels."""

    notifier = notify.NotificationManager(publisher)

    for i in range(10):
        # Send notifications to different channels
        notifier.notify('events', f'Event {i}: Something happened')
        notifier.notify('alerts', f'Alert {i}: Check system status')

        print(f"Published notification {i}")
        time.sleep(2)

    # Send termination signal
    # 'TERMINATE' is an in-band sentinel payload understood by the subscriber.
    notifier.notify('events', 'TERMINATE')

def notification_subscriber():
    """Subscribe to notifications and process them."""

    listener = notify.NotificationManager(subscriber)

    # Listen to multiple channels
    listener.listen('events')
    listener.listen('alerts')

    print("Listening for notifications...")

    while True:
        # Wait for notifications (blocking)
        notification = listener.wait_for_notification(timeout=30)

        if notification:
            channel = notification.channel
            payload = notification.payload
            sender_pid = notification.pid

            print(f"Received on '{channel}': {payload} (from PID {sender_pid})")

            # Check for termination signal
            if payload == 'TERMINATE':
                print("Termination signal received, stopping listener")
                break
        else:
            # Timeout also terminates the loop so the thread cannot hang forever.
            print("Notification timeout")
            break

    # Clean up
    listener.unlisten()  # Unlisten from all channels

# Run publisher and subscriber in separate threads
subscriber_thread = threading.Thread(target=notification_subscriber)
publisher_thread = threading.Thread(target=notification_publisher)

subscriber_thread.start()
time.sleep(1)  # Let subscriber start first
publisher_thread.start()

# Wait for both threads to complete
subscriber_thread.join()
publisher_thread.join()

publisher.close()
subscriber.close()

Advisory Locks for Coordination

# Example: application-level coordination with PostgreSQL advisory locks.
# Advisory locks are held per database session, so each competing worker
# needs its own connection.
import postgresql
import postgresql.alock as advisory_locks
import threading
import time

db1 = postgresql.open('pq://user:pass@localhost/mydb')
db2 = postgresql.open('pq://user:pass@localhost/mydb')

def exclusive_lock_example():
    """Demonstrate exclusive advisory locks."""

    def worker(worker_id, connection):
        lock = advisory_locks.ExclusiveLock(12345)  # Lock ID
        # NOTE(review): binding the connection via attribute assignment —
        # confirm this matches the alock API (it may take the connection in
        # the constructor instead).
        lock.connection = connection

        print(f"Worker {worker_id}: Attempting to acquire exclusive lock")

        # Try to acquire lock (blocking)
        if lock.acquire():
            print(f"Worker {worker_id}: Acquired exclusive lock")

            # Simulate work
            time.sleep(3)

            print(f"Worker {worker_id}: Releasing exclusive lock")
            lock.release()
        else:
            print(f"Worker {worker_id}: Failed to acquire lock")

    # Start two workers competing for the same lock
    thread1 = threading.Thread(target=worker, args=(1, db1))
    thread2 = threading.Thread(target=worker, args=(2, db2))

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

def shared_lock_example():
    """Demonstrate shared advisory locks."""

    def reader(reader_id, connection):
        lock = advisory_locks.ShareLock(54321)  # Shared lock ID
        lock.connection = connection

        print(f"Reader {reader_id}: Acquiring shared lock")

        with lock:  # Context manager automatically acquires/releases
            print(f"Reader {reader_id}: Reading data (shared access)")
            time.sleep(2)
            print(f"Reader {reader_id}: Finished reading")

    def writer(connection):
        # Exclusive lock on the same ID blocks until all share holders release.
        lock = advisory_locks.ExclusiveLock(54321)  # Same ID as readers
        lock.connection = connection

        print("Writer: Waiting for exclusive access")

        with lock:
            print("Writer: Writing data (exclusive access)")
            time.sleep(3)
            print("Writer: Finished writing")

    # Start multiple readers and one writer
    reader_threads = []
    for i in range(3):
        thread = threading.Thread(target=reader, args=(i+1, db1))
        reader_threads.append(thread)
        thread.start()

    time.sleep(1)  # Let readers start first

    writer_thread = threading.Thread(target=writer, args=(db2,))
    writer_thread.start()

    # Wait for all threads
    for thread in reader_threads:
        thread.join()
    writer_thread.join()

def distributed_counter_example():
    """Implement distributed counter using advisory locks."""

    # Create counter table
    db1.execute("""
        CREATE TABLE IF NOT EXISTS distributed_counter (
            name TEXT PRIMARY KEY,
            value INTEGER DEFAULT 0
        )
    """)

    # Initialize counter
    db1.execute("INSERT INTO distributed_counter (name, value) VALUES ('global', 0) ON CONFLICT (name) DO NOTHING")

    def increment_counter(worker_id, connection, increments):
        for i in range(increments):
            # Use lock to ensure atomic counter increment
            lock = advisory_locks.ExclusiveLock(99999)  # Counter lock ID
            lock.connection = connection

            with lock:
                # Read current value
                # NOTE(review): `query`/`execute` with $1-style parameters —
                # confirm against py-postgresql, where parameterized statements
                # normally go through `prepare`.
                current = connection.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']

                # Increment
                new_value = current + 1
                connection.execute("UPDATE distributed_counter SET value = $1 WHERE name = 'global'", new_value)

                print(f"Worker {worker_id}: Incremented counter to {new_value}")

            time.sleep(0.1)  # Small delay between increments

    # Start multiple workers incrementing the counter
    workers = []
    for i in range(3):
        connection = postgresql.open('pq://user:pass@localhost/mydb')
        thread = threading.Thread(target=increment_counter, args=(i+1, connection, 5))
        workers.append((thread, connection))
        thread.start()

    # Wait for all workers
    for thread, connection in workers:
        thread.join()
        connection.close()

    # Check final counter value
    final_value = db1.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']
    print(f"Final counter value: {final_value}")

print("=== Exclusive Lock Example ===")
exclusive_lock_example()

print("\n=== Shared Lock Example ===")
shared_lock_example()

print("\n=== Distributed Counter Example ===")
distributed_counter_example()

db1.close()
db2.close()

Streaming Large Result Sets

# Example: processing large result sets incrementally to bound memory use.
# NOTE(review): `prepare(...).rows()` and `.chunks()` are py-postgresql
# statement-iteration APIs; confirm their exact semantics in the library docs.
import postgresql
import time

db = postgresql.open('pq://user:pass@localhost/mydb')

# Create large test dataset
db.execute("""
    CREATE TABLE IF NOT EXISTS large_dataset AS
    SELECT 
        generate_series(1, 1000000) as id,
        'Item ' || generate_series(1, 1000000) as name,
        random() * 1000 as value,
        NOW() - (random() * interval '365 days') as created_at
""")

def stream_large_results():
    """Stream large result sets to avoid memory issues."""

    # Prepare streaming query
    query = db.prepare("""
        SELECT id, name, value, created_at 
        FROM large_dataset 
        WHERE value > $1 
        ORDER BY value DESC
    """)

    print("Starting streaming query...")
    start_time = time.time()

    # Stream results instead of loading all into memory
    row_count = 0
    total_value = 0

    # 500 here binds the $1 parameter of the prepared statement.
    for row in query.rows(500):  # Stream rows where value > 500
        row_count += 1
        total_value += row['value']

        # Process row (show first 10)
        if row_count <= 10:
            print(f"Row {row_count}: ID={row['id']}, Name={row['name']}, Value={row['value']:.2f}")
        elif row_count % 10000 == 0:
            print(f"Processed {row_count} rows...")

    end_time = time.time()
    # Guard against dividing by zero when no rows matched.
    avg_value = total_value / row_count if row_count > 0 else 0

    print(f"Streaming complete:")
    print(f"  Rows processed: {row_count}")
    print(f"  Average value: {avg_value:.2f}")
    print(f"  Time taken: {end_time - start_time:.2f} seconds")

def chunk_processing():
    """Process large datasets in chunks."""

    query = db.prepare("SELECT * FROM large_dataset ORDER BY id")

    print("Starting chunk processing...")

    chunk_size = 0
    chunk_count = 0
    total_processed = 0

    # Process data in chunks
    for chunk in query.chunks():
        chunk_count += 1
        chunk_size = len(chunk)
        total_processed += chunk_size

        # Process chunk
        chunk_sum = sum(row['value'] for row in chunk)
        chunk_avg = chunk_sum / chunk_size

        print(f"Chunk {chunk_count}: {chunk_size} rows, avg value: {chunk_avg:.2f}")

        # Simulate processing time
        time.sleep(0.1)

        # Limit for demonstration
        if chunk_count >= 10:
            break

    print(f"Chunk processing complete - {total_processed} rows in {chunk_count} chunks")

def memory_efficient_aggregation():
    """Perform aggregations on large datasets without loading all data."""

    query = db.prepare("""
        SELECT 
            EXTRACT(month FROM created_at) as month,
            id, value
        FROM large_dataset 
        ORDER BY created_at
    """)

    # Track monthly statistics
    # Running aggregates only — no row is retained after it is processed.
    monthly_stats = {}

    print("Computing monthly statistics...")

    for row in query.rows():
        month = int(row['month'])
        value = row['value']

        if month not in monthly_stats:
            monthly_stats[month] = {'count': 0, 'sum': 0, 'min': float('inf'), 'max': float('-inf')}

        stats = monthly_stats[month]
        stats['count'] += 1
        stats['sum'] += value
        stats['min'] = min(stats['min'], value)
        stats['max'] = max(stats['max'], value)

    # Display results
    print("Monthly statistics:")
    for month in sorted(monthly_stats.keys()):
        stats = monthly_stats[month]
        avg = stats['sum'] / stats['count']
        print(f"  Month {month:2d}: {stats['count']:6d} rows, "
              f"avg: {avg:6.2f}, min: {stats['min']:6.2f}, max: {stats['max']:6.2f}")

# Run streaming examples
stream_large_results()
print("\n" + "="*50 + "\n")
chunk_processing()
print("\n" + "="*50 + "\n")
memory_efficient_aggregation()

db.close()

Connection Pool Management

# Example: sharing a bounded set of connections among many worker threads.
import postgresql
import postgresql.pool as connection_pool
import threading
import time
import random

def connection_pool_example():
    """Demonstrate connection pooling for concurrent access."""

    # Create connection factory
    # The '&' prefix asks py-postgresql for a connector (factory) rather than
    # an immediately-established connection — TODO confirm against the docs.
    connector = postgresql.open('&pq://user:pass@localhost/mydb')

    # Create connection pool
    pool = connection_pool.ConnectionPool(
        connector, 
        min_size=2, 
        max_size=5
    )

    def worker(worker_id, num_operations):
        """Worker function that uses pooled connections."""

        for i in range(num_operations):
            # Get connection from pool
            conn = pool.get_connection()

            try:
                # Simulate database work
                result = conn.query("SELECT $1 as worker_id, $2 as operation, NOW() as timestamp", 
                                    worker_id, i)

                print(f"Worker {worker_id}, Op {i}: {result[0]['timestamp']}")

                # Simulate processing time
                time.sleep(random.uniform(0.1, 0.5))

            finally:
                # Always return connection to pool
                # (try/finally prevents leaking a pooled connection on error)
                pool.return_connection(conn)

            # Small delay between operations
            time.sleep(0.1)

    print(f"Starting connection pool with {pool.size} connections")

    # Start multiple workers
    # Deliberately more workers than max_size to exercise pool contention.
    workers = []
    for i in range(8):  # More workers than pool size
        thread = threading.Thread(target=worker, args=(i+1, 3))
        workers.append(thread)
        thread.start()

    # Monitor pool status
    def monitor_pool():
        for _ in range(10):
            print(f"Pool status - Size: {pool.size}, Available: {pool.available}")
            time.sleep(1)

    monitor_thread = threading.Thread(target=monitor_pool)
    monitor_thread.start()

    # Wait for all workers to complete
    for thread in workers:
        thread.join()

    monitor_thread.join()

    # Clean up
    pool.close_all()
    print("Connection pool closed")

# Run connection pool example
connection_pool_example()

Install with Tessl CLI

npx tessl i tessl/pypi-py-postgresql

docs

advanced-features.md

cluster-management.md

connection-management.md

dbapi-interface.md

exception-handling.md

index.md

query-execution.md

transaction-management.md

type-system.md

tile.json