# EMR Big Data Processing

Integration with Amazon EMR (Elastic MapReduce) for big data processing workflows. This module provides cluster management, PySpark step execution, job orchestration, and comprehensive EMR state monitoring for large-scale data processing tasks.

## Capabilities

### EMR Job Runner

Manage the full EMR cluster lifecycle and execute big data processing jobs: create clusters, add steps, monitor progress, and terminate clusters.

```python { .api }
class EmrJobRunner:
    """
    Manages EMR job execution and cluster operations.
    """

    def __init__(
        self,
        region: str,
        cluster_id: Optional[str] = None,
        **kwargs
    ): ...

    def run_job_flow(
        self,
        job_flow_overrides: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> str:
        """
        Start a new EMR cluster and run a job flow.

        Parameters:
            job_flow_overrides: Custom EMR cluster configuration
            **kwargs: Additional EMR RunJobFlow parameters

        Returns:
            str: EMR cluster ID
        """

    def add_job_flow_steps(
        self,
        cluster_id: str,
        steps: List[Dict[str, Any]]
    ) -> List[str]:
        """
        Add steps to an existing EMR cluster.

        Parameters:
            cluster_id: EMR cluster identifier
            steps: List of EMR step configurations

        Returns:
            List[str]: List of step IDs
        """

    def wait_for_completion(
        self,
        cluster_id: str,
        timeout_seconds: int = 3600
    ) -> bool:
        """
        Wait for the EMR cluster to complete all steps.

        Parameters:
            cluster_id: EMR cluster identifier
            timeout_seconds: Maximum wait time in seconds

        Returns:
            bool: True if completed successfully, False on timeout
        """

    def terminate_cluster(self, cluster_id: str) -> bool:
        """
        Terminate an EMR cluster.

        Parameters:
            cluster_id: EMR cluster identifier

        Returns:
            bool: True if termination was initiated successfully
        """

    def get_cluster_status(self, cluster_id: str) -> EmrClusterState:
        """
        Get the current status of an EMR cluster.

        Parameters:
            cluster_id: EMR cluster identifier

        Returns:
            EmrClusterState: Current cluster state
        """
```
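
A minimal lifecycle sketch tying these methods together, assuming the API shown above (the region, cluster name, and timeout are placeholders; see the usage examples below for a full cluster configuration):

```python
from dagster_aws.emr import EmrJobRunner

runner = EmrJobRunner(region="us-west-2")

# Start a cluster (a full job-flow config dict would go here)
cluster_id = runner.run_job_flow(job_flow_overrides={"Name": "Sketch Cluster"})

# Block until all steps finish, then tear the cluster down
if runner.wait_for_completion(cluster_id, timeout_seconds=1800):
    print(f"Cluster {cluster_id} reached {runner.get_cluster_status(cluster_id)}")
runner.terminate_cluster(cluster_id)
```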

### EMR PySpark Step Launcher

Execute PySpark applications as EMR steps with automatic step configuration and monitoring.

```python { .api }
def emr_pyspark_step_launcher(
    cluster_id: str,
    s3_bucket: str,
    deploy_local_pyspark_deps: bool = True,
    staging_bucket: Optional[str] = None,
    wait_for_logs: bool = True,
    local_job_package_path: Optional[str] = None,
    action_on_failure: str = "TERMINATE_CLUSTER",
    spark_config: Optional[Dict[str, str]] = None,
    region_name: Optional[str] = None,
    **kwargs
) -> StepLauncherDefinition:
    """
    Step launcher for executing PySpark applications on EMR.

    Parameters:
        cluster_id: EMR cluster ID to run the step on
        s3_bucket: S3 bucket for staging PySpark dependencies
        deploy_local_pyspark_deps: Whether to deploy local dependencies
        staging_bucket: S3 bucket for staging job artifacts
        wait_for_logs: Whether to wait for CloudWatch logs
        local_job_package_path: Path to local job package
        action_on_failure: Action to take on step failure
        spark_config: Spark configuration parameters
        region_name: AWS region name
        **kwargs: Additional step launcher configuration

    Returns:
        StepLauncherDefinition: Configured EMR PySpark step launcher
    """
```
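
Per the signature above, the launcher can also be built by calling the function directly rather than via `.configured` (a sketch; the cluster ID and bucket names are placeholders):

```python
from dagster_aws.emr import emr_pyspark_step_launcher

# Direct construction from the signature above; identifiers are placeholders
launcher = emr_pyspark_step_launcher(
    cluster_id="j-XXXXXXXXXX",
    s3_bucket="my-emr-bucket",
    region_name="us-west-2",
    spark_config={"spark.executor.memory": "4g"},
)
```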

### EMR State Management

Enumerations and constants for managing EMR cluster and step lifecycles.

```python { .api }
class EmrClusterState(Enum):
    """
    Enumeration of possible EMR cluster states.
    """
    STARTING = "STARTING"
    BOOTSTRAPPING = "BOOTSTRAPPING"
    RUNNING = "RUNNING"
    WAITING = "WAITING"
    TERMINATING = "TERMINATING"
    TERMINATED = "TERMINATED"
    TERMINATED_WITH_ERRORS = "TERMINATED_WITH_ERRORS"

class EmrStepState(Enum):
    """
    Enumeration of possible EMR step states.
    """
    PENDING = "PENDING"
    CANCEL_PENDING = "CANCEL_PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
    INTERRUPTED = "INTERRUPTED"

# Cluster state constants
EMR_CLUSTER_DONE_STATES: Set[EmrClusterState] = {
    EmrClusterState.TERMINATED,
    EmrClusterState.TERMINATED_WITH_ERRORS
}

EMR_CLUSTER_TERMINATED_STATES: Set[EmrClusterState] = {
    EmrClusterState.TERMINATING,
    EmrClusterState.TERMINATED,
    EmrClusterState.TERMINATED_WITH_ERRORS
}
```
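
A minimal polling sketch built on these states, assuming the `EmrJobRunner.get_cluster_status` method shown earlier (the polling interval is illustrative):

```python
import time

from dagster_aws.emr import (
    EMR_CLUSTER_DONE_STATES,
    EmrClusterState,
    EmrJobRunner,
)

def poll_until_done(
    runner: EmrJobRunner, cluster_id: str, interval_seconds: int = 30
) -> EmrClusterState:
    """Poll the cluster until it reaches a terminal state."""
    while True:
        state = runner.get_cluster_status(cluster_id)
        if state in EMR_CLUSTER_DONE_STATES:
            return state
        time.sleep(interval_seconds)
```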

### EMR Exception Handling

An exception class for EMR-specific error handling and operation failures.

```python { .api }
class EmrError(Exception):
    """
    Exception raised for EMR-related errors.

    Covers cluster failures, step failures, timeout errors,
    and other EMR-specific operational issues.
    """

    def __init__(self, message: str, cluster_id: Optional[str] = None): ...
```
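
A short sketch of catching `EmrError` around a cluster operation (the cluster ID is a placeholder):

```python
from dagster_aws.emr import EmrJobRunner, EmrError

runner = EmrJobRunner(region="us-west-2")

try:
    # Terminating a cluster that is already gone may raise EmrError
    runner.terminate_cluster("j-XXXXXXXXXX")
except EmrError as e:
    print(f"EMR operation failed: {e}")
```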

## Usage Examples

### Basic EMR Cluster Management

```python
from dagster import op, job, Definitions
from dagster_aws.emr import EmrJobRunner, EmrError

@op
def create_emr_cluster():
    """Create and configure an EMR cluster for big data processing."""
    runner = EmrJobRunner(region="us-west-2")

    job_flow_config = {
        "Name": "Dagster EMR Cluster",
        "ReleaseLabel": "emr-6.9.0",
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "Master nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "MASTER",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 1,
                },
                {
                    "Name": "Worker nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "CORE",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 2,
                },
            ],
            "Ec2KeyName": "my-key-pair",
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        "Applications": [{"Name": "Spark"}, {"Name": "Hadoop"}],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
    }

    try:
        cluster_id = runner.run_job_flow(job_flow_overrides=job_flow_config)
        return cluster_id
    except EmrError as e:
        raise Exception(f"Failed to create EMR cluster: {e}") from e

@job
def emr_cluster_job():
    create_emr_cluster()

defs = Definitions(jobs=[emr_cluster_job])
```

### PySpark Step Execution

```python
from dagster import op, job, Definitions
from dagster_aws.emr import emr_pyspark_step_launcher

# Configure the PySpark step launcher
pyspark_launcher = emr_pyspark_step_launcher.configured({
    "cluster_id": "j-XXXXXXXXXX",  # Existing EMR cluster
    "s3_bucket": "my-emr-bucket",
    "deploy_local_pyspark_deps": True,
    "wait_for_logs": True,
    "spark_config": {
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.default.parallelism": "100"
    }
})

@op
def data_processing_step():
    """PySpark data processing operation."""
    # This will be executed as a PySpark application on EMR
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

    # Read data from S3
    df = spark.read.parquet("s3://my-data-bucket/input/")

    # Process data
    processed_df = df.groupBy("category").sum("amount")

    # Write results back to S3
    processed_df.write.mode("overwrite").parquet("s3://my-data-bucket/output/")

    spark.stop()
    return "Processing complete"

@job(step_launcher_def=pyspark_launcher)
def pyspark_processing_job():
    data_processing_step()

defs = Definitions(jobs=[pyspark_processing_job])
```

### Advanced EMR Workflow

```python
from typing import List

from dagster import op, job, Definitions
from dagster_aws.emr import EmrJobRunner, EmrError

@op
def provision_emr_cluster():
    """Provision an EMR cluster with custom configuration."""
    runner = EmrJobRunner(region="us-east-1")

    cluster_config = {
        "Name": "Advanced Processing Cluster",
        "ReleaseLabel": "emr-6.9.0",
        "Instances": {
            "InstanceFleets": [
                {
                    "Name": "Master Fleet",
                    "InstanceFleetType": "MASTER",
                    "TargetOnDemandCapacity": 1,
                    "InstanceTypeConfigs": [
                        {
                            "InstanceType": "m5.2xlarge",
                            "EbsConfiguration": {
                                "EbsBlockDeviceConfigs": [
                                    {
                                        "VolumeSpecification": {
                                            "VolumeType": "gp2",
                                            "SizeInGB": 100
                                        },
                                        "VolumesPerInstance": 1
                                    }
                                ]
                            }
                        }
                    ]
                }
            ],
            "Ec2SubnetId": "subnet-12345",
            "EmrManagedMasterSecurityGroup": "sg-master",
            "EmrManagedSlaveSecurityGroup": "sg-slave"
        },
        "Applications": [
            {"Name": "Spark"},
            {"Name": "Hadoop"},
            {"Name": "Hive"}
        ],
        "Configurations": [
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.sql.adaptive.enabled": "true",
                    "spark.sql.adaptive.coalescePartitions.enabled": "true"
                }
            }
        ]
    }

    cluster_id = runner.run_job_flow(job_flow_overrides=cluster_config)

    # Wait for the cluster to be ready
    if not runner.wait_for_completion(cluster_id, timeout_seconds=1800):
        raise EmrError("Cluster provisioning timed out", cluster_id)

    return cluster_id

@op
def run_data_processing(cluster_id: str):
    """Execute data processing steps on the EMR cluster."""
    runner = EmrJobRunner(region="us-east-1")

    processing_steps = [
        {
            "Name": "Data Ingestion",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "s3://my-scripts-bucket/ingest_data.py"
                ]
            }
        },
        {
            "Name": "Data Transformation",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--conf", "spark.executor.instances=4",
                    "s3://my-scripts-bucket/transform_data.py"
                ]
            }
        }
    ]

    step_ids = runner.add_job_flow_steps(cluster_id, processing_steps)

    # Monitor step completion
    if not runner.wait_for_completion(cluster_id, timeout_seconds=3600):
        raise EmrError("Data processing steps timed out", cluster_id)

    return step_ids

@op
def cleanup_cluster(cluster_id: str, step_ids: List[str]):
    """Terminate the EMR cluster after processing.

    The step_ids input is not used directly; consuming it makes this op
    depend on run_data_processing, so termination only happens after the
    processing steps have finished.
    """
    runner = EmrJobRunner(region="us-east-1")

    if runner.terminate_cluster(cluster_id):
        return f"Cluster {cluster_id} termination initiated"
    else:
        raise EmrError(f"Failed to terminate cluster {cluster_id}", cluster_id)

@job
def advanced_emr_workflow():
    cluster_id = provision_emr_cluster()
    step_ids = run_data_processing(cluster_id)
    cleanup_cluster(cluster_id, step_ids)

defs = Definitions(jobs=[advanced_emr_workflow])
```