or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.md cluster-management.md connection-management.md dbapi-interface.md exception-handling.md index.md query-execution.md transaction-management.md type-system.md

docs/advanced-features.md

# Advanced Features

Advanced PostgreSQL features including COPY operations, LISTEN/NOTIFY, advisory locks, and streaming results for high-performance database applications.

## Capabilities

### COPY Operations

7

8

High-performance bulk data import/export using PostgreSQL's COPY protocol for efficient data transfer.

9

10

```python { .api }

11

class CopyManager:

12

"""

13

Manager for PostgreSQL COPY operations providing bulk data import/export.

14

"""

15

16

def load_rows(statement, rows):

17

"""

18

Load rows using COPY FROM for bulk insert operations.

19

20

Parameters:

21

- statement (str): COPY FROM statement

22

- rows (iterable): Rows to insert (tuples or lists)

23

24

Returns:

25

int: Number of rows loaded

26

27

Raises:

28

CopyError: If COPY operation fails

29

"""

30

31

def dump_rows(statement):

32

"""

33

Dump rows using COPY TO for bulk export operations.

34

35

Parameters:

36

- statement (str): COPY TO statement

37

38

Returns:

39

Iterator: Iterator over exported row data

40

41

Raises:

42

CopyError: If COPY operation fails

43

"""

44

45

def load_file(statement, file_path):

46

"""

47

Load data from file using COPY FROM.

48

49

Parameters:

50

- statement (str): COPY FROM statement

51

- file_path (str): Path to source file

52

53

Returns:

54

int: Number of rows loaded

55

"""

56

57

def dump_file(statement, file_path):

58

"""

59

Dump data to file using COPY TO.

60

61

Parameters:

62

- statement (str): COPY TO statement

63

- file_path (str): Path to destination file

64

65

Returns:

66

int: Number of rows dumped

67

"""

68

```

69

70

### LISTEN/NOTIFY Support

71

72

Asynchronous notification system for real-time communication between database sessions.

73

74

```python { .api }

75

class NotificationManager:

76

"""

77

Manager for PostgreSQL LISTEN/NOTIFY asynchronous messaging.

78

"""

79

80

def listen(channel):

81

"""

82

Start listening for notifications on a channel.

83

84

Parameters:

85

- channel (str): Channel name to listen on

86

87

Raises:

88

NotificationError: If listen operation fails

89

"""

90

91

def unlisten(channel=None):

92

"""

93

Stop listening for notifications.

94

95

Parameters:

96

- channel (str, optional): Specific channel to unlisten (all if None)

97

98

Raises:

99

NotificationError: If unlisten operation fails

100

"""

101

102

def notify(channel, payload=None):

103

"""

104

Send notification to a channel.

105

106

Parameters:

107

- channel (str): Channel name to notify

108

- payload (str, optional): Optional message payload

109

110

Raises:

111

NotificationError: If notify operation fails

112

"""

113

114

def get_notifications():

115

"""

116

Get pending notifications (non-blocking).

117

118

Returns:

119

list: List of notification objects with channel, payload, pid

120

"""

121

122

def wait_for_notification(timeout=None):

123

"""

124

Wait for next notification (blocking).

125

126

Parameters:

127

- timeout (float, optional): Timeout in seconds (infinite if None)

128

129

Returns:

130

dict or None: Notification object or None if timeout

131

"""

132

133

class Notification:

134

"""Notification message from PostgreSQL."""

135

136

@property

137

def channel():

138

"""Channel name that received the notification."""

139

140

@property

141

def payload():

142

"""Optional payload data."""

143

144

@property

145

def pid():

146

"""Process ID of the notifying backend."""

147

```

148

149

### Advisory Locks

150

151

PostgreSQL advisory locks for application-level synchronization and coordination.

152

153

