tessl/pypi-apache-airflow-providers-amazon

Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines

docs/batch-processing.md

AWS Batch Processing

AWS Batch enables efficient execution of containerized jobs and workloads at scale. The service automatically provisions the optimal quantity and type of compute resources based on workload requirements, and supports both managed and self-managed compute environments.

Capabilities

Job Execution and Management

Execute containerized batch jobs with comprehensive lifecycle management, monitoring, and logging integration.

class BatchOperator(AwsBaseOperator):
    """
    Execute a job on AWS Batch.
    
    Parameters:
    - job_name: str - the name for the job that will run on AWS Batch
    - job_definition: str - the job definition name on AWS Batch
    - job_queue: str - the queue name on AWS Batch
    - container_overrides: dict - the containerOverrides parameter for boto3
    - ecs_properties_override: dict - the ecsPropertiesOverride parameter for boto3
    - eks_properties_override: dict - the eksPropertiesOverride parameter for boto3
    - node_overrides: dict - the nodeOverrides parameter for boto3
    - share_identifier: str - the share identifier for the job
    - scheduling_priority_override: int - the scheduling priority for the job
    - array_properties: dict - the arrayProperties parameter for boto3
    - parameters: dict - the parameters for boto3
    - job_id: str - the job ID (usually None until submit_job)
    - max_retries: int - exponential back-off retries (default: 4200)
    - status_retries: int - HTTP retries to get job status (default: 10)
    - tags: dict - collection of tags to apply to the job submission
    - deferrable: bool - run operator in deferrable mode
    - awslogs_enabled: bool - whether CloudWatch logs should be printed
    - awslogs_fetch_interval: int - interval for fetching CloudWatch logs (default: 30)
    - poll_interval: int - time in seconds between polling (deferrable mode)
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    str: Job ID from AWS Batch
    """
    def __init__(
        self,
        job_name: str,
        job_definition: str, 
        job_queue: str,
        container_overrides: dict | None = None,
        ecs_properties_override: dict | None = None,
        eks_properties_override: dict | None = None,
        node_overrides: dict | None = None,
        share_identifier: str | None = None,
        scheduling_priority_override: int | None = None,
        array_properties: dict | None = None,
        parameters: dict | None = None,
        job_id: str | None = None,
        max_retries: int = 4200,
        status_retries: int = 10,
        tags: dict | None = None,
        deferrable: bool = False,
        awslogs_enabled: bool = False,
        awslogs_fetch_interval: int = 30,
        poll_interval: int = 30,
        **kwargs
    ): ...

Job Status Monitoring

Monitor and wait for AWS Batch job completion with configurable polling and retry logic.

class BatchSensor(BaseSensorOperator):
    """
    Wait for AWS Batch job to complete.
    
    Parameters:
    - job_id: str - the job ID to monitor
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - poke_interval: int - time in seconds between checks
    - timeout: int - maximum time to wait
    - mode: str - sensor mode ('poke' or 'reschedule')
    
    Returns:
    bool: True when job completes successfully
    """
    def __init__(
        self,
        job_id: str,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        **kwargs
    ): ...

Compute Environment Management

Create and manage AWS Batch compute environments with support for both managed and unmanaged configurations.

class BatchCreateComputeEnvironmentOperator(AwsBaseOperator):
    """
    Create an AWS Batch compute environment.
    
    Parameters:
    - compute_environment_name: str - name of the compute environment
    - environment_type: str - type of environment ('MANAGED' or 'UNMANAGED')
    - state: str - desired state of the compute environment
    - compute_resources: dict - compute resource configuration
    - service_role: str - service role ARN for Batch
    - tags: dict - tags to apply to the compute environment
    - deferrable: bool - run operator in deferrable mode
    - poll_interval: int - polling interval for deferrable mode
    - max_retries: int - maximum number of retries
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    str: Compute environment ARN
    """
    def __init__(
        self,
        compute_environment_name: str,
        environment_type: str,
        state: str = 'ENABLED',
        compute_resources: dict | None = None,
        service_role: str | None = None,
        tags: dict | None = None,
        deferrable: bool = False,
        poll_interval: int = 30,
        max_retries: int = 10,
        **kwargs
    ): ...

Job Queue Management

Manage AWS Batch job queues with priority settings and compute environment associations.

class BatchCreateJobQueueOperator(AwsBaseOperator):
    """
    Create an AWS Batch job queue.
    
    Parameters:
    - job_queue_name: str - name of the job queue
    - state: str - desired state of the job queue
    - priority: int - priority of the job queue
    - compute_environment_order: list - list of compute environments
    - tags: dict - tags to apply to the job queue
    - aws_conn_id: str - Airflow connection for AWS credentials
    
    Returns:
    str: Job queue ARN
    """
    def __init__(
        self,
        job_queue_name: str,
        state: str = 'ENABLED',
        priority: int = 1,
        compute_environment_order: list | None = None,
        tags: dict | None = None,
        **kwargs
    ): ...

