PostgreSQL database adapter for Python with C optimizations for high-performance database operations
```
npx @tessl/cli install tessl/pypi-psycopg-binary@3.2.0
```

PostgreSQL database adapter for Python with C optimizations that provide significant performance improvements over the pure Python implementation. This package contains pre-compiled C extensions that enhance core database operations, including connection handling, query execution, and data type conversions.
pip install "psycopg[binary]" (recommended) or pip install psycopg-binaryImportant: psycopg-binary should not be imported directly. It provides optimization extensions that are automatically used by the main psycopg package when available.
```python
# Import the main psycopg package - optimizations are used automatically
import psycopg
```

The psycopg_binary module itself will only import correctly when psycopg has already been imported:
```python
# This will raise ImportError if psycopg is not imported first
import psycopg_binary  # only for accessing version information
```

psycopg-binary works transparently as a drop-in performance enhancement. No code changes are required:
```python
import psycopg
from psycopg.types.json import Jsonb
from datetime import datetime

# All operations automatically use C optimizations when psycopg-binary is installed
conn = psycopg.connect("dbname=test user=postgres")
cur = conn.cursor()

# Query execution is automatically optimized
cur.execute("SELECT * FROM users WHERE id = %s", (123,))
rows = cur.fetchall()

# Data type conversions use optimized implementations
# (dict parameters must be wrapped in Json/Jsonb to be adapted)
cur.execute(
    "INSERT INTO logs (timestamp, data) VALUES (%s, %s)",
    (datetime.now(), Jsonb({"key": "value"})),
)
conn.commit()
cur.close()
conn.close()
```

To verify that binary optimizations are active:
```python
import psycopg

# Check if binary optimizations are loaded
try:
    import psycopg_binary
    from psycopg_binary import pq
    print(f"Binary optimizations active: {pq.__impl__}")  # should print "binary"
    print(f"libpq version: {pq.version()}")
except ImportError:
    print("Binary optimizations not available")
```

psycopg-binary provides Cython-compiled optimizations organized into specialized modules:
- `psycopg_binary`: package metadata and entry point validation
- `_psycopg`: core performance enhancements for data transformation, connection handling, and query execution
- `pq`: optimized PostgreSQL protocol implementations
- `types`: fast data type conversions and array handling
- `_uuid`: optimized UUID operations