```python { .api }

154

class ALock:

155

"""

156

Base class for PostgreSQL advisory locks.

157

"""

158

159

def acquire(blocking=True):

160

"""

161

Acquire the advisory lock.

162

163

Parameters:

164

- blocking (bool): Whether to block until lock is available

165

166

Returns:

167

bool: True if lock acquired, False if non-blocking and unavailable

168

169

Raises:

170

LockError: If lock acquisition fails

171

"""

172

173

def release():

174

"""

175

Release the advisory lock.

176

177

Raises:

178

LockError: If lock release fails

179

"""

180

181

def __enter__():

182

"""Context manager entry - acquire lock."""

183

184

def __exit__(exc_type, exc_val, exc_tb):

185

"""Context manager exit - release lock."""

186

187

@property

188

def is_held():

189

"""

190

Check if lock is currently held by this session.

191

192

Returns:

193

bool: True if lock is held

194

"""

195

196

class ExclusiveLock(ALock):

197

"""

198

Exclusive advisory lock - only one session can hold it.

199

"""

200

201

def __init__(lock_id):

202

"""

203

Create exclusive advisory lock.

204

205

Parameters:

206

- lock_id (int or tuple): Lock identifier (int or pair of ints)

207

"""

208

209

class ShareLock(ALock):

210

"""

211

Shared advisory lock - multiple sessions can hold it simultaneously.

212

"""

213

214

def __init__(lock_id):

215

"""

216

Create shared advisory lock.

217

218

Parameters:

219

- lock_id (int or tuple): Lock identifier (int or pair of ints)

220

"""

221

```

222

223

### Streaming Results

224

225

Interfaces for streaming large result sets without loading all data into memory.

226

227

```python { .api }

228

class ResultStream:

229

"""

230

Streaming interface for large query results.

231

"""

232

233

def __iter__():

234

"""Iterate over result rows."""

235

236

def __next__():

237

"""Get next result row."""

238

239

def close():

240

"""Close the result stream."""

241

242

@property

243

def description():

244

"""

245

Get column description information.

246

247

Returns:

248

list: Column metadata

249

"""

250

251

def stream_query(connection, query, *parameters):

252

"""

253

Execute query and return streaming result interface.

254

255

Parameters:

256

- connection: Database connection

257

- query (str): SQL query to execute

258

- *parameters: Query parameters

259

260

Returns:

261

ResultStream: Streaming result interface

262

"""

263

```

264

265

### Connection Pooling Utilities

266

267

Utilities for managing connection pools and connection lifecycle.

268

269

```python { .api }

270

class ConnectionPool:

271

"""

272

Connection pool for managing database connections.

273

"""

274

275

def __init__(connector, min_size=1, max_size=10):

276

"""

277

Create connection pool.

278

279

Parameters:

280

- connector: Connection factory

281

- min_size (int): Minimum pool size

282

- max_size (int): Maximum pool size

283

"""

284

285

def get_connection():

286

"""

287

Get connection from pool.

288

289

Returns:

290

Connection: Database connection from pool

291

"""

292

293

def return_connection(connection):

294

"""

295

Return connection to pool.

296

297

Parameters:

298

- connection: Connection to return

299

"""

300

301

def close_all():

302

"""Close all connections in pool."""

303

304

@property

305

def size():

306

"""Current pool size."""

307

308

@property

309

def available():

310

"""Number of available connections."""

311

```

312

313

### Stored Procedure Interface

314

315

Interface for calling PostgreSQL stored procedures and functions with parameter binding and result handling.

316

317

```python { .api }

318

class StoredProcedure:

319

"""

320

Interface for calling PostgreSQL stored procedures and functions.

321

"""

322

323

def __call__(*args, **kw):

324

"""

325

Execute the stored procedure with provided arguments.

326

327

Parameters:

328

- *args: Positional parameters for the procedure

329

- **kw: Keyword parameters for the procedure

330

331

Returns:

332

Procedure result (varies by procedure type)

333

334

Raises:

335

ProcedureError: If procedure execution fails

336

"""

337

338

@property

339

def name():

340

"""

341

Get procedure name.

342

343

Returns:

344

str: Fully qualified procedure name

345

"""

346

347

@property

348

def parameter_types():

349

"""

350

Get parameter type information.

351

352

Returns:

353

List[int]: PostgreSQL type OIDs for parameters

354

"""

355

356

@property

357

def return_type():

358

"""

359

Get return type information.

360

361

Returns:

362

int: PostgreSQL type OID for return value

363

"""

364

```

