# Authentication and Connection Management

Centralized AWS authentication and connection management that provides secure, configurable access to AWS services. It handles credentials, sessions, and cross-service authentication, and supports multiple authentication methods, including IAM roles, access keys, and temporary credentials.

## Capabilities

### AWS Base Hook

Foundation class for all AWS service hooks, providing common authentication and session management functionality.

```python { .api }
class AwsBaseHook(BaseHook):
    """
    Base hook for AWS services.

    Resolves credentials from an Airflow connection and builds boto3 sessions,
    clients and resources from them.
    """

    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        verify: bool | str | None = None,
        region_name: str | None = None,
        client_type: str | None = None,
        resource_type: str | None = None,
        config: dict | None = None,
    ):
        """
        Initialize AWS Base Hook.

        Parameters:
        - aws_conn_id: Airflow connection ID for AWS credentials
        - verify: SSL certificate verification: True/False, or a path (str)
          to a CA bundle
        - region_name: AWS region name
        - client_type: AWS service client type (e.g., 's3', 'lambda')
        - resource_type: AWS service resource type
        - config: Additional configuration for the AWS client
          (e.g. ``retries``, ``max_pool_connections``)
        """

    def get_client_type(
        self,
        client_type: str | None = None,
        region_name: str | None = None,
        config: Any = None,
    ) -> Any:
        """
        Get AWS service client.

        Parameters:
        - client_type: AWS service type (e.g., 's3', 'ec2', 'lambda')
        - region_name: AWS region name
        - config: Client configuration options, as a botocore.config.Config

        Returns:
        Boto3 client instance
        """

    def get_resource_type(
        self,
        resource_type: str,
        region_name: str | None = None,
        config: Any = None,
    ) -> Any:
        """
        Get AWS service resource.

        Parameters:
        - resource_type: AWS service type (e.g., 's3', 'ec2', 'dynamodb')
        - region_name: AWS region name
        - config: Resource configuration options, as a botocore.config.Config

        Returns:
        Boto3 resource instance
        """

    def get_session(self, region_name: str | None = None) -> Any:
        """
        Get AWS session with configured credentials.

        Parameters:
        - region_name: AWS region name

        Returns:
        Boto3 session instance
        """

    def get_credentials(self, region_name: str | None = None) -> 'botocore.credentials.ReadOnlyCredentials':
        """
        Get AWS credentials for the configured connection.

        Parameters:
        - region_name: AWS region name

        Returns:
        A botocore ReadOnlyCredentials named tuple with ``access_key``,
        ``secret_key`` and ``token`` attributes. It is not a dict and
        carries no region information.
        """

    def expand_role(self, role: str, region_name: str | None = None) -> str:
        """
        Expand a role name to a full role ARN if needed.

        Parameters:
        - role: Role name or ARN
        - region_name: AWS region name

        Returns:
        Full role ARN
        """

    @staticmethod
    def retry(should_retry: Callable[[Exception], bool]):
        """
        Decorator for implementing retry logic on AWS API calls.

        Parameters:
        - should_retry: Predicate applied to an exception raised by the
          decorated call; return True to retry. Check the exception type
          (e.g. botocore ClientError) before reading type-specific
          attributes such as ``.response``.

        Returns:
        Decorator function
        """
```

### AWS Generic Hook

Generic hook that returns a client for any AWS service, selected by `client_type`, using the same connection-based authentication as the base hook.

```python { .api }
class AwsGenericHook(AwsBaseHook):
    """
    Generic AWS hook that is not tied to a specific service.

    NOTE(review): in the upstream provider source the hierarchy appears to
    be the other way round (AwsBaseHook subclasses AwsGenericHook) —
    confirm before relying on isinstance checks.
    """

    def __init__(self, aws_conn_id: str = 'aws_default', client_type: str | None = None, **kwargs):
        """
        Generic AWS hook for any AWS service.

        Parameters:
        - aws_conn_id: Airflow connection ID for AWS credentials
        - client_type: AWS service client type (e.g., 's3', 'sqs')
        - kwargs: Additional keyword arguments, presumably forwarded to the
          base hook (e.g. region_name, verify, config) — confirm
        """

    def get_conn(self) -> Any:
        """
        Get AWS service client connection.

        Returns:
        Configured AWS service client
        """
```

