# S3 Storage and File Management

Comprehensive Amazon S3 integration for Dagster: I/O managers for storing asset and op outputs in different serialization formats, file management utilities, compute log storage, and general-purpose S3 operations.

## Capabilities

### S3 Resource

Core S3 resource providing configured access to Amazon S3 services, with automatic credential management and retry logic.

```python { .api }
class S3Resource(ResourceWithS3Configuration):
    """
    Resource for interacting with Amazon S3.

    Inherits S3-specific configuration options from ResourceWithS3Configuration.
    """
    def get_client(self):
        """
        Get configured boto3 S3 client.

        Returns:
            boto3.client: Configured S3 client
        """

def s3_resource(**kwargs) -> S3Resource:
    """
    Factory function to create S3Resource.

    Parameters:
        **kwargs: Configuration parameters for S3Resource

    Returns:
        S3Resource: Configured S3 resource
    """
```
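
For orientation, a minimal sketch of wiring the resource into an asset. The bucket name and region are placeholders, and credential resolution is left to the default boto3 chain:

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource

@asset
def bucket_listing(s3: S3Resource) -> list[str]:
    # get_client() returns a plain boto3 S3 client, so any boto3 call works here.
    client = s3.get_client()
    response = client.list_objects_v2(Bucket="my-bucket")  # placeholder bucket
    return [obj["Key"] for obj in response.get("Contents", [])]

defs = Definitions(
    assets=[bucket_listing],
    resources={"s3": S3Resource(region_name="us-west-2")},  # placeholder region
)
```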

### S3 I/O Managers

I/O managers for storing and retrieving Dagster assets and op outputs in Amazon S3, supporting different serialization formats.

```python { .api }
class S3PickleIOManager(ConfigurableIOManager):
    """
    I/O manager that stores and retrieves objects in S3 using pickle serialization.
    """
    s3_resource: S3Resource
    s3_bucket: str
    s3_prefix: str = ""

    def load_input(self, context):
        """
        Load input from S3.

        Parameters:
            context: Input loading context

        Returns:
            Any: Deserialized object from S3
        """

    def handle_output(self, context, obj):
        """
        Store output to S3.

        Parameters:
            context: Output handling context
            obj: Object to serialize and store
        """

class PickledObjectS3IOManager(S3PickleIOManager):
    """
    Legacy name for the S3 pickle I/O manager, retained for backwards compatibility.
    """

class ConfigurablePickledObjectS3IOManager(ConfigurableIOManager):
    """
    Deprecated alias for S3PickleIOManager, retained for backwards compatibility.
    """

def s3_pickle_io_manager(**kwargs):
    """
    Factory function for S3 pickle I/O manager.

    Parameters:
        s3_resource: S3Resource instance
        s3_bucket: S3 bucket name
        s3_prefix: Optional S3 key prefix

    Returns:
        IOManagerDefinition: Configured S3 pickle I/O manager
    """
```
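
The Usage Examples section below shows the factory-based setup; as a complementary sketch, the configurable class can also be attached directly. Bucket, prefix, and region here are placeholders:

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3PickleIOManager, S3Resource

@asset
def numbers() -> list[int]:
    # The returned value is pickled and written under the configured bucket/prefix.
    return [1, 2, 3]

defs = Definitions(
    assets=[numbers],
    resources={
        "io_manager": S3PickleIOManager(
            s3_resource=S3Resource(region_name="us-west-2"),  # placeholder region
            s3_bucket="my-dagster-bucket",                    # placeholder bucket
            s3_prefix="dagster-outputs",
        )
    },
)
```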

### S3 File Management

File management utilities for uploading, downloading, and operating on files in S3 from within Dagster pipelines.

```python { .api }
class S3FileHandle:
    """
    Handle representing a file stored in S3.
    """
    s3_bucket: str
    s3_key: str

    def __init__(self, s3_bucket: str, s3_key: str): ...

class S3FileManager:
    """
    Utility for managing files in S3.
    """
    def __init__(self, s3_session, s3_bucket: str, s3_base_key: str): ...

    def copy_handle_to_local_temp(self, file_handle: S3FileHandle) -> str:
        """
        Copy an S3 file to a local temporary file.

        Parameters:
            file_handle: S3 file handle

        Returns:
            str: Path to local temp file
        """

    def read(self, file_handle, mode="rb"):
        """
        Context manager for reading files.

        Parameters:
            file_handle: S3 file handle
            mode: File read mode

        Returns:
            Context manager for file reading
        """

    def read_data(self, file_handle) -> bytes:
        """
        Read file data as bytes.

        Parameters:
            file_handle: S3 file handle

        Returns:
            bytes: File content as bytes
        """

    def write_data(self, data: bytes, ext=None) -> S3FileHandle:
        """
        Write bytes data to S3.

        Parameters:
            data: Bytes data to write
            ext: Optional file extension

        Returns:
            S3FileHandle: Handle to written file
        """

    def write(self, file_obj, mode="wb", ext=None) -> S3FileHandle:
        """
        Write a file object to S3.

        Parameters:
            file_obj: File-like object to write
            mode: Write mode
            ext: Optional file extension

        Returns:
            S3FileHandle: Handle to written file
        """

    def get_full_key(self, file_key: str) -> str:
        """
        Get the full S3 key with prefix.

        Parameters:
            file_key: Base file key

        Returns:
            str: Full S3 key with prefix
        """

    def delete_local_temp(self):
        """Delete local temp files created by copy_handle_to_local_temp."""

    def upload_file(self, file_path: str, key: str) -> S3FileHandle:
        """Upload a local file to S3 and return a handle to it."""

class S3FileManagerResource(S3FileManager, ConfigurableResource):
    """
    Configurable S3 file manager resource.
    """

def s3_file_manager(**kwargs):
    """
    Factory function for S3 file manager.

    Returns:
        ResourceDefinition: Configured S3 file manager resource
    """
```
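
A brief sketch of a byte-level round trip through the file manager, assuming default boto3 credential resolution; the bucket and base key are placeholders:

```python
import boto3
from dagster_aws.s3 import S3FileManager

file_manager = S3FileManager(
    s3_session=boto3.client("s3"),
    s3_bucket="my-bucket",       # placeholder bucket
    s3_base_key="file-manager",  # placeholder key prefix
)

# write_data stores the bytes under a generated key and returns a handle.
handle = file_manager.write_data(b"hello, s3", ext="txt")

# read_data fetches the same object back as bytes.
assert file_manager.read_data(handle) == b"hello, s3"
```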

### S3 Compute Log Manager

Stores the stdout/stderr compute logs produced during Dagster run execution in S3.

```python { .api }
class S3ComputeLogManager:
    """
    Compute log manager that stores logs in S3.
    """
    def __init__(self, bucket: str, local_dir=None, inst_data=None,
                 prefix="dagster", use_ssl=True, verify=True,
                 verify_cert_path=None, endpoint_url=None,
                 skip_empty_files=False, upload_interval=None,
                 upload_extra_args=None, show_url_only=False, region=None): ...

    def get_log_data(self, log_key: str) -> str:
        """
        Retrieve log data from S3.

        Parameters:
            log_key: Log identifier

        Returns:
            str: Log content
        """

    def delete_logs(self, log_key=None, prefix=None):
        """
        Delete logs from S3.

        Parameters:
            log_key: Specific log key to delete
            prefix: Prefix for bulk deletion
        """

    def download_url_for_type(self, log_key: str, io_type: str) -> str:
        """
        Get download URL for log type.

        Parameters:
            log_key: Log identifier
            io_type: Log type (stdout/stderr)

        Returns:
            str: Presigned download URL
        """

    def cloud_storage_has_logs(self, log_key: str, io_type: str, partial=False) -> bool:
        """
        Check if logs exist in cloud storage.

        Parameters:
            log_key: Log identifier
            io_type: Log type
            partial: Check for partial logs

        Returns:
            bool: True if logs exist
        """

    def upload_log(self, log_key: str, log_data: str): ...
```
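
In a deployment this class is normally configured on the Dagster instance (via the `compute_logs` section of `dagster.yaml`); purely for illustration, a direct-construction sketch with placeholder bucket and prefix:

```python
from dagster_aws.s3 import S3ComputeLogManager

# Direct construction shown for illustration; deployments typically declare
# this in dagster.yaml with module dagster_aws.s3 and class S3ComputeLogManager.
log_manager = S3ComputeLogManager(
    bucket="my-log-bucket",          # placeholder bucket
    prefix="dagster-logs",           # placeholder key prefix
    local_dir="/tmp/dagster-logs",   # local staging directory before upload
    skip_empty_files=True,           # skip uploading empty stdout/stderr files
    upload_interval=30,              # upload partial logs every 30 seconds
)
```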
263
264
### S3 Operations and Utilities
265
266
Utility functions and operations for working with S3 within Dagster pipelines.

```python { .api }
class S3Coordinate:
    """
    Coordinate for S3 file location.
    """
    bucket: str
    key: str

def file_handle_to_s3(context, file_handle) -> S3Coordinate:
    """
    Convert file handle to S3 coordinate.

    Parameters:
        context: Dagster execution context
        file_handle: File handle to convert

    Returns:
        S3Coordinate: S3 location coordinate
    """

class S3Callback:
    """
    Callback utilities for S3 operations.
    """
    def __init__(self, logger, bucket: str, key: str, filename: str, size: int): ...
    def __call__(self, bytes_amount: int): ...
```
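
`S3Callback` matches the transfer-callback protocol boto3 uses (it is called with the number of bytes transferred per chunk), so it can report upload progress. A sketch, where the import path, bucket, and file names are assumptions:

```python
import logging
import os

import boto3
from dagster_aws.s3.utils import S3Callback  # import path is an assumption

logger = logging.getLogger(__name__)
client = boto3.client("s3")

file_path = "/local/path/data.csv"  # placeholder file
callback = S3Callback(
    logger,
    bucket="my-bucket",
    key="data/input.csv",
    filename=file_path,
    size=os.path.getsize(file_path),
)

# boto3 invokes the callback with bytes_amount as each chunk is transferred.
client.upload_file(file_path, "my-bucket", "data/input.csv", Callback=callback)
```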

### S3 Testing Utilities

Mock S3 resources and utilities for testing Dagster pipelines without actual S3 dependencies.

```python { .api }
class S3FakeSession:
    """
    Fake S3 session for testing purposes.
    """
    def client(self, service_name: str): ...

def create_s3_fake_resource(**kwargs):
    """
    Create fake S3 resource for testing.

    Returns:
        ResourceDefinition: Mock S3 resource for testing
    """
```
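
A sketch of exercising the fake session in a unit test. The import path and the subset of the boto3 client API the fake supports are assumptions:

```python
from dagster_aws.s3.utils import S3FakeSession  # import path is an assumption

def test_fake_s3_roundtrip():
    session = S3FakeSession()
    client = session.client("s3")  # S3FakeSession.client, per the API above
    # The calls below assume the fake mimics this subset of the boto3 client API.
    client.put_object(Bucket="test-bucket", Key="report.txt", Body=b"ok")
    obj = client.get_object(Bucket="test-bucket", Key="report.txt")
    assert obj["Body"].read() == b"ok"
```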

## Usage Examples

### Basic S3 I/O Manager Setup

```python
from dagster import Definitions, EnvVar, asset
from dagster_aws.s3 import S3Resource, s3_pickle_io_manager

# Configure the S3 resource; credentials come from environment variables
# rather than being hardcoded in source.
s3_resource_def = S3Resource(
    region_name="us-west-2",
    aws_access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)

# Configure the I/O manager; it reads its S3 client from the "s3" resource key.
s3_io_manager = s3_pickle_io_manager.configured({
    "s3_bucket": "my-dagster-bucket",
    "s3_prefix": "dagster-outputs/",
})

@asset
def my_dataset():
    return [1, 2, 3, 4, 5]

defs = Definitions(
    assets=[my_dataset],
    resources={
        "s3": s3_resource_def,
        "io_manager": s3_io_manager,
    },
)
```

### S3 File Management

```python
from dagster import job, op
from dagster_aws.s3 import S3FileManagerResource

@op(required_resource_keys={"s3_file_manager"})
def process_file(context):
    file_manager = context.resources.s3_file_manager

    # Upload a file
    file_handle = file_manager.upload_file("/local/path/data.csv", "data/input.csv")

    # Copy to a local temp file for processing
    local_path = file_manager.copy_handle_to_local_temp(file_handle)
    context.log.info(f"Processing {local_path}")

    # Process file...

    # Clean up temp files
    file_manager.delete_local_temp()

@job(
    resource_defs={
        "s3_file_manager": S3FileManagerResource(
            s3_bucket="my-bucket",
            region_name="us-west-2",
        )
    }
)
def file_processing_job():
    process_file()
```