PostgreSQL driver and tools library providing PG-API and DB-API 2.0 interfaces for Python.
—
Advanced PostgreSQL features including COPY operations, LISTEN/NOTIFY, advisory locks, and streaming results for high-performance database applications.
High-performance bulk data import/export using PostgreSQL's COPY protocol for efficient data transfer.
class CopyManager:
    """
    Manager for PostgreSQL COPY operations providing bulk data import/export.
    """

    def load_rows(self, statement, rows):
        """
        Load rows using COPY FROM for bulk insert operations.

        Parameters:
        - statement (str): COPY FROM statement
        - rows (iterable): Rows to insert (tuples or lists)

        Returns:
        int: Number of rows loaded

        Raises:
        CopyError: If COPY operation fails
        """

    def dump_rows(self, statement):
        """
        Dump rows using COPY TO for bulk export operations.

        Parameters:
        - statement (str): COPY TO statement

        Returns:
        Iterator: Iterator over exported row data

        Raises:
        CopyError: If COPY operation fails
        """

    def load_file(self, statement, file_path):
        """
        Load data from file using COPY FROM.

        Parameters:
        - statement (str): COPY FROM statement
        - file_path (str): Path to source file

        Returns:
        int: Number of rows loaded
        """

    def dump_file(self, statement, file_path):
        """
        Dump data to file using COPY TO.

        Parameters:
        - statement (str): COPY TO statement
        - file_path (str): Path to destination file

        Returns:
        int: Number of rows dumped
        """

# Asynchronous notification system for real-time communication between database sessions.
class NotificationManager:
    """
    Manager for PostgreSQL LISTEN/NOTIFY asynchronous messaging.
    """

    def listen(self, channel):
        """
        Start listening for notifications on a channel.

        Parameters:
        - channel (str): Channel name to listen on

        Raises:
        NotificationError: If listen operation fails
        """

    def unlisten(self, channel=None):
        """
        Stop listening for notifications.

        Parameters:
        - channel (str, optional): Specific channel to unlisten (all if None)

        Raises:
        NotificationError: If unlisten operation fails
        """

    def notify(self, channel, payload=None):
        """
        Send notification to a channel.

        Parameters:
        - channel (str): Channel name to notify
        - payload (str, optional): Optional message payload

        Raises:
        NotificationError: If notify operation fails
        """

    def get_notifications(self):
        """
        Get pending notifications (non-blocking).

        Returns:
        list: List of Notification objects with channel, payload, pid
        """

    def wait_for_notification(self, timeout=None):
        """
        Wait for next notification (blocking).

        Parameters:
        - timeout (float, optional): Timeout in seconds (infinite if None)

        Returns:
        Notification or None: Notification object, or None on timeout
        """
class Notification:
    """Notification message received from PostgreSQL via LISTEN/NOTIFY."""

    @property
    def channel(self):
        """Channel name that received the notification."""

    @property
    def payload(self):
        """Optional payload data."""

    @property
    def pid(self):
        """Process ID of the notifying backend."""

# PostgreSQL advisory locks for application-level synchronization and coordination.
class ALock:
    """
    Base class for PostgreSQL advisory locks.
    """

    def acquire(self, blocking=True):
        """
        Acquire the advisory lock.

        Parameters:
        - blocking (bool): Whether to block until lock is available

        Returns:
        bool: True if lock acquired, False if non-blocking and unavailable

        Raises:
        LockError: If lock acquisition fails
        """

    def release(self):
        """
        Release the advisory lock.

        Raises:
        LockError: If lock release fails
        """

    def __enter__(self):
        """Context manager entry - acquire lock."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - release lock."""

    @property
    def is_held(self):
        """
        Check if lock is currently held by this session.

        Returns:
        bool: True if lock is held
        """


class ExclusiveLock(ALock):
    """
    Exclusive advisory lock - only one session can hold it.
    """

    def __init__(self, lock_id):
        """
        Create exclusive advisory lock.

        Parameters:
        - lock_id (int or tuple): Lock identifier (int or pair of ints)
        """