### Connection Utilities

Utility classes and functions for reading AWS settings from Airflow connections and preparing client configuration.

```python { .api }
127
class ConnectionWrapper:
128
def __init__(self, conn: Any, region_name: str = None):
129
"""
130
Wrapper for Airflow connections with AWS-specific enhancements.
131
132
Parameters:
133
- conn: Airflow connection object
134
- region_name: AWS region name
135
"""
136
137
@property
138
def extra_config(self) -> dict:
139
"""
140
Get connection extra configuration.
141
142
Returns:
143
Extra configuration dictionary
144
"""
145
146
@property
147
def aws_access_key_id(self) -> str:
148
"""
149
Get AWS access key ID.
150
151
Returns:
152
AWS access key ID
153
"""
154
155
@property
156
def aws_secret_access_key(self) -> str:
157
"""
158
Get AWS secret access key.
159
160
Returns:
161
AWS secret access key
162
"""
163
164
@property
165
def aws_session_token(self) -> str:
166
"""
167
Get AWS session token.
168
169
Returns:
170
AWS session token for temporary credentials
171
"""
172
173
@property
174
def role_arn(self) -> str:
175
"""
176
Get IAM role ARN for assuming roles.
177
178
Returns:
179
IAM role ARN
180
"""
181
182
@property
183
def region_name(self) -> str:
184
"""
185
Get AWS region name.
186
187
Returns:
188
AWS region name
189
"""
190
191
@property
192
def external_id(self) -> str:
193
"""
194
Get external ID for role assumption.
195
196
Returns:
197
External ID for cross-account role assumption
198
"""
199
200
@property
201
def config_kwargs(self) -> dict:
202
"""
203
Get configuration arguments for AWS clients.
204
205
Returns:
206
Configuration dictionary for boto3 clients
207
"""
208
209
def trim_none_values(config: dict) -> dict:
    """
    Remove None values from a configuration dictionary.

    Only top-level keys are inspected; nested dictionaries are kept as-is.
    Falsy values other than None (0, False, '') are preserved. The input
    is not modified — a new dictionary is returned.

    Parameters:
    - config: Configuration dictionary

    Returns:
    Dictionary with None values removed
    """
    return {key: value for key, value in config.items() if value is not None}
```

## Usage Examples

### Basic Authentication Setup

```python
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Using the default connection
hook = AwsBaseHook(aws_conn_id='aws_default')

# Get S3 client
s3_client = hook.get_client_type('s3', region_name='us-east-1')

# Get credentials for manual use. get_credentials() returns a botocore
# ReadOnlyCredentials named tuple (access_key, secret_key, token), not a
# dict, and it carries no region -- read the region from the client instead.
credentials = hook.get_credentials()
# Avoid writing full credentials to task logs; show only a short prefix.
print(f"Access Key: {credentials.access_key[:4]}...")
print(f"Region: {s3_client.meta.region_name}")
```

### Custom Connection Configuration

```python
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Using a specific connection with custom client configuration
hook = AwsBaseHook(
    aws_conn_id='my_aws_prod',
    region_name='us-west-2',
    config={
        'retries': {
            'max_attempts': 10,
            'mode': 'adaptive',
        },
        'max_pool_connections': 50,
    },
)

# Get a Lambda client that uses the hook's region and custom configuration
lambda_client = hook.get_client_type('lambda')
```

### Cross-Service Authentication

