# S3 Storage Operations

Amazon S3 (Simple Storage Service) integration providing comprehensive bucket and object management capabilities. This includes creation, deletion, copying, transformation, and monitoring of S3 resources within Airflow workflows.

## Capabilities

### S3 Hook

Core S3 client providing low-level AWS S3 API access with authentication and connection management.

```python { .api }
class S3Hook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', verify: bool = None, **kwargs):
        """
        Initialize S3 Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        """

    def create_bucket(self, bucket_name: str, region_name: str = None) -> bool:
        """
        Create an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to create
        - region_name: AWS region for bucket creation

        Returns:
        True if bucket created successfully
        """

    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
        """
        Delete an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to delete
        - force_delete: Delete bucket even if not empty
        """

    def list_keys(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int = None, max_items: int = None) -> list:
        """
        List keys in an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - page_size: Number of items per page
        - max_items: Maximum number of items to return

        Returns:
        List of key names
        """

    def list_prefixes(self, bucket_name: str, prefix: str = '', delimiter: str = '', page_size: int = None, max_items: int = None) -> list:
        """
        List prefixes in an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - page_size: Number of items per page
        - max_items: Maximum number of items to return

        Returns:
        List of prefix names
        """

    def check_for_bucket(self, bucket_name: str) -> bool:
        """
        Check if S3 bucket exists.

        Parameters:
        - bucket_name: Name of the bucket to check

        Returns:
        True if bucket exists
        """

    def get_key(self, key: str, bucket_name: str = None) -> Any:
        """
        Get S3 key object.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        S3 key object
        """

    def check_for_key(self, key: str, bucket_name: str = None) -> bool:
        """
        Check if S3 key exists.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        True if key exists
        """

    def get_key_size(self, key: str, bucket_name: str = None) -> int:
        """
        Get size of S3 key in bytes.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        Size in bytes
        """

    def copy_object(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str = None, dest_bucket_name: str = None, **kwargs) -> None:
        """
        Copy an S3 object.

        Parameters:
        - source_bucket_key: Source S3 key
        - dest_bucket_key: Destination S3 key
        - source_bucket_name: Source bucket name
        - dest_bucket_name: Destination bucket name
        """

    def delete_objects(self, bucket: str, keys: list) -> None:
        """
        Delete multiple S3 objects.

        Parameters:
        - bucket: Name of the bucket
        - keys: List of key names to delete
        """

    def download_file(self, key: str, bucket_name: str, local_path: str, **kwargs) -> str:
        """
        Download S3 object to local file.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket
        - local_path: Local file path for download

        Returns:
        Local file path
        """

    def upload_file(self, filename: str, key: str, bucket_name: str = None, **kwargs) -> None:
        """
        Upload local file to S3.

        Parameters:
        - filename: Local file path to upload
        - key: S3 key name for uploaded file
        - bucket_name: Name of the bucket
        """

    def load_file(self, filename: str, key: str, bucket_name: str = None, replace: bool = True, **kwargs) -> None:
        """
        Load local file to S3 key.

        Parameters:
        - filename: Local file path to load
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace existing key if it exists
        """

    def load_string(self, string_data: str, key: str, bucket_name: str = None, replace: bool = True, **kwargs) -> None:
        """
        Load string data to S3 key.

        Parameters:
        - string_data: String data to upload
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace existing key if it exists
        """

    def load_bytes(self, bytes_data: bytes, key: str, bucket_name: str = None, replace: bool = True, **kwargs) -> None:
        """
        Load bytes data to S3 key.

        Parameters:
        - bytes_data: Bytes data to upload
        - key: S3 key name
        - bucket_name: Name of the bucket
        - replace: Replace existing key if it exists
        """

    def read_key(self, key: str, bucket_name: str = None) -> str:
        """
        Read S3 key content as string.

        Parameters:
        - key: S3 key name
        - bucket_name: Name of the bucket

        Returns:
        Key content as string
        """

    def generate_presigned_url(self, client_method: str, params: dict = None, expires_in: int = 3600, http_method: str = None) -> str:
        """
        Generate presigned URL for S3 operations.

        Parameters:
        - client_method: S3 client method name
        - params: Parameters for the method
        - expires_in: URL expiration time in seconds
        - http_method: HTTP method for the URL

        Returns:
        Presigned URL
        """
```
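
In addition to the upload and download flows shown under Usage Examples below, the hook's copy, bulk-delete, and presigned-URL methods can be combined as in the following sketch. Bucket and key names are hypothetical.

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

s3_hook = S3Hook(aws_conn_id='aws_default')

# Copy an object into an archive bucket, then remove the original in one call
s3_hook.copy_object(
    source_bucket_key='raw/2023-01-01/events.json',
    dest_bucket_key='archive/2023-01-01/events.json',
    source_bucket_name='my-raw-bucket',
    dest_bucket_name='my-archive-bucket',
)
s3_hook.delete_objects(bucket='my-raw-bucket', keys=['raw/2023-01-01/events.json'])

# Generate a time-limited download link for the archived copy (15 minutes)
url = s3_hook.generate_presigned_url(
    client_method='get_object',
    params={'Bucket': 'my-archive-bucket', 'Key': 'archive/2023-01-01/events.json'},
    expires_in=900,
)
```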

### S3 Operators

Task implementations for S3 operations that can be used directly in Airflow DAGs.

```python { .api }
class S3CreateBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, aws_conn_id: str = 'aws_default', region_name: str = None, **kwargs):
        """
        Create an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to create
        - aws_conn_id: AWS connection ID
        - region_name: AWS region for bucket creation
        """

class S3DeleteBucketOperator(BaseOperator):
    def __init__(self, bucket_name: str, force_delete: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete an S3 bucket.

        Parameters:
        - bucket_name: Name of the bucket to delete
        - force_delete: Delete bucket even if not empty
        - aws_conn_id: AWS connection ID
        """

class S3DeleteObjectsOperator(BaseOperator):
    def __init__(self, bucket: str, keys: list, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete multiple S3 objects.

        Parameters:
        - bucket: Name of the bucket
        - keys: List of key names to delete
        - aws_conn_id: AWS connection ID
        """

class S3CopyObjectOperator(BaseOperator):
    def __init__(self, source_bucket_key: str, dest_bucket_key: str, source_bucket_name: str = None, dest_bucket_name: str = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Copy an S3 object.

        Parameters:
        - source_bucket_key: Source S3 key
        - dest_bucket_key: Destination S3 key
        - source_bucket_name: Source bucket name
        - dest_bucket_name: Destination bucket name
        - aws_conn_id: AWS connection ID
        """

class S3CreateObjectOperator(BaseOperator):
    def __init__(self, s3_bucket: str, s3_key: str, data: Any, replace: bool = True, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create an S3 object with provided data.

        Parameters:
        - s3_bucket: Name of the bucket
        - s3_key: S3 key name
        - data: Data to write to S3 object
        - replace: Replace existing object if it exists
        - aws_conn_id: AWS connection ID
        """

class S3FileTransformOperator(BaseOperator):
    def __init__(self, source_s3_key: str, dest_s3_key: str, transform_script: str, source_aws_conn_id: str = 'aws_default', dest_aws_conn_id: str = 'aws_default', **kwargs):
        """
        Transform S3 file using provided script.

        Parameters:
        - source_s3_key: Source S3 key
        - dest_s3_key: Destination S3 key
        - transform_script: Transformation script to apply
        - source_aws_conn_id: Source AWS connection ID
        - dest_aws_conn_id: Destination AWS connection ID
        """

class S3ListOperator(BaseOperator):
    def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        List S3 objects in a bucket.

        Parameters:
        - bucket: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - aws_conn_id: AWS connection ID
        """

class S3ListPrefixesOperator(BaseOperator):
    def __init__(self, bucket: str, prefix: str = '', delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        List S3 prefixes in a bucket.

        Parameters:
        - bucket: Name of the bucket
        - prefix: Key prefix to filter results
        - delimiter: Delimiter for grouping keys
        - aws_conn_id: AWS connection ID
        """
```
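
The S3 DAG Operations example below covers bucket and object creation; as a complementary sketch, the listing and transform operators can be chained as follows. The DAG id, bucket name, and transform script path are hypothetical.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator, S3ListOperator

with DAG('s3_transform_example', start_date=datetime(2023, 1, 1)) as dag:
    # Pushes the matching key names to XCom for downstream tasks
    list_reports = S3ListOperator(
        task_id='list_reports',
        bucket='my-processing-bucket',
        prefix='reports/2023/',
        aws_conn_id='aws_default',
    )

    # Downloads the source object, runs the script over it, and uploads the result
    transform_report = S3FileTransformOperator(
        task_id='transform_report',
        source_s3_key='s3://my-processing-bucket/reports/2023/summary.csv',
        dest_s3_key='s3://my-processing-bucket/reports/2023/summary_clean.csv',
        transform_script='/usr/local/bin/clean_csv.py',
    )

    list_reports >> transform_report
```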

### S3 Sensors

Monitoring tasks that wait for specific S3 conditions or states.

```python { .api }
class S3KeySensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', verify: bool = None, **kwargs):
        """
        Wait for S3 key to exist.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to wait for
        - wildcard_match: Use wildcard matching for key name
        - aws_conn_id: AWS connection ID
        - verify: SSL certificate verification
        """

class S3KeySizeSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, bucket_key: str, check_fn: callable = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for S3 key size condition.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to check
        - check_fn: Function to check key size condition
        - aws_conn_id: AWS connection ID
        """

class S3KeysUnchangedSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, prefix: str, aws_conn_id: str = 'aws_default', inactivity_period: int = 60*60, min_objects: int = 1, **kwargs):
        """
        Wait for S3 keys to be unchanged for specified period.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Key prefix to monitor
        - aws_conn_id: AWS connection ID
        - inactivity_period: Inactivity period in seconds
        - min_objects: Minimum number of objects required
        """

class S3PrefixSensor(BaseSensorOperator):
    def __init__(self, bucket_name: str, prefix: str, delimiter: str = '', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for S3 prefix to exist.

        Parameters:
        - bucket_name: Name of the bucket
        - prefix: Prefix to wait for
        - delimiter: Delimiter for prefix matching
        - aws_conn_id: AWS connection ID
        """
```
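
As a sketch of batch-landing detection with the sensor API above (the DAG id, bucket name, and thresholds are hypothetical), a task can wait until a prefix has stopped receiving new objects before downstream processing starts:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor

with DAG('s3_batch_landing', start_date=datetime(2023, 1, 1)) as dag:
    # Succeed only after the landing prefix has had no new objects for 30 minutes
    # and holds at least 10 files, i.e. the upstream export has finished.
    wait_for_stable_batch = S3KeysUnchangedSensor(
        task_id='wait_for_stable_batch',
        bucket_name='my-landing-bucket',
        prefix='exports/{{ ds }}/',
        inactivity_period=30 * 60,
        min_objects=10,
        aws_conn_id='aws_default',
    )
```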

### S3 Triggers

Asynchronous triggers for efficient S3 monitoring without blocking Airflow workers.

```python { .api }
class S3KeyTrigger(BaseTrigger):
    def __init__(self, bucket_name: str, bucket_key: str, wildcard_match: bool = False, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for S3 key existence.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to wait for
        - wildcard_match: Use wildcard matching for key name
        - aws_conn_id: AWS connection ID
        """

class S3KeySizeTrigger(BaseTrigger):
    def __init__(self, bucket_name: str, bucket_key: str, check_fn: callable = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Asynchronous trigger for S3 key size condition.

        Parameters:
        - bucket_name: Name of the bucket
        - bucket_key: S3 key name to check
        - check_fn: Function to check key size condition
        - aws_conn_id: AWS connection ID
        """
```
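
Triggers are not normally instantiated directly in a DAG; a deferrable sensor hands the wait over to the triggerer process by deferring to one. A minimal sketch of that pattern, assuming Airflow's standard defer/resume protocol and a `triggers.s3` module path matching the import pattern used elsewhere in this document (the sensor class below is illustrative, not part of this spec):

```python
from airflow.providers.amazon.aws.triggers.s3 import S3KeyTrigger
from airflow.sensors.base import BaseSensorOperator


class DeferredS3KeySensor(BaseSensorOperator):
    """Illustrative sensor that frees its worker slot while waiting for a key."""

    def __init__(self, bucket_name: str, bucket_key: str, **kwargs):
        super().__init__(**kwargs)
        self.bucket_name = bucket_name
        self.bucket_key = bucket_key

    def execute(self, context):
        # Hand the wait over to the triggerer instead of blocking a worker slot
        self.defer(
            trigger=S3KeyTrigger(
                bucket_name=self.bucket_name,
                bucket_key=self.bucket_key,
                aws_conn_id='aws_default',
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # Called when the trigger fires; the key now exists
        return self.bucket_key
```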

## Usage Examples

### Basic S3 Operations

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Initialize hook
s3_hook = S3Hook(aws_conn_id='my_aws_conn')

# Create bucket
s3_hook.create_bucket('my-data-bucket', region_name='us-east-1')

# Upload file
s3_hook.load_file('/local/path/data.csv', 'uploads/data.csv', 'my-data-bucket')

# Check if file exists
exists = s3_hook.check_for_key('uploads/data.csv', 'my-data-bucket')

# Download file
s3_hook.download_file('uploads/data.csv', 'my-data-bucket', '/local/path/downloaded.csv')

# List objects with prefix
objects = s3_hook.list_keys('my-data-bucket', prefix='uploads/')
```

### S3 DAG Operations

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3CreateObjectOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

dag = DAG('s3_workflow', start_date=datetime(2023, 1, 1))

create_bucket = S3CreateBucketOperator(
    task_id='create_bucket',
    bucket_name='my-processing-bucket',
    aws_conn_id='aws_default',
    dag=dag
)

upload_config = S3CreateObjectOperator(
    task_id='upload_config',
    s3_bucket='my-processing-bucket',
    s3_key='config/settings.json',
    data='{"version": "1.0", "environment": "prod"}',
    dag=dag
)

wait_for_data = S3KeySensor(
    task_id='wait_for_data',
    bucket_name='my-processing-bucket',
    bucket_key='input/{{ ds }}/data.parquet',
    timeout=3600,
    dag=dag
)

create_bucket >> upload_config >> wait_for_data
```

## Types

```python { .api }
# S3 key and bucket identifiers
BucketName = str
KeyName = str
S3Uri = str  # Format: s3://bucket/key

# S3 object metadata
class S3ObjectMetadata:
    key: str
    size: int
    last_modified: datetime
    etag: str
    storage_class: str

# S3 connection configuration
class S3Config:
    aws_access_key_id: str
    aws_secret_access_key: str
    region_name: str = 'us-east-1'
    endpoint_url: str = None
    verify: bool = True
```
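
To illustrate the S3Uri format, a small helper (hypothetical, not part of this spec) can split a URI into its BucketName and KeyName parts:

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split an S3Uri of the form s3://bucket/key into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3':
        raise ValueError(f'Not an S3 URI: {uri}')
    return parsed.netloc, parsed.path.lstrip('/')


# Yields ('my-data-bucket', 'uploads/data.csv')
bucket, key = split_s3_uri('s3://my-data-bucket/uploads/data.csv')
```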