# S3 Bucket Operations

Direct S3 bucket interface for file operations, listing, searching, and SQL queries. Provides high-level abstractions over AWS S3 operations.

## Type Imports

```python { .api }
from typing import Union
```

## Capabilities

### Bucket Initialization

Create bucket interface for S3 operations.

```python { .api }
class Bucket:
    def __init__(self, bucket_uri: str):
        """
        Creates a Bucket object.

        Parameters:
        - bucket_uri: URI of bucket to target. Must start with 's3://'

        Returns:
            A new Bucket object

        Raises:
            QuiltException: If bucket_uri is not an S3 URI or contains path/version ID
        """
```

### File Upload Operations

Upload files and directories to S3 buckets.

```python { .api }
class Bucket:
    def put_file(self, key: str, path: str):
        """
        Stores file at path to key in bucket.

        Parameters:
        - key: Key in bucket to store file at
        - path: String representing local path to file

        Raises:
        - FileNotFoundError: If no file exists at path
        - Exception: If copy fails
        """

    def put_dir(self, key: str, directory: str):
        """
        Stores all files in the directory under the prefix key.

        Parameters:
        - key: Prefix to store files under in bucket
        - directory: Path to directory to grab files from

        Raises:
        - QuiltException: If provided directory does not exist
        - Exception: If writing to bucket fails
        """
```

### File Download Operations

Download files and directories from S3 buckets.

```python { .api }
class Bucket:
    def fetch(self, key: str, path: str):
        """
        Fetches file (or files) at key to path.

        Parameters:
        - key: Key or prefix in bucket to fetch
        - path: Local path where files will be saved

        Returns:
            Local path where files were saved

        Note:
            If key refers to a directory (prefix), all files under that prefix are fetched
        """
```

### Bucket Listing and Discovery

List and discover objects in S3 buckets.

```python { .api }
class Bucket:
    def keys(self) -> list:
        """
        Lists all keys in the bucket.

        Returns:
            List of all object keys in the bucket
        """

    def ls(self, path: str = None, recursive: bool = False) -> tuple:
        """
        List data from the specified path.

        Parameters:
        - path: Path prefix to list (None for root)
        - recursive: Whether to list recursively

        Returns:
            Tuple of objects and prefixes in the specified path
        """
```

### Object Deletion

Delete objects and directories from S3 buckets.

```python { .api }
class Bucket:
    def delete(self, key: str):
        """
        Deletes a key from the bucket.

        Parameters:
        - key: Key to delete from bucket

        Note:
            This permanently deletes the object from S3
        """

    def delete_dir(self, path: str):
        """
        Delete a directory and all of its contents from the bucket.

        Parameters:
        - path: Directory path to delete recursively

        Note:
            This permanently deletes all objects under the specified prefix
        """
```

### S3 Select and Search

Query data in S3 objects using SQL and search functionality.

```python { .api }
class Bucket:
    def select(self, key: str, query: str, raw: bool = False):
        """
        Selects data from an S3 object using SQL query.

        Parameters:
        - key: S3 object key to query
        - query: SQL query string
        - raw: Whether to return raw results

        Returns:
            Query results (format depends on raw parameter and query)

        Supported formats:
        - CSV, JSON, Parquet files with SQL SELECT statements
        - Compression formats: GZIP, BZIP2
        """

    def search(self, query: Union[str, dict], limit: int = 10) -> list:
        """
        Execute a search against the configured search endpoint.

        Parameters:
        - query: Query string or DSL query body
        - limit: Maximum number of results to return

        Returns:
            List of search results

        Query Syntax:
        - String: Elasticsearch Query String Query syntax
        - Dict: Elasticsearch Query DSL
        """
```

## Usage Examples

### Basic Bucket Operations

```python
import quilt3

# Create bucket interface
bucket = quilt3.Bucket("s3://my-data-bucket")

# Upload a single file
bucket.put_file("data/measurements.csv", "local/path/measurements.csv")

# Upload entire directory
bucket.put_dir("experiment_results/", "local/results/")

# List bucket contents
all_keys = bucket.keys()
print(f"Total objects: {len(all_keys)}")

# List with path prefix
data_files = bucket.ls("data/", recursive=True)
for item in data_files:
    print(f"Found: {item}")
```

