CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-psycopg

PostgreSQL database adapter for Python

Pending
Overview
Eval results
Files

docs/advanced-operations.md

Advanced Operations

High-performance operations including COPY for bulk data transfer, pipeline operations for batching, prepared statements, and server-side cursors for memory-efficient large result set processing.

Capabilities

COPY Operations

PostgreSQL's COPY protocol for high-performance bulk data transfer between Python and the database.

class Copy:
    """Synchronous COPY operations manager"""
    
    def __enter__(self) -> Copy:
        """Enter context manager for COPY operation"""
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager and finalize COPY"""
    
    def write(self, data: bytes) -> None:
        """
        Write raw bytes to COPY operation.
        
        Args:
            data: Raw bytes in COPY format
        """
    
    def write_row(self, row: Sequence[Any]) -> None:
        """
        Write single row to COPY operation.
        
        Args:
            row: Sequence of column values
        """
    
    def read(self) -> bytes:
        """
        Read raw bytes from COPY operation.
        
        Returns:
            Raw bytes in COPY format
        """
    
    def read_row(self) -> Sequence[Any] | None:
        """
        Read single row from COPY operation.
        
        Returns:
            Row as sequence of values, None if no more data
        """

class AsyncCopy:
    """Asynchronous COPY operations manager"""
    
    async def __aenter__(self) -> AsyncCopy:
        """Enter async context manager for COPY operation"""
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit async context manager and finalize COPY"""
    
    async def write(self, data: bytes) -> None:
        """Async version of write()"""
    
    async def write_row(self, row: Sequence[Any]) -> None:
        """Async version of write_row()"""
    
    async def read(self) -> bytes:
        """Async version of read()"""
    
    async def read_row(self) -> Sequence[Any] | None:
        """Async version of read_row()"""

COPY Usage Examples

# Bulk load rows through COPY ... FROM STDIN
data = [
    ("Alice", 30, "alice@example.com"),
    ("Bob", 25, "bob@example.com"),
    ("Charlie", 35, "charlie@example.com"),
]

copy_in_sql = "COPY users (name, age, email) FROM STDIN"
with conn.cursor() as cur, cur.copy(copy_in_sql) as copy:
    for record in data:
        # Each tuple is sent as one row of the COPY stream
        copy.write_row(record)

# Bulk export using COPY TO
with conn.cursor() as cur:
    with cur.copy("COPY users TO STDOUT WITH CSV HEADER") as copy:
        # read() returns a bytes-like buffer (a memoryview in psycopg 3),
        # which has no .decode() method, so it is converted to bytes first.
        # An empty buffer signals the end of the stream.
        while chunk := copy.read():
            print(bytes(chunk).decode())

# Export one parsed row at a time
export_sql = "COPY users (name, email) TO STDOUT"
with conn.cursor() as cur, cur.copy(export_sql) as copy:
    # read_row() returns None once there is no more data
    while (row := copy.read_row()) is not None:
        name, email = row
        print(f"{name}: {email}")

Pipeline Operations

Batch multiple operations for improved performance by reducing network round-trips.

class Pipeline:
    """Synchronous pipeline used to batch operations on a connection.

    Obtained from ``conn.pipeline()`` and used as a context manager.
    """

    # Annotations naming Pipeline / PipelineStatus are quoted: neither name
    # is bound at the point where this class body is executed.
    def __enter__(self) -> "Pipeline":
        """Enter the pipeline context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit the pipeline and process all queued operations."""

    def sync(self) -> None:
        """
        Force synchronization of all queued operations.

        Results of the operations queued so far are processed and any error
        they produced is handled at this point.
        """

    @property
    def status(self) -> "PipelineStatus":
        """
        Get the current pipeline status.

        Returns:
            Current pipeline status (OFF, ON, or ABORTED)
        """

    @classmethod
    def is_supported(cls) -> bool:
        """
        Check if pipeline mode is supported by the current libpq version.

        Returns:
            True if pipeline mode is supported, False otherwise
        """

class AsyncPipeline:
    """Asynchronous pipeline used to batch operations on a connection.

    Same role as the synchronous pipeline, used with ``async with``.
    """

    # Annotations naming AsyncPipeline / PipelineStatus are quoted: neither
    # name is bound at the point where this class body is executed.
    async def __aenter__(self) -> "AsyncPipeline":
        """Enter the async pipeline context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit the async pipeline and process all queued operations."""

    async def sync(self) -> None:
        """
        Force synchronization of all queued operations.

        Results of the operations queued so far are processed and any error
        they produced is handled at this point.
        """

    @property
    def status(self) -> "PipelineStatus":
        """
        Get the current pipeline status.

        Returns:
            Current pipeline status (OFF, ON, or ABORTED)
        """

    @classmethod
    def is_supported(cls) -> bool:
        """
        Check if pipeline mode is supported by the current libpq version.

        Returns:
            True if pipeline mode is supported, False otherwise
        """

