Apache Airflow provider package that provides comprehensive AWS service integrations for orchestrating cloud workflows and data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Amazon Batch enables efficient execution of containerized jobs and workloads at scale. This service automatically provisions the optimal quantity and type of compute resources based on workload requirements, supporting both managed and self-managed compute environments.
Execute containerized batch jobs with comprehensive lifecycle management, monitoring, and logging integration.
class BatchOperator(AwsBaseOperator):
"""
Execute a job on AWS Batch.
Parameters:
- job_name: str - the name for the job that will run on AWS Batch
- job_definition: str - the job definition name on AWS Batch
- job_queue: str - the queue name on AWS Batch
- container_overrides: dict - the containerOverrides parameter for boto3
- ecs_properties_override: dict - the ecsPropertiesOverride parameter for boto3
- eks_properties_override: dict - the eksPropertiesOverride parameter for boto3
- node_overrides: dict - the nodeOverrides parameter for boto3
- share_identifier: str - the share identifier for the job
- scheduling_priority_override: int - the scheduling priority for the job
- array_properties: dict - the arrayProperties parameter for boto3
- parameters: dict - the parameters for boto3
- job_id: str - the job ID (usually None until submit_job)
- max_retries: int - exponential back-off retries (default: 4200)
- status_retries: int - HTTP retries to get job status (default: 10)
- tags: dict - collection of tags to apply to the job submission
- deferrable: bool - run operator in deferrable mode
- awslogs_enabled: bool - whether CloudWatch logs should be printed
- awslogs_fetch_interval: int - interval for fetching CloudWatch logs (default: 30)
- poll_interval: int - time in seconds between polling (deferrable mode)
- aws_conn_id: str - Airflow connection for AWS credentials
Returns:
str: Job ID from AWS Batch
"""
def __init__(
self,
job_name: str,
job_definition: str,
job_queue: str,
container_overrides: dict = None,
ecs_properties_override: dict = None,
eks_properties_override: dict = None,
node_overrides: dict = None,
share_identifier: str = None,
scheduling_priority_override: int = None,
array_properties: dict = None,
parameters: dict = None,
job_id: str = None,
max_retries: int = 4200,
status_retries: int = 10,
tags: dict = None,
deferrable: bool = False,
awslogs_enabled: bool = False,
awslogs_fetch_interval: int = 30,
poll_interval: int = 30,
**kwargs
): ...Monitor and wait for AWS Batch job completion with configurable polling and retry logic.
class BatchSensor(BaseSensorOperator):
"""
Wait for AWS Batch job to complete.
Parameters:
- job_id: str - the job ID to monitor
- aws_conn_id: str - Airflow connection for AWS credentials
- region_name: str - AWS region name
- poke_interval: int - time in seconds between checks
- timeout: int - maximum time to wait
- mode: str - sensor mode ('poke' or 'reschedule')
Returns:
bool: True when job completes successfully
"""
def __init__(
self,
job_id: str,
aws_conn_id: str = 'aws_default',
region_name: str = None,
**kwargs
): ...Create and manage AWS Batch compute environments with support for both managed and unmanaged configurations.
class BatchCreateComputeEnvironmentOperator(AwsBaseOperator):
"""
Create an AWS Batch compute environment.
Parameters:
- compute_environment_name: str - name of the compute environment
- environment_type: str - type of environment ('MANAGED' or 'UNMANAGED')
- state: str - desired state of the compute environment
- compute_resources: dict - compute resource configuration
- service_role: str - service role ARN for Batch
- tags: dict - tags to apply to the compute environment
- deferrable: bool - run operator in deferrable mode
- poll_interval: int - polling interval for deferrable mode
- max_retries: int - maximum number of retries
- aws_conn_id: str - Airflow connection for AWS credentials
Returns:
str: Compute environment ARN
"""
def __init__(
self,
compute_environment_name: str,
environment_type: str,
state: str = 'ENABLED',
compute_resources: dict = None,
service_role: str = None,
tags: dict = None,
deferrable: bool = False,
poll_interval: int = 30,
max_retries: int = 10,
**kwargs
): ...Manage AWS Batch job queues with priority settings and compute environment associations.
class BatchCreateJobQueueOperator(AwsBaseOperator):
"""
Create an AWS Batch job queue.
Parameters:
- job_queue_name: str - name of the job queue
- state: str - desired state of the job queue
- priority: int - priority of the job queue
- compute_environment_order: list - list of compute environments
- tags: dict - tags to apply to the job queue
- aws_conn_id: str - Airflow connection for AWS credentials
Returns:
str: Job queue ARN
"""
def __init__(
self,
job_queue_name: str,
state: str = 'ENABLED',
priority: int = 1,
compute_environment_order: list = None,
tags: dict = None,
**kwargs
): ...Direct access to AWS Batch client for advanced operations and custom integrations.
class BatchClientHook(AwsBaseHook):
    """
    Hook for the AWS Batch client (thin wrapper over boto3 `batch`).

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str | None - AWS region name
    - verify: bool | None - whether to verify SSL certificates
    """

    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str | None = None,
        verify: bool | None = None,
        **kwargs,
    ): ...

    def submit_job(
        self,
        # camelCase parameter names mirror the boto3 `submit_job` API.
        jobName: str,
        jobQueue: str,
        jobDefinition: str,
        arrayProperties: dict | None = None,
        parameters: dict | None = None,
        containerOverrides: dict | None = None,
        dependsOn: list | None = None,
        jobId: str | None = None,
        timeout: dict | None = None,
        tags: dict | None = None,
        **kwargs,
    ) -> str:
        """
        Submit a job to AWS Batch.

        Returns:
            str: Job ID
        """
        ...

    def describe_jobs(self, jobs: list[str]) -> dict:
        """Get job descriptions from AWS Batch."""
        ...

    def list_jobs(
        self,
        jobQueue: str | None = None,
        arrayJobId: str | None = None,
        multiNodeJobId: str | None = None,
        jobStatus: str | None = None,
        maxResults: int | None = None,
        nextToken: str | None = None,
    ) -> dict:
        """List AWS Batch jobs."""
        ...

    def cancel_job(self, jobId: str, reason: str) -> dict:
        """Cancel an AWS Batch job."""
        ...

    def terminate_job(self, jobId: str, reason: str) -> dict:
        """Terminate an AWS Batch job."""
        ...