365

366

## Usage Examples

367

368

### COPY Operations for Bulk Data

369

370

```python

371

import postgresql

372

import postgresql.copyman as copy_manager

373

import csv

374

375

db = postgresql.open('pq://user:pass@localhost/mydb')

376

377

# Create test table

378

db.execute("""

379

CREATE TABLE IF NOT EXISTS bulk_data (

380

id SERIAL PRIMARY KEY,

381

name TEXT,

382

value NUMERIC,

383

created_date DATE

384

)

385

""")

386

387

# Bulk insert using COPY

388

def bulk_insert_with_copy():

389

"""Bulk insert data using COPY for high performance."""

390

391

# Prepare data

392

data_rows = []

393

for i in range(10000):

394

data_rows.append((f"Item {i}", i * 1.5, '2023-01-01'))

395

396

# Use COPY for bulk insert

397

copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV)"

398

399

copy_mgr = copy_manager.CopyManager(db)

400

rows_loaded = copy_mgr.load_rows(copy_stmt, data_rows)

401

402

print(f"Loaded {rows_loaded} rows using COPY")

403

404

# Bulk export using COPY

405

def bulk_export_with_copy():

406

"""Export data using COPY for high performance."""

407

408

copy_stmt = "COPY bulk_data TO STDOUT WITH (FORMAT CSV, HEADER)"

409

410

copy_mgr = copy_manager.CopyManager(db)

411

412

# Export to file

413

copy_mgr.dump_file(copy_stmt, "/tmp/exported_data.csv")

414

print("Data exported to /tmp/exported_data.csv")

415

416

# Or stream export data

417

row_count = 0

418

for row_data in copy_mgr.dump_rows(copy_stmt):

419

row_count += 1

420

if row_count <= 5: # Show first 5 rows

421

print(f"Exported row: {row_data}")

422

423

print(f"Total rows exported: {row_count}")

424

425

# Import from CSV file

426

def import_csv_file():

427

"""Import data from CSV file using COPY."""

428

429

# Create CSV file

430

with open('/tmp/import_data.csv', 'w', newline='') as csvfile:

431

writer = csv.writer(csvfile)

432

writer.writerow(['name', 'value', 'created_date']) # Header

433

for i in range(5000):

434

writer.writerow([f"CSV Item {i}", i * 2.0, '2023-06-01'])

435

436

# Import using COPY

437

copy_stmt = "COPY bulk_data (name, value, created_date) FROM STDIN WITH (FORMAT CSV, HEADER)"

438

439

copy_mgr = copy_manager.CopyManager(db)

440

rows_loaded = copy_mgr.load_file(copy_stmt, '/tmp/import_data.csv')

441

442

print(f"Imported {rows_loaded} rows from CSV file")

443

444

# Execute bulk operations

445

bulk_insert_with_copy()

446

bulk_export_with_copy()

447

import_csv_file()

448

```

449

450

### LISTEN/NOTIFY for Real-time Communication

451

452

