# S3 Storage Operations

Amazon S3 (Simple Storage Service) integration providing comprehensive bucket and object management capabilities. This includes creation, deletion, copying, transformation, and monitoring of S3 resources within Airflow workflows.

## Capabilities

### S3 Hook

Core S3 client providing low-level AWS S3 API access with authentication and connection management.
```python { .api }
class S3Hook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', verify: bool | str | None = None, **kwargs):
        """
        Initialize the S3 hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification (bool, or path to a CA bundle)
        """

    def create_bucket(self, bucket_name: str, region_name: str | None = None) -> bool:
        """
        Create an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to create
        - region_name: AWS region for bucket creation

        Returns:
        True if the bucket was created successfully
        """

    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
        """
        Delete an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to delete
        - force_delete: Delete the bucket even if it is not empty
        """

    def list_keys(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int | None = None, max_items: int | None = None) -> list:
        """
        List keys in an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - page_size: Number of items per page
        - max_items: Maximum number of items to return

        Returns:
        List of key names
        """

    def list_prefixes(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int | None = None, max_items: int | None = None) -> list:
        """
        List prefixes in an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - page_size: Number of items per page
        - max_items: Maximum number of items to return

        Returns:
        List of prefix names
        """

    def check_for_bucket(self, bucket_name: str) -> bool:
        """
        Check whether an S3 bucket exists.

        Parameters:
        - bucket_name: Name of the bucket to check

        Returns:
        True if the bucket exists
        """

    def get_key(self, key: str, bucket_name: str | None = None) -> Any:
        """
        Get an S3 key object.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        S3 key object
        """

    def check_for_key(self, key: str, bucket_name: str | None = None) -> bool:
        """
        Check whether an S3 key exists.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        True if the key exists
        """

    def get_key_size(self, key: str, bucket_name: str | None = None) -> int:
        """
        Get the size of an S3 key in bytes.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        Size in bytes
        """

    def copy_object(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str | None = None, dest_bucket_name: str | None = None, **kwargs) -> None:
        """
        Copy an S3 object.

        Parameters:
        - source_bucket_key: Source S3 key
        - dest_bucket_key: Destination S3 key
        - source_bucket_name: Source bucket name
        - dest_bucket_name: Destination bucket name
        """

    def delete_objects(self, bucket: str, keys: list) -> None:
        """
        Delete multiple S3 objects.

        Parameters:
        - bucket: Name of the bucket
        - keys: List of key names to delete
        """

    def download_file(self, key: str, bucket_name: str, local_path: str, **kwargs) -> str:
        """
        Download an S3 object to a local file.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket
        - local_path: Local file path for the download

        Returns:
        Local file path
        """

    def upload_file(self, filename: str, key: str, bucket_name: str | None = None, **kwargs) -> None:
        """
        Upload a local file to S3.

        Parameters:
        - filename: Local file path to upload
        - key: S3 key name for the uploaded file
        - bucket_name: Name of the bucket
        """

    def load_file(self, filename: str, key: str, bucket_name: str | None = None, replace: bool = True, **kwargs) -> None:
        """
        Load a local file to an S3 key.

        Parameters:
        - filename: Local file path to load
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace the key if it already exists
        """

    def load_string(self, string_data: str, key: str, bucket_name: str | None = None, replace: bool = True, **kwargs) -> None:
        """
        Load string data to an S3 key.

        Parameters:
        - string_data: String data to upload
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace the key if it already exists
        """

    def load_bytes(self, bytes_data: bytes, key: str, bucket_name: str | None = None, replace: bool = True, **kwargs) -> None:
        """
        Load bytes data to an S3 key.

        Parameters:
        - bytes_data: Bytes data to upload
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace the key if it already exists
        """

    def read_key(self, key: str, bucket_name: str | None = None) -> str:
        """
        Read S3 key content as a string.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        Key content as a string
        """

    def generate_presigned_url(self, client_method: str, params: dict | None = None, expires_in: int = 3600, http_method: str | None = None) -> str:
        """
        Generate a presigned URL for S3 operations.

        Parameters:
        - client_method: S3 client method name
        - params: Parameters for the method
        - expires_in: URL expiration time in seconds
        - http_method: HTTP method for the URL

        Returns:
        Presigned URL
        """
```
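
For example, the string-loading, key-reading, and presigned-URL methods combine as follows. This is a minimal sketch: the connection ID, bucket, and key names are placeholders.

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

s3_hook = S3Hook(aws_conn_id='aws_default')

# Write a small string payload, then read it back.
s3_hook.load_string('{"status": "ok"}', key='reports/status.json',
                    bucket_name='my-data-bucket', replace=True)
content = s3_hook.read_key('reports/status.json', bucket_name='my-data-bucket')

# Generate a presigned GET URL valid for 10 minutes.
url = s3_hook.generate_presigned_url(
    client_method='get_object',
    params={'Bucket': 'my-data-bucket', 'Key': 'reports/status.json'},
    expires_in=600,
)
```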

### S3 Operators

Task implementations for S3 operations that can be used directly in Airflow DAGs.

```python { .api }
class S3CreateBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, aws_conn_id: str = 'aws_default', region_name: str | None = None, **kwargs):
        """
        Create an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to create
        - aws_conn_id: AWS connection ID
        - region_name: AWS region for bucket creation
        """

class S3DeleteBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, force_delete: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to delete
        - force_delete: Delete the bucket even if it is not empty
        - aws_conn_id: AWS connection ID
        """

class S3DeleteObjectsOperator(BaseOperator):
    def __init__(self, bucket: str, keys: list, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete multiple S3 objects.

        Parameters:
        - bucket: Name of the bucket
        - keys: List of key names to delete
        - aws_conn_id: AWS connection ID
        """

class S3CopyObjectOperator(BaseOperator):
    def __init__(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str | None = None, dest_bucket_name: str | None = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Copy an S3 object.

        Parameters:
        - source_bucket_key: Source S3 key
        - dest_bucket_key: Destination S3 key
        - source_bucket_name: Source bucket name
        - dest_bucket_name: Destination bucket name
        - aws_conn_id: AWS connection ID
        """

class S3CreateObjectOperator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, data: Any, replace: bool = True, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create an S3 object with the provided data.

        Parameters:
        - s3_bucket: Name of the bucket
        - s3_key: S3 key name
        - data: Data to write to the S3 object
        - replace: Replace the object if it already exists
        - aws_conn_id: AWS connection ID
        """

class S3FileTransformOperator(BaseOperator):
    def __init__(self, source_s3_key: str, dest_s3_key: str, transform_script: str, source_aws_conn_id: str = 'aws_default', dest_aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transform an S3 file using the provided script.

        Parameters:
        - source_s3_key: Source S3 key
        - dest_s3_key: Destination S3 key
        - transform_script: Transformation script to apply
        - source_aws_conn_id: Source AWS connection ID
        - dest_aws_conn_id: Destination AWS connection ID
        """

class S3ListOperator(BaseOperator):
    def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        List S3 objects in a bucket.

        Parameters:
        - bucket: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - aws_conn_id: AWS connection ID
        """

class S3ListPrefixesOperator(BaseOperator):
    def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        List S3 prefixes in a bucket.

        Parameters:
        - bucket: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - aws_conn_id: AWS connection ID
        """
```
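
The copy and transform operators chain naturally. The sketch below assumes, as in the upstream Airflow operator, that `source_s3_key` and `dest_s3_key` are full `s3://` URLs and that the transform script receives the source and destination file paths as arguments; all bucket names and the script path are placeholders.

```python
from airflow.providers.amazon.aws.operators.s3 import (
    S3CopyObjectOperator,
    S3FileTransformOperator,
)

# Archive the raw file before transforming it.
archive_raw = S3CopyObjectOperator(
    task_id='archive_raw',
    source_bucket_name='my-data-bucket',
    source_bucket_key='incoming/data.csv',
    dest_bucket_name='my-archive-bucket',
    dest_bucket_key='raw/data.csv',
)

# Run the transform script against the incoming file and write the result.
clean_csv = S3FileTransformOperator(
    task_id='clean_csv',
    source_s3_key='s3://my-data-bucket/incoming/data.csv',
    dest_s3_key='s3://my-data-bucket/clean/data.csv',
    transform_script='/usr/local/bin/clean_csv.py',
)

archive_raw >> clean_csv
```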

### S3 Sensors

Monitoring tasks that wait for specific S3 conditions or states.

```python { .api }
class S3KeySensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', verify: bool | str | None = None, **kwargs):
        """
        Wait for an S3 key to exist.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to wait for
        - wildcard_match: Use wildcard matching for the key name
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        """

class S3KeySizeSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, check_fn: Callable | None = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for an S3 key size condition.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to check
        - check_fn: Function that checks the key size condition
        - aws_conn_id: AWS connection ID
        """

class S3KeysUnchangedSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, prefix: str, aws_conn_id: str = 'aws_default', inactivity_period: int = 60 * 60, min_objects: int = 1, **kwargs):
        """
        Wait for S3 keys under a prefix to remain unchanged for a given period.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to monitor
        - aws_conn_id: AWS connection ID
        - inactivity_period: Inactivity period in seconds
        - min_objects: Minimum number of objects required
        """

class S3PrefixSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, prefix: str, delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for an S3 prefix to exist.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Prefix to wait for
        - delimiter: Delimiter for prefix matching
        - aws_conn_id: AWS connection ID
        """
```
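
`S3KeysUnchangedSensor` is useful for treating a prefix as "complete" once uploads have gone quiet. A sketch, with an illustrative bucket name and thresholds:

```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor

# Succeed once at least 5 objects exist under incoming/ and no new objects
# have arrived for 30 minutes.
wait_for_upload_batch = S3KeysUnchangedSensor(
    task_id='wait_for_upload_batch',
    bucket_name='my-data-bucket',
    prefix='incoming/',
    inactivity_period=30 * 60,
    min_objects=5,
)
```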

### S3 Triggers

Asynchronous triggers for efficient S3 monitoring without blocking Airflow workers.

```python { .api }
class S3KeyTrigger(BaseTrigger):
    def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for S3 key existence.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to wait for
        - wildcard_match: Use wildcard matching for the key name
        - aws_conn_id: AWS connection ID
        """

class S3KeySizeTrigger(BaseTrigger):
    def __init__(self, bucket_name: str, bucket_key: str, check_fn: Callable | None = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for an S3 key size condition.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to check
        - check_fn: Function that checks the key size condition
        - aws_conn_id: AWS connection ID
        """
```
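
Triggers are handed off from a task via `defer()`, so no worker slot is held while waiting. A minimal sketch of the pattern, assuming the conventional `execute_complete` callback name and placeholder bucket/key values:

```python
from airflow.providers.amazon.aws.triggers.s3 import S3KeyTrigger
from airflow.sensors.base import BaseSensorOperator

class DeferredS3KeySensor(BaseSensorOperator):
    def execute(self, context):
        # Suspend this task and let the triggerer poll S3 asynchronously.
        self.defer(
            trigger=S3KeyTrigger(
                bucket_name='my-data-bucket',
                bucket_key='input/data.parquet',
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # Resumes in a worker once the trigger fires in the triggerer process.
        return event
```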

## Usage Examples

### Basic S3 Operations

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Initialize hook
s3_hook = S3Hook(aws_conn_id='my_aws_conn')

# Create bucket
s3_hook.create_bucket('my-data-bucket', region_name='us-east-1')

# Upload file
s3_hook.load_file('/local/path/data.csv', 'uploads/data.csv', 'my-data-bucket')

# Check if file exists
exists = s3_hook.check_for_key('uploads/data.csv', 'my-data-bucket')

# Download file
s3_hook.download_file('uploads/data.csv', 'my-data-bucket', '/local/path/downloaded.csv')

# List objects with prefix
objects = s3_hook.list_keys('my-data-bucket', prefix='uploads/')
```
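
Teardown uses the same hook. A short continuation of the example above, assuming the same bucket and key:

```python
# Remove the uploaded key, then the (now empty) bucket.
# force_delete=True would delete the bucket even with objects remaining.
s3_hook.delete_objects('my-data-bucket', ['uploads/data.csv'])
s3_hook.delete_bucket('my-data-bucket', force_delete=False)
```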

### S3 DAG Operations

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3CreateObjectOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG('s3_workflow', start_date=datetime(2023, 1, 1))

create_bucket = S3CreateBucketOperator(
    task_id='create_bucket',
    bucket_name='my-processing-bucket',
    aws_conn_id='aws_default',
    dag=dag,
)

upload_config = S3CreateObjectOperator(
    task_id='upload_config',
    s3_bucket='my-processing-bucket',
    s3_key='config/settings.json',
    data='{"version": "1.0", "environment": "prod"}',
    dag=dag,
)

wait_for_data = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='my-processing-bucket',
    bucket_key='input/{{ ds }}/data.parquet',
    timeout=3600,
    dag=dag,
)

create_bucket >> upload_config >> wait_for_data
```

## Types

```python { .api }
from datetime import datetime

# S3 key and bucket identifiers
BucketName = str
KeyName = str
S3Uri = str  # Format: s3://bucket/key

# S3 object metadata
class S3ObjectMetadata:
    key: str
    size: int
    last_modified: datetime
    etag: str
    storage_class: str

# S3 connection configuration
class S3Config:
    aws_access_key_id: str
    aws_secret_access_key: str
    region_name: str = 'us-east-1'
    endpoint_url: str | None = None
    verify: bool = True
```
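
To illustrate the `S3Uri` format, a small hypothetical helper (not part of the provider API) that splits a URI into the identifier types above:

```python
from urllib.parse import urlparse

def parse_s3_uri(uri: S3Uri) -> tuple[BucketName, KeyName]:
    """Split 's3://bucket/key' into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3':
        raise ValueError(f'not an S3 URI: {uri}')
    return parsed.netloc, parsed.path.lstrip('/')

assert parse_s3_uri('s3://my-data-bucket/uploads/data.csv') == (
    'my-data-bucket', 'uploads/data.csv')
```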