<!-- Site navigation chrome removed (web-scrape extraction artifact: login/version links and a fused file listing: advanced-operations.md, connections.md, cursors.md, error-handling.md, index.md, row-factories.md, sql-composition.md, type-system.md) -->

# Advanced Operations

1

2

High-performance operations including COPY for bulk data transfer, pipeline operations for batching, prepared statements, and server-side cursors for memory-efficient large result set processing.

3

4

## Capabilities

5

6

### COPY Operations

7

8

PostgreSQL's COPY protocol for high-performance bulk data transfer between Python and the database.

9

10

```python { .api }

11

class Copy:

12

"""Synchronous COPY operations manager"""

13

14

def __enter__(self) -> Copy:

15

"""Enter context manager for COPY operation"""

16

17

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

18

"""Exit context manager and finalize COPY"""

19

20

def write(self, data: bytes) -> None:

21

"""

22

Write raw bytes to COPY operation.

23

24

Args:

25

data: Raw bytes in COPY format

26

"""

27

28

def write_row(self, row: Sequence[Any]) -> None:

29

"""

30

Write single row to COPY operation.

31

32

Args:

33

row: Sequence of column values

34

"""

35

36

def read(self) -> bytes:

37

"""

38

Read raw bytes from COPY operation.

39

40

Returns:

41

Raw bytes in COPY format

42

"""

43

44

def read_row(self) -> Sequence[Any] | None:

45

"""

46

Read single row from COPY operation.

47

48

Returns:

49

Row as sequence of values, None if no more data

50

"""

51

52

class AsyncCopy:

53

"""Asynchronous COPY operations manager"""

54

55

async def __aenter__(self) -> AsyncCopy:

56

"""Enter async context manager for COPY operation"""

57

58

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

59

"""Exit async context manager and finalize COPY"""

60

61

async def write(self, data: bytes) -> None:

62

"""Async version of write()"""

63

64

async def write_row(self, row: Sequence[Any]) -> None:

65

"""Async version of write_row()"""

66

67

async def read(self) -> bytes:

68

"""Async version of read()"""

69

70

async def read_row(self) -> Sequence[Any] | None:

71

"""Async version of read_row()"""

72

```

73

74

#### COPY Usage Examples

75

76

```python

77

# Bulk insert using COPY FROM

78

data = [

79

("Alice", 30, "alice@example.com"),

80

("Bob", 25, "bob@example.com"),

81

("Charlie", 35, "charlie@example.com")

82

]

83

84

with conn.cursor() as cur:

85

with cur.copy("COPY users (name, age, email) FROM STDIN") as copy:

86

for row in data:

87

copy.write_row(row)

88

89

# Bulk export using COPY TO

90

with conn.cursor() as cur:

91

with cur.copy("COPY users TO STDOUT WITH CSV HEADER") as copy:

92

while True:

93

data = copy.read()

94

if not data:

95

break

96

print(data.decode())

97

98

# Row-by-row export

99

with conn.cursor() as cur:

100

with cur.copy("COPY users (name, email) TO STDOUT") as copy:

101

while True:

102

row = copy.read_row()

103

if row is None:

104

break

105

name, email = row

106

print(f"{name}: {email}")

107

```

108

109

### Pipeline Operations

110

111

Batch multiple operations for improved performance by reducing network round-trips.

112

113

```python { .api }

114

class Pipeline:

115

"""Synchronous pipeline for batching operations"""

116

117

def __enter__(self) -> Pipeline:

118

"""Enter pipeline context manager"""

119

120

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

121

"""Exit pipeline and process all queued operations"""

122

123

def sync(self) -> None:

124

"""

125

Force synchronization of all queued operations.

126

Process results and handle any errors.

127

"""

128

129

@property

130

def status(self) -> PipelineStatus:

131

"""

132

Get the current pipeline status.

133

134

Returns:

135

Current pipeline status (OFF, ON, or ABORTED)

136

"""

137

138

@classmethod

139

def is_supported(cls) -> bool:

140

"""

141

Check if pipeline mode is supported by the current libpq version.

142

143

Returns:

144

True if pipeline mode is supported, False otherwise

145

"""

146

147

class AsyncPipeline:

148

"""Asynchronous pipeline for batching operations"""

149

150

async def __aenter__(self) -> AsyncPipeline:

151

"""Enter async pipeline context manager"""

152

153

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

154

"""Exit async pipeline and process all queued operations"""

155

156

async def sync(self) -> None:

157

"""Async version of sync()"""

158

159

@property

160

def status(self) -> PipelineStatus:

161

"""Get the current pipeline status"""

162

163

@classmethod

164

def is_supported(cls) -> bool:

165

"""Check if pipeline mode is supported"""

166

167

from psycopg.pq import PipelineStatus

class PipelineStatus(Enum):
    """Pipeline status enumeration"""
    OFF = 0           # Connection is not in pipeline mode
    ON = auto()       # Connection is in pipeline mode
    ABORTED = auto()  # Pipeline is aborted due to error
```

