Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive Azure Batch integration for creating and managing compute pools, jobs, and tasks in Azure Batch service. Azure Batch enables running large-scale parallel and high-performance computing (HPC) applications efficiently in the cloud.
Core hook for connecting to and managing Azure Batch resources including pools, jobs, and tasks.
class AzureBatchHook(BaseHook):
    """
    Hook for Azure Batch APIs.

    Provides methods for creating and managing Batch pools, jobs, and tasks
    with support for various VM configurations and scaling options.
    """

    # Return an authenticated BatchServiceClient — presumably built from the
    # hook's Airflow connection; confirm against the hook implementation.
    def get_conn(self) -> BatchServiceClient: ...

    # Build (but do not submit) a PoolAddParameter. Sizing is either fixed
    # (target_*_nodes) or formula-driven when enable_auto_scale is True.
    def configure_pool(
        self,
        pool_id: str,
        vm_size: str,
        display_name: str | None = None,
        target_dedicated_nodes: int | None = None,
        target_low_priority_nodes: int | None = None,
        enable_auto_scale: bool = False,
        auto_scale_formula: str | None = None,
        **kwargs: Any,
    ) -> PoolAddParameter: ...

    # Submit a previously configured pool to the Batch account.
    def create_pool(self, pool: PoolAddParameter) -> None: ...

    # Wait until the pool's nodes reach one of the states in node_state;
    # returns the list of nodes.
    def wait_for_all_node_state(self, pool_id: str, node_state: set) -> list: ...

    # Build (but do not submit) a JobAddParameter targeting pool_id, with
    # optional manager / preparation / release tasks.
    def configure_job(
        self,
        job_id: str,
        pool_id: str,
        display_name: str | None = None,
        job_manager_task: JobManagerTask | None = None,
        job_preparation_task: JobPreparationTask | None = None,
        job_release_task: JobReleaseTask | None = None,
    ) -> JobAddParameter: ...

    # Submit a previously configured job.
    def create_job(self, job: JobAddParameter) -> None: ...

    # Build (but do not submit) a TaskAddParameter for command_line, optionally
    # containerized, with input (resource) and output file mappings.
    def configure_task(
        self,
        task_id: str,
        command_line: str,
        display_name: str | None = None,
        container_settings: TaskContainerSettings | None = None,
        resource_files: list[ResourceFile] | None = None,
        output_files: list[OutputFile] | None = None,
        user_identity: UserIdentity | None = None,
    ) -> TaskAddParameter: ...

    # Add one task to an existing job.
    def add_single_task_to_job(self, job_id: str, task: TaskAddParameter) -> None: ...
    # Wait up to `timeout` for all tasks in the job to finish; returns the completed tasks.
    def wait_for_job_tasks_to_complete(self, job_id: str, timeout: int) -> list[CloudTask]: ...

Operators for executing jobs on Azure Batch service with comprehensive pool and task configuration options.
class AzureBatchOperator(BaseOperator):
    """
    Executes a job on Azure Batch Service.

    Parameters:
    - batch_pool_id: Pool identifier within the Account
    - batch_pool_vm_size: Size of virtual machines in the Pool
    - batch_job_id: Job identifier within the Account
    - batch_task_command_line: Command line for the Task
    - batch_task_id: Task identifier within the Job
    - target_dedicated_nodes: Desired number of dedicated compute nodes
    - target_low_priority_nodes: Desired number of low-priority compute nodes
    - enable_auto_scale: Whether Pool should auto-adjust size
    - auto_scale_formula: Formula for desired compute nodes count
    - batch_task_container_settings: Container settings for running the Task
    - batch_task_resource_files: Files placed on the node before the Task runs
    - batch_task_output_files: Files uploaded after the Task completes
    - timeout: Time to wait for job completion in minutes
    - should_delete_job: Whether to delete job after execution
    - should_delete_pool: Whether to delete pool after execution
    """

    def __init__(
        self,
        *,
        batch_pool_id: str,
        batch_pool_vm_size: str,
        batch_job_id: str,
        batch_task_command_line: str,
        batch_task_id: str,
        azure_batch_conn_id: str = "azure_batch_default",
        target_dedicated_nodes: int = 1,
        target_low_priority_nodes: int = 0,
        enable_auto_scale: bool = False,
        # Fix: these options are documented above and used by the usage
        # examples, but were missing from the signature. Added keyword-only
        # with None defaults, so existing callers are unaffected. Types mirror
        # AzureBatchHook.configure_pool / configure_task.
        auto_scale_formula: str | None = None,
        batch_task_container_settings: TaskContainerSettings | None = None,
        batch_task_resource_files: list[ResourceFile] | None = None,
        batch_task_output_files: list[OutputFile] | None = None,
        timeout: int = 25,
        should_delete_job: bool = False,
        should_delete_pool: bool = False,
        **kwargs,
    ): ...
    # Entry point called by the scheduler: provisions the pool/job/task and waits.
    def execute(self, context: Context) -> None: ...

