PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core database connectivity, query execution, and transaction management functionality for PostgreSQL databases through Apache Airflow. Provides connection management, SSL support, custom cursor types, and comprehensive query execution capabilities.
Main database hook class that extends DbApiHook with PostgreSQL-specific functionality.
class PostgresHook(DbApiHook):
"""
Interact with Postgres.
Parameters:
- postgres_conn_id: str, connection ID reference to PostgreSQL database
- options: str | None, command-line options to send to server at connection start
- enable_log_db_messages: bool, enable logging of database messages sent to client
"""
# Class attributes
conn_name_attr = "postgres_conn_id"
default_conn_name = "postgres_default"
conn_type = "postgres"
hook_name = "Postgres"
supports_autocommit = True
supports_executemany = True
ignored_extra_options = {
"iam", "redshift", "redshift-serverless", "cursor",
"cluster-identifier", "workgroup-name", "aws_conn_id",
"sqlalchemy_scheme", "sqlalchemy_query"
}
def __init__(
self,
*args,
options: str | None = None,
enable_log_db_messages: bool = False,
**kwargs
) -> None: ...

Establishes and manages database connections with comprehensive configuration support.
def get_conn(self) -> connection:
"""
Establishes database connection with support for:
- SSL parameters from connection extras
- Custom cursor types (dictcursor, realdictcursor, namedtuplecursor)
- AWS IAM authentication for RDS/Redshift
- Connection options and logging configuration
Returns:
psycopg2 connection object
"""
def get_uri(self) -> str:
"""
Extract connection URI from connection configuration.
Returns:
str: Connection URI in SQLAlchemy format
"""

Provides SQLAlchemy URL objects for framework integration.
@property
def sqlalchemy_url(self) -> URL:
"""
Constructs SQLAlchemy URL object for PostgreSQL connection.
Includes query parameters from extra configuration.
Returns:
sqlalchemy.engine.URL: SQLAlchemy URL object
"""
@property
def dialect_name(self) -> str:
"""
Database dialect name identifier.
Returns:
str: "postgresql"
"""
@property
def dialect(self) -> Dialect:
"""
Returns dialect implementation for PostgreSQL.
Returns:
PostgresDialect: Dialect implementation instance
"""

Executes SQL statements with parameter binding and transaction control.
def run(
self,
sql,
autocommit: bool = False,
parameters=None,
handler=None
):
"""
Execute SQL statement(s).
Parameters:
- sql: str or list of str, SQL statement(s) to execute
- autocommit: bool, whether to autocommit the transaction
- parameters: list/tuple/dict, query parameters for binding
- handler: callable, optional result handler function
Returns:
Query results (if any)
"""
def get_records(self, sql, parameters=None):
"""
Execute SQL query and return all records.
Parameters:
- sql: str, SQL query to execute
- parameters: list/tuple/dict, query parameters for binding
Returns:
list: All records from query result
"""
def get_first(self, sql, parameters=None):
"""
Execute SQL query and return first record.
Parameters:
- sql: str, SQL query to execute
- parameters: list/tuple/dict, query parameters for binding
Returns:
tuple or None: First record from query result
"""

Manages database cursors with support for different cursor types.
def get_cursor(self):
"""
Get database cursor from current connection.
Supports custom cursor types configured in connection extras.
Returns:
Database cursor object (DictCursor, RealDictCursor, NamedTupleCursor, or default)
"""
def _get_cursor(self, raw_cursor: str) -> CursorType:
"""
Internal method to get specific cursor type from string name.
Parameters:
- raw_cursor: str, cursor type name ("dictcursor", "realdictcursor", "namedtuplecursor")
Returns:
CursorType: Configured cursor class
"""
@staticmethod
def _serialize_cell(cell: object, conn: connection | None = None) -> Any:
"""
Internal static method to serialize cell values for database operations.
Handles special data type conversions for PostgreSQL compatibility.
Parameters:
- cell: object, data value to serialize
- conn: connection | None, optional database connection for context
Returns:
Any: Serialized value suitable for database insertion
"""

Controls transaction behavior and autocommit settings.
def set_autocommit(self, conn, autocommit: bool):
"""
Set autocommit mode for connection.
Parameters:
- conn: database connection object
- autocommit: bool, autocommit mode setting
"""
def get_autocommit(self, conn) -> bool:
"""
Get current autocommit status for connection.
Parameters:
- conn: database connection object
Returns:
bool: Current autocommit status
"""

Manages database message logging for debugging and monitoring.
def get_db_log_messages(self, conn) -> None:
"""
Log database messages sent to client during session.
Requires enable_log_db_messages=True in constructor.
Parameters:
- conn: database connection object
"""

Provides Airflow UI field behavior configuration for connection forms.
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""
Returns Airflow UI field behavior configuration for PostgreSQL connections.
Defines form field visibility, requirements, and placeholders.
Returns:
dict: UI field configuration dictionary
"""

Type alias covering the psycopg2 cursor types used by `_get_cursor`:

from typing import TypeAlias
from psycopg2.extras import DictCursor, RealDictCursor, NamedTupleCursor
CursorType: TypeAlias = DictCursor | RealDictCursor | NamedTupleCursor

Configure connection behavior through the connection's extra JSON field:
{
"sslmode": "require",
"sslcert": "/path/to/cert.pem",
"sslkey": "/path/to/key.pem",
"sslrootcert": "/path/to/ca.pem",
"cursor": "dictcursor",
"iam": true,
"redshift": true,
"cluster-identifier": "my-cluster",
"aws_conn_id": "aws_default"
}

Supported connection extra options:

- SSL: sslmode, sslcert, sslkey, sslrootcert, sslcrl
- Cursor type: cursor - "dictcursor", "realdictcursor", "namedtuplecursor"
- AWS IAM authentication: iam (bool), aws_conn_id (str)
- Redshift: redshift (bool), cluster-identifier (str)
- Redshift Serverless: redshift-serverless (bool), workgroup-name (str)
- SQLAlchemy: sqlalchemy_scheme (str), sqlalchemy_query (dict)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-postgres