
tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.


Database Connection and Query Execution

Core database connectivity, query execution, and transaction management for PostgreSQL databases in Apache Airflow, covering connection setup, SSL support, custom cursor types, and query execution.

Capabilities

PostgresHook Class

Main database hook class that extends DbApiHook with PostgreSQL-specific functionality.

class PostgresHook(DbApiHook):
    """
    Interact with Postgres.

    Parameters:
    - postgres_conn_id: str, connection ID reference to PostgreSQL database
    - options: str | None, command-line options to send to server at connection start
    - enable_log_db_messages: bool, enable logging of database messages sent to client
    """
    
    # Class attributes
    conn_name_attr = "postgres_conn_id"
    default_conn_name = "postgres_default"
    conn_type = "postgres"
    hook_name = "Postgres"
    supports_autocommit = True
    supports_executemany = True
    ignored_extra_options = {
        "iam", "redshift", "redshift-serverless", "cursor",
        "cluster-identifier", "workgroup-name", "aws_conn_id",
        "sqlalchemy_scheme", "sqlalchemy_query"
    }
    
    def __init__(
        self, 
        *args, 
        options: str | None = None, 
        enable_log_db_messages: bool = False, 
        **kwargs
    ) -> None: ...
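The `options` string follows libpq's `options` connection-parameter syntax: server command-line switches such as `-c name=value`, separated by spaces, with literal spaces and backslashes backslash-escaped. `build_pg_options` below is a hypothetical convenience, not part of the provider, that renders a dict of server settings into that form.

```python
def build_pg_options(settings: dict[str, object]) -> str:
    """Render server settings as a libpq ``options`` string (hypothetical helper)."""
    parts = []
    for name, value in settings.items():
        # libpq splits on spaces unless they are backslash-escaped,
        # and a literal backslash must be written as two backslashes.
        text = str(value).replace("\\", "\\\\").replace(" ", "\\ ")
        parts.append(f"-c {name}={text}")
    return " ".join(parts)


opts = build_pg_options({"statement_timeout": 5000, "search_path": "analytics"})
print(opts)  # -c statement_timeout=5000 -c search_path=analytics

# Assumed usage (requires Airflow and this provider to be installed):
# hook = PostgresHook(postgres_conn_id="postgres_default", options=opts)
```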

Connection Management

Builds psycopg2 connections from the Airflow connection's host, port, login, password, database (the schema field), and extras.

def get_conn(self) -> connection:
    """
    Establishes database connection with support for:
    - SSL parameters from connection extras
    - Custom cursor types (dictcursor, realdictcursor, namedtuplecursor)
    - AWS IAM authentication for RDS, Redshift, and Redshift Serverless
    - Connection options and logging configuration
    
    Returns:
    psycopg2 connection object
    """

def get_uri(self) -> str:
    """
    Extract connection URI from connection configuration.
    
    Returns:
    str: Connection URI in SQLAlchemy format
    """

SQLAlchemy Integration

Exposes the connection as a SQLAlchemy URL and identifies the SQL dialect the hook uses.

@property
def sqlalchemy_url(self) -> URL:
    """
    Constructs SQLAlchemy URL object for PostgreSQL connection.
    Query parameters come from the sqlalchemy_query extra, which must be a dict.
    
    Returns:
    sqlalchemy.engine.URL: SQLAlchemy URL object
    """

@property  
def dialect_name(self) -> str:
    """
    Database dialect name identifier.
    
    Returns:
    str: "postgresql"
    """

@property
def dialect(self) -> Dialect:
    """
    Returns dialect implementation for PostgreSQL.
    
    Returns:
    PostgresDialect: Dialect implementation instance
    """

Query Execution

Executes SQL statements with parameter binding and transaction control. These methods are inherited from DbApiHook.

def run(
    self, 
    sql, 
    autocommit: bool = False, 
    parameters=None, 
    handler=None
):
    """
    Execute SQL statement(s).
    
    Parameters:
    - sql: str or list of str, SQL statement(s) to execute
    - autocommit: bool, whether to autocommit the transaction
    - parameters: list/tuple/dict, query parameters for binding
    - handler: callable, optional result handler function
    
    Returns:
    Handler output for the executed statement(s), or None when no handler is given
    """

def get_records(self, sql, parameters=None):
    """
    Execute SQL query and return all records.
    
    Parameters:
    - sql: str, SQL query to execute
    - parameters: list/tuple/dict, query parameters for binding
    
    Returns:
    list: All records from query result
    """

def get_first(self, sql, parameters=None):
    """
    Execute SQL query and return first record.
    
    Parameters:
    - sql: str, SQL query to execute  
    - parameters: list/tuple/dict, query parameters for binding
    
    Returns:
    tuple or None: First record from query result
    """

Cursor Management

Manages database cursors with support for different cursor types.

def get_cursor(self):
    """
    Get database cursor from current connection.
    Supports custom cursor types configured in connection extras.
    
    Returns:
    Database cursor object (DictCursor, RealDictCursor, NamedTupleCursor, or default)
    """

def _get_cursor(self, raw_cursor: str) -> CursorType:
    """
    Internal method to get specific cursor type from string name.
    
    Parameters:
    - raw_cursor: str, cursor type name ("dictcursor", "realdictcursor", "namedtuplecursor")
    
    Returns:
    CursorType: Configured cursor class
    """

@staticmethod
def _serialize_cell(cell: object, conn: connection | None = None) -> Any:
    """
    Internal static method used when preparing cell values for insertion.
    The PostgreSQL hook returns the cell unchanged, because psycopg2 adapts
    Python values passed to execute() on its own.
    
    Parameters:
    - cell: object, value to insert
    - conn: connection | None, optional database connection (unused)
    
    Returns:
    Any: The cell, unchanged
    """

Transaction Control

Controls transaction behavior and autocommit settings.

def set_autocommit(self, conn, autocommit: bool):
    """
    Set autocommit mode for connection.
    
    Parameters:
    - conn: database connection object
    - autocommit: bool, autocommit mode setting
    """

def get_autocommit(self, conn) -> bool:
    """
    Get current autocommit status for connection.
    
    Parameters:
    - conn: database connection object
    
    Returns:
    bool: Current autocommit status
    """

Database Logging

Manages database message logging for debugging and monitoring.

def get_db_log_messages(self, conn) -> None:
    """
    Log database messages sent to client during session.
    Requires enable_log_db_messages=True in constructor.
    
    Parameters:
    - conn: database connection object
    """

UI Integration

Provides Airflow UI field behavior configuration for connection forms.

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """
    Returns Airflow UI field behavior configuration for PostgreSQL connections.
    Defines form field visibility, requirements, and placeholders.
    
    Returns:
    dict: UI field configuration dictionary
    """

Types

from typing import TypeAlias
from psycopg2.extras import DictCursor, RealDictCursor, NamedTupleCursor

CursorType: TypeAlias = DictCursor | RealDictCursor | NamedTupleCursor

Connection Configuration

Connection Extra Parameters

Configure connection behavior through the connection's extra JSON field:

{
  "sslmode": "require",
  "sslcert": "/path/to/cert.pem",
  "sslkey": "/path/to/key.pem",
  "sslrootcert": "/path/to/ca.pem",
  "cursor": "dictcursor",
  "iam": true,
  "redshift": true,
  "cluster-identifier": "my-cluster",
  "aws_conn_id": "aws_default"
}

Supported Extra Options

  • SSL Configuration: sslmode, sslcert, sslkey, sslrootcert, sslcrl
  • Cursor Type: cursor - "dictcursor", "realdictcursor", "namedtuplecursor"
  • AWS IAM: iam (bool), aws_conn_id (str)
  • Redshift: redshift (bool), cluster-identifier (str)
  • Redshift Serverless: redshift-serverless (bool), workgroup-name (str)
  • SQLAlchemy: sqlalchemy_scheme (str), sqlalchemy_query (dict)
  • Connection Options: any other key (for example connect_timeout or application_name) is passed to psycopg2.connect() as a libpq connection parameter
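Extras named in `ignored_extra_options` configure the hook itself, and the remaining keys go to the driver. A sketch of that split, using a copy of the documented ignored set and a hypothetical `split_extras` helper:

```python
import json

# Copied from PostgresHook.ignored_extra_options as documented above.
IGNORED_EXTRA_OPTIONS = {
    "iam", "redshift", "redshift-serverless", "cursor",
    "cluster-identifier", "workgroup-name", "aws_conn_id",
    "sqlalchemy_scheme", "sqlalchemy_query",
}


def split_extras(extra_json: str) -> tuple[dict, dict]:
    """Split a connection's extra JSON into (hook_options, driver_kwargs)."""
    extras = json.loads(extra_json) if extra_json else {}
    hook_options = {k: v for k, v in extras.items() if k in IGNORED_EXTRA_OPTIONS}
    driver_kwargs = {k: v for k, v in extras.items() if k not in IGNORED_EXTRA_OPTIONS}
    return hook_options, driver_kwargs


hook_opts, driver_kwargs = split_extras(
    '{"sslmode": "require", "cursor": "dictcursor", "connect_timeout": 10}'
)
print(hook_opts)      # {'cursor': 'dictcursor'}
print(driver_kwargs)  # {'sslmode': 'require', 'connect_timeout': 10}
```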

Default Values

  • Connection Type: "postgres"
  • Default Connection: "postgres_default"
  • Default Port: 5432 (Redshift: 5439)
  • Default Schema: "public"

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres
