Package for AWS-specific Dagster framework solid and resource components.
Integration with Amazon EMR (Elastic MapReduce) for big data processing workflows. This module provides cluster management, PySpark step execution, job orchestration, and comprehensive EMR state monitoring for large-scale data processing tasks.

Manage EMR clusters and execute big data processing jobs with comprehensive cluster lifecycle management.
class EmrJobRunner:
"""
Manages EMR job execution and cluster operations.
"""
def __init__(
self,
region: str,
cluster_id: Optional[str] = None,
**kwargs
): ...
def run_job_flow(
self,
job_flow_overrides: Dict[str, Any] = None,
**kwargs
) -> str:
"""
Start a new EMR cluster and run job flow.
Parameters:
job_flow_overrides: Custom EMR cluster configuration
**kwargs: Additional EMR RunJobFlow parameters
Returns:
str: EMR cluster ID
"""
def add_job_flow_steps(
self,
cluster_id: str,
steps: List[Dict[str, Any]]
) -> List[str]:
"""
Add steps to an existing EMR cluster.
Parameters:
cluster_id: EMR cluster identifier
steps: List of EMR step configurations
Returns:
List[str]: List of step IDs
"""
def wait_for_completion(
self,
cluster_id: str,
timeout_seconds: int = 3600
) -> bool:
"""
Wait for EMR cluster to complete all steps.
Parameters:
cluster_id: EMR cluster identifier
timeout_seconds: Maximum wait time in seconds
Returns:
bool: True if completed successfully, False if timeout
"""
def terminate_cluster(self, cluster_id: str) -> bool:
"""
Terminate an EMR cluster.
Parameters:
cluster_id: EMR cluster identifier
Returns:
bool: True if termination initiated successfully
"""
def get_cluster_status(self, cluster_id: str) -> EmrClusterState:
"""
Get current status of EMR cluster.
Parameters:
cluster_id: EMR cluster identifier
Returns:
EmrClusterState: Current cluster state
"""Execute PySpark applications as EMR steps with automatic step configuration and monitoring.
def emr_pyspark_step_launcher(
cluster_id: str,
s3_bucket: str,
deploy_local_pyspark_deps: bool = True,
staging_bucket: Optional[str] = None,
wait_for_logs: bool = True,
local_job_package_path: Optional[str] = None,
action_on_failure: str = "TERMINATE_CLUSTER",
spark_config: Optional[Dict[str, str]] = None,
region_name: Optional[str] = None,
**kwargs
) -> StepLauncherDefinition:
"""
Step launcher for executing PySpark applications on EMR.
Parameters:
cluster_id: EMR cluster ID to run the step on
s3_bucket: S3 bucket for staging PySpark dependencies
deploy_local_pyspark_deps: Whether to deploy local dependencies
staging_bucket: S3 bucket for staging job artifacts
wait_for_logs: Whether to wait for CloudWatch logs
local_job_package_path: Path to local job package
action_on_failure: Action to take on step failure
spark_config: Spark configuration parameters
region_name: AWS region name
**kwargs: Additional step launcher configuration
Returns:
StepLauncherDefinition: Configured EMR PySpark step launcher
"""Enumerations and constants for managing EMR cluster and step lifecycles.
class EmrClusterState(Enum):
    """
    Lifecycle states reported by EMR for a cluster.

    Values mirror the raw strings returned by the EMR API.
    """

    STARTING = "STARTING"                            # instances being provisioned
    BOOTSTRAPPING = "BOOTSTRAPPING"                  # bootstrap actions running
    RUNNING = "RUNNING"                              # actively executing steps
    WAITING = "WAITING"                              # idle, awaiting new steps
    TERMINATING = "TERMINATING"                      # shutdown in progress
    TERMINATED = "TERMINATED"                        # shut down cleanly
    TERMINATED_WITH_ERRORS = "TERMINATED_WITH_ERRORS"  # shut down after a failure
