An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
npx @tessl/cli install tessl/pypi-asyncpg@0.30.0
A high-performance, asyncio-native PostgreSQL database client designed specifically for Python's async/await syntax. AsyncPG implements the PostgreSQL server binary protocol directly, enabling advanced PostgreSQL features like prepared statements, scrollable cursors, and automatic encoding/decoding of composite types while maintaining maximum performance through Cython extensions.
pip install asyncpg
pip install asyncpg[gssauth] for GSSAPI/SSPI authentication
import asyncpg
Common patterns:
# Direct connection
connection = await asyncpg.connect('postgresql://user:pass@localhost/dbname')
# Connection pooling
pool = asyncpg.create_pool('postgresql://user:pass@localhost/dbname')
# Exception handling
from asyncpg import PostgresError, UniqueViolationError
import asyncio
import asyncpg
async def main():
    """Run a minimal asyncpg session: create a table, insert a row, query it.

    The connection is closed in a ``finally`` block so that it is released
    even when one of the statements raises.
    """
    # Establish a connection to the database
    conn = await asyncpg.connect('postgresql://user:pass@localhost/dbname')
    try:
        # Execute a simple query (execute() returns the command status string,
        # which this example does not need)
        await conn.execute(
            "CREATE TABLE users(id serial, name text, email text)"
        )
        # Insert data; $1/$2 are server-side placeholders, never string-built
        await conn.execute(
            "INSERT INTO users(name, email) VALUES($1, $2)",
            "Alice", "alice@example.com"
        )
        # Fetch multiple rows
        rows = await conn.fetch("SELECT * FROM users WHERE name = $1", "Alice")
        for row in rows:
            print(f"User: {row['name']} ({row['email']})")
        # Fetch a single value
        user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
        print(f"Total users: {user_count}")
    finally:
        # Clean up, whether or not the queries above succeeded
        await conn.close()
# Using connection pool
async def pool_example():
    """Fetch up to ten rows from ``users`` through a connection pool.

    The pool is used as an async context manager: entering it initializes
    the pool and leaving it closes the pool, so the pool is shut down even
    though the function returns from inside the block.

    Returns:
        The list of records produced by the query.
    """
    async with asyncpg.create_pool(
        'postgresql://user:pass@localhost/dbname'
    ) as pool:
        # acquire() hands out a connection and returns it to the pool on exit
        async with pool.acquire() as conn:
            return await conn.fetch("SELECT * FROM users LIMIT 10")
