PostgreSQL database adapter for Python

High-performance operations including COPY for bulk data transfer, pipeline operations for batching, prepared statements, and server-side cursors for memory-efficient large result set processing.

PostgreSQL's COPY protocol enables high-performance bulk data transfer between Python and the database.
class Copy:
    """Synchronous COPY operations manager.

    Returned by Cursor.copy(); used as a context manager so the COPY
    operation is finalized when the block exits.
    """

    def __enter__(self) -> Copy:
        """Enter context manager for COPY operation."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit context manager and finalize COPY."""

    def write(self, data: bytes) -> None:
        """
        Write raw bytes to COPY operation.

        Args:
            data: Raw bytes in COPY format
        """

    def write_row(self, row: Sequence[Any]) -> None:
        """
        Write single row to COPY operation.

        Args:
            row: Sequence of column values
        """

    def read(self) -> bytes:
        """
        Read raw bytes from COPY operation.

        Returns:
            Raw bytes in COPY format (empty bytes when the stream is exhausted)
        """

    def read_row(self) -> Sequence[Any] | None:
        """
        Read single row from COPY operation.

        Returns:
            Row as sequence of values, None if no more data
        """
class AsyncCopy:
    """Asynchronous COPY operations manager.

    Awaitable counterpart of Copy; used as an async context manager.
    """

    async def __aenter__(self) -> AsyncCopy:
        """Enter async context manager for COPY operation."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit async context manager and finalize COPY."""

    async def write(self, data: bytes) -> None:
        """Async version of write()."""

    async def write_row(self, row: Sequence[Any]) -> None:
        """Async version of write_row()."""

    async def read(self) -> bytes:
        """Async version of read()."""

    async def read_row(self) -> Sequence[Any] | None:
        """Async version of read_row()."""

# Bulk insert using COPY FROM
# One (name, age, email) tuple per user to load
rows = [
    ("Alice", 30, "alice@example.com"),
    ("Bob", 25, "bob@example.com"),
    ("Charlie", 35, "charlie@example.com"),
]
with conn.cursor() as cur, cur.copy("COPY users (name, age, email) FROM STDIN") as copy:
    for record in rows:
        copy.write_row(record)
# Bulk export using COPY TO
with conn.cursor() as cur:
    with cur.copy("COPY users TO STDOUT WITH CSV HEADER") as copy:
        # Drain the stream; read() returns empty bytes at end of data
        while data := copy.read():
            print(data.decode())
# Row-by-row export
with conn.cursor() as cur:
    with cur.copy("COPY users (name, email) TO STDOUT") as copy:
        while True:
            row = copy.read_row()
            if row is None:
                # End of the COPY stream
                break
            # Two columns were selected, so each row unpacks to (name, email)
            name, email = row
            print(f"{name}: {email}")

Batch multiple operations for improved performance by reducing network round-trips.
class Pipeline:
    """Synchronous pipeline for batching operations.

    Queues operations client-side so they are sent together, reducing
    network round-trips; entered via Connection.pipeline().
    """

    def __enter__(self) -> Pipeline:
        """Enter pipeline context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit pipeline and process all queued operations."""

    def sync(self) -> None:
        """
        Force synchronization of all queued operations.

        Process results and handle any errors.
        """

    @property
    def status(self) -> PipelineStatus:
        """
        Get the current pipeline status.

        Returns:
            Current pipeline status (OFF, ON, or ABORTED)
        """

    @classmethod
    def is_supported(cls) -> bool:
        """
        Check if pipeline mode is supported by the current libpq version.

        Returns:
            True if pipeline mode is supported, False otherwise
        """
class AsyncPipeline:
    """Asynchronous pipeline for batching operations.

    Awaitable counterpart of Pipeline; entered with `async with`.
    """

    async def __aenter__(self) -> AsyncPipeline:
        """Enter async pipeline context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Exit async pipeline and process all queued operations."""

    async def sync(self) -> None:
        """Async version of sync()."""

    @property
    def status(self) -> PipelineStatus:
        """Get the current pipeline status."""

    @classmethod
    def is_supported(cls) -> bool:
        """Check if pipeline mode is supported."""
