# Schema Operations and Introspection

Database schema introspection and metadata operations for analyzing table structures, primary keys, constraints, and database organization. Provides programmatic access to PostgreSQL system catalogs and information schema.

## Capabilities

### Primary Key Introspection

Retrieve primary key information for tables using PostgreSQL system catalogs.

```python { .api }
def get_table_primary_key(
    self,
    table: str,
    schema: str | None = "public"
) -> list[str] | None:
    """
    Get primary key columns for specified table.

    Parameters:
    - table: str, table name to inspect
    - schema: str or None, schema name (defaults to "public")

    Returns:
    list[str] or None: List of primary key column names, None if no primary key

    Example:
    pk_cols = hook.get_table_primary_key("users", "public")  # ["id"]
    composite_pk = hook.get_table_primary_key("user_roles")  # ["user_id", "role_id"]
    """
```

## Usage Examples

### Basic Primary Key Lookup

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Get primary key for table in default schema
pk_columns = hook.get_table_primary_key("users")
print(f"Primary key columns: {pk_columns}")  # ["id"]

# Get primary key for table in specific schema
pk_columns = hook.get_table_primary_key("customers", "sales")
print(f"Primary key columns: {pk_columns}")  # ["customer_id"]
```

### Handling Composite Primary Keys

```python
# Tables with composite primary keys
composite_pk = hook.get_table_primary_key("order_items")
if composite_pk:
    print(f"Composite key: {composite_pk}")  # ["order_id", "product_id"]

# Use in upsert operations
hook.insert_rows(
    table="order_items",
    rows=[(1, 101, 2), (1, 102, 1)],
    target_fields=["order_id", "product_id", "quantity"],
    replace=True,
    replace_index=composite_pk  # Use composite key for conflict resolution
)
```

### Schema Validation Workflow

```python
def validate_table_structure(table_name, expected_pk_columns):
    """Validate table has expected primary key structure."""
    actual_pk = hook.get_table_primary_key(table_name)

    if actual_pk is None:
        raise ValueError(f"Table {table_name} has no primary key")

    if set(actual_pk) != set(expected_pk_columns):
        raise ValueError(
            f"Primary key mismatch for {table_name}. "
            f"Expected: {expected_pk_columns}, Actual: {actual_pk}"
        )

    return True

# Validate table structure before operations
validate_table_structure("users", ["id"])
validate_table_structure("user_permissions", ["user_id", "permission_id"])
```

### Dynamic Upsert Configuration

```python
def smart_upsert(table_name, rows, target_fields):
    """Perform upsert using table's actual primary key."""

    # Discover primary key dynamically
    pk_columns = hook.get_table_primary_key(table_name)

    if pk_columns is None:
        # No primary key - use regular insert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields
        )
    else:
        # Use discovered primary key for upsert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
            replace=True,
            replace_index=pk_columns
        )

# Usage
smart_upsert("products", [(1, "Widget", 19.99)], ["id", "name", "price"])
```

### ETL Pipeline Integration

```python
def etl_with_schema_validation():
    """ETL pipeline with schema validation."""

    # Define expected schemas
    expected_schemas = {
        "users": ["user_id"],
        "orders": ["order_id"],
        "order_items": ["order_id", "item_id"]
    }

    # Validate all tables before processing
    for table, expected_pk in expected_schemas.items():
        try:
            validate_table_structure(table, expected_pk)
            print(f"✓ {table} schema validation passed")
        except ValueError as e:
            print(f"✗ {table} schema validation failed: {e}")
            return False

    # Proceed with ETL operations
    process_etl_data()
    return True
```

## Schema Information Queries

While the hook provides primary key introspection, you can also execute custom queries for additional schema information:

### Table Information

```python
# Get all tables in schema
tables = hook.get_records("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = %s AND table_type = 'BASE TABLE'
""", parameters=["public"])

# Get column information
columns = hook.get_records("""
    SELECT column_name, data_type, is_nullable, column_default
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position
""", parameters=["public", "users"])
```

### Constraint Information

```python
# Get foreign key constraints
foreign_keys = hook.get_records("""
    SELECT
        kcu.column_name,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
    JOIN information_schema.constraint_column_usage ccu
        ON ccu.constraint_name = tc.constraint_name
    WHERE tc.constraint_type = 'FOREIGN KEY'
        AND tc.table_schema = %s
        AND tc.table_name = %s
""", parameters=["public", "orders"])

# Get unique constraints
unique_constraints = hook.get_records("""
    SELECT
        tc.constraint_name,
        array_agg(kcu.column_name ORDER BY kcu.ordinal_position) as columns
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
    WHERE tc.constraint_type = 'UNIQUE'
        AND tc.table_schema = %s
        AND tc.table_name = %s
    GROUP BY tc.constraint_name
""", parameters=["public", "users"])
```

### Index Information

```python
# Get table indexes
indexes = hook.get_records("""
    SELECT
        i.relname as index_name,
        array_agg(a.attname ORDER BY c.ordinality) as columns,
        ix.indisunique as is_unique,
        ix.indisprimary as is_primary
    FROM pg_class t
    JOIN pg_index ix ON t.oid = ix.indrelid
    JOIN pg_class i ON i.oid = ix.indexrelid
    JOIN unnest(ix.indkey) WITH ORDINALITY AS c(attnum, ordinality) ON true
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = c.attnum
    JOIN pg_namespace n ON t.relnamespace = n.oid
    WHERE t.relname = %s AND n.nspname = %s
    GROUP BY i.relname, ix.indisunique, ix.indisprimary
    ORDER BY i.relname
""", parameters=["users", "public"])
```

## Error Handling

```python
def safe_get_primary_key(table_name, schema=None):
    """Safely get primary key with error handling."""
    try:
        pk_columns = hook.get_table_primary_key(table_name, schema)
        return pk_columns
    except Exception as e:
        print(f"Error getting primary key for {table_name}: {e}")
        return None

# Usage with error handling
pk = safe_get_primary_key("nonexistent_table")
if pk is not None:
    print(f"Primary key: {pk}")
else:
    print("Could not determine primary key")
```