asyncio.run(main())
AsyncPG is built around several core components that provide comprehensive PostgreSQL functionality:
This design enables asyncpg to serve as both a high-level database interface and a foundation for advanced PostgreSQL applications requiring maximum performance and full feature access.
Core functionality for establishing and managing database connections, including authentication, SSL/TLS support, and connection lifecycle management.
async def connect(
dsn: str = None,
*,
host: str = None,
port: int = None,
user: str = None,
password: typing.Union[str, typing.Callable[[], str]] = None,
passfile: str = None,
database: str = None,
loop: asyncio.AbstractEventLoop = None,
timeout: float = 60,
statement_cache_size: int = 100,
max_cached_statement_lifetime: float = 300,
max_cacheable_statement_size: int = 15360,
command_timeout: float = None,
ssl: typing.Union[bool, ssl.SSLContext] = None,
direct_tls: bool = None,
connection_class: type = Connection,
record_class: type = Record,
server_settings: typing.Dict[str, str] = None,
target_session_attrs: str = None,
krbsrvname: str = None,
gsslib: str = None
) -> Connection
Comprehensive query execution methods supporting various result formats, parameterized queries, prepared statements, and bulk operations.
async def execute(self, query: str, *args, timeout: float = None) -> str
async def executemany(self, command: str, args, *, timeout: float = None) -> None
async def fetch(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]
async def fetchval(self, query: str, *args, column: int = 0, timeout: float = None) -> typing.Any
async def fetchrow(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]
async def fetchmany(self, query: str, args, *, timeout: float = None, record_class: type = None) -> typing.List[Record]
High-performance reusable query execution with server-side statement caching and optimized parameter binding.
async def prepare(
self,
query: str,
*,
name: str = None,
timeout: float = None,
record_class: type = None
) -> PreparedStatement
Scrollable cursors for efficient traversal of large result sets with configurable prefetch and memory management.
def cursor(
self,
query: str,
*args,
prefetch: int = None,
timeout: float = None,
record_class: type = None
) -> CursorFactory
Advanced connection pooling with configurable pool sizes, connection lifecycle management, and transparent connection acquisition/release.
def create_pool(
dsn: str = None,
*,
min_size: int = 10,
max_size: int = 10,
max_queries: int = 50000,
max_inactive_connection_lifetime: float = 300.0,
connect: typing.Callable = None,
setup: typing.Callable = None,
init: typing.Callable = None,
reset: typing.Callable = None,
loop: asyncio.AbstractEventLoop = None,
connection_class: type = Connection,
record_class: type = Record,
**connect_kwargs
) -> Pool
Full transaction support including transaction contexts, savepoints, isolation levels, and read-only transactions.
def transaction(
self,
*,
isolation: str = None,
readonly: bool = False,
deferrable: bool = False
) -> Transaction: ...
High-performance bulk data import/export using PostgreSQL's COPY protocol with support for various formats and streaming.
async def copy_from_table(
self,
table_name: str,
*,
output,
columns: typing.List[str] = None,
schema_name: str = None,
timeout: float = None,
format: str = None,
oids: bool = None,
delimiter: str = None,
null: str = None,
header: bool = None,
quote: str = None,
escape: str = None,
force_quote: typing.Union[bool, typing.List[str]] = None,
encoding: str = None
) -> str
async def copy_from_query(
self,
query: str,
*args,
output,
timeout: float = None,
format: str = None,
oids: bool = None,
delimiter: str = None,
null: str = None,
header: bool = None,
quote: str = None,
escape: str = None,
force_quote: typing.Union[bool, typing.List[str]] = None,
encoding: str = None
) -> str
async def copy_to_table(
self,
table_name: str,
*,
source,
columns: typing.List[str] = None,
schema_name: str = None,
timeout: float = None,
format: str = None,
oids: bool = None,
freeze: bool = None,
delimiter: str = None,
null: str = None,
header: bool = None,
quote: str = None,
escape: str = None,
force_quote: typing.Union[bool, typing.List[str]] = None,
force_not_null: typing.List[str] = None,
force_null: typing.List[str] = None,
encoding: str = None,
where: str = None
) -> str
async def copy_records_to_table(
self,
table_name: str,
*,
records,
columns: typing.List[str] = None,
schema_name: str = None,
timeout: float = None,
where: str = None
) -> str
Comprehensive type system supporting all PostgreSQL data types, custom type registration, and automatic encoding/decoding.
async def set_type_codec(
self,
typename: str,
*,
schema: str = 'public',
encoder: typing.Callable,
decoder: typing.Callable,
format: str = 'text'
) -> None
async def reset_type_codec(
self,
typename: str,
*,
schema: str = 'public'
) -> None
async def set_builtin_type_codec(
self,
typename: str,
*,
schema: str = 'public',
codec_name: str,
format: str = None
) -> None
async def reload_schema_state(self) -> None
Complete exception hierarchy mapping PostgreSQL error codes to Python exceptions with detailed error information.
class PostgresError(Exception): ...
class FatalPostgresError(PostgresError): ...
class InterfaceError(Exception): ...
class InterfaceWarning(UserWarning): ...
class DataError(InterfaceError, ValueError): ...
class IntegrityConstraintViolationError(PostgresError): ...
class UniqueViolationError(IntegrityConstraintViolationError): ...
class ForeignKeyViolationError(IntegrityConstraintViolationError): ...
class NotNullViolationError(IntegrityConstraintViolationError): ...
class CheckViolationError(IntegrityConstraintViolationError): ...
Support for PostgreSQL's LISTEN/NOTIFY functionality and server log message handling.
async def add_listener(self, channel: str, callback: typing.Callable) -> None
async def remove_listener(self, channel: str, callback: typing.Callable) -> None
def add_log_listener(self, callback: typing.Callable) -> None
def remove_log_listener(self, callback: typing.Callable) -> None
def add_termination_listener(self, callback: typing.Callable) -> None
def remove_termination_listener(self, callback: typing.Callable) -> None
def add_query_logger(self, callback: typing.Callable) -> None
def remove_query_logger(self, callback: typing.Callable) -> None
class Connection:
"""A representation of a database session."""
def is_closed(self) -> bool
def get_server_pid(self) -> int
def get_server_version(self) -> ServerVersion
def get_settings(self) -> ConnectionSettings
def is_in_transaction(self) -> bool
async def close(self, *, timeout: float = None) -> None
async def reset(self, *, timeout: float = None) -> None
def terminate(self) -> None
def get_reset_query(self) -> str
class Pool:
"""A connection pool."""
def get_size(self) -> int
def get_min_size(self) -> int
def get_max_size(self) -> int
def get_idle_size(self) -> int
def is_closing(self) -> bool
def acquire(self, *, timeout: float = None) -> PoolAcquireContext
async def release(self, connection: Connection, *, timeout: float = None) -> None
async def close(self) -> None
def terminate(self) -> None
async def expire_connections(self) -> None
def set_connect_args(self, dsn: str = None, **connect_kwargs) -> None
class PreparedStatement:
"""A prepared statement."""
def get_name(self) -> str
def get_query(self) -> str
def get_statusmsg(self) -> str
def get_parameters(self) -> typing.Tuple[Type, ...]
def get_attributes(self) -> typing.Tuple[Attribute, ...]
class CursorFactory:
"""Factory for creating cursors."""
def __aiter__(self) -> CursorIterator
def __await__(self) -> Cursor
class Transaction:
"""A transaction context manager."""
async def start(self) -> None
async def commit(self) -> None
async def rollback(self) -> None
async def __aenter__(self) -> Transaction
async def __aexit__(self, extype, ex, tb) -> None
class Record:
"""Query result record with dict-like and tuple-like access."""
def get(self, key: str, default=None): ...
def keys(self) -> Iterator[str]: ...
def values(self) -> Iterator: ...
def items(self) -> Iterator[tuple[str, typing.Any]]: ...
def __getitem__(self, key: typing.Union[str, int, slice]): ...
def __len__(self) -> int: ...
class Type:
"""Database data type information."""
oid: int
name: str
kind: str
schema: str
class Attribute:
"""Database attribute information."""
name: str
type: Type
class ServerVersion:
"""PostgreSQL server version tuple."""
major: int
minor: int
micro: int
releaselevel: str
serial: int
class Range:
"""PostgreSQL range type representation."""
lower: typing.Any
upper: typing.Any
lower_inc: bool
upper_inc: bool
lower_inf: bool
upper_inf: bool
isempty: bool
def issubset(self, other: Range) -> bool
def issuperset(self, other: Range) -> bool
class BitString:
"""PostgreSQL bit string type."""
value: str
class Point:
"""PostgreSQL point geometric type."""
x: float
y: float
class Box:
"""PostgreSQL box geometric type."""
high: Point
low: Point
class Path:
"""PostgreSQL path geometric type."""
is_closed: bool
points: typing.List[Point]
class Polygon:
"""PostgreSQL polygon geometric type."""
points: typing.List[Point]
class Line:
"""PostgreSQL line geometric type."""
a: float
b: float
c: float
class LineSegment:
"""PostgreSQL line segment geometric type."""
p1: Point
p2: Point
class Circle:
"""PostgreSQL circle geometric type."""
center: Point
radius: float
class ConnectionSettings:
"""Connection configuration and settings."""
...