```python

453

import postgresql

454

import postgresql.notifyman as notify

455

import threading

456

import time

457

458

# Set up two connections for demonstration

459

publisher = postgresql.open('pq://user:pass@localhost/mydb')

460

subscriber = postgresql.open('pq://user:pass@localhost/mydb')

461

462

def notification_publisher():

463

"""Publish notifications to channels."""

464

465

notifier = notify.NotificationManager(publisher)

466

467

for i in range(10):

468

# Send notifications to different channels

469

notifier.notify('events', f'Event {i}: Something happened')

470

notifier.notify('alerts', f'Alert {i}: Check system status')

471

472

print(f"Published notification {i}")

473

time.sleep(2)

474

475

# Send termination signal

476

notifier.notify('events', 'TERMINATE')

477

478

def notification_subscriber():

479

"""Subscribe to notifications and process them."""

480

481

listener = notify.NotificationManager(subscriber)

482

483

# Listen to multiple channels

484

listener.listen('events')

485

listener.listen('alerts')

486

487

print("Listening for notifications...")

488

489

while True:

490

# Wait for notifications (blocking)

491

notification = listener.wait_for_notification(timeout=30)

492

493

if notification:

494

channel = notification.channel

495

payload = notification.payload

496

sender_pid = notification.pid

497

498

print(f"Received on '{channel}': {payload} (from PID {sender_pid})")

499

500

# Check for termination signal

501

if payload == 'TERMINATE':

502

print("Termination signal received, stopping listener")

503

break

504

else:

505

print("Notification timeout")

506

break

507

508

# Clean up

509

listener.unlisten() # Unlisten from all channels

510

511

# Run publisher and subscriber in separate threads

512

subscriber_thread = threading.Thread(target=notification_subscriber)

513

publisher_thread = threading.Thread(target=notification_publisher)

514

515

subscriber_thread.start()

516

time.sleep(1) # Let subscriber start first

517

publisher_thread.start()

518

519

# Wait for both threads to complete

520

subscriber_thread.join()

521

publisher_thread.join()

522

523

publisher.close()

524

subscriber.close()

525

```

526

527

### Advisory Locks for Coordination

528

529

```python

530

import postgresql

531

import postgresql.alock as advisory_locks

532

import threading

533

import time

534

535

db1 = postgresql.open('pq://user:pass@localhost/mydb')

536

db2 = postgresql.open('pq://user:pass@localhost/mydb')

537

538

def exclusive_lock_example():

539

"""Demonstrate exclusive advisory locks."""

540

541

def worker(worker_id, connection):

542

lock = advisory_locks.ExclusiveLock(12345) # Lock ID

543

lock.connection = connection

544

545

print(f"Worker {worker_id}: Attempting to acquire exclusive lock")

546

547

# Try to acquire lock (blocking)

548

if lock.acquire():

549

print(f"Worker {worker_id}: Acquired exclusive lock")

550

551

# Simulate work

552

time.sleep(3)

553

554

print(f"Worker {worker_id}: Releasing exclusive lock")

555

lock.release()

556

else:

557

print(f"Worker {worker_id}: Failed to acquire lock")

558

559

# Start two workers competing for the same lock

560

thread1 = threading.Thread(target=worker, args=(1, db1))

561

thread2 = threading.Thread(target=worker, args=(2, db2))

562

563

thread1.start()

564

thread2.start()

565

566

thread1.join()

567

thread2.join()

568

569

def shared_lock_example():

570

"""Demonstrate shared advisory locks."""

571

572

def reader(reader_id, connection):

573

lock = advisory_locks.ShareLock(54321) # Shared lock ID

574

lock.connection = connection

575

576

print(f"Reader {reader_id}: Acquiring shared lock")

577

578

with lock: # Context manager automatically acquires/releases

579

print(f"Reader {reader_id}: Reading data (shared access)")

580

time.sleep(2)

581

print(f"Reader {reader_id}: Finished reading")

582

583

def writer(connection):

584

lock = advisory_locks.ExclusiveLock(54321) # Same ID as readers

585

lock.connection = connection

586

587

print("Writer: Waiting for exclusive access")

588

589

with lock:

590

print("Writer: Writing data (exclusive access)")

591

time.sleep(3)

592

print("Writer: Finished writing")

593

594

# Start multiple readers and one writer

595

reader_threads = []

596

for i in range(3):

597

thread = threading.Thread(target=reader, args=(i+1, db1))

598

reader_threads.append(thread)

599

thread.start()

600

601

time.sleep(1) # Let readers start first

602

603

writer_thread = threading.Thread(target=writer, args=(db2,))

604

writer_thread.start()

605

606

# Wait for all threads

607

for thread in reader_threads:

608

thread.join()

609

writer_thread.join()

610

611

def distributed_counter_example():

612

"""Implement distributed counter using advisory locks."""

613

614

# Create counter table

615

db1.execute("""

616

CREATE TABLE IF NOT EXISTS distributed_counter (

617

name TEXT PRIMARY KEY,

618

value INTEGER DEFAULT 0

619

)

620

""")

621

622

# Initialize counter

623

db1.execute("INSERT INTO distributed_counter (name, value) VALUES ('global', 0) ON CONFLICT (name) DO NOTHING")

624

625

def increment_counter(worker_id, connection, increments):

626

for i in range(increments):

627

# Use lock to ensure atomic counter increment

628

lock = advisory_locks.ExclusiveLock(99999) # Counter lock ID

629

lock.connection = connection

630

631

with lock:

632

# Read current value

633

current = connection.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']

634

635

# Increment

636

new_value = current + 1

637

connection.execute("UPDATE distributed_counter SET value = $1 WHERE name = 'global'", new_value)

638

639

print(f"Worker {worker_id}: Incremented counter to {new_value}")

640

641

time.sleep(0.1) # Small delay between increments

642

643

# Start multiple workers incrementing the counter

644

workers = []

645

for i in range(3):

646

connection = postgresql.open('pq://user:pass@localhost/mydb')

647

thread = threading.Thread(target=increment_counter, args=(i+1, connection, 5))

648

workers.append((thread, connection))

649

thread.start()

650

651

# Wait for all workers

652

for thread, connection in workers:

653

thread.join()

654

connection.close()

655

656

# Check final counter value

657

final_value = db1.query("SELECT value FROM distributed_counter WHERE name = 'global'")[0]['value']

658

print(f"Final counter value: {final_value}")

659

660

print("=== Exclusive Lock Example ===")

661

exclusive_lock_example()

662

663

print("\n=== Shared Lock Example ===")

664

shared_lock_example()

665

666

print("\n=== Distributed Counter Example ===")

667

distributed_counter_example()

668

669

db1.close()

670

db2.close()

671

```