175

176

#### Pipeline Usage Examples

177

178

```python

179

# Batch multiple INSERT operations

180

with conn.pipeline() as pipeline:

181

with conn.cursor() as cur:

182

for i in range(1000):

183

cur.execute(

184

"INSERT INTO items (name, value) VALUES (%s, %s)",

185

(f"item_{i}", i * 10)

186

)

187

# All operations sent to server when pipeline exits

188

189

# Manual synchronization points

190

with conn.pipeline() as pipeline:

191

with conn.cursor() as cur:

192

# First batch

193

for i in range(100):

194

cur.execute("INSERT INTO batch1 (id) VALUES (%s)", (i,))

195

196

pipeline.sync() # Force processing of first batch

197

198

# Second batch

199

for i in range(100):

200

cur.execute("INSERT INTO batch2 (id) VALUES (%s)", (i,))

201

# Second batch processed on pipeline exit

202

```

203

204

### Prepared Statements

205

206

Automatic prepared statement management for improved performance with repeated queries.

207

208

```python { .api }

209

class Connection:

210

def prepare(self, query: str, name: str | None = None) -> str:

211

"""

212

Explicitly prepare a statement for repeated execution.

213

214

Args:

215

query: SQL query to prepare

216

name: Optional name for prepared statement

217

218

Returns:

219

Name of prepared statement

220

"""

221

222

def prepared(self) -> dict[str, str]:

223

"""

224

Get mapping of prepared statement names to queries.

225

226

Returns:

227

Dictionary of {name: query} pairs

228

"""

229

230

# Automatic preparation with prepare=True

231

class Cursor:

232

def execute(

233

self,

234

query,

235

params=None,

236

*,

237

prepare: bool | None = None

238

) -> Cursor:

239

"""

240

Execute query with optional automatic preparation.

241

242

Args:

243

query: SQL query

244

params: Query parameters

245

prepare: True to prepare statement, None for auto

246

"""

247

```

248

249

#### Prepared Statement Examples

250

251

```python

252

# Explicit preparation

253

with conn.cursor() as cur:

254

# Prepare statement manually

255

stmt_name = conn.prepare(

256

"SELECT * FROM users WHERE age > $1 AND city = $2"

257

)

258

259

# Use prepared statement multiple times

260

cur.execute(f"EXECUTE {stmt_name} (%s, %s)", (25, "New York"))

261

result1 = cur.fetchall()

262

263

cur.execute(f"EXECUTE {stmt_name} (%s, %s)", (30, "Boston"))

264

result2 = cur.fetchall()

265

266

# Automatic preparation

267

with conn.cursor() as cur:

268

query = "SELECT * FROM products WHERE price > %s"

269

270

# First execution prepares statement automatically

271

cur.execute(query, (100,), prepare=True)

272

expensive_products = cur.fetchall()

273

274

# Subsequent executions reuse prepared statement

275

cur.execute(query, (200,), prepare=True)

276

very_expensive = cur.fetchall()

277

278

cur.execute(query, (50,), prepare=True)

279

moderate_products = cur.fetchall()

280

```

281

282

### Large Object Support

283

284

Handle PostgreSQL large objects (BLOBs) for storing binary data too large for ordinary `bytea` columns (which are capped at 1 GB).

285

286

