Package for AWS-specific Dagster framework solid and resource components.
npx @tessl/cli install tessl/pypi-dagster-aws@0.27.00
# Dagster AWS

Package for AWS-specific Dagster framework solid and resource components. This library provides AWS service integrations for the Dagster data orchestration framework, enabling developers to build data pipelines that interact with AWS services including S3, ECS, EMR, Redshift, Athena, RDS, CloudWatch, Secrets Manager, SSM, and ECR.
## Package Information

- **Package Name**: dagster-aws
- **Language**: Python
- **Installation**: `pip install dagster-aws`
## Core Imports

The package is organized by AWS service, with each service module exporting its components:

```python
# S3 functionality
from dagster_aws.s3 import S3Resource, s3_resource, S3PickleIOManager

# ECS functionality
from dagster_aws.ecs import EcsRunLauncher, ecs_executor

# EMR functionality
from dagster_aws.emr import EmrJobRunner, emr_pyspark_step_launcher

# Redshift functionality
from dagster_aws.redshift import RedshiftResource, redshift_resource

# Other services
from dagster_aws.athena import AthenaResource
from dagster_aws.cloudwatch import cloudwatch_logger
from dagster_aws.secretsmanager import SecretsManagerResource
from dagster_aws.ssm import ParameterStoreResource
from dagster_aws.pipes import PipesECSClient, PipesS3ContextInjector
```
## Basic Usage

```python
from dagster import Definitions, asset
from dagster_aws.s3 import S3Resource, S3PickleIOManager
from dagster_aws.ecs import ecs_executor

# Configure S3 resource
s3_resource = S3Resource(region_name="us-west-2")

# Configure S3 I/O manager for asset storage
s3_io_manager = S3PickleIOManager(
    s3_resource=s3_resource,
    s3_bucket="my-data-bucket"
)

@asset
def my_data_asset():
    return [1, 2, 3, 4, 5]

# Define deployment with AWS resources. Definitions accepts a single
# default executor; ECS cluster and networking settings are typically
# supplied by the EcsRunLauncher configuration in dagster.yaml.
defs = Definitions(
    assets=[my_data_asset],
    resources={
        "s3": s3_resource,
        "io_manager": s3_io_manager,
    },
    # Use the ECS executor for distributed, per-step execution
    executor=ecs_executor,
)
```
## Architecture

Dagster AWS follows a service-oriented architecture where each AWS service integration is contained in its own module. The library provides four main types of components:

- **Resources**: Configured connections to AWS services (S3Resource, RedshiftResource, etc.)
- **I/O Managers**: Handle data storage and retrieval (S3PickleIOManager, etc.)
- **Executors/Launchers**: Manage job execution on AWS infrastructure (EcsRunLauncher, ecs_executor)
- **Pipes Clients**: Enable external process orchestration (PipesECSClient, PipesLambdaClient, etc.)

All resources inherit from Dagster's ConfigurableResource and can be configured with AWS credentials, regions, and service-specific settings.
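For example, here is a minimal sketch of configuring a resource with the shared boto3-style fields; the profile name is a placeholder, and `EnvVar` defers secret resolution to runtime:

```python
from dagster import EnvVar
from dagster_aws.s3 import S3Resource

# Any of the shared configuration fields (region, profile, endpoint,
# explicit credentials) can be set at construction time.
s3 = S3Resource(
    region_name="us-west-2",
    profile_name="data-team",  # placeholder AWS profile
    aws_session_token=EnvVar("AWS_SESSION_TOKEN"),
)
```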
## Capabilities
### S3 Storage and File Management

Comprehensive S3 integration for data storage, file management, compute logs, and I/O operations. Includes specialized I/O managers for different data formats and use cases.

```python { .api }
class S3Resource(ResourceWithS3Configuration):
    def get_client(self): ...

class S3PickleIOManager(ConfigurableIOManager):
    def load_input(self, context): ...
    def handle_output(self, context, obj): ...

def s3_resource(**kwargs) -> S3Resource: ...
def s3_pickle_io_manager(**kwargs): ...
```
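A short sketch of writing an object from an asset via `get_client`; the bucket and key below are placeholders:

```python
from dagster import asset
from dagster_aws.s3 import S3Resource

@asset
def daily_report(s3: S3Resource) -> None:
    # get_client returns a boto3 S3 client, so any boto3 S3 call works here.
    s3.get_client().put_object(
        Bucket="my-data-bucket",  # placeholder bucket
        Key="reports/daily.csv",
        Body=b"date,value\n2024-01-01,42\n",
    )
```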
[S3 Storage](./s3-storage.md)
### ECS Container Orchestration

Execute Dagster jobs and ops on Amazon ECS clusters with full support for task configuration, networking, and scaling.

```python { .api }
class EcsRunLauncher(RunLauncher):
    def launch_run(self, context): ...

def ecs_executor(**kwargs): ...

class EcsEventualConsistencyTimeout(Exception): ...
```
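A minimal sketch of attaching the ECS executor to a job so each step runs as its own ECS task; cluster and networking settings typically come from the deployment's EcsRunLauncher configuration:

```python
from dagster import job, op
from dagster_aws.ecs import ecs_executor

@op
def crunch_numbers():
    return sum(range(100))

# Each op in this job is launched as a separate ECS task.
@job(executor_def=ecs_executor)
def crunch_job():
    crunch_numbers()
```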
[ECS Orchestration](./ecs-orchestration.md)
### EMR Big Data Processing

Integrate with Amazon EMR for big data processing workflows, including PySpark step execution and cluster management.

```python { .api }
class EmrJobRunner:
    def run_job_flow(self, **kwargs): ...
    def wait_for_completion(self): ...

def emr_pyspark_step_launcher(**kwargs): ...

class EmrError(Exception): ...
class EmrClusterState(Enum): ...
class EmrStepState(Enum): ...
```
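A sketch based on the interface above; the `run_job_flow` keyword arguments are assumed to mirror boto3's EMR `RunJobFlow` request, and all names are placeholders:

```python
from dagster_aws.emr import EmrJobRunner, EmrError

# Start a job flow and block until it finishes (interface as sketched
# above; exact signatures may vary across dagster-aws versions).
runner = EmrJobRunner(region="us-west-2")
try:
    runner.run_job_flow(Name="nightly-etl", ReleaseLabel="emr-6.15.0")
    runner.wait_for_completion()
except EmrError as err:
    raise RuntimeError(f"EMR job flow failed: {err}")
```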
[EMR Processing](./emr-processing.md)
### Redshift Data Warehousing

Connect to and execute queries against Amazon Redshift clusters from within Dagster ops and assets.

```python { .api }
class RedshiftResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
    def execute_query(self, query: str): ...

def redshift_resource(**kwargs) -> RedshiftResource: ...

class RedshiftError(Exception): ...
```
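A sketch using the interface above; connection details are supplied through resource configuration, and the query is a placeholder:

```python
from dagster import asset
from dagster_aws.redshift import RedshiftResource

@asset
def event_count(redshift: RedshiftResource):
    # execute_query runs SQL against the configured cluster
    # (interface as sketched above).
    return redshift.execute_query("SELECT COUNT(*) FROM events")
```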
[Redshift Integration](./redshift-integration.md)
### Athena Query Service

Execute serverless SQL queries against data in S3 using Amazon Athena, with helpers for tracking query executions and retrieving results.

```python { .api }
class AthenaResource(ResourceWithBoto3Configuration):
    def execute_query(self, query: str): ...
    def get_query_results(self, execution_id: str): ...

def athena_resource(**kwargs) -> AthenaResource: ...

class AthenaError(Exception): ...
class AthenaTimeout(Exception): ...
```
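A sketch using the interface above, assuming `execute_query` returns the query execution id consumed by `get_query_results`; the table name is a placeholder:

```python
from dagster_aws.athena import AthenaResource, AthenaTimeout

athena = AthenaResource(region_name="us-west-2")

try:
    execution_id = athena.execute_query("SELECT * FROM access_logs LIMIT 10")
    rows = athena.get_query_results(execution_id)
except AthenaTimeout:
    rows = []  # query did not finish within the configured timeout
```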
[Athena Queries](./athena-queries.md)
### CloudWatch Logging

Send Dagster logs to Amazon CloudWatch for centralized log management and monitoring.

```python { .api }
def cloudwatch_logger(**kwargs): ...
```
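A minimal sketch of attaching the logger to a job; the log group and stream configuration keys are covered in cloudwatch-logging.md:

```python
from dagster import job, op
from dagster_aws.cloudwatch import cloudwatch_logger

@op
def emit_logs(context):
    context.log.info("this line is shipped to CloudWatch")

# Register the logger under a key; enable and configure it through
# the job's logger config at launch time.
@job(logger_defs={"cloudwatch": cloudwatch_logger})
def logged_job():
    emit_logs()
```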
[CloudWatch Logging](./cloudwatch-logging.md)
### Secrets Management

Integrate with AWS Secrets Manager for secure credential and configuration management within Dagster pipelines.

```python { .api }
class SecretsManagerResource(ResourceWithBoto3Configuration):
    def get_secret(self, secret_id: str): ...

def secretsmanager_resource(**kwargs) -> SecretsManagerResource: ...
def get_secrets_from_arns(arns: list) -> dict: ...
def get_tagged_secrets(tags: dict) -> dict: ...
```
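A sketch using the interface above; the secret name is a placeholder:

```python
from dagster import asset
from dagster_aws.secretsmanager import SecretsManagerResource

@asset
def third_party_sync(secrets: SecretsManagerResource):
    # Fetch the credential at runtime instead of baking it into config
    # (interface as sketched above).
    api_token = secrets.get_secret("my-service/api-token")
    ...
```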
[Secrets Management](./secrets-management.md)
### Parameter Store Configuration

Access AWS Systems Manager Parameter Store for configuration management and secure parameter storage.

```python { .api }
class ParameterStoreResource(ResourceWithBoto3Configuration):
    def get_parameter(self, name: str): ...
    def get_parameters_by_path(self, path: str): ...

class SSMResource(ResourceWithBoto3Configuration): ...

def parameter_store_resource(**kwargs) -> ParameterStoreResource: ...
def ssm_resource(**kwargs) -> SSMResource: ...
```
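A sketch using the interface above; the parameter names and path hierarchy are placeholders:

```python
from dagster_aws.ssm import ParameterStoreResource

params = ParameterStoreResource(region_name="us-west-2")

# Read one parameter, or everything under a path prefix.
batch_size = params.get_parameter("/pipelines/nightly/batch_size")
nightly_settings = params.get_parameters_by_path("/pipelines/nightly/")
```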
[Parameter Store](./parameter-store.md)
### Container Registry Integration

Interact with Amazon ECR for container image management in containerized Dagster workflows.

```python { .api }
class ECRPublicResource(ResourceWithBoto3Configuration):
    def get_authorization_token(self): ...

def ecr_public_resource(**kwargs) -> ECRPublicResource: ...
```
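A sketch using the interface above; ECR Public authorization is served from us-east-1, and the returned token is what you would feed to a registry login:

```python
from dagster_aws.ecr import ECRPublicResource

ecr = ECRPublicResource(region_name="us-east-1")

# Retrieve a registry credential (interface as sketched above).
token = ecr.get_authorization_token()
```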
[ECR Integration](./ecr-integration.md)
### RDS Database Operations

Connect to and manage Amazon RDS instances for relational database operations within Dagster pipelines.

```python { .api }
class RDSResource(ResourceWithBoto3Configuration):
    def get_connection(self): ...
```
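A sketch using the interface above, assuming `get_connection` hands back a DB-API style connection; the module path and query are placeholders:

```python
from dagster_aws.rds import RDSResource  # module path as implied by this section

rds = RDSResource(region_name="us-west-2")

conn = rds.get_connection()
cursor = conn.cursor()  # assuming a DB-API compatible connection
cursor.execute("SELECT 1")
print(cursor.fetchone())
```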
[RDS Operations](./rds-operations.md)
### Pipes External Process Orchestration

Orchestrate external processes running on various AWS services using Dagster's Pipes protocol for subprocess communication.

```python { .api }
class PipesECSClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesLambdaClient(PipesClient):
    def run(self, context, **kwargs): ...

class PipesS3ContextInjector(PipesContextInjector):
    def inject_context(self, context): ...

class PipesCloudWatchLogReader(PipesLogReader):
    def read_logs(self): ...
```
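A sketch of launching an external ECS task through Pipes; `run_task_params` is assumed to mirror boto3's `ecs.run_task` arguments, and the task definition and cluster names are placeholders:

```python
from dagster import AssetExecutionContext, asset
from dagster_aws.pipes import PipesECSClient

@asset
def external_ecs_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
    # Launch the task, stream its events back through Pipes, and
    # report the materialization produced by the external process.
    return pipes_ecs_client.run(
        context=context,
        run_task_params={
            "taskDefinition": "my-pipes-task",
            "cluster": "my-dagster-cluster",
        },
    ).get_materialize_result()
```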
[Pipes Orchestration](./pipes-orchestration.md)
## Common Types

```python { .api }
class ResourceWithBoto3Configuration(ConfigurableResource):
    """
    Base resource class for AWS services using boto3 with standard configuration options.
    """
    region_name: Optional[str] = None
    max_attempts: int = 5
    profile_name: Optional[str] = None
    use_ssl: bool = True
    endpoint_url: Optional[str] = None
    verify: Optional[bool] = True
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None

class ResourceWithS3Configuration(ConfigurableResource):
    """
    Base resource class for S3-specific services with S3-focused configuration options.
    """
    use_unsigned_session: bool = False
    region_name: Optional[str] = None
    endpoint_url: Optional[str] = None
    max_attempts: int = 5
    profile_name: Optional[str] = None
    use_ssl: bool = True
    verify: Optional[bool] = None
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None

def construct_boto_client_retry_config(max_attempts: int) -> dict:
    """
    Construct retry configuration for boto3 clients.

    Parameters:
        max_attempts: Maximum number of retry attempts

    Returns:
        dict: Boto3 retry configuration
    """
```
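A sketch of using the retry helper when building your own boto3 client; this assumes the helper lives in `dagster_aws.utils` and that its return value is accepted as the boto3 `config` argument:

```python
import boto3
from dagster_aws.utils import construct_boto_client_retry_config  # path assumed

# Build a client that retries up to 5 times with the library's settings.
retry_config = construct_boto_client_retry_config(max_attempts=5)
client = boto3.client("s3", config=retry_config)
```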
```