tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.


Bulk Operations and Data Loading

High-performance bulk data operations: file-based loading and dumping, PostgreSQL COPY command support, and efficient row insertion with upsert capabilities for large-scale data transfer and ETL workloads.

Capabilities

Bulk File Loading

Load data from tab-delimited files directly into PostgreSQL tables using efficient bulk operations.

def bulk_load(self, table: str, tmp_file: str) -> None:
    """
    Load tab-delimited file into database table using COPY FROM.
    
    Parameters:
    - table: str, target table name for data loading
    - tmp_file: str, path to tab-delimited file to load
    
    Raises:
    Exception: If file cannot be read or table doesn't exist
    """

Bulk File Dumping

Export table data to tab-delimited files using PostgreSQL's efficient COPY TO operation.

def bulk_dump(self, table: str, tmp_file: str) -> None:
    """
    Dump database table into tab-delimited file using COPY TO.
    
    Parameters:
    - table: str, source table name for data extraction
    - tmp_file: str, path to output file for dumped data
    
    Raises:
    Exception: If table doesn't exist or file cannot be written
    """
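Both helpers are thin wrappers around copy_expert. A minimal sketch of the SQL they effectively issue (the helper names below are illustrative, not part of the provider's API):

```python
def bulk_load_sql(table: str) -> str:
    # SQL that bulk_load effectively runs via copy_expert (sketch)
    return f"COPY {table} FROM STDIN"


def bulk_dump_sql(table: str) -> str:
    # SQL that bulk_dump effectively runs via copy_expert (sketch)
    return f"COPY {table} TO STDOUT"
```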

PostgreSQL COPY Expert

Execute custom PostgreSQL COPY commands with full control over format and options.

def copy_expert(self, sql: str, filename: str) -> None:
    """
    Execute PostgreSQL COPY command using psycopg2's copy_expert.
    Provides full control over COPY options and format.
    
    Parameters:
    - sql: str, complete COPY SQL statement (COPY ... FROM/TO ...)
    - filename: str, file path for COPY operation
    
    Example SQL:
    "COPY users (id, name, email) FROM STDIN WITH CSV HEADER"
    "COPY (SELECT * FROM sales WHERE date >= '2024-01-01') TO STDOUT WITH CSV"
    """

Row Insertion with Upsert

Insert multiple rows with support for conflict resolution and upsert operations.

def insert_rows(
    self, 
    table, 
    rows, 
    target_fields=None, 
    commit_every: int = 1000, 
    replace: bool = False, 
    **kwargs
):
    """
    Insert rows into table with optional upsert capability.
    
    Parameters:
    - table: str, target table name
    - rows: list, list of tuples/lists containing row data
    - target_fields: list, column names for insertion (None for all columns)
    - commit_every: int, commit after every N rows (default 1000)
    - replace: bool, enable upsert mode using ON CONFLICT
    - **kwargs: additional parameters including replace_index
    
    Upsert Parameters:
    - replace_index: str or list, column(s) to use for conflict detection
    
    Example:
    insert_rows("users", [(1, "john"), (2, "jane")], ["id", "name"], replace=True, replace_index="id")
    """
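When replace=True, the hook generates an INSERT ... ON CONFLICT ... DO UPDATE statement from the target fields and replace_index. A simplified sketch of that SQL generation (not the provider's exact internals):

```python
def upsert_sql(table, columns, conflict_cols):
    # Build an INSERT ... ON CONFLICT ... DO UPDATE statement (simplified sketch)
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    conflict = ", ".join(conflict_cols)
    # Update every non-conflict column from the proposed row (EXCLUDED)
    updates = ", ".join(
        f"{c} = excluded.{c}" for c in columns if c not in conflict_cols
    )
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict}) DO UPDATE SET {updates}"
    )
```

For the composite-key case, passing conflict_cols=["user_id", "setting_name"] yields ON CONFLICT (user_id, setting_name).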

Usage Examples

Basic Bulk Loading

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Load tab-delimited file into table
hook.bulk_load("user_imports", "/data/users.tsv")

# Dump table to file
hook.bulk_dump("export_data", "/output/data_export.tsv")

Custom COPY Operations

# Import CSV with headers
hook.copy_expert(
    "COPY users (id, name, email, created_at) FROM STDIN WITH CSV HEADER",
    "/data/users.csv"
)

# Export query results to CSV
hook.copy_expert(
    "COPY (SELECT u.name, u.email, p.total FROM users u JOIN purchases p ON u.id = p.user_id WHERE p.date >= '2024-01-01') TO STDOUT WITH CSV HEADER",
    "/output/user_purchases.csv"
)

# Import with custom delimiter, null marker, and quoting
# (the QUOTE option is only valid in CSV format)
hook.copy_expert(
    "COPY products FROM STDIN WITH (FORMAT csv, DELIMITER '|', NULL 'NULL', QUOTE '\"')",
    "/data/products.pipe"
)
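Since copy_expert takes a raw SQL string, the table name and options are interpolated directly into the statement. A small helper can keep statement construction consistent (an illustrative sketch, not a provider API; for untrusted table names, prefer psycopg2's sql.Identifier quoting):

```python
def copy_sql(table: str, direction: str = "FROM STDIN", options=None) -> str:
    # Compose a COPY statement with an optional WITH (...) options clause
    opts = f" WITH ({', '.join(options)})" if options else ""
    return f"COPY {table} {direction}{opts}"
```

For example, copy_sql("products", options=["FORMAT csv", "DELIMITER '|'"]) produces the statement passed to copy_expert above.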

Row Insertion

# Simple row insertion
rows = [
    (1, "Alice", "alice@example.com"),
    (2, "Bob", "bob@example.com"),
    (3, "Charlie", "charlie@example.com")
]

hook.insert_rows(
    table="users",
    rows=rows,
    target_fields=["id", "name", "email"],
    commit_every=500
)

Upsert Operations

# Insert with conflict resolution on primary key
hook.insert_rows(
    table="products",
    rows=[
        (1, "Widget", 19.99, "2024-01-01"),
        (2, "Gadget", 29.99, "2024-01-02")
    ],
    target_fields=["id", "name", "price", "updated_at"],
    replace=True,
    replace_index="id"  # Use id column for conflict detection
)

# Upsert with composite key
hook.insert_rows(
    table="user_preferences",
    rows=[
        (1, "theme", "dark"),
        (1, "language", "en"),
        (2, "theme", "light")
    ],
    target_fields=["user_id", "setting_name", "setting_value"],
    replace=True,
    replace_index=["user_id", "setting_name"]  # Composite conflict key
)

Large Dataset Processing

# Process large datasets in batches
large_dataset = load_large_dataset()  # Assume this returns iterator of rows

batch_size = 5000
batch = []

for row in large_dataset:
    batch.append(row)
    
    if len(batch) >= batch_size:
        hook.insert_rows(
            table="large_table",
            rows=batch,
            target_fields=["col1", "col2", "col3"],
            commit_every=1000
        )
        batch = []

# Handle remaining rows
if batch:
    hook.insert_rows(
        table="large_table",
        rows=batch,
        target_fields=["col1", "col2", "col3"]
    )
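The batch-and-flush loop above can be factored into a reusable generator (a sketch; Python 3.12's itertools.batched provides the same behavior natively):

```python
from itertools import islice


def batched(iterable, n):
    # Yield successive lists of up to n items from any iterable
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        yield chunk
```

With this helper, the loop collapses to: for batch in batched(large_dataset, 5000): hook.insert_rows("large_table", batch, target_fields=["col1", "col2", "col3"]).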

ETL Pipeline Example

def etl_pipeline():
    hook = PostgresHook(postgres_conn_id="postgres_default")
    
    # Step 1: Extract data to file
    hook.copy_expert(
        "COPY (SELECT * FROM raw_data WHERE processed = false) TO STDOUT WITH CSV HEADER",
        "/tmp/unprocessed_data.csv"
    )
    
    # Step 2: Transform data (external processing)
    transformed_file = transform_data("/tmp/unprocessed_data.csv")
    
    # Step 3: Load transformed data
    hook.copy_expert(
        "COPY processed_data FROM STDIN WITH CSV HEADER",
        transformed_file
    )
    
    # Step 4: Update processed status
    hook.run("UPDATE raw_data SET processed = true WHERE processed = false")

Performance Considerations

COPY vs INSERT Performance

  • COPY Operations: Fastest for large datasets (>1000 rows)
  • Bulk Insert: Good for medium datasets with upsert needs
  • Single Insert: Best for small datasets or real-time updates

Batch Size Optimization

# Suggested starting batch sizes (tune for your workload and row width)
COPY_BATCH_SIZE = 50000      # For file operations
INSERT_BATCH_SIZE = 5000     # For row insertion
COMMIT_INTERVAL = 1000       # For transaction commits

Memory Management

# Process large files in chunks to manage memory
import itertools

def process_large_file(filename, table_name):
    chunk_size = 10000

    with open(filename, 'r') as f:
        while True:
            lines = list(itertools.islice(f, chunk_size))
            if not lines:
                break

            # Split each tab-delimited line into a row tuple before inserting
            rows = [tuple(line.rstrip('\n').split('\t')) for line in lines]
            hook.insert_rows(table_name, rows, commit_every=1000)

File Format Requirements

Tab-Delimited Files (bulk_load/bulk_dump)

  • Delimiter: Tab character (\t)
  • Line Ending: Unix-style (\n) or Windows-style (\r\n)
  • Null Values: Empty fields or \N
  • Escaping: PostgreSQL COPY format escaping
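When generating such files from Python, each field must be escaped per the COPY text-format rules listed above. A minimal escaping sketch (the helper name is illustrative, not part of the provider):

```python
def copy_text_field(value) -> str:
    # Escape one value for PostgreSQL's COPY text format
    if value is None:
        return "\\N"                    # COPY's default null marker
    return (
        str(value)
        .replace("\\", "\\\\")          # escape backslash first, so later escapes aren't doubled
        .replace("\t", "\\t")           # tab is the field delimiter
        .replace("\n", "\\n")           # newline is the row terminator
        .replace("\r", "\\r")
    )
```

A row is then the escaped fields joined with tabs, followed by a newline.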

Custom COPY Formats

Support for various formats through copy_expert:

  • CSV: WITH CSV HEADER
  • Binary: WITH BINARY
  • Custom Delimiter: WITH DELIMITER '|'
  • Custom Quote: WITH QUOTE '"'
  • Custom Null: WITH NULL 'NULL'

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres
