
tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/asyncpg@0.30.x

To install, run

npx @tessl/cli install tessl/pypi-asyncpg@0.30.0


AsyncPG

A high-performance, asyncio-native PostgreSQL database client designed for Python's async/await syntax. AsyncPG implements the PostgreSQL server binary protocol directly, which gives it access to PostgreSQL features such as prepared statements, scrollable cursors, and automatic encoding/decoding of composite types. Its performance-critical protocol code is implemented as Cython extensions.

Package Information

  • Package Name: asyncpg
  • Language: Python
  • Installation: pip install asyncpg
  • Optional Dependencies: pip install asyncpg[gssauth] for GSSAPI/SSPI authentication

Core Imports

import asyncpg

Common patterns:

# Direct connection
connection = await asyncpg.connect('postgresql://user:pass@localhost/dbname')

# Connection pooling (create_pool() must be awaited before the pool is used)
pool = await asyncpg.create_pool('postgresql://user:pass@localhost/dbname')

# Exception handling
from asyncpg import PostgresError, UniqueViolationError

Basic Usage

import asyncio
import asyncpg

async def main():
    # Establish a connection to the database
    conn = await asyncpg.connect('postgresql://user:pass@localhost/dbname')
    
    # Create the table; execute() returns the command status string
    await conn.execute(
        "CREATE TABLE IF NOT EXISTS users(id serial, name text, email text)"
    )
    
    # Insert data
    await conn.execute(
        "INSERT INTO users(name, email) VALUES($1, $2)", 
        "Alice", "alice@example.com"
    )
    
    # Fetch multiple rows
    rows = await conn.fetch("SELECT * FROM users WHERE name = $1", "Alice")
    for row in rows:
        print(f"User: {row['name']} ({row['email']})")
    
    # Fetch a single value
    user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
    print(f"Total users: {user_count}")
    
    # Clean up
    await conn.close()

# Using a connection pool
async def pool_example():
    # "async with" initializes the pool and closes it on exit,
    # even though the function returns from inside the block
    async with asyncpg.create_pool('postgresql://user:pass@localhost/dbname') as pool:
        async with pool.acquire() as conn:
            return await conn.fetch("SELECT * FROM users LIMIT 10")

asyncio.run(main())

Architecture

AsyncPG is built around several core components that provide comprehensive PostgreSQL functionality:

  • Connection: Single database session with query execution, transaction management, and type handling
  • Pool: Connection pooling with automatic connection lifecycle management and transparent acquire/release
  • Protocol: Binary protocol implementation optimized for performance with Cython
  • Record: Query result representation with both dict-like and tuple-like access patterns
  • Type System: Automatic encoding/decoding for PostgreSQL data types, arrays, and composite types
  • Transaction Management: Full transaction support including savepoints and isolation levels

This design enables asyncpg to serve as both a high-level database interface and a foundation for advanced PostgreSQL applications requiring maximum performance and full feature access.

Capabilities

Connection Management

Core functionality for establishing and managing database connections, including authentication, SSL/TLS support, and connection lifecycle management.

async def connect(
    dsn: str = None,
    *,
    host: str = None,
    port: int = None,
    user: str = None,
    password: typing.Union[str, typing.Callable[[], str]] = None,
    passfile: str = None,
    database: str = None,
    loop: asyncio.AbstractEventLoop = None,
    timeout: float = 60,
    statement_cache_size: int = 100,
    max_cached_statement_lifetime: float = 300,
    max_cacheable_statement_size: int = 15360,
    command_timeout: float = None,
    ssl: typing.Union[bool, ssl.SSLContext] = None,
    direct_tls: bool = None,
    connection_class: type = Connection,
    record_class: type = Record,
    server_settings: typing.Dict[str, str] = None,
    target_session_attrs: str = None,
    krbsrvname: str = None,
    gsslib: str = None
) -> Connection
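
As an illustration of the keyword form of connect(), the following is a minimal sketch that collects connection settings from environment variables. The APP_DB_* variable names, the fallback values, and the helper names are assumptions made for this example and are not part of asyncpg; only the keyword names come from the signature above.

```python
import os


def connect_kwargs_from_env(env=None):
    """Build keyword arguments for asyncpg.connect().

    The APP_DB_* names and the defaults are hypothetical.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("APP_DB_HOST", "localhost"),
        "port": int(env.get("APP_DB_PORT", "5432")),
        "user": env.get("APP_DB_USER", "postgres"),
        "password": env.get("APP_DB_PASSWORD"),
        "database": env.get("APP_DB_NAME", "postgres"),
        "timeout": 10,          # seconds allowed for establishing the connection
        "command_timeout": 30,  # default timeout for each operation, in seconds
        "server_settings": {"application_name": "example-app"},
    }


async def open_connection():
    # Third-party driver, imported lazily so the helper above
    # can be used and tested without a database.
    import asyncpg

    return await asyncpg.connect(**connect_kwargs_from_env())
```

Keeping the settings in a plain dict makes them easy to inspect or override before a connection is opened.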

See docs/connection-management.md for the full reference.

Query Execution

Comprehensive query execution methods supporting various result formats, parameterized queries, prepared statements, and bulk operations.

async def execute(self, query: str, *args, timeout: float = None) -> str
async def executemany(self, command: str, args, *, timeout: float = None) -> None
async def fetch(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]
async def fetchval(self, query: str, *args, column: int = 0, timeout: float = None) -> typing.Any
async def fetchrow(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]
async def fetchmany(self, query: str, args, *, timeout: float = None, record_class: type = None) -> typing.List[Record]
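
The methods above differ mainly in what they return. The sketch below shows one way they might be combined; it assumes conn is an open asyncpg Connection and that the users table from Basic Usage exists. The function name is hypothetical.

```python
async def load_user_summary(conn, new_users):
    """conn is assumed to be an open asyncpg Connection."""
    # executemany() runs the statement once per argument tuple.
    await conn.executemany(
        "INSERT INTO users(name, email) VALUES($1, $2)", new_users
    )
    # fetchval() returns a single value (column 0 of the first row by default).
    total = await conn.fetchval("SELECT COUNT(*) FROM users")
    # fetchrow() returns one Record, or None when no row matches.
    first = await conn.fetchrow(
        "SELECT name, email FROM users ORDER BY id LIMIT 1"
    )
    # execute() returns the command status string.
    status = await conn.execute("DELETE FROM users WHERE email IS NULL")
    return {
        "total": total,
        "first": dict(first) if first is not None else None,
        "status": status,
    }
```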

See docs/query-execution.md for the full reference.

Prepared Statements

High-performance reusable query execution with server-side statement caching and optimized parameter binding.

async def prepare(
    self,
    query: str,
    *,
    name: str = None,
    timeout: float = None,
    record_class: type = None
) -> PreparedStatement
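
A prepared statement pays off when the same query runs repeatedly with different arguments. The following is a minimal sketch, assuming conn is an open asyncpg Connection and the users table from Basic Usage; names_for_ids is a hypothetical helper.

```python
async def names_for_ids(conn, user_ids):
    """conn is assumed to be an open asyncpg Connection."""
    # Prepare the query once on the server...
    stmt = await conn.prepare("SELECT name FROM users WHERE id = $1")
    names = []
    for user_id in user_ids:
        # ...then execute it repeatedly with different arguments.
        names.append(await stmt.fetchval(user_id))
    return names
```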

See docs/prepared-statements.md for the full reference.

Cursor Operations

Scrollable cursors for efficient traversal of large result sets with configurable prefetch and memory management.

def cursor(
    self,
    query: str,
    *args,
    prefetch: int = None,
    timeout: float = None,
    record_class: type = None
) -> CursorFactory
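
Cursors can only be used inside a transaction. The sketch below iterates over a result set row by row instead of loading it all at once; it assumes conn is an open asyncpg Connection, and the function name and prefetch value are illustrative.

```python
async def stream_emails(conn, prefetch=100):
    """conn is assumed to be an open asyncpg Connection."""
    emails = []
    # asyncpg cursors must be used inside a transaction block.
    async with conn.transaction():
        cursor = conn.cursor(
            "SELECT email FROM users ORDER BY id", prefetch=prefetch
        )
        # Rows are fetched from the server in batches of `prefetch`.
        async for record in cursor:
            emails.append(record["email"])
    return emails
```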

See docs/cursor-operations.md for the full reference.

Connection Pooling

Advanced connection pooling with configurable pool sizes, connection lifecycle management, and transparent connection acquisition/release.

def create_pool(
    dsn: str = None,
    *,
    min_size: int = 10,
    max_size: int = 10,
    max_queries: int = 50000,
    max_inactive_connection_lifetime: float = 300.0,
    connect: typing.Callable = None,
    setup: typing.Callable = None,
    init: typing.Callable = None,
    reset: typing.Callable = None,
    loop: asyncio.AbstractEventLoop = None,
    connection_class: type = Connection,
    record_class: type = Record,
    **connect_kwargs
) -> Pool
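
A common pattern is to create the pool once and pass it to the code that needs a connection. The following is a sketch under that assumption; the function names and the pool sizes are hypothetical choices, not recommendations.

```python
async def count_users(pool):
    """pool is assumed to be an initialized asyncpg Pool."""
    # acquire() hands out a connection and returns it to the pool on exit.
    async with pool.acquire() as conn:
        return await conn.fetchval("SELECT COUNT(*) FROM users")


async def run_with_pool(dsn):
    # Third-party driver, imported lazily so count_users() can be
    # exercised with a stand-in pool.
    import asyncpg

    # "async with" initializes the pool and closes it when the block exits.
    async with asyncpg.create_pool(dsn, min_size=1, max_size=5) as pool:
        return await count_users(pool)
```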

See docs/connection-pooling.md for the full reference.

Transaction Management

Full transaction support including transaction contexts, savepoints, isolation levels, and read-only transactions.

def transaction(
    self, 
    *, 
    isolation: str = None, 
    readonly: bool = False, 
    deferrable: bool = False
) -> Transaction: ...
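
Used as an async context manager, a transaction commits when the block finishes normally and rolls back when an exception escapes it. The sketch below assumes conn is an open asyncpg Connection and a hypothetical accounts(id, balance) table.

```python
async def transfer(conn, from_id, to_id, amount):
    """conn is assumed to be an open asyncpg Connection.

    The accounts table is hypothetical.
    """
    # Both updates are committed together, or neither is applied.
    async with conn.transaction(isolation="serializable"):
        await conn.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from_id,
        )
        await conn.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to_id,
        )
```

Opening another conn.transaction() block inside this one creates a savepoint rather than a second top-level transaction.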

See docs/transaction-management.md for the full reference.

COPY Operations

High-performance bulk data import/export using PostgreSQL's COPY protocol with support for various formats and streaming.

async def copy_from_table(
    self,
    table_name: str,
    *,
    output,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str

async def copy_from_query(
    self,
    query: str,
    *args,
    output,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str

async def copy_to_table(
    self,
    table_name: str,
    *,
    source,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    freeze: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    force_not_null: typing.List[str] = None,
    force_null: typing.List[str] = None,
    encoding: str = None,
    where: str = None
) -> str

async def copy_records_to_table(
    self,
    table_name: str,
    *,
    records,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    where: str = None
) -> str
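
For bulk inserts from in-memory data, copy_records_to_table() is usually the simplest of the four methods. A minimal sketch, assuming conn is an open asyncpg Connection and the users table from Basic Usage; the input shape (a list of dicts) is an assumption of this example.

```python
async def bulk_insert_users(conn, users):
    """conn is assumed to be an open asyncpg Connection."""
    # Each record is a tuple whose order matches `columns`.
    records = [(user["name"], user["email"]) for user in users]
    # Returns the COPY command status string.
    return await conn.copy_records_to_table(
        "users", records=records, columns=["name", "email"]
    )
```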

See docs/copy-operations.md for the full reference.

Type System and Codecs

Comprehensive type system supporting all PostgreSQL data types, custom type registration, and automatic encoding/decoding.

async def set_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public',
    encoder: typing.Callable,
    decoder: typing.Callable,
    format: str = 'text'
) -> None

async def reset_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public'
) -> None

async def set_builtin_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public',
    codec_name: str,
    format: str = None
) -> None

async def reload_schema_state(self) -> None
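
A typical use of set_type_codec() is decoding JSON columns into Python objects. The sketch below assumes conn is an open asyncpg Connection; register_json_codecs is a hypothetical helper name.

```python
import json


async def register_json_codecs(conn):
    """conn is assumed to be an open asyncpg Connection."""
    # json and jsonb are built-in types, so they live in pg_catalog
    # rather than in the default 'public' schema.
    for typename in ("json", "jsonb"):
        await conn.set_type_codec(
            typename,
            schema="pg_catalog",
            encoder=json.dumps,  # Python object -> text sent to the server
            decoder=json.loads,  # text from the server -> Python object
        )
```

Codecs are registered per connection, so with a pool this kind of helper would normally be passed as the init callback of create_pool().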

See docs/type-system.md for the full reference.

Exception Handling

Complete exception hierarchy mapping PostgreSQL error codes to Python exceptions with detailed error information.

class PostgresError(Exception): ...
class FatalPostgresError(PostgresError): ...
class InterfaceError(Exception): ...
class InterfaceWarning(UserWarning): ...
class DataError(InterfaceError, ValueError): ...
class IntegrityConstraintViolationError(PostgresError): ...
class UniqueViolationError(IntegrityConstraintViolationError): ...
class ForeignKeyViolationError(IntegrityConstraintViolationError): ...
class NotNullViolationError(IntegrityConstraintViolationError): ...
class CheckViolationError(IntegrityConstraintViolationError): ...
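
Because each constraint violation has its own exception class, callers can catch exactly the failure they expect and let everything else propagate. The sketch below assumes conn is an open asyncpg Connection and that the users table has a unique constraint on email, which the Basic Usage schema does not define.

```python
async def insert_user(conn, name, email):
    """Insert a user; return False instead of raising on a duplicate.

    Assumes a unique constraint on users.email (hypothetical).
    """
    # Third-party driver, imported lazily so the sketch loads without it.
    import asyncpg

    try:
        await conn.execute(
            "INSERT INTO users(name, email) VALUES($1, $2)", name, email
        )
    except asyncpg.UniqueViolationError:
        # Only the duplicate-key case is handled here; other
        # PostgresError subclasses propagate to the caller.
        return False
    return True
```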

See docs/exception-handling.md for the full reference.

Listeners and Notifications

Support for PostgreSQL's LISTEN/NOTIFY functionality and server log message handling.

async def add_listener(self, channel: str, callback: typing.Callable) -> None
async def remove_listener(self, channel: str, callback: typing.Callable) -> None
def add_log_listener(self, callback: typing.Callable) -> None
def remove_log_listener(self, callback: typing.Callable) -> None
def add_termination_listener(self, callback: typing.Callable) -> None
def remove_termination_listener(self, callback: typing.Callable) -> None
def add_query_logger(self, callback: typing.Callable) -> None
def remove_query_logger(self, callback: typing.Callable) -> None
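
Notification callbacks receive the connection, the PID of the notifying backend, the channel name, and the payload. One way to bridge them into ordinary async code is a queue, as in this sketch; it assumes conn is an open asyncpg Connection, and the helper name and default timeout are hypothetical.

```python
import asyncio


async def wait_for_notification(conn, channel, timeout=5.0):
    """conn is assumed to be an open asyncpg Connection."""
    queue = asyncio.Queue()

    def on_notify(connection, pid, channel_name, payload):
        # Called by the driver for each NOTIFY on the channel.
        queue.put_nowait(payload)

    await conn.add_listener(channel, on_notify)
    try:
        # Raises a timeout error if nothing arrives in time.
        return await asyncio.wait_for(queue.get(), timeout)
    finally:
        # Always unregister, even on timeout or cancellation.
        await conn.remove_listener(channel, on_notify)
```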

See docs/listeners-notifications.md for the full reference.

Types

Core Connection Types

class Connection:
    """A representation of a database session."""
    
    def is_closed(self) -> bool
    def get_server_pid(self) -> int
    def get_server_version(self) -> ServerVersion
    def get_settings(self) -> ConnectionSettings
    def is_in_transaction(self) -> bool
    async def close(self, *, timeout: float = None) -> None
    async def reset(self, *, timeout: float = None) -> None
    def terminate(self) -> None
    def get_reset_query(self) -> str

class Pool:
    """A connection pool."""
    
    def get_size(self) -> int
    def get_min_size(self) -> int
    def get_max_size(self) -> int
    def get_idle_size(self) -> int
    def is_closing(self) -> bool
    def acquire(self, *, timeout: float = None) -> PoolAcquireContext
    async def release(self, connection: Connection, *, timeout: float = None) -> None
    async def close(self) -> None
    def terminate(self) -> None
    async def expire_connections(self) -> None
    def set_connect_args(self, dsn: str = None, **connect_kwargs) -> None

class PreparedStatement:
    """A prepared statement."""
    
    def get_name(self) -> str
    def get_query(self) -> str
    def get_statusmsg(self) -> str
    def get_parameters(self) -> typing.Tuple[Type, ...]
    def get_attributes(self) -> typing.Tuple[Attribute, ...]

class CursorFactory:
    """Factory for creating cursors."""
    
    def __aiter__(self) -> CursorIterator
    def __await__(self) -> Cursor

class Transaction:
    """A transaction context manager."""
    
    async def start(self) -> None
    async def commit(self) -> None
    async def rollback(self) -> None
    async def __aenter__(self) -> Transaction
    async def __aexit__(self, extype, ex, tb) -> None

Query Result Types

class Record:
    """Query result record with dict-like and tuple-like access."""
    
    def get(self, key: str, default=None): ...
    def keys(self) -> Iterator[str]: ...
    def values(self) -> Iterator: ...
    def items(self) -> Iterator[tuple[str, typing.Any]]: ...
    def __getitem__(self, key: typing.Union[str, int, slice]): ...
    def __len__(self) -> int: ...
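
To illustrate the two access styles, the sketch below reads the same row by column name and by position. It assumes row is a Record returned by a query such as SELECT name, email FROM users; describe_row and the phone column are hypothetical.

```python
def describe_row(row):
    """row is assumed to be an asyncpg Record with name and email columns."""
    return {
        "by_name": row["name"],           # dict-like access by column name
        "by_index": row[0],               # tuple-like access by position
        "phone": row.get("phone", "n/a"),  # default for a column not selected
        "columns": len(row),
        "as_dict": dict(row),             # plain dict copy of the row
    }
```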

PostgreSQL Data Types

class Type:
    """Database data type information."""
    oid: int
    name: str
    kind: str
    schema: str

class Attribute:
    """Database attribute information."""
    name: str
    type: Type

class ServerVersion:
    """PostgreSQL server version tuple."""
    major: int
    minor: int
    micro: int
    releaselevel: str
    serial: int

class Range:
    """PostgreSQL range type representation."""
    lower: typing.Any
    upper: typing.Any
    lower_inc: bool
    upper_inc: bool
    lower_inf: bool
    upper_inf: bool
    isempty: bool
    
    def issubset(self, other: Range) -> bool
    def issuperset(self, other: Range) -> bool

class BitString:
    """PostgreSQL bit string type."""
    value: str

class Point:
    """PostgreSQL point geometric type."""
    x: float
    y: float

class Box:
    """PostgreSQL box geometric type."""
    high: Point
    low: Point

class Path:
    """PostgreSQL path geometric type."""
    is_closed: bool
    points: typing.List[Point]

class Polygon:
    """PostgreSQL polygon geometric type."""
    points: typing.List[Point]

class Line:
    """PostgreSQL line geometric type."""
    a: float
    b: float
    c: float

class LineSegment:
    """PostgreSQL line segment geometric type."""
    p1: Point
    p2: Point

class Circle:
    """PostgreSQL circle geometric type."""
    center: Point
    radius: float

class ConnectionSettings:
    """Connection configuration and settings."""
    ...