CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-asyncpg

An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/copy-operations.md

COPY Operations

High-performance bulk data import/export using PostgreSQL's native COPY protocol with support for various formats, streaming, custom delimiters, and efficient handling of large datasets.

Capabilities

Copy From Table

Export table data to files or file-like objects with comprehensive formatting options.

async def copy_from_table(
    self,
    table_name: str,
    *,
    output,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str
    """
    Copy table contents to a file or file-like object.

    Parameters:
    table_name: Name of the table to copy from
    output: File path (str) or file-like object to write to.
        File-like objects must be opened in binary mode, because the
        COPY data is delivered as bytes.
    columns: List of column names to copy (default: all columns)
    schema_name: Schema name (default: current schema)
    timeout: Operation timeout in seconds
    format: Output format ('text', 'csv', 'binary')
    oids: Include object IDs
    delimiter: Field delimiter character
    null: String representing NULL values
    header: Include header row (CSV format)
    quote: Quote character (CSV format)
    escape: Escape character (CSV format)
    force_quote: True to quote every column, or a list of column names
        to always quote (CSV format)
    encoding: Character encoding

    Returns:
    COPY command status string, e.g. "COPY 1000"
    """

Example Usage

# Copy table to file
with open('/tmp/users.csv', 'w') as f:
    await conn.copy_from_table('users', output=f, format='csv', header=True)

# Copy specific columns
await conn.copy_from_table(
    'orders', 
    output='/tmp/orders.csv',
    columns=['id', 'customer_id', 'total', 'created_at'],
    format='csv',
    header=True
)

# Copy with custom formatting
import io
buffer = io.StringIO()
await conn.copy_from_table(
    'products',
    output=buffer,
    format='csv',
    delimiter='|',
    null='NULL',
    quote='"',
    force_quote=['name', 'description']
)
csv_data = buffer.getvalue()

# Binary format for maximum performance
with open('/tmp/users.bin', 'wb') as f:
    await conn.copy_from_table('users', output=f, format='binary')

Copy From Query

Export query results to files or file-like objects, enabling complex data transformations during export.

async def copy_from_query(
    self,
    query: str,
    *args,
    output,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str
    """
    Copy the results of a query to a file or file-like object.

    Parameters:
    query: SQL query to execute
    *args: Query parameters, bound positionally to $1, $2, ...
    output: File path (str) or file-like object to write to.
        File-like objects must be opened in binary mode, because the
        COPY data is delivered as bytes.
    timeout: Operation timeout in seconds
    format: Output format ('text', 'csv', 'binary')
    oids: Include object IDs
    delimiter: Field delimiter character
    null: String representing NULL values
    header: Include header row (CSV format)
    quote: Quote character (CSV format)
    escape: Escape character (CSV format)
    force_quote: True to quote every column, or a list of column names
        to always quote (CSV format)
    encoding: Character encoding

    Returns:
    COPY command status string, e.g. "COPY 1000"
    """

Example Usage

# Export query results
with open('/tmp/monthly_report.csv', 'w') as f:
    await conn.copy_from_query(
        """
        SELECT u.name, u.email, COUNT(o.id) as order_count, SUM(o.total) as total_spent
        FROM users u 
        LEFT JOIN orders o ON u.id = o.customer_id 
        WHERE o.created_at >= $1 AND o.created_at < $2
        GROUP BY u.id, u.name, u.email
        ORDER BY total_spent DESC
        """,
        start_date, end_date,
        output=f,
        format='csv',
        header=True
    )

# Stream large result set
import asyncio

async def stream_query_results(query, output_file):
    """Write the result set of *query* to *output_file* as CSV with a header.

    Parameters:
    query: SQL text passed unchanged to ``conn.copy_from_query``
    output_file: Path of the file to create or overwrite

    The file is opened in binary mode because the COPY data is written as
    bytes; a text-mode handle would raise TypeError on the first write.
    """
    with open(output_file, 'wb') as f:
        await conn.copy_from_query(
            query,
            output=f,
            format='csv',
            header=True
        )

# Export with JSON aggregation
# One output row per customer; the second column is a JSON array built by
# json_agg. $1 is bound to "30 days ago".
# NOTE: assumes `from datetime import datetime, timedelta` is in scope.
await conn.copy_from_query(
    """
    SELECT customer_id, 
           json_agg(json_build_object('id', id, 'total', total, 'date', created_at)) as orders
    FROM orders 
    WHERE created_at >= $1 
    GROUP BY customer_id
    """,
    datetime.now() - timedelta(days=30),
    output='/tmp/customer_orders.csv',
    format='csv'
)

Copy To Table

Import data from files or file-like objects into database tables with comprehensive parsing options.

async def copy_to_table(
    self,
    table_name: str,
    *,
    source,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    freeze: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    force_not_null: typing.List[str] = None,
    force_null: typing.List[str] = None,
    encoding: str = None,
    where: str = None
) -> str
    """
    Copy data from a file or file-like object to the specified table.

    Parameters:
    table_name: Target table name
    source: File path (str) or file-like object to read from.
        File-like objects must be opened in binary mode.
    columns: List of target column names
    schema_name: Schema name (default: current schema)
    timeout: Operation timeout in seconds
    format: Input format ('text', 'csv', 'binary')
    oids: Expect object IDs in input
    freeze: Freeze imported rows (performance optimization)
    delimiter: Field delimiter character
    null: String representing NULL values
    header: Input has a header row that is skipped (CSV format)
    quote: Quote character (CSV format)
    escape: Escape character (CSV format)
    force_quote: Accepted by the signature. NOTE(review): PostgreSQL
        documents FORCE_QUOTE for COPY TO only; confirm it has any
        effect on import.
    force_not_null: Columns whose values are never matched against the
        null string, so an empty value is read as an empty string
        rather than NULL (CSV format)
    force_null: Columns whose values are matched against the null string
        even when quoted, so a quoted null string becomes NULL (CSV format)
    encoding: Character encoding
    where: WHERE condition; rows that do not satisfy it are not inserted
        (NOTE(review): COPY ... WHERE needs PostgreSQL 12+; confirm
        the server version)

    Returns:
    COPY command status string, e.g. "COPY 1000"
    """

Example Usage

# Import CSV file
with open('/tmp/users.csv', 'r') as f:
    result = await conn.copy_to_table(
        'users',
        source=f,
        format='csv',
        header=True,
        columns=['name', 'email', 'age']
    )
print(result)  # "COPY 1000"

# Import with custom delimiter
await conn.copy_to_table(
    'products',
    source='/tmp/products.txt',
    format='text',
    delimiter='|',
    null='\\N',
    columns=['sku', 'name', 'price', 'category']
)

# Import with data transformation using WHERE clause
await conn.copy_to_table(
    'orders',
    source=data_file,
    format='csv',
    header=True,
    where="total > 0 AND customer_id IS NOT NULL"
)

# Binary import for maximum performance
with open('/tmp/users.bin', 'rb') as f:
    await conn.copy_to_table('users', source=f, format='binary')

Copy Records To Table

Import Python data structures directly into database tables using the optimized binary COPY protocol.

async def copy_records_to_table(
    self,
    table_name: str,
    *,
    records,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    where: str = None
) -> str
    """
    Copy a list of records to the specified table using binary COPY.

    Parameters:
    table_name: Target table name
    records: Iterable (or asynchronous iterable) of row tuples. Values
        are positional and must follow the order of `columns`. Dicts
        are not accepted; convert them to tuples first.
    columns: List of target column names
    schema_name: Schema name (default: current schema)
    timeout: Operation timeout in seconds
    where: WHERE condition; rows that do not satisfy it are not inserted

    Returns:
    COPY command status string, e.g. "COPY 1000"
    """

Example Usage

# Import list of tuples
users = [
    ("Alice", "alice@example.com", 25),
    ("Bob", "bob@example.com", 30),
    ("Charlie", "charlie@example.com", 35)
]

result = await conn.copy_records_to_table(
    'users',
    records=users,
    columns=['name', 'email', 'age']
)

# Import list of dictionaries  
orders = [
    {"customer_id": 1, "total": 99.99, "status": "pending"},
    {"customer_id": 2, "total": 149.50, "status": "shipped"},
    {"customer_id": 3, "total": 75.25, "status": "delivered"}
]

await conn.copy_records_to_table(
    'orders',
    records=orders,
    columns=['customer_id', 'total', 'status']
)

# Stream large datasets
async def generate_records():
    """Asynchronously yield one million synthetic ``(name, email, age)`` rows.

    Names run from ``user_0`` to ``user_999999``; the age is a random
    integer between 18 and 80 inclusive.
    """
    total_rows = 1000000
    index = 0
    while index < total_rows:
        name = f"user_{index}"
        email = f"user_{index}@example.com"
        age = random.randint(18, 80)
        yield (name, email, age)
        index += 1

# The async generator is handed over directly; it is never turned into a list.
# NOTE(review): async-iterable `records` needs an asyncpg release that supports it; confirm.
await conn.copy_records_to_table(
    'users',
    records=generate_records(),
    columns=['name', 'email', 'age']
)

Streaming COPY Operations

Handle large datasets efficiently with streaming and memory-conscious processing.

import asyncio
from asyncio import StreamWriter, StreamReader

async def stream_csv_to_table(file_path: str, table_name: str):
    """Stream a large CSV file into a table without loading it into memory.

    The file is read row by row and sent to the database in chunks of
    10,000 rows through ``conn.copy_records_to_table``.

    Parameters:
    file_path: Path of the CSV file; its first row is treated as a header
        and skipped
    table_name: Target table, expected to have columns col1, col2, col3

    Notes:
    Rows are parsed with the ``csv`` module, so quoted fields containing
    commas or newlines are handled correctly. Blank lines are ignored.
    Every value is passed on as ``str``.
    NOTE(review): binary COPY presumably needs values already converted to
    the column types; convert per column if the targets are not text.
    """
    import csv  # local import keeps the snippet self-contained

    chunk_size = 10000
    columns = ['col1', 'col2', 'col3']

    async def flush(batch):
        # Single place that sends one chunk to the database.
        await conn.copy_records_to_table(
            table_name,
            records=batch,
            columns=columns
        )

    # newline='' lets the csv module handle embedded line breaks itself.
    with open(file_path, 'r', newline='') as f:
        reader = csv.reader(f)

        # Skip header; the default keeps an empty file from raising.
        next(reader, None)

        records = []
        for line_num, values in enumerate(reader, 1):
            if not values:
                # Blank line: nothing to import.
                continue
            records.append(values)

            # Insert chunk when full
            if len(records) >= chunk_size:
                await flush(records)
                records = []
                print(f"Processed {line_num} lines")

        # Insert remaining records
        if records:
            await flush(records)

# Async generator for streaming
async def async_record_generator():
    """Generate records asynchronously for streaming import.

    Yields one million ``(id, name, timestamp, data)`` tuples. Rows are
    positional because ``copy_records_to_table`` maps values to columns by
    position; the order matches
    ``columns=['id', 'name', 'timestamp', 'data']``.
    """
    for i in range(1000000):
        # Simulate async data generation/fetching
        if i % 1000 == 0:
            await asyncio.sleep(0.01)  # Yield control

        yield (
            i,                                       # id
            f'record_{i}',                           # name
            datetime.now(),                          # timestamp
            json.dumps({'value': random.random()}),  # data
        )

# Use async generator
# Values in each yielded record must line up positionally with `columns`.
await conn.copy_records_to_table(
    'streaming_data',
    records=async_record_generator(),
    columns=['id', 'name', 'timestamp', 'data']
)

Error Handling

Handle COPY-specific errors and data validation issues.

# Each handler below only prints a message; replace the bodies with real
# recovery logic (skip, retry, abort) as the application requires.
try:
    await conn.copy_to_table(
        'users',
        source='/tmp/users.csv',
        format='csv',
        header=True
    )
except asyncpg.DataError as e:
    print(f"Data format error: {e}")
    # Handle malformed data
except asyncpg.UniqueViolationError:
    print("Duplicate key violation during import")
    # Handle constraint violations
except FileNotFoundError:
    # Raised locally when the source path cannot be opened
    print("Source file not found")
except asyncpg.UndefinedTableError:
    print("Target table does not exist")

# Validate data before import
def validate_records(records):
    """Validate records before COPY operation.

    Parameters:
    records: Iterable of dict-like records, read through ``.get('email')``
        and ``.get('name')``

    Returns:
    A ``(valid_records, errors)`` tuple. ``valid_records`` holds the
    records that passed, in input order. ``errors`` holds one
    ``"Record <index>: <reason>"`` string per rejected record. Only the
    first failing rule is reported for each record.

    A missing, ``None`` or non-string email is reported as
    "Invalid email format" instead of raising ``TypeError``.
    """
    valid_records = []
    errors = []

    for i, record in enumerate(records):
        email = record.get('email')

        # Validate email format; the isinstance guard covers None / non-str.
        if not isinstance(email, str) or '@' not in email:
            errors.append(f"Record {i}: Invalid email format")
        # Validate required fields
        elif not record.get('name'):
            errors.append(f"Record {i}: Name is required")
        else:
            valid_records.append(record)

    return valid_records, errors

# Use validation
records, errors = validate_records(input_records)
if errors:
    print(f"Found {len(errors)} validation errors")
    for error in errors[:10]:  # Show first 10 errors
        print(f"  {error}")

if records:
    await conn.copy_records_to_table('users', records=records)

Performance Optimization

Optimize COPY operations for maximum throughput and efficiency.

# Use binary format for best performance
# `records` and `columns` are supplied by the caller.
await conn.copy_records_to_table(
    'large_table',
    records=records,
    columns=columns
    # Binary format is used automatically for copy_records_to_table
)

# Batch processing for memory efficiency
async def batch_import(records, table_name, batch_size=10000):
    """Import a sequence of records in consecutive fixed-size slices.

    Parameters:
    records: Sequence supporting ``len()`` and slicing
    table_name: Target table, expected to have columns col1, col2, col3
    batch_size: Maximum number of records sent per COPY call

    Returns:
    Total number of rows reported as imported across all batches.
    """
    imported_so_far = 0
    start = 0
    record_count = len(records)

    for start in range(0, record_count, batch_size):
        chunk = records[start:start + batch_size]

        status = await conn.copy_records_to_table(
            table_name,
            records=chunk,
            columns=['col1', 'col2', 'col3'],
        )

        # The last whitespace-separated token of the status is the row count.
        chunk_rows = int(status.split()[-1])
        imported_so_far = imported_so_far + chunk_rows

        print(f"Imported {chunk_rows} rows (total: {imported_so_far})")

    return imported_so_far

# Disable autocommit for large imports (use transactions)
# `million_records` and `columns` are supplied by the caller.
async with conn.transaction():
    await conn.copy_records_to_table(
        'huge_table',
        records=million_records,
        columns=columns
    )
    # Single commit after all data is imported

Types

# COPY operations work with various data types:

# File-like objects must be opened in binary mode: COPY data is exchanged as
# bytes, so text-mode handles and io.StringIO are not usable.
import io
import os
from typing import AsyncIterable, BinaryIO, Iterable, TextIO, Union

FileSource = Union[str, os.PathLike, BinaryIO, io.BytesIO]

# Record types for copy_records_to_table.
# Rows are positional sequences; dicts are not accepted.
Record = Union[tuple, list]
# Either a regular or an asynchronous iterable of rows.
Records = Union[Iterable[Record], AsyncIterable[Record]]

Install with Tessl CLI

npx tessl i tessl/pypi-asyncpg

docs

connection-management.md

connection-pooling.md

copy-operations.md

cursor-operations.md

exception-handling.md

index.md

listeners-notifications.md

prepared-statements.md

query-execution.md

transaction-management.md

type-system.md

tile.json