An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Comprehensive type system supporting all PostgreSQL data types, custom type registration, automatic encoding/decoding, geometric types, arrays, composite types, and performance-optimized codecs.
AsyncPG automatically handles all standard PostgreSQL data types with optimized codecs.
- smallint, integer, bigint → Python int
- decimal, numeric → Python decimal.Decimal
- real, double precision → Python float
- money → Python decimal.Decimal
- char, varchar, text → Python str
- bytea → Python bytes
- date → Python datetime.date
- time, timetz → Python datetime.time
- timestamp, timestamptz → Python datetime.datetime
- interval → Python datetime.timedelta
- boolean → Python bool
- uuid → Python uuid.UUID
- json, jsonb → Python dict, list, or JSON-serializable types
- inet → Python ipaddress.IPv4Address or ipaddress.IPv6Address
- cidr → Python ipaddress.IPv4Network or ipaddress.IPv6Network
- macaddr → Python str

Automatic handling of PostgreSQL arrays with full nesting support.
# Insert arrays: Python lists map directly to PostgreSQL array columns
await conn.execute(
    "INSERT INTO data(tags, scores) VALUES($1, $2)",
    ["python", "database", "async"],  # text[]
    [95, 87, 92]  # integer[]
)
# Fetch arrays: array columns come back as Python lists
row = await conn.fetchrow("SELECT tags, scores FROM data WHERE id = 1")
print(row['tags'])  # ['python', 'database', 'async']
print(row['scores'])  # [95, 87, 92]
# Multi-dimensional arrays: nested lists map to multi-dimensional columns
matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
await conn.execute("INSERT INTO matrices(data) VALUES($1)", matrix)
# Array operations in queries: a scalar parameter used with ANY()
users = await conn.fetch(
    "SELECT * FROM users WHERE $1 = ANY(roles)",
    "admin"
)
)

Support for PostgreSQL's geometric data types with specialized Python classes.
# Geometric types from asyncpg.types
class Point:
    """PostgreSQL ``point`` type — a 2-D location (x, y)."""
    x: float
    y: float
    def __init__(self, x: float, y: float): ...

class Box:
    """PostgreSQL ``box`` type — a rectangle given by two corner points."""
    high: Point  # upper-right corner
    low: Point   # lower-left corner
    def __init__(self, high: Point, low: Point): ...

class Path:
    """PostgreSQL ``path`` type — a series of connected points."""
    is_closed: bool  # True for a closed path, False for an open one
    points: typing.List[Point]
    def __init__(self, points: typing.List[Point], is_closed: bool = False): ...

class Polygon:
    """PostgreSQL ``polygon`` type — a closed path."""
    points: typing.List[Point]
    def __init__(self, points: typing.List[Point]): ...

class Line:
    """PostgreSQL ``line`` type — an infinite line (a, b, c coefficients)."""
    a: float
    b: float
    c: float
    def __init__(self, a: float, b: float, c: float): ...

class LineSegment:
    """PostgreSQL ``lseg`` type — a finite segment between two points."""
    p1: Point
    p2: Point
    def __init__(self, p1: Point, p2: Point): ...

class Circle:
    """PostgreSQL ``circle`` type — a center point and a radius."""
    center: Point
    radius: float
    def __init__(self, center: Point, radius: float): ...
class BitString:
    """PostgreSQL bit string type."""
    value: str
    def __init__(self, value: str): ...

from asyncpg.types import Point, Box, Circle, Polygon
# Insert geometric data
location = Point(40.7128, -74.0060) # New York City
await conn.execute(
"INSERT INTO locations(name, coordinates) VALUES($1, $2)",
"NYC", location
)
# Create geometric objects
area = Box(Point(0, 0), Point(10, 10))
region = Circle(Point(5, 5), 3.0)
shape = Polygon([Point(0, 0), Point(5, 0), Point(5, 5), Point(0, 5)])
await conn.execute(
"INSERT INTO regions(area, circle, shape) VALUES($1, $2, $3)",
area, region, shape
)
# Query with geometric operations
nearby = await conn.fetch(
    "SELECT * FROM locations WHERE coordinates <-> $1 < $2",
    Point(40.7580, -73.9855),  # Times Square
    2.0  # Within 2 units
)

Support for PostgreSQL range types with comprehensive range operations.
class Range:
    """PostgreSQL range type representation.

    A range has optional *lower* and *upper* bounds; each bound may be
    inclusive or exclusive, and a range may also be empty.
    """
    def __init__(
        self,
        lower: typing.Any = None,
        upper: typing.Any = None,
        *,
        lower_inc: bool = True,   # lower bound inclusive by default
        upper_inc: bool = False,  # upper bound exclusive by default
        empty: bool = False       # construct the empty range
    ) -> None: ...
    @property
    def lower(self) -> typing.Any: ...  # lower bound value
    @property
    def upper(self) -> typing.Any: ...  # upper bound value
    @property
    def lower_inc(self) -> bool: ...  # is the lower bound inclusive?
    @property
    def upper_inc(self) -> bool: ...  # is the upper bound inclusive?
    @property
    def lower_inf(self) -> bool: ...  # is the range unbounded below?
    @property
    def upper_inf(self) -> bool: ...  # is the range unbounded above?
    @property
    def isempty(self) -> bool: ...  # is this the empty range?
    def issubset(self, other: Range) -> bool: ...
    def issuperset(self, other: Range) -> bool: ...

