
tessl/pypi-dagster-aws

Package for AWS-specific Dagster framework solid and resource components.

- Workspace: tessl
- Visibility: Public
- Describes: pkg:pypi/dagster-aws@0.27.x

To install, run

```
npx @tessl/cli install tessl/pypi-dagster-aws@0.27.0
```

# Dagster AWS

Package for AWS-specific Dagster framework solid and resource components. This library provides AWS service integrations for the Dagster data orchestration framework, enabling developers to build data pipelines that interact with AWS services including S3, ECS, EMR, Redshift, Athena, RDS, CloudWatch, Secrets Manager, SSM, ECR, and more.

## Package Information

- **Package Name**: dagster-aws
- **Language**: Python
- **Installation**: `pip install dagster-aws`

## Core Imports

The package is organized by AWS service, with each service module exporting its components:

```python
# S3 functionality
from dagster_aws.s3 import S3Resource, s3_resource, S3PickleIOManager

# ECS functionality
from dagster_aws.ecs import EcsRunLauncher, ecs_executor

# EMR functionality
from dagster_aws.emr import EmrJobRunner, emr_pyspark_step_launcher

# Redshift functionality
from dagster_aws.redshift import RedshiftResource, redshift_resource

# Other services
from dagster_aws.athena import AthenaResource
from dagster_aws.cloudwatch import cloudwatch_logger
from dagster_aws.secretsmanager import SecretsManagerResource
from dagster_aws.ssm import ParameterStoreResource
from dagster_aws.pipes import PipesECSClient, PipesS3ContextInjector
```

## Basic Usage

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource, S3PickleIOManager
from dagster_aws.ecs import ecs_executor

# Configure S3 resource
s3_resource = S3Resource(region_name="us-west-2")

# Configure S3 I/O manager for asset storage
s3_io_manager = S3PickleIOManager(
    s3_resource=s3_resource,
    s3_bucket="my-data-bucket",
)

@asset
def my_data_asset():
    return [1, 2, 3, 4, 5]

# Define the deployment with AWS resources and the ECS executor
# for distributed computation
defs = Definitions(
    assets=[my_data_asset],
    resources={
        "s3": s3_resource,
        "io_manager": s3_io_manager,
    },
    executor=ecs_executor.configured(
        {
            "cluster": "my-dagster-cluster",
            "subnets": ["subnet-12345"],
            "security_group_ids": ["sg-67890"],
        }
    ),
)
```

## Architecture

Dagster AWS follows a service-oriented architecture in which each AWS service integration lives in its own module. The library provides four main types of components:

- **Resources**: Configured connections to AWS services (S3Resource, RedshiftResource, etc.)
- **I/O Managers**: Handle data storage and retrieval (S3PickleIOManager, etc.)
- **Executors/Launchers**: Manage job execution on AWS infrastructure (EcsRunLauncher, ecs_executor)
- **Pipes Clients**: Enable external process orchestration (PipesECSClient, PipesLambdaClient, etc.)

All resources inherit from Dagster's ConfigurableResource and can be configured with AWS credentials, regions, and service-specific settings.

## Capabilities

### S3 Storage and File Management

Comprehensive S3 integration for data storage, file management, compute logs, and I/O operations. Includes specialized I/O managers for different data formats and use cases.

```python { .api }
class S3Resource(ResourceWithBoto3Configuration):
    def get_client(self): ...

class S3PickleIOManager(ConfigurableIOManager):
    def load_input(self, context): ...
    def handle_output(self, context, obj): ...

def s3_resource(**kwargs) -> S3Resource: ...

def s3_pickle_io_manager(**kwargs): ...
```

[S3 Storage](./s3-storage.md)

### ECS Container Orchestration

Execute Dagster jobs and ops on Amazon ECS clusters with full support for task configuration, networking, and scaling.

```python { .api }
class EcsRunLauncher(RunLauncher):
    def launch_run(self, context): ...

def ecs_executor(**kwargs): ...

class EcsEventualConsistencyTimeout(Exception): ...
```

[ECS Orchestration](./ecs-orchestration.md)

### EMR Big Data Processing

Integrate with Amazon EMR for big data processing workflows, including PySpark step execution and cluster management.

```python { .api }
class EmrJobRunner:
    def run_job_flow(self, **kwargs): ...
    def wait_for_completion(self): ...

def emr_pyspark_step_launcher(**kwargs): ...

class EmrError(Exception): ...
class EmrClusterState(Enum): ...
class EmrStepState(Enum): ...
```

[EMR Processing](./emr-processing.md)

### Redshift Data Warehousing

Connect to and execute queries against Amazon Redshift clusters with connection pooling and query optimization.

```python { .api }
class RedshiftResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
    def execute_query(self, query: str): ...

def redshift_resource(**kwargs) -> RedshiftResource: ...

class RedshiftError(Exception): ...
```

[Redshift Integration](./redshift-integration.md)

### Athena Query Service

Execute serverless SQL queries against data in S3 using Amazon Athena with result management and query optimization.

```python { .api }
class AthenaResource(ResourceWithBoto3Configuration):
    def execute_query(self, query: str): ...
    def get_query_results(self, execution_id: str): ...

def athena_resource(**kwargs) -> AthenaResource: ...

class AthenaError(Exception): ...
class AthenaTimeout(Exception): ...
```

[Athena Queries](./athena-queries.md)

### CloudWatch Logging

Send Dagster logs to Amazon CloudWatch for centralized log management and monitoring.

```python { .api }
def cloudwatch_logger(**kwargs): ...
```

[CloudWatch Logging](./cloudwatch-logging.md)

### Secrets Management

Integrate with AWS Secrets Manager for secure credential and configuration management within Dagster pipelines.

```python { .api }
class SecretsManagerResource(ResourceWithBoto3Configuration):
    def get_secret(self, secret_id: str): ...

def secretsmanager_resource(**kwargs) -> SecretsManagerResource: ...
def get_secrets_from_arns(arns: list) -> dict: ...
def get_tagged_secrets(tags: dict) -> dict: ...
```

[Secrets Management](./secrets-management.md)

### Parameter Store Configuration

Access AWS Systems Manager Parameter Store for configuration management and secure parameter storage.

```python { .api }
class ParameterStoreResource(ResourceWithBoto3Configuration):
    def get_parameter(self, name: str): ...
    def get_parameters_by_path(self, path: str): ...

class SSMResource(ResourceWithBoto3Configuration): ...

def parameter_store_resource(**kwargs) -> ParameterStoreResource: ...
def ssm_resource(**kwargs) -> SSMResource: ...
```

[Parameter Store](./parameter-store.md)

### Container Registry Integration

Interact with Amazon ECR for container image management in containerized Dagster workflows.

```python { .api }
class ECRPublicResource(ResourceWithBoto3Configuration):
    def get_authorization_token(self): ...

def ecr_public_resource(**kwargs) -> ECRPublicResource: ...
```

[ECR Integration](./ecr-integration.md)

### RDS Database Operations

Connect to and manage Amazon RDS instances for relational database operations within Dagster pipelines.

```python { .api }
class RDSResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
```

[RDS Operations](./rds-operations.md)

### Pipes External Process Orchestration

Orchestrate external processes running on various AWS services using Dagster's Pipes protocol for subprocess communication.

```python { .api }
class PipesECSClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesLambdaClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesS3ContextInjector(PipesContextInjector):
    def inject_context(self, context): ...

class PipesCloudWatchLogReader(PipesLogReader):
    def read_logs(self): ...
```

[Pipes Orchestration](./pipes-orchestration.md)

## Common Types

```python { .api }
class ResourceWithBoto3Configuration(ConfigurableResource):
    """
    Base resource class for AWS services using boto3 with standard configuration options.
    """
    region_name: Optional[str] = None
    max_attempts: int = 5
    profile_name: Optional[str] = None
    use_ssl: bool = True
    endpoint_url: Optional[str] = None
    verify: Optional[bool] = True
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None

class ResourceWithS3Configuration(ConfigurableResource):
    """
    Base resource class for S3-specific services with S3-focused configuration options.
    """
    use_unsigned_session: bool = False
    region_name: Optional[str] = None
    endpoint_url: Optional[str] = None
    max_attempts: int = 5
    profile_name: Optional[str] = None
    use_ssl: bool = True
    verify: Optional[bool] = None
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None

def construct_boto_client_retry_config(max_attempts: int) -> dict:
    """
    Construct retry configuration for boto3 clients.

    Parameters:
        max_attempts: Maximum number of retry attempts

    Returns:
        dict: Boto3 retry configuration
    """
```
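
To make the retry-config contract concrete, here is a hypothetical pure-Python analogue of `construct_boto_client_retry_config`; the `"standard"` retry mode and the exact dict shape are assumptions modeled on boto3's retry configuration, not necessarily the library's actual output.

```python
def build_retry_config(max_attempts: int) -> dict:
    # Hypothetical stand-in for construct_boto_client_retry_config: boto3's
    # botocore.config.Config accepts a retries mapping of this shape.
    return {"retries": {"max_attempts": max_attempts, "mode": "standard"}}

config = build_retry_config(5)  # mirrors the default max_attempts above
```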