# Apache Airflow PostgreSQL Provider

PostgreSQL integration provider for Apache Airflow that enables database connectivity, query execution, and data manipulation through hooks, assets, and SQL dialect support. It provides synchronous and asynchronous database connections, bulk data operations, schema introspection, AWS IAM authentication, Amazon Redshift support, and OpenLineage integration for data lineage tracking.

## Package Information

- **Package Name**: apache-airflow-providers-postgres
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-postgres`
- **Minimum Airflow Version**: 2.10.0

## Core Imports

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook
```

Asset/dataset handling:

```python
from airflow.providers.postgres.assets.postgres import sanitize_uri
```

SQL dialect:

```python
from airflow.providers.postgres.dialects.postgres import PostgresDialect
```

## Basic Usage

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Initialize hook with connection
hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# Execute queries
records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=[True])

# Get data as DataFrame
df = hook.get_df("SELECT * FROM sales_data", df_type="pandas")

# Insert rows with upsert capability
hook.insert_rows(
    table="users",
    rows=[(1, "john", "john@example.com"), (2, "jane", "jane@example.com")],
    target_fields=["id", "name", "email"],
    replace=True,
    replace_index="id",
)

# Bulk load from file
hook.bulk_load("user_imports", "/path/to/data.tsv")
```

## Architecture

The provider is built around several key components:

- **PostgresHook**: Main database interface extending DbApiHook with PostgreSQL-specific features
- **PostgresDialect**: SQL dialect implementation for PostgreSQL-specific operations like UPSERT
- **Asset Handler**: URI sanitization and validation for PostgreSQL datasets/assets
- **Provider Info**: Metadata registration for Airflow integration

This architecture enables seamless integration with the broader Airflow ecosystem while providing PostgreSQL-specific optimizations and features.

## Capabilities

### Database Connection and Query Execution

Core database connectivity, query execution, and transaction management with support for multiple cursor types, SSL configuration, and connection pooling.

```python { .api }
class PostgresHook:
    def get_conn(self) -> connection: ...
    def run(self, sql, autocommit=False, parameters=None, handler=None): ...
    def get_records(self, sql, parameters=None): ...
    def get_first(self, sql, parameters=None): ...
```

[Database Connection](./database-connection.md)

### Data Retrieval and DataFrame Operations

Advanced data retrieval with DataFrame support for both pandas and polars, providing efficient data manipulation and analysis capabilities.

```python { .api }
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["pandas", "polars"] = "pandas",
    **kwargs: Any,
) -> PandasDataFrame | PolarsDataFrame: ...
```

[Data Retrieval](./data-retrieval.md)
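
`get_df` returns an ordinary DataFrame, so the result plugs directly into pandas or polars workflows. An illustrative, self-contained equivalent built from already-fetched records (column names and values are made up):

```python
import pandas as pd

# What get_df(..., df_type="pandas") hands back is a plain pandas DataFrame;
# here one is built from example records to show typical downstream use.
records = [(1, "north", 250.0), (2, "south", 310.5)]
df = pd.DataFrame(records, columns=["id", "region", "amount"])
total = df["amount"].sum()  # standard pandas operations apply
```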

### Bulk Operations and Data Loading

High-performance bulk data operations including file-based loading, dumping, and PostgreSQL COPY command support for efficient data transfer.

```python { .api }
def bulk_load(self, table: str, tmp_file: str) -> None: ...
def bulk_dump(self, table: str, tmp_file: str) -> None: ...
def copy_expert(self, sql: str, filename: str) -> None: ...
def insert_rows(
    self,
    table,
    rows,
    target_fields=None,
    commit_every=1000,
    replace=False,
    **kwargs,
): ...
```

[Bulk Operations](./bulk-operations.md)
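
`bulk_load` streams a tab-delimited file into the table via PostgreSQL's `COPY ... FROM STDIN`. A self-contained sketch of preparing such a file with the standard library (rows are illustrative):

```python
import csv
import io

# bulk_load expects one row per line with tab-separated columns.
rows = [(1, "john", "john@example.com"), (2, "jane", "jane@example.com")]
buf = io.StringIO()
writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
writer.writerows(rows)
tsv_data = buf.getvalue()  # write this to the tmp_file passed to bulk_load
```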

### Schema Operations and Introspection

Database schema introspection and metadata operations for analyzing table structures, primary keys, and database organization.

```python { .api }
def get_table_primary_key(
    self,
    table: str,
    schema: str | None = "public",
) -> list[str] | None: ...
```

[Schema Operations](./schema-operations.md)
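
Conceptually, primary-key lookup is a catalog query. A self-contained sketch of an equivalent `information_schema` query (the provider's actual SQL may differ in detail):

```python
# The kind of catalog query a primary-key lookup performs (illustrative).
pk_query = """
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
  ON kcu.constraint_name = tc.constraint_name
 AND kcu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
  AND tc.table_name = %s
  AND tc.table_schema = %s
ORDER BY kcu.ordinal_position
"""
# e.g. hook.get_records(pk_query, parameters=("users", "public"))
```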

### AWS Integration and Authentication

AWS IAM authentication support for RDS PostgreSQL and Amazon Redshift with automatic token management and cross-provider integration.

```python { .api }
def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: ...
```

[AWS Integration](./aws-integration.md)
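
IAM authentication is typically switched on through the Airflow connection's extras rather than in code. A hedged sketch of such an extras field (the `iam`, `aws_conn_id`, and `redshift` keys reflect the provider's documented options; values are illustrative):

```json
{
  "iam": true,
  "aws_conn_id": "aws_default",
  "redshift": false
}
```

With `iam` set, the hook calls `get_iam_token` to obtain a short-lived password instead of using the one stored on the connection.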

### Asset and Dataset Management

PostgreSQL asset/dataset URI handling with validation, sanitization, and integration with Airflow's data lineage and dependency management systems.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
```

[Asset Management](./asset-management.md)
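
`sanitize_uri` operates on a `urllib.parse.SplitResult`. A minimal stdlib sketch of preparing one, assuming the `postgres://host:port/database/schema/table` URI shape used for PostgreSQL assets (host and names are illustrative):

```python
from urllib.parse import urlsplit

# Asset URI shape assumed: postgres://host:port/database/schema/table
uri = urlsplit("postgres://db.example.com:5432/mydb/public/users")
database, schema, table = uri.path.strip("/").split("/")
```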

### SQL Dialect and Database-Specific Operations

PostgreSQL-specific SQL dialect implementation providing optimized operations like UPSERT statements and database-specific query generation.

```python { .api }
class PostgresDialect:
    def generate_replace_sql(self, table, values, target_fields, **kwargs) -> str: ...
    def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None: ...
```

[SQL Dialect](./sql-dialect.md)
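
`generate_replace_sql` emits PostgreSQL's native upsert form, `INSERT ... ON CONFLICT DO UPDATE`. A self-contained sketch of that statement shape (the helper below is illustrative, not the provider's implementation):

```python
def build_upsert_sql(table: str, target_fields: list[str], conflict_keys: list[str]) -> str:
    """Build an INSERT ... ON CONFLICT DO UPDATE statement (illustrative)."""
    cols = ", ".join(target_fields)
    placeholders = ", ".join(["%s"] * len(target_fields))
    updates = ", ".join(
        f"{c} = excluded.{c}" for c in target_fields if c not in conflict_keys
    )
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(conflict_keys)}) DO UPDATE SET {updates}"
    )

sql = build_upsert_sql("users", ["id", "name", "email"], ["id"])
```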

### OpenLineage Integration

Data lineage tracking integration with OpenLineage for comprehensive data flow monitoring and compliance requirements.

```python { .api }
def get_openlineage_database_info(self, connection) -> DatabaseInfo: ...
def get_openlineage_database_dialect(self, connection) -> str: ...
def get_openlineage_default_schema(self) -> str | None: ...
```

[OpenLineage Integration](./openlineage-integration.md)

## Types

```python { .api }
from typing import TypeAlias, Literal, Mapping, Any
from psycopg2.extras import DictCursor, RealDictCursor, NamedTupleCursor
from psycopg2.extensions import connection

# Type aliases used throughout the provider
CursorType: TypeAlias = DictCursor | RealDictCursor | NamedTupleCursor

# DataFrame types (conditional imports)
try:
    from pandas import DataFrame as PandasDataFrame
except ImportError:
    PandasDataFrame = None

try:
    from polars import DataFrame as PolarsDataFrame
except ImportError:
    PolarsDataFrame = None
```