672

673

### Streaming Large Result Sets

674

675

```python

676

import postgresql

677

import time

678

679

db = postgresql.open('pq://user:pass@localhost/mydb')

680

681

# Create large test dataset

682

db.execute("""

683

CREATE TABLE IF NOT EXISTS large_dataset AS

684

SELECT

685

generate_series(1, 1000000) as id,

686

'Item ' || generate_series(1, 1000000) as name,

687

random() * 1000 as value,

688

NOW() - (random() * interval '365 days') as created_at

689

""")

690

691

def stream_large_results():

692

"""Stream large result sets to avoid memory issues."""

693

694

# Prepare streaming query

695

query = db.prepare("""

696

SELECT id, name, value, created_at

697

FROM large_dataset

698

WHERE value > $1

699

ORDER BY value DESC

700

""")

701

702

print("Starting streaming query...")

703

start_time = time.time()

704

705

# Stream results instead of loading all into memory

706

row_count = 0

707

total_value = 0

708

709

for row in query.rows(500): # Stream rows where value > 500

710

row_count += 1

711

total_value += row['value']

712

713

# Process row (show first 10)

714

if row_count <= 10:

715

print(f"Row {row_count}: ID={row['id']}, Name={row['name']}, Value={row['value']:.2f}")

716

elif row_count % 10000 == 0:

717

print(f"Processed {row_count} rows...")

718

719

end_time = time.time()

720

avg_value = total_value / row_count if row_count > 0 else 0

721

722

print(f"Streaming complete:")

723

print(f" Rows processed: {row_count}")

724

print(f" Average value: {avg_value:.2f}")

725

print(f" Time taken: {end_time - start_time:.2f} seconds")

726

727

def chunk_processing():

728

"""Process large datasets in chunks."""

729

730

query = db.prepare("SELECT * FROM large_dataset ORDER BY id")

731

732

print("Starting chunk processing...")

733

734

chunk_size = 0

735

chunk_count = 0

736

total_processed = 0

737

738

# Process data in chunks

739

for chunk in query.chunks():

740

chunk_count += 1

741

chunk_size = len(chunk)

742

total_processed += chunk_size

743

744

# Process chunk

745

chunk_sum = sum(row['value'] for row in chunk)

746

chunk_avg = chunk_sum / chunk_size

747

748

print(f"Chunk {chunk_count}: {chunk_size} rows, avg value: {chunk_avg:.2f}")

749

750

# Simulate processing time

751

time.sleep(0.1)

752

753

# Limit for demonstration

754

if chunk_count >= 10:

755

break

756

757

print(f"Chunk processing complete - {total_processed} rows in {chunk_count} chunks")

758

759

def memory_efficient_aggregation():

760

"""Perform aggregations on large datasets without loading all data."""

761

762

query = db.prepare("""

763

SELECT

764

EXTRACT(month FROM created_at) as month,

765

id, value

766

FROM large_dataset

767

ORDER BY created_at

768

""")

769

770

# Track monthly statistics

771

monthly_stats = {}

772

773

print("Computing monthly statistics...")

774

775

for row in query.rows():

776

month = int(row['month'])

777

value = row['value']

778

779

if month not in monthly_stats:

780

monthly_stats[month] = {'count': 0, 'sum': 0, 'min': float('inf'), 'max': float('-inf')}

781

782

stats = monthly_stats[month]

783

stats['count'] += 1

784

stats['sum'] += value

785

stats['min'] = min(stats['min'], value)

786

stats['max'] = max(stats['max'], value)

787

788

# Display results

789

print("Monthly statistics:")

790

for month in sorted(monthly_stats.keys()):

791

stats = monthly_stats[month]

792

avg = stats['sum'] / stats['count']

793

print(f" Month {month:2d}: {stats['count']:6d} rows, "

794

f"avg: {avg:6.2f}, min: {stats['min']:6.2f}, max: {stats['max']:6.2f}")

795

796

# Run streaming examples

797

stream_large_results()

798

print("\n" + "="*50 + "\n")

799

chunk_processing()

800

print("\n" + "="*50 + "\n")

801

memory_efficient_aggregation()

802

803

db.close()

804

```

