PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
High-performance bulk data operations including file-based loading, dumping, PostgreSQL COPY command support, and efficient row insertion with upsert capabilities for large-scale data transfer and ETL operations.
Load data from tab-delimited files directly into PostgreSQL tables using efficient bulk operations.
def bulk_load(self, table: str, tmp_file: str) -> None:
    """
    Load a tab-delimited file into a database table using COPY FROM.

    Delegates to ``copy_expert`` with a ``COPY ... FROM STDIN`` statement,
    streaming the file through PostgreSQL's server-side COPY protocol.

    Parameters:
    - table: str, target table name for data loading
    - tmp_file: str, path to tab-delimited file to load

    Raises:
    Exception: If file cannot be read or table doesn't exist
    """
    self.copy_expert(f"COPY {table} FROM STDIN", tmp_file)

# Export table data to tab-delimited files using PostgreSQL's efficient COPY TO operation.
def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
    Dump a database table into a tab-delimited file using COPY TO.

    Delegates to ``copy_expert`` with a ``COPY ... TO STDOUT`` statement,
    streaming table contents to the file via the COPY protocol.

    Parameters:
    - table: str, source table name for data extraction
    - tmp_file: str, path to output file for dumped data

    Raises:
    Exception: If table doesn't exist or file cannot be written
    """
    self.copy_expert(f"COPY {table} TO STDOUT", tmp_file)

# Execute custom PostgreSQL COPY commands with full control over format and options.
def copy_expert(self, sql: str, filename: str) -> None:
    """
    Execute a PostgreSQL COPY command using psycopg2's copy_expert.

    Provides full control over COPY options and format.

    Parameters:
    - sql: str, complete COPY SQL statement (COPY ... FROM/TO ...)
    - filename: str, file path for COPY operation

    Example SQL:
        "COPY users (id, name, email) FROM STDIN WITH CSV HEADER"
        "COPY (SELECT * FROM sales WHERE date >= '2024-01-01') TO STDOUT WITH CSV"
    """
    # NOTE(review): documentation stub — the concrete implementation lives in
    # airflow.providers.postgres.hooks.postgres.PostgresHook and uses
    # psycopg2's cursor.copy_expert with the opened file handle.

# Insert multiple rows with support for conflict resolution and upsert operations.
def insert_rows(
    self,
    table,
    rows,
    target_fields=None,
    commit_every: int = 1000,
    replace: bool = False,
    **kwargs
):
    """
    Insert rows into a table with optional upsert capability.

    Parameters:
    - table: str, target table name
    - rows: list, list of tuples/lists containing row data
    - target_fields: list, column names for insertion (None for all columns)
    - commit_every: int, commit after every N rows (default 1000)
    - replace: bool, enable upsert mode using ON CONFLICT
    - **kwargs: additional parameters including replace_index

    Upsert Parameters:
    - replace_index: str or list, column(s) to use for conflict detection

    Example:
        insert_rows("users", [(1, "john"), (2, "jane")], ["id", "name"], replace=True, replace_index="id")
    """
    # NOTE(review): documentation stub — the concrete implementation lives in
    # airflow.providers.postgres.hooks.postgres.PostgresHook.

from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="postgres_default")

# Load tab-delimited file into table
hook.bulk_load("user_imports", "/data/users.tsv")

# Dump table to file
hook.bulk_dump("export_data", "/output/data_export.tsv")

# Import CSV with headers
hook.copy_expert(
    "COPY users (id, name, email, created_at) FROM STDIN WITH CSV HEADER",
    "/data/users.csv",
)

# Export query results to CSV
hook.copy_expert(
    "COPY (SELECT u.name, u.email, p.total FROM users u JOIN purchases p ON u.id = p.user_id WHERE p.date >= '2024-01-01') TO STDOUT WITH CSV HEADER",
    "/output/user_purchases.csv",
)

# Import with custom delimiter and null values
hook.copy_expert(
    "COPY products FROM STDIN WITH DELIMITER '|' NULL 'NULL' QUOTE '\"'",
    "/data/products.pipe",
)

# Simple row insertion
rows = [
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com"),
    (3, "Charlie", "charlie@example.com"),
]
hook.insert_rows(
    table="users",
    rows=rows,
    target_fields=["id", "name", "email"],
    commit_every=500,
)

# Insert with conflict resolution on primary key
hook.insert_rows(
    table="products",
    rows=[
        (1, "Widget", 19.99, "2024-01-01"),
        (2, "Gadget", 29.99, "2024-01-02"),
    ],
    target_fields=["id", "name", "price", "updated_at"],
    replace=True,
    replace_index="id",  # Use id column for conflict detection
)
# Upsert with composite key
hook.insert_rows(
    table="user_preferences",
    rows=[
        (1, "theme", "dark"),
        (1, "language", "en"),
        (2, "theme", "light"),
    ],
    target_fields=["user_id", "setting_name", "setting_value"],
    replace=True,
    replace_index=["user_id", "setting_name"],  # Composite conflict key
)

# Process large datasets in batches
large_dataset = load_large_dataset()  # Assume this returns iterator of rows
batch_size = 5000
batch = []
for row in large_dataset:
    batch.append(row)
    if len(batch) >= batch_size:
        hook.insert_rows(
            table="large_table",
            rows=batch,
            target_fields=["col1", "col2", "col3"],
            commit_every=1000,
        )
        batch = []

# Handle remaining rows
if batch:
    hook.insert_rows(
        table="large_table",
        rows=batch,
        target_fields=["col1", "col2", "col3"],
    )


def etl_pipeline():
    """Extract unprocessed rows to a file, transform them externally, and load the result back."""
    hook = PostgresHook(postgres_conn_id="postgres_default")
    # Step 1: Extract data to file
    hook.copy_expert(
        "COPY (SELECT * FROM raw_data WHERE processed = false) TO STDOUT WITH CSV HEADER",
        "/tmp/unprocessed_data.csv",
    )
    # Step 2: Transform data (external processing)
    transformed_file = transform_data("/tmp/unprocessed_data.csv")
    # Step 3: Load transformed data
    hook.copy_expert(
        "COPY processed_data FROM STDIN WITH CSV HEADER",
        transformed_file,
    )
    # Step 4: Update processed status
    hook.run("UPDATE raw_data SET processed = true WHERE processed = false")

# Optimal batch sizes for different operations
# Suggested batch sizes; tune per workload and available memory.
COPY_BATCH_SIZE = 50000   # For file operations
INSERT_BATCH_SIZE = 5000  # For row insertion
COMMIT_INTERVAL = 1000    # For transaction commits

# Process large files in chunks to manage memory
def process_large_file(filename, table_name):
    """
    Stream a large file into *table_name* in fixed-size chunks to bound memory use.

    Parameters:
    - filename: path of the input file, read line by line
    - table_name: target table passed to hook.insert_rows

    NOTE(review): relies on a module-level ``hook`` (PostgresHook) being in
    scope, and passes raw text lines as rows — confirm the expected row format
    against insert_rows before using verbatim.
    """
    import itertools  # local import: itertools is not imported at module level in this file

    chunk_size = 10000
    with open(filename, 'r') as f:
        while True:
            # islice reads at most chunk_size lines without loading the whole file
            chunk = list(itertools.islice(f, chunk_size))
            if not chunk:
                break
            # Process chunk
            hook.insert_rows(table_name, chunk, commit_every=1000)

# Input files are expected to be tab-delimited (\t), with rows terminated
# Unix-style (\n) or Windows-style (\r\n).
# Support for various formats through copy_expert:
Supported COPY format options include:

- WITH CSV HEADER
- WITH BINARY
- WITH DELIMITER '|'
- WITH QUOTE '"'
- WITH NULL 'NULL'

Install with the Tessl CLI:

npx tessl i tessl/pypi-apache-airflow-providers-postgres