# Bulk Operations and Data Loading

High-performance bulk data operations, including file-based loading and dumping, PostgreSQL COPY command support, and efficient row insertion with upsert capabilities, for large-scale data transfer and ETL workloads.

## Capabilities

### Bulk File Loading

Load data from tab-delimited files directly into PostgreSQL tables using efficient bulk operations.

```python { .api }
def bulk_load(self, table: str, tmp_file: str) -> None:
    """
    Load tab-delimited file into database table using COPY FROM.

    Parameters:
    - table: str, target table name for data loading
    - tmp_file: str, path to tab-delimited file to load

    Raises:
    Exception: If file cannot be read or table doesn't exist
    """
```

### Bulk File Dumping

Export table data to tab-delimited files using PostgreSQL's efficient COPY TO operation.

```python { .api }
def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
    Dump database table into tab-delimited file using COPY TO.

    Parameters:
    - table: str, source table name for data extraction
    - tmp_file: str, path to output file for dumped data

    Raises:
    Exception: If table doesn't exist or file cannot be written
    """
```

### PostgreSQL COPY Expert

Execute custom PostgreSQL COPY commands with full control over format and options.

```python { .api }
def copy_expert(self, sql: str, filename: str) -> None:
    """
    Execute PostgreSQL COPY command using psycopg2's copy_expert.
    Provides full control over COPY options and format.

    Parameters:
    - sql: str, complete COPY SQL statement (COPY ... FROM/TO ...)
    - filename: str, file path for COPY operation

    Example SQL:
    "COPY users (id, name, email) FROM STDIN WITH CSV HEADER"
    "COPY (SELECT * FROM sales WHERE date >= '2024-01-01') TO STDOUT WITH CSV"
    """
```

### Row Insertion with Upsert

Insert multiple rows with support for conflict resolution and upsert operations.

```python { .api }
def insert_rows(
    self,
    table,
    rows,
    target_fields=None,
    commit_every: int = 1000,
    replace: bool = False,
    **kwargs
):
    """
    Insert rows into table with optional upsert capability.

    Parameters:
    - table: str, target table name
    - rows: list, list of tuples/lists containing row data
    - target_fields: list, column names for insertion (None for all columns)
    - commit_every: int, commit after every N rows (default 1000)
    - replace: bool, enable upsert mode using ON CONFLICT
    - **kwargs: additional parameters including replace_index

    Upsert Parameters:
    - replace_index: str or list, column(s) to use for conflict detection

    Example:
    insert_rows("users", [(1, "john"), (2, "jane")], ["id", "name"], replace=True, replace_index="id")
    """
```
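
With `replace=True`, the hook builds an `INSERT ... ON CONFLICT ... DO UPDATE` statement keyed on `replace_index`. As a rough hand-written equivalent (a sketch only, not the hook's exact SQL; the `users` table and columns are illustrative, and `hook` is the instance created in the usage examples below):

```python
# Approximately what replace=True with replace_index="id" corresponds to,
# written out manually for a hypothetical users(id, name) table.
hook.run(
    """
    INSERT INTO users (id, name)
    VALUES (%s, %s)
    ON CONFLICT (id)
    DO UPDATE SET name = EXCLUDED.name
    """,
    parameters=(1, "john"),
)
```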

## Usage Examples

### Basic Bulk Loading

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Load tab-delimited file into table
hook.bulk_load("user_imports", "/data/users.tsv")

# Dump table to file
hook.bulk_dump("export_data", "/output/data_export.tsv")
```

### Custom COPY Operations

```python
# Import CSV with headers
hook.copy_expert(
    "COPY users (id, name, email, created_at) FROM STDIN WITH CSV HEADER",
    "/data/users.csv"
)

# Export query results to CSV
hook.copy_expert(
    "COPY (SELECT u.name, u.email, p.total FROM users u JOIN purchases p ON u.id = p.user_id WHERE p.date >= '2024-01-01') TO STDOUT WITH CSV HEADER",
    "/output/user_purchases.csv"
)

# Import with custom delimiter, null marker, and quote character (CSV mode,
# since QUOTE is only valid for CSV format)
hook.copy_expert(
    "COPY products FROM STDIN WITH DELIMITER '|' NULL 'NULL' CSV QUOTE '\"'",
    "/data/products.pipe"
)
```

### Row Insertion

```python
# Simple row insertion
rows = [
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com"),
    (3, "Charlie", "charlie@example.com"),
]

hook.insert_rows(
    table="users",
    rows=rows,
    target_fields=["id", "name", "email"],
    commit_every=500
)
```

### Upsert Operations

```python
# Insert with conflict resolution on primary key
hook.insert_rows(
    table="products",
    rows=[
        (1, "Widget", 19.99, "2024-01-01"),
        (2, "Gadget", 29.99, "2024-01-02"),
    ],
    target_fields=["id", "name", "price", "updated_at"],
    replace=True,
    replace_index="id",  # Use id column for conflict detection
)

# Upsert with composite key
hook.insert_rows(
    table="user_preferences",
    rows=[
        (1, "theme", "dark"),
        (1, "language", "en"),
        (2, "theme", "light"),
    ],
    target_fields=["user_id", "setting_name", "setting_value"],
    replace=True,
    replace_index=["user_id", "setting_name"],  # Composite conflict key
)
```

### Large Dataset Processing

```python
# Process large datasets in batches
large_dataset = load_large_dataset()  # Assume this returns an iterator of row tuples

batch_size = 5000
batch = []

for row in large_dataset:
    batch.append(row)

    if len(batch) >= batch_size:
        hook.insert_rows(
            table="large_table",
            rows=batch,
            target_fields=["col1", "col2", "col3"],
            commit_every=1000
        )
        batch = []

# Handle remaining rows
if batch:
    hook.insert_rows(
        table="large_table",
        rows=batch,
        target_fields=["col1", "col2", "col3"]
    )
```

### ETL Pipeline Example

```python
def etl_pipeline():
    hook = PostgresHook(postgres_conn_id="postgres_default")

    # Step 1: Extract data to file
    hook.copy_expert(
        "COPY (SELECT * FROM raw_data WHERE processed = false) TO STDOUT WITH CSV HEADER",
        "/tmp/unprocessed_data.csv"
    )

    # Step 2: Transform data (external processing)
    transformed_file = transform_data("/tmp/unprocessed_data.csv")

    # Step 3: Load transformed data
    hook.copy_expert(
        "COPY processed_data FROM STDIN WITH CSV HEADER",
        transformed_file
    )

    # Step 4: Update processed status
    hook.run("UPDATE raw_data SET processed = true WHERE processed = false")
```
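
The `transform_data` step above is left to external processing. A purely illustrative stand-in (hypothetical; it assumes the extracted CSV has a `name` column to normalize) could look like:

```python
import csv

def transform_data(input_path):
    """Hypothetical transform: normalize the 'name' column and write a new CSV."""
    output_path = input_path.replace(".csv", "_transformed.csv")
    with open(input_path, newline="") as src, open(output_path, "w", newline="") as dst:
        reader = csv.DictReader(src)
        writer = csv.DictWriter(dst, fieldnames=reader.fieldnames)
        writer.writeheader()
        for row in reader:
            row["name"] = row["name"].strip().title()
            writer.writerow(row)
    return output_path
```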

## Performance Considerations

### COPY vs INSERT Performance

- **COPY Operations**: Fastest for large datasets (>1000 rows); see the dispatch sketch after this list
- **Bulk Insert**: Good for medium datasets with upsert needs
- **Single Insert**: Best for small datasets or real-time updates
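
A rough way to encode these guidelines (a sketch only; the 1000-row threshold mirrors the list above, and the temp-file write assumes row tuples already match the table's column order, since `bulk_load` takes no `target_fields`):

```python
import csv
import tempfile

def load_rows(hook, table, rows, target_fields=None):
    """Pick a loading strategy based on row count (heuristic sketch)."""
    if len(rows) > 1000:
        # Large batch: write a tab-delimited temp file and COPY it in via bulk_load.
        # (Escaping is simplified; values containing tabs/newlines need COPY-style escaping.)
        with tempfile.NamedTemporaryFile("w", suffix=".tsv", delete=False, newline="") as f:
            csv.writer(f, delimiter="\t").writerows(rows)
            tmp_path = f.name
        hook.bulk_load(table, tmp_path)
    else:
        # Small or medium batch: plain inserts (add replace=True for upserts).
        hook.insert_rows(table, rows, target_fields=target_fields)
```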

### Batch Size Optimization

```python
# Reasonable starting points; tune batch sizes for your workload
COPY_BATCH_SIZE = 50000   # For file-based COPY operations
INSERT_BATCH_SIZE = 5000  # For row insertion
COMMIT_INTERVAL = 1000    # For transaction commits (commit_every)
```
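
These constants can be dropped into the batching pattern shown earlier. A brief sketch (assuming `all_rows` is an in-memory list of tuples and reusing the hypothetical `large_table` columns from above):

```python
# Slice the row list into INSERT_BATCH_SIZE chunks and commit every COMMIT_INTERVAL rows.
for start in range(0, len(all_rows), INSERT_BATCH_SIZE):
    hook.insert_rows(
        table="large_table",
        rows=all_rows[start:start + INSERT_BATCH_SIZE],
        target_fields=["col1", "col2", "col3"],
        commit_every=COMMIT_INTERVAL,
    )
```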

### Memory Management

```python
import itertools

# Process large files in chunks to manage memory
def process_large_file(filename, table_name):
    chunk_size = 10000

    with open(filename, 'r') as f:
        while True:
            lines = list(itertools.islice(f, chunk_size))
            if not lines:
                break

            # Parse tab-delimited lines into row tuples before inserting
            rows = [tuple(line.rstrip("\n").split("\t")) for line in lines]
            hook.insert_rows(table_name, rows, commit_every=1000)
```

## File Format Requirements

### Tab-Delimited Files (bulk_load/bulk_dump)

- **Delimiter**: Tab character (`\t`)
- **Line Ending**: Unix-style (`\n`) or Windows-style (`\r\n`)
- **Null Values**: `\N` (the default null marker for COPY text format)
- **Escaping**: PostgreSQL COPY text-format escaping (a small writer sketch follows this list)
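
A minimal sketch of producing a compliant file and loading it with `bulk_load` (paths, table, and columns are illustrative; the escaping here is simplified and assumes values contain no tabs, newlines, or backslashes):

```python
def write_tsv(path, rows):
    """Write rows as a COPY-compatible tab-delimited file (simplified escaping)."""
    with open(path, "w", newline="\n") as f:
        for row in rows:
            fields = ["\\N" if value is None else str(value) for value in row]
            f.write("\t".join(fields) + "\n")

write_tsv("/tmp/users.tsv", [(1, "Alice", None), (2, "Bob", "bob@example.com")])
hook.bulk_load("user_imports", "/tmp/users.tsv")
```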

### Custom COPY Formats

Various formats are supported through `copy_expert`; a combined example follows the list:

- **CSV**: `WITH CSV HEADER`
- **Binary**: `WITH BINARY`
- **Custom Delimiter**: `WITH DELIMITER '|'`
- **Custom Quote**: `WITH QUOTE '"'` (CSV mode only)
- **Custom Null**: `WITH NULL 'NULL'`
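
A short example combining several of these options in one statement (table and file names are illustrative; the option order follows PostgreSQL's legacy COPY syntax, where `QUOTE` must appear after `CSV`):

```python
# Export with a pipe delimiter, explicit NULL marker, CSV quoting, and a header row
hook.copy_expert(
    "COPY inventory TO STDOUT WITH DELIMITER '|' NULL 'NULL' CSV HEADER QUOTE '\"'",
    "/output/inventory.pipe"
)
```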