# S3 Storage and File Management

Comprehensive Amazon S3 integration for Dagster, providing data storage, file management, compute log storage, and I/O operations. The S3 module includes I/O managers for Dagster assets and op outputs, file management utilities, and an S3-backed compute log manager.

## Capabilities

### S3 Resource

Core S3 resource providing configured access to Amazon S3 services with automatic credential management and retry logic.

```python { .api }
class S3Resource(ResourceWithS3Configuration):
    """
    Resource for interacting with Amazon S3.

    Inherits S3-specific configuration options from ResourceWithS3Configuration.
    """

    def get_client(self):
        """
        Get configured boto3 S3 client.

        Returns:
            boto3.client: Configured S3 client
        """

def s3_resource(**kwargs) -> S3Resource:
    """
    Factory function to create S3Resource.

    Parameters:
        **kwargs: Configuration parameters for S3Resource

    Returns:
        S3Resource: Configured S3 resource
    """
```
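
For orientation, the sketch below injects the resource into an asset and uses `get_client()` to call boto3 directly. It assumes the resource is bound under the key `s3`; the bucket name and prefix are placeholders.

```python
from dagster import asset
from dagster_aws.s3 import S3Resource

@asset
def raw_keys(s3: S3Resource) -> list[str]:
    # get_client() returns a configured boto3 S3 client.
    client = s3.get_client()
    # List objects under a placeholder bucket/prefix.
    response = client.list_objects_v2(Bucket="my-bucket", Prefix="raw/")
    return [obj["Key"] for obj in response.get("Contents", [])]
```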

### S3 I/O Managers

I/O managers for storing and retrieving Dagster assets and op outputs in Amazon S3, supporting different serialization formats.

```python { .api }
class S3PickleIOManager(ConfigurableIOManager):
    """
    I/O manager that stores and retrieves objects in S3 using pickle serialization.
    """
    s3_resource: S3Resource
    s3_bucket: str
    s3_prefix: str = ""

    def load_input(self, context):
        """
        Load input from S3.

        Parameters:
            context: Input loading context

        Returns:
            Any: Deserialized object from S3
        """

    def handle_output(self, context, obj):
        """
        Store output to S3.

        Parameters:
            context: Output handling context
            obj: Object to serialize and store
        """

class PickledObjectS3IOManager(S3PickleIOManager):
    """
    Legacy alias for S3PickleIOManager.
    """

class ConfigurablePickledObjectS3IOManager(ConfigurableIOManager):
    """
    Configurable version of the S3 pickle I/O manager.
    """

def s3_pickle_io_manager(**kwargs):
    """
    Factory function for the S3 pickle I/O manager.

    Parameters:
        s3_resource: S3Resource instance
        s3_bucket: S3 bucket name
        s3_prefix: Optional S3 key prefix

    Returns:
        IOManagerDefinition: Configured S3 pickle I/O manager
    """
```
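
As a minimal sketch using the fields declared above (bucket and prefix are placeholders), the Pythonic `S3PickleIOManager` can be bound directly as the `io_manager` resource:

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3PickleIOManager, S3Resource

@asset
def numbers() -> list[int]:
    return [1, 2, 3]

defs = Definitions(
    assets=[numbers],
    resources={
        # Each asset output is pickled to s3://my-dagster-bucket/outputs/...
        "io_manager": S3PickleIOManager(
            s3_resource=S3Resource(region_name="us-west-2"),
            s3_bucket="my-dagster-bucket",
            s3_prefix="outputs",
        ),
    },
)
```

The legacy `s3_pickle_io_manager` factory achieves the same result with `.configured(...)`, as shown in the usage examples below.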

### S3 File Management

File management utilities for handling file uploads, downloads, and operations within Dagster pipelines.

```python { .api }
class S3FileHandle:
    """
    Handle representing a file stored in S3.
    """
    s3_bucket: str
    s3_key: str

    def __init__(self, s3_bucket: str, s3_key: str): ...

class S3FileManager:
    """
    Utility for managing files in S3.
    """
    def __init__(self, s3_session, s3_bucket: str, s3_base_key: str): ...

    def copy_handle_to_local_temp(self, file_handle: S3FileHandle) -> str:
        """
        Copy S3 file to local temporary file.

        Parameters:
            file_handle: S3 file handle

        Returns:
            str: Path to local temp file
        """

    def read(self, file_handle, mode="rb"):
        """
        Context manager for reading files.

        Parameters:
            file_handle: S3 file handle
            mode: File read mode

        Returns:
            Context manager for file reading
        """

    def read_data(self, file_handle) -> bytes:
        """
        Read file data as bytes.

        Parameters:
            file_handle: S3 file handle

        Returns:
            bytes: File content as bytes
        """

    def write_data(self, data: bytes, ext=None) -> S3FileHandle:
        """
        Write bytes data to S3.

        Parameters:
            data: Bytes data to write
            ext: Optional file extension

        Returns:
            S3FileHandle: Handle to written file
        """

    def write(self, file_obj, mode="wb", ext=None) -> S3FileHandle:
        """
        Write file object to S3.

        Parameters:
            file_obj: File-like object to write
            mode: Write mode
            ext: Optional file extension

        Returns:
            S3FileHandle: Handle to written file
        """

    def get_full_key(self, file_key: str) -> str:
        """
        Get full S3 key with prefix.

        Parameters:
            file_key: Base file key

        Returns:
            str: Full S3 key with prefix
        """

    def delete_local_temp(self): ...

    def upload_file(self, file_path: str, key: str) -> S3FileHandle: ...

class S3FileManagerResource(S3FileManager, ConfigurableResource):
    """
    Configurable S3 file manager resource.
    """

def s3_file_manager(**kwargs):
    """
    Factory function for the S3 file manager.

    Returns:
        ResourceDefinition: Configured S3 file manager resource
    """
```
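
To complement the upload-oriented usage example further below, here is a brief sketch of the bytes-oriented API, assuming a file manager resource is bound under the key `file_manager`:

```python
from dagster import op

@op(required_resource_keys={"file_manager"})
def roundtrip_bytes(context) -> bytes:
    fm = context.resources.file_manager
    # write_data stores the bytes under the manager's base key and returns a handle.
    handle = fm.write_data(b"hello world", ext="txt")
    context.log.info(f"wrote s3://{handle.s3_bucket}/{handle.s3_key}")
    # read_data fetches the same object back as bytes.
    return fm.read_data(handle)
```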

### S3 Compute Log Manager

Manages compute log storage in S3 for Dagster run execution logs.

```python { .api }
class S3ComputeLogManager:
    """
    Compute log manager that stores logs in S3.
    """
    def __init__(self, bucket: str, local_dir=None, inst_data=None,
                 prefix="dagster", use_ssl=True, verify=True,
                 verify_cert_path=None, endpoint_url=None,
                 skip_empty_files=False, upload_interval=None,
                 upload_extra_args=None, show_url_only=False, region=None): ...

    def get_log_data(self, log_key: str) -> str:
        """
        Retrieve log data from S3.

        Parameters:
            log_key: Log identifier

        Returns:
            str: Log content
        """

    def delete_logs(self, log_key=None, prefix=None):
        """
        Delete logs from S3.

        Parameters:
            log_key: Specific log key to delete
            prefix: Prefix for bulk deletion
        """

    def download_url_for_type(self, log_key: str, io_type: str) -> str:
        """
        Get download URL for log type.

        Parameters:
            log_key: Log identifier
            io_type: Log type (stdout/stderr)

        Returns:
            str: Presigned download URL
        """

    def cloud_storage_has_logs(self, log_key: str, io_type: str, partial=False) -> bool:
        """
        Check if logs exist in cloud storage.

        Parameters:
            log_key: Log identifier
            io_type: Log type
            partial: Check for partial logs

        Returns:
            bool: True if logs exist
        """

    def upload_log(self, log_key: str, log_data: str): ...
```
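
The compute log manager is typically enabled through the Dagster instance's `dagster.yaml` rather than in Python code. A minimal sketch (bucket and prefix values are placeholders) looks like this:

```yaml
compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "my-compute-log-bucket"
    prefix: "dagster-logs"
```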

### S3 Operations and Utilities

Utility functions and operations for working with S3 within Dagster pipelines.

```python { .api }
class S3Coordinate:
    """
    Coordinate for an S3 file location.
    """
    bucket: str
    key: str

def file_handle_to_s3(context, file_handle) -> S3Coordinate:
    """
    Convert a file handle to an S3 coordinate.

    Parameters:
        context: Dagster execution context
        file_handle: File handle to convert

    Returns:
        S3Coordinate: S3 location coordinate
    """

class S3Callback:
    """
    Callback utilities for S3 operations.
    """
    def __init__(self, logger, bucket: str, key: str, filename: str, size: int): ...
    def __call__(self, bytes_amount: int): ...
```
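
As one possible use, `S3Callback` matches the signature boto3 expects for transfer progress callbacks. The sketch below (bucket, key, and local path are placeholders, and the import location of `S3Callback` is an assumption) logs upload progress:

```python
import logging
import os

import boto3

from dagster_aws.s3 import S3Callback  # assumed export location

logger = logging.getLogger(__name__)
path, bucket, key = "/local/path/data.csv", "my-bucket", "data/input.csv"

# boto3 invokes the callback with the number of bytes transferred per chunk.
boto3.client("s3").upload_file(
    path,
    bucket,
    key,
    Callback=S3Callback(logger, bucket, key, os.path.basename(path), os.path.getsize(path)),
)
```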

### S3 Testing Utilities

Mock S3 resources and utilities for testing Dagster pipelines without actual S3 dependencies.

```python { .api }
class S3FakeSession:
    """
    Fake S3 session for testing purposes.
    """
    def client(self, service_name: str): ...

def create_s3_fake_resource(**kwargs):
    """
    Create a fake S3 resource for testing.

    Returns:
        ResourceDefinition: Mock S3 resource for testing
    """
```
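
A hedged test sketch using the fake resource follows; the import path of `create_s3_fake_resource` is an assumption and may differ between dagster-aws versions.

```python
from dagster import asset, materialize
from dagster_aws.s3 import create_s3_fake_resource  # assumed export location

@asset(required_resource_keys={"s3"})
def s3_dependent_asset(context) -> int:
    # In a real pipeline this asset would use context.resources.s3; the fake
    # resource lets the test run without AWS credentials or network access.
    return 1

def test_s3_dependent_asset_with_fake_s3():
    result = materialize(
        [s3_dependent_asset],
        resources={"s3": create_s3_fake_resource()},
    )
    assert result.success
```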

## Usage Examples

### Basic S3 I/O Manager Setup

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource, s3_pickle_io_manager

# Configure the S3 resource
s3_resource_def = S3Resource(
    region_name="us-west-2",
    aws_access_key_id="your-access-key",
    aws_secret_access_key="your-secret-key"
)

# Configure the I/O manager
s3_io_manager = s3_pickle_io_manager.configured({
    "s3_bucket": "my-dagster-bucket",
    "s3_prefix": "dagster-outputs/"
})

@asset
def my_dataset():
    return [1, 2, 3, 4, 5]

defs = Definitions(
    assets=[my_dataset],
    resources={
        "s3": s3_resource_def,
        "io_manager": s3_io_manager
    }
)
```

### S3 File Management

```python
from dagster import op, job
from dagster_aws.s3 import S3FileManagerResource

@op(required_resource_keys={"s3_file_manager"})
def process_file(context):
    file_manager = context.resources.s3_file_manager

    # Upload a file
    file_handle = file_manager.upload_file("/local/path/data.csv", "data/input.csv")

    # Copy to a local temp file for processing
    local_path = file_manager.copy_handle_to_local_temp(file_handle)

    # Process the file...

    # Clean up temp files
    file_manager.delete_local_temp()

@job(
    resource_defs={
        "s3_file_manager": S3FileManagerResource(
            s3_bucket="my-bucket",
            region_name="us-west-2"
        )
    }
)
def file_processing_job():
    process_file()
```