class EmrStepState(Enum):
    """
    Lifecycle states reported by EMR for an individual step.

    Values mirror the raw strings returned by the EMR API.
    """

    PENDING = "PENDING"                # queued, not yet started
    CANCEL_PENDING = "CANCEL_PENDING"  # cancellation requested
    RUNNING = "RUNNING"                # currently executing
    COMPLETED = "COMPLETED"            # finished successfully
    CANCELLED = "CANCELLED"            # cancelled before completion
    FAILED = "FAILED"                  # finished with an error
    INTERRUPTED = "INTERRUPTED"        # stopped by cluster interruption
# Cluster state constants
EMR_CLUSTER_DONE_STATES: Set[EmrClusterState] = {
EmrClusterState.TERMINATED,
EmrClusterState.TERMINATED_WITH_ERRORS
}
EMR_CLUSTER_TERMINATED_STATES: Set[EmrClusterState] = {
EmrClusterState.TERMINATING,
EmrClusterState.TERMINATED,
EmrClusterState.TERMINATED_WITH_ERRORS
}Exception classes for EMR-specific error handling and operation failures.
class EmrError(Exception):
    """
    Exception raised for EMR-related errors.

    Covers cluster failures, step failures, timeout errors,
    and other EMR-specific operational issues.
    """

    def __init__(self, message: str, cluster_id: Optional[str] = None): ...

from dagster import op, job, Definitions
from dagster_aws.emr import EmrJobRunner, EmrError
@op
def create_emr_cluster():
    """Create and configure EMR cluster for big data processing.

    Returns:
        str: ID of the newly created EMR cluster.

    Raises:
        Exception: If cluster creation fails; the underlying EmrError is
            chained as the cause.
    """
    runner = EmrJobRunner(region="us-west-2")
    job_flow_config = {
        "Name": "Dagster EMR Cluster",
        "ReleaseLabel": "emr-6.9.0",
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "Master nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "MASTER",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 1,
                },
                {
                    "Name": "Worker nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "CORE",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 2,
                },
            ],
            "Ec2KeyName": "my-key-pair",
            # Keep the cluster alive so steps can be added after creation.
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        "Applications": [{"Name": "Spark"}, {"Name": "Hadoop"}],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
    }
    try:
        cluster_id = runner.run_job_flow(job_flow_overrides=job_flow_config)
    except EmrError as e:
        # Chain the EmrError so the root cause survives the re-raise.
        raise Exception(f"Failed to create EMR cluster: {e}") from e
    return cluster_id
# Single-op job that provisions an EMR cluster.
@job
def emr_cluster_job():
    create_emr_cluster()

defs = Definitions(jobs=[emr_cluster_job])

from dagster import op, job, Definitions
from dagster_aws.emr import emr_pyspark_step_launcher
# Configure PySpark step launcher
# Build the launcher configuration once, then bind it to the step launcher.
_launcher_config = {
    "cluster_id": "j-XXXXXXXXXX",  # Existing EMR cluster
    "s3_bucket": "my-emr-bucket",
    "deploy_local_pyspark_deps": True,
    "wait_for_logs": True,
    "spark_config": {
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.default.parallelism": "100",
    },
}
pyspark_launcher = emr_pyspark_step_launcher.configured(_launcher_config)
@op
def data_processing_step():
    """PySpark data processing operation.

    Executed as a PySpark application on EMR: reads parquet input from S3,
    sums amounts per category, and writes the result back to S3.

    Returns:
        str: Completion message.
    """
    # This will be executed as a PySpark application on EMR
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("DataProcessing").getOrCreate()
    try:
        # Read data from S3
        df = spark.read.parquet("s3://my-data-bucket/input/")
        # Process data
        processed_df = df.groupBy("category").sum("amount")
        # Write results back to S3
        processed_df.write.mode("overwrite").parquet("s3://my-data-bucket/output/")
    finally:
        # Release the Spark session even if a read/write step raises.
        spark.stop()
    return "Processing complete"
# Job whose single op runs on EMR via the configured PySpark step launcher.
@job(step_launcher_def=pyspark_launcher)
def pyspark_processing_job():
    data_processing_step()

defs = Definitions(jobs=[pyspark_processing_job])