from enum import IntEnum, auto

from psycopg.pq import PipelineStatus

class PipelineStatus(IntEnum):
    """Pipeline status enumeration.

    Members are integers (``IntEnum``), so they compare equal to the plain
    numeric status values: OFF == 0, ON == 1, ABORTED == 2.
    """
    OFF = 0  # Connection is not in pipeline mode
    ON = auto()  # Connection is in pipeline mode
    ABORTED = auto()  # Pipeline is aborted due to error

Pipeline Usage Examples

# Queue many INSERTs inside a single pipeline
insert_sql = "INSERT INTO items (name, value) VALUES (%s, %s)"
with conn.pipeline() as pipeline, conn.cursor() as cur:
    for i in range(1000):
        cur.execute(insert_sql, (f"item_{i}", i * 10))
# Leaving the pipeline block processes everything that is still queued

# Pipeline with an explicit synchronization point between two batches
with conn.pipeline() as pipeline, conn.cursor() as cur:
    for batch1_id in range(100):
        cur.execute("INSERT INTO batch1 (id) VALUES (%s)", (batch1_id,))

    # Force the first batch to be processed before queuing more work
    pipeline.sync()

    for batch2_id in range(100):
        cur.execute("INSERT INTO batch2 (id) VALUES (%s)", (batch2_id,))
    # The second batch is processed when the pipeline block exits

Prepared Statements

Automatic prepared statement management for improved performance with repeated queries.

class Connection:
    def prepare(self, query: str, name: str | None = None) -> str:
        """
        Explicitly prepare a statement for repeated execution.
        
        Args:
            query: SQL query to prepare
            name: Optional name for prepared statement
            
        Returns:
            Name of prepared statement
        """
    
    def prepared(self) -> dict[str, str]:
        """
        Get mapping of prepared statement names to queries.
        
        Returns:
            Dictionary of {name: query} pairs
        """

# Automatic preparation with prepare=True
class Cursor:
    def execute(
        self, 
        query, 
        params=None, 
        *, 
        prepare: bool | None = None
    ) -> Cursor:
        """
        Execute query with optional automatic preparation.
        
        Args:
            query: SQL query
            params: Query parameters  
            prepare: True to prepare statement, None for auto
        """

Prepared Statement Examples

# Explicit preparation with SQL-level PREPARE / EXECUTE.
# psycopg 3 connections have no prepare() method, so the statement is
# prepared with plain SQL and the values are composed client-side with
# psycopg.sql instead of string formatting.
from psycopg import sql

with conn.cursor() as cur:
    # Prepare statement manually
    stmt_name = "users_by_age_city"
    cur.execute(
        sql.SQL(
            "PREPARE {} (int, text) AS "
            "SELECT * FROM users WHERE age > $1 AND city = $2"
        ).format(sql.Identifier(stmt_name))
    )

    # Use prepared statement multiple times
    run_stmt = sql.SQL("EXECUTE {} ({}, {})")

    cur.execute(
        run_stmt.format(
            sql.Identifier(stmt_name), sql.Literal(25), sql.Literal("New York")
        )
    )
    result1 = cur.fetchall()

    cur.execute(
        run_stmt.format(
            sql.Identifier(stmt_name), sql.Literal(30), sql.Literal("Boston")
        )
    )
    result2 = cur.fetchall()

# Let the driver prepare a repeated query
with conn.cursor() as cur:
    query = "SELECT * FROM products WHERE price > %s"

    def products_above(min_price):
        # prepare=True prepares the statement on first use; later calls
        # with the same query text reuse the prepared statement
        return cur.execute(query, (min_price,), prepare=True).fetchall()

    expensive_products = products_above(100)
    very_expensive = products_above(200)
    moderate_products = products_above(50)

Large Object Support

Handle PostgreSQL large objects (BLOBs) for storing binary data that is too large for an ordinary bytea column, which is limited to 1GB.

class Connection:
    # NOTE(review): lobject() is the psycopg2 large-object API; psycopg 3
    # does not appear to provide it -- confirm before relying on it.

    def lobject(
        self,
        oid: int = 0,
        mode: str = "r",
        new_oid: int = 0
    ) -> "LargeObject":  # quoted: LargeObject is defined after this class
        """
        Open or create large object.

        Args:
            oid: Object ID (0 to create new)
            mode: Open mode ("r", "w", "rw")
            new_oid: Specific OID for new object

        Returns:
            LargeObject instance
        """