class ShareLock(ALock):
    """
    Shared advisory lock - multiple sessions can hold it simultaneously.
    """

    def __init__(self, lock_id):
        """
        Create shared advisory lock.

        Parameters:
        - lock_id (int or tuple): Lock identifier (int or pair of ints)
        """

# Interfaces for streaming large result sets without loading all data into memory.
class ResultStream:
    """
    Streaming interface for large query results.
    """

    def __iter__(self):
        """Iterate over result rows."""

    def __next__(self):
        """Get next result row."""

    def close(self):
        """Close the result stream."""

    @property
    def description(self):
        """
        Get column description information.

        Returns:
        list: Column metadata
        """
def stream_query(connection, query, *parameters):
    """
    Execute query and return streaming result interface.

    Parameters:
    - connection: Database connection
    - query (str): SQL query to execute
    - *parameters: Query parameters

    Returns:
    ResultStream: Streaming result interface
    """

# Utilities for managing connection pools and connection lifecycle.
class ConnectionPool:
    """
    Connection pool for managing database connections.
    """

    def __init__(self, connector, min_size=1, max_size=10):
        """
        Create connection pool.

        Parameters:
        - connector: Connection factory
        - min_size (int): Minimum pool size
        - max_size (int): Maximum pool size
        """

    def get_connection(self):
        """
        Get connection from pool.

        Returns:
        Connection: Database connection from pool
        """

    def return_connection(self, connection):
        """
        Return connection to pool.

        Parameters:
        - connection: Connection to return
        """

    def close_all(self):
        """Close all connections in pool."""

    @property
    def size(self):
        """Current pool size."""

    @property
    def available(self):
        """Number of available connections."""

# Interface for calling PostgreSQL stored procedures and functions with parameter binding and result handling.
class StoredProcedure:
    """
    Interface for calling PostgreSQL stored procedures and functions.
    """

    def __call__(self, *args, **kw):
        """
        Execute the stored procedure with provided arguments.

        Parameters:
        - *args: Positional parameters for the procedure
        - **kw: Keyword parameters for the procedure

        Returns:
        Procedure result (varies by procedure type)

        Raises:
        ProcedureError: If procedure execution fails
        """

    @property
    def name(self):
        """
        Get procedure name.

        Returns:
        str: Fully qualified procedure name
        """

    @property
    def parameter_types(self):
        """
        Get parameter type information.

        Returns:
        List[int]: PostgreSQL type OIDs for parameters
        """

    @property
    def return_type(self):
        """
        Get return type information.

        Returns:
        int: PostgreSQL type OID for return value
        """

import postgresql
import postgresql.copyman as copy_manager
import csv

db = postgresql.open('pq://user:pass@localhost/mydb')

# Create test table for the COPY demonstrations below.
db.execute("""
CREATE TABLE IF NOT EXISTS bulk_data (
    id SERIAL PRIMARY KEY,
    name TEXT,
    value NUMERIC,
    created_date DATE
)
""")


def bulk_insert_with_copy():
    """Bulk insert data using COPY for high performance."""
    # Prepare data: 10k (name, value, date) tuples.
    data_rows = [(f"Item {i}", i * 1.5, '2023-01-01') for i in range(10000)]
    # Use COPY for bulk insert.
    copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV)"
    copy_mgr = copy_manager.CopyManager(db)
    rows_loaded = copy_mgr.load_rows(copy_stmt, data_rows)
    print(f"Loaded {rows_loaded} rows using COPY")


def bulk_export_with_copy():
    """Export data using COPY for high performance."""
    copy_stmt = "COPY bulk_data TO STDOUT WITH (FORMAT CSV, HEADER)"
    copy_mgr = copy_manager.CopyManager(db)
    # Export directly to a file.
    copy_mgr.dump_file(copy_stmt, "/tmp/exported_data.csv")
    print("Data exported to /tmp/exported_data.csv")
    # Or stream the exported data row by row.
    row_count = 0
    for row_data in copy_mgr.dump_rows(copy_stmt):
        row_count += 1
        if row_count <= 5:  # Show first 5 rows
            print(f"Exported row: {row_data}")
    print(f"Total rows exported: {row_count}")