from psycopg.pq import PipelineStatus

class PipelineStatus(Enum):
    """Pipeline status enumeration."""

    OFF = 0           # Connection is not in pipeline mode
    ON = auto()       # Connection is in pipeline mode
    ABORTED = auto()  # Pipeline is aborted due to error

# Batch multiple INSERT operations
with conn.pipeline() as pipeline:
    with conn.cursor() as cur:
        # Queue 1000 INSERTs; none hit the wire individually
        for n in range(1000):
            cur.execute(
                "INSERT INTO items (name, value) VALUES (%s, %s)",
                (f"item_{n}", n * 10),
            )
# All operations sent to server when pipeline exits
# Manual synchronization points
with conn.pipeline() as pipeline:
    with conn.cursor() as cur:
        # First batch
        for row_id in range(100):
            cur.execute("INSERT INTO batch1 (id) VALUES (%s)", (row_id,))
        # Flush and process the first batch before queuing more work
        pipeline.sync()
        # Second batch
        for row_id in range(100):
            cur.execute("INSERT INTO batch2 (id) VALUES (%s)", (row_id,))
# Second batch processed on pipeline exit

Automatic prepared statement management for improved performance with repeated queries.
class Connection:
    def prepare(self, query: str, name: str | None = None) -> str:
        """
        Explicitly prepare a statement for repeated execution.

        Args:
            query: SQL query to prepare
            name: Optional name for prepared statement (auto-generated if None)

        Returns:
            Name of prepared statement, usable in EXECUTE
        """

    def prepared(self) -> dict[str, str]:
        """
        Get mapping of prepared statement names to queries.

        Returns:
            Dictionary of {name: query} pairs
        """
# Automatic preparation with prepare=True
class Cursor:
    def execute(
        self,
        query,
        params=None,
        *,
        prepare: bool | None = None
    ) -> Cursor:
        """
        Execute query with optional automatic preparation.

        Args:
            query: SQL query
            params: Query parameters
            prepare: True to prepare statement, None for auto
        """

# Explicit preparation
with conn.cursor() as cur:
    # Prepare the statement once, then execute it with different arguments
    stmt_name = conn.prepare(
        "SELECT * FROM users WHERE age > $1 AND city = $2"
    )
    results = []
    for args in ((25, "New York"), (30, "Boston")):
        cur.execute(f"EXECUTE {stmt_name} (%s, %s)", args)
        results.append(cur.fetchall())
    result1, result2 = results
# Automatic preparation
with conn.cursor() as cur:
    query = "SELECT * FROM products WHERE price > %s"
    # First execution with prepare=True creates the server-side statement
    cur.execute(query, (100,), prepare=True)
    expensive_products = cur.fetchall()
    # Subsequent executions of the same query text reuse the prepared statement
    cur.execute(query, (200,), prepare=True)
    very_expensive = cur.fetchall()
    cur.execute(query, (50,), prepare=True)
    moderate_products = cur.fetchall()

Handle PostgreSQL large objects (BLOBs) for storing binary data larger than 1GB.
class Connection:
    def lobject(
        self,
        oid: int = 0,
        mode: str = "r",
        new_oid: int = 0
    ) -> LargeObject:
        """
        Open or create large object.

        Args:
            oid: Object ID (0 to create new)
            mode: Open mode ("r", "w", "rw")
            new_oid: Specific OID for new object

        Returns:
            LargeObject instance
        """
