PostgreSQL database adapter for Python with thread-safe connection pooling and SQL operations
—
Core database connectivity functionality providing connection management, cursor operations, transaction control, and SQL execution. This implements the DB API 2.0 specification with psycopg2-specific enhancements.
Create database connections using various parameter formats and connection factories.
def connect(dsn=None, connection_factory=None, cursor_factory=None, **kwargs):
"""
Create a new database connection.
Parameters:
- dsn (str, optional): Connection string in PostgreSQL format
- connection_factory (callable, optional): Custom connection class
- cursor_factory (callable, optional): Default cursor factory for this connection
- **kwargs: Connection parameters (host, port, database, user, password, etc.)
Returns:
connection: Database connection object
"""

Usage examples:
# Using connection string
conn = psycopg2.connect("host=localhost dbname=test user=postgres")
# Using keyword arguments
conn = psycopg2.connect(
host="localhost",
port=5432,
database="mydb",
user="postgres",
password="secret"
)
# With custom cursor factory
conn = psycopg2.connect(
host="localhost",
database="mydb",
user="postgres",
password="secret",
cursor_factory=psycopg2.extras.DictCursor
)

Utility functions for connection string handling, identifier quoting, and password encryption.
def parse_dsn(dsn):
"""
Parse connection string into keyword dictionary.
Parameters:
- dsn (str): Connection string in PostgreSQL format
Returns:
dict: Dictionary of connection parameters
Raises:
ProgrammingError: If DSN is not valid
"""
def make_dsn(dsn=None, **kwargs):
"""
Convert keywords into connection string.
Parameters:
- dsn (str, optional): Base connection string
- **kwargs: Connection parameters to include
Returns:
str: Formatted connection string
"""
def quote_ident(name, scope):
"""
Quote SQL identifier according to PostgreSQL rules.
Parameters:
- name (str): Identifier to quote
- scope (connection/cursor): Connection or cursor for encoding
Returns:
str: Quoted identifier
"""
def encrypt_password(password, user, scope=None, algorithm=None):
"""
Encrypt password for PostgreSQL authentication.
Parameters:
- password (str): Cleartext password
- user (str): Username
- scope (connection/cursor, optional): Connection scope
- algorithm (str, optional): Encryption algorithm ('md5', 'scram-sha-256')
Returns:
str: Encrypted password
"""

Usage examples:
from psycopg2.extensions import parse_dsn, make_dsn, quote_ident, encrypt_password
# Parse connection string
params = parse_dsn('dbname=test user=postgres password=secret')
print(params) # {'dbname': 'test', 'user': 'postgres', 'password': 'secret'}
# Parse connection URI
uri_params = parse_dsn("postgresql://user@host/db?connect_timeout=10")
# Build connection string
dsn = make_dsn(host='localhost', database='mydb', user='postgres')
print(dsn) # "host=localhost user=postgres dbname=mydb" (the 'database' keyword is emitted as 'dbname')
# Override existing DSN
new_dsn = make_dsn('host=localhost dbname=test', user='admin')
# Quote SQL identifiers safely
table_name = quote_ident('user-table', conn)
print(table_name) # "user-table"
# Handle embedded quotes
complex_name = quote_ident('table"with"quotes', conn)
print(complex_name) # "table""with""quotes"
# Encrypt passwords for storage
encrypted = encrypt_password('mypassword', 'username', algorithm='md5')
print(encrypted) # md5<hash>
# Use connection's algorithm setting
encrypted = encrypt_password('mypassword', 'username', conn)
# SCRAM-SHA-256 encryption (requires libpq >= 10)
encrypted = encrypt_password('mypassword', 'username', conn, 'scram-sha-256')

Database connection providing transaction management, cursor creation, and connection control.
class connection:
def cursor(self, name=None, cursor_factory=None, scrollable=None, withhold=False):
"""
Create a new cursor for this connection.
Parameters:
- name (str, optional): Server-side cursor name
- cursor_factory (callable, optional): Cursor class to instantiate
- scrollable (bool, optional): Server cursor scrollability
- withhold (bool, optional): Server cursor withhold capability
Returns:
cursor: New cursor object
"""
def commit(self):
"""Commit current transaction."""
def rollback(self):
"""Rollback current transaction."""
def close(self):
"""Close the connection."""
def set_isolation_level(self, level):
"""
Set transaction isolation level.
Parameters:
- level (int): Isolation level constant
"""
def set_client_encoding(self, encoding):
"""
Set client encoding.
Parameters:
- encoding (str): Encoding name
"""
def cancel(self):
"""Cancel current operation."""
def reset(self):
"""Reset connection to initial state."""
# Properties
@property
def closed(self):
"""Connection closed status (0=open, >0=closed)."""
@property
def status(self):
"""Connection status constant."""
@property
def encoding(self):
"""Current client encoding."""
@property
def isolation_level(self):
"""Current isolation level."""
@property
def autocommit(self):
"""Autocommit mode status."""
@autocommit.setter
def autocommit(self, value):
"""Set autocommit mode."""

