# COPY Operations

High-performance bulk data import and export using PostgreSQL's native COPY protocol, with support for text, CSV, and binary formats, streaming to and from file-like objects, custom delimiters, and efficient handling of large datasets.

## Capabilities

### Copy From Table

Export table data to files or file-like objects with comprehensive formatting options.

```python { .api }
async def copy_from_table(
    self,
    table_name: str,
    *,
    output,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str
"""
Copy table contents to a file or file-like object.

Parameters:
    table_name: Name of the table to copy from
    output: File path (str) or file-like object to write to
    columns: List of column names to copy (default: all columns)
    schema_name: Schema name (default: current schema)
    timeout: Operation timeout in seconds
    format: Output format ('text', 'csv', 'binary')
    oids: Include object IDs
    delimiter: Field delimiter character
    null: String representing NULL values
    header: Include header row (CSV format)
    quote: Quote character (CSV format)
    escape: Escape character (CSV format)
    force_quote: List of columns to always quote
    encoding: Character encoding

Returns:
    COPY command status string
"""
```

#### Example Usage

```python
# Copy table to file
with open('/tmp/users.csv', 'w') as f:
    await conn.copy_from_table('users', output=f, format='csv', header=True)

# Copy specific columns
await conn.copy_from_table(
    'orders',
    output='/tmp/orders.csv',
    columns=['id', 'customer_id', 'total', 'created_at'],
    format='csv',
    header=True
)

# Copy with custom formatting into an in-memory buffer
import io

buffer = io.StringIO()
await conn.copy_from_table(
    'products',
    output=buffer,
    format='csv',
    delimiter='|',
    null='NULL',
    quote='"',
    force_quote=['name', 'description']
)
csv_data = buffer.getvalue()

# Binary format for maximum performance
with open('/tmp/users.bin', 'wb') as f:
    await conn.copy_from_table('users', output=f, format='binary')
```

### Copy From Query

Export query results to files or file-like objects, enabling complex data transformations during export.

```python { .api }
async def copy_from_query(
    self,
    query: str,
    *args,
    output,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    encoding: str = None
) -> str
"""
Copy the results of a query to a file or file-like object.

Parameters:
    query: SQL query to execute
    *args: Query parameters
    output: File path (str) or file-like object to write to
    timeout: Operation timeout in seconds
    format: Output format ('text', 'csv', 'binary')
    oids: Include object IDs
    delimiter: Field delimiter character
    null: String representing NULL values
    header: Include header row (CSV format)
    quote: Quote character (CSV format)
    escape: Escape character (CSV format)
    force_quote: List of columns to always quote
    encoding: Character encoding

Returns:
    COPY command status string
"""
```

#### Example Usage

```python
# Export query results
with open('/tmp/monthly_report.csv', 'w') as f:
    await conn.copy_from_query(
        """
        SELECT u.name, u.email, COUNT(o.id) AS order_count, SUM(o.total) AS total_spent
        FROM users u
        LEFT JOIN orders o ON u.id = o.customer_id
        WHERE o.created_at >= $1 AND o.created_at < $2
        GROUP BY u.id, u.name, u.email
        ORDER BY total_spent DESC
        """,
        start_date, end_date,
        output=f,
        format='csv',
        header=True
    )

# Stream a large result set to disk: COPY output is written to the file
# as it arrives, rather than being accumulated in memory
async def stream_query_results(query, output_file):
    with open(output_file, 'w') as f:
        await conn.copy_from_query(
            query,
            output=f,
            format='csv',
            header=True
        )

# Export with JSON aggregation
from datetime import datetime, timedelta

await conn.copy_from_query(
    """
    SELECT customer_id,
           json_agg(json_build_object('id', id, 'total', total, 'date', created_at)) AS orders
    FROM orders
    WHERE created_at >= $1
    GROUP BY customer_id
    """,
    datetime.now() - timedelta(days=30),
    output='/tmp/customer_orders.csv',
    format='csv'
)
```

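Each call returns the COPY command status string (`'COPY <count>'`), so the number of exported rows can be read back without a separate count query. A small illustrative sketch; the `region` column, its values, and the output paths are hypothetical.

```python
# Hypothetical example: export one CSV per region and report row counts.
# Assumes an existing connection `conn`; the schema is illustrative.
for region in ('emea', 'apac', 'amer'):
    status = await conn.copy_from_query(
        "SELECT id, customer_id, total FROM orders WHERE region = $1",
        region,
        output=f'/tmp/orders_{region}.csv',
        format='csv',
        header=True
    )
    # The status has the form 'COPY <rows>'; parse the trailing count
    rows = int(status.split()[-1])
    print(f"{region}: exported {rows} rows")
```
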
### Copy To Table

Import data from files or file-like objects into database tables with comprehensive parsing options.

```python { .api }
async def copy_to_table(
    self,
    table_name: str,
    *,
    source,
    columns: typing.List[str] = None,
    schema_name: str = None,
    timeout: float = None,
    format: str = None,
    oids: bool = None,
    freeze: bool = None,
    delimiter: str = None,
    null: str = None,
    header: bool = None,
    quote: str = None,
    escape: str = None,
    force_quote: typing.Union[bool, typing.List[str]] = None,
    force_not_null: typing.List[str] = None,
    force_null: typing.List[str] = None,
    encoding: str = None,
    where: str = None
) -> str
"""
Copy data from a file or file-like object to the specified table.

Parameters:
    table_name: Target table name
    source: File path (str) or file-like object to read from
    columns: List of target column names
    schema_name: Schema name (default: current schema)
    timeout: Operation timeout in seconds
    format: Input format ('text', 'csv', 'binary')
    oids: Expect object IDs in input
    freeze: Freeze imported rows (performance optimization)
    delimiter: Field delimiter character
    null: String representing NULL values
    header: Skip header row (CSV format)
    quote: Quote character (CSV format)
    escape: Escape character (CSV format)
    force_quote: List of columns that are quoted
    force_not_null: Columns whose values are never matched against the NULL string, so they are never imported as NULL (CSV format)
    force_null: Columns whose quoted values matching the NULL string are imported as NULL (CSV format)
    encoding: Character encoding
    where: WHERE clause for filtering rows during import

Returns:
    COPY command status string
"""
```

#### Example Usage

```python
# Import CSV file
with open('/tmp/users.csv', 'r') as f:
    result = await conn.copy_to_table(
        'users',
        source=f,
        format='csv',
        header=True,
        columns=['name', 'email', 'age']
    )
    print(result)  # "COPY 1000"

# Import with custom delimiter
await conn.copy_to_table(
    'products',
    source='/tmp/products.txt',
    format='text',
    delimiter='|',
    null='\\N',
    columns=['sku', 'name', 'price', 'category']
)

# Filter rows during import with a WHERE clause
await conn.copy_to_table(
    'orders',
    source=data_file,
    format='csv',
    header=True,
    where="total > 0 AND customer_id IS NOT NULL"
)

# Binary import for maximum performance
with open('/tmp/users.bin', 'rb') as f:
    await conn.copy_to_table('users', source=f, format='binary')
```

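The CSV NULL-handling options are easy to confuse, so the sketch below shows `force_not_null` and `force_null` together in one call. It is illustrative only: the `contacts` table, its columns, and the file path are hypothetical.

```python
# Hypothetical example: control how empty and quoted-empty CSV values map to NULL.
# Table and column names are illustrative; assumes an existing connection `conn`.
await conn.copy_to_table(
    'contacts',
    source='/tmp/contacts.csv',
    format='csv',
    header=True,
    null='',                      # unquoted empty fields become NULL
    force_not_null=['notes'],     # empty 'notes' values load as empty strings, not NULL
    force_null=['phone'],         # quoted empty 'phone' values ("") load as NULL
    columns=['name', 'email', 'phone', 'notes']
)
```
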
271
272
### Copy Records To Table
273
274
Import Python data structures directly to database tables using optimized binary COPY protocol.
275
276
```python { .api }
277
async def copy_records_to_table(
278
self,
279
table_name: str,
280
*,
281
records,
282
columns: typing.List[str] = None,
283
schema_name: str = None,
284
timeout: float = None,
285
where: str = None
286
) -> str
287
"""
288
Copy a list of records to the specified table using binary COPY.
289
290
Parameters:
291
table_name: Target table name
292
records: Iterable of records (tuples, lists, or dicts)
293
columns: List of target column names
294
schema_name: Schema name (default: current schema)
295
timeout: Operation timeout in seconds
296
where: WHERE clause for filtering during import
297
298
Returns:
299
COPY command status string
300
"""
301
```
302
303
#### Example Usage

```python
import random

# Import a list of tuples
users = [
    ("Alice", "alice@example.com", 25),
    ("Bob", "bob@example.com", 30),
    ("Charlie", "charlie@example.com", 35)
]

result = await conn.copy_records_to_table(
    'users',
    records=users,
    columns=['name', 'email', 'age']
)

# Import a list of dictionaries
orders = [
    {"customer_id": 1, "total": 99.99, "status": "pending"},
    {"customer_id": 2, "total": 149.50, "status": "shipped"},
    {"customer_id": 3, "total": 75.25, "status": "delivered"}
]

await conn.copy_records_to_table(
    'orders',
    records=orders,
    columns=['customer_id', 'total', 'status']
)

# Stream large datasets from an async generator
async def generate_records():
    for i in range(1000000):
        yield (f"user_{i}", f"user_{i}@example.com", random.randint(18, 80))

await conn.copy_records_to_table(
    'users',
    records=generate_records(),
    columns=['name', 'email', 'age']
)
```

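The `where` parameter applies to record imports as well, letting the server filter rows as they are copied. A minimal sketch with an illustrative table and sentinel value:

```python
# Hypothetical example: only rows satisfying the WHERE condition are kept.
# Assumes an existing connection `conn`; table and values are illustrative.
measurements = [
    ("sensor-1", 21.5),
    ("sensor-2", -999.0),   # sentinel value we want the server to drop
    ("sensor-3", 19.8),
]

status = await conn.copy_records_to_table(
    'measurements',
    records=measurements,
    columns=['sensor_id', 'reading'],
    where="reading > -100"
)
print(status)  # e.g. "COPY 2"
```
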
### Streaming COPY Operations

Handle large datasets efficiently with streaming and memory-conscious processing.

```python
import asyncio
import csv
import json
import random
from datetime import datetime

async def stream_csv_to_table(file_path: str, table_name: str):
    """Stream a large CSV file to the database without loading it into memory."""
    with open(file_path, 'r', newline='') as f:
        reader = csv.reader(f)
        next(reader)  # skip header row

        # Process in chunks
        chunk_size = 10000
        records = []

        for line_num, row in enumerate(reader, 1):
            # Values arrive as strings; cast them here if the target columns are not text
            records.append(row)

            # Insert chunk when full
            if len(records) >= chunk_size:
                await conn.copy_records_to_table(
                    table_name,
                    records=records,
                    columns=['col1', 'col2', 'col3']
                )
                records = []
                print(f"Processed {line_num} lines")

        # Insert remaining records
        if records:
            await conn.copy_records_to_table(
                table_name,
                records=records,
                columns=['col1', 'col2', 'col3']
            )

# Async generator for streaming
async def async_record_generator():
    """Generate records asynchronously for streaming import."""
    for i in range(1000000):
        # Simulate async data generation/fetching
        if i % 1000 == 0:
            await asyncio.sleep(0.01)  # yield control to the event loop

        yield {
            'id': i,
            'name': f'record_{i}',
            'timestamp': datetime.now(),
            'data': json.dumps({'value': random.random()})
        }

# Use the async generator
await conn.copy_records_to_table(
    'streaming_data',
    records=async_record_generator(),
    columns=['id', 'name', 'timestamp', 'data']
)
```

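Streaming also works in the export direction: `output` only needs to be a file-like object with a `write()` method, which receives the COPY output chunk by chunk. A minimal sketch of a progress-reporting wrapper; the class, the `events` table, and the file path are illustrative, and the file mode should match whether the driver hands `write()` text or bytes.

```python
# Hypothetical example: wrap a destination file to observe streaming export progress.
# Assumes the driver writes successive chunks via output.write(); names are illustrative.
class ProgressWriter:
    """Forwards chunks to an underlying file object and tracks how much was written."""

    def __init__(self, fileobj, label=''):
        self._fileobj = fileobj
        self._label = label
        self.chunks = 0
        self.total = 0

    def write(self, chunk):
        self._fileobj.write(chunk)
        self.chunks += 1
        self.total += len(chunk)
        if self.chunks % 100 == 0:
            print(f"{self._label}: {self.total} units written in {self.chunks} chunks")

with open('/tmp/events.csv', 'w') as f:
    writer = ProgressWriter(f, label='events export')
    await conn.copy_from_table('events', output=writer, format='csv', header=True)
print(f"Done: {writer.total} units in {writer.chunks} chunks")
```
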
### Error Handling

Handle COPY-specific errors and data validation issues.

```python
try:
    await conn.copy_to_table(
        'users',
        source='/tmp/users.csv',
        format='csv',
        header=True
    )
except asyncpg.DataError as e:
    print(f"Data format error: {e}")
    # Handle malformed data
except asyncpg.UniqueViolationError:
    print("Duplicate key violation during import")
    # Handle constraint violations
except FileNotFoundError:
    print("Source file not found")
except asyncpg.UndefinedTableError:
    print("Target table does not exist")

# Validate data before import
def validate_records(records):
    """Validate records before a COPY operation."""
    valid_records = []
    errors = []

    for i, record in enumerate(records):
        try:
            # Validate email format
            if '@' not in record.get('email', ''):
                raise ValueError("Invalid email format")

            # Validate required fields
            if not record.get('name'):
                raise ValueError("Name is required")

            valid_records.append(record)

        except ValueError as e:
            errors.append(f"Record {i}: {e}")

    return valid_records, errors

# Use validation
records, errors = validate_records(input_records)
if errors:
    print(f"Found {len(errors)} validation errors")
    for error in errors[:10]:  # show the first 10 errors
        print(f"  {error}")

if records:
    await conn.copy_records_to_table('users', records=records)
```

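A failed constraint check aborts the entire COPY, so catching `UniqueViolationError` after the fact means nothing was imported. One common alternative is to COPY into a temporary staging table and merge with `INSERT ... ON CONFLICT`. The sketch below assumes `users(email)` has a UNIQUE constraint; the table and column names are illustrative.

```python
# Hypothetical example: stage the load, then upsert, so duplicate keys
# update existing rows instead of aborting the bulk import.
# Assumes users(email) has a UNIQUE constraint; names are illustrative.
new_users = [
    ("Alice", "alice@example.com", 25),
    ("Bob", "bob@example.com", 31),
]

async with conn.transaction():
    await conn.execute("""
        CREATE TEMPORARY TABLE users_staging
        (LIKE users INCLUDING DEFAULTS) ON COMMIT DROP
    """)
    await conn.copy_records_to_table(
        'users_staging',
        records=new_users,
        columns=['name', 'email', 'age']
    )
    await conn.execute("""
        INSERT INTO users (name, email, age)
        SELECT name, email, age FROM users_staging
        ON CONFLICT (email) DO UPDATE
        SET name = EXCLUDED.name, age = EXCLUDED.age
    """)
```
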
### Performance Optimization

Optimize COPY operations for maximum throughput and efficiency.

```python
# copy_records_to_table always uses the binary COPY format,
# which is the fastest path for importing Python data
await conn.copy_records_to_table(
    'large_table',
    records=records,
    columns=columns
)

# Batch processing for memory efficiency
async def batch_import(records, table_name, batch_size=10000):
    """Import large datasets in batches."""
    total_imported = 0

    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]

        result = await conn.copy_records_to_table(
            table_name,
            records=batch,
            columns=['col1', 'col2', 'col3']
        )

        # The status string has the form "COPY <rows>"
        rows_imported = int(result.split()[-1])
        total_imported += rows_imported

        print(f"Imported {rows_imported} rows (total: {total_imported})")

    return total_imported

# Wrap large imports in a transaction: everything commits once at the end,
# and a failure rolls the whole import back
async with conn.transaction():
    await conn.copy_records_to_table(
        'huge_table',
        records=million_records,
        columns=columns
    )
    # Single commit after all data is imported
```

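The `freeze` option documented above only helps on a full reload: PostgreSQL accepts `FREEZE` only when the table was created or truncated in the current (sub)transaction. A sketch of that pattern, assuming a hypothetical `daily_snapshot` table whose contents can be replaced wholesale:

```python
# Hypothetical example: full reload with freeze=True for cheaper post-import vacuuming.
# FREEZE requires the table to have been created or truncated in the current
# (sub)transaction, so TRUNCATE and COPY run inside one transaction block.
async with conn.transaction():
    await conn.execute('TRUNCATE daily_snapshot')
    await conn.copy_to_table(
        'daily_snapshot',
        source='/tmp/daily_snapshot.csv',
        format='csv',
        header=True,
        freeze=True
    )
```
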
## Types

```python { .api }
# COPY operations accept the following source/output and record types:

# File paths and file-like objects
from typing import AsyncIterator, BinaryIO, Iterator, List, TextIO, Union

FileSource = Union[str, TextIO, BinaryIO]

# Record types for copy_records_to_table
Record = Union[tuple, list, dict]
Records = Union[List[Record], Iterator[Record], AsyncIterator[Record]]
```