def import_csv_file():
    """Import data from CSV file using COPY."""
    # Create a CSV file to import from.
    with open('/tmp/import_data.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['name', 'value', 'created_date'])  # Header
        for i in range(5000):
            writer.writerow([f"CSV Item {i}", i * 2.0, '2023-06-01'])
    # Import the file using COPY (HEADER tells COPY to skip the first row).
    copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV, HEADER)"
    copy_mgr = copy_manager.CopyManager(db)
    rows_loaded = copy_mgr.load_file(copy_stmt, '/tmp/import_data.csv')
    print(f"Imported {rows_loaded} rows from CSV file")


# Execute bulk operations
bulk_insert_with_copy()
bulk_export_with_copy()
import_csv_file()
import postgresql
import postgresql.notifyman as notify
import threading
import time

# Set up two connections so one session can notify the other.
publisher = postgresql.open('pq://user:pass@localhost/mydb')
subscriber = postgresql.open('pq://user:pass@localhost/mydb')


def notification_publisher():
    """Publish notifications to channels."""
    notifier = notify.NotificationManager(publisher)
    for i in range(10):
        # Send notifications to different channels.
        notifier.notify('events', f'Event {i}: Something happened')
        notifier.notify('alerts', f'Alert {i}: Check system status')
        print(f"Published notification {i}")
        time.sleep(2)
    # Send termination signal so the subscriber loop can exit.
    notifier.notify('events', 'TERMINATE')


def notification_subscriber():
    """Subscribe to notifications and process them."""
    listener = notify.NotificationManager(subscriber)
    # Listen to multiple channels.
    listener.listen('events')
    listener.listen('alerts')
    print("Listening for notifications...")
    while True:
        # Wait for the next notification (blocking, with timeout).
        notification = listener.wait_for_notification(timeout=30)
        if notification:
            channel = notification.channel
            payload = notification.payload
            sender_pid = notification.pid
            print(f"Received on '{channel}': {payload} (from PID {sender_pid})")
            # Check for termination signal.
            if payload == 'TERMINATE':
                print("Termination signal received, stopping listener")
                break
        else:
            print("Notification timeout")
            break
    # Clean up.
    listener.unlisten()  # Unlisten from all channels


# Run publisher and subscriber in separate threads.
subscriber_thread = threading.Thread(target=notification_subscriber)
publisher_thread = threading.Thread(target=notification_publisher)
subscriber_thread.start()
time.sleep(1)  # Let subscriber start first
publisher_thread.start()
# Wait for both threads to complete.
subscriber_thread.join()
publisher_thread.join()
publisher.close()
subscriber.close()
import postgresql
import postgresql.alock as advisory_locks
import threading
import time

db1 = postgresql.open('pq://user:pass@localhost/mydb')
db2 = postgresql.open('pq://user:pass@localhost/mydb')


def exclusive_lock_example():
    """Demonstrate exclusive advisory locks."""
    def worker(worker_id, connection):
        lock = advisory_locks.ExclusiveLock(12345)  # Lock ID
        lock.connection = connection
        print(f"Worker {worker_id}: Attempting to acquire exclusive lock")
        # Try to acquire lock (blocking).
        if lock.acquire():
            print(f"Worker {worker_id}: Acquired exclusive lock")
            # Simulate work while holding the lock.
            time.sleep(3)
            print(f"Worker {worker_id}: Releasing exclusive lock")
            lock.release()
        else:
            print(f"Worker {worker_id}: Failed to acquire lock")

    # Start two workers competing for the same lock.
    thread1 = threading.Thread(target=worker, args=(1, db1))
    thread2 = threading.Thread(target=worker, args=(2, db2))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()