from airflow import DAG
from airflow.providers.microsoft.azure.operators.batch import AzureBatchOperator
from datetime import datetime, timedelta

# Daily example pipeline that submits work to Azure Batch.
dag = DAG(
    'azure_batch_example',
    description='Execute batch job on Azure Batch',
    default_args={'owner': 'data-team'},
    start_date=datetime(2024, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)
# Execute a simple batch job
batch_task = AzureBatchOperator(
task_id='run_batch_computation',
batch_pool_id='compute-pool-001',
batch_pool_vm_size='Standard_D2_v2',
batch_job_id='data-processing-job',
batch_task_command_line='python process_data.py --input /mnt/data/input.csv --output /mnt/data/output.csv',
batch_task_id='process-task-001',
target_dedicated_nodes=2,
azure_batch_conn_id='azure_batch_connection',
timeout=60, # 1 hour timeout
should_delete_job=True,
dag=dag
)

# Batch job with auto-scaling pool configuration
autoscale_batch = AzureBatchOperator(
task_id='autoscale_batch_job',
batch_pool_id='autoscale-pool',
batch_pool_vm_size='Standard_F4s_v2',
batch_job_id='ml-training-job',
batch_task_command_line='python train_model.py --epochs 100 --batch-size 32',
batch_task_id='training-task',
enable_auto_scale=True,
auto_scale_formula='''
startingNumberOfVMs = 1;
maxNumberofVMs = 10;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetDedicatedNodes = min(maxNumberofVMs, pendingTaskSamples);
''',
timeout=180, # 3 hour timeout for ML training
should_delete_job=True,
should_delete_pool=True,
dag=dag
)

# Batch job using Docker containers
container_batch = AzureBatchOperator(
    task_id='containerized_batch_job',
    batch_pool_id='container-pool',
    batch_pool_vm_size='Standard_D4_v3',
    batch_job_id='container-processing-job',
    batch_task_command_line='python /app/analyze.py',
    batch_task_id='analysis-container-task',
    # Run the task inside a private-registry image.
    # NOTE(review): credentials are inlined here for illustration only —
    # in real DAGs pull them from a connection or secrets backend.
    batch_task_container_settings={
        'image_name': 'myregistry.azurecr.io/data-processor:latest',
        'container_run_options': '--rm -v /mnt/data:/app/data',
        'registry': {
            'registry_server': 'myregistry.azurecr.io',
            'user_name': 'registry_user',
            'password': 'registry_password'
        }
    },
    # Input files fetched from blob storage onto the node before the task runs.
    batch_task_resource_files=[
        {
            'http_url': 'https://mystorageaccount.blob.core.windows.net/data/input.json',
            'file_path': 'input.json'
        }
    ],
    # Result files uploaded to blob storage when the task completes.
    batch_task_output_files=[
        {
            'file_pattern': 'output/*.json',
            'destination': {
                'container': {
                    'container_url': 'https://mystorageaccount.blob.core.windows.net/results',
                    'path': 'processed/'
                }
            },
            'upload_options': {
                'upload_condition': 'task_completion'
            }
        }
    ],
    target_dedicated_nodes=3,
    dag=dag
)

Azure Batch supports multiple authentication methods:
Connection configuration requires the Batch account URL and appropriate authentication credentials.
Azure Batch supports various VM configurations:
# Core Batch model types
class PoolAddParameter:
    """Configuration for creating a new Batch pool."""

    id: str        # pool identifier within the Batch account
    vm_size: str   # Azure VM size, e.g. 'Standard_D2_v2'
    # Fixed node counts — presumably ignored when enable_auto_scale is True;
    # confirm against the Azure Batch SDK.
    target_dedicated_nodes: int | None = None
    target_low_priority_nodes: int | None = None
    enable_auto_scale: bool = False


class JobAddParameter:
    """Configuration for creating a new Batch job."""

    id: str                     # job identifier within the Batch account
    pool_info: PoolInformation  # pool the job's tasks run on
    job_manager_task: JobManagerTask | None = None
    job_preparation_task: JobPreparationTask | None = None


class TaskAddParameter:
    """Configuration for creating a new Batch task."""

    id: str            # task identifier within the job
    command_line: str  # command executed on the compute node
    container_settings: TaskContainerSettings | None = None
    resource_files: list[ResourceFile] | None = None  # inputs placed on the node
    output_files: list[OutputFile] | None = None      # outputs uploaded after the run


class CloudTask:
    """Represents a completed Batch task."""

    id: str
    state: str  # task state name
    execution_info: TaskExecutionInformation

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure