# Bulk Operations and Data Loading

High-performance bulk data operations including file-based loading, dumping, PostgreSQL COPY command support, and efficient row insertion with upsert capabilities for large-scale data transfer and ETL operations.

## Capabilities

### Bulk File Loading

Load data from tab-delimited files directly into PostgreSQL tables using efficient bulk operations.

```python { .api }
def bulk_load(self, table: str, tmp_file: str) -> None:
    """
    Load tab-delimited file into database table using COPY FROM.

    Parameters:
    - table: str, target table name for data loading
    - tmp_file: str, path to tab-delimited file to load

    Raises:
        Exception: If file cannot be read or table doesn't exist
    """
```

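When the rows originate in Python rather than in an existing file, they first need to be written out as tab-separated text. A minimal sketch, assuming a `staging_users` table already exists (the table name, sample rows, and temporary-file handling are illustrative):

```python
import csv
import tempfile

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")
rows = [(1, "Alice"), (2, "Bob")]

# Write the rows as tab-delimited text, the format bulk_load expects.
# Values containing tabs or newlines need COPY-style escaping
# (see "File Format Requirements" below).
with tempfile.NamedTemporaryFile("w", suffix=".tsv", delete=False, newline="") as f:
    csv.writer(f, delimiter="\t", lineterminator="\n").writerows(rows)
    tmp_path = f.name

# Load the file into the staging table via COPY FROM.
hook.bulk_load("staging_users", tmp_path)
```
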
### Bulk File Dumping

Export table data to tab-delimited files using PostgreSQL's efficient COPY TO operation.

```python { .api }
def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
    Dump database table into tab-delimited file using COPY TO.

    Parameters:
    - table: str, source table name for data extraction
    - tmp_file: str, path to output file for dumped data

    Raises:
        Exception: If table doesn't exist or file cannot be written
    """
```

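The dumped file can be read back with the standard `csv` module by setting a tab delimiter. A short sketch, assuming an `export_data` table exists (the table name and path are illustrative):

```python
import csv

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Dump the table, then inspect the file with a tab-delimited csv reader.
hook.bulk_dump("export_data", "/tmp/export_data.tsv")

with open("/tmp/export_data.tsv", newline="") as f:
    for record in csv.reader(f, delimiter="\t"):
        print(record)  # each record is a list of column values as strings; NULLs appear as \N
```
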
### PostgreSQL COPY Expert

Execute custom PostgreSQL COPY commands with full control over format and options.

```python { .api }
def copy_expert(self, sql: str, filename: str) -> None:
    """
    Execute PostgreSQL COPY command using psycopg2's copy_expert.
    Provides full control over COPY options and format.

    Parameters:
    - sql: str, complete COPY SQL statement (COPY ... FROM/TO ...)
    - filename: str, file path for COPY operation

    Example SQL:
        "COPY users (id, name, email) FROM STDIN WITH CSV HEADER"
        "COPY (SELECT * FROM sales WHERE date >= '2024-01-01') TO STDOUT WITH CSV"
    """
```

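For orientation, the call is roughly equivalent to the following psycopg2 pattern — a simplified sketch, not the hook's actual implementation:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Open the file, grab a psycopg2 cursor, and stream the data through COPY.
conn = hook.get_conn()
try:
    with open("/data/users.csv") as f, conn.cursor() as cur:
        cur.copy_expert(
            "COPY users (id, name, email) FROM STDIN WITH CSV HEADER",
            f,
        )
    conn.commit()
finally:
    conn.close()
```
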
### Row Insertion with Upsert

Insert multiple rows with support for conflict resolution and upsert operations.

```python { .api }
def insert_rows(
    self,
    table,
    rows,
    target_fields=None,
    commit_every: int = 1000,
    replace: bool = False,
    **kwargs
):
    """
    Insert rows into table with optional upsert capability.

    Parameters:
    - table: str, target table name
    - rows: list, list of tuples/lists containing row data
    - target_fields: list, column names for insertion (None for all columns)
    - commit_every: int, commit after every N rows (default 1000)
    - replace: bool, enable upsert mode using ON CONFLICT
    - **kwargs: additional parameters including replace_index

    Upsert Parameters:
    - replace_index: str or list, column(s) to use for conflict detection

    Example:
        insert_rows("users", [(1, "john"), (2, "jane")], ["id", "name"], replace=True, replace_index="id")
    """
```

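Conceptually, `replace=True` with `replace_index="id"` corresponds to an `INSERT ... ON CONFLICT ... DO UPDATE` statement. The shape is roughly the following — illustrative only; the actual SQL is generated by the hook:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Roughly the statement shape produced for replace=True, replace_index="id".
upsert_sql = """
    INSERT INTO users (id, name)
    VALUES (%s, %s)
    ON CONFLICT (id)
    DO UPDATE SET name = excluded.name
"""
hook.run(upsert_sql, parameters=(1, "john"))
```
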
## Usage Examples

### Basic Bulk Loading

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Load tab-delimited file into table
hook.bulk_load("user_imports", "/data/users.tsv")

# Dump table to file
hook.bulk_dump("export_data", "/output/data_export.tsv")
```

### Custom COPY Operations

```python
# Import CSV with headers
hook.copy_expert(
    "COPY users (id, name, email, created_at) FROM STDIN WITH CSV HEADER",
    "/data/users.csv"
)

# Export query results to CSV
hook.copy_expert(
    "COPY (SELECT u.name, u.email, p.total FROM users u JOIN purchases p ON u.id = p.user_id WHERE p.date >= '2024-01-01') TO STDOUT WITH CSV HEADER",
    "/output/user_purchases.csv"
)

# Import with custom delimiter and null values
# (QUOTE applies only to CSV format, so it is omitted here)
hook.copy_expert(
    "COPY products FROM STDIN WITH DELIMITER '|' NULL 'NULL'",
    "/data/products.pipe"
)
```

### Row Insertion

```python
# Simple row insertion
rows = [
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com"),
    (3, "Charlie", "charlie@example.com")
]

hook.insert_rows(
    table="users",
    rows=rows,
    target_fields=["id", "name", "email"],
    commit_every=500
)
```

### Upsert Operations

```python
# Insert with conflict resolution on primary key
hook.insert_rows(
    table="products",
    rows=[
        (1, "Widget", 19.99, "2024-01-01"),
        (2, "Gadget", 29.99, "2024-01-02")
    ],
    target_fields=["id", "name", "price", "updated_at"],
    replace=True,
    replace_index="id"  # Use id column for conflict detection
)

# Upsert with composite key
hook.insert_rows(
    table="user_preferences",
    rows=[
        (1, "theme", "dark"),
        (1, "language", "en"),
        (2, "theme", "light")
    ],
    target_fields=["user_id", "setting_name", "setting_value"],
    replace=True,
    replace_index=["user_id", "setting_name"]  # Composite conflict key
)
```

### Large Dataset Processing

```python
# Process large datasets in batches
large_dataset = load_large_dataset()  # Assume this returns an iterator of rows

batch_size = 5000
batch = []

for row in large_dataset:
    batch.append(row)

    if len(batch) >= batch_size:
        hook.insert_rows(
            table="large_table",
            rows=batch,
            target_fields=["col1", "col2", "col3"],
            commit_every=1000
        )
        batch = []

# Handle remaining rows
if batch:
    hook.insert_rows(
        table="large_table",
        rows=batch,
        target_fields=["col1", "col2", "col3"]
    )
```

### ETL Pipeline Example

```python
def etl_pipeline():
    hook = PostgresHook(postgres_conn_id="postgres_default")

    # Step 1: Extract data to file
    hook.copy_expert(
        "COPY (SELECT * FROM raw_data WHERE processed = false) TO STDOUT WITH CSV HEADER",
        "/tmp/unprocessed_data.csv"
    )

    # Step 2: Transform data (external processing)
    transformed_file = transform_data("/tmp/unprocessed_data.csv")

    # Step 3: Load transformed data
    hook.copy_expert(
        "COPY processed_data FROM STDIN WITH CSV HEADER",
        transformed_file
    )

    # Step 4: Update processed status
    hook.run("UPDATE raw_data SET processed = true WHERE processed = false")
```

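Inside Airflow, a function like this would typically run as a task. One possible wrapper using the TaskFlow API (a sketch for Airflow 2.4+; the DAG id, schedule, and start date are placeholders):

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def bulk_etl_example():
    @task
    def run_etl():
        etl_pipeline()

    run_etl()


bulk_etl_example()
```
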
## Performance Considerations

### COPY vs INSERT Performance

- **COPY Operations**: Fastest for large datasets (>1000 rows)
- **Bulk Insert**: Good for medium datasets with upsert needs
- **Single Insert**: Best for small datasets or real-time updates

A rough way to compare the first two approaches on your own data is sketched below.

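A minimal timing sketch, assuming a `bench_table (id, name)` table with no unique constraints (so the same rows can be loaded twice) and a configured connection; actual results depend heavily on hardware, network latency, and table definition:

```python
import csv
import tempfile
import time

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")
rows = [(i, f"name_{i}") for i in range(100_000)]

# Time batched INSERTs via insert_rows.
start = time.perf_counter()
hook.insert_rows("bench_table", rows, target_fields=["id", "name"], commit_every=5000)
insert_seconds = time.perf_counter() - start

# Time COPY: write the rows to a CSV file and load it with copy_expert.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as f:
    csv.writer(f).writerows(rows)
    csv_path = f.name

start = time.perf_counter()
hook.copy_expert("COPY bench_table (id, name) FROM STDIN WITH CSV", csv_path)
copy_seconds = time.perf_counter() - start

print(f"insert_rows: {insert_seconds:.1f}s, COPY: {copy_seconds:.1f}s")
```
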
### Batch Size Optimization

```python
# Reasonable starting points for batch sizes; tune for your workload
COPY_BATCH_SIZE = 50000   # For file operations
INSERT_BATCH_SIZE = 5000  # For row insertion
COMMIT_INTERVAL = 1000    # For transaction commits
```

### Memory Management

```python
import itertools

# Process large tab-delimited files in chunks to manage memory
def process_large_file(hook, filename, table_name):
    chunk_size = 10000

    with open(filename, 'r') as f:
        while True:
            lines = list(itertools.islice(f, chunk_size))
            if not lines:
                break

            # Split each tab-delimited line into a row of column values
            rows = [line.rstrip('\n').split('\t') for line in lines]
            hook.insert_rows(table_name, rows, commit_every=1000)
```

## File Format Requirements

### Tab-Delimited Files (bulk_load/bulk_dump)

- **Delimiter**: Tab character (`\t`)
- **Line Ending**: Unix-style (`\n`) or Windows-style (`\r\n`)
- **Null Values**: `\N` (an empty field is loaded as an empty string, not NULL)
- **Escaping**: PostgreSQL COPY text-format escaping; embedded tabs, newlines, and backslashes must be backslash-escaped, as in the sketch below

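A minimal sketch of producing correctly escaped input for `bulk_load`; the helper names are illustrative:

```python
def to_copy_text_field(value):
    """Render a Python value as one field in PostgreSQL COPY text format."""
    if value is None:
        return r"\N"  # default NULL marker in text format
    return (
        str(value)
        .replace("\\", "\\\\")  # escape backslashes first
        .replace("\t", "\\t")   # a raw tab would split the field
        .replace("\n", "\\n")   # a raw newline would end the row
        .replace("\r", "\\r")
    )


def write_copy_text_file(path, rows):
    """Write rows as tab-delimited COPY text format suitable for bulk_load."""
    with open(path, "w", newline="\n") as f:
        for row in rows:
            f.write("\t".join(to_copy_text_field(v) for v in row) + "\n")


write_copy_text_file("/tmp/users.tsv", [(1, "Alice", None), (2, "Bo\tb", "multi\nline")])
```
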
### Custom COPY Formats

Various formats are supported through copy_expert, and individual options can be combined (see the sketch after this list):

- **CSV**: `WITH CSV HEADER`
- **Binary**: `WITH BINARY`
- **Custom Delimiter**: `WITH DELIMITER '|'`
- **Custom Quote**: `WITH QUOTE '"'` (CSV format only)
- **Custom Null**: `WITH NULL 'NULL'`

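Several options can be passed together using PostgreSQL's parenthesized `WITH (...)` option list, for example (the table and file path reuse the earlier illustrative example):

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# CSV format with a pipe delimiter, a literal NULL marker, and an explicit quote character.
hook.copy_expert(
    "COPY products FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL 'NULL', QUOTE '\"')",
    "/data/products.pipe",
)
```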