from asyncpg.types import Range
from datetime import datetime, date
# Date ranges: Jan 1 through Dec 31
availability = Range(
    date(2024, 1, 1),
    date(2024, 12, 31),
    upper_inc=True  # include the upper bound (lower is inclusive by default)
)
# Time ranges (upper bound exclusive by default)
business_hours = Range(
    datetime(2024, 1, 1, 9, 0),
    datetime(2024, 1, 1, 17, 0)
)
# Numeric ranges: [100, 500)
price_range = Range(100, 500, lower_inc=True, upper_inc=False)
await conn.execute(
    "INSERT INTO products(name, price_range, availability) VALUES($1, $2, $3)",
    "Widget", price_range, availability
)
# Range queries
products = await conn.fetch(
    "SELECT * FROM products WHERE price_range @> $1",
    250  # @> : the range contains this value
)
overlapping = await conn.fetch(
    "SELECT * FROM events WHERE time_range && $1",
    business_hours  # && : overlaps with business hours
)

Register custom encoders and decoders for specialized data types.
async def set_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public',
    encoder: typing.Callable,
    decoder: typing.Callable,
    format: str = 'text'
) -> None:
    """
    Set an encoder/decoder pair for the specified data type.

    Parameters:
        typename: PostgreSQL type name
        schema: Schema containing the type
        encoder: Function to encode Python values to PostgreSQL format
        decoder: Function to decode PostgreSQL values to Python objects
        format: Codec format ('text' or 'binary')
    """
async def reset_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public'
) -> None:
    """
    Reset the codec for *typename* to the default implementation.

    Parameters:
        typename: PostgreSQL type name
        schema: Schema containing the type
    """
async def set_builtin_type_codec(
    self,
    typename: str,
    *,
    schema: str = 'public',
    codec_name: str,
    format: typing.Optional[str] = None
) -> None:
    """
    Set a builtin codec for the specified scalar data type.

    Parameters:
        typename: PostgreSQL type name
        schema: Schema containing the type
        codec_name: Name of builtin codec to use
        format: Codec format ('text' or 'binary'); None presumably keeps
            the builtin codec's default format — confirm against asyncpg docs
    """

import json
from datetime import datetime
from decimal import Decimal
# JSON codec for custom JSON handling
def encode_json(value):
return json.dumps(value, ensure_ascii=False)
def decode_json(value):
return json.loads(value)
await conn.set_type_codec(
'json',
encoder=encode_json,
decoder=decode_json
)
# Custom enum codec
def encode_status(value):
return value.value if hasattr(value, 'value') else str(value)
def decode_status(value):
from enum import Enum
class Status(Enum):
PENDING = 'pending'
ACTIVE = 'active'
INACTIVE = 'inactive'
return Status(value)
await conn.set_type_codec(
'user_status',
encoder=encode_status,
decoder=decode_status
)
# Money type with custom precision
def encode_money(value):
"""Encode Decimal to money format."""
return str(value)
def decode_money(value):
"""Decode money to Decimal with proper precision."""
# Remove currency symbol and convert
cleaned = value.replace('$', '').replace(',', '')
return Decimal(cleaned)
await conn.set_type_codec(
    'money',
    encoder=encode_money,
    decoder=decode_money
)

Automatic handling of PostgreSQL composite types (user-defined record types).
# Create composite type in PostgreSQL
await conn.execute("""
CREATE TYPE address AS (
street text,
city text,
state text,
zip_code text
)
""")

# Python representation of the composite value
class Address:
    def __init__(self, street, city, state, zip_code):
        self.street = street
        self.city = city
        self.state = state
        self.zip_code = zip_code

# Register composite type codec
def encode_address(addr):
    # Encode as a tuple of fields, in the type's declaration order
    return (addr.street, addr.city, addr.state, addr.zip_code)

def decode_address(row):
    # The decoded composite arrives as a sequence of fields
    return Address(*row)

await conn.set_type_codec(
    'address',
    encoder=encode_address,
    decoder=decode_address
)

# Use composite type
home_address = Address("123 Main St", "Anytown", "NY", "12345")
await conn.execute(
    "INSERT INTO users(name, address) VALUES($1, $2)",
    "John Doe", home_address
)
user = await conn.fetchrow("SELECT name, address FROM users WHERE id = 1")
print(f"User: {user['name']}")
print(f"Address: {user['address'].street}, {user['address'].city}")

Access PostgreSQL type information for dynamic type handling.
class Type:
    """Database data type information."""
    oid: int     # PostgreSQL type OID
    name: str    # Type name (e.g., 'int4', 'text')
    kind: str    # Type kind ('scalar', 'array', 'composite', 'range')
    schema: str  # Schema name