from dagster import op, job, Definitions, DependencyDefinition
from dagster_aws.emr import EmrJobRunner, EmrClusterState, EmrError
@op
def provision_emr_cluster():
    """Provision EMR cluster with custom configuration.

    Returns:
        str: ID of the provisioned cluster.

    Raises:
        EmrError: If provisioning does not finish within the timeout.
    """
    runner = EmrJobRunner(region="us-east-1")

    # Master fleet: one on-demand m5.2xlarge with a 100 GB gp2 volume.
    master_fleet = {
        "Name": "Master Fleet",
        "InstanceFleetType": "MASTER",
        "TargetOnDemandCapacity": 1,
        "InstanceTypeConfigs": [
            {
                "InstanceType": "m5.2xlarge",
                "EbsConfiguration": {
                    "EbsBlockDeviceConfigs": [
                        {
                            "VolumeSpecification": {
                                "VolumeType": "gp2",
                                "SizeInGB": 100,
                            },
                            "VolumesPerInstance": 1,
                        }
                    ]
                },
            }
        ],
    }

    # Enable adaptive query execution for Spark jobs on this cluster.
    spark_defaults = {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.sql.adaptive.enabled": "true",
            "spark.sql.adaptive.coalescePartitions.enabled": "true",
        },
    }

    cluster_config = {
        "Name": "Advanced Processing Cluster",
        "ReleaseLabel": "emr-6.9.0",
        "Instances": {
            "InstanceFleets": [master_fleet],
            "Ec2SubnetId": "subnet-12345",
            "EmrManagedMasterSecurityGroup": "sg-master",
            "EmrManagedSlaveSecurityGroup": "sg-slave",
        },
        "Applications": [
            {"Name": "Spark"},
            {"Name": "Hadoop"},
            {"Name": "Hive"},
        ],
        "Configurations": [spark_defaults],
    }

    cluster_id = runner.run_job_flow(job_flow_overrides=cluster_config)

    # Wait for cluster to be ready before handing its ID downstream.
    if not runner.wait_for_completion(cluster_id, timeout_seconds=1800):
        raise EmrError("Cluster provisioning timed out", cluster_id)
    return cluster_id
@op
def run_data_processing(cluster_id: str):
    """Execute data processing steps on EMR cluster.

    Parameters:
        cluster_id: Cluster to submit the steps to.

    Returns:
        List of step IDs submitted to the cluster.

    Raises:
        EmrError: If the steps do not finish within the timeout.
    """
    runner = EmrJobRunner(region="us-east-1")

    def _spark_step(name, action_on_failure, args):
        # Build a spark-submit step wrapped in command-runner.jar.
        return {
            "Name": name,
            "ActionOnFailure": action_on_failure,
            "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
        }

    processing_steps = [
        _spark_step(
            "Data Ingestion",
            "CONTINUE",
            [
                "spark-submit",
                "--deploy-mode", "cluster",
                "s3://my-scripts-bucket/ingest_data.py",
            ],
        ),
        _spark_step(
            "Data Transformation",
            "TERMINATE_CLUSTER",
            [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--conf", "spark.executor.instances=4",
                "s3://my-scripts-bucket/transform_data.py",
            ],
        ),
    ]

    step_ids = runner.add_job_flow_steps(cluster_id, processing_steps)

    # Monitor step completion
    if not runner.wait_for_completion(cluster_id, timeout_seconds=3600):
        raise EmrError("Data processing steps timed out", cluster_id)
    return step_ids
@op
def cleanup_cluster(cluster_id: str):
    """Terminate EMR cluster after processing.

    Parameters:
        cluster_id: ID of the cluster to terminate.

    Returns:
        str: Confirmation message once termination is initiated.

    Raises:
        EmrError: If the termination request is not accepted.
    """
    runner = EmrJobRunner(region="us-east-1")
    if not runner.terminate_cluster(cluster_id):
        # Pass cluster_id as the second argument, consistent with the
        # other EmrError raises in this workflow.
        raise EmrError(f"Failed to terminate cluster {cluster_id}", cluster_id)
    return f"Cluster {cluster_id} termination initiated"
@job
def advanced_emr_workflow():
cluster_id = provision_emr_cluster()
step_ids = run_data_processing(cluster_id)
cleanup_cluster(cluster_id)
defs = Definitions(jobs=[advanced_emr_workflow])Install with Tessl CLI
npx tessl i tessl/pypi-dagster-aws