```python { .api }

287

class Connection:

288

def lobject(

289

self,

290

oid: int = 0,

291

mode: str = "r",

292

new_oid: int = 0

293

) -> LargeObject:

294

"""

295

Open or create large object.

296

297

Args:

298

oid: Object ID (0 to create new)

299

mode: Open mode ("r", "w", "rw")

300

new_oid: Specific OID for new object

301

302

Returns:

303

LargeObject instance

304

"""

305

306

class LargeObject:

307

"""Large object interface for binary data > 1GB"""

308

309

@property

310

def oid(self) -> int:

311

"""Large object OID"""

312

313

def read(self, size: int = -1) -> bytes:

314

"""

315

Read bytes from large object.

316

317

Args:

318

size: Number of bytes to read (-1 for all)

319

320

Returns:

321

Bytes read from object

322

"""

323

324

def write(self, data: bytes) -> int:

325

"""

326

Write bytes to large object.

327

328

Args:

329

data: Bytes to write

330

331

Returns:

332

Number of bytes written

333

"""

334

335

def seek(self, pos: int, whence: int = 0) -> int:

336

"""

337

Seek to position in large object.

338

339

Args:

340

pos: Position to seek to

341

whence: Seek mode (0=absolute, 1=relative, 2=from end)

342

343

Returns:

344

New position

345

"""

346

347

def tell(self) -> int:

348

"""Get current position in large object"""

349

350

def truncate(self, size: int) -> None:

351

"""Truncate large object to specified size"""

352

353

def close(self) -> None:

354

"""Close large object"""

355

356

def unlink(self) -> None:

357

"""Delete large object from database"""

358

```

359

360

#### Large Object Examples

361

362

```python

363

# Store large file as large object

364

def store_file_as_lob(conn, file_path):

365

"""Store file as PostgreSQL large object"""

366

367

with conn.lobject(0, "w") as lob:

368

with open(file_path, "rb") as f:

369

while True:

370

chunk = f.read(65536) # 64KB chunks

371

if not chunk:

372

break

373

lob.write(chunk)

374

375

return lob.oid # Return OID for future reference

376

377

# Retrieve large object as file

378

def retrieve_lob_as_file(conn, oid, output_path):

379

"""Retrieve large object and save as file"""

380

381

with conn.lobject(oid, "r") as lob:

382

with open(output_path, "wb") as f:

383

while True:

384

chunk = lob.read(65536)

385

if not chunk:

386

break

387

f.write(chunk)

388

389

# Stream large object data

390

def stream_lob_data(conn, oid):

391

"""Stream large object data in chunks"""

392

393

with conn.lobject(oid, "r") as lob:

394

while True:

395

chunk = lob.read(8192) # 8KB chunks

396

if not chunk:

397

break

398

yield chunk

399

```

400

401

### Connection Pooling

402

403

Connection pool management for high-performance applications (requires psycopg-pool package).

404

405

```python { .api }

406

# Note: Requires separate psycopg-pool package

407

from psycopg_pool import ConnectionPool, AsyncConnectionPool

408

409

class ConnectionPool:

410

"""Synchronous connection pool"""

411

412

def __init__(

413

self,

414

conninfo: str = "",

415

*,

416

min_size: int = 4,

417

max_size: int | None = None,

418

open: bool = True,

419

name: str | None = None,

420

timeout: float = 30.0,

421

**kwargs

422

):

423

"""

424

Create connection pool.

425

426

Args:

427

conninfo: Connection string

428

min_size: Minimum pool size

429

max_size: Maximum pool size (None = same as min_size)

430

open: Open pool immediately

431

name: Pool name for identification

432

timeout: Connection checkout timeout

433

"""

434

435

def getconn(self, timeout: float | None = None) -> Connection:

436

"""

437

Get connection from pool.

438

439

Args:

440

timeout: Checkout timeout (None = use pool default)

441

442

Returns:

443

Connection from pool

444

"""

445

446

def putconn(self, conn: Connection) -> None:

447

"""

448

Return connection to pool.

449

450

Args:

451

conn: Connection to return

452

"""

453

454

def open(self) -> None:

455

"""Open the connection pool"""

456

457

def close(self) -> None:

458

"""Close the connection pool"""

459

460

@property

461

def name(self) -> str | None:

462

"""Pool name"""

463

464

@property

465

def size(self) -> int:

466

"""Current pool size"""

467

468

@property

469

def available(self) -> int:

470

"""Available connections in pool"""

471

472

class AsyncConnectionPool:

473

"""Asynchronous connection pool"""

474

# Same interface as ConnectionPool but with async methods

475

476

async def getconn(self, timeout: float | None = None) -> AsyncConnection: ...

477

async def putconn(self, conn: AsyncConnection) -> None: ...

478

async def open(self) -> None: ...

479

async def close(self) -> None: ...

480

```

481

482

#### Connection Pool Examples

483

484