class Attribute:
    """Database relation attribute."""
    name: str   # Attribute name
    type: Type  # Attribute data type

# Get type information
# Look up a type's catalog entry by name
type_info = await conn.fetchrow(
    "SELECT * FROM pg_type WHERE typname = $1",
    'json'
)
# Introspect table structure via the information schema
columns = await conn.fetch("""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = $1
ORDER BY ordinal_position
""", 'users')
# Dynamic type handling based on introspection
for col in columns:
    print(f"Column: {col['column_name']}, Type: {col['data_type']}")
    # Set appropriate codec based on type
    if col['data_type'] == 'jsonb':
        await conn.set_type_codec(
            'jsonb',
            encoder=json.dumps,
            decoder=json.loads
        )

Manage type system state and handle schema changes.
async def reload_schema_state(self) -> None:
    """
    Indicate that database schema information must be reloaded.

    Call this after schema changes that affect type definitions,
    such as creating new types, modifying composite types, or
    changing type codecs.
    """

# After schema changes, reload type information
await conn.execute("CREATE TYPE new_enum AS ENUM ('a', 'b', 'c')")
await conn.reload_schema_state()
# Register codec for new type
def encode_enum(value):
return value.value if hasattr(value, 'value') else str(value)
def decode_enum(value):
from enum import Enum
class NewEnum(Enum):
A = 'a'
B = 'b'
C = 'c'
return NewEnum(value)
await conn.set_type_codec('new_enum', encoder=encode_enum, decoder=decode_enum)

Optimize type handling for maximum performance.
# Use binary format for better performance with large datasets
await conn.set_type_codec(
    'large_json_type',
    encoder=json.dumps,
    decoder=json.loads,
    format='binary'  # Faster for large values
)

# Pre-register commonly used codecs at connection setup
async def setup_connection(conn):
    """Setup function called for each new connection."""
    # JSON codec
    await conn.set_type_codec(
        'jsonb',
        encoder=json.dumps,
        decoder=json.loads
    )
    # Custom enum types (NOTE: StatusEnum is assumed defined elsewhere)
    await conn.set_type_codec('status_enum', encoder=str, decoder=StatusEnum)
    # UUID codec optimization
    await conn.set_builtin_type_codec('uuid', codec_name='uuid')

# Use setup function with connection pool
pool = await asyncpg.create_pool(dsn, setup=setup_connection)

# Core type system classes
class Type:
    """Database data type information."""
    oid: int     # PostgreSQL type OID
    name: str    # Type name (e.g., 'int4', 'text')
    kind: str    # Type kind ('scalar', 'array', 'composite', 'range')
    schema: str  # Schema name

class Attribute:
    """Database relation attribute."""
    name: str   # Attribute name
    type: Type  # Attribute data type

class ServerVersion:
    """PostgreSQL server version tuple."""
    major: int
    minor: int
    micro: int
    releaselevel: str
    serial: int
# Geometric types
class Point:
    """PostgreSQL ``point`` type."""
    x: float
    y: float
    def __init__(self, x: float, y: float): ...

class Box:
    """PostgreSQL ``box`` type."""
    high: Point
    low: Point
    def __init__(self, high: Point, low: Point): ...

class Path:
    """PostgreSQL ``path`` type."""
    is_closed: bool
    points: typing.List[Point]
    def __init__(self, points: typing.List[Point], is_closed: bool = False): ...

class Polygon:
    """PostgreSQL ``polygon`` type."""
    points: typing.List[Point]
    def __init__(self, points: typing.List[Point]): ...

class Line:
    """PostgreSQL ``line`` type."""
    a: float
    b: float
    c: float
    def __init__(self, a: float, b: float, c: float): ...

class LineSegment:
    """PostgreSQL ``lseg`` type."""
    p1: Point
    p2: Point
    def __init__(self, p1: Point, p2: Point): ...

class Circle:
    """PostgreSQL ``circle`` type."""
    center: Point
    radius: float
    def __init__(self, center: Point, radius: float): ...

class BitString:
    """PostgreSQL bit string type."""
    value: str
    def __init__(self, value: str): ...
# Range types
class Range:
    """PostgreSQL range type representation."""
    lower: typing.Any  # lower bound value
    upper: typing.Any  # upper bound value
    lower_inc: bool    # lower bound inclusive?
    upper_inc: bool    # upper bound inclusive?
    lower_inf: bool    # unbounded below?
    upper_inf: bool    # unbounded above?
    isempty: bool      # is this the empty range?
    def __init__(self, lower: typing.Any = None, upper: typing.Any = None, *, lower_inc: bool = True, upper_inc: bool = False, empty: bool = False): ...
    def issubset(self, other: Range) -> bool: ...
    def issuperset(self, other: Range) -> bool: ...

# Type codec signatures: an encoder turns a Python value into its wire
# representation; a decoder performs the inverse conversion.
Encoder = typing.Callable[[typing.Any], typing.Union[str, bytes]]
Decoder = typing.Callable[[typing.Union[str, bytes]], typing.Any]