class LargeObject:
    """Large object interface for binary data > 1GB."""

    @property
    def oid(self) -> int:
        """Large object OID."""

    def read(self, size: int = -1) -> bytes:
        """
        Read bytes from large object.

        Args:
            size: Number of bytes to read (-1 for all)

        Returns:
            Bytes read from object (empty bytes at end of object)
        """

    def write(self, data: bytes) -> int:
        """
        Write bytes to large object.

        Args:
            data: Bytes to write

        Returns:
            Number of bytes written
        """

    def seek(self, pos: int, whence: int = 0) -> int:
        """
        Seek to position in large object.

        Args:
            pos: Position to seek to
            whence: Seek mode (0=absolute, 1=relative, 2=from end)

        Returns:
            New position
        """

    def tell(self) -> int:
        """Get current position in large object."""

    def truncate(self, size: int) -> None:
        """Truncate large object to specified size."""

    def close(self) -> None:
        """Close large object."""

    def unlink(self) -> None:
        """Delete large object from database."""

# Store large file as large object
def store_file_as_lob(conn, file_path):
    """Store a file's contents as a PostgreSQL large object.

    Args:
        conn: Open psycopg connection, used to create the large object.
        file_path: Path of the local file to upload.

    Returns:
        OID of the newly created large object, for future reference.
    """
    # oid=0 with mode "w" asks the server to allocate a fresh large object
    with conn.lobject(0, "w") as lob:
        with open(file_path, "rb") as f:
            # Stream in 64KB chunks so arbitrarily large files never need
            # to fit in memory; the walrus loop stops when read() hits EOF
            while chunk := f.read(65536):
                lob.write(chunk)
    return lob.oid  # The OID stays meaningful after the object is closed
# Retrieve large object as file
def retrieve_lob_as_file(conn, oid, output_path):
    """Retrieve a large object and save its contents to a local file.

    Args:
        conn: Open psycopg connection.
        oid: OID of the large object to read.
        output_path: Destination path for the downloaded bytes.
    """
    with conn.lobject(oid, "r") as lob:
        with open(output_path, "wb") as f:
            # 64KB chunks keep memory bounded; read() returns b"" at EOF,
            # which terminates the walrus loop
            while chunk := lob.read(65536):
                f.write(chunk)
# Stream large object data
def stream_lob_data(conn, oid):
    """Stream large object data in chunks.

    Generator: yields successive 8KB chunks so callers can process
    arbitrarily large objects without loading them fully into memory.
    """
    with conn.lobject(oid, "r") as lob:
        while True:
            chunk = lob.read(8192)  # 8KB chunks
            if not chunk:
                # read() returns b"" once the object is exhausted
                break
            yield chunk

Connection pool management for high-performance applications (requires psycopg-pool package).
# Note: Requires separate psycopg-pool package
from psycopg_pool import ConnectionPool, AsyncConnectionPool

class ConnectionPool:
    """Synchronous connection pool.

    Maintains between min_size and max_size open connections that callers
    check out with getconn() and must return with putconn().
    """

    def __init__(
        self,
        conninfo: str = "",
        *,
        min_size: int = 4,
        max_size: int | None = None,
        open: bool = True,
        name: str | None = None,
        timeout: float = 30.0,
        **kwargs
    ):
        """
        Create connection pool.

        Args:
            conninfo: Connection string
            min_size: Minimum pool size
            max_size: Maximum pool size (None = unlimited)
            open: Open pool immediately
            name: Pool name for identification
            timeout: Connection checkout timeout in seconds
        """

    def getconn(self, timeout: float | None = None) -> Connection:
        """
        Get connection from pool.

        Args:
            timeout: Checkout timeout (None = use pool default)

        Returns:
            Connection from pool
        """

    def putconn(self, conn: Connection) -> None:
        """
        Return connection to pool.

        Args:
            conn: Connection to return
        """

    def open(self) -> None:
        """Open the connection pool."""

    def close(self) -> None:
        """Close the connection pool."""

    @property
    def name(self) -> str | None:
        """Pool name."""

    @property
    def size(self) -> int:
        """Current pool size."""

    @property
    def available(self) -> int:
        """Available connections in pool."""
