# Pipes External Process Orchestration

Orchestrate external processes running on various AWS services using Dagster's Pipes protocol for subprocess communication. This enables seamless integration with AWS compute services while maintaining observability and data flow.

## Capabilities

### Pipes Clients

Clients for orchestrating external processes on different AWS compute services.

```python { .api }
class PipesECSClient(PipesClient):
    """
    Pipes client for running external processes on ECS.

    Parameters:
        client: AWS client used to launch the task; optional (defaults to None).
        context_injector: How Dagster context is delivered to the external
            process; optional (defaults to None).
        message_reader: How messages from the external process are read back;
            optional (defaults to None).
        forward_termination: Defaults to True. Presumably controls whether
            terminating the Dagster run also stops the ECS task -- confirm
            against the dagster-aws reference.
    """

    def __init__(
        self,
        client=None,
        context_injector=None,
        message_reader=None,
        forward_termination=True
    ): ...

    def run(
        self,
        context: OpExecutionContext,
        extras: Optional[Dict] = None,
        **kwargs
    ) -> PipesExecutionResult:
        """
        Execute external process on ECS.

        Parameters:
            context: Dagster execution context
            extras: Additional context for external process
            **kwargs: ECS task configuration overrides

        Returns:
            PipesExecutionResult: Execution results
        """

class PipesLambdaClient(PipesClient):
    """
    Pipes client for running external processes on Lambda.

    The constructor signature is not listed in this reference.
    """

    def run(
        self,
        function_name: str,
        event: Dict,
        context: OpExecutionContext,
        **kwargs
    ) -> PipesExecutionResult:
        """
        Execute external process on Lambda.

        Parameters:
            function_name: Lambda function name
            event: Event payload for Lambda function
            context: Dagster execution context
            **kwargs: Additional Lambda invocation parameters

        Returns:
            PipesExecutionResult: Execution results
        """

class PipesGlueClient(PipesClient):
    """
    Pipes client for running external processes on AWS Glue.

    Constructor and run() signatures are not listed in this reference.
    """

class PipesEMRClient(PipesClient):
    """
    Pipes client for running external processes on EMR.

    Constructor and run() signatures are not listed in this reference.
    """

class PipesEMRContainersClient(PipesClient):
    """
    Pipes client for running external processes on EMR on EKS.

    Constructor and run() signatures are not listed in this reference.
    """

class PipesEMRServerlessClient(PipesClient):
    """
    Pipes client for running external processes on EMR Serverless.

    Constructor and run() signatures are not listed in this reference.
    """
```

### Context Injectors

Inject Dagster context into external processes running on AWS services.

```python { .api }
class PipesS3ContextInjector(PipesContextInjector):
    """
    Inject context via S3 for external processes.

    Both constructor arguments are keyword-only and required.

    Parameters:
        bucket: Name of the S3 bucket used to pass the context.
        client: AWS client used to access the bucket (presumably a boto3 S3
            client -- confirm against the dagster-aws reference).
    """

    def __init__(
        self,
        *,
        bucket: str,
        client
    ): ...

    def inject_context(
        self,
        context: OpExecutionContext
    ) -> PipesContextData: ...

class PipesLambdaEventContextInjector(PipesContextInjector):
    """
    Inject context via Lambda event payload.

    The constructor signature is not listed in this reference.
    """
```

### Message Readers

Read messages and logs from external processes running on AWS services.

