or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.mdconnection-pooling.mdconnections-cursors.mdcursors-rows.mderror-handling.mdindex.mdsql-composition.mdtypes-adaptation.md

advanced-features.mddocs/

0

# Advanced PostgreSQL Features

1

2

Large objects, server-side cursors, asynchronous operations, notifications (LISTEN/NOTIFY), replication support, and other PostgreSQL-specific advanced functionality. These features leverage PostgreSQL's unique capabilities for high-performance applications.

3

4

## Capabilities

5

6

### Large Objects (LOB)

7

8

PostgreSQL large object interface for handling binary data larger than typical field limits.

9

10

```python { .api }

11

class lobject:

12

"""PostgreSQL large object interface."""

13

14

def __init__(self, conn, oid=0, mode='r', new_oid=None, new_file=None):

15

"""

16

Initialize large object.

17

18

Parameters:

19

- conn (connection): Database connection

20

- oid (int): Existing large object OID (0 for new)

21

- mode (str): Access mode ('r', 'w', 'rw', 'n')

22

- new_oid (int, optional): OID for new object

23

- new_file (str, optional): File to import

24

"""

25

26

def read(self, size=-1):

27

"""

28

Read data from large object.

29

30

Parameters:

31

- size (int): Bytes to read (-1 for all)

32

33

Returns:

34

bytes: Data read from object

35

"""

36

37

def write(self, data):

38

"""

39

Write data to large object.

40

41

Parameters:

42

- data (bytes): Data to write

43

44

Returns:

45

int: Number of bytes written

46

"""

47

48

def seek(self, pos, whence=0):

49

"""

50

Seek to position in large object.

51

52

Parameters:

53

- pos (int): Position to seek to

54

- whence (int): Seek origin (0=start, 1=current, 2=end)

55

56

Returns:

57

int: New position

58

"""

59

60

def tell(self):

61

"""

62

Get current position.

63

64

Returns:

65

int: Current position in object

66

"""

67

68

def truncate(self, size=0):

69

"""

70

Truncate large object.

71

72

Parameters:

73

- size (int): New size

74

"""

75

76

def close(self):

77

"""Close large object."""

78

79

def export(self, filename):

80

"""

81

Export large object to file.

82

83

Parameters:

84

- filename (str): Target filename

85

"""

86

87

def unlink(self):

88

"""Delete large object from database."""

89

90

@property

91

def oid(self):

92

"""Large object OID."""

93

94

@property

95

def mode(self):

96

"""Access mode."""

97

98

@property

99

def closed(self):

100

"""Closed status."""

101

```

102

103

Usage examples:

104

105

```python

106

# Create new large object

107

lobj = conn.lobject(mode='w')

108

lobj.write(b'Large binary data here...')

109

oid = lobj.oid

110

lobj.close()

111

112

# Store OID in table

113

cur.execute("INSERT INTO documents (name, data_oid) VALUES (%s, %s)",

114

('document.pdf', oid))

115

116

# Read large object

117

cur.execute("SELECT data_oid FROM documents WHERE name = %s", ('document.pdf',))

118

oid = cur.fetchone()[0]

119

120

lobj = conn.lobject(oid, mode='r')

121

data = lobj.read()

122

lobj.close()

123

124

# Work with files

125

lobj = conn.lobject(mode='w', new_file='/path/to/large_file.bin')

126

stored_oid = lobj.oid

127

128

# Export large object

129

lobj = conn.lobject(oid, mode='r')

130

lobj.export('/path/to/exported_file.bin')

131

lobj.close()

132

```

133

134

### Asynchronous Operations

135

136

Support for non-blocking database operations with polling and wait callbacks.

137

138

```python { .api }

139

def set_wait_callback(f):

140

"""

141

Set global wait callback for async operations.

142

143

Parameters:

144

- f (callable): Callback function for waiting

145

"""

146

147

def get_wait_callback():

148

"""

149

Get current wait callback.

150

151

Returns:

152

callable/None: Current wait callback

153

"""

154

155

# Connection polling constants

156

POLL_OK: int # Operation completed

157

POLL_READ: int # Wait for socket read

158

POLL_WRITE: int # Wait for socket write

159

POLL_ERROR: int # Error occurred

160

```

161

162

Usage examples:

163

164