805

806

### Connection Pool Management

807

808

```python

809

import postgresql

810

import postgresql.pool as connection_pool

811

import threading

812

import time

813

import random

814

815

def connection_pool_example():

816

"""Demonstrate connection pooling for concurrent access."""

817

818

# Create connection factory

819

connector = postgresql.open('&pq://user:pass@localhost/mydb')

820

821

# Create connection pool

822

pool = connection_pool.ConnectionPool(

823

connector,

824

min_size=2,

825

max_size=5

826

)

827

828

def worker(worker_id, num_operations):

829

"""Worker function that uses pooled connections."""

830

831

for i in range(num_operations):

832

# Get connection from pool

833

conn = pool.get_connection()

834

835

try:

836

# Simulate database work

837

result = conn.query("SELECT $1 as worker_id, $2 as operation, NOW() as timestamp",

838

worker_id, i)

839

840

print(f"Worker {worker_id}, Op {i}: {result[0]['timestamp']}")

841

842

# Simulate processing time

843

time.sleep(random.uniform(0.1, 0.5))

844

845

finally:

846

# Always return connection to pool

847

pool.return_connection(conn)

848

849

# Small delay between operations

850

time.sleep(0.1)

851

852

print(f"Starting connection pool with {pool.size} connections")

853

854

# Start multiple workers

855

workers = []

856

for i in range(8): # More workers than pool size

857

thread = threading.Thread(target=worker, args=(i+1, 3))

858

workers.append(thread)

859

thread.start()

860

861

# Monitor pool status

862

def monitor_pool():

863

for _ in range(10):

864

print(f"Pool status - Size: {pool.size}, Available: {pool.available}")

865

time.sleep(1)

866

867

monitor_thread = threading.Thread(target=monitor_pool)

868

monitor_thread.start()

869

870

# Wait for all workers to complete

871

for thread in workers:

872

thread.join()

873

874

monitor_thread.join()

875

876

# Clean up

877

pool.close_all()

878

print("Connection pool closed")

879

880

# Run connection pool example

881

connection_pool_example()

882

```