An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive query execution capabilities supporting various result formats, parameterized queries, prepared statements, bulk operations, and cursors for large result sets.
Execute SQL commands and queries with automatic parameter binding and result processing.
async def execute(self, query: str, *args, timeout: float = None) -> str:
"""
Execute an SQL command (or commands).
Parameters:
query: SQL command to execute
*args: Query parameters for placeholders ($1, $2, etc.)
timeout: Query timeout in seconds
Returns:
Command status string (e.g., 'SELECT 5', 'INSERT 0 1', 'UPDATE 3')
"""
async def executemany(self, command: str, args, *, timeout: float = None) -> None:
"""
Execute an SQL command for each sequence of arguments.
Parameters:
command: SQL command template with placeholders
args: Sequence of argument tuples/lists
timeout: Query timeout in seconds
"""# Execute DDL commands
await conn.execute("CREATE TABLE users(id serial, name text, email text)")
await conn.execute("CREATE INDEX idx_users_email ON users(email)")
# Execute DML with parameters
result = await conn.execute(
"INSERT INTO users(name, email) VALUES($1, $2)",
"Alice", "alice@example.com"
)
print(result) # "INSERT 0 1"
# Batch insert
rows = [
("Bob", "bob@example.com"),
("Charlie", "charlie@example.com"),
("David", "david@example.com")
]
await conn.executemany(
"INSERT INTO users(name, email) VALUES($1, $2)",
rows
)Retrieve query results in various formats optimized for different use cases.
async def fetch(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]:
"""
Run a query and return all results as a list of Records.
Parameters:
query: SQL query to execute
*args: Query parameters for placeholders
timeout: Query timeout in seconds
record_class: Custom record class for results
Returns:
List of Record objects
"""
async def fetchval(self, query: str, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any:
"""
Run a query and return a single value from the first row.
Parameters:
query: SQL query to execute
*args: Query parameters for placeholders
column: Column index or name to return (default: 0)
timeout: Query timeout in seconds
Returns:
Single value or None if no results
"""
async def fetchrow(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]:
"""
Run a query and return the first row.
Parameters:
query: SQL query to execute
*args: Query parameters for placeholders
timeout: Query timeout in seconds
record_class: Custom record class for result
Returns:
Record object or None if no results
"""
async def fetchmany(self, query: str, args, *, timeout: float = None, record_class: type = None) -> typing.List[typing.List[Record]]:
"""
Execute a query for each sequence of arguments and return all results.
Parameters:
query: SQL query template
args: Sequence of argument tuples/lists
timeout: Query timeout in seconds
record_class: Custom record class for results
Returns:
List of result lists, one per argument sequence
"""# Fetch all rows
users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
for user in users:
print(f"{user['name']} - {user['email']}")
# Fetch single value
user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
print(f"Total users: {user_count}")
# Fetch first row
admin = await conn.fetchrow("SELECT * FROM users WHERE role = 'admin' LIMIT 1")
if admin:
print(f"Admin: {admin['name']}")
# Fetch by column name
latest_login = await conn.fetchval(
"SELECT last_login FROM users WHERE id = $1",
user_id,
column='last_login'
)
# Batch queries
user_ids = [1, 2, 3, 4, 5]
results = await conn.fetchmany(
"SELECT * FROM users WHERE id = $1",
[(uid,) for uid in user_ids]
)Create reusable prepared statements for improved performance with repeated queries.
async def prepare(self, query: str, *, name: str = None, timeout: float = None, record_class: type = None) -> PreparedStatement:
"""
Create a prepared statement for the specified query.
Parameters:
query: SQL query to prepare
name: Optional name for the prepared statement
timeout: Preparation timeout in seconds
record_class: Custom record class for results
Returns:
PreparedStatement instance
"""class PreparedStatement:
"""A prepared SQL statement."""
def get_name(self) -> str
def get_query(self) -> str
def get_statusmsg(self) -> str
def get_parameters(self) -> typing.Tuple[Type, ...]
def get_attributes(self) -> typing.Tuple[Attribute, ...]
async def fetch(self, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]
async def fetchval(self, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any
async def fetchrow(self, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]
async def execute(self, *args, timeout: float = None) -> str
async def executemany(self, args, *, timeout: float = None) -> None
def cursor(self, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory# Prepare a statement
stmt = await conn.prepare("SELECT * FROM users WHERE department = $1 AND active = $2")
# Use multiple times with different parameters
engineers = await stmt.fetch("Engineering", True)
sales = await stmt.fetch("Sales", True)
marketing = await stmt.fetch("Marketing", False)
# Prepared statement for updates
update_stmt = await conn.prepare("UPDATE users SET last_login = $1 WHERE id = $2")
await update_stmt.executemany([
(datetime.now(), 1),
(datetime.now(), 2),
(datetime.now(), 3)
])Handle large result sets efficiently with scrollable cursors and streaming.
def cursor(self, query: str, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory:
"""
Return a cursor factory for the specified query.
Parameters:
query: SQL query to execute
*args: Query parameters for placeholders
prefetch: Number of rows to prefetch (default: 50)
timeout: Query timeout in seconds
record_class: Custom record class for results
Returns:
Cursor factory (async context manager)
"""class CursorFactory:
"""Factory for creating cursors with async iteration support."""
def __aiter__(self) -> CursorIterator
def __await__(self) -> Cursor
async def __aenter__(self) -> Cursor
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
class Cursor:
"""Database cursor for iterating over large result sets."""
async def forward(self, count: int) -> typing.List[Record]
async def backwards(self, count: int) -> typing.List[Record]
def get_prefetch_size(self) -> int
def get_query(self) -> str
def get_args(self) -> typing.Tuple
async def __aenter__(self) -> Cursor
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
class CursorIterator:
"""Async iterator for cursor results."""
def __aiter__(self) -> CursorIterator
async def __anext__(self) -> Record# Process large result set with cursor
async with conn.cursor("SELECT * FROM large_table WHERE condition = $1", value) as cursor:
async for record in cursor:
process_record(record)
# Manual cursor control
async with conn.cursor("SELECT * FROM users ORDER BY id", prefetch=100) as cursor:
# Fetch first 50 rows
batch = await cursor.fetch(50)
while batch:
for row in batch:
print(f"User: {row['name']}")
# Fetch next batch
batch = await cursor.fetch(50)AsyncPG uses PostgreSQL's native parameter binding with numbered placeholders for security and performance.
# Correct parameter usage
users = await conn.fetch(
"SELECT * FROM users WHERE name = $1 AND age > $2",
"Alice", 25
)
# Multiple parameters
result = await conn.execute(
"UPDATE users SET email = $1, updated_at = $2 WHERE id = $3",
"newemail@example.com", datetime.now(), user_id
)
# Array parameters
ids = [1, 2, 3, 4, 5]
users = await conn.fetch("SELECT * FROM users WHERE id = ANY($1)", ids)
# JSON parameters
data = {"preferences": {"theme": "dark", "language": "en"}}
await conn.execute(
"UPDATE users SET metadata = $1 WHERE id = $2",
json.dumps(data), user_id
)Handle query execution errors with appropriate exception types.
try:
result = await conn.fetch("SELECT * FROM nonexistent_table")
except asyncpg.UndefinedTableError:
print("Table does not exist")
except asyncpg.PostgresSyntaxError:
print("SQL syntax error")
except asyncpg.DataError:
print("Data-related error")
except asyncio.TimeoutError:
print("Query timed out")class Record:
"""Query result record with dict-like and tuple-like access."""
def get(self, key: str, default: typing.Any = None) -> typing.Any
def keys(self) -> typing.Iterator[str]
def values(self) -> typing.Iterator[typing.Any]
def items(self) -> typing.Iterator[typing.Tuple[str, typing.Any]]
def __getitem__(self, key: typing.Union[str, int, slice]) -> typing.Any
def __len__(self) -> int
def __iter__(self) -> typing.Iterator[typing.Any]
def __contains__(self, key: object) -> bool
class PreparedStatement:
"""A prepared SQL statement for reuse."""
def get_name(self) -> str
def get_query(self) -> str
def get_statusmsg(self) -> str
def get_parameters(self) -> typing.Tuple[Type, ...]
def get_attributes(self) -> typing.Tuple[Attribute, ...]
class CursorFactory:
"""Factory for creating cursors."""
class Cursor:
"""Database cursor for streaming large result sets."""
class CursorIterator:
"""Async iterator for cursor results."""