```python

165

import select

166

from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE

167

168

# Create async connection

169

conn = psycopg2.connect(..., async_=True)

170

171

# Wait for connection to complete

172

def wait_for_connection(conn):

173

while True:

174

state = conn.poll()

175

if state == POLL_OK:

176

break

177

elif state == POLL_READ:

178

select.select([conn.fileno()], [], [])

179

elif state == POLL_WRITE:

180

select.select([], [conn.fileno()], [])

181

else:

182

raise Exception("Connection failed")

183

184

wait_for_connection(conn)

185

186

# Async query execution

187

cur = conn.cursor()

188

cur.execute("SELECT * FROM large_table")

189

190

# Poll for query completion

191

def wait_for_query(conn):

192

while True:

193

state = conn.poll()

194

if state == POLL_OK:

195

return

196

elif state == POLL_READ:

197

select.select([conn.fileno()], [], [])

198

elif state == POLL_WRITE:

199

select.select([], [conn.fileno()], [])

200

201

wait_for_query(conn)

202

results = cur.fetchall()

203

204

# Custom wait callback

205

def custom_wait_callback(conn):

206

"""Custom wait callback using select."""

207

while True:

208

state = conn.poll()

209

if state == POLL_OK:

210

break

211

elif state == POLL_READ:

212

select.select([conn.fileno()], [], [], 1.0) # 1 second timeout

213

elif state == POLL_WRITE:

214

select.select([], [conn.fileno()], [], 1.0)

215

216

set_wait_callback(custom_wait_callback)

217

```

218

219

### Notifications (LISTEN/NOTIFY)

220

221

PostgreSQL's asynchronous notification system for inter-process communication.

222

223

```python { .api }

224

class Notify:

225

"""PostgreSQL notification message."""

226

227

@property

228

def channel(self):

229

"""Notification channel name."""

230

231

@property

232

def payload(self):

233

"""Notification payload data."""

234

235

@property

236

def pid(self):

237

"""Process ID that sent notification."""

238

239

# Connection notification methods

240

class connection:

241

def notifies(self):

242

"""

243

Get pending notifications.

244

245

Returns:

246

list: List of Notify objects

247

"""

248

```

249

250

Usage examples:

251

252

```python

253

# Setup listener connection

254

listener_conn = psycopg2.connect(...)

255

listener_conn.autocommit = True

256

257

# Listen for notifications

258

cur = listener_conn.cursor()

259

cur.execute("LISTEN order_updates")

260

cur.execute("LISTEN inventory_changes")

261

262

# Setup notifier connection

263

notifier_conn = psycopg2.connect(...)

264

notifier_conn.autocommit = True

265

266

# Send notification

267

notifier_cur = notifier_conn.cursor()

268

notifier_cur.execute("NOTIFY order_updates, 'Order 12345 shipped'")

269

270

# Check for notifications (polling)

271

listener_conn.poll()

272

notifies = listener_conn.notifies()

273

for notify in notifies:

274

print(f"Channel: {notify.channel}")

275

print(f"Payload: {notify.payload}")

276

print(f"From PID: {notify.pid}")

277

278

# Async notification handling

279

def notification_handler():

280

while True:

281

# Wait for data

282

select.select([listener_conn], [], [])

283

284

# Poll connection

285

listener_conn.poll()

286

287

# Process notifications

288

while listener_conn.notifies():

289

notify = listener_conn.notifies().pop(0)

290

handle_notification(notify)

291

292

def handle_notification(notify):

293

if notify.channel == 'order_updates':

294

process_order_update(notify.payload)

295

elif notify.channel == 'inventory_changes':

296

process_inventory_change(notify.payload)

297

```

298

299

### Replication Support

300

301

Support for PostgreSQL streaming replication (physical and logical).

302

303