class LargeObject:
    """File-like handle on a large object holding binary data > 1GB.

    NOTE(review): this mirrors the psycopg2 ``lobject`` interface; confirm it
    is available in the psycopg version being documented.
    """

    @property
    def oid(self) -> int:
        """OID identifying this large object."""
        ...

    def read(self, size: int = -1) -> bytes:
        """
        Read data from the large object.

        Args:
            size: How many bytes to read; -1 reads everything

        Returns:
            The bytes that were read
        """
        ...

    def write(self, data: bytes) -> int:
        """
        Write data to the large object.

        Args:
            data: The bytes to store

        Returns:
            How many bytes were written
        """
        ...

    def seek(self, pos: int, whence: int = 0) -> int:
        """
        Move the current position within the large object.

        Args:
            pos: Target position
            whence: Seek mode (0=absolute, 1=relative, 2=from end)

        Returns:
            The position after seeking
        """
        ...

    def tell(self) -> int:
        """Return the current position within the large object."""
        ...

    def truncate(self, size: int) -> None:
        """Cut the large object down to ``size``."""
        ...

    def close(self) -> None:
        """Close the large object."""
        ...

    def unlink(self) -> None:
        """Remove the large object from the database."""
        ...

Large Object Examples

# Store large file as large object
def store_file_as_lob(conn, file_path):
    """Copy the file at ``file_path`` into a newly opened large object.

    The large object is opened with ``conn.lobject(0, "w")`` and the file is
    streamed into it piece by piece, so the whole file is never held in
    memory at once.

    Returns:
        The ``oid`` attribute of the large object, for future reference.
    """
    block_size = 65536  # 64KB chunks

    with conn.lobject(0, "w") as lob:
        with open(file_path, "rb") as src:
            # An empty read means end of file
            while chunk := src.read(block_size):
                lob.write(chunk)

        return lob.oid

# Retrieve large object as file
def retrieve_lob_as_file(conn, oid, output_path):
    """Write the contents of large object ``oid`` to ``output_path``.

    The object is opened with ``conn.lobject(oid, "r")`` and copied to the
    output file in 65536-byte reads until a read comes back empty.
    """
    with conn.lobject(oid, "r") as lob, open(output_path, "wb") as sink:
        while piece := lob.read(65536):
            sink.write(piece)

# Stream large object data
def stream_lob_data(conn, oid):
    """Yield the contents of large object ``oid`` piece by piece.

    This is a generator: the object is opened with ``conn.lobject(oid, "r")``
    only once iteration starts, and each item is the result of one
    ``read(8192)`` call. Iteration stops at the first empty read.
    """
    piece_size = 8192  # 8KB chunks

    with conn.lobject(oid, "r") as lob:
        while piece := lob.read(piece_size):
            yield piece

Connection Pooling

Connection pool management for high-performance applications (requires psycopg-pool package).

# Note: Requires separate psycopg-pool package
from psycopg_pool import ConnectionPool, AsyncConnectionPool

class ConnectionPool:
    """Synchronous connection pool"""
    
    def __init__(
        self,
        conninfo: str = "",
        *,
        min_size: int = 4,
        max_size: int | None = None,
        open: bool = True,
        name: str | None = None,
        timeout: float = 30.0,
        **kwargs
    ):
        """
        Create connection pool.
        
        Args:
            conninfo: Connection string
            min_size: Minimum pool size
            max_size: Maximum pool size (None = unlimited)
            open: Open pool immediately
            name: Pool name for identification
            timeout: Connection checkout timeout
        """
    
    def getconn(self, timeout: float | None = None) -> Connection:
        """
        Get connection from pool.
        
        Args:
            timeout: Checkout timeout (None = use pool default)
            
        Returns:
            Connection from pool
        """
    
    def putconn(self, conn: Connection) -> None:
        """
        Return connection to pool.
        
        Args:
            conn: Connection to return
        """
    
    def open(self) -> None:
        """Open the connection pool"""
    
    def close(self) -> None:
        """Close the connection pool"""
    
    @property
    def name(self) -> str | None:
        """Pool name"""
    
    @property
    def size(self) -> int:
        """Current pool size"""
    
    @property
    def available(self) -> int:
        """Available connections in pool"""

class AsyncConnectionPool:
    """Asynchronous connection pool"""
    # Same interface as ConnectionPool but with async methods
    
    async def getconn(self, timeout: float | None = None) -> AsyncConnection: ...
    async def putconn(self, conn: AsyncConnection) -> None: ...
    async def open(self) -> None: ...
    async def close(self) -> None: ...

Connection Pool Examples

from psycopg_pool import ConnectionPool

# Build a pool holding between 2 and 10 connections
pool = ConnectionPool("dbname=mydb user=postgres", min_size=2, max_size=10)

# Check a connection out by hand and make sure it always goes back
conn = pool.getconn()
try:
    with conn.cursor() as cur:
        users = cur.execute("SELECT * FROM users").fetchall()
