# AWS Integration and Authentication

IAM authentication for AWS RDS PostgreSQL and Amazon Redshift, with automatic token management and cross-provider integration, covering both provisioned Redshift clusters and Redshift Serverless workgroups.

## Capabilities

### IAM Token Authentication

Generate and use IAM authentication tokens for AWS RDS PostgreSQL and Redshift connections.

```python { .api }
def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
    """
    Get IAM authentication token for AWS RDS/Redshift connection.

    Parameters:
    - conn: Connection object with AWS configuration

    Returns:
        tuple[str, str, int]: (username, token, port)

    Dependencies:
        Requires apache-airflow-providers-amazon package

    Raises:
        AirflowOptionalProviderFeatureException: If Amazon provider not installed
    """
```
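
The hook calls this method internally whenever `iam` is enabled in the connection extras, but it can also be invoked directly. A minimal sketch, assuming a connection ID `rds_postgres_iam` configured as shown in the next section:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hypothetical connection ID whose extras enable IAM authentication
hook = PostgresHook(postgres_conn_id="rds_postgres_iam")
conn = hook.get_connection("rds_postgres_iam")

# Returns the login, a short-lived IAM token used in place of a password,
# and the port to connect on
user, token, port = hook.get_iam_token(conn)
```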

## AWS Connection Configuration

### Basic IAM Authentication

Configure a PostgreSQL connection for IAM authentication:

```json
{
  "iam": true,
  "aws_conn_id": "aws_default"
}
```
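
These extras attach to an ordinary Airflow Postgres connection; host, login, and schema are set as usual. A minimal sketch of building such a connection programmatically (the connection ID, host, and login are hypothetical):

```python
import json

from airflow.models.connection import Connection

conn = Connection(
    conn_id="rds_postgres_iam",
    conn_type="postgres",
    host="mydb.cluster-xyz.us-east-1.rds.amazonaws.com",
    login="db_user",
    schema="mydb",
    port=5432,
    extra=json.dumps({"iam": True, "aws_conn_id": "aws_default"}),
)
# The URI form can be exported as an AIRFLOW_CONN_* environment variable
print(conn.get_uri())
```

The same pattern applies to the Redshift configurations below; only the extras differ.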

### Redshift Configuration

Configure a connection for Amazon Redshift with IAM:

```json
{
  "iam": true,
  "redshift": true,
  "cluster-identifier": "my-redshift-cluster",
  "aws_conn_id": "my_aws_conn"
}
```

### Redshift Serverless Configuration

Configure a connection for Redshift Serverless:

```json
{
  "iam": true,
  "redshift-serverless": true,
  "workgroup-name": "my-serverless-workgroup",
  "aws_conn_id": "my_aws_conn"
}
```

## Usage Examples

### RDS PostgreSQL with IAM

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Connection configured with IAM authentication
hook = PostgresHook(postgres_conn_id="rds_postgres_iam")

# Hook automatically handles IAM token generation
records = hook.get_records("SELECT current_user, current_database()")
```

### Redshift with IAM

```python
# Redshift connection with IAM
hook = PostgresHook(postgres_conn_id="redshift_iam")

# Execute Redshift-specific queries
hook.run("""
    COPY users FROM 's3://my-bucket/users.csv'
    IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
    CSV
""")
```

### Redshift Serverless with IAM

```python
# Redshift Serverless connection
hook = PostgresHook(postgres_conn_id="redshift_serverless_iam")

# Query Redshift Serverless
df = hook.get_df("""
    SELECT
        user_id,
        COUNT(*) AS order_count,
        SUM(amount) AS total_amount
    FROM orders
    WHERE date >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY user_id
""", df_type="pandas")
```

## Connection Configuration Details

### Connection Host Format

**RDS PostgreSQL**:

```
mydb.cluster-xyz.us-east-1.rds.amazonaws.com
```

**Redshift**:

```
my-cluster.xyz.us-east-1.redshift.amazonaws.com
```

**Redshift Serverless**:

```
my-workgroup.123456789012.us-east-1.redshift-serverless.amazonaws.com
```

### Extra Parameters

```json
{
  "iam": true,
  "redshift": false,
  "redshift-serverless": false,
  "cluster-identifier": "optional-override",
  "workgroup-name": "optional-override",
  "aws_conn_id": "aws_default",
  "region_name": "us-east-1"
}
```

### Parameter Details

- **iam** (bool): Enable IAM authentication
- **redshift** (bool): Enable Redshift-specific features
- **redshift-serverless** (bool): Enable Redshift Serverless support
- **cluster-identifier** (str): Override the cluster ID derived from the hostname
- **workgroup-name** (str): Override the workgroup name derived from the hostname
- **aws_conn_id** (str): AWS connection ID used to obtain IAM credentials
- **region_name** (str): AWS region override
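
Taken together, a fully configured IAM connection can also be supplied through Airflow's JSON connection format (supported since Airflow 2.3) as an environment variable. A sketch with hypothetical identifiers:

```python
import json
import os

# JSON-serialized connection; Airflow resolves AIRFLOW_CONN_<CONN_ID> at lookup time
os.environ["AIRFLOW_CONN_REDSHIFT_IAM"] = json.dumps({
    "conn_type": "postgres",
    "host": "my-cluster.xyz.us-east-1.redshift.amazonaws.com",
    "login": "awsuser",
    "schema": "dev",
    "port": 5439,
    "extra": {
        "iam": True,
        "redshift": True,
        "cluster-identifier": "my-cluster",
        "aws_conn_id": "aws_default",
        "region_name": "us-east-1",
    },
})
```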

## Advanced Configuration

### Multi-Region Setup

```python
# Production environment
prod_hook = PostgresHook(postgres_conn_id="prod_redshift")

# Staging environment in a different region
staging_hook = PostgresHook(postgres_conn_id="staging_redshift")

# Cross-region data sync
prod_data = prod_hook.get_df("SELECT * FROM processed_data")
staging_hook.insert_rows(
    "processed_data_copy",
    prod_data.values.tolist(),
    target_fields=list(prod_data.columns),
)
```

### Automatic Token Refresh

```python
def long_running_process():
    """Handle long-running processes with token refresh."""
    hook = PostgresHook(postgres_conn_id="rds_iam")

    # Process data in batches
    for batch_id in range(100):
        try:
            # Each query gets a fresh token automatically
            process_batch(hook, batch_id)
        except Exception as e:
            if "token" in str(e).lower():
                # Token expired - hook will refresh on next use
                print(f"Token refresh needed for batch {batch_id}")
                continue
            raise
```

### Cross-Provider Integration

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def etl_s3_to_redshift():
    """ETL pipeline using S3 and Redshift with IAM."""
    # S3 operations
    s3_hook = S3Hook(aws_conn_id="aws_default")

    # Redshift operations with the same AWS credentials
    redshift_hook = PostgresHook(postgres_conn_id="redshift_iam")

    # List S3 files
    files = s3_hook.list_keys("my-bucket", prefix="data/")

    # Process each file
    for file_key in files:
        redshift_hook.run(f"""
            COPY staging_table FROM 's3://my-bucket/{file_key}'
            IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
            CSV IGNOREHEADER 1
        """)
```

## Performance Considerations

### Token Caching

IAM tokens are automatically cached and refreshed:

- **Token Lifetime**: 15 minutes (AWS default)
- **Automatic Refresh**: Handled transparently by the hook
- **Connection Pooling**: Tokens shared across connection pool
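
Because a token lives only about 15 minutes, what matters is that new database connections are opened as work proceeds rather than held indefinitely. A minimal sketch, assuming a hypothetical `rds_iam` connection:

```python
hook = PostgresHook(postgres_conn_id="rds_iam")  # hypothetical connection ID

for table in ["orders", "users", "events"]:
    # get_records opens (and closes) a connection per call, so a fresh
    # IAM token is generated whenever the previous one has expired
    rows = hook.get_records(f"SELECT COUNT(*) FROM {table}")
    print(table, rows[0][0])
```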

### Redshift Optimization

```python
# Optimize Redshift queries
hook.run("SET enable_result_cache_for_session TO off")  # for ETL workloads
hook.run("SET query_group TO 'etl_jobs'")  # for workload management

# Use COPY for large data loads
hook.run("""
    COPY large_table FROM 's3://bucket/data/'
    IAM_ROLE 'arn:aws:iam::account:role/RedshiftRole'
    FORMAT AS PARQUET
""")
```

## Error Handling

### Common IAM Authentication Errors

```python
def handle_iam_errors():
    try:
        hook = PostgresHook(postgres_conn_id="rds_iam")
        records = hook.get_records("SELECT 1")
    except Exception as e:
        error_msg = str(e).lower()

        if "amazon provider" in error_msg:
            print("Install: pip install apache-airflow-providers-amazon")
        elif "access denied" in error_msg:
            print("Check IAM permissions for RDS/Redshift access")
        elif "cluster" in error_msg and "not found" in error_msg:
            print("Verify cluster-identifier in connection extras")
        elif "workgroup" in error_msg:
            print("Verify workgroup-name for Redshift Serverless")
        else:
            raise
```

### Connection Validation

```python
def validate_aws_connection(conn_id):
    """Validate AWS IAM connection setup."""
    try:
        hook = PostgresHook(postgres_conn_id=conn_id)

        # Test basic connectivity
        result = hook.get_first("SELECT current_user, current_database()")
        print(f"Connected as: {result[0]} to database: {result[1]}")

        return True
    except Exception as e:
        print(f"Connection validation failed: {e}")
        return False

# Validate connections
validate_aws_connection("rds_postgres_iam")
validate_aws_connection("redshift_iam")
```

## IAM Permissions Required

### RDS PostgreSQL

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "rds-db:connect"
      ],
      "Resource": [
        "arn:aws:rds-db:region:account:dbuser:db-instance-id/db-username"
      ]
    }
  ]
}
```

Note that the `db-instance-id` segment is the instance's resource ID (the `db-` prefixed DbiResourceId), not the instance name.
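
A hedged sketch of generating the token whose use this policy authorizes (host and user are hypothetical); the hook performs the equivalent call through the Amazon provider:

```python
import boto3

client = boto3.client("rds", region_name="us-east-1")
# Produces the short-lived token that is sent as the connection password
token = client.generate_db_auth_token(
    DBHostname="mydb.cluster-xyz.us-east-1.rds.amazonaws.com",
    Port=5432,
    DBUsername="db_user",
)
```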

### Redshift

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "redshift:GetClusterCredentials"
      ],
      "Resource": [
        "arn:aws:redshift:region:account:cluster:cluster-name",
        "arn:aws:redshift:region:account:dbuser:cluster-name/username"
      ]
    }
  ]
}
```
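
The credential call this policy authorizes, sketched with hypothetical identifiers:

```python
import boto3

client = boto3.client("redshift", region_name="us-east-1")
# Returns temporary DbUser/DbPassword credentials for the cluster
creds = client.get_cluster_credentials(
    DbUser="awsuser",
    DbName="dev",
    ClusterIdentifier="my-redshift-cluster",
    DurationSeconds=900,
)
```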

### Redshift Serverless

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "redshift-serverless:GetCredentials"
      ],
      "Resource": [
        "arn:aws:redshift-serverless:region:account:workgroup/workgroup-name"
      ]
    }
  ]
}
```
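
And the Serverless equivalent, again with hypothetical values:

```python
import boto3

client = boto3.client("redshift-serverless", region_name="us-east-1")
# Returns temporary credentials scoped to the workgroup
creds = client.get_credentials(
    workgroupName="my-serverless-workgroup",
    dbName="dev",
    durationSeconds=900,
)
```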