An asyncio PostgreSQL driver for high-performance database connectivity with Python async/await syntax
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
High-performance bulk data import/export using PostgreSQL's native COPY protocol with support for various formats, streaming, custom delimiters, and efficient handling of large datasets.
Export table data to files or file-like objects with comprehensive formatting options.
async def copy_from_table(
self,
table_name: str,
*,
output,
columns: typing.List[str] = None,
schema_name: str = None,
timeout: float = None,
format: str = None,
oids: bool = None,
delimiter: str = None,
null: str = None,
header: bool = None,
quote: str = None,
escape: str = None,
force_quote: typing.Union[bool, typing.List[str]] = None,
encoding: str = None
) -> str
"""
Copy table contents to a file or file-like object.
Parameters:
table_name: Name of the table to copy from
output: File path (str) or file-like object to write to
columns: List of column names to copy (default: all columns)
schema_name: Schema name (default: current schema)
timeout: Operation timeout in seconds
format: Output format ('text', 'csv', 'binary')
oids: Include object IDs
delimiter: Field delimiter character
null: String representing NULL values
header: Include header row (CSV format)
quote: Quote character (CSV format)
escape: Escape character (CSV format)
force_quote: List of columns to always quote
encoding: Character encoding
Returns:
COPY command status string
"""# Copy table to file
with open('/tmp/users.csv', 'w') as f:
await conn.copy_from_table('users', output=f, format='csv', header=True)
# Copy specific columns
await conn.copy_from_table(
'orders',
output='/tmp/orders.csv',
columns=['id', 'customer_id', 'total', 'created_at'],
format='csv',
header=True
)
# Copy with custom formatting
import io
buffer = io.StringIO()
await conn.copy_from_table(
'products',
output=buffer,
format='csv',
delimiter='|',
null='NULL',
quote='"',
force_quote=['name', 'description']
)
csv_data = buffer.getvalue()
# Binary format for maximum performance
with open('/tmp/users.bin', 'wb') as f:
await conn.copy_from_table('users', output=f, format='binary')

Export query results to files or file-like objects, enabling complex data transformations during export.
async def copy_from_query(
self,
query: str,
*args,
output,
timeout: float = None,
format: str = None,
oids: bool = None,
delimiter: str = None,
null: str = None,
header: bool = None,
quote: str = None,
escape: str = None,
force_quote: typing.Union[bool, typing.List[str]] = None,
encoding: str = None
) -> str
"""
Copy the results of a query to a file or file-like object.
Parameters:
query: SQL query to execute
*args: Query parameters
output: File path (str) or file-like object to write to
timeout: Operation timeout in seconds
format: Output format ('text', 'csv', 'binary')
oids: Include object IDs
delimiter: Field delimiter character
null: String representing NULL values
header: Include header row (CSV format)
quote: Quote character (CSV format)
escape: Escape character (CSV format)
force_quote: List of columns to always quote
encoding: Character encoding
Returns:
COPY command status string
"""# Export query results
with open('/tmp/monthly_report.csv', 'w') as f:
await conn.copy_from_query(
"""
SELECT u.name, u.email, COUNT(o.id) as order_count, SUM(o.total) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.customer_id
WHERE o.created_at >= $1 AND o.created_at < $2
GROUP BY u.id, u.name, u.email
ORDER BY total_spent DESC
""",
start_date, end_date,
output=f,
format='csv',
header=True
)
# Stream large result set
import asyncio
async def stream_query_results(query, output_file):
    """Execute *query* and stream its result set into *output_file* as CSV.

    Relies on an already-open ``conn`` from the enclosing scope; the COPY
    protocol streams rows directly, so the result set is never held in memory.
    """
    with open(output_file, 'w') as out:
        await conn.copy_from_query(
            query,
            output=out,
            format='csv',
            header=True,
        )
# Export with JSON aggregation
await conn.copy_from_query(
"""
SELECT customer_id,
json_agg(json_build_object('id', id, 'total', total, 'date', created_at)) as orders
FROM orders
WHERE created_at >= $1
GROUP BY customer_id
""",
datetime.now() - timedelta(days=30),
output='/tmp/customer_orders.csv',
format='csv'
)