```python

485

from psycopg_pool import ConnectionPool

486

487

# Create connection pool

488

pool = ConnectionPool("dbname=mydb user=postgres", min_size=2, max_size=10)

489

490

# Use connection from pool

491

conn = pool.getconn()

492

try:

493

with conn.cursor() as cur:

494

cur.execute("SELECT * FROM users")

495

users = cur.fetchall()

496

finally:

497

pool.putconn(conn) # Always return to pool

498

499

# Context manager usage

500

with pool.connection() as conn:

501

with conn.cursor() as cur:

502

cur.execute("SELECT COUNT(*) FROM products")

503

count = cur.fetchone()[0]

504

# Connection automatically returned to pool

505

506

# Cleanup

507

pool.close()

508

```

509

510

### Advanced Cursor Features

511

512

Additional cursor capabilities for specialized use cases.

513

514

#### Named Cursors (Server-Side)

515

516

```python

517

# Server-side cursor for large result sets

518

with conn.cursor(name="large_scan") as cur:

519

cur.execute("SELECT * FROM huge_table ORDER BY id")

520

521

# Process results in batches

522

while True:

523

batch = cur.fetchmany(1000)

524

if not batch:

525

break

526

527

process_batch(batch)

528

529

# Optional: update progress

530

print(f"Processed {cur.rownumber} rows")

531

```

532

533

#### Scrollable Cursors

534

535

```python

536

# Scrollable server-side cursor

537

with conn.cursor(name="scrollable", scrollable=True) as cur:

538

cur.execute("SELECT id, name FROM users ORDER BY name")

539

540

# Move around in result set

541

cur.scroll(10) # Skip first 10 rows

542

row = cur.fetchone()

543

544

cur.scroll(-5, mode="relative") # Go back 5 rows

545

row = cur.fetchone()

546

547

cur.scroll(0, mode="absolute") # Go to beginning

548

first_row = cur.fetchone()

549

```

550

551

#### Holdable Cursors

552

553

```python

554

# Cursor that survives transaction boundaries

555

with conn.cursor(name="holdable", withhold=True) as cur:

556

cur.execute("SELECT * FROM users")

557

558

# Start transaction

559

conn.commit() # Cursor remains valid

560

561

# Continue fetching after transaction boundary

562

remaining_rows = cur.fetchall()

563

```

564

565

### Performance Monitoring

566

567

Built-in capabilities for monitoring and optimizing database operations.

568

569

```python { .api }

570

class Capabilities:

571

"""Database and driver capability detection"""

572

573

@property

574

def libpq_version(self) -> int:

575

"""libpq version number"""

576

577

@property

578

def has_prepare(self) -> bool:

579

"""True if prepared statements are supported"""

580

581

@property

582

def has_pipeline(self) -> bool:

583

"""True if pipeline mode is supported"""

584

585

# Global capabilities instance

586

capabilities: Capabilities

587

```

588

589

#### Performance Examples

590

591

```python
# Check capabilities
if psycopg.capabilities.has_pipeline:
    # Use pipeline for bulk operations
    with conn.pipeline():
        ...  # bulk operations
else:
    # Fall back to individual operations
    ...  # individual operations

# Monitor prepared statements
print("Prepared statements:", conn.prepared())

# Connection info for debugging
info = conn.info
print(f"Server version: {info.server_version}")
print(f"Backend PID: {info.backend_pid}")
print(f"Transaction status: {info.transaction_status}")
```

610

611

## Advanced Configuration

612

613

### Connection-Level Settings

614

615

```python

616

# Configure connection for high performance

617

conn = psycopg.connect(

618

"dbname=mydb user=postgres",

619

620

# Connection options

621

application_name="MyApp",

622

connect_timeout=10,

623

624

# Performance options

625

prepare_threshold=5, # Auto-prepare after 5 executions

626

options="-c synchronous_commit=off" # Fast writes

627

)

628

629

# Configure adapters for performance

630

conn.adapters.register_dumper(MyClass, fast_dumper)

631

conn.adapters.register_loader(MY_TYPE_OID, fast_loader)

632

```

633

634

### Query Optimization

635

636

```python

637

# Use binary format for large result sets

638

with conn.cursor() as cur:

639

cur.execute("SELECT data FROM large_table", binary=True)

640

# Results returned in binary format (faster)

641

642

# Optimize fetch sizes

643

with conn.cursor() as cur:

644

cur.itersize = 1000 # Fetch 1000 rows per network round-trip

645

cur.execute("SELECT * FROM big_table")

646

647

for row in cur: # Efficient iteration

648

process_row(row)

649

```