Database cursor for executing SQL statements and fetching results.
class cursor:
def execute(self, query, vars=None):
"""
Execute SQL statement.
Parameters:
- query (str): SQL statement with optional placeholders
- vars (sequence/dict, optional): Parameter values
"""
def executemany(self, query, vars_list):
"""
Execute SQL statement multiple times.
Parameters:
- query (str): SQL statement with placeholders
- vars_list (sequence): Sequence of parameter tuples/dicts
"""
def fetchone(self):
"""
Fetch next row.
Returns:
tuple/None: Next row or None if no more rows
"""
def fetchmany(self, size=None):
"""
Fetch multiple rows.
Parameters:
- size (int, optional): Number of rows to fetch
Returns:
list: List of row tuples
"""
def fetchall(self):
"""
Fetch all remaining rows.
Returns:
list: List of all remaining row tuples
"""
def close(self):
"""Close the cursor."""
def callproc(self, procname, parameters=None):
"""
Call stored procedure.
Parameters:
- procname (str): Procedure name
- parameters (sequence, optional): Procedure parameters
Returns:
sequence: Modified parameters
"""
def copy_from(self, file, table, sep='\t', null='\\N', size=8192, columns=None):
"""
Copy data from file to table.
Parameters:
- file: File-like object to read from
- table (str): Target table name
- sep (str): Field separator
- null (str): NULL representation
- size (int): Buffer size
- columns (sequence, optional): Column names
"""
def copy_to(self, file, table, sep='\t', null='\\N', columns=None):
"""
Copy table data to file.
Parameters:
- file: File-like object to write to
- table (str): Source table name
- sep (str): Field separator
- null (str): NULL representation
- columns (sequence, optional): Column names
"""
def copy_expert(self, sql, file, size=8192):
"""
Execute COPY command with expert control.
Parameters:
- sql (str): COPY command
- file: File-like object
- size (int): Buffer size
"""
def mogrify(self, operation, parameters=None):
"""
Return formatted query string.
Parameters:
- operation (str): SQL statement with placeholders
- parameters (sequence/dict, optional): Parameter values
Returns:
bytes: Formatted query string
"""
def setinputsizes(self, sizes):
"""Set input sizes (DB API 2.0 compliance - no-op)."""
def setoutputsize(self, size, column=None):
"""Set output size (DB API 2.0 compliance - no-op)."""
# Properties
@property
def description(self):
"""
Cursor result description.
Returns:
list/None: List of column descriptors or None
"""
@property
def rowcount(self):
"""Number of rows affected by last execute."""
@property
def rownumber(self):
"""Current 0-based row number."""
@property
def lastrowid(self):
"""OID of the last inserted row; None if the table has no OIDs or the last operation was not a single-row insert."""
@property
def query(self):
"""Last executed query as bytes."""
@property
def statusmessage(self):
"""Status message from last command."""
@property
def closed(self):
"""Cursor closed status."""
@property
def name(self):
"""Server-side cursor name."""
@property
def scrollable(self):
"""Server cursor scrollability."""
@property
def withhold(self):
"""Server cursor withhold capability."""

# Basic transaction control
conn = psycopg2.connect(...)
# Explicit transaction
conn.commit() # Commit current transaction
conn.rollback() # Rollback current transaction
# Autocommit mode
conn.autocommit = True # Enable autocommit
conn.autocommit = False # Disable autocommit (default)
# Isolation levels
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
# Context manager (automatic commit/rollback)
with conn:
cur = conn.cursor()
cur.execute("INSERT INTO table VALUES (%s)", (value,))
# Automatic commit on success, rollback on exception

# Named cursor for large result sets
cur = conn.cursor('my_cursor') # Server-side cursor
cur.execute("SELECT * FROM large_table")
# Iterate through results without loading all into memory
for row in cur:
process_row(row)
# Scrollable server cursor
cur = conn.cursor('scrollable_cursor', scrollable=True)
cur.execute("SELECT * FROM table ORDER BY id")
cur.scroll(100) # Skip 100 rows
row = cur.fetchone()
# Cursor with holdability
cur = conn.cursor('hold_cursor', withhold=True)

# Create async connection
conn = psycopg2.connect(..., async_=True)
# Check connection state
if conn.poll() == psycopg2.extensions.POLL_OK:
print("Connection ready")
# Async query execution
cur = conn.cursor()
cur.execute("SELECT * FROM table")
# Poll for completion
while True:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
results = cur.fetchall()
break
elif state == psycopg2.extensions.POLL_READ:
# Wait for read
select.select([conn.fileno()], [], [])
elif state == psycopg2.extensions.POLL_WRITE:
# Wait for write
select.select([], [conn.fileno()], [])

STATUS_SETUP: int = 0 # Connection being set up
STATUS_READY: int = 1 # Connection ready for commands
STATUS_BEGIN: int = 2 # Connection in transaction block
STATUS_IN_TRANSACTION: int = 2 # Alias for STATUS_BEGIN
STATUS_PREPARED: int = 5 # Connection with prepared transaction

POLL_OK: int = 0 # Operation completed
POLL_READ: int = 1 # Wait for read
POLL_WRITE: int = 2 # Wait for write
POLL_ERROR: int = 3 # Error occurred

TRANSACTION_STATUS_IDLE: int = 0 # Not in a transaction
TRANSACTION_STATUS_ACTIVE: int = 1 # Command in progress
TRANSACTION_STATUS_INTRANS: int = 2 # In transaction block
TRANSACTION_STATUS_INERROR: int = 3 # In failed transaction
TRANSACTION_STATUS_UNKNOWN: int = 4 # Connection bad

ColumnDescription = tuple[
str, # name - column name
int, # type_code - PostgreSQL type OID
int, # display_size - display size (not used)
int, # internal_size - internal size in bytes
int, # precision - numeric precision
int, # scale - numeric scale
bool # null_ok - nullable flag
]

Install with Tessl CLI
npx tessl i tessl/pypi-psycopg2-binary