# COPY Operations

High-performance bulk data import/export using PostgreSQL's native COPY protocol with support for various formats, streaming, custom delimiters, and efficient handling of large datasets.
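
The capability examples below assume an established connection object `conn`. As a minimal end-to-end sketch (assuming asyncpg, a reachable database, and hypothetical `users` / `users_staging` tables), a round trip looks like this:

```python
import asyncio
import asyncpg

async def main():
    # Connection parameters are placeholders; adjust the DSN for your environment.
    conn = await asyncpg.connect('postgresql://localhost/mydb')
    try:
        # Export a table to a CSV file using the COPY protocol...
        await conn.copy_from_table('users', output='/tmp/users.csv',
                                   format='csv', header=True)
        # ...and bulk-load the same file into another table of the same shape.
        status = await conn.copy_to_table('users_staging', source='/tmp/users.csv',
                                          format='csv', header=True)
        print(status)  # e.g. "COPY 1000"
    finally:
        await conn.close()

asyncio.run(main())
```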

## Capabilities

### Copy From Table

Export table data to files or file-like objects with comprehensive formatting options.

```python { .api }
async def copy_from_table(
    self,
    table_name: str,
    *,
    output,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str
    """
    Copy table contents to a file or file-like object.

    Parameters:
        table_name: Name of the table to copy from
        output: File path (str) or file-like object to write to
        columns: List of column names to copy (default: all columns)
        schema_name: Schema name (default: current schema)
        timeout: Operation timeout in seconds
        format: Output format ('text', 'csv', 'binary')
        oids: Include object IDs
        delimiter: Field delimiter character
        null: String representing NULL values
        header: Include header row (CSV format)
        quote: Quote character (CSV format)
        escape: Escape character (CSV format)
        force_quote: List of columns to always quote
        encoding: Character encoding

    Returns:
        COPY command status string
    """
```

#### Example Usage

```python
# Copy table to file
with open('/tmp/users.csv', 'w') as f:
    await conn.copy_from_table('users', output=f, format='csv', header=True)

# Copy specific columns
await conn.copy_from_table(
    'orders',
    output='/tmp/orders.csv',
    columns=['id', 'customer_id', 'total', 'created_at'],
    format='csv',
    header=True
)

# Copy with custom formatting
import io
buffer = io.StringIO()
await conn.copy_from_table(
    'products',
    output=buffer,
    format='csv',
    delimiter='|',
    null='NULL',
    quote='"',
    force_quote=['name', 'description']
)
csv_data = buffer.getvalue()

# Binary format for maximum performance
with open('/tmp/users.bin', 'wb') as f:
    await conn.copy_from_table('users', output=f, format='binary')
```

### Copy From Query

Export query results to files or file-like objects, enabling complex data transformations during export.

```python { .api }
async def copy_from_query(
    self,
    query: str,
    *args,
    output,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str
    """
    Copy the results of a query to a file or file-like object.

    Parameters:
        query: SQL query to execute
        *args: Query parameters
        output: File path (str) or file-like object to write to
        timeout: Operation timeout in seconds
        format: Output format ('text', 'csv', 'binary')
        oids: Include object IDs
        delimiter: Field delimiter character
        null: String representing NULL values
        header: Include header row (CSV format)
        quote: Quote character (CSV format)
        escape: Escape character (CSV format)
        force_quote: List of columns to always quote
        encoding: Character encoding

    Returns:
        COPY command status string
    """
```

#### Example Usage

```python
from datetime import datetime, timedelta

# Export query results
with open('/tmp/monthly_report.csv', 'w') as f:
    await conn.copy_from_query(
        """
        SELECT u.name, u.email, COUNT(o.id) as order_count, SUM(o.total) as total_spent
        FROM users u
        LEFT JOIN orders o ON u.id = o.customer_id
        WHERE o.created_at >= $1 AND o.created_at < $2
        GROUP BY u.id, u.name, u.email
        ORDER BY total_spent DESC
        """,
        start_date, end_date,
        output=f,
        format='csv',
        header=True
    )

# Stream large result set
async def stream_query_results(query, output_file):
    with open(output_file, 'w') as f:
        await conn.copy_from_query(
            query,
            output=f,
            format='csv',
            header=True
        )

# Export with JSON aggregation
await conn.copy_from_query(
    """
    SELECT customer_id,
           json_agg(json_build_object('id', id, 'total', total, 'date', created_at)) as orders
    FROM orders
    WHERE created_at >= $1
    GROUP BY customer_id
    """,
    datetime.now() - timedelta(days=30),
    output='/tmp/customer_orders.csv',
    format='csv'
)
```
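
Because `output` also accepts a binary file-like object, query results can be streamed straight into a compressed file. A short sketch (the `events` table and `start_date` value are hypothetical):

```python
import gzip

# Stream query results directly into a gzip-compressed CSV file.
# gzip.open(..., 'wb') returns a binary file-like object that receives the COPY data.
with gzip.open('/tmp/events_export.csv.gz', 'wb') as f:
    await conn.copy_from_query(
        'SELECT * FROM events WHERE created_at >= $1',
        start_date,
        output=f,
        format='csv',
        header=True
    )
```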

### Copy To Table

Import data from files or file-like objects into database tables with comprehensive parsing options.

```python { .api }
async def copy_to_table(
    self,
    table_name: str,
    *,
    source,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    freeze: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    force_not_null: typing.List[str] = None,
    force_null: typing.List[str] = None,
    encoding: str = None,
    where: str = None
) -> str
    """
    Copy data from a file or file-like object to the specified table.

    Parameters:
        table_name: Target table name
        source: File path (str) or file-like object to read from
        columns: List of target column names
        schema_name: Schema name (default: current schema)
        timeout: Operation timeout in seconds
        format: Input format ('text', 'csv', 'binary')
        oids: Expect object IDs in input
        freeze: Freeze imported rows (performance optimization)
        delimiter: Field delimiter character
        null: String representing NULL values
        header: Skip header row (CSV format)
        quote: Quote character (CSV format)
        escape: Escape character (CSV format)
        force_quote: List of columns that are quoted
        force_not_null: List of columns that should never be NULL
        force_null: List of columns that should be NULL if empty
        encoding: Character encoding
        where: WHERE clause for filtering during import

    Returns:
        COPY command status string
    """
```

#### Example Usage

```python
# Import CSV file
with open('/tmp/users.csv', 'r') as f:
    result = await conn.copy_to_table(
        'users',
        source=f,
        format='csv',
        header=True,
        columns=['name', 'email', 'age']
    )
    print(result)  # "COPY 1000"

# Import with custom delimiter
await conn.copy_to_table(
    'products',
    source='/tmp/products.txt',
    format='text',
    delimiter='|',
    null='\\N',
    columns=['sku', 'name', 'price', 'category']
)

# Import with row filtering using a WHERE clause
await conn.copy_to_table(
    'orders',
    source=data_file,
    format='csv',
    header=True,
    where="total > 0 AND customer_id IS NOT NULL"
)

# Binary import for maximum performance
with open('/tmp/users.bin', 'rb') as f:
    await conn.copy_to_table('users', source=f, format='binary')
```
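
COPY appends rows and cannot update existing ones. A common companion pattern, sketched below under the assumption of a `users` table with a unique `email` column, is to COPY into a temporary staging table and merge with `INSERT ... ON CONFLICT`:

```python
async with conn.transaction():
    # Stage the incoming file in a temporary table that mirrors the target columns.
    await conn.execute("""
        CREATE TEMPORARY TABLE users_staging
            (name text, email text, age int) ON COMMIT DROP
    """)
    await conn.copy_to_table(
        'users_staging',
        source='/tmp/users.csv',
        format='csv',
        header=True
    )
    # Upsert from the staging table into the real table.
    await conn.execute("""
        INSERT INTO users (name, email, age)
        SELECT name, email, age FROM users_staging
        ON CONFLICT (email) DO UPDATE
            SET name = EXCLUDED.name, age = EXCLUDED.age
    """)
```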

### Copy Records To Table

Import Python data structures directly to database tables using optimized binary COPY protocol.

```python { .api }
async def copy_records_to_table(
    self,
    table_name: str,
    *,
    records,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    where: str = None
) -> str
    """
    Copy a list of records to the specified table using binary COPY.

    Parameters:
        table_name: Target table name
        records: Iterable of records (tuples, lists, or dicts)
        columns: List of target column names
        schema_name: Schema name (default: current schema)
        timeout: Operation timeout in seconds
        where: WHERE clause for filtering during import

    Returns:
        COPY command status string
    """
```

#### Example Usage

```python
# Import list of tuples
users = [
    ("Alice", "alice@example.com", 25),
    ("Bob", "bob@example.com", 30),
    ("Charlie", "charlie@example.com", 35)
]

result = await conn.copy_records_to_table(
    'users',
    records=users,
    columns=['name', 'email', 'age']
)

# Import list of dictionaries
orders = [
    {"customer_id": 1, "total": 99.99, "status": "pending"},
    {"customer_id": 2, "total": 149.50, "status": "shipped"},
    {"customer_id": 3, "total": 75.25, "status": "delivered"}
]

await conn.copy_records_to_table(
    'orders',
    records=orders,
    columns=['customer_id', 'total', 'status']
)

# Stream large datasets
import random

async def generate_records():
    for i in range(1000000):
        yield (f"user_{i}", f"user_{i}@example.com", random.randint(18, 80))

await conn.copy_records_to_table(
    'users',
    records=generate_records(),
    columns=['name', 'email', 'age']
)
```
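
Because the binary protocol encodes each value according to its column's PostgreSQL type, records should carry properly typed Python values (ints, Decimals, datetimes) rather than raw text fields. A small sketch, assuming a hypothetical `orders(customer_id int, total numeric, created_at timestamp)` table and an ISO-formatted CSV file:

```python
import csv
from datetime import datetime
from decimal import Decimal

def typed_rows(path):
    """Parse a CSV file and convert each field to the Python type of its column."""
    with open(path, newline='') as f:
        for row in csv.DictReader(f):
            yield (
                int(row['customer_id']),
                Decimal(row['total']),
                datetime.fromisoformat(row['created_at']),
            )

await conn.copy_records_to_table(
    'orders',
    records=typed_rows('/tmp/orders.csv'),
    columns=['customer_id', 'total', 'created_at']
)
```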

### Streaming COPY Operations

Handle large datasets efficiently with streaming and memory-conscious processing.

```python
import asyncio
import json
import random
from datetime import datetime

async def stream_csv_to_table(file_path: str, table_name: str):
    """Stream large CSV file to database without loading it into memory."""

    with open(file_path, 'r') as f:
        # Skip header
        header = f.readline()

        # Process in chunks
        chunk_size = 10000
        records = []

        for line_num, line in enumerate(f, 1):
            # Parse CSV line
            values = line.strip().split(',')
            records.append(values)

            # Insert chunk when full
            if len(records) >= chunk_size:
                await conn.copy_records_to_table(
                    table_name,
                    records=records,
                    columns=['col1', 'col2', 'col3']
                )
                records = []
                print(f"Processed {line_num} lines")

        # Insert remaining records
        if records:
            await conn.copy_records_to_table(
                table_name,
                records=records,
                columns=['col1', 'col2', 'col3']
            )

# Async generator for streaming
async def async_record_generator():
    """Generate records asynchronously for streaming import."""
    for i in range(1000000):
        # Simulate async data generation/fetching
        if i % 1000 == 0:
            await asyncio.sleep(0.01)  # Yield control

        yield {
            'id': i,
            'name': f'record_{i}',
            'timestamp': datetime.now(),
            'data': json.dumps({'value': random.random()})
        }

# Use async generator
await conn.copy_records_to_table(
    'streaming_data',
    records=async_record_generator(),
    columns=['id', 'name', 'timestamp', 'data']
)
```

### Error Handling

Handle COPY-specific errors and data validation issues.

```python
try:
    await conn.copy_to_table(
        'users',
        source='/tmp/users.csv',
        format='csv',
        header=True
    )
except asyncpg.DataError as e:
    print(f"Data format error: {e}")
    # Handle malformed data
except asyncpg.UniqueViolationError:
    print("Duplicate key violation during import")
    # Handle constraint violations
except FileNotFoundError:
    print("Source file not found")
except asyncpg.UndefinedTableError:
    print("Target table does not exist")

# Validate data before import
def validate_records(records):
    """Validate records before COPY operation."""
    valid_records = []
    errors = []

    for i, record in enumerate(records):
        try:
            # Validate email format
            if '@' not in record.get('email', ''):
                raise ValueError("Invalid email format")

            # Validate required fields
            if not record.get('name'):
                raise ValueError("Name is required")

            valid_records.append(record)

        except ValueError as e:
            errors.append(f"Record {i}: {e}")

    return valid_records, errors

# Use validation
records, errors = validate_records(input_records)
if errors:
    print(f"Found {len(errors)} validation errors")
    for error in errors[:10]:  # Show first 10 errors
        print(f"  {error}")

if records:
    await conn.copy_records_to_table('users', records=records)
```

### Performance Optimization

Optimize COPY operations for maximum throughput and efficiency.

```python
# Use binary format for best performance
await conn.copy_records_to_table(
    'large_table',
    records=records,
    columns=columns
    # Binary format is used automatically for copy_records_to_table
)

# Batch processing for memory efficiency
async def batch_import(records, table_name, batch_size=10000):
    """Import large datasets in batches."""
    total_imported = 0

    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]

        result = await conn.copy_records_to_table(
            table_name,
            records=batch,
            columns=['col1', 'col2', 'col3']
        )

        # Parse result to get row count
        rows_imported = int(result.split()[-1])
        total_imported += rows_imported

        print(f"Imported {rows_imported} rows (total: {total_imported})")

    return total_imported

# Wrap large imports in a single transaction
async with conn.transaction():
    await conn.copy_records_to_table(
        'huge_table',
        records=million_records,
        columns=columns
    )
    # Single commit after all data is imported
```

## Types

```python { .api }
# COPY operations work with various data types:

# File-like objects
import io
from typing import Union, TextIO, BinaryIO, List, Iterator, AsyncIterator

FileSource = Union[str, TextIO, BinaryIO, io.StringIO, io.BytesIO]

# Record types for copy_records_to_table
Record = Union[tuple, list, dict]
Records = Union[List[Record], Iterator[Record], AsyncIterator[Record]]
```
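
As an illustration, these aliases can annotate thin wrappers around the COPY methods; the helper names below are hypothetical:

```python
from typing import List, Optional

async def bulk_load(conn, table_name: str, source: FileSource,
                    columns: Optional[List[str]] = None) -> int:
    """Load a CSV source into a table and return the number of imported rows."""
    status = await conn.copy_to_table(table_name, source=source,
                                      format='csv', header=True, columns=columns)
    # COPY status strings look like "COPY <row_count>"
    return int(status.split()[-1])

async def bulk_insert(conn, table_name: str, records: Records,
                      columns: Optional[List[str]] = None) -> int:
    """Copy an iterable of records via binary COPY and return the row count."""
    status = await conn.copy_records_to_table(table_name, records=records,
                                              columns=columns)
    return int(status.split()[-1])
```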