tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Azure Batch

Comprehensive Azure Batch integration for creating and managing compute pools, jobs, and tasks in Azure Batch service. Azure Batch enables running large-scale parallel and high-performance computing (HPC) applications efficiently in the cloud.

Capabilities

Azure Batch Hook

Core hook for connecting to and managing Azure Batch resources including pools, jobs, and tasks.

class AzureBatchHook(BaseHook):
    """
    Hook for Azure Batch APIs.
    
    Provides methods for creating and managing Batch pools, jobs, and tasks
    with support for various VM configurations and scaling options.
    """
    
    def get_conn(self) -> BatchServiceClient: ...
    
    def configure_pool(
        self,
        pool_id: str,
        vm_size: str,
        display_name: str | None = None,
        target_dedicated_nodes: int | None = None,
        target_low_priority_nodes: int | None = None,
        enable_auto_scale: bool = False,
        auto_scale_formula: str | None = None,
        **kwargs: Any,
    ) -> PoolAddParameter: ...
    
    def create_pool(self, pool: PoolAddParameter) -> None: ...
    
    def wait_for_all_node_state(self, pool_id: str, node_state: set) -> list: ...
    
    def configure_job(
        self,
        job_id: str,
        pool_id: str,
        display_name: str | None = None,
        job_manager_task: JobManagerTask | None = None,
        job_preparation_task: JobPreparationTask | None = None,
        job_release_task: JobReleaseTask | None = None,
    ) -> JobAddParameter: ...
    
    def create_job(self, job: JobAddParameter) -> None: ...
    
    def configure_task(
        self,
        task_id: str,
        command_line: str,
        display_name: str | None = None,
        container_settings: TaskContainerSettings | None = None,
        resource_files: list[ResourceFile] | None = None,
        output_files: list[OutputFile] | None = None,
        user_identity: UserIdentity | None = None,
    ) -> TaskAddParameter: ...
    
    def add_single_task_to_job(self, job_id: str, task: TaskAddParameter) -> None: ...
    
    def wait_for_job_tasks_to_complete(self, job_id: str, timeout: int) -> list[CloudTask]: ...
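
The hook methods above compose into a pool → job → task lifecycle. A minimal sketch of that flow, written as a duck-typed helper so the steps are explicit (in a real DAG, `hook` would be `AzureBatchHook(azure_batch_conn_id="azure_batch_default")`; the function name and defaults here are illustrative, not part of the provider):

```python
def run_batch_workflow(hook, pool_id, job_id, task_id, command_line, timeout_minutes=25):
    """Create a pool, attach a job to it, submit one task, and wait for completion."""
    # Build and create the compute pool
    pool = hook.configure_pool(pool_id=pool_id, vm_size="Standard_D2_v2",
                               target_dedicated_nodes=1)
    hook.create_pool(pool)

    # Build and create a job bound to that pool
    job = hook.configure_job(job_id=job_id, pool_id=pool_id)
    hook.create_job(job)

    # Build the task and submit it to the job
    task = hook.configure_task(task_id=task_id, command_line=command_line)
    hook.add_single_task_to_job(job_id=job_id, task=task)

    # Block until the job's tasks finish (or the timeout elapses)
    return hook.wait_for_job_tasks_to_complete(job_id=job_id, timeout=timeout_minutes)
```

This is exactly the sequence `AzureBatchOperator.execute` drives for you; using the hook directly is mainly useful when you need custom pool or task configuration the operator does not expose.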

Batch Job Execution Operations

Operators for executing jobs on Azure Batch service with comprehensive pool and task configuration options.

class AzureBatchOperator(BaseOperator):
    """
    Executes a job on Azure Batch Service.
    
    Parameters:
    - batch_pool_id: Pool identifier within the Account
    - batch_pool_vm_size: Size of virtual machines in the Pool
    - batch_job_id: Job identifier within the Account  
    - batch_task_command_line: Command line for the Task
    - batch_task_id: Task identifier within the Job
    - target_dedicated_nodes: Desired number of dedicated compute nodes
    - target_low_priority_nodes: Desired number of low-priority compute nodes
    - enable_auto_scale: Whether Pool should auto-adjust size
    - auto_scale_formula: Formula for desired compute nodes count
    - timeout: Time to wait for job completion in minutes
    - should_delete_job: Whether to delete job after execution
    - should_delete_pool: Whether to delete pool after execution
    """
    
    def __init__(
        self,
        *,
        batch_pool_id: str,
        batch_pool_vm_size: str,
        batch_job_id: str,
        batch_task_command_line: str,
        batch_task_id: str,
        azure_batch_conn_id: str = "azure_batch_default",
        target_dedicated_nodes: int = 1,
        target_low_priority_nodes: int = 0,
        enable_auto_scale: bool = False,
        auto_scale_formula: str | None = None,
        timeout: int = 25,
        should_delete_job: bool = False,
        should_delete_pool: bool = False,
        **kwargs,
    ): ...
    
    def execute(self, context: Context) -> None: ...

Usage Examples

Basic Batch Job Execution

from airflow import DAG
from airflow.providers.microsoft.azure.operators.batch import AzureBatchOperator
from datetime import datetime, timedelta

dag = DAG(
    'azure_batch_example',
    default_args={'owner': 'data-team'},
    description='Execute batch job on Azure Batch',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Execute a simple batch job
batch_task = AzureBatchOperator(
    task_id='run_batch_computation',
    batch_pool_id='compute-pool-001',
    batch_pool_vm_size='Standard_D2_v2',
    batch_job_id='data-processing-job',
    batch_task_command_line='python process_data.py --input /mnt/data/input.csv --output /mnt/data/output.csv',
    batch_task_id='process-task-001',
    target_dedicated_nodes=2,
    azure_batch_conn_id='azure_batch_connection',
    timeout=60,  # 1 hour timeout
    should_delete_job=True,
    dag=dag
)

Auto-scaling Batch Pool

# Batch job with auto-scaling pool configuration
autoscale_batch = AzureBatchOperator(
    task_id='autoscale_batch_job',
    batch_pool_id='autoscale-pool',
    batch_pool_vm_size='Standard_F4s_v2',
    batch_job_id='ml-training-job',
    batch_task_command_line='python train_model.py --epochs 100 --batch-size 32',
    batch_task_id='training-task',
    enable_auto_scale=True,
    auto_scale_formula='''
        startingNumberOfVMs = 1;
        maxNumberofVMs = 10;
        pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
        pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
        $TargetDedicatedNodes = min(maxNumberofVMs, pendingTaskSamples);
    ''',
    timeout=180,  # 3 hour timeout for ML training
    should_delete_job=True,
    should_delete_pool=True,
    dag=dag
)

Batch Job with Container Settings

# Batch job using Docker containers; container and file settings are passed
# as Azure Batch SDK model objects rather than plain dicts
from azure.batch import models as batchmodels

container_batch = AzureBatchOperator(
    task_id='containerized_batch_job',
    batch_pool_id='container-pool',
    batch_pool_vm_size='Standard_D4_v3',
    batch_job_id='container-processing-job',
    batch_task_command_line='python /app/analyze.py',
    batch_task_id='analysis-container-task',
    batch_task_container_settings=batchmodels.TaskContainerSettings(
        image_name='myregistry.azurecr.io/data-processor:latest',
        container_run_options='--rm -v /mnt/data:/app/data',
        registry=batchmodels.ContainerRegistry(
            registry_server='myregistry.azurecr.io',
            user_name='registry_user',
            password='registry_password'
        )
    ),
    batch_task_resource_files=[
        batchmodels.ResourceFile(
            http_url='https://mystorageaccount.blob.core.windows.net/data/input.json',
            file_path='input.json'
        )
    ],
    batch_task_output_files=[
        batchmodels.OutputFile(
            file_pattern='output/*.json',
            destination=batchmodels.OutputFileDestination(
                container=batchmodels.OutputFileBlobContainerDestination(
                    container_url='https://mystorageaccount.blob.core.windows.net/results',
                    path='processed/'
                )
            ),
            upload_options=batchmodels.OutputFileUploadOptions(
                upload_condition=batchmodels.OutputFileUploadCondition.task_completion
            )
        )
    ],
    target_dedicated_nodes=3,
    dag=dag
)

Authentication and Connection

Azure Batch supports multiple authentication methods:

  • Account Key: Batch account name and access key
  • Service Principal: Using Azure AD application credentials
  • Managed Identity: For Azure-hosted Airflow instances
  • DefaultAzureCredential: Azure SDK default credential chain

Connection configuration requires the Batch account URL and appropriate authentication credentials.
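
As a sketch of the shared-key variant, a connection can be supplied through an `AIRFLOW_CONN_<CONN_ID>` environment variable as a JSON document. The field layout below (login = Batch account name, password = account key, `extra.account_url` = account endpoint) follows the provider's documented connection schema, but verify it against your provider version; all values are placeholders:

```python
import json
import os

# Placeholder account values -- substitute your Batch account details
conn = {
    "conn_type": "azure_batch",
    "login": "mybatchaccount",                       # Batch account name
    "password": "<batch-account-access-key>",        # Batch account key
    "extra": {"account_url": "https://mybatchaccount.eastus.batch.azure.com"},
}

# Airflow resolves AIRFLOW_CONN_<CONN_ID> as a connection definition;
# this one matches azure_batch_conn_id="azure_batch_default"
os.environ["AIRFLOW_CONN_AZURE_BATCH_DEFAULT"] = json.dumps(conn)
```

For Service Principal or Managed Identity authentication, the key material moves into `extra` instead of `password`; the same environment-variable mechanism applies.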

VM and Image Configuration

Azure Batch supports various VM configurations:

  • VM Sizes: Standard compute, memory-optimized, compute-optimized instances
  • Images: Windows, Linux, custom images from Azure Marketplace
  • Node Agent: Compatibility layer between Batch service and OS
  • Auto-scaling: Dynamic pool sizing based on workload demands
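
To make the image/node-agent pairing concrete, here is a hedged sketch of the fields Azure Batch uses to select a Marketplace VM image, shown as plain values (these are the standard `ImageReference` fields from the Batch SDK; the specific publisher, offer, and SKU strings are illustrative and should be checked against currently available images):

```python
# Marketplace image selection for a Batch pool (ImageReference fields)
ubuntu_image = {
    "publisher": "canonical",
    "offer": "0001-com-ubuntu-server-jammy",
    "sku": "22_04-lts",
    "version": "latest",
}

# Every image must be paired with a compatible node agent SKU so the
# Batch service can manage the node's operating system
node_agent_sku_id = "batch.node.ubuntu 22.04"
```

In practice these values feed into the pool's virtual machine configuration (e.g. via `configure_pool`); mismatched image and node agent SKUs cause pool creation to fail.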

Types

# Core Batch model types
class PoolAddParameter:
    """Configuration for creating a new Batch pool."""
    id: str
    vm_size: str
    target_dedicated_nodes: int | None = None
    target_low_priority_nodes: int | None = None
    enable_auto_scale: bool = False

class JobAddParameter:
    """Configuration for creating a new Batch job."""
    id: str
    pool_info: PoolInformation
    job_manager_task: JobManagerTask | None = None
    job_preparation_task: JobPreparationTask | None = None

class TaskAddParameter:
    """Configuration for creating a new Batch task."""
    id: str
    command_line: str
    container_settings: TaskContainerSettings | None = None
    resource_files: list[ResourceFile] | None = None
    output_files: list[OutputFile] | None = None

class CloudTask:
    """Represents a completed Batch task."""
    id: str
    state: str
    execution_info: TaskExecutionInformation

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