Import data from files or file-like objects into database tables with comprehensive parsing options.
async def copy_to_table(
self,
table_name: str,
*,
source,
columns: typing.List[str] = None,
schema_name: str = None,
timeout: float = None,
format: str = None,
oids: bool = None,
freeze: bool = None,
delimiter: str = None,
null: str = None,
header: bool = None,
quote: str = None,
escape: str = None,
force_quote: typing.Union[bool, typing.List[str]] = None,
force_not_null: typing.List[str] = None,
force_null: typing.List[str] = None,
encoding: str = None,
where: str = None
) -> str
"""
Copy data from a file or file-like object to the specified table.
Parameters:
table_name: Target table name
source: File path (str) or file-like object to read from
columns: List of target column names
schema_name: Schema name (default: current schema)
timeout: Operation timeout in seconds
format: Input format ('text', 'csv', 'binary')
oids: Expect object IDs in input
freeze: Freeze imported rows (performance optimization)
delimiter: Field delimiter character
null: String representing NULL values
header: Skip header row (CSV format)
quote: Quote character (CSV format)
escape: Escape character (CSV format)
force_quote: List of columns that are quoted
force_not_null: List of columns that should never be NULL
force_null: List of columns that should be NULL if empty
encoding: Character encoding
where: WHERE clause for filtering during import
Returns:
COPY command status string
"""# Import CSV file
with open('/tmp/users.csv', 'r') as f:
result = await conn.copy_to_table(
'users',
source=f,
format='csv',
header=True,
columns=['name', 'email', 'age']
)
print(result) # "COPY 1000"
# Import with custom delimiter
await conn.copy_to_table(
'products',
source='/tmp/products.txt',
format='text',
delimiter='|',
null='\\N',
columns=['sku', 'name', 'price', 'category']
)
# Import with data transformation using WHERE clause
await conn.copy_to_table(
'orders',
source=data_file,
format='csv',
header=True,
where="total > 0 AND customer_id IS NOT NULL"
)
# Binary import for maximum performance
with open('/tmp/users.bin', 'rb') as f:
await conn.copy_to_table('users', source=f, format='binary')

Import Python data structures directly to database tables using optimized binary COPY protocol.
async def copy_records_to_table(
self,
table_name: str,
*,
records,
columns: typing.List[str] = None,
schema_name: str = None,
timeout: float = None,
where: str = None
) -> str
"""
Copy a list of records to the specified table using binary COPY.
Parameters:
table_name: Target table name
records: Iterable of records (tuples, lists, or dicts)
columns: List of target column names
schema_name: Schema name (default: current schema)
timeout: Operation timeout in seconds
where: WHERE clause for filtering during import
Returns:
COPY command status string
"""# Import list of tuples
users = [
("Alice", "alice@example.com", 25),
("Bob", "bob@example.com", 30),
("Charlie", "charlie@example.com", 35)
]
result = await conn.copy_records_to_table(
'users',
records=users,
columns=['name', 'email', 'age']
)
# Import list of dictionaries
orders = [
{"customer_id": 1, "total": 99.99, "status": "pending"},
{"customer_id": 2, "total": 149.50, "status": "shipped"},
{"customer_id": 3, "total": 75.25, "status": "delivered"}
]
await conn.copy_records_to_table(
'orders',
records=orders,
columns=['customer_id', 'total', 'status']
)
# Stream large datasets
async def generate_records():
    """Asynchronously yield one million synthetic ``(name, email, age)`` tuples."""
    for idx in range(1000000):
        name = f"user_{idx}"
        yield (name, f"{name}@example.com", random.randint(18, 80))
await conn.copy_records_to_table(
'users',
records=generate_records(),
columns=['name', 'email', 'age']
)

Handle large datasets efficiently with streaming and memory-conscious processing.
import asyncio
from asyncio import StreamWriter, StreamReader
async def stream_csv_to_table(file_path: str, table_name: str):
    """Stream a large CSV file into *table_name* without loading it into memory.

    Rows are read one at a time, buffered into fixed-size chunks, and each
    chunk is imported with ``copy_records_to_table``. Relies on an
    already-open ``conn`` from the enclosing scope.

    Parameters:
        file_path: Path to the CSV file to import.
        table_name: Target table name.
    """
    # csv.reader correctly handles quoted fields and embedded commas,
    # unlike the naive line.strip().split(',') approach, which also
    # corrupts fields with significant leading/trailing whitespace.
    import csv

    chunk_size = 10000  # rows buffered per COPY round-trip
    # newline='' is required by the csv module to handle quoted newlines.
    with open(file_path, 'r', newline='') as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row
        records = []
        for line_num, values in enumerate(reader, 1):
            records.append(values)
            # Flush the buffer once a full chunk has accumulated.
            if len(records) >= chunk_size:
                await conn.copy_records_to_table(
                    table_name,
                    records=records,
                    columns=['col1', 'col2', 'col3']
                )
                records = []
                print(f"Processed {line_num} lines")
        # Flush whatever is left after the final partial chunk.
        if records:
            await conn.copy_records_to_table(
                table_name,
                records=records,
                columns=['col1', 'col2', 'col3']
            )
