An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive database connection functionality including establishment, authentication, configuration, and lifecycle management. AsyncPG provides both single connections and connection pooling with full PostgreSQL feature support.
Create database connections with extensive configuration options including authentication, SSL/TLS, timeouts, and PostgreSQL-specific settings.
async def connect(
dsn: str = None,
*,
host: str = None,
port: int = None,
user: str = None,
password: typing.Union[str, typing.Callable[[], str]] = None,
passfile: str = None,
database: str = None,
loop: asyncio.AbstractEventLoop = None,
timeout: float = 60,
statement_cache_size: int = 100,
max_cached_statement_lifetime: float = 300,
max_cacheable_statement_size: int = 15360,
command_timeout: float = None,
ssl: typing.Union[bool, ssl.SSLContext] = None,
direct_tls: bool = None,
connection_class: type = Connection,
record_class: type = Record,
server_settings: typing.Dict[str, str] = None,
target_session_attrs: str = None,
krbsrvname: str = None,
gsslib: str = None
) -> Connection
"""
Establish a connection to a PostgreSQL server.
Parameters:
dsn: PostgreSQL connection string (postgresql://user:pass@host:port/database)
host: Database server host name or IP address
port: Database server port number (default: 5432)
user: Database user name
password: Database password
passfile: Path to password file (.pgpass format)
database: Database name to connect to
timeout: Connection timeout in seconds
statement_cache_size: Maximum number of prepared statements to cache
max_cached_statement_lifetime: Maximum lifetime of cached statements
max_cacheable_statement_size: Maximum size of statements to cache
command_timeout: Default timeout for commands
ssl: SSL/TLS configuration (True, False, or SSLContext)
direct_tls: Use direct TLS connection (without STARTTLS)
connection_class: Custom connection class to use
record_class: Custom record class for query results
server_settings: PostgreSQL server parameters to set on connection
target_session_attrs: Required server attributes ('read-write', 'read-only', etc.)
krbsrvname: Kerberos service name for GSSAPI authentication
gsslib: GSSAPI library to use ('gssapi' or 'sspi')
Returns:
Connection instance
"""# Basic connection
conn = await asyncpg.connect('postgresql://user:pass@localhost/mydb')
# Connection with SSL
conn = await asyncpg.connect(
host='localhost',
user='postgres',
password='secret',
database='mydb',
ssl=True
)
# Advanced configuration
conn = await asyncpg.connect(
'postgresql://user:pass@localhost/mydb',
statement_cache_size=200,
command_timeout=30.0,
server_settings={
'application_name': 'my_app',
'timezone': 'UTC'
}
)Monitor and control connection state including closure detection, server information, and connection reset.
def is_closed(self) -> bool:
"""Return True if the connection is closed."""
def is_in_transaction(self) -> bool:
"""Return True if the connection is currently in a transaction."""
async def close(self, *, timeout: float = None) -> None:
"""
Close the connection gracefully.
Parameters:
timeout: Maximum time to wait for graceful closure
"""
def terminate(self) -> None:
"""Terminate the connection immediately without waiting."""
async def reset(self, *, timeout: float = None) -> None:
"""
Reset the connection state.
Resets all connection state including:
- Aborts current transaction
- Closes all cursors
- Resets session state
- Clears statement cache
Parameters:
timeout: Maximum time to wait for reset
"""Access PostgreSQL server details including version, process ID, and connection settings.
def get_server_pid(self) -> int:
"""Return the process ID of the PostgreSQL server."""
def get_server_version(self) -> ServerVersion:
"""Return the version of the connected PostgreSQL server."""
def get_settings(self) -> ConnectionSettings:
"""Return current connection settings and codec information."""
def get_reset_query(self) -> str:
"""Return the query that will be sent to reset connection state."""AsyncPG supports all PostgreSQL authentication methods:
# Install with GSSAPI support
# pip install asyncpg[gssauth]
conn = await asyncpg.connect(
host='database.company.com',
user='user@COMPANY.COM',
database='mydb',
krbsrvname='postgres',
gsslib='gssapi'
)Comprehensive SSL/TLS support with various security levels and certificate validation.
import ssl
# Simple SSL
conn = await asyncpg.connect(dsn, ssl=True)
# SSL with verification disabled (testing only)
conn = await asyncpg.connect(dsn, ssl='prefer')
# Custom SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_REQUIRED
conn = await asyncpg.connect(dsn, ssl=ssl_context)
# Client certificate authentication
ssl_context = ssl.create_default_context()
ssl_context.load_cert_chain('/path/to/client.crt', '/path/to/client.key')
conn = await asyncpg.connect(dsn, ssl=ssl_context)Handle various connection-related errors with appropriate recovery strategies.
try:
conn = await asyncpg.connect(dsn, timeout=10.0)
except asyncpg.ConnectionFailureError:
# Network connectivity issues
print("Cannot reach database server")
except asyncpg.InvalidAuthorizationSpecificationError:
# Authentication failed
print("Invalid credentials")
except asyncpg.PostgresConnectionError:
# General connection problems
print("Connection error occurred")
except asyncio.TimeoutError:
# Connection timeout
print("Connection timed out")class Connection:
"""A representation of a database session."""
def is_closed(self) -> bool
def is_in_transaction(self) -> bool
def get_server_pid(self) -> int
def get_server_version(self) -> ServerVersion
def get_settings(self) -> ConnectionSettings
def get_reset_query(self) -> str
def terminate(self) -> None
async def close(self, *, timeout: float = None) -> None
async def reset(self, *, timeout: float = None) -> None
class ServerVersion:
"""PostgreSQL server version information."""
major: int
minor: int
micro: int
releaselevel: str
serial: int
class ConnectionSettings:
"""Connection configuration and codec information."""
...