def shared_lock_example():
    """Demonstrate shared advisory locks."""
    def reader(reader_id, connection):
        lock = advisory_locks.ShareLock(54321)  # Shared lock ID
        lock.connection = connection
        print(f"Reader {reader_id}: Acquiring shared lock")
        with lock:  # Context manager automatically acquires/releases
            print(f"Reader {reader_id}: Reading data (shared access)")
            time.sleep(2)
            print(f"Reader {reader_id}: Finished reading")

    def writer(connection):
        lock = advisory_locks.ExclusiveLock(54321)  # Same ID as readers
        lock.connection = connection
        print("Writer: Waiting for exclusive access")
        with lock:
            print("Writer: Writing data (exclusive access)")
            time.sleep(3)
            print("Writer: Finished writing")

    # Start multiple readers and one writer.
    reader_threads = []
    for i in range(3):
        thread = threading.Thread(target=reader, args=(i+1, db1))
        reader_threads.append(thread)
        thread.start()
    time.sleep(1)  # Let readers start first
    writer_thread = threading.Thread(target=writer, args=(db2,))
    writer_thread.start()
    # Wait for all threads.
    for thread in reader_threads:
        thread.join()
    writer_thread.join()


def distributed_counter_example():
    """Implement distributed counter using advisory locks."""
    # Create counter table.
    db1.execute("""
    CREATE TABLE IF NOT EXISTS distributed_counter (
        name TEXT PRIMARY KEY,
        value INTEGER DEFAULT 0
    )
    """)
    # Initialize counter (idempotent).
    db1.execute("INSERT INTO distributed_counter (name, value) VALUES ('global', 0) ON CONFLICT (name) DO NOTHING")

    def increment_counter(worker_id, connection, increments):
        # The lock object is loop-invariant; create it once per worker.
        lock = advisory_locks.ExclusiveLock(99999)  # Counter lock ID
        lock.connection = connection
        for i in range(increments):
            # Use lock to ensure atomic read-modify-write of the counter.
            with lock:
                # Read current value.
                current = connection.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']
                # Increment.
                new_value = current + 1
                connection.execute("UPDATE distributed_counter SET value = $1 WHERE name = 'global'", new_value)
                print(f"Worker {worker_id}: Incremented counter to {new_value}")
            time.sleep(0.1)  # Small delay between increments

    # Start multiple workers incrementing the counter, each on its own connection.
    workers = []
    for i in range(3):
        connection = postgresql.open('pq://user:pass@localhost/mydb')
        thread = threading.Thread(target=increment_counter, args=(i+1, connection, 5))
        workers.append((thread, connection))
        thread.start()
    # Wait for all workers and close their connections.
    for thread, connection in workers:
        thread.join()
        connection.close()
    # Check final counter value.
    final_value = db1.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']
    print(f"Final counter value: {final_value}")


print("=== Exclusive Lock Example ===")
exclusive_lock_example()
print("\n=== Shared Lock Example ===")
shared_lock_example()
print("\n=== Distributed Counter Example ===")
distributed_counter_example()
db1.close()
db2.close()
import postgresql
import time

db = postgresql.open('pq://user:pass@localhost/mydb')

# Create large test dataset (1M rows) for the streaming demonstrations.
db.execute("""
CREATE TABLE IF NOT EXISTS large_dataset AS
SELECT
    generate_series(1, 1000000) as id,
    'Item ' || generate_series(1, 1000000) as name,
    random() * 1000 as value,
    NOW() - (random() * interval '365 days') as created_at
""")


def stream_large_results():
    """Stream large result sets to avoid memory issues."""
    # Prepare streaming query.
    query = db.prepare("""
    SELECT id, name, value, created_at
    FROM large_dataset
    WHERE value > $1
    ORDER BY value DESC
    """)
    print("Starting streaming query...")
    start_time = time.time()
    # Stream results instead of loading all into memory.
    row_count = 0
    total_value = 0
    for row in query.rows(500):  # Stream rows where value > 500
        row_count += 1
        total_value += row['value']
        # Process row (show first 10, then progress every 10k).
        if row_count <= 10:
            print(f"Row {row_count}: ID={row['id']}, Name={row['name']}, Value={row['value']:.2f}")
        elif row_count % 10000 == 0:
            print(f"Processed {row_count} rows...")
    end_time = time.time()
    avg_value = total_value / row_count if row_count > 0 else 0
    print("Streaming complete:")
    print(f" Rows processed: {row_count}")
    print(f" Average value: {avg_value:.2f}")
    print(f" Time taken: {end_time - start_time:.2f} seconds")