# Async generator for streaming
async def async_record_generator():
    """Asynchronously yield synthetic records for a streaming import.

    Sleeps briefly every 1000 records so other tasks on the event loop
    get a chance to run while a very large batch is being produced.
    """
    for seq in range(1000000):
        # Periodically yield control back to the event loop.
        if seq % 1000 == 0:
            await asyncio.sleep(0.01)
        record = {
            'id': seq,
            'name': f'record_{seq}',
            'timestamp': datetime.now(),
            'data': json.dumps({'value': random.random()}),
        }
        yield record
# Use async generator
await conn.copy_records_to_table(
'streaming_data',
records=async_record_generator(),
columns=['id', 'name', 'timestamp', 'data']
)

Handle COPY-specific errors and data validation issues.
try:
await conn.copy_to_table(
'users',
source='/tmp/users.csv',
format='csv',
header=True
)
except asyncpg.DataError as e:
print(f"Data format error: {e}")
# Handle malformed data
except asyncpg.UniqueViolationError:
print("Duplicate key violation during import")
# Handle constraint violations
except FileNotFoundError:
print("Source file not found")
except asyncpg.UndefinedTableError:
print("Target table does not exist")
# Validate data before import
def validate_records(records):
    """Validate records ahead of a COPY operation.

    Each record is expected to be a dict with at least ``email`` and
    ``name`` keys. Returns a tuple ``(valid_records, errors)`` where
    ``errors`` holds a human-readable message for every rejected record.
    """
    accepted = []
    problems = []
    for idx, rec in enumerate(records):
        try:
            # A minimal email sanity check: must contain an '@'.
            if '@' not in rec.get('email', ''):
                raise ValueError("Invalid email format")
            # Name is a required, non-empty field.
            if not rec.get('name'):
                raise ValueError("Name is required")
        except ValueError as exc:
            problems.append(f"Record {idx}: {exc}")
        else:
            accepted.append(rec)
    return accepted, problems
# Use validation
records, errors = validate_records(input_records)
if errors:
print(f"Found {len(errors)} validation errors")
for error in errors[:10]: # Show first 10 errors
print(f" {error}")
if records:
await conn.copy_records_to_table('users', records=records)

Optimize COPY operations for maximum throughput and efficiency.
# Use binary format for best performance
await conn.copy_records_to_table(
'large_table',
records=records,
columns=columns
# Binary format is used automatically for copy_records_to_table
)
# Batch processing for memory efficiency
async def batch_import(records, table_name, batch_size=10000):
    """Import a large, indexable record collection in fixed-size batches.

    Relies on an already-open ``conn`` from the enclosing scope. Returns
    the total number of rows imported, summed from the COPY status
    strings (which have the form ``"COPY <n>"``).
    """
    total_imported = 0
    start = 0
    while start < len(records):
        chunk = records[start:start + batch_size]
        status = await conn.copy_records_to_table(
            table_name,
            records=chunk,
            columns=['col1', 'col2', 'col3']
        )
        # The row count is the trailing token of the COPY status string.
        imported = int(status.split()[-1])
        total_imported += imported
        print(f"Imported {imported} rows (total: {total_imported})")
        start += batch_size
    return total_imported
# Disable autocommit for large imports (use transactions)
async with conn.transaction():
await conn.copy_records_to_table(
'huge_table',
records=million_records,
columns=columns
)
# Single commit after all data is imported

# COPY operations work with various data types:
# File-like objects
import io
from typing import Union, TextIO, BinaryIO
FileSource = Union[str, TextIO, BinaryIO, io.StringIO, io.BytesIO]
# Record types for copy_records_to_table
Record = Union[tuple, list, dict]
Records = Union[list[Record], typing.Iterator[Record], typing.AsyncIterator[Record]]