```python
import json

from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = 'my-bucket'
KEY = 'processed/data.json'

# Both hooks use the same connection, and therefore the same authentication
s3_hook = S3Hook(aws_conn_id='aws_production')
lambda_hook = LambdaHook(aws_conn_id='aws_production')

# Upload the file to S3. load_file refuses to overwrite an existing key by
# default; replace=True keeps the example re-runnable.
s3_hook.load_file(filename='/local/data.json', key=KEY, bucket_name=BUCKET, replace=True)

# Trigger the Lambda function to process the file. json.dumps guarantees a
# valid JSON payload that points at the object just uploaded.
response = lambda_hook.invoke_lambda(
    function_name='data-processor',
    payload=json.dumps({'bucket': BUCKET, 'key': KEY}),
)

# A function-level error still returns a successful invoke response, with the
# failure reported in the 'FunctionError' field.
if 'FunctionError' in response:
    raise RuntimeError(f"data-processor failed: {response['FunctionError']}")
```

### Role Assumption

```python
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Hook configured to assume a cross-account role
hook = AwsBaseHook(aws_conn_id='cross_account_role')

# The hook assumes the role specified in the connection (role_arn in Extra)
# and uses the resulting temporary credentials for all API calls
ec2_client = hook.get_client_type('ec2', region_name='us-east-1')

# List instances in the cross-account environment. describe_instances is
# paginated, so walk every page instead of reading only the first response.
instances = []
paginator = ec2_client.get_paginator('describe_instances')
for page in paginator.paginate():
    for reservation in page['Reservations']:
        instances.extend(reservation['Instances'])
```

### Connection Configuration in Airflow

```python
# Example connection configurations, entered in the Airflow UI or supplied
# through environment variables.

# Standard IAM user credentials:
#   Connection ID:   aws_default
#   Connection Type: Amazon Web Services
#   Login:           AKIA... (AWS Access Key ID)
#   Password:        ...     (AWS Secret Access Key)
#   Extra:           {"region_name": "us-east-1"}
#
#   The same connection as an environment variable (JSON connection format,
#   presumably Airflow 2.3+ -- confirm for your version):
#   AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws", "login": "AKIA...", "password": "...", "extra": {"region_name": "us-east-1"}}'

# IAM role assumption:
#   Connection ID:   aws_role
#   Connection Type: Amazon Web Services
#   Extra: {
#       "role_arn": "arn:aws:iam::123456789012:role/AirflowExecutionRole",
#       "region_name": "us-east-1",
#       "external_id": "unique-external-id"
#   }
#   No Login/Password is set, so the base credentials used to assume the role
#   must come from elsewhere (presumably boto3's default credential chain,
#   e.g. an instance profile) -- confirm for your deployment.
#   NOTE(review): newer provider versions may expect the external ID as
#   "assume_role_kwargs": {"ExternalId": "..."} instead -- confirm.

# Temporary credentials:
#   Connection ID:   aws_temp
#   Connection Type: Amazon Web Services
#   Login:           ASIA... (Temporary Access Key ID)
#   Password:        ...     (Temporary Secret Access Key)
#   Extra: {
#       "aws_session_token": "...",
#       "region_name": "us-east-1"
#   }
#   Temporary credentials expire; the connection must be updated with a new
#   key/secret/token set when they do.
```

### Advanced Configuration

```python
import boto3
from airflow.models import Connection

# NOTE(review): upstream this class may be named AwsConnectionWrapper -- confirm.
from airflow.providers.amazon.aws.utils.connection_wrapper import ConnectionWrapper

# Manual connection wrapper usage
conn = Connection(
    conn_id='manual_aws',
    conn_type='aws',
    login='AKIA...',
    password='...',
    extra='{"region_name": "eu-west-1", "role_arn": "arn:aws:iam::123456789012:role/DataProcessingRole"}',
)

wrapper = ConnectionWrapper(conn)

# wrapper.config_kwargs holds *client* configuration, which boto3.Session does
# not accept. Build the session from the wrapper's credential and region
# properties instead.
# NOTE: a plain boto3 session does NOT assume wrapper.role_arn -- it calls AWS
# with the connection's own keys. Use AwsBaseHook when the role must be assumed.
session = boto3.Session(
    aws_access_key_id=wrapper.aws_access_key_id,
    aws_secret_access_key=wrapper.aws_secret_access_key,
    aws_session_token=wrapper.aws_session_token,
    region_name=wrapper.region_name,
)
s3 = session.client('s3')
```

