PostgreSQL database adapter for Python
—
Complete connection management for PostgreSQL databases with support for both synchronous and asynchronous operations, connection pooling, transaction control, two-phase commit, and comprehensive server configuration access.
Create database connections using connection strings or individual parameters, with support for SSL, connection pooling, and various authentication methods.
class Connection:
    @classmethod
    def connect(
        cls,
        conninfo: str = "",
        *,
        autocommit: bool = False,
        prepare_threshold: int | None = 5,
        row_factory: RowFactory | None = None,
        cursor_factory: type[Cursor] | None = None,
        host: str | None = None,
        port: int | None = None,
        dbname: str | None = None,
        user: str | None = None,
        password: str | None = None,
        sslmode: str | None = None,
        connect_timeout: int | None = None,
        application_name: str | None = None,
        **kwargs
    ) -> Connection:
        """Open a new connection to a PostgreSQL database.

        The target can be described with a connection string, with the
        individual keyword parameters below, or both.

        Args:
            conninfo: PostgreSQL connection string.
            autocommit: Whether the connection starts in auto-commit mode.
            prepare_threshold: How many times a statement is executed
                before it is prepared automatically.
            row_factory: Row factory used by default for cursors.
            cursor_factory: Cursor class used by default.
            host: Address of the database host.
            port: Port number of the database server.
            dbname: Name of the database.
            user: User name to authenticate with.
            password: Password to authenticate with.
            sslmode: SSL connection mode ('disable', 'require',
                'prefer', etc.).
            connect_timeout: Connection timeout, in seconds.
            application_name: Application name used for connection
                tracking.
            **kwargs: Any further keyword arguments.

        Returns:
            Connection: The active database connection.
        """
class AsyncConnection:
@classmethod
async def connect(
cls,
conninfo: str = "",
*,
autocommit: bool = False,
prepare_threshold: int | None = 5,
row_factory: RowFactory | None = None,
cursor_factory: type[AsyncCursor] | None = None,
host: str | None = None,
port: int | None = None,
dbname: str | None = None,
user: str | None = None,
password: str | None = None,
sslmode: str | None = None,
connect_timeout: int | None = None,
application_name: str | None = None,
**kwargs
) -> AsyncConnection:
"""Async version of Connection.connect() with same parameters"""

# Simple connection string
conn = psycopg.connect("dbname=mydb user=postgres host=localhost")

# Individual parameters passed as keyword arguments
conn = psycopg.connect(
    host="localhost",
    port=5432,
    dbname="mydb",
    user="postgres",
    password="secret",
)

# SSL connection: sslmode="require" selects one of the SSL connection modes
conn = psycopg.connect(
    host="secure-db.example.com",
    dbname="production",
    user="app_user",
    password="secure_password",
    sslmode="require",
)
# Async connection
conn = await psycopg.AsyncConnection.connect("dbname=mydb user=postgres")

Monitor and control connection state, including checking connection status, handling broken connections, and managing the connection lifecycle.
class BaseConnection:
@property
def closed(self) -> bool:
    """True if the connection is closed, False while it is open.

    psycopg 3 reports this as a boolean; the integer convention
    (0 = open, non-zero = closed) belongs to psycopg2.
    """
@property
def broken(self) -> bool:
"""True if connection is broken and unusable"""
def close(self) -> None:
"""Close the connection"""
def cancel(self) -> None:
"""Cancel any running operation"""

Control database transactions with support for different isolation levels, savepoints, and explicit transaction boundaries.
class Connection:
def commit(self) -> None:
"""Commit current transaction"""
def rollback(self) -> None:
"""Rollback current transaction"""
@property
def autocommit(self) -> bool:
"""Auto-commit mode status"""
@autocommit.setter
def autocommit(self, value: bool) -> None:
"""Enable/disable auto-commit mode"""
@property
def isolation_level(self) -> IsolationLevel | None:
"""Current transaction isolation level"""
@isolation_level.setter
def isolation_level(self, value: IsolationLevel | None) -> None:
"""Set transaction isolation level"""
@property
def read_only(self) -> bool | None:
"""Read-only transaction mode"""
@read_only.setter
def read_only(self, value: bool | None) -> None:
"""Set read-only transaction mode"""
def transaction(
self,
*,
savepoint_name: str | None = None,
isolation_level: IsolationLevel | None = None,
read_only: bool | None = None,
deferrable: bool | None = None
) -> Transaction:
"""Create transaction context manager"""

# Auto-commit mode
conn.autocommit = True
conn.execute("CREATE TABLE test (id serial, name text)")

# Manual transaction control: commit on success, roll back and re-raise
# on any failure so the error is not swallowed.
conn.autocommit = False
try:
    conn.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
    conn.execute("INSERT INTO users (name) VALUES (%s)", ("Bob",))
    conn.commit()
except Exception:
    conn.rollback()
    raise