```python { .api }

304

class ReplicationConnection(connection):

305

"""Connection for replication operations."""

306

307

def __init__(self, *args, **kwargs):

308

"""Initialize replication connection."""

309

310

class ReplicationCursor(cursor):

311

"""Cursor for replication operations."""

312

313

def start_replication(self, slot_name=None, decode=False, start_lsn=None,

314

timeline=None, options=None):

315

"""

316

Start replication stream.

317

318

Parameters:

319

- slot_name (str, optional): Replication slot name

320

- decode (bool): Logical decoding mode

321

- start_lsn (str, optional): Starting LSN

322

- timeline (int, optional): Timeline ID

323

- options (dict, optional): Additional options

324

"""

325

326

def send_feedback(self, write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False):

327

"""

328

Send replication feedback.

329

330

Parameters:

331

- write_lsn (int): Write LSN

332

- flush_lsn (int): Flush LSN

333

- apply_lsn (int): Apply LSN

334

- reply (bool): Request reply

335

"""

336

337

def create_replication_slot(self, slot_name, output_plugin=None):

338

"""

339

Create replication slot.

340

341

Parameters:

342

- slot_name (str): Slot name

343

- output_plugin (str, optional): Output plugin for logical slots

344

"""

345

346

def drop_replication_slot(self, slot_name):

347

"""

348

Drop replication slot.

349

350

Parameters:

351

- slot_name (str): Slot name to drop

352

"""

353

354

class ReplicationMessage:

355

"""Replication stream message."""

356

357

@property

358

def data_start(self):

359

"""Message data start LSN."""

360

361

@property

362

def wal_end(self):

363

"""WAL end LSN."""

364

365

@property

366

def send_time(self):

367

"""Message send time."""

368

369

@property

370

def payload(self):

371

"""Message payload data."""

372

373

# Replication constants

374

REPLICATION_PHYSICAL: int # Physical replication mode

375

REPLICATION_LOGICAL: int # Logical replication mode

376

```

377

378

Usage examples:

379

380

```python

381

from psycopg2.extras import (

382

ReplicationConnection,

383

REPLICATION_PHYSICAL,

384

REPLICATION_LOGICAL

385

)

386

387

# Physical replication

388

repl_conn = psycopg2.connect(

389

host='master-server',

390

user='replication_user',

391

connection_factory=ReplicationConnection

392

)

393

394

cur = repl_conn.cursor()

395

396

# Create physical replication slot

397

cur.create_replication_slot('physical_slot')

398

399

# Start physical replication

400

cur.start_replication(slot_name='physical_slot')

401

402

# Process replication messages

403

def consume_stream():

404

for msg in cur:

405

# Process WAL data

406

process_wal_message(msg)

407

408

# Send feedback periodically

409

cur.send_feedback(flush_lsn=msg.data_start)

410

411

# Logical replication

412

logical_conn = psycopg2.connect(

413

connection_factory=ReplicationConnection,

414

...

415

)

416

417

logical_cur = logical_conn.cursor()

418

419

# Create logical replication slot

420

logical_cur.create_replication_slot('logical_slot', 'test_decoding')

421

422

# Start logical replication

423

logical_cur.start_replication(

424

slot_name='logical_slot',

425

decode=True,

426

options={'include-xids': 0, 'skip-empty-xacts': 1}

427

)

428

429

# Process logical changes

430

for msg in logical_cur:

431

change_data = msg.payload.decode('utf-8')

432

process_logical_change(change_data)

433

```

434

435

### Connection Information and Diagnostics

436

437

Access to connection state and PostgreSQL server information.

438

439