Low-Level Client Operations

Direct access to AWS Batch client for advanced operations and custom integrations.

class BatchClientHook(AwsBaseHook):
    """
    Hook for AWS Batch Client.
    
    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        verify: bool | str | None = None,
        **kwargs
    ): ...
    
    def submit_job(
        self,
        jobName: str,
        jobQueue: str,
        jobDefinition: str,
        arrayProperties: dict | None = None,
        parameters: dict | None = None,
        containerOverrides: dict | None = None,
        dependsOn: list | None = None,
        jobId: str | None = None,
        timeout: dict | None = None,
        tags: dict | None = None,
        **kwargs
    ) -> str:
        """
        Submit a job to AWS Batch.
        
        Returns:
        str: Job ID
        """
        ...
    
    def describe_jobs(self, jobs: list[str]) -> dict:
        """Get job descriptions from AWS Batch."""
        ...
    
    def list_jobs(
        self,
        jobQueue: str | None = None,
        arrayJobId: str | None = None,
        multiNodeJobId: str | None = None,
        jobStatus: str | None = None,
        maxResults: int | None = None,
        nextToken: str | None = None
    ) -> dict:
        """List AWS Batch jobs."""
        ...
    
    def cancel_job(self, jobId: str, reason: str) -> dict:
        """Cancel an AWS Batch job."""
        ...
    
    def terminate_job(self, jobId: str, reason: str) -> dict:
        """Terminate an AWS Batch job."""
        ...

Usage Examples

Basic Job Execution

from airflow.providers.amazon.aws.operators.batch import BatchOperator

# Submit a simple batch job
batch_job = BatchOperator(
    task_id='run_batch_job',
    job_name='my-data-processing-job',
    job_definition='my-job-def',
    job_queue='my-job-queue',
    container_overrides={
        'environment': [
            {'name': 'INPUT_PATH', 'value': 's3://my-bucket/input/'},
            {'name': 'OUTPUT_PATH', 'value': 's3://my-bucket/output/'}
        ],
        'resourceRequirements': [
            # memory (MiB) and vCPUs; the top-level 'memory'/'vcpus'
            # containerOverrides keys are deprecated in the Batch API
            {'type': 'MEMORY', 'value': '2048'},
            {'type': 'VCPU', 'value': '1'}
        ]
    },
    parameters={
        'inputFile': 'data.csv',
        'outputFormat': 'parquet'
    },
    tags={'Environment': 'prod', 'Team': 'data-engineering'},
    awslogs_enabled=True,
    aws_conn_id='aws_default'
)

Array Job Processing

# Process multiple files in parallel using array jobs
array_batch_job = BatchOperator(
    task_id='process_files_array',
    job_name='file-processing-array',
    job_definition='file-processor-def',
    job_queue='high-priority-queue',
    array_properties={'size': 10},  # Process 10 files in parallel
    container_overrides={
        'environment': [
            {'name': 'BATCH_SIZE', 'value': '100'},
            {'name': 'S3_BUCKET', 'value': 'my-data-bucket'}
        ]
    },
    scheduling_priority_override=100,
    aws_conn_id='aws_default'
)

Compute Environment Setup

from airflow.providers.amazon.aws.operators.batch import BatchCreateComputeEnvironmentOperator

# Create a managed compute environment
create_compute_env = BatchCreateComputeEnvironmentOperator(
    task_id='create_batch_compute_env',
    compute_environment_name='my-compute-env',
    environment_type='MANAGED',
    state='ENABLED',
    compute_resources={
        'type': 'EC2',
        'minvCpus': 0,
        'maxvCpus': 256,
        'desiredvCpus': 4,
        'instanceTypes': ['m5.large', 'm5.xlarge'],
        'subnets': ['subnet-12345', 'subnet-67890'],
        'securityGroupIds': ['sg-abcdef'],
        'instanceRole': 'arn:aws:iam::123456789012:instance-profile/ecsInstanceRole',
        'tags': {'Environment': 'prod'}
    },
    service_role='arn:aws:iam::123456789012:role/AWSBatchServiceRole',
    aws_conn_id='aws_default'
)

Import Statements

from airflow.providers.amazon.aws.operators.batch import (
    BatchOperator,
    BatchCreateComputeEnvironmentOperator,
    BatchCreateJobQueueOperator
)
from airflow.providers.amazon.aws.sensors.batch import BatchSensor
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-amazon