# Transaction context manager
with conn.transaction():
    conn.execute("INSERT INTO users (name) VALUES (%s)", ("Charlie",))
    # Automatically commits or rolls back

# Savepoint
with conn.transaction(savepoint_name="sp1"):
    conn.execute("INSERT INTO users (name) VALUES (%s)", ("Dave",))
# Can rollback to this savepoint

Support for distributed transactions using PostgreSQL's two-phase commit protocol for coordination across multiple databases.
class Connection:
    def tpc_begin(self, xid: Xid) -> None:
        """Start a two-phase commit transaction."""

    def tpc_prepare(self) -> None:
        """Run the prepare step of the two-phase commit transaction."""

    def tpc_commit(self, xid: Xid | None = None) -> None:
        """Commit a prepared transaction."""

    def tpc_rollback(self, xid: Xid | None = None) -> None:
        """Roll back a prepared transaction."""

    def tpc_recover(self) -> list[Xid]:
        """Return the list of prepared transaction IDs."""
class Xid:
def __init__(
self,
format_id: int,
gtrid: str,
bqual: str = ""
):
"""
Transaction identifier for two-phase commit.
Args:
format_id: Format identifier
gtrid: Global transaction identifier
bqual: Branch qualifier
"""
@property
def format_id(self) -> int: ...
@property
def gtrid(self) -> str: ...
@property
def bqual(self) -> str: ...

Execute queries directly on connections with automatic cursor management and result handling.
class Connection:
    def execute(
        self,
        query,
        params=None,
        *,
        prepare: bool | None = None,
        binary: bool | None = None
    ) -> Cursor:
        """Run a query on this connection and return the resulting cursor.

        Args:
            query: SQL to run, given as a string or an sql.Composable.
            params: Parameters for the query.
            prepare: Whether to use a prepared statement.
            binary: Whether to use the binary format for results.

        Returns:
            A cursor holding the query results.
        """
class AsyncConnection:
async def execute(
self,
query,
params=None,
*,
prepare: bool | None = None,
binary: bool | None = None
) -> AsyncCursor:
"""Async version of Connection.execute()"""

Access detailed connection and server information including database parameters, server version, and connection status.
@property
def info(self) -> ConnectionInfo:
    """Details about this connection and the server it talks to."""
class ConnectionInfo:
@property
def dsn(self) -> str:
"""Data source name (connection string)"""
@property
def status(self) -> int:
"""Connection status code"""
@property
def transaction_status(self) -> int:
"""Current transaction status"""
@property
def pipeline_status(self) -> int:
"""Pipeline mode status"""
@property
def encoding(self) -> str:
"""Client encoding name"""
@property
def server_version(self) -> int:
"""PostgreSQL server version number"""
@property
def backend_pid(self) -> int:
"""Server backend process ID"""
@property
def secret_key(self) -> int:
"""Secret key for connection cancellation"""
@property
def timezone(self) -> str | None:
"""Server timezone setting"""
@property
def host(self) -> str | None:
"""Database host"""
@property
def hostaddr(self) -> str | None:
"""Database host IP address"""
@property
def port(self) -> int | None:
"""Database port"""
@property
def dbname(self) -> str | None:
"""Database name"""
@property
def user(self) -> str | None:
"""Database user"""
@property
def password(self) -> str | None:
"""Database password (masked)"""
def get_parameters(self) -> dict[str, str]:
"""Get all server configuration parameters"""
def parameter_status(self, name: str) -> str | None:
"""Get specific server parameter value"""

Handle PostgreSQL LISTEN/NOTIFY messages for real-time communication between database clients.
from collections.abc import Callable, Generator


class Connection:
    def add_notify_handler(self, callback: Callable[[Notify], None]) -> None:
        """Add a handler for notification messages."""

    def remove_notify_handler(self, callback: Callable[[Notify], None]) -> None:
        """Remove a notification message handler."""

    def notifies(self) -> Generator[Notify, None, None]:
        """Yield notification messages as they are received.

        This is a generator rather than a list: iterate over it to
        consume notifications.
        """
class Notify:
@property
def channel(self) -> str:
"""Notification channel name"""
@property
def payload(self) -> str:
"""Notification message payload"""
@property
def pid(self) -> int:
"""Process ID of notifying backend"""

Configure type adapters and customize how Python objects are converted to/from PostgreSQL data types.
@property
def adapters(self) -> AdaptersMap:
    """Registry of the type adapters used by this connection."""
class AdaptersMap:
def register_loader(self, oid: int, loader: Callable) -> None:
"""Register custom type loader"""
def register_dumper(self, cls: type, dumper: Callable) -> None:
"""Register custom type dumper"""

from enum import IntEnum
class IsolationLevel(IntEnum):
READ_UNCOMMITTED = 1
READ_COMMITTED = 2
REPEATABLE_READ = 3
SERIALIZABLE = 4

Install with Tessl CLI
npx tessl i tessl/pypi-psycopg