def chunk_processing():
    """Process large datasets in chunks."""
    query = db.prepare("SELECT * FROM large_dataset ORDER BY id")
    print("Starting chunk processing...")
    chunk_size = 0
    chunk_count = 0
    total_processed = 0
    # Process data in chunks.
    for chunk in query.chunks():
        chunk_count += 1
        chunk_size = len(chunk)
        total_processed += chunk_size
        # Process chunk: compute its average value.
        chunk_sum = sum(row['value'] for row in chunk)
        chunk_avg = chunk_sum / chunk_size
        print(f"Chunk {chunk_count}: {chunk_size} rows, avg value: {chunk_avg:.2f}")
        # Simulate processing time.
        time.sleep(0.1)
        # Limit for demonstration.
        if chunk_count >= 10:
            break
    print(f"Chunk processing complete - {total_processed} rows in {chunk_count} chunks")


def memory_efficient_aggregation():
    """Perform aggregations on large datasets without loading all data."""
    query = db.prepare("""
    SELECT
        EXTRACT(month FROM created_at) as month,
        id, value
    FROM large_dataset
    ORDER BY created_at
    """)
    # Track monthly statistics incrementally while streaming rows.
    monthly_stats = {}
    print("Computing monthly statistics...")
    for row in query.rows():
        month = int(row['month'])
        value = row['value']
        if month not in monthly_stats:
            monthly_stats[month] = {'count': 0, 'sum': 0, 'min': float('inf'), 'max': float('-inf')}
        stats = monthly_stats[month]
        stats['count'] += 1
        stats['sum'] += value
        stats['min'] = min(stats['min'], value)
        stats['max'] = max(stats['max'], value)
    # Display results.
    print("Monthly statistics:")
    for month in sorted(monthly_stats.keys()):
        stats = monthly_stats[month]
        avg = stats['sum'] / stats['count']
        print(f" Month {month:2d}: {stats['count']:6d} rows, "
              f"avg: {avg:6.2f}, min: {stats['min']:6.2f}, max: {stats['max']:6.2f}")


# Run streaming examples
stream_large_results()
print("\n" + "="*50 + "\n")
chunk_processing()
print("\n" + "="*50 + "\n")
memory_efficient_aggregation()
db.close()
import postgresql
import postgresql.pool as connection_pool
import threading
import time
import random


def connection_pool_example():
    """Demonstrate connection pooling for concurrent access."""
    # Create connection factory ('&pq://' yields an unopened connector).
    connector = postgresql.open('&pq://user:pass@localhost/mydb')
    # Create connection pool.
    pool = connection_pool.ConnectionPool(
        connector,
        min_size=2,
        max_size=5
    )

    def worker(worker_id, num_operations):
        """Worker function that uses pooled connections."""
        for i in range(num_operations):
            # Get connection from pool.
            conn = pool.get_connection()
            try:
                # Simulate database work.
                result = conn.query("SELECT $1 as worker_id, $2 as operation, NOW() as timestamp",
                                    worker_id, i)
                print(f"Worker {worker_id}, Op {i}: {result[0]['timestamp']}")
                # Simulate processing time.
                time.sleep(random.uniform(0.1, 0.5))
            finally:
                # Always return connection to pool.
                pool.return_connection(conn)
            # Small delay between operations.
            time.sleep(0.1)

    print(f"Starting connection pool with {pool.size} connections")
    # Start multiple workers (deliberately more workers than pool size).
    workers = []
    for i in range(8):
        thread = threading.Thread(target=worker, args=(i+1, 3))
        workers.append(thread)
        thread.start()

    # Monitor pool status once per second.
    def monitor_pool():
        for _ in range(10):
            print(f"Pool status - Size: {pool.size}, Available: {pool.available}")
            time.sleep(1)

    monitor_thread = threading.Thread(target=monitor_pool)
    monitor_thread.start()
    # Wait for all workers to complete.
    for thread in workers:
        thread.join()
    monitor_thread.join()
    # Clean up.
    pool.close_all()
    print("Connection pool closed")


# Run connection pool example
connection_pool_example()
# Install with Tessl CLI
# npx tessl i tessl/pypi-py-postgresql