tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

Cursor Operations

Cursors for efficient traversal of large result sets with configurable prefetch. They provide an iterator-based interface for query results that are too large to fit in memory at once. asyncpg cursors move forward only and must be used inside a transaction; using one outside a transaction raises InterfaceError.

Capabilities

Cursor Creation

Create cursors for query results with configurable prefetch settings and memory management.

def cursor(
    self,
    query: str,
    *args,
    prefetch: int = None,
    timeout: float = None,
    record_class: type = None
) -> CursorFactory

Parameters:

  • query: SQL query string with optional parameter placeholders
  • *args: Query parameters
  • prefetch: Number of rows the cursor iterator fetches per round trip (default: 50); applies only when the cursor is iterated with async for
  • timeout: Query execution timeout in seconds
  • record_class: Custom record class for query results

Usage Example:

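import asyncpg

async def example():
    conn = await asyncpg.connect('postgresql://user:pass@localhost/db')
    try:
        # Cursors are only valid inside a transaction
        async with conn.transaction():
            # Create a cursor for a large result set
            cursor_factory = conn.cursor(
                'SELECT * FROM large_table WHERE status = $1',
                'active',
                prefetch=100
            )

            # Iterate through results
            async for row in cursor_factory:
                process_row(row)  # process_row is application-defined
    finally:
        # Close the connection even if processing fails
        await conn.close()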

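Async Context Manager

The objects returned by conn.cursor() are not async context managers themselves. Cleanup comes from the enclosing transaction block: a cursor is valid only while its transaction is open, and leaving the block (normally or through an exception) ends the cursor's lifetime.

def transaction(self, *, isolation=None, readonly=False, deferrable=False) -> Transaction

Usage Example:

async with conn.transaction():
    async for row in conn.cursor('SELECT * FROM users ORDER BY id'):
        print(f"User: {row['name']} ({row['email']})")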

Async Iterator Interface

Cursors implement the async iterator protocol for seamless integration with async for loops.

def __aiter__(self) -> CursorIterator
async def __anext__(self) -> Record

Usage Example:

# Iteration must happen inside a transaction
async with conn.transaction():
    cursor = conn.cursor('SELECT * FROM products WHERE category = $1', 'electronics')

    async for product in cursor:
        if product['price'] > 100:
            print(f"Premium product: {product['name']}")

        # Break early so the remaining rows are never fetched
        if product['id'] > 1000:
            break
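
The early-exit pattern above can be wrapped in a small helper. The following is a minimal sketch, not part of asyncpg: `take_first` and its `limit` parameter are hypothetical names, and `conn` is assumed to be an asyncpg connection (or any object with compatible `transaction()` and `cursor()` methods).

```python
from typing import Any, List


async def take_first(conn: Any, query: str, *args: Any,
                     limit: int, prefetch: int = 50) -> List[Any]:
    """Collect at most `limit` rows, then stop iterating the cursor.

    Hypothetical helper: `conn` is assumed to behave like an asyncpg
    connection.
    """
    rows: List[Any] = []
    if limit <= 0:
        return rows
    # asyncpg cursors must be used inside a transaction.
    async with conn.transaction():
        async for row in conn.cursor(query, *args, prefetch=prefetch):
            rows.append(row)
            if len(rows) >= limit:
                break  # no further batches are requested after this point
    return rows
```

Choosing a `prefetch` close to `limit` avoids pulling a large batch from the server when only a few rows are needed.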

Cursor Control

Awaiting the object returned by conn.cursor() yields a Cursor, which fetches rows explicitly and can skip ahead. Movement is forward-only, and the prefetch argument cannot be combined with this form.

async def fetch(self, n: int, *, timeout: float = None) -> typing.List[Record]
async def fetchrow(self, *, timeout: float = None) -> typing.Optional[Record]
async def forward(self, n: int, *, timeout: float = None) -> int

Usage Example:

async with conn.transaction():
    cursor = await conn.cursor('SELECT * FROM logs ORDER BY timestamp DESC')

    # Fetch the next 10 rows as a list of records
    recent_logs = await cursor.fetch(10)

    # Skip the next 20 rows; returns the number of rows actually skipped
    skipped = await cursor.forward(20)

    # Fetch a single row, or None once the cursor is exhausted
    next_log = await cursor.fetchrow()
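
Explicit cursor control is often used for paging through a result in fixed-size chunks. The sketch below assumes the `fetch(n)` coroutine of asyncpg's `Cursor`, which returns a list of rows and an empty list once the cursor is exhausted. `iter_chunks` is a hypothetical helper name, not an asyncpg function.

```python
from typing import Any, AsyncIterator, List


async def iter_chunks(cursor: Any, chunk_size: int) -> AsyncIterator[List[Any]]:
    """Yield lists of up to `chunk_size` rows until the cursor is exhausted.

    Hypothetical helper: `cursor` is assumed to expose an awaitable
    `fetch(n)` that returns a (possibly empty) list of rows.
    """
    while True:
        rows = await cursor.fetch(chunk_size)
        if not rows:
            break  # an empty list signals that no rows remain
        yield rows
```

With a real connection this would run inside `async with conn.transaction():` on a cursor obtained through `await conn.cursor(...)`.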

Types

class CursorFactory:
    """Returned by conn.cursor(); iterate it, or await it to obtain a Cursor."""

    def __aiter__(self) -> CursorIterator
    def __await__(self) -> Cursor

class Cursor:
    """An open cursor with explicit, forward-only fetch control."""

    async def fetch(self, n: int, *, timeout: float = None) -> typing.List[Record]
    async def fetchrow(self, *, timeout: float = None) -> typing.Optional[Record]
    async def forward(self, n: int, *, timeout: float = None) -> int

class CursorIterator:
    """Async iterator for cursor results."""
    
    def __aiter__(self) -> CursorIterator
    async def __anext__(self) -> Record

Memory Management

Cursors provide efficient memory usage for large result sets:

Prefetch Configuration

Configure how many rows are fetched at once to balance memory usage and network efficiency:

# Small prefetch for memory-constrained environments
cursor = conn.cursor('SELECT * FROM huge_table', prefetch=10)

# Large prefetch for high-throughput scenarios
cursor = conn.cursor('SELECT * FROM logs', prefetch=1000)

# Default prefetch (50 rows)
cursor = conn.cursor('SELECT * FROM users')
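
To make the trade-off concrete, the number of fetch round trips can be approximated as the row count divided by the prefetch size, rounded up. This is a rough model only: it ignores any final fetch that merely discovers the end of the result, and it says nothing about row size. `estimated_fetches` is a hypothetical helper, not an asyncpg function.

```python
def estimated_fetches(total_rows: int, prefetch: int = 50) -> int:
    """Approximate the number of fetch round trips needed to read all rows.

    Rough model (an assumption, not asyncpg behaviour in every detail):
    each round trip returns up to `prefetch` rows.
    """
    if prefetch <= 0:
        raise ValueError("prefetch must be positive")
    if total_rows <= 0:
        return 0
    # Integer ceiling division: ceil(total_rows / prefetch)
    return -(-total_rows // prefetch)


# For a 10,000-row result:
print(estimated_fetches(10_000, prefetch=10))    # 1000 round trips, about 10 rows buffered
print(estimated_fetches(10_000, prefetch=1000))  # 10 round trips, about 1000 rows buffered
print(estimated_fetches(10_000))                 # 200 round trips with the default of 50
```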

Streaming Processing

Process results without loading the entire result set into memory:

async def process_large_dataset(conn):
    # The transaction keeps the cursor valid while rows are streamed
    async with conn.transaction():
        batch = []

        async for row in conn.cursor('SELECT * FROM analytics_data'):
            batch.append(row)

            # Process in batches to control memory usage
            if len(batch) >= 100:
                await process_batch(batch)
                batch.clear()

        # Process remaining rows
        if batch:
            await process_batch(batch)
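
The batching logic can also be factored out so it works with any async iterable of rows. The following is a sketch under that assumption; `batched` is a hypothetical helper name and is not provided by asyncpg.

```python
from typing import AsyncIterable, AsyncIterator, List, TypeVar

T = TypeVar("T")


async def batched(rows: AsyncIterable[T], size: int) -> AsyncIterator[List[T]]:
    """Group items from an async iterable into lists of at most `size`.

    Hypothetical helper. Because this is an async generator, the size
    check below runs when iteration starts, not when `batched` is called.
    """
    if size < 1:
        raise ValueError("size must be at least 1")
    batch: List[T] = []
    async for row in rows:
        batch.append(row)
        if len(batch) >= size:
            yield batch
            batch = []  # start a new list so the consumer may keep the old one
    if batch:
        yield batch  # final, possibly shorter, batch
```

Inside a transaction, this could be used as `async for batch in batched(conn.cursor('SELECT * FROM analytics_data'), 100): ...`, keeping at most one batch plus the prefetch buffer in memory.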

Performance Considerations

  1. Prefetch Size: Balance between memory usage and network round-trips
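  2. Early Termination: Break out of the iteration loop once you have what you need so the remaining rows are never fetched
  3. Context Management: Run cursors inside async with conn.transaction() blocks so they are cleaned up automatically
  4. Batch Processing: Process results in batches for better memory efficiency
  5. Connection Pooling: A cursor ties up its connection and an open transaction for as long as it is in use, so release pooled connections promptly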
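
As an illustration of the pooling point, the sketch below acquires a pooled connection only for the duration of the cursor iteration. It assumes `pool` behaves like an asyncpg pool whose `acquire()` result can be used with `async with`. `stream_from_pool` and `handler` are hypothetical names.

```python
from typing import Any, Awaitable, Callable


async def stream_from_pool(
    pool: Any,
    query: str,
    *args: Any,
    handler: Callable[[Any], Awaitable[None]],
    prefetch: int = 50,
) -> int:
    """Stream rows through `handler` and return how many were handled.

    Hypothetical helper: `pool` is assumed to behave like an asyncpg pool.
    """
    count = 0
    # The connection goes back to the pool when this block exits,
    # including when `handler` raises.
    async with pool.acquire() as conn:
        # The cursor needs an open transaction for its whole lifetime.
        async with conn.transaction():
            async for row in conn.cursor(query, *args, prefetch=prefetch):
                await handler(row)
                count += 1
    return count
```

Because the connection stays checked out while `handler` runs, a slow handler reduces the number of connections available to other tasks.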

Use Cases

  • Large Reports: Generate reports from large datasets without memory exhaustion
  • Data Migration: Stream data between systems efficiently
  • ETL Operations: Transform data in streaming fashion
  • Search Results: Paginate through search results efficiently
  • Log Analysis: Process log tables that exceed available memory

Best Practices

  1. Always Use Transaction Blocks: Cursors work only inside a transaction, and async with conn.transaction() ensures proper resource cleanup
  2. Configure Prefetch Appropriately: Match prefetch size to your use case
  3. Process Incrementally: Avoid accumulating all results in memory
  4. Handle Exceptions: Proper error handling prevents resource leaks
  5. Monitor Memory Usage: Tune prefetch size based on memory constraints
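
The exception-handling advice can be sketched as follows. When an exception leaves an asyncpg transaction block, the transaction is rolled back, so the helper below only needs to log and report the failure. `process_rows_safely` and `handler` are hypothetical names, and catching `Exception` broadly is an illustrative simplification rather than a recommendation.

```python
import logging
from typing import Any, Awaitable, Callable, Tuple

logger = logging.getLogger(__name__)


async def process_rows_safely(
    conn: Any,
    query: str,
    *args: Any,
    handler: Callable[[Any], Awaitable[None]],
) -> Tuple[int, bool]:
    """Return (rows handled, success flag) without leaking the cursor.

    Hypothetical helper: `conn` is assumed to behave like an asyncpg
    connection.
    """
    processed = 0
    try:
        async with conn.transaction():
            async for row in conn.cursor(query, *args):
                await handler(row)
                processed += 1
    except Exception:
        # The transaction block has already been exited at this point,
        # so only logging and reporting remain.
        logger.exception("cursor processing stopped after %d rows", processed)
        return processed, False
    return processed, True
```

Callers can use the returned count to decide whether to retry from the start or resume with a narrower query.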