class AsyncConnectionPool:
    """Asynchronous connection pool."""

    # Same interface as ConnectionPool but with async methods
    async def getconn(self, timeout: float | None = None) -> AsyncConnection: ...
    async def putconn(self, conn: AsyncConnection) -> None: ...
    async def open(self) -> None: ...
    async def close(self) -> None: ...

from psycopg_pool import ConnectionPool
# Create connection pool
pool = ConnectionPool("dbname=mydb user=postgres", min_size=2, max_size=10)
# Use connection from pool; try/finally guarantees the connection is
# returned even if the query raises
conn = pool.getconn()
try:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users")
        users = cur.fetchall()
finally:
    pool.putconn(conn)  # Always return to pool
# Context manager usage
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM products")
        count = cur.fetchone()[0]
# Connection automatically returned to pool
# Cleanup
pool.close()

Additional cursor capabilities for specialized use cases.
# Server-side cursor for large result sets
with conn.cursor(name="large_scan") as cur:
    cur.execute("SELECT * FROM huge_table ORDER BY id")
    # Pull the result set down in fixed-size batches; fetchmany()
    # returns an empty list once the cursor is exhausted
    while batch := cur.fetchmany(1000):
        process_batch(batch)
        # Report progress after each batch
        print(f"Processed {cur.rownumber} rows")

# Scrollable server-side cursor
with conn.cursor(name="scrollable", scrollable=True) as cur:
    cur.execute("SELECT id, name FROM users ORDER BY name")
    # Jump past the first ten rows, then read one
    cur.scroll(10)
    row = cur.fetchone()
    # Step five rows back from the current position and read again
    cur.scroll(-5, mode="relative")
    row = cur.fetchone()
    # Rewind to the very start of the result set
    cur.scroll(0, mode="absolute")
    first_row = cur.fetchone()

# Cursor that survives transaction boundaries
# withhold=True keeps the server-side cursor alive across COMMIT
with conn.cursor(name="holdable", withhold=True) as cur:
    cur.execute("SELECT * FROM users")
    # Start transaction
    conn.commit()  # Cursor remains valid
    # Continue fetching after transaction boundary
    remaining_rows = cur.fetchall()

Built-in capabilities for monitoring and optimizing database operations.
class Capabilities:
    """Database and driver capability detection."""

    @property
    def libpq_version(self) -> int:
        """libpq version number."""

    @property
    def has_prepare(self) -> bool:
        """True if prepared statements are supported."""

    @property
    def has_pipeline(self) -> bool:
        """True if pipeline mode is supported."""

# Global capabilities instance
capabilities: Capabilities

# Check capabilities
# Branch on driver capabilities before relying on optional features
if psycopg.capabilities.has_pipeline:
    # Use pipeline for bulk operations
    with conn.pipeline():
        # ... bulk operations
else:
    # Fall back to individual operations
    # ... individual operations
# Monitor prepared statements
print("Prepared statements:", conn.prepared())
# Connection info for debugging
info = conn.info
print(f"Server version: {info.server_version}")
print(f"Backend PID: {info.backend_pid}")
print(f"Transaction status: {info.transaction_status}")

# Configure connection for high performance
conn = psycopg.connect(
    "dbname=mydb user=postgres",
    # Connection options
    application_name="MyApp",
    connect_timeout=10,
    # Performance options
    prepare_threshold=5,  # Auto-prepare after 5 executions
    options="-c synchronous_commit=off"  # Fast writes (trades durability)
)
# Configure adapters for performance
conn.adapters.register_dumper(MyClass, fast_dumper)
conn.adapters.register_loader(MY_TYPE_OID, fast_loader)

# Use binary format for large result sets
with conn.cursor() as cur:
    cur.execute("SELECT data FROM large_table", binary=True)
    # Results returned in binary format (faster)
# Optimize fetch sizes
with conn.cursor() as cur:
    cur.itersize = 1000  # Fetch 1000 rows per network round-trip
    cur.execute("SELECT * FROM big_table")
    for row in cur:  # Efficient iteration
        process_row(row)

Install with Tessl CLI
npx tessl i tessl/pypi-psycopg