```python { .api }
class PipesS3MessageReader(PipesMessageReader):
    """
    Read messages from S3 for external processes.

    Parameters:
        bucket: Name of the S3 bucket the messages are read from.
        key_prefix: Key prefix within the bucket; defaults to "dagster-pipes".
        **kwargs: Additional options; not enumerated in this reference.
    """

    def __init__(
        self,
        bucket: str,
        key_prefix: str = "dagster-pipes",
        **kwargs
    ): ...

class PipesS3LogReader(PipesLogReader):
    """
    Read logs from S3 for external processes.

    The constructor signature is not listed in this reference.
    """

class PipesCloudWatchMessageReader(PipesMessageReader):
    """
    Read messages from CloudWatch logs.

    Parameters:
        log_group: Name of the CloudWatch log group to read.
        log_stream_prefix: Defaults to "". Presumably restricts reading to
            log streams whose names start with this prefix -- confirm against
            the dagster-aws reference.
        **kwargs: Additional options; not enumerated in this reference.
    """

    def __init__(
        self,
        log_group: str,
        log_stream_prefix: str = "",
        **kwargs
    ): ...

class PipesCloudWatchLogReader(PipesLogReader):
    """
    Read logs from CloudWatch.

    This is a log reader, not a message reader: it derives from
    PipesLogReader rather than PipesMessageReader. The constructor signature
    is not listed in this reference.
    """

class PipesLambdaLogsMessageReader(PipesMessageReader):
    """
    Read messages from Lambda execution logs.

    The constructor signature is not listed in this reference.
    """
```

## Usage Examples

### ECS Pipes Client

```python
import boto3
from dagster import op, job, Definitions
from dagster_aws.pipes import (
    PipesCloudWatchMessageReader,
    PipesECSClient,
    PipesS3ContextInjector,
)

# The client only configures the Pipes plumbing: how context reaches the task
# and how messages come back. Which task to launch, and where, is supplied per
# run through `run_task_params` below.
ecs_pipes_client = PipesECSClient(
    context_injector=PipesS3ContextInjector(
        bucket="my-pipes-bucket",
        # `client` is a required keyword-only argument.
        client=boto3.client("s3"),
    ),
    # `message_reader` expects a message reader, not a log reader.
    message_reader=PipesCloudWatchMessageReader(
        log_group="/aws/ecs/my-external-task",
    ),
)


@op
def run_external_processing(context, pipes_client: PipesECSClient):
    """
    Run external data processing on ECS via Pipes.
    """
    return pipes_client.run(
        context=context,
        extras={"input_path": "s3://data-bucket/input/"},
        # Parameters for the ECS RunTask call: task placement, networking and
        # task-level resource overrides.
        run_task_params={
            "cluster": "my-dagster-cluster",
            "taskDefinition": "my-external-task",
            "networkConfiguration": {
                "awsvpcConfiguration": {
                    "subnets": ["subnet-12345"],
                    "securityGroups": ["sg-67890"],
                },
            },
            "overrides": {
                "cpu": "1024",
                "memory": "2048",
            },
        },
    )


@job(
    resource_defs={
        # The key must match the op's resource parameter name (`pipes_client`).
        "pipes_client": ecs_pipes_client
    }
)
def external_processing_job():
    run_external_processing()


defs = Definitions(jobs=[external_processing_job])
210
```
211
212
### Lambda Pipes Client
213
214
```python
215
from dagster import op, job, Definitions
from dagster_aws.pipes import (
    PipesLambdaClient,
    PipesLambdaEventContextInjector,
    PipesLambdaLogsMessageReader,
)

lambda_pipes_client = PipesLambdaClient(
    context_injector=PipesLambdaEventContextInjector(),
    message_reader=PipesLambdaLogsMessageReader(),
)


@op
def invoke_lambda_function(context, pipes_client: PipesLambdaClient):
    """
    Invoke Lambda function via Pipes for serverless processing.
    """
    return pipes_client.run(
        context=context,
        function_name="my-data-processor",
        # The payload parameter of `PipesLambdaClient.run` is named `event`.
        event={
            "input_bucket": "source-data",
            "output_bucket": "processed-data",
        },
    )


@job(
    resource_defs={
        # The key must match the op's resource parameter name (`pipes_client`).
        "pipes_client": lambda_pipes_client
    }
)
def serverless_processing_job():
    invoke_lambda_function()


defs = Definitions(jobs=[serverless_processing_job])
```