The package integrates seamlessly with psycopg's architecture, replacing performance-critical components with C implementations while maintaining full API compatibility.
Access to package version and metadata for version compatibility verification.
```python
# Version information (only accessible after psycopg is imported)
from psycopg_binary import __version__

# Version retrieval function
def get_version() -> str:
    """Get the psycopg-binary package version."""
```
```python
from psycopg_binary._psycopg import Transformer

class Transformer:
    """
    High-performance data transformation and adaptation context.

    Provides optimized implementations for converting Python objects
    to PostgreSQL wire format and vice versa.
    """

    types: tuple[int, ...] | None
    formats: list[Format] | None

    def __init__(self, context: AdaptContext | None = None) -> None:
        """Initialize transformer with optional adaptation context."""

    @classmethod
    def from_context(cls, context: AdaptContext | None) -> "Transformer":
        """Create transformer from existing adaptation context."""

    @property
    def connection(self) -> BaseConnection[Any] | None:
        """Get the associated database connection."""

    @property
    def encoding(self) -> str:
        """Get the current character encoding."""

    @property
    def adapters(self) -> AdaptersMap:
        """Get the adapter registry for type conversions."""

    @property
    def pgresult(self) -> PGresult | None:
        """Get the current PostgreSQL result object."""

    def set_pgresult(
        self,
        result: PGresult | None,
        *,
        set_loaders: bool = True,
        format: Format | None = None,
    ) -> None:
        """Set the PostgreSQL result and configure type loaders."""

    def set_dumper_types(self, types: Sequence[int], format: Format) -> None:
        """Configure dumpers for specific PostgreSQL types."""

    def set_loader_types(self, types: Sequence[int], format: Format) -> None:
        """Configure loaders for specific PostgreSQL types."""

    def dump_sequence(
        self, params: Sequence[Any], formats: Sequence[PyFormat]
    ) -> Sequence[Buffer | None]:
        """Convert Python objects to PostgreSQL wire format."""

    def as_literal(self, obj: Any) -> bytes:
        """Convert object to PostgreSQL literal representation."""

    def get_dumper(self, obj: Any, format: PyFormat) -> Dumper:
        """Get appropriate dumper for Python object."""

    def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> list[Row]:
        """Load multiple rows from result set with optimized conversion."""

    def load_row(self, row: int, make_row: RowMaker[Row]) -> Row:
        """Load single row from result set with optimized conversion."""

    def load_sequence(self, record: Sequence[Buffer | None]) -> tuple[Any, ...]:
        """Convert PostgreSQL wire format to Python objects."""

    def get_loader(self, oid: int, format: Format) -> Loader:
        """Get appropriate loader for PostgreSQL type OID."""
```
```python
from psycopg_binary._psycopg import (
    connect, cancel, execute, send, fetch_many, fetch,
    pipeline_communicate, wait_c
)

def connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[PGconn]:
    """
    Optimized connection generator for establishing database connections.

    Args:
        conninfo: PostgreSQL connection string
        timeout: Connection timeout in seconds (0 = no timeout)

    Returns:
        Generator yielding connection establishment states
    """

def cancel(
    cancel_conn: PGcancelConn, *, timeout: float = 0.0
) -> PQGenConn[None]:
    """
    Generator for canceling ongoing database operations.

    Args:
        cancel_conn: PostgreSQL cancellation connection
        timeout: Cancellation timeout in seconds (0 = no timeout)

    Returns:
        Generator for cancellation process
    """

def execute(pgconn: PGconn) -> PQGen[list[PGresult]]:
    """
    Optimized query execution generator.

    Args:
        pgconn: PostgreSQL connection object

    Returns:
        Generator yielding query execution results
    """

def send(pgconn: PGconn) -> PQGen[None]:
    """
    Optimized generator for sending queries to PostgreSQL.

    Args:
        pgconn: PostgreSQL connection object

    Returns:
        Generator for send operation
    """

def fetch_many(pgconn: PGconn) -> PQGen[list[PGresult]]:
    """
    Generator for fetching multiple results efficiently.

    Args:
        pgconn: PostgreSQL connection object

    Returns:
        Generator yielding multiple results
    """

def fetch(pgconn: PGconn) -> PQGen[PGresult | None]:
    """
    Generator for fetching single results efficiently.

    Args:
        pgconn: PostgreSQL connection object

    Returns:
        Generator yielding single result or None
    """

def pipeline_communicate(
    pgconn: PGconn, commands: Deque[PipelineCommand]
) -> PQGen[list[list[PGresult]]]:
    """
    Optimized pipeline communication for batch operations.

    Args:
        pgconn: PostgreSQL connection object
        commands: Queue of pipeline commands to execute

    Returns:
        Generator yielding batched results
    """

def wait_c(
    gen: PQGen[RV], fileno: int, interval: float | None = None
) -> RV:
    """
    Optimized waiting function for generators.

    Args:
        gen: Generator to wait for
        fileno: File descriptor for polling
        interval: Polling interval in seconds

    Returns:
        Generator result when complete
    """
```
```python
from psycopg_binary._psycopg import (
    format_row_text, format_row_binary,
    parse_row_text, parse_row_binary
)

def format_row_text(
    row: Sequence[Any], tx: Transformer, out: bytearray | None = None
) -> bytearray:
    """
    Format row data for PostgreSQL COPY text format.

    Args:
        row: Row data as sequence of values
        tx: Transformer for type conversions
        out: Optional output buffer to reuse

    Returns:
        Formatted row data as bytearray
    """

def format_row_binary(
    row: Sequence[Any], tx: Transformer, out: bytearray | None = None
) -> bytearray:
    """
    Format row data for PostgreSQL COPY binary format.

    Args:
        row: Row data as sequence of values
        tx: Transformer for type conversions
        out: Optional output buffer to reuse

    Returns:
        Formatted row data as bytearray
    """

def parse_row_text(data: Buffer, tx: Transformer) -> tuple[Any, ...]:
    """
    Parse row data from PostgreSQL COPY text format.

    Args:
        data: Raw text format data
        tx: Transformer for type conversions

    Returns:
        Parsed row data as tuple
    """

def parse_row_binary(data: Buffer, tx: Transformer) -> tuple[Any, ...]:
    """
    Parse row data from PostgreSQL COPY binary format.

    Args:
        data: Raw binary format data
        tx: Transformer for type conversions

    Returns:
        Parsed row data as tuple
    """
```
```python
from psycopg_binary._psycopg import array_load_text, array_load_binary

def array_load_text(
    data: Buffer, loader: Loader, delimiter: bytes = b","
) -> list[Any]:
    """
    Load PostgreSQL array from text representation.

    Args:
        data: Array data in text format
        loader: Type loader for array elements
        delimiter: Element delimiter (default comma)

    Returns:
        Parsed array as Python list
    """

def array_load_binary(data: Buffer, tx: Transformer) -> list[Any]:
    """
    Load PostgreSQL array from binary representation.

    Args:
        data: Array data in binary format
        tx: Transformer for element type conversions

    Returns:
        Parsed array as Python list
    """
```
```python
from psycopg_binary import pq

# Module-level constants and functions
__impl__: str = "binary"  # Implementation identifier
__build_version__: int    # Build-time PostgreSQL version

def version() -> int:
    """
    Get the libpq version number.

    Returns:
        Integer version number of the linked libpq library
    """

# Core libpq object classes

class PGconn:
    """
    Optimized PostgreSQL connection object.

    Provides C-level performance for connection management,
    query execution, and result processing.
    """

class PGresult:
    """
    Optimized PostgreSQL query result object.

    Provides efficient access to query results with
    minimal Python object overhead.
    """

class PGcancel:
    """
    Optimized PostgreSQL query cancellation object.

    Enables efficient cancellation of running queries
    with reduced latency.
    """
```
```python
import uuid

from psycopg_binary._uuid import UUID, SafeUUID_unknown

# Re-exported standard UUID class
UUID = uuid.UUID

# Re-exported SafeUUID enum member
SafeUUID_unknown = uuid.SafeUUID.unknown

class _WritableUUID(UUID):
    """
    Internal optimization class for fast UUID creation.

    This class has the same memory layout as UUID but allows
    writing to normally read-only attributes for performance.
    Used internally by C extensions for efficient UUID instantiation.
    """
```
```python
from typing import Any, Sequence, Deque, TypeVar

from psycopg import BaseConnection
from psycopg import abc, pq
from psycopg.rows import Row, RowMaker
from psycopg.adapt import AdaptersMap, PyFormat
from psycopg.pq.abc import PGcancelConn, PGconn, PGresult
import uuid

# Type aliases for buffer operations
Buffer = bytes | bytearray | memoryview

# Format types for data conversion
Format = pq.Format  # Binary or text format for PostgreSQL data
PyFormat = int      # Python format code

# Core psycopg adaptation interfaces
AdaptContext = abc.AdaptContext
Dumper = abc.Dumper  # Converts Python objects to PostgreSQL format
Loader = abc.Loader  # Converts PostgreSQL data to Python objects

# Generator and pipeline interfaces
PipelineCommand = abc.PipelineCommand
PQGen = abc.PQGen[Any]          # Generic PostgreSQL generator
PQGenConn = abc.PQGenConn[Any]  # Connection generator

# Generic return value type
RV = TypeVar("RV")  # Return value type variable

# Base connection type for optimization context
BaseConnectionType = BaseConnection[Any]
```

- `pip install "psycopg[binary]"` installs both psycopg and psycopg-binary with matching versions
- `pip install psycopg-binary` can be used after installing psycopg separately (version compatibility required)

Installing psycopg-binary provides significant performance improvements.
The optimizations are most beneficial for applications with high database throughput, frequent type conversions, or bulk data operations.