### Error Handling and Retries

```python
from botocore.exceptions import ClientError

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# Error codes worth retrying. S3 reports throttling as 'SlowDown', while most
# other services use 'Throttling' / 'ThrottlingException'.
RETRYABLE_ERROR_CODES = {'Throttling', 'ThrottlingException', 'SlowDown', 'ServiceUnavailable'}


def _is_retryable(exc: Exception) -> bool:
    """Return True for retryable ClientErrors; other exception types have no ``.response``."""
    if not isinstance(exc, ClientError):
        return False
    return exc.response.get('Error', {}).get('Code') in RETRYABLE_ERROR_CODES


class MyCustomHook(AwsBaseHook):
    # NOTE(review): upstream, AwsBaseHook.retry appears to retry only when the
    # hook has a ``retry_args`` attribute; otherwise the call runs once. Confirm.
    @AwsBaseHook.retry(_is_retryable)
    def my_api_call(self):
        """API call with automatic retry on throttling / service-unavailable errors."""
        client = self.get_client_type('s3')
        try:
            return client.list_buckets()
        except ClientError as e:
            if e.response.get('Error', {}).get('Code') == 'AccessDenied':
                self.log.error("Access denied - check IAM permissions")
            else:
                self.log.warning("API call failed: %s", e)
            # Re-raise so the retry decorator (or the caller) sees the error.
            raise
```

## Connection Types

### AWS Connection Types

The provider supports the following connection types in Airflow:

```python { .api }
# Connection type identifiers (the Airflow ``conn_type`` values)
AWS_CONNECTION_TYPE = 'aws'
REDSHIFT_CONNECTION_TYPE = 'redshift'
EMR_CONNECTION_TYPE = 'emr'


# Connection configuration keys
class ConnectionConfigKeys:
    """Key names for AWS settings read from a connection."""

    # Region and role assumption
    REGION_NAME = 'region_name'
    ROLE_ARN = 'role_arn'
    EXTERNAL_ID = 'external_id'
    # Static or temporary credentials
    AWS_ACCESS_KEY_ID = 'aws_access_key_id'
    AWS_SECRET_ACCESS_KEY = 'aws_secret_access_key'
    AWS_SESSION_TOKEN = 'aws_session_token'
    # Client endpoint and configuration overrides
    ENDPOINT_URL = 'endpoint_url'
    CONFIG_KWARGS = 'config_kwargs'
```

## Types

```python { .api }
400
# AWS credential types
401
class AwsCredentials:
402
aws_access_key_id: str
403
aws_secret_access_key: str
404
aws_session_token: str = None
405
region_name: str = 'us-east-1'
406
407
# Connection configuration
408
class AwsConnectionConfig:
409
aws_conn_id: str = 'aws_default'
410
region_name: str = None
411
role_arn: str = None
412
external_id: str = None
413
verify: bool = True
414
endpoint_url: str = None
415
config: dict = None
416
417
# Session configuration
418
class SessionConfig:
419
aws_access_key_id: str = None
420
aws_secret_access_key: str = None
421
aws_session_token: str = None
422
region_name: str = None
423
botocore_session: Any = None
424
profile_name: str = None
425
426
# Client configuration
427
class ClientConfig:
428
region_name: str = None
429
api_version: str = None
430
use_ssl: bool = True
431
verify: bool = None
432
endpoint_url: str = None
433
aws_access_key_id: str = None
434
aws_secret_access_key: str = None
435
aws_session_token: str = None
436
config: Any = None # botocore.config.Config
437
438
# Authentication methods
439
class AuthMethod:
440
IAM_USER = 'iam_user'
441
IAM_ROLE = 'iam_role'
442
INSTANCE_PROFILE = 'instance_profile'
443
CONTAINER_CREDENTIALS = 'container_credentials'
444
EXTERNAL_ID = 'external_id'
445
WEB_IDENTITY_TOKEN = 'web_identity_token'
446
```