finally:
    # Runs whether or not the query succeeded
    pool.putconn(conn)

# Borrow a connection through the pool's context manager
with pool.connection() as conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM products")
    count_row = cur.fetchone()
    count = count_row[0]
# The connection has been handed back to the pool at this point

# Shut the pool down once it is no longer needed
pool.close()

Advanced Cursor Features

Additional cursor capabilities for specialized use cases.

Named Cursors (Server-Side)

# Stream a large result set through a server-side (named) cursor
with conn.cursor(name="large_scan") as cur:
    cur.execute("SELECT * FROM huge_table ORDER BY id")

    # Pull rows in batches of 1000; an empty batch ends the loop
    while batch := cur.fetchmany(1000):
        process_batch(batch)

        # Optional: update progress
        print(f"Processed {cur.rownumber} rows")

Scrollable Cursors

# Server-side cursor that can move backwards as well as forwards
with conn.cursor(name="scrollable", scrollable=True) as cur:
    cur.execute("SELECT id, name FROM users ORDER BY name")

    # Move around in result set
    cur.scroll(10, mode="relative")  # Skip first 10 rows
    row = cur.fetchone()

    cur.scroll(-5)  # Go back 5 rows (relative is the default mode)
    row = cur.fetchone()

    cur.scroll(0, mode="absolute")  # Go to beginning
    first_row = cur.fetchone()

Holdable Cursors

# Cursor that survives transaction boundaries (declared with withhold=True)
with conn.cursor(name="holdable", withhold=True) as cur:
    cur.execute("SELECT * FROM users")
    
    # End the current transaction
    conn.commit()  # Cursor remains valid
    
    # Continue fetching after transaction boundary
    remaining_rows = cur.fetchall()

Performance Monitoring

Built-in capabilities for monitoring and optimizing database operations.

class Capabilities:
    """Detect what the database driver and its libpq build support.

    NOTE(review): psycopg 3.2 documents these checks as methods (for example
    ``has_pipeline()``) rather than properties -- confirm against the
    installed version.
    """

    @property
    def libpq_version(self) -> int:
        """Version number of libpq."""
        ...

    @property
    def has_prepare(self) -> bool:
        """Whether prepared statements are supported."""
        ...

    @property
    def has_pipeline(self) -> bool:
        """Whether pipeline mode is supported."""
        ...

# Module-level instance, reached as ``psycopg.capabilities``
capabilities: Capabilities

Performance Examples

# Check capabilities
# Pipeline.is_supported() is called explicitly: in psycopg the capability
# checks are methods, so testing a bare ``has_pipeline`` attribute would
# always be truthy.
if psycopg.Pipeline.is_supported():
    # Use pipeline for bulk operations
    with conn.pipeline():
        ...  # bulk operations
else:
    # Fall back to individual operations
    ...  # individual operations

# Monitor prepared statements
# psycopg 3 connections have no prepared() method; the server lists the
# statements prepared in the current session in pg_prepared_statements.
prepared = conn.execute(
    "SELECT name, statement FROM pg_prepared_statements"
).fetchall()
print("Prepared statements:", prepared)

# Connection info for debugging
info = conn.info
print(f"Server version: {info.server_version}")
print(f"Backend PID: {info.backend_pid}")
print(f"Transaction status: {info.transaction_status}")

Advanced Configuration

Connection-Level Settings

# Open a connection tuned for throughput
conn = psycopg.connect(
    "dbname=mydb user=postgres",
    # General connection settings
    application_name="MyApp",
    connect_timeout=10,
    # Performance-related settings
    prepare_threshold=5,  # Auto-prepare after 5 executions
    options="-c synchronous_commit=off",  # Fast writes
)

# Register custom adapters on this connection
adapters = conn.adapters
adapters.register_dumper(MyClass, fast_dumper)
adapters.register_loader(MY_TYPE_OID, fast_loader)

Query Optimization

# Ask for binary results when fetching large result sets
with conn.cursor() as cur:
    big_query = "SELECT data FROM large_table"
    cur.execute(big_query, binary=True)
    # Results returned in binary format (faster)

# Optimize fetch sizes
# itersize is an attribute of server-side (named) cursors only; a plain
# client-side cursor does not accept it, so the cursor is given a name here.
with conn.cursor(name="big_table_scan") as cur:
    cur.itersize = 1000  # Fetch 1000 rows per network round-trip
    cur.execute("SELECT * FROM big_table")

    for row in cur:  # Efficient iteration
        process_row(row)

Install with Tessl CLI

npx tessl i tessl/pypi-psycopg

docs

advanced-operations.md

connections.md

cursors.md

error-handling.md

index.md

row-factories.md

sql-composition.md

type-system.md

tile.json