
# EMR Cluster Management

Amazon EMR (Elastic MapReduce) integration for big data processing and analytics workloads. Provides cluster lifecycle management, job execution, and monitoring capabilities for Hadoop, Spark, and other big data frameworks running on AWS.

## Capabilities

### EMR Hook

Core EMR client providing comprehensive cluster and job management functionality.

```python { .api }
class EmrHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', emr_conn_id: str = None, **kwargs):
        """
        Initialize EMR Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        - emr_conn_id: EMR-specific connection ID
        """

    def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list) -> str:
        """
        Get cluster ID by name and states.

        Parameters:
        - emr_cluster_name: Name of the EMR cluster
        - cluster_states: List of acceptable cluster states

        Returns:
        Cluster ID if found
        """

    def create_job_flow(self, job_flow_overrides: dict = None, **kwargs) -> str:
        """
        Create EMR cluster (job flow).

        Parameters:
        - job_flow_overrides: Configuration overrides for cluster creation

        Returns:
        Cluster ID
        """

    def add_job_flow_steps(self, job_flow_id: str, steps: list, **kwargs) -> list:
        """
        Add steps to a running EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - steps: List of step configurations

        Returns:
        List of step IDs
        """

    def describe_cluster(self, job_flow_id: str) -> dict:
        """
        Get EMR cluster details.

        Parameters:
        - job_flow_id: EMR cluster ID

        Returns:
        Cluster configuration and status
        """

    def describe_step(self, job_flow_id: str, step_id: str) -> dict:
        """
        Get EMR step details.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID

        Returns:
        Step configuration and status
        """

    def list_steps(self, job_flow_id: str, step_states: list = None, step_ids: list = None) -> list:
        """
        List steps in an EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_states: Filter by step states
        - step_ids: Filter by specific step IDs

        Returns:
        List of step details
        """

    def terminate_job_flow(self, job_flow_id: str) -> None:
        """
        Terminate an EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        """

    def modify_cluster(self, job_flow_id: str, step_concurrency_level: int) -> None:
        """
        Modify EMR cluster configuration.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_concurrency_level: Number of concurrent steps
        """

    def get_job_flow_state(self, job_flow_id: str) -> str:
        """
        Get EMR cluster state.

        Parameters:
        - job_flow_id: EMR cluster ID

        Returns:
        Current cluster state
        """
```
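
For one-off calls outside the operator layer, the hook can be used directly inside a task. A minimal sketch, assuming the hook is importable from `airflow.providers.amazon.aws.hooks.emr` (matching the import convention of the examples below) and that an `aws_default` connection is configured:

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.emr import EmrHook  # import path assumed


@task
def report_cluster_state(cluster_name: str) -> str:
    # Resolve the cluster ID from its name, considering only active states.
    hook = EmrHook(aws_conn_id='aws_default')
    cluster_id = hook.get_cluster_id_by_name(
        emr_cluster_name=cluster_name,
        cluster_states=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING'],
    )
    # Fetch and return the current lifecycle state for downstream tasks.
    return hook.get_job_flow_state(job_flow_id=cluster_id)
```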

### EMR Operators

Task implementations for EMR cluster and job management operations.

```python { .api }
class EmrCreateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_overrides: dict = None, aws_conn_id: str = 'aws_default', emr_conn_id: str = None, **kwargs):
        """
        Create EMR cluster.

        Parameters:
        - job_flow_overrides: Cluster configuration overrides
        - aws_conn_id: AWS connection ID
        - emr_conn_id: EMR-specific connection ID
        """

class EmrTerminateJobFlowOperator(BaseOperator):
    def __init__(self, job_flow_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Terminate EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - aws_conn_id: AWS connection ID
        """

class EmrAddStepsOperator(BaseOperator):
    def __init__(self, job_flow_id: str, steps: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Add steps to EMR cluster.

        Parameters:
        - job_flow_id: EMR cluster ID
        - steps: List of step configurations
        - aws_conn_id: AWS connection ID
        """

class EmrModifyClusterOperator(BaseOperator):
    def __init__(self, job_flow_id: str, step_concurrency_level: int, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Modify EMR cluster configuration.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_concurrency_level: Number of concurrent steps
        - aws_conn_id: AWS connection ID
        """

class EmrContainerOperator(BaseOperator):
    def __init__(self, name: str, virtual_cluster_id: str, execution_role_arn: str, release_label: str, job_driver: dict, configuration_overrides: dict = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, max_tries: int = None, **kwargs):
        """
        Run job on EMR on EKS.

        Parameters:
        - name: Job name
        - virtual_cluster_id: EMR on EKS virtual cluster ID
        - execution_role_arn: IAM role ARN for job execution
        - release_label: EMR release version
        - job_driver: Job driver configuration
        - configuration_overrides: Additional configuration overrides
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        - max_tries: Maximum number of polling attempts
        """

class EmrServerlessCreateApplicationOperator(BaseOperator):
    def __init__(self, release_label: str, job_type: str, name: str = None, initial_capacity: dict = None, maximum_capacity: dict = None, tags: dict = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create EMR Serverless application.

        Parameters:
        - release_label: EMR release version
        - job_type: Type of job ('SPARK' or 'HIVE')
        - name: Application name
        - initial_capacity: Initial capacity configuration
        - maximum_capacity: Maximum capacity configuration
        - tags: Resource tags
        - aws_conn_id: AWS connection ID
        """

class EmrServerlessStartJobOperator(BaseOperator):
    def __init__(self, application_id: str, execution_role_arn: str, job_driver: dict, configuration_overrides: dict = None, name: str = None, tags: dict = None, aws_conn_id: str = 'aws_default', wait_for_completion: bool = True, **kwargs):
        """
        Start EMR Serverless job.

        Parameters:
        - application_id: EMR Serverless application ID
        - execution_role_arn: IAM role ARN for job execution
        - job_driver: Job driver configuration
        - configuration_overrides: Additional configuration overrides
        - name: Job name
        - tags: Job tags
        - aws_conn_id: AWS connection ID
        - wait_for_completion: Wait for job completion
        """

class EmrServerlessDeleteApplicationOperator(BaseOperator):
    def __init__(self, application_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete EMR Serverless application.

        Parameters:
        - application_id: EMR Serverless application ID
        - aws_conn_id: AWS connection ID
        """
```

### EMR Sensors

Monitoring tasks that wait for EMR cluster states and job completion.

```python { .api }
class EmrJobFlowSensor(BaseSensorOperator):
    def __init__(self, job_flow_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR cluster to reach target state.

        Parameters:
        - job_flow_id: EMR cluster ID
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        """

class EmrStepSensor(BaseSensorOperator):
    def __init__(self, job_flow_id: str, step_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR step completion.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID to monitor
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        """

class EmrContainerSensor(BaseSensorOperator):
    def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Wait for EMR on EKS job completion.

        Parameters:
        - job_id: EMR on EKS job ID
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrServerlessJobSensor(BaseSensorOperator):
    def __init__(self, application_id: str, job_run_id: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for EMR Serverless job completion.

        Parameters:
        - application_id: EMR Serverless application ID
        - job_run_id: Job run ID
        - aws_conn_id: AWS connection ID
        """
```
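
Passing `failed_states` lets a sensor fail fast when the resource enters a terminal error state instead of waiting out its timeout. A brief sketch; the cluster ID is a placeholder:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor

with DAG('emr_watch_cluster', start_date=datetime(2023, 1, 1)) as dag:
    # Fail immediately on a terminal error state rather than timing out.
    watch_cluster = EmrJobFlowSensor(
        task_id='watch_cluster',
        job_flow_id='j-EXAMPLE12345',  # placeholder cluster ID
        target_states=['TERMINATED'],
        failed_states=['TERMINATED_WITH_ERRORS'],
    )
```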

### EMR Triggers

Asynchronous triggers for efficient EMR monitoring without blocking workers.

```python { .api }
class EmrJobFlowTrigger(BaseTrigger):
    def __init__(self, job_flow_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for EMR cluster state monitoring.

        Parameters:
        - job_flow_id: EMR cluster ID
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrStepTrigger(BaseTrigger):
    def __init__(self, job_flow_id: str, step_id: str, target_states: list = None, failed_states: list = None, aws_conn_id: str = 'aws_default', poll_interval: int = 30, **kwargs):
        """
        Asynchronous trigger for EMR step monitoring.

        Parameters:
        - job_flow_id: EMR cluster ID
        - step_id: Step ID to monitor
        - target_states: List of acceptable target states
        - failed_states: List of states considered as failures
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """

class EmrContainerTrigger(BaseTrigger):
    def __init__(self, job_id: str, aws_conn_id: str = 'aws_default', poll_interval: int = 60, **kwargs):
        """
        Asynchronous trigger for EMR on EKS job monitoring.

        Parameters:
        - job_id: EMR on EKS job ID
        - aws_conn_id: AWS connection ID
        - poll_interval: Polling interval in seconds
        """
```
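
Triggers are not added to DAGs directly; a deferrable operator or sensor hands control to one via `defer()` and resumes in a callback once the trigger fires. A minimal sketch of that pattern, assuming the trigger import path `airflow.providers.amazon.aws.triggers.emr`; the wrapper class itself is hypothetical:

```python
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.amazon.aws.triggers.emr import EmrJobFlowTrigger  # import path assumed


class EmrJobFlowDeferrableSensor(BaseSensorOperator):  # hypothetical wrapper
    def __init__(self, job_flow_id: str, **kwargs):
        super().__init__(**kwargs)
        self.job_flow_id = job_flow_id

    def execute(self, context):
        # Release the worker slot; the triggerer polls EMR asynchronously.
        self.defer(
            trigger=EmrJobFlowTrigger(
                job_flow_id=self.job_flow_id,
                target_states=['WAITING'],
                failed_states=['TERMINATED_WITH_ERRORS'],
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # Re-invoked on a worker with the event payload once the trigger fires.
        return event
```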

## Usage Examples

### Basic EMR Cluster Workflow

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator
)
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor, EmrStepSensor

dag = DAG('emr_spark_job', start_date=datetime(2023, 1, 1))

# Cluster configuration
JOB_FLOW_OVERRIDES = {
    'Name': 'data-processing-cluster',
    'ReleaseLabel': 'emr-6.10.0',
    'Applications': [{'Name': 'Spark'}, {'Name': 'Hadoop'}],
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'Ec2KeyName': 'my-key-pair',
        # Keep the cluster alive so steps can be added after creation;
        # it is terminated explicitly at the end of the workflow.
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': 's3://my-emr-logs/',
}

# Spark job steps
SPARK_STEPS = [
    {
        'Name': 'Data Processing Job',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--class', 'com.example.DataProcessor',
                's3://my-spark-jobs/data-processor.jar',
                '--input', 's3://my-data/input/{{ ds }}/',
                '--output', 's3://my-data/output/{{ ds }}/',
                '--date', '{{ ds }}'
            ],
        },
    }
]

# Create cluster
create_cluster = EmrCreateJobFlowOperator(
    task_id='create_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    dag=dag
)

# Wait for cluster to be ready
wait_for_cluster = EmrJobFlowSensor(
    task_id='wait_for_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    target_states=['WAITING'],
    dag=dag
)

# Add processing steps
add_steps = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    steps=SPARK_STEPS,
    dag=dag
)

# Wait for steps to complete
wait_for_step = EmrStepSensor(
    task_id='wait_for_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    target_states=['COMPLETED'],
    dag=dag
)

# Terminate the cluster once the work is done
terminate_cluster = EmrTerminateJobFlowOperator(
    task_id='terminate_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    dag=dag
)

create_cluster >> wait_for_cluster >> add_steps >> wait_for_step >> terminate_cluster
```

### EMR Serverless Job

```python
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator
)

# Create serverless application
create_app = EmrServerlessCreateApplicationOperator(
    task_id='create_serverless_app',
    release_label='emr-6.10.0',
    job_type='SPARK',
    name='data-processing-app',
    initial_capacity={
        'DRIVER': {
            'workerCount': 1,
            'workerConfiguration': {
                'cpu': '2 vCPU',
                'memory': '4 GB'
            }
        },
        'EXECUTOR': {
            'workerCount': 4,
            'workerConfiguration': {
                'cpu': '4 vCPU',
                'memory': '8 GB'
            }
        }
    },
    maximum_capacity={
        'DRIVER': {
            'workerCount': 1,
            'workerConfiguration': {
                'cpu': '2 vCPU',
                'memory': '4 GB'
            }
        },
        'EXECUTOR': {
            'workerCount': 10,
            'workerConfiguration': {
                'cpu': '4 vCPU',
                'memory': '8 GB'
            }
        }
    },
    dag=dag
)

# Run Spark job
run_job = EmrServerlessStartJobOperator(
    task_id='run_spark_job',
    application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
    execution_role_arn='arn:aws:iam::123456789012:role/EMRServerlessExecutionRole',
    job_driver={
        'sparkSubmit': {
            'entryPoint': 's3://my-spark-jobs/data-processor.py',
            'entryPointArguments': [
                '--input', 's3://my-data/input/{{ ds }}/',
                '--output', 's3://my-data/output/{{ ds }}/'
            ],
            'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true'
        }
    },
    configuration_overrides={
        'monitoringConfiguration': {
            's3MonitoringConfiguration': {
                'logUri': 's3://my-emr-serverless-logs/'
            }
        }
    },
    wait_for_completion=True,
    dag=dag
)

# Clean up application
delete_app = EmrServerlessDeleteApplicationOperator(
    task_id='delete_serverless_app',
    application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
    dag=dag
)

create_app >> run_job >> delete_app
```
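
If `wait_for_completion=False` is passed to `EmrServerlessStartJobOperator`, the task returns as soon as the job is submitted and `EmrServerlessJobSensor` can take over the wait. A sketch under the assumption that the start operator pushes the job run ID as its XCom return value:

```python
from airflow.providers.amazon.aws.sensors.emr import EmrServerlessJobSensor

# Wait for the asynchronously started job in a separate task.
# Assumes run_spark_job pushed the job run ID as its return value.
wait_for_serverless_job = EmrServerlessJobSensor(
    task_id='wait_for_serverless_job',
    application_id="{{ task_instance.xcom_pull(task_ids='create_serverless_app', key='application_id') }}",
    job_run_id="{{ task_instance.xcom_pull(task_ids='run_spark_job') }}",
    dag=dag
)
```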

### EMR on EKS Job

```python
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator

# Run job on EMR on EKS
emr_eks_job = EmrContainerOperator(
    task_id='run_emr_eks_job',
    name='data-processing-job',
    virtual_cluster_id='abc123def456',
    execution_role_arn='arn:aws:iam::123456789012:role/EMRContainersExecutionRole',
    release_label='emr-6.10.0-latest',
    job_driver={
        'sparkSubmitJobDriver': {
            'entryPoint': 's3://my-spark-jobs/data-processor.py',
            'entryPointArguments': [
                '--input-path', 's3://my-data/input/{{ ds }}/',
                '--output-path', 's3://my-data/output/{{ ds }}/'
            ],
            'sparkSubmitParameters': '--conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.coalescePartitions.enabled=true'
        }
    },
    configuration_overrides={
        'applicationConfiguration': [
            {
                'classification': 'spark-defaults',
                'properties': {
                    'spark.sql.adaptive.enabled': 'true',
                    'spark.sql.adaptive.coalescePartitions.enabled': 'true',
                    'spark.kubernetes.container.image': 'my-account.dkr.ecr.us-east-1.amazonaws.com/spark:latest'
                }
            }
        ],
        'monitoringConfiguration': {
            's3MonitoringConfiguration': {
                'logUri': 's3://my-emr-eks-logs/'
            }
        }
    },
    dag=dag
)
```
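
`EmrContainerOperator` polls the job itself, but monitoring can also be split into a dedicated task with `EmrContainerSensor`, for example when `max_tries` caps the operator's own polling. A sketch under the assumption that the operator pushes the job run ID as its XCom return value:

```python
from airflow.providers.amazon.aws.sensors.emr import EmrContainerSensor

# Monitor the previously submitted EMR on EKS job in a separate task.
# Assumes emr_eks_job pushed the job run ID as its return value.
watch_emr_eks_job = EmrContainerSensor(
    task_id='watch_emr_eks_job',
    job_id="{{ task_instance.xcom_pull(task_ids='run_emr_eks_job') }}",
    poll_interval=60,
    dag=dag
)

emr_eks_job >> watch_emr_eks_job
```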

## Types

```python { .api }
# EMR cluster states
class EmrClusterState:
    STARTING = 'STARTING'
    BOOTSTRAPPING = 'BOOTSTRAPPING'
    RUNNING = 'RUNNING'
    WAITING = 'WAITING'
    TERMINATING = 'TERMINATING'
    TERMINATED = 'TERMINATED'
    TERMINATED_WITH_ERRORS = 'TERMINATED_WITH_ERRORS'

# EMR step states
class EmrStepState:
    PENDING = 'PENDING'
    CANCEL_PENDING = 'CANCEL_PENDING'
    RUNNING = 'RUNNING'
    COMPLETED = 'COMPLETED'
    CANCELLED = 'CANCELLED'
    FAILED = 'FAILED'
    INTERRUPTED = 'INTERRUPTED'

# EMR instance types
class EmrInstanceType:
    M5_LARGE = 'm5.large'
    M5_XLARGE = 'm5.xlarge'
    M5_2XLARGE = 'm5.2xlarge'
    M5_4XLARGE = 'm5.4xlarge'
    M5_8XLARGE = 'm5.8xlarge'
    C5_LARGE = 'c5.large'
    C5_XLARGE = 'c5.xlarge'
    R5_LARGE = 'r5.large'
    R5_XLARGE = 'r5.xlarge'

# Job flow configuration
class JobFlowConfig:
    name: str
    release_label: str
    applications: list
    instances: dict
    service_role: str
    job_flow_role: str
    steps: list = None
    bootstrap_actions: list = None
    configurations: list = None
    security_configuration: str = None
    auto_scaling_role: str = None
    scale_down_behavior: str = None
    custom_ami_id: str = None
    ebs_root_volume_size: int = None
    repo_upgrade_on_boot: str = None
    kerberos_attributes: dict = None
    step_concurrency_level: int = 1
    managed_scaling_policy: dict = None
    placement_group_configs: list = None
    auto_termination_policy: dict = None
    os_release_label: str = None
    log_uri: str = None
    log_encryption_kms_key_id: str = None
    additional_info: str = None
    tags: list = None

# Instance group configuration
class InstanceGroupConfig:
    name: str
    instance_role: str  # 'MASTER', 'CORE', 'TASK'
    instance_type: str
    instance_count: int
    market: str = 'ON_DEMAND'  # 'ON_DEMAND', 'SPOT'
    bid_price: str = None
    ebs_configuration: dict = None
    auto_scaling_policy: dict = None
    configurations: list = None
    custom_ami_id: str = None

# EMR step configuration
class StepConfig:
    name: str
    action_on_failure: str  # 'TERMINATE_JOB_FLOW', 'TERMINATE_CLUSTER', 'CANCEL_AND_WAIT', 'CONTINUE'
    hadoop_jar_step: dict

# EMR Serverless configuration
class EmrServerlessConfig:
    release_label: str
    job_type: str  # 'SPARK', 'HIVE'
    name: str = None
    initial_capacity: dict = None
    maximum_capacity: dict = None
    auto_start_configuration: dict = None
    auto_stop_configuration: dict = None
    network_configuration: dict = None
    tags: dict = None

# Worker configuration for EMR Serverless
class WorkerConfiguration:
    cpu: str  # e.g., '2 vCPU', '4 vCPU'
    memory: str  # e.g., '4 GB', '8 GB'
    disk: str = None  # e.g., '20 GB'
```
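
The state constants above can replace raw strings when configuring sensors, which keeps state names typo-free. A small illustrative sketch; the cluster ID is a placeholder:

```python
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor

# State constants instead of string literals keep sensor configuration
# aligned with the states defined above.
wait_until_ready = EmrJobFlowSensor(
    task_id='wait_until_ready',
    job_flow_id='j-EXAMPLE12345',  # placeholder cluster ID
    target_states=[EmrClusterState.WAITING],
    failed_states=[EmrClusterState.TERMINATED_WITH_ERRORS],
)
```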