from airflow.providers.amazon.aws.operators.batch import BatchOperator
# Submit a simple batch job.
# Environment variables injected into the container at runtime.
_job_environment = [
    {'name': 'INPUT_PATH', 'value': 's3://my-bucket/input/'},
    {'name': 'OUTPUT_PATH', 'value': 's3://my-bucket/output/'},
]

batch_job = BatchOperator(
    task_id='run_batch_job',
    job_name='my-data-processing-job',
    job_definition='my-job-def',
    job_queue='my-job-queue',
    container_overrides={
        'environment': _job_environment,
        'memory': 2048,
        'vcpus': 1,
    },
    parameters={'inputFile': 'data.csv', 'outputFormat': 'parquet'},
    tags={'Environment': 'prod', 'Team': 'data-engineering'},
    awslogs_enabled=True,  # stream CloudWatch logs into the task log
    aws_conn_id='aws_default',
)

# Process multiple files in parallel using array jobs
# Array job: one submission fans out into `size` parallel child jobs.
array_batch_job = BatchOperator(
    task_id='process_files_array',
    job_name='file-processing-array',
    job_definition='file-processor-def',
    job_queue='high-priority-queue',
    array_properties={'size': 10},  # Process 10 files in parallel
    container_overrides={
        'environment': [
            {'name': 'BATCH_SIZE', 'value': '100'},
            {'name': 'S3_BUCKET', 'value': 'my-data-bucket'},
        ]
    },
    scheduling_priority_override=100,
    aws_conn_id='aws_default',
)

# NOTE: the original page fused this import onto the closing paren,
# which is invalid Python; it belongs on its own line.
from airflow.providers.amazon.aws.operators.batch import BatchCreateComputeEnvironmentOperator
# Create a managed compute environment
create_compute_env = BatchCreateComputeEnvironmentOperator(
task_id='create_batch_compute_env',
compute_environment_name='my-compute-env',
environment_type='MANAGED',
state='ENABLED',
compute_resources={
'type': 'EC2',
'minvCpus': 0,
'maxvCpus': 256,
'desiredvCpus': 4,
'instanceTypes': ['m5.large', 'm5.xlarge'],
'subnets': ['subnet-12345', 'subnet-67890'],
'securityGroupIds': ['sg-abcdef'],
'instanceRole': 'arn:aws:iam::123456789012:instance-profile/ecsInstanceRole',
'tags': {'Environment': 'prod'}
},
service_role='arn:aws:iam::123456789012:role/AWSBatchServiceRole',
aws_conn_id='aws_default'
)from airflow.providers.amazon.aws.operators.batch import (
BatchOperator,
BatchCreateComputeEnvironmentOperator,
BatchCreateJobQueueOperator
)
from airflow.providers.amazon.aws.sensors.batch import BatchSensor
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHookInstall with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-amazon