CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/query-execution.md

Query Execution

Comprehensive query execution capabilities supporting various result formats, parameterized queries, prepared statements, bulk operations, and cursors for large result sets.

Capabilities

Basic Query Execution

Execute SQL commands and queries with automatic parameter binding and result processing.

async def execute(self, query: str, *args, timeout: float = None) -> str:
    """
    Execute an SQL command (or commands).
    
    Parameters:
    query: SQL command to execute
    *args: Query parameters for placeholders ($1, $2, etc.)
    timeout: Query timeout in seconds
    
    Returns:
    Command status string (e.g., 'SELECT 5', 'INSERT 0 1', 'UPDATE 3')
    """

async def executemany(self, command: str, args, *, timeout: float = None) -> None:
    """
    Execute an SQL command for each sequence of arguments.
    
    Parameters:
    command: SQL command template with placeholders
    args: Sequence of argument tuples/lists
    timeout: Query timeout in seconds
    """

Example Usage

# Execute DDL commands
await conn.execute("CREATE TABLE users(id serial, name text, email text)")
await conn.execute("CREATE INDEX idx_users_email ON users(email)")

# Execute DML with parameters  
result = await conn.execute(
    "INSERT INTO users(name, email) VALUES($1, $2)",
    "Alice", "alice@example.com"
)
print(result)  # "INSERT 0 1"

# Batch insert
rows = [
    ("Bob", "bob@example.com"),
    ("Charlie", "charlie@example.com"),
    ("David", "david@example.com")
]
await conn.executemany(
    "INSERT INTO users(name, email) VALUES($1, $2)", 
    rows
)

Result Fetching

Retrieve query results in various formats optimized for different use cases.

async def fetch(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]:
    """
    Run a query and return all results as a list of Records.
    
    Parameters:
    query: SQL query to execute
    *args: Query parameters for placeholders
    timeout: Query timeout in seconds
    record_class: Custom record class for results
    
    Returns:
    List of Record objects
    """

async def fetchval(self, query: str, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any:
    """
    Run a query and return a single value from the first row.
    
    Parameters: 
    query: SQL query to execute
    *args: Query parameters for placeholders
    column: Column index or name to return (default: 0)
    timeout: Query timeout in seconds
    
    Returns:
    Single value or None if no results
    """

async def fetchrow(self, query: str, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]:
    """
    Run a query and return the first row.
    
    Parameters:
    query: SQL query to execute
    *args: Query parameters for placeholders  
    timeout: Query timeout in seconds
    record_class: Custom record class for result
    
    Returns:
    Record object or None if no results
    """

async def fetchmany(self, query: str, args, *, timeout: float = None, record_class: type = None) -> typing.List[Record]:
    """
    Execute a query for each sequence of arguments and return all results.
    
    Parameters:
    query: SQL query template
    args: Sequence of argument tuples/lists
    timeout: Query timeout in seconds
    record_class: Custom record class for results
    
    Returns:
    A single flat list of Record objects containing the rows produced for
    every argument sequence (not one nested list per argument sequence)
    """

Example Usage

# Fetch all rows
users = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
for user in users:
    print(f"{user['name']} - {user['email']}")

# Fetch single value  
user_count = await conn.fetchval("SELECT COUNT(*) FROM users")
print(f"Total users: {user_count}")

# Fetch first row
admin = await conn.fetchrow("SELECT * FROM users WHERE role = 'admin' LIMIT 1")
if admin:
    print(f"Admin: {admin['name']}")

# Fetch by column name
latest_login = await conn.fetchval(
    "SELECT last_login FROM users WHERE id = $1", 
    user_id, 
    column='last_login'
)

# Batch queries
user_ids = [1, 2, 3, 4, 5] 
results = await conn.fetchmany(
    "SELECT * FROM users WHERE id = $1",
    [(uid,) for uid in user_ids]
)

Prepared Statements

Create reusable prepared statements for improved performance with repeated queries.

async def prepare(self, query: str, *, name: str = None, timeout: float = None, record_class: type = None) -> PreparedStatement:
    """
    Create a prepared statement for the specified query.
    
    Parameters:
    query: SQL query to prepare
    name: Optional name for the prepared statement
    timeout: Preparation timeout in seconds
    record_class: Custom record class for results
    
    Returns:
    PreparedStatement instance
    """

PreparedStatement Methods

class PreparedStatement:
    """A prepared SQL statement."""
    
    def get_name(self) -> str
    def get_query(self) -> str
    def get_statusmsg(self) -> str
    def get_parameters(self) -> typing.Tuple[Type, ...]
    def get_attributes(self) -> typing.Tuple[Attribute, ...]
    
    async def fetch(self, *args, timeout: float = None, record_class: type = None) -> typing.List[Record]
    async def fetchval(self, *args, column: typing.Union[int, str] = 0, timeout: float = None) -> typing.Any
    async def fetchrow(self, *args, timeout: float = None, record_class: type = None) -> typing.Optional[Record]
    async def execute(self, *args, timeout: float = None) -> str
    async def executemany(self, args, *, timeout: float = None) -> None
    
    def cursor(self, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory

Example Usage

# Prepare a statement
stmt = await conn.prepare("SELECT * FROM users WHERE department = $1 AND active = $2")

# Use multiple times with different parameters
engineers = await stmt.fetch("Engineering", True)
sales = await stmt.fetch("Sales", True) 
marketing = await stmt.fetch("Marketing", False)

# Prepared statement for updates
update_stmt = await conn.prepare("UPDATE users SET last_login = $1 WHERE id = $2")
await update_stmt.executemany([
    (datetime.now(), 1),
    (datetime.now(), 2), 
    (datetime.now(), 3)
])

Cursors

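Handle large result sets efficiently with server-side cursors that stream rows in batches. Cursors are forward-only (non-scrollable) and can only be used inside a transaction.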

def cursor(self, query: str, *args, prefetch: int = None, timeout: float = None, record_class: type = None) -> CursorFactory:
    """
    Return a cursor factory for the specified query.
    
    Parameters:
    query: SQL query to execute
    *args: Query parameters for placeholders
    prefetch: Number of rows to prefetch (default: 50) 
    timeout: Query timeout in seconds
    record_class: Custom record class for results
    
    Returns:
    Cursor factory: iterate it with `async for`, or `await` it to obtain a Cursor
    """

Cursor Methods

class CursorFactory:
    """Factory for creating cursors; supports async iteration and awaiting."""
    
    def __aiter__(self) -> CursorIterator
    def __await__(self) -> Cursor

class Cursor:
    """Database cursor for stepping through large result sets manually."""
    
    async def fetch(self, n: int, *, timeout: float = None) -> typing.List[Record]
    async def fetchrow(self, *, timeout: float = None) -> typing.Optional[Record]
    async def forward(self, n: int, *, timeout: float = None) -> int

class CursorIterator:
    """Async iterator for cursor results."""
    
    def __aiter__(self) -> CursorIterator
    async def __anext__(self) -> Record

Example Usage

# Cursors can only be used inside a transaction
async with conn.transaction():
    # Process large result set by iterating the cursor
    # (prefetch controls how many rows the iterator fetches per round trip)
    async for record in conn.cursor(
        "SELECT * FROM large_table WHERE condition = $1", value, prefetch=100
    ):
        process_record(record)

    # Manual cursor control: await the factory to obtain a Cursor
    # (prefetch cannot be combined with this form)
    cursor = await conn.cursor("SELECT * FROM users ORDER BY id")

    # Fetch first 50 rows
    batch = await cursor.fetch(50)

    while batch:
        for row in batch:
            print(f"User: {row['name']}")

        # Fetch next batch
        batch = await cursor.fetch(50)

Query Parameterization

asyncpg uses PostgreSQL's native parameter binding with numbered placeholders ($1, $2, ...) for security and performance.

# Correct parameter usage
users = await conn.fetch(
    "SELECT * FROM users WHERE name = $1 AND age > $2", 
    "Alice", 25
)

# Multiple parameters
result = await conn.execute(
    "UPDATE users SET email = $1, updated_at = $2 WHERE id = $3",
    "newemail@example.com", datetime.now(), user_id
)

# Array parameters
ids = [1, 2, 3, 4, 5]
users = await conn.fetch("SELECT * FROM users WHERE id = ANY($1)", ids)

# JSON parameters
data = {"preferences": {"theme": "dark", "language": "en"}}
await conn.execute(
    "UPDATE users SET metadata = $1 WHERE id = $2",
    json.dumps(data), user_id
)

Error Handling

Handle query execution errors with appropriate exception types.

try:
    result = await conn.fetch("SELECT * FROM nonexistent_table")
except asyncpg.UndefinedTableError:
    print("Table does not exist")
except asyncpg.PostgresSyntaxError:
    print("SQL syntax error")
except asyncpg.DataError:
    print("Data-related error")
except asyncio.TimeoutError:
    print("Query timed out")

Types

class Record:
    """Query result record with dict-like and tuple-like access."""
    
    def get(self, key: str, default: typing.Any = None) -> typing.Any
    def keys(self) -> typing.Iterator[str]
    def values(self) -> typing.Iterator[typing.Any]
    def items(self) -> typing.Iterator[typing.Tuple[str, typing.Any]]
    def __getitem__(self, key: typing.Union[str, int, slice]) -> typing.Any
    def __len__(self) -> int
    def __iter__(self) -> typing.Iterator[typing.Any]
    def __contains__(self, key: object) -> bool

class PreparedStatement:
    """A prepared SQL statement for reuse."""
    
    def get_name(self) -> str
    def get_query(self) -> str
    def get_statusmsg(self) -> str
    def get_parameters(self) -> typing.Tuple[Type, ...]
    def get_attributes(self) -> typing.Tuple[Attribute, ...]

class CursorFactory:
    """Factory for creating cursors."""
    
class Cursor:
    """Database cursor for streaming large result sets."""
    
class CursorIterator:
    """Async iterator for cursor results."""

Install with Tessl CLI

npx tessl i tessl/pypi-asyncpg

docs

connection-management.md

connection-pooling.md

copy-operations.md

cursor-operations.md

exception-handling.md

index.md

listeners-notifications.md

prepared-statements.md

query-execution.md

transaction-management.md

type-system.md

tile.json