# AWS Integration and Authentication

AWS IAM authentication support for RDS PostgreSQL and Amazon Redshift with automatic token management, cross-provider integration, and support for both standard Redshift clusters and Redshift Serverless workgroups.

## Capabilities

### IAM Token Authentication

Generate and use IAM authentication tokens for AWS RDS PostgreSQL and Redshift connections.

```python { .api }
def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
    """
    Get IAM authentication token for an AWS RDS/Redshift connection.

    Parameters:
    - conn: Connection object with AWS configuration

    Returns:
        tuple[str, str, int]: (username, token, port)

    Dependencies:
        Requires the apache-airflow-providers-amazon package

    Raises:
        AirflowOptionalProviderFeatureException: If the Amazon provider is not installed
    """
```
## AWS Connection Configuration

### Basic IAM Authentication

Configure a PostgreSQL connection for IAM authentication:

```json
{
    "iam": true,
    "aws_conn_id": "aws_default"
}
```

### Redshift Configuration

Configure a connection for Amazon Redshift with IAM:

```json
{
    "iam": true,
    "redshift": true,
    "cluster-identifier": "my-redshift-cluster",
    "aws_conn_id": "my_aws_conn"
}
```

### Redshift Serverless Configuration

Configure a connection for Redshift Serverless:

```json
{
    "iam": true,
    "redshift-serverless": true,
    "workgroup-name": "my-serverless-workgroup",
    "aws_conn_id": "my_aws_conn"
}
```
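These extras live in the Airflow connection's `extra` field. Airflow can also read a connection from an `AIRFLOW_CONN_<NAME>` environment variable in JSON form, so the full connection can be assembled programmatically; a sketch (the host, login, and schema values are illustrative placeholders):

```python
import json

# Illustrative: build the JSON form of an Airflow connection whose extras
# enable IAM authentication; suitable for an AIRFLOW_CONN_<NAME> env var.
conn_json = json.dumps({
    "conn_type": "postgres",
    "host": "mydb.cluster-xyz.us-east-1.rds.amazonaws.com",  # example endpoint
    "login": "iam_user",
    "schema": "analytics",
    "port": 5432,
    "extra": {"iam": True, "aws_conn_id": "aws_default"},
})
# e.g. export AIRFLOW_CONN_RDS_POSTGRES_IAM='<value of conn_json>'
```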
## Usage Examples

### RDS PostgreSQL with IAM

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Connection configured with IAM authentication
hook = PostgresHook(postgres_conn_id="rds_postgres_iam")

# The hook handles IAM token generation automatically
records = hook.get_records("SELECT current_user, current_database()")
```
### Redshift with IAM

```python
# Redshift connection with IAM
hook = PostgresHook(postgres_conn_id="redshift_iam")

# Execute Redshift-specific queries
hook.run("""
    COPY users FROM 's3://my-bucket/users.csv'
    IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
    CSV
""")
```
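When the same COPY pattern is repeated for many tables, a small helper can assemble the statement. This is an illustrative sketch, not part of the hook's API; the table name check is a basic guard, and inputs are otherwise assumed trusted:

```python
def build_copy_sql(table: str, s3_uri: str, iam_role_arn: str, fmt: str = "CSV") -> str:
    """Assemble a Redshift COPY statement (illustrative helper, not part of the hook)."""
    if not table.isidentifier():
        # Reject obviously unsafe table names; remaining inputs assumed trusted
        raise ValueError(f"unsafe table name: {table!r}")
    return (
        f"COPY {table} FROM '{s3_uri}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"{fmt}"
    )

sql = build_copy_sql("users", "s3://my-bucket/users.csv",
                     "arn:aws:iam::123456789012:role/RedshiftRole")
```

The resulting string can then be passed to `hook.run(sql)` exactly as in the example above.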
### Redshift Serverless with IAM

```python
# Redshift Serverless connection
hook = PostgresHook(postgres_conn_id="redshift_serverless_iam")

# Query Redshift Serverless
df = hook.get_df("""
    SELECT
        user_id,
        COUNT(*) AS order_count,
        SUM(amount) AS total_amount
    FROM orders
    WHERE date >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY user_id
""", df_type="pandas")
```
## Connection Configuration Details

### Connection Host Format

**RDS PostgreSQL**:
```
mydb.cluster-xyz.us-east-1.rds.amazonaws.com
```

**Redshift**:
```
my-cluster.xyz.us-east-1.redshift.amazonaws.com
```

**Redshift Serverless**:
```
my-workgroup.123456789012.us-east-1.redshift-serverless.amazonaws.com
```
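In all three host formats the cluster or workgroup identifier is the first DNS label, which is why `cluster-identifier` and `workgroup-name` can usually be derived from the hostname and only need to be set explicitly as overrides. A sketch of that convention (this mirrors the host formats above, not necessarily the hook's exact code):

```python
def identifier_from_host(host: str) -> str:
    """Return the first DNS label of an RDS/Redshift endpoint."""
    return host.split(".", 1)[0]

identifier_from_host("my-cluster.xyz.us-east-1.redshift.amazonaws.com")  # "my-cluster"
```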
### Extra Parameters

```json
{
    "iam": true,
    "redshift": false,
    "redshift-serverless": false,
    "cluster-identifier": "optional-override",
    "workgroup-name": "optional-override",
    "aws_conn_id": "aws_default",
    "region_name": "us-east-1"
}
```

### Parameter Details

- **iam** (bool): Enable IAM authentication
- **redshift** (bool): Enable Redshift-specific features
- **redshift-serverless** (bool): Enable Redshift Serverless support
- **cluster-identifier** (str): Override the cluster ID derived from the hostname
- **workgroup-name** (str): Override the workgroup name derived from the hostname
- **aws_conn_id** (str): AWS connection ID for IAM credentials
- **region_name** (str): AWS region override
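Since a connection targets exactly one backend, `redshift` and `redshift-serverless` should not both be true, and each override parameter only makes sense with its matching flag. A hedged sanity-check sketch of those assumptions (illustrative, not a check the hook itself performs):

```python
def validate_extras(extras: dict) -> None:
    """Basic sanity checks on the IAM-related extras above (illustrative only)."""
    if extras.get("redshift") and extras.get("redshift-serverless"):
        raise ValueError("set only one of 'redshift' and 'redshift-serverless'")
    if extras.get("workgroup-name") and not extras.get("redshift-serverless"):
        raise ValueError("'workgroup-name' requires 'redshift-serverless': true")
    if extras.get("cluster-identifier") and not extras.get("redshift"):
        raise ValueError("'cluster-identifier' requires 'redshift': true")
```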
## Advanced Configuration

### Multi-Region Setup

```python
# Production environment
prod_hook = PostgresHook(postgres_conn_id="prod_redshift")

# Staging environment in a different region
staging_hook = PostgresHook(postgres_conn_id="staging_redshift")

# Cross-region data sync
prod_data = prod_hook.get_df("SELECT * FROM processed_data")
staging_hook.insert_rows(
    "processed_data_copy",
    prod_data.values.tolist(),
    target_fields=list(prod_data.columns),
)
```
### Automatic Token Refresh

```python
def long_running_process():
    """Handle long-running processes with token refresh."""
    hook = PostgresHook(postgres_conn_id="rds_iam")

    # Process data in batches
    for batch_id in range(100):
        try:
            # A fresh token is generated whenever a new connection is opened
            process_batch(hook, batch_id)
        except Exception as e:
            if "token" in str(e).lower():
                # Token expired - the hook will refresh it on the next connection
                print(f"Token refresh needed for batch {batch_id}")
                continue
            raise
```
### Cross-Provider Integration

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def etl_s3_to_redshift():
    """ETL pipeline using S3 and Redshift with IAM."""

    # S3 operations
    s3_hook = S3Hook(aws_conn_id="aws_default")

    # Redshift operations with the same AWS credentials
    redshift_hook = PostgresHook(postgres_conn_id="redshift_iam")

    # List S3 files
    files = s3_hook.list_keys("my-bucket", prefix="data/")

    # Process each file
    for file_key in files:
        redshift_hook.run(f"""
            COPY staging_table FROM 's3://my-bucket/{file_key}'
            IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
            CSV IGNOREHEADER 1
        """)
```
## Performance Considerations

### Token Caching

IAM tokens are automatically cached and refreshed:

- **Token Lifetime**: 15 minutes (AWS default)
- **Automatic Refresh**: Handled transparently by the hook
- **Connection Pooling**: Tokens are shared across the connection pool
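The caching behavior above can be illustrated with a minimal time-based cache. This is a sketch of the general pattern, not the hook's implementation: `fetch_token` stands in for the actual AWS token call, and the 15-minute lifetime matches the AWS default noted above.

```python
import time

TOKEN_LIFETIME_SECONDS = 15 * 60  # AWS default IAM token lifetime

class TokenCache:
    """Minimal time-based token cache; illustrative, not the hook's implementation."""

    def __init__(self, fetch_token, lifetime=TOKEN_LIFETIME_SECONDS, margin=60):
        self._fetch = fetch_token           # callable returning a fresh token
        self._lifetime = lifetime - margin  # refresh slightly before expiry
        self._token = None
        self._issued_at = 0.0

    def get(self):
        """Return the cached token, fetching a new one near/after expiry."""
        if self._token is None or time.monotonic() - self._issued_at >= self._lifetime:
            self._token = self._fetch()
            self._issued_at = time.monotonic()
        return self._token

cache = TokenCache(lambda: f"token-{time.monotonic()}")
first = cache.get()
assert cache.get() == first  # reused until near expiry
```

Refreshing a safety margin before the hard 15-minute expiry avoids handing out a token that dies mid-connection-handshake.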
### Redshift Optimization

```python
# Optimize Redshift queries
hook.run("SET enable_result_cache_for_session TO off")  # For ETL
hook.run("SET query_group TO 'etl_jobs'")  # For workload management

# Use COPY for large data loads
hook.run("""
    COPY large_table FROM 's3://bucket/data/'
    IAM_ROLE 'arn:aws:iam::account:role/RedshiftRole'
    FORMAT AS PARQUET
""")
```
## Error Handling

### Common IAM Authentication Errors

```python
def handle_iam_errors():
    try:
        hook = PostgresHook(postgres_conn_id="rds_iam")
        records = hook.get_records("SELECT 1")
    except Exception as e:
        error_msg = str(e).lower()

        if "amazon provider" in error_msg:
            print("Install: pip install apache-airflow-providers-amazon")
        elif "access denied" in error_msg:
            print("Check IAM permissions for RDS/Redshift access")
        elif "cluster" in error_msg and "not found" in error_msg:
            print("Verify cluster-identifier in connection extras")
        elif "workgroup" in error_msg:
            print("Verify workgroup-name for Redshift Serverless")
        else:
            raise
```
### Connection Validation

```python
def validate_aws_connection(conn_id):
    """Validate AWS IAM connection setup."""
    try:
        hook = PostgresHook(postgres_conn_id=conn_id)

        # Test basic connectivity
        result = hook.get_first("SELECT current_user, current_database()")
        print(f"Connected as: {result[0]} to database: {result[1]}")

        return True
    except Exception as e:
        print(f"Connection validation failed: {e}")
        return False

# Validate connections
validate_aws_connection("rds_postgres_iam")
validate_aws_connection("redshift_iam")
```
## IAM Permissions Required

### RDS PostgreSQL

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "rds-db:connect"
            ],
            "Resource": [
                "arn:aws:rds-db:region:account:dbuser:db-instance-id/db-username"
            ]
        }
    ]
}
```

### Redshift

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "redshift:GetClusterCredentials"
            ],
            "Resource": [
                "arn:aws:redshift:region:account:cluster:cluster-name",
                "arn:aws:redshift:region:account:dbuser:cluster-name/username"
            ]
        }
    ]
}
```

### Redshift Serverless

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "redshift-serverless:GetCredentials"
            ],
            "Resource": [
                "arn:aws:redshift-serverless:region:account:workgroup/workgroup-name"
            ]
        }
    ]
}
```
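The resource ARNs in these policies follow fixed patterns, so they can be assembled from their components when policies are generated programmatically. A sketch (region, account, and names below are placeholders; note that the `rds-db` ARN takes the instance's DbiResourceId, the `db-XXXX...` value, rather than the instance name):

```python
def rds_db_user_arn(region: str, account: str, dbi_resource_id: str, db_user: str) -> str:
    """Resource ARN for rds-db:connect (uses the DbiResourceId, e.g. 'db-ABC123')."""
    return f"arn:aws:rds-db:{region}:{account}:dbuser:{dbi_resource_id}/{db_user}"

def redshift_dbuser_arn(region: str, account: str, cluster: str, user: str) -> str:
    """Resource ARN for the dbuser side of redshift:GetClusterCredentials."""
    return f"arn:aws:redshift:{region}:{account}:dbuser:{cluster}/{user}"

rds_db_user_arn("us-east-1", "123456789012", "db-ABC123", "iam_user")
```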