tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

AsyncPG

A high-performance, asyncio-native PostgreSQL database client designed specifically for Python's async/await syntax. AsyncPG implements the PostgreSQL server binary protocol directly, enabling advanced PostgreSQL features like prepared statements, scrollable cursors, and automatic encoding/decoding of composite types while maintaining maximum performance through Cython extensions.

Package Information

  • Package Name: asyncpg
  • Language: Python
  • Installation: pip install asyncpg
  • Optional Dependencies: pip install asyncpg[gssauth] for GSSAPI/SSPI authentication

Core Imports

import asyncpg

Common patterns:

# Direct connection
connection = await asyncpg.connect('postgresql://user:pass@localhost/dbname')

# Connection pooling
pool = await asyncpg.create_pool('postgresql://user:pass@localhost/dbname')

# Exception handling
from asyncpg import PostgresError, UniqueViolationError

Basic Usage

import asyncio
import asyncpg

async def main():
    # Establish a connection to the database
    conn = await asyncpg.connect('postgresql://user:pass@localhost/dbname')
    
    # Execute a simple query
    result = await conn.execute(
        "CREATE TABLE users(id serial, name text, email text)"
    )
    
    # Insert data
    await conn.execute(
        "INSERT INTO users(name, email) VALUES($1, $2)", 
        "Alice", "alice@example.com"
    )
    
    # Fetch multiple rows
    rows = await conn.fetch("SELECT * FROM users WHERE name = $1", "Alice")
    for row in rows:
        print(f"User: {row['name']} ({row['email']})")
    
    # Fetch a single value
    user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
    print(f"Total users: {user_count}")
    
    # Clean up
    await conn.close()

# Using a connection pool
async def pool_example():
    # Entering the pool with "async with" initializes it and closes it on exit
    async with asyncpg.create_pool('postgresql://user:pass@localhost/dbname') as pool:
        async with pool.acquire() as conn:
            return await conn.fetch("SELECT * FROM users LIMIT 10")

asyncio.run(main())

Architecture

AsyncPG is built around several core components that provide comprehensive PostgreSQL functionality:

  • Connection: Single database session with query execution, transaction management, and type handling
  • Pool: Connection pooling with automatic lifecycle management and reuse of connections across concurrent tasks
  • Protocol: Binary protocol implementation optimized for performance with Cython
  • Record: Query result representation with both dict-like and tuple-like access patterns
  • Type System: Automatic encoding/decoding for PostgreSQL data types, arrays, and composite types
  • Transaction Management: Full transaction support including savepoints and isolation levels

This design enables asyncpg to serve as both a high-level database interface and a foundation for advanced PostgreSQL applications requiring maximum performance and full feature access.

Capabilities

Connection Management

Core functionality for establishing and managing database connections, including authentication, SSL/TLS support, and connection lifecycle management.

async def connect(
    dsn: str = None,
    *,
    host: str = None,
    port: int = None,
    user: str = None,
    password: typing.Union[str, typing.Callable[[], str]] = None,
    passfile: str = None,
    database: str = None,
    loop: asyncio.AbstractEventLoop = None,
    timeout: float = 60,
    statement_cache_size: int = 100,
    max_cached_statement_lifetime: float = 300,
    max_cacheable_statement_size: int = 15360,
    command_timeout: float = None,
    ssl: typing.Union[bool, ssl.SSLContext] = None,
    direct_tls: bool = None,
    connection_class: type = Connection,
    record_class: type = Record,
    server_settings: typing.Dict[str, str] = None,
    target_session_attrs: str = None,
    krbsrvname: str = None,
    gsslib: str = None
) -> Connection
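
The connection lifecycle described above can be sketched as a small helper that always closes the connection, even when the work fails. The helper name `with_connection` and the `work` callback are hypothetical; the `connect` argument is assumed to be `asyncpg.connect`, passed in rather than imported so the sketch can be exercised without a database.

```python
async def with_connection(connect, dsn, work, **connect_kwargs):
    """Open a connection, run `work(conn)`, and always close the connection.

    `connect` is expected to be `asyncpg.connect`; injecting it is an
    assumption of this sketch, not something the library requires.
    """
    conn = await connect(dsn, **connect_kwargs)
    try:
        return await work(conn)
    finally:
        # close() runs on both the success path and the error path.
        await conn.close()
```

With a real server this would be called along the lines of `await with_connection(asyncpg.connect, 'postgresql://user:pass@localhost/dbname', work, command_timeout=30)`, where `work` is any coroutine function that takes the connection.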

Query Execution

Comprehensive query execution methods supporting various result formats, parameterized queries, prepared statements, and bulk operations.

async def execute(self, query: str, *args, timeout: float = None) -> str
async def executemany(self, command: str, args, *, timeout: float = None) -> None
async def fetch(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]
async def fetchval(self, query: str, *args, column: int = 0, timeout: float = None) -> typing.Any
async def fetchrow(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]
async def fetchmany(self, query: str, args, *, timeout: float = None, record_class: type = None) -> typing.List[Record]
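
Since `execute` returns the status of the last SQL command as a string (for example `INSERT 0 1` or `UPDATE 3`), a caller may want the affected-row count as an integer. The helper below is a minimal sketch of that parsing; `rows_affected` is a hypothetical name, and the tag layout is assumed to follow PostgreSQL's command tags, where the count, if any, is the last token.

```python
def rows_affected(status):
    """Return the row count from a command status tag, or None if absent.

    Typical tags: "INSERT 0 1", "UPDATE 3", "DELETE 0", "CREATE TABLE".
    """
    last_token = status.rsplit(" ", 1)[-1]
    # Tags such as "CREATE TABLE" carry no count, so report None for them.
    return int(last_token) if last_token.isdigit() else None
```

A typical use would be `rows_affected(await conn.execute("UPDATE users SET name = $1 WHERE id = $2", "Bob", 1))`.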

Prepared Statements

High-performance reusable query execution with server-side statement caching and optimized parameter binding.

async def prepare(
    self,
    query: str,
    *,
    name: str = None,
    timeout: float = None,
    record_class: type = None
) -> PreparedStatement
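
As an illustration of reusing one prepared statement for many parameter values, the sketch below prepares a query once and calls `fetchval` on the resulting statement for each id. The function name `names_for_ids` and the `users` table are assumptions for the example; the connection is passed in so the snippet does not depend on a live server.

```python
async def names_for_ids(conn, user_ids):
    """Look up one name per id, preparing the statement only once."""
    stmt = await conn.prepare("SELECT name FROM users WHERE id = $1")
    # Each call reuses the already-parsed statement with a new parameter.
    return [await stmt.fetchval(user_id) for user_id in user_ids]
```

For a single round trip, a query with `WHERE id = ANY($1)` would usually be preferable; the loop here is only meant to show statement reuse.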

Cursor Operations

Scrollable cursors for efficient traversal of large result sets with configurable prefetch and memory management.

def cursor(
    self,
    query: str,
    *args,
    prefetch: int = None,
    timeout: float = None,
    record_class: type = None
) -> CursorFactory
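
A cursor in asyncpg has to be used inside a transaction. The following sketch iterates a cursor row by row with `async for`, passing each row to a callback instead of loading the full result set. `for_each_row` and `handler` are hypothetical names chosen for this example.

```python
async def for_each_row(conn, query, *args, handler, prefetch=50):
    """Stream rows through `handler` and return how many were seen."""
    count = 0
    # The transaction block keeps the server-side cursor valid while iterating.
    async with conn.transaction():
        async for row in conn.cursor(query, *args, prefetch=prefetch):
            handler(row)
            count += 1
    return count
```

The `prefetch` value controls how many rows are requested from the server per batch, so a larger number trades memory for fewer round trips.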

Connection Pooling

Advanced connection pooling with configurable pool sizes, connection lifecycle management, and transparent connection acquisition/release.

def create_pool(
    dsn: str = None,
    *,
    min_size: int = 10,
    max_size: int = 10,
    max_queries: int = 50000,
    max_inactive_connection_lifetime: float = 300.0,
    connect: typing.Callable = None,
    setup: typing.Callable = None,
    init: typing.Callable = None,
    reset: typing.Callable = None,
    loop: asyncio.AbstractEventLoop = None,
    connection_class: type = Connection,
    record_class: type = Record,
    **connect_kwargs
) -> Pool
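
To show how acquisition and release work with a pool, the sketch below runs several queries concurrently, each on its own pooled connection. It assumes `pool` is an already-initialized pool (for example the result of `await asyncpg.create_pool(...)`); `fetch_concurrently` is a hypothetical helper name.

```python
import asyncio


async def fetch_concurrently(pool, queries):
    """Run each query on its own pooled connection and gather the results."""

    async def run_one(query):
        # The connection is returned to the pool when the block exits.
        async with pool.acquire() as conn:
            return await conn.fetch(query)

    # Results come back in the same order as `queries`.
    return await asyncio.gather(*(run_one(q) for q in queries))
```

The number of queries that actually run at the same time is bounded by the pool's `max_size`; additional `acquire()` calls wait until a connection is released.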

Transaction Management

Full transaction support including transaction contexts, savepoints, isolation levels, and read-only transactions.

def transaction(
    self, 
    *, 
    isolation: str = None, 
    readonly: bool = False, 
    deferrable: bool = False
) -> Transaction: ...
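
A transaction is most often used as an async context manager: the block commits on normal exit and rolls back if an exception escapes. The sketch below moves an amount between two rows atomically; the `accounts` table, its columns, and the `transfer` name are assumptions made for the example.

```python
async def transfer(conn, src, dst, amount):
    """Move `amount` from account `src` to account `dst` atomically."""
    # Either both updates are committed or neither is.
    async with conn.transaction(isolation="serializable"):
        await conn.execute(
            "UPDATE accounts SET balance = balance - $2 WHERE id = $1",
            src, amount,
        )
        await conn.execute(
            "UPDATE accounts SET balance = balance + $2 WHERE id = $1",
            dst, amount,
        )
```

Opening another `conn.transaction()` block inside this one would create a savepoint rather than a second top-level transaction.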

COPY Operations

High-performance bulk data import/export using PostgreSQL's COPY protocol with support for various formats and streaming.

async def copy_from_table(
    self,
    table_name: str,
    *,
    output,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str

async def copy_from_query(
    self,
    query: str,
    *args,
    output,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str

async def copy_to_table(
    self,
    table_name: str,
    *,
    source,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    freeze: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    force_not_null: typing.List[str] = None,
    force_null: typing.List[str] = None,
    encoding: str = None,
    where: str = None
) -> str

async def copy_records_to_table(
    self,
    table_name: str,
    *,
    records,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    where: str = None
) -> str
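
As a sketch of bulk loading with `copy_records_to_table`, the helper below converts a list of dictionaries into tuples in a fixed column order and sends them in one COPY operation. The `users` table, its columns, and the name `bulk_insert_users` are assumptions for illustration.

```python
async def bulk_insert_users(conn, users):
    """Load `users` (dicts with "name" and "email") via a single COPY."""
    columns = ["name", "email"]
    # Each record must list its values in the same order as `columns`.
    records = [(user["name"], user["email"]) for user in users]
    return await conn.copy_records_to_table(
        "users", records=records, columns=columns
    )
```

The returned value is the COPY status string, which can be logged to confirm how many rows were written.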

Type System and Codecs

Comprehensive type system supporting all PostgreSQL data types, custom type registration, and automatic encoding/decoding.

async def set_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public',
    encoder: typing.Callable,
    decoder: typing.Callable,
    format: str = 'text'
) -> None

async def reset_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public'
) -> None

async def set_builtin_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public',
    codec_name: str,
    format: str = None
) -> None

async def reload_schema_state(self) -> None
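
A common use of `set_type_codec` is to have `json` values decoded into Python objects automatically. The sketch below registers the standard library's `json.dumps` and `json.loads` as the codec; `register_json_codec` is a hypothetical helper name, and `schema="pg_catalog"` is used because `json` is a built-in type.

```python
import json


async def register_json_codec(conn):
    """Encode/decode PostgreSQL `json` values with the stdlib json module."""
    await conn.set_type_codec(
        "json",
        encoder=json.dumps,   # Python object -> JSON text sent to the server
        decoder=json.loads,   # JSON text from the server -> Python object
        schema="pg_catalog",
    )
```

A function like this is a natural candidate for the `init` callback of `create_pool`, so that every pooled connection gets the same codec.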

Exception Handling

Complete exception hierarchy mapping PostgreSQL error codes to Python exceptions with detailed error information.

class PostgresError(Exception): ...
class FatalPostgresError(PostgresError): ...
class InterfaceError(Exception): ...
class InterfaceWarning(UserWarning): ...
class DataError(InterfaceError, ValueError): ...
class IntegrityConstraintViolationError(PostgresError): ...
class UniqueViolationError(IntegrityConstraintViolationError): ...
class ForeignKeyViolationError(IntegrityConstraintViolationError): ...
class NotNullViolationError(IntegrityConstraintViolationError): ...
class CheckViolationError(IntegrityConstraintViolationError): ...
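
To illustrate the "detailed error information" carried by these exceptions, the sketch below collects a few fields from a caught error into a dictionary for logging. The attribute names (`sqlstate`, `detail`, `hint`, `table_name`, `constraint_name`) are assumed to match the fields asyncpg fills in from the server's error response; `getattr` is used so that missing attributes are simply skipped. `describe_error` is a hypothetical helper.

```python
def describe_error(exc):
    """Summarize an exception, including server-provided fields if present."""
    info = {"type": type(exc).__name__, "message": str(exc)}
    # Assumed field names; absent or empty ones are left out of the summary.
    for field in ("sqlstate", "detail", "hint", "table_name", "constraint_name"):
        value = getattr(exc, field, None)
        if value is not None:
            info[field] = value
    return info
```

In practice this would sit in an `except asyncpg.PostgresError as exc:` block, or in a narrower one such as `except asyncpg.UniqueViolationError as exc:` when only duplicate keys are expected.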

Listeners and Notifications

Support for PostgreSQL's LISTEN/NOTIFY functionality and server log message handling.

async def add_listener(self, channel: str, callback: typing.Callable) -> None
async def remove_listener(self, channel: str, callback: typing.Callable) -> None
def add_log_listener(self, callback: typing.Callable) -> None
def remove_log_listener(self, callback: typing.Callable) -> None
def add_termination_listener(self, callback: typing.Callable) -> None
def remove_termination_listener(self, callback: typing.Callable) -> None
def add_query_logger(self, callback: typing.Callable) -> None
def remove_query_logger(self, callback: typing.Callable) -> None
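
A listener callback registered with `add_listener` is called with the connection, the notifying backend's PID, the channel name, and the payload. The sketch below forwards payloads into an `asyncio.Queue` so that application code can consume them with `await queue.get()`. `subscribe` and `on_notify` are hypothetical names for this example.

```python
import asyncio


async def subscribe(conn, channel):
    """Register a NOTIFY listener that pushes payloads onto a queue."""
    queue = asyncio.Queue()

    def on_notify(connection, pid, channel_name, payload):
        # Called once per notification; keep it quick and non-blocking.
        queue.put_nowait(payload)

    await conn.add_listener(channel, on_notify)
    # Return the callback too, so it can be passed to remove_listener later.
    return queue, on_notify
```

Another session can then trigger the callback with `NOTIFY channel, 'payload'` or `SELECT pg_notify('channel', 'payload')`.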

Types

Core Connection Types

class Connection:
    """A representation of a database session."""
    
    def is_closed(self) -> bool
    def get_server_pid(self) -> int
    def get_server_version(self) -> ServerVersion
    def get_settings(self) -> ConnectionSettings
    def is_in_transaction(self) -> bool
    async def close(self, *, timeout: float = None) -> None
    async def reset(self, *, timeout: float = None) -> None
    def terminate(self) -> None
    def get_reset_query(self) -> str

class Pool:
    """A connection pool."""
    
    def get_size(self) -> int
    def get_min_size(self) -> int
    def get_max_size(self) -> int
    def get_idle_size(self) -> int
    def is_closing(self) -> bool
    def acquire(self, *, timeout: float = None) -> PoolAcquireContext
    async def release(self, connection: Connection, *, timeout: float = None) -> None
    async def close(self) -> None
    def terminate(self) -> None
    async def expire_connections(self) -> None
    def set_connect_args(self, dsn: str = None, **connect_kwargs) -> None

class PreparedStatement:
    """A prepared statement."""
    
    def get_name(self) -> str
    def get_query(self) -> str
    def get_statusmsg(self) -> str
    def get_parameters(self) -> typing.Tuple[Type, ...]
    def get_attributes(self) -> typing.Tuple[Attribute, ...]

class CursorFactory:
    """Factory for creating cursors."""
    
    def __aiter__(self) -> CursorIterator
    def __await__(self) -> Cursor

class Transaction:
    """A transaction context manager."""
    
    async def start(self) -> None
    async def commit(self) -> None
    async def rollback(self) -> None
    async def __aenter__(self) -> Transaction
    async def __aexit__(self, extype, ex, tb) -> None

Query Result Types

class Record:
    """Query result record with dict-like and tuple-like access."""
    
    def get(self, key: str, default=None): ...
    def keys(self) -> typing.Iterator[str]: ...
    def values(self) -> typing.Iterator: ...
    def items(self) -> typing.Iterator[typing.Tuple[str, typing.Any]]: ...
    def __getitem__(self, key: typing.Union[str, int, slice]): ...
    def __len__(self) -> int: ...

PostgreSQL Data Types

class Type:
    """Database data type information."""
    oid: int
    name: str
    kind: str
    schema: str

class Attribute:
    """Database attribute information."""
    name: str
    type: Type

class ServerVersion:
    """PostgreSQL server version tuple."""
    major: int
    minor: int
    micro: int
    releaselevel: str
    serial: int

class Range:
    """PostgreSQL range type representation."""
    lower: typing.Any
    upper: typing.Any
    lower_inc: bool
    upper_inc: bool
    lower_inf: bool
    upper_inf: bool
    isempty: bool
    
    def issubset(self, other: Range) -> bool
    def issuperset(self, other: Range) -> bool

class BitString:
    """PostgreSQL bit string type."""
    value: str

class Point:
    """PostgreSQL point geometric type."""
    x: float
    y: float

class Box:
    """PostgreSQL box geometric type."""
    high: Point
    low: Point

class Path:
    """PostgreSQL path geometric type."""
    is_closed: bool
    points: typing.List[Point]

class Polygon:
    """PostgreSQL polygon geometric type."""
    points: typing.List[Point]

class Line:
    """PostgreSQL line geometric type."""
    A: float
    B: float
    C: float

class LineSegment:
    """PostgreSQL line segment geometric type."""
    p1: Point
    p2: Point

class Circle:
    """PostgreSQL circle geometric type."""
    center: Point
    radius: float

class ConnectionSettings:
    """Connection configuration and settings."""
    ...

Describes: pypipkg:pypi/asyncpg@0.30.x