### File Download and Retrieval

```python
# Download specific file
bucket.fetch("data/measurements.csv", "downloaded_measurements.csv")

# Download entire directory
bucket.fetch("experiment_results/", "local_results/")

# List directory contents first
contents = bucket.ls("data/")
for item in contents:
    print(f"Available: {item}")

# Download multiple files
for key in ["data/file1.csv", "data/file2.csv", "data/file3.csv"]:
    local_name = key.replace("/", "_")
    bucket.fetch(key, f"downloads/{local_name}")
```

### S3 Select Queries

```python
# Query CSV data
csv_query = """
SELECT customer_id, purchase_amount, purchase_date
FROM S3Object[*]
WHERE purchase_amount > 100
LIMIT 1000
"""

results = bucket.select("sales/transactions.csv", csv_query)
print(f"Found {len(results)} high-value transactions")

# Query JSON data
json_query = """
SELECT s.user.name, s.event.type, s.timestamp
FROM S3Object[*].events[*] s
WHERE s.event.type = 'purchase'
"""

events = bucket.select("logs/user_events.json", json_query)
for event in events:
    print(f"Purchase by {event['name']} at {event['timestamp']}")

# Query Parquet data
parquet_query = """
SELECT product_category, AVG(price) as avg_price
FROM S3Object
WHERE date >= '2024-01-01'
GROUP BY product_category
"""

analytics = bucket.select("analytics/sales.parquet", parquet_query)
for row in analytics:
    print(f"{row['product_category']}: ${row['avg_price']:.2f} average")
```

### Search Operations

```python
# Simple text search
search_results = bucket.search("experiment temperature", limit=50)
for result in search_results:
    print(f"Found in: {result['_source']['key']}")

# Advanced search with Elasticsearch DSL
complex_query = {
    "query": {
        "bool": {
            "must": [
                {"term": {"file_type": "csv"}},
                {"range": {"file_size": {"gte": 1000000}}}
            ],
            "filter": [
                {"term": {"tags": "experiment"}}
            ]
        }
    },
    "sort": [{"modified_date": {"order": "desc"}}]
}

advanced_results = bucket.search(complex_query, limit=20)
for result in advanced_results:
    source = result['_source']
    print(f"Large CSV: {source['key']} ({source['file_size']} bytes)")
```

### Bucket Management

```python
# Delete specific files
bucket.delete("temp/old_data.csv")
bucket.delete("cache/expired_results.json")

# Delete entire directory
bucket.delete_dir("temp/")

# Batch operations
files_to_upload = [
    ("local/data1.csv", "processed/data1.csv"),
    ("local/data2.csv", "processed/data2.csv"),
    ("local/data3.csv", "processed/data3.csv")
]

for local_path, s3_key in files_to_upload:
    bucket.put_file(s3_key, local_path)
    print(f"Uploaded {local_path} to {s3_key}")

# Verify uploads
uploaded_files = bucket.ls("processed/")
print(f"Successfully uploaded {len(uploaded_files)} files")
```

### Working with Large Objects

```python
# Handle large file uploads with progress tracking
import os

def upload_large_file(local_path, s3_key):
    file_size = os.path.getsize(local_path)
    print(f"Uploading {file_size} byte file...")

    bucket.put_file(s3_key, local_path)
    print(f"Upload complete: {s3_key}")

# Stream large query results
large_query = """
SELECT * FROM S3Object
WHERE date BETWEEN '2023-01-01' AND '2023-12-31'
"""

# Process results in batches to manage memory
batch_size = 1000
offset = 0

while True:
    paginated_query = f"{large_query} LIMIT {batch_size} OFFSET {offset}"
    batch = bucket.select("large_dataset.csv", paginated_query)

    if not batch:
        break

    process_batch(batch)
    offset += batch_size
    print(f"Processed {offset} records...")
```