CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-dagster-aws

Package for AWS-specific Dagster framework solid and resource components.

Pending
Overview
Eval results
Files

emr-processing.mddocs/

EMR Big Data Processing

Integration with Amazon EMR (Elastic MapReduce) for big data processing workflows. This module provides cluster management, PySpark step execution, job orchestration, and comprehensive EMR state monitoring for large-scale data processing tasks.

Capabilities

EMR Job Runner

Manage EMR clusters and execute big data processing jobs with comprehensive cluster lifecycle management.

class EmrJobRunner:
    """
    Manages EMR job execution and cluster operations.

    Covers the full cluster lifecycle shown below: start a cluster,
    submit steps, wait for completion, query status, and terminate.
    """
    
    def __init__(
        self,
        region: str,
        cluster_id: Optional[str] = None,
        **kwargs
    ): ...
    
    def run_job_flow(
        self,
        job_flow_overrides: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> str:
        """
        Start a new EMR cluster and run job flow.
        
        Parameters:
            job_flow_overrides: Custom EMR cluster configuration;
                None means no overrides are applied
            **kwargs: Additional EMR RunJobFlow parameters
            
        Returns:
            str: EMR cluster ID
        """
    
    def add_job_flow_steps(
        self,
        cluster_id: str, 
        steps: List[Dict[str, Any]]
    ) -> List[str]:
        """
        Add steps to an existing EMR cluster.
        
        Parameters:
            cluster_id: EMR cluster identifier
            steps: List of EMR step configurations
            
        Returns:
            List[str]: List of step IDs
        """
    
    def wait_for_completion(
        self,
        cluster_id: str,
        timeout_seconds: int = 3600
    ) -> bool:
        """
        Wait for EMR cluster to complete all steps.
        
        Parameters:
            cluster_id: EMR cluster identifier
            timeout_seconds: Maximum wait time in seconds
            
        Returns:
            bool: True if completed successfully, False if timeout
        """
    
    def terminate_cluster(self, cluster_id: str) -> bool:
        """
        Terminate an EMR cluster.
        
        Parameters:
            cluster_id: EMR cluster identifier
            
        Returns:
            bool: True if termination initiated successfully
        """
    
    def get_cluster_status(self, cluster_id: str) -> EmrClusterState:
        """
        Get current status of EMR cluster.
        
        Parameters:
            cluster_id: EMR cluster identifier
            
        Returns:
            EmrClusterState: Current cluster state
        """

EMR PySpark Step Launcher

Execute PySpark applications as EMR steps with automatic step configuration and monitoring.

def emr_pyspark_step_launcher(
    cluster_id: str,
    s3_bucket: str,
    deploy_local_pyspark_deps: bool = True,
    staging_bucket: Optional[str] = None,
    wait_for_logs: bool = True,
    local_job_package_path: Optional[str] = None,
    action_on_failure: str = "TERMINATE_CLUSTER",
    spark_config: Optional[Dict[str, str]] = None,
    region_name: Optional[str] = None,
    **kwargs
) -> StepLauncherDefinition:
    """
    Build a step launcher that runs PySpark applications as EMR steps.

    Parameters:
        cluster_id: ID of the EMR cluster the step is submitted to
        s3_bucket: S3 bucket used to stage PySpark dependencies
        deploy_local_pyspark_deps: Whether local dependencies are uploaded
        staging_bucket: S3 bucket used to stage job artifacts
        wait_for_logs: Whether to block until CloudWatch logs are available
        local_job_package_path: Filesystem path of the local job package
        action_on_failure: EMR action taken when the step fails
        spark_config: Spark configuration key/value pairs
        region_name: AWS region to issue EMR calls against
        **kwargs: Additional step launcher configuration

    Returns:
        StepLauncherDefinition: Configured EMR PySpark step launcher
    """

EMR State Management

Enumerations and constants for managing EMR cluster and step lifecycles.

class EmrClusterState(Enum):
    """Possible lifecycle states of an EMR cluster."""

    # Provisioning phases
    STARTING = "STARTING"
    BOOTSTRAPPING = "BOOTSTRAPPING"
    # Active phases
    RUNNING = "RUNNING"
    WAITING = "WAITING"
    # Shutdown phases
    TERMINATING = "TERMINATING"
    TERMINATED = "TERMINATED"
    TERMINATED_WITH_ERRORS = "TERMINATED_WITH_ERRORS"

class EmrStepState(Enum):
    """Possible lifecycle states of an individual EMR step."""

    # Queued / transitional
    PENDING = "PENDING"
    CANCEL_PENDING = "CANCEL_PENDING"
    # Executing
    RUNNING = "RUNNING"
    # Terminal outcomes
    COMPLETED = "COMPLETED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
    INTERRUPTED = "INTERRUPTED"

# Cluster state constants

# States in which a cluster has fully stopped running.
EMR_CLUSTER_DONE_STATES: Set[EmrClusterState] = {
    EmrClusterState.TERMINATED,
    EmrClusterState.TERMINATED_WITH_ERRORS,
}

# The done states plus the in-flight shutdown state.
EMR_CLUSTER_TERMINATED_STATES: Set[EmrClusterState] = (
    EMR_CLUSTER_DONE_STATES | {EmrClusterState.TERMINATING}
)

EMR Exception Handling

Exception classes for EMR-specific error handling and operation failures.

class EmrError(Exception):
    """
    Raised for EMR-related failures.

    Covers cluster failures, step failures, timeout errors,
    and other EMR-specific operational issues.

    Parameters:
        message: Human-readable description of the failure
        cluster_id: Cluster the failure relates to, when known
    """

    def __init__(self, message: str, cluster_id: Optional[str] = None): ...

Usage Examples

Basic EMR Cluster Management

from dagster import op, job, Definitions
from dagster_aws.emr import EmrJobRunner, EmrError

@op
def create_emr_cluster():
    """Create and configure EMR cluster for big data processing.

    Returns:
        The ID of the newly started EMR cluster.

    Raises:
        Exception: If the cluster could not be created; the underlying
            EmrError is preserved as the cause.
    """
    runner = EmrJobRunner(region="us-west-2")
    
    # Cluster definition forwarded to EMR's RunJobFlow call.
    job_flow_config = {
        "Name": "Dagster EMR Cluster",
        "ReleaseLabel": "emr-6.9.0",
        "Instances": {
            "InstanceGroups": [
                {
                    "Name": "Master nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "MASTER",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 1,
                },
                {
                    "Name": "Worker nodes",
                    "Market": "ON_DEMAND",
                    "InstanceRole": "CORE",
                    "InstanceType": "m5.xlarge",
                    "InstanceCount": 2,
                }
            ],
            "Ec2KeyName": "my-key-pair",
            # Keep the cluster alive so steps can be added later.
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        "Applications": [{"Name": "Spark"}, {"Name": "Hadoop"}],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
    }
    
    try:
        cluster_id = runner.run_job_flow(job_flow_overrides=job_flow_config)
        return cluster_id
    except EmrError as e:
        # Chain the original EmrError so the root cause is not lost
        # when the generic Exception is surfaced.
        raise Exception(f"Failed to create EMR cluster: {e}") from e

@job
def emr_cluster_job():
    """Job that provisions an EMR cluster."""
    create_emr_cluster()


defs = Definitions(jobs=[emr_cluster_job])

PySpark Step Execution

from dagster import op, job, Definitions
from dagster_aws.emr import emr_pyspark_step_launcher

# Configure PySpark step launcher
_launcher_config = {
    "cluster_id": "j-XXXXXXXXXX",  # Existing EMR cluster
    "s3_bucket": "my-emr-bucket",
    "deploy_local_pyspark_deps": True,
    "wait_for_logs": True,
    "spark_config": {
        "spark.executor.memory": "4g",
        "spark.executor.cores": "2",
        "spark.default.parallelism": "100",
    },
}
pyspark_launcher = emr_pyspark_step_launcher.configured(_launcher_config)

@op
def data_processing_step():
    """PySpark data processing operation."""
    # Executed as a PySpark application on EMR.
    from pyspark.sql import SparkSession

    session = SparkSession.builder.appName("DataProcessing").getOrCreate()

    # Load the input dataset from S3.
    source = session.read.parquet("s3://my-data-bucket/input/")

    # Aggregate amounts per category.
    totals = source.groupBy("category").sum("amount")

    # Persist the aggregated results back to S3.
    totals.write.mode("overwrite").parquet("s3://my-data-bucket/output/")

    session.stop()
    return "Processing complete"

# NOTE(review): dagster normally wires step launchers through
# resource_defs on the job plus required_resource_keys on the op;
# confirm that @job accepts a step_launcher_def argument in this API.
@job(step_launcher_def=pyspark_launcher)
def pyspark_processing_job():
    # Runs data_processing_step through the configured EMR launcher.
    data_processing_step()

defs = Definitions(jobs=[pyspark_processing_job])

Advanced EMR Workflow

from dagster import op, job, Definitions, DependencyDefinition
from dagster_aws.emr import EmrJobRunner, EmrClusterState, EmrError

@op
def provision_emr_cluster():
    """Provision EMR cluster with custom configuration.

    Returns:
        The ID of the provisioned EMR cluster.

    Raises:
        EmrError: If provisioning does not finish within the timeout.
    """
    runner = EmrJobRunner(region="us-east-1")
    
    # Instance-fleet based cluster definition passed through to EMR's
    # RunJobFlow call via job_flow_overrides.
    cluster_config = {
        "Name": "Advanced Processing Cluster",
        "ReleaseLabel": "emr-6.9.0", 
        "Instances": {
            "InstanceFleets": [
                {
                    "Name": "Master Fleet",
                    "InstanceFleetType": "MASTER",
                    "TargetOnDemandCapacity": 1,
                    "InstanceTypeConfigs": [
                        {
                            "InstanceType": "m5.2xlarge",
                            # Attach a 100 GB gp2 EBS volume per instance.
                            "EbsConfiguration": {
                                "EbsBlockDeviceConfigs": [
                                    {
                                        "VolumeSpecification": {
                                            "VolumeType": "gp2",
                                            "SizeInGB": 100
                                        },
                                        "VolumesPerInstance": 1
                                    }
                                ]
                            }
                        }
                    ]
                }
            ],
            # Networking: presumably environment-specific placeholders —
            # verify the subnet and security group IDs before running.
            "Ec2SubnetId": "subnet-12345",
            "EmrManagedMasterSecurityGroup": "sg-master",
            "EmrManagedSlaveSecurityGroup": "sg-slave"
        },
        "Applications": [
            {"Name": "Spark"},
            {"Name": "Hadoop"},
            {"Name": "Hive"}
        ],
        # Spark tuning applied cluster-wide via EMR configurations.
        "Configurations": [
            {
                "Classification": "spark-defaults",
                "Properties": {
                    "spark.sql.adaptive.enabled": "true",
                    "spark.sql.adaptive.coalescePartitions.enabled": "true"
                }
            }
        ]
    }
    
    cluster_id = runner.run_job_flow(job_flow_overrides=cluster_config)
    
    # Wait for cluster to be ready
    # NOTE(review): wait_for_completion is documented as waiting for all
    # steps to complete — confirm it also signals readiness for a cluster
    # that has no steps yet.
    if not runner.wait_for_completion(cluster_id, timeout_seconds=1800):
        raise EmrError("Cluster provisioning timed out", cluster_id)
        
    return cluster_id

@op
def run_data_processing(cluster_id: str):
    """Execute data processing steps on EMR cluster.

    Parameters:
        cluster_id: Identifier of the EMR cluster to submit steps to

    Returns:
        The list of EMR step IDs submitted to the cluster.

    Raises:
        EmrError: If the steps do not complete within the timeout.
    """
    runner = EmrJobRunner(region="us-east-1")
    
    # Two spark-submit steps run via EMR's command-runner.jar.
    processing_steps = [
        {
            "Name": "Data Ingestion",
            # A failed ingestion step leaves the cluster running.
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "s3://my-scripts-bucket/ingest_data.py"
                ]
            }
        },
        {
            "Name": "Data Transformation", 
            # A failed transformation tears the whole cluster down.
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster", 
                    "--conf", "spark.executor.instances=4",
                    "s3://my-scripts-bucket/transform_data.py"
                ]
            }
        }
    ]
    
    step_ids = runner.add_job_flow_steps(cluster_id, processing_steps)
    
    # Monitor step completion
    if not runner.wait_for_completion(cluster_id, timeout_seconds=3600):
        raise EmrError("Data processing steps timed out", cluster_id)
        
    return step_ids

