or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

athena-queries.mdcloudwatch-logging.mdecr-integration.mdecs-orchestration.mdemr-processing.mdindex.mdparameter-store.mdpipes-orchestration.mdrds-operations.mdredshift-integration.mds3-storage.mdsecrets-management.md

pipes-orchestration.mddocs/

# Pipes External Process Orchestration

Orchestrate external processes running on AWS services with Dagster Pipes, Dagster's protocol for launching code outside the Dagster process and exchanging data with it. Dagster injects run context into the external process and reads back the messages and logs it emits, so you can use AWS compute without losing observability or data flow.

## Capabilities

### Pipes Clients

Each client runs external processes on one AWS compute service: ECS, Lambda, Glue, EMR, EMR on EKS, or EMR Serverless.

```python { .api }

class PipesECSClient(PipesClient):
    """Pipes client that runs an external process as an Amazon ECS task.

    Args:
        client: ECS client used to start the task. Defaults to ``None``.
        context_injector: Injector that delivers Dagster context to the task.
            Defaults to ``None``.
        message_reader: Reader that collects Pipes messages from the task.
            This must be a message reader (for example
            ``PipesCloudWatchMessageReader``), not a ``*LogReader``.
            Defaults to ``None``.
        forward_termination: Defaults to ``True``. Presumably stops the ECS
            task when the Dagster run is terminated; confirm upstream.
    """

    def __init__(
        self,
        client=None,
        context_injector=None,
        message_reader=None,
        forward_termination=True,
    ): ...

    def run(
        self,
        context: OpExecutionContext,
        extras: Optional[Dict] = None,
        **kwargs,
    ) -> "PipesClientCompletedInvocation":
        """Launch the external process as an ECS task.

        Args:
            context: Dagster execution context of the calling op or asset.
            extras: Additional context made available to the external process.
            **kwargs: ECS launch configuration. The task itself is described
                by ``run_task_params``, a boto3 ``ECS.Client.run_task``
                request (``cluster``, ``taskDefinition``,
                ``networkConfiguration``, ``overrides``, ...).

        Returns:
            PipesClientCompletedInvocation: The completed invocation, not the
            results themselves. Call ``.get_results()`` on it inside an op,
            or ``.get_materialize_result()`` inside a single-asset function.
        """

class PipesLambdaClient(PipesClient):
    """Pipes client that runs an external process as an AWS Lambda invocation."""

    def run(
        self,
        function_name: str,
        event: Dict,
        context: OpExecutionContext,
        **kwargs,
    ) -> "PipesClientCompletedInvocation":
        """Invoke a Lambda function as a Pipes external process.

        Args:
            function_name: Name of the Lambda function to invoke.
            event: Event payload sent to the function. The keyword is
                ``event``; passing ``payload=`` instead leaves this required
                argument unset.
            context: Dagster execution context of the calling op or asset.
            **kwargs: Additional Lambda invocation parameters.

        Returns:
            PipesClientCompletedInvocation: The completed invocation, not the
            results themselves. Call ``.get_results()`` on it inside an op,
            or ``.get_materialize_result()`` inside a single-asset function.
        """

class PipesGlueClient(PipesClient):
    """Pipes client whose external processes execute as AWS Glue jobs."""

class PipesEMRClient(PipesClient):
    """Pipes client whose external processes execute on Amazon EMR."""

class PipesEMRContainersClient(PipesClient):
    """Pipes client whose external processes execute on EMR on EKS (EMR Containers)."""

class PipesEMRServerlessClient(PipesClient):
    """Pipes client whose external processes execute on EMR Serverless."""

```

### Context Injectors

Context injectors pass Dagster's run context to external processes running on AWS services.

```python { .api }

class PipesS3ContextInjector(PipesContextInjector):
    """Context injector that hands Dagster context to an external process through S3.

    Both constructor arguments are keyword-only and required.

    Args:
        bucket: S3 bucket that holds the injected context.
        client: S3 client used to reach ``bucket`` (presumably a boto3
            ``s3`` client).
    """

    def __init__(self, *, bucket: str, client): ...

    def inject_context(
        self, context: OpExecutionContext
    ) -> PipesContextData: ...

class PipesLambdaEventContextInjector(PipesContextInjector):
    """Context injector that embeds Dagster context in the Lambda event payload."""

```

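### Message Readers

Message readers receive messages and logs from external processes running on AWS services. The `*LogReader` classes below derive from `PipesLogReader`, not `PipesMessageReader`, so they cannot be passed as a client's `message_reader`.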

```python { .api }

class PipesS3MessageReader(PipesMessageReader):
    """Message reader that picks up Pipes messages an external process writes to S3.

    Args:
        bucket: S3 bucket that receives the messages.
        key_prefix: Key prefix for the message objects.
            Defaults to ``"dagster-pipes"``.
        **kwargs: Additional reader options.
    """

    def __init__(self, bucket: str, key_prefix: str = "dagster-pipes", **kwargs): ...

class PipesS3LogReader(PipesLogReader):
    """Log reader that pulls an external process's logs from S3."""

class PipesCloudWatchMessageReader(PipesMessageReader):
    """Message reader that extracts Pipes messages from CloudWatch Logs.

    Args:
        log_group: CloudWatch log group to read from.
        log_stream_prefix: Log stream name prefix. Defaults to ``""``;
            presumably the empty prefix selects every stream in the group.
        **kwargs: Additional reader options.
    """

    def __init__(self, log_group: str, log_stream_prefix: str = "", **kwargs): ...

class PipesCloudWatchLogReader(PipesLogReader):
    """Log reader that pulls an external process's logs from CloudWatch Logs."""

class PipesLambdaLogsMessageReader(PipesMessageReader):
    """Message reader that extracts Pipes messages from a Lambda invocation's logs."""

```

## Usage Examples

### ECS Pipes Client

```python

import boto3

from dagster import Definitions, OpExecutionContext, job, op
from dagster_aws.pipes import (
    PipesCloudWatchMessageReader,
    PipesECSClient,
    PipesS3ContextInjector,
)

# Cluster, task definition and networking are per-launch settings: they go in
# ``run_task_params`` (a boto3 ``ECS.Client.run_task`` request) below, not in
# the client constructor.
ecs_pipes_client = PipesECSClient(
    context_injector=PipesS3ContextInjector(
        bucket="my-pipes-bucket",
        client=boto3.client("s3"),  # required, keyword-only
    ),
    # Pipes messages must come back through a *message* reader; a
    # PipesCloudWatchLogReader is a log reader and cannot fill this slot.
    message_reader=PipesCloudWatchMessageReader(
        log_group="/aws/ecs/my-external-task",
    ),
)


@op
def run_external_processing(
    context: OpExecutionContext,
    pipes_ecs_client: PipesECSClient,
):
    """Run external data processing on ECS via Pipes."""
    # The parameter name must equal the job's resource key
    # ("pipes_ecs_client"); Dagster injects resources by name.
    return pipes_ecs_client.run(
        context=context,
        extras={"input_path": "s3://data-bucket/input/"},
        run_task_params={
            "cluster": "my-dagster-cluster",
            "taskDefinition": "my-external-task",
            "count": 1,
            # Add "launchType": "FARGATE" when running on Fargate.
            "networkConfiguration": {
                "awsvpcConfiguration": {
                    "subnets": ["subnet-12345"],
                    "securityGroups": ["sg-67890"],
                },
            },
            # Task-level CPU/memory overrides (ECS expects strings).
            "overrides": {
                "cpu": "1024",
                "memory": "2048",
            },
        },
    ).get_results()  # unwrap the completed invocation into its results


@job(resource_defs={"pipes_ecs_client": ecs_pipes_client})
def external_processing_job():
    run_external_processing()


defs = Definitions(jobs=[external_processing_job])

```

### Lambda Pipes Client

```python

from dagster import Definitions, OpExecutionContext, job, op
from dagster_aws.pipes import (
    PipesLambdaClient,
    PipesLambdaEventContextInjector,
    PipesLambdaLogsMessageReader,
)

lambda_pipes_client = PipesLambdaClient(
    context_injector=PipesLambdaEventContextInjector(),
    message_reader=PipesLambdaLogsMessageReader(),
)


@op
def invoke_lambda_function(
    context: OpExecutionContext,
    pipes_lambda_client: PipesLambdaClient,
):
    """Invoke a Lambda function via Pipes for serverless processing."""
    # The parameter name must equal the job's resource key
    # ("pipes_lambda_client"); Dagster injects resources by name.
    return pipes_lambda_client.run(
        context=context,
        function_name="my-data-processor",
        # The Lambda payload is the ``event`` argument; ``payload=`` would be
        # swallowed by **kwargs and leave ``event`` unset.
        event={
            "input_bucket": "source-data",
            "output_bucket": "processed-data",
        },
    ).get_results()  # unwrap the completed invocation into its results


@job(resource_defs={"pipes_lambda_client": lambda_pipes_client})
def serverless_processing_job():
    invoke_lambda_function()


defs = Definitions(jobs=[serverless_processing_job])

```