```python { .api }

440

class ConnectionInfo:

441

"""Connection information and state."""

442

443

@property

444

def dbname(self):

445

"""Database name."""

446

447

@property

448

def user(self):

449

"""Connected user."""

450

451

@property

452

def password(self):

453

"""Connection password (masked)."""

454

455

@property

456

def host(self):

457

"""Server host."""

458

459

@property

460

def port(self):

461

"""Server port."""

462

463

@property

464

def options(self):

465

"""Connection options."""

466

467

@property

468

def dsn_parameters(self):

469

"""All DSN parameters as dict."""

470

471

@property

472

def status(self):

473

"""Connection status."""

474

475

@property

476

def transaction_status(self):

477

"""Transaction status."""

478

479

@property

480

def protocol_version(self):

481

"""Protocol version."""

482

483

@property

484

def server_version(self):

485

"""Server version."""

486

487

@property

488

def error_message(self):

489

"""Last error message."""

490

491

@property

492

def backend_pid(self):

493

"""Backend process ID."""

494

495

@property

496

def needs_password(self):

497

"""Whether connection needs password."""

498

499

@property

500

def used_password(self):

501

"""Whether password was used."""

502

503

@property

504

def ssl_in_use(self):

505

"""Whether SSL is in use."""

506

507

class Diagnostics:

508

"""Error diagnostics information."""

509

510

def __init__(self, exception):

511

"""Initialize from exception."""

512

513

@property

514

def severity(self):

515

"""Error severity."""

516

517

@property

518

def sqlstate(self):

519

"""SQL state code."""

520

521

@property

522

def message_primary(self):

523

"""Primary error message."""

524

525

@property

526

def message_detail(self):

527

"""Detailed error message."""

528

529

@property

530

def message_hint(self):

531

"""Error hint."""

532

533

@property

534

def statement_position(self):

535

"""Error position in statement."""

536

537

@property

538

def internal_position(self):

539

"""Internal statement position."""

540

541

@property

542

def internal_query(self):

543

"""Internal query causing error."""

544

545

@property

546

def context(self):

547

"""Error context."""

548

549

@property

550

def schema_name(self):

551

"""Schema name related to error."""

552

553

@property

554

def table_name(self):

555

"""Table name related to error."""

556

557

@property

558

def column_name(self):

559

"""Column name related to error."""

560

561

@property

562

def datatype_name(self):

563

"""Data type name related to error."""

564

565

@property

566

def constraint_name(self):

567

"""Constraint name related to error."""

568

569

@property

570

def source_file(self):

571

"""Source file where error occurred."""

572

573

@property

574

def source_line(self):

575

"""Source line where error occurred."""

576

577

@property

578

def source_function(self):

579

"""Source function where error occurred."""

580

```

581

582

Usage examples:

583

584

```python

585

# Connection information

586

conn_info = conn.info

587

print(f"Database: {conn_info.dbname}")

588

print(f"User: {conn_info.user}")

589

print(f"Host: {conn_info.host}:{conn_info.port}")

590

print(f"Server version: {conn_info.server_version}")

591

print(f"Backend PID: {conn_info.backend_pid}")

592

print(f"SSL in use: {conn_info.ssl_in_use}")

593

594

# Detailed error diagnostics

595

try:

596

cur.execute("INSERT INTO users (id, email) VALUES (1, 'invalid-email')")

597

except psycopg2.IntegrityError as e:

598

diag = psycopg2.extensions.Diagnostics(e)

599

print(f"Error: {diag.message_primary}")

600

print(f"Detail: {diag.message_detail}")

601

print(f"Hint: {diag.message_hint}")

602

print(f"SQL State: {diag.sqlstate}")

603

print(f"Table: {diag.table_name}")

604

print(f"Constraint: {diag.constraint_name}")

605

```

606

607

### Security Features

608

609

Password encryption and security-related functionality.

610

611

```python { .api }

612

def encrypt_password(password, user, scope=None, algorithm=None):

613

"""

614

Encrypt password for PostgreSQL authentication.

615

616

Parameters:

617

- password (str): Plain text password

618

- user (str): Username

619

- scope (connection, optional): Connection scope

620

- algorithm (str, optional): Encryption algorithm

621

622

Returns:

623

str: Encrypted password string

624

"""

625

```

626

627

Usage examples:

628

629

```python

630

# Encrypt password for storage

631

encrypted = psycopg2.extensions.encrypt_password('mypassword', 'myuser')

632

print(encrypted) # 'md5...' or 'SCRAM-SHA-256$...'

633

634

# Use with connection

635

conn = psycopg2.connect(

636

host='localhost',

637

user='myuser',

638

password=encrypted # Can use pre-encrypted password

639

)

640

```

641

642

## Types

643

644

### Replication Types

645

646

```python { .api }

647

ReplicationSlotInfo = {

648

'slot_name': str,

649

'consistent_point': str,

650

'snapshot_name': str,

651

'output_plugin': str

652

}

653

654

ReplicationOptions = {

655

'include-xids': int,

656

'skip-empty-xacts': int,

657

'include-rewrites': int,

658

'pretty-print': int

659

}

660

```

661

662

### Large Object Constants

663

664

```python { .api }

665

# Access modes

666

INV_READ: int = 0x40 # Read access

667

INV_WRITE: int = 0x80 # Write access

668

669

# Seek origins

670

SEEK_SET: int = 0 # From beginning

671

SEEK_CUR: int = 1 # From current position

672

SEEK_END: int = 2 # From end

673

```