@op
def cleanup_cluster(cluster_id: str):
    """Terminate EMR cluster after processing.

    Parameters:
        cluster_id: Identifier of the cluster to shut down

    Returns:
        A confirmation message once termination has been initiated.

    Raises:
        EmrError: If termination could not be initiated.
    """
    runner = EmrJobRunner(region="us-east-1")
    
    if runner.terminate_cluster(cluster_id):
        return f"Cluster {cluster_id} termination initiated"
    # Pass cluster_id as the second argument so the error carries the
    # failing cluster, consistent with the other EmrError call sites.
    raise EmrError(f"Failed to terminate cluster {cluster_id}", cluster_id)

@job
def advanced_emr_workflow():
    # NOTE(review): cleanup_cluster only consumes cluster_id, so in a
    # dependency-driven orchestrator it is not ordered after
    # run_data_processing — confirm the intended execution order so the
    # cluster is not terminated while steps are still running.
    cluster_id = provision_emr_cluster()
    step_ids = run_data_processing(cluster_id)
    cleanup_cluster(cluster_id)

defs = Definitions(jobs=[advanced_emr_workflow])

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-aws

docs

athena-queries.md

cloudwatch-logging.md

ecr-integration.md

ecs-orchestration.md

emr-processing.md

index.md

parameter-store.md

pipes-orchestration.md

rds-operations.md

redshift-integration.md

s3-storage.md

secrets-management.md

tile.json