An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run
Scrollable cursors for efficient traversal of large result sets with configurable prefetch and memory management. Cursors provide an iterator-based interface for handling query results that are too large to fit in memory at once.
Create cursors for query results with configurable prefetch settings and memory management.
def cursor(
self,
query: str,
*args,
prefetch: int | None = None,
timeout: float | None = None,
record_class: type | None = None
) -> CursorFactory:
Parameters:
query: SQL query string with optional parameter placeholders
*args: Query parameters
prefetch: Number of rows to prefetch (default: 50)
timeout: Query execution timeout in seconds
record_class: Custom record class for query results

Usage Example:
import asyncpg
async def example():
conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
# Create a cursor for a large result set
cursor_factory = conn.cursor(
'SELECT * FROM large_table WHERE status = $1',
'active',
prefetch=100
)
# Iterate through results
async for row in cursor_factory:
process_row(row)
await conn.close()

Use cursors as async context managers for automatic resource cleanup.
async def __aenter__(self) -> Cursor
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None

Usage Example:
async with conn.cursor('SELECT * FROM users ORDER BY id') as cursor:
async for row in cursor:
print(f"User: {row['name']} ({row['email']})")

Cursors implement the async iterator protocol for seamless integration with async for loops.
def __aiter__(self) -> CursorIterator
async def __anext__(self) -> Record

Usage Example:
cursor = conn.cursor('SELECT * FROM products WHERE category = $1', 'electronics')
async for product in cursor:
if product['price'] > 100:
print(f"Premium product: {product['name']}")
# Can break early to save resources
if product['id'] > 1000:
break

Control cursor behavior and retrieve cursor state information.
async def forward(self, count: int) -> typing.List[Record]
async def backwards(self, count: int) -> typing.List[Record]
def get_prefetch_size(self) -> int
def get_query(self) -> str
def get_args(self) -> typing.Tuple

Usage Example:
cursor = await conn.cursor('SELECT * FROM logs ORDER BY timestamp DESC')
# Move forward by specific amounts
recent_logs = await cursor.forward(10)
more_logs = await cursor.forward(20)
# Check cursor state
print(f"Prefetch size: {cursor.get_prefetch_size()}")
print(f"Query: {cursor.get_query()}")

class CursorFactory:
"""Factory for creating cursors with async iteration support."""
def __aiter__(self) -> CursorIterator
def __await__(self) -> Cursor
async def __aenter__(self) -> Cursor
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
class Cursor:
"""A cursor for iterating through query results."""
async def forward(self, count: int) -> typing.List[Record]
async def backwards(self, count: int) -> typing.List[Record]
def get_prefetch_size(self) -> int
def get_query(self) -> str
def get_args(self) -> typing.Tuple
async def __aenter__(self) -> Cursor
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
class CursorIterator:
"""Async iterator for cursor results."""
def __aiter__(self) -> CursorIterator
async def __anext__(self) -> Record

Cursors provide efficient memory usage for large result sets:
Configure how many rows are fetched at once to balance memory usage and network efficiency:
# Small prefetch for memory-constrained environments
cursor = conn.cursor('SELECT * FROM huge_table', prefetch=10)
# Large prefetch for high-throughput scenarios
cursor = conn.cursor('SELECT * FROM logs', prefetch=1000)
# Default prefetch (typically 50 rows)
cursor = conn.cursor('SELECT * FROM users')

Process results without loading entire result set into memory:
async def process_large_dataset():
async with conn.cursor('SELECT * FROM analytics_data') as cursor:
batch = []
async for row in cursor:
batch.append(row)
# Process in batches to control memory usage
if len(batch) >= 100:
await process_batch(batch)
batch.clear()
# Process remaining rows
if batch:
await process_batch(batch)