# Azure Batch

Azure Batch integration for Apache Airflow: create and manage compute pools, jobs, and tasks in the Azure Batch service. Azure Batch runs large-scale parallel and high-performance computing (HPC) workloads in the cloud.

## Capabilities

### Azure Batch Hook

Core hook for connecting to Azure Batch and managing its pools, jobs, and tasks.

```python { .api }
class AzureBatchHook(BaseHook):
    """
    Hook for Azure Batch APIs.

    Provides methods for creating and managing Batch pools, jobs, and tasks,
    with support for various VM configurations and scaling options.
    """

    def __init__(self, azure_batch_conn_id: str = "azure_batch_default") -> None: ...

    def get_conn(self) -> BatchServiceClient: ...

    def configure_pool(
        self,
        pool_id: str,
        vm_size: str,
        display_name: str | None = None,
        target_dedicated_nodes: int | None = None,
        target_low_priority_nodes: int | None = None,
        enable_auto_scale: bool = False,
        auto_scale_formula: str | None = None,
        **kwargs: Any,
    ) -> PoolAddParameter: ...

    def create_pool(self, pool: PoolAddParameter) -> None: ...

    # Blocks until every node in the pool is in one of the states in node_state
    def wait_for_all_node_state(self, pool_id: str, node_state: set) -> list: ...

    def configure_job(
        self,
        job_id: str,
        pool_id: str,
        display_name: str | None = None,
        job_manager_task: JobManagerTask | None = None,
        job_preparation_task: JobPreparationTask | None = None,
        job_release_task: JobReleaseTask | None = None,
    ) -> JobAddParameter: ...

    def create_job(self, job: JobAddParameter) -> None: ...

    def configure_task(
        self,
        task_id: str,
        command_line: str,
        display_name: str | None = None,
        container_settings: TaskContainerSettings | None = None,
        resource_files: list[ResourceFile] | None = None,
        output_files: list[OutputFile] | None = None,
        user_identity: UserIdentity | None = None,
    ) -> TaskAddParameter: ...

    def add_single_task_to_job(self, job_id: str, task: TaskAddParameter) -> None: ...

    # Blocks until all tasks in the job complete; timeout is in minutes
    def wait_for_job_tasks_to_complete(self, job_id: str, timeout: int) -> list[CloudTask]: ...
```
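The hook methods have a fixed dependency order: a pool must exist before a job can target it, and a job must exist before tasks are added to it. The sketch below shows that order against a duck-typed hook so it runs without Azure. `BatchHookLike`, `run_single_task`, `RecordingHook`, `ready_states`, and `pool_kwargs` are hypothetical names for illustration only, not part of the provider.

```python
from __future__ import annotations

from typing import Any, Protocol


class BatchHookLike(Protocol):
    """Hypothetical protocol: the subset of AzureBatchHook methods used below."""

    def configure_pool(self, pool_id: str, vm_size: str, **kwargs: Any) -> Any: ...
    def create_pool(self, pool: Any) -> None: ...
    def wait_for_all_node_state(self, pool_id: str, node_state: set) -> list: ...
    def configure_job(self, job_id: str, pool_id: str, **kwargs: Any) -> Any: ...
    def create_job(self, job: Any) -> None: ...
    def configure_task(self, task_id: str, command_line: str, **kwargs: Any) -> Any: ...
    def add_single_task_to_job(self, job_id: str, task: Any) -> None: ...
    def wait_for_job_tasks_to_complete(self, job_id: str, timeout: int) -> list: ...


def run_single_task(
    hook: BatchHookLike,
    pool_id: str,
    vm_size: str,
    job_id: str,
    task_id: str,
    command_line: str,
    ready_states: set,
    timeout_minutes: int = 25,
    pool_kwargs: dict[str, Any] | None = None,
) -> list:
    """Create a pool, a job on that pool, and one task in that job, then wait."""
    pool = hook.configure_pool(pool_id=pool_id, vm_size=vm_size, **(pool_kwargs or {}))
    hook.create_pool(pool)
    # Tasks can only be scheduled once the pool's nodes are usable.
    hook.wait_for_all_node_state(pool_id, ready_states)
    # The job is bound to the pool through its pool ID.
    job = hook.configure_job(job_id=job_id, pool_id=pool_id)
    hook.create_job(job)
    task = hook.configure_task(task_id=task_id, command_line=command_line)
    hook.add_single_task_to_job(job_id=job_id, task=task)
    return hook.wait_for_job_tasks_to_complete(job_id=job_id, timeout=timeout_minutes)


class RecordingHook:
    """Offline stand-in that records method names instead of calling Azure."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def __getattr__(self, name: str):
        def method(*args: Any, **kwargs: Any) -> Any:
            self.calls.append(name)
            # The job wait returns a list of tasks; the fake reports none.
            return [] if name == "wait_for_job_tasks_to_complete" else {"method": name}

        return method
```

With the real hook, `ready_states` would presumably be a set of `azure.batch.models.ComputeNodeState` values (for example `idle`, `start_task_failed`, and `unusable`, so the wait also ends when nodes fail), and `pool_kwargs` would carry whatever image settings your provider version requires.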
### Batch Job Execution Operations

Operator that executes a job on the Azure Batch service, with options for configuring the pool and the task.

```python { .api }
class AzureBatchOperator(BaseOperator):
    """
    Executes a job on Azure Batch Service.

    Parameters:
    - batch_pool_id: Pool identifier within the Account
    - batch_pool_vm_size: Size of virtual machines in the Pool
    - batch_job_id: Job identifier within the Account
    - batch_task_command_line: Command line for the Task
    - batch_task_id: Task identifier within the Job
    - batch_task_container_settings: Container settings for the Task
    - batch_task_resource_files: Files downloaded to the node before the Task runs
    - batch_task_output_files: Files uploaded from the node after the Task runs
    - azure_batch_conn_id: Airflow connection ID for the Batch account
    - target_dedicated_nodes: Desired number of dedicated compute nodes
      (must be unset when enable_auto_scale is True)
    - target_low_priority_nodes: Desired number of low-priority compute nodes
      (must be unset when enable_auto_scale is True)
    - enable_auto_scale: Whether the Pool automatically adjusts its size
    - auto_scale_formula: Formula that computes the desired number of compute nodes
      (required when enable_auto_scale is True)
    - timeout: Time to wait for job completion, in minutes
    - should_delete_job: Whether to delete the job after execution
    - should_delete_pool: Whether to delete the pool after execution
    """

    def __init__(
        self,
        *,
        batch_pool_id: str,
        batch_pool_vm_size: str,
        batch_job_id: str,
        batch_task_command_line: str,
        batch_task_id: str,
        batch_task_container_settings: TaskContainerSettings | None = None,
        batch_task_resource_files: list[ResourceFile] | None = None,
        batch_task_output_files: list[OutputFile] | None = None,
        azure_batch_conn_id: str = "azure_batch_default",
        target_dedicated_nodes: int | None = None,
        target_low_priority_nodes: int | None = None,
        enable_auto_scale: bool = False,
        auto_scale_formula: str | None = None,
        timeout: int = 25,
        should_delete_job: bool = False,
        should_delete_pool: bool = False,
        **kwargs,
    ): ...

    def execute(self, context: Context) -> None: ...
```
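Azure Batch treats fixed node counts and auto-scaling as mutually exclusive. With auto-scaling on, the target node counts must not be specified and a formula is required. With it off, a formula must not be given and at least one target count is needed. The hypothetical helper `check_pool_sizing` below encodes those rules so a DAG can fail fast before any Azure call. The provider may run its own checks with different wording.

```python
from __future__ import annotations


def check_pool_sizing(
    target_dedicated_nodes: int | None,
    target_low_priority_nodes: int | None,
    enable_auto_scale: bool,
    auto_scale_formula: str | None,
) -> None:
    """Raise ValueError when the pool sizing options contradict each other."""
    # Counts that were explicitly set (0 counts as set).
    fixed_counts = [
        n for n in (target_dedicated_nodes, target_low_priority_nodes) if n is not None
    ]
    if enable_auto_scale:
        # Auto-scaling: the formula decides the size, so no fixed counts.
        if fixed_counts:
            raise ValueError("Do not set target node counts when enable_auto_scale is True")
        if not auto_scale_formula:
            raise ValueError("auto_scale_formula is required when enable_auto_scale is True")
    else:
        # Fixed size: a formula is not allowed, and at least one count is needed.
        if auto_scale_formula:
            raise ValueError("auto_scale_formula requires enable_auto_scale=True")
        if not fixed_counts:
            raise ValueError("Set target_dedicated_nodes and/or target_low_priority_nodes")
```

Calling it in the DAG file with the same values passed to `AzureBatchOperator` surfaces a conflict at parse time instead of when Batch rejects the pool.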
109
110
## Usage Examples
111
112
### Basic Batch Job Execution
113
114
```python
115
from airflow import DAG
116
from airflow.providers.microsoft.azure.operators.batch import AzureBatchOperator
117
from datetime import datetime, timedelta
118
119
dag = DAG(
120
'azure_batch_example',
121
default_args={'owner': 'data-team'},
122
description='Execute batch job on Azure Batch',
123
schedule_interval=timedelta(days=1),
124
start_date=datetime(2024, 1, 1),
125
catchup=False
126
)
127
128
# Execute a simple batch job
129
batch_task = AzureBatchOperator(
130
task_id='run_batch_computation',
131
batch_pool_id='compute-pool-001',
132
batch_pool_vm_size='Standard_D2_v2',
133
batch_job_id='data-processing-job',
134
batch_task_command_line='python process_data.py --input /mnt/data/input.csv --output /mnt/data/output.csv',
135
batch_task_id='process-task-001',
136
target_dedicated_nodes=2,
137
azure_batch_conn_id='azure_batch_connection',
138
timeout=60, # 1 hour timeout
139
should_delete_job=True,
140
dag=dag
141
)
142
```
### Auto-scaling Batch Pool

```python
# Batch job with an auto-scaling pool
autoscale_batch = AzureBatchOperator(
    task_id='autoscale_batch_job',
    batch_pool_id='autoscale-pool',
    batch_pool_vm_size='Standard_F4s_v2',
    batch_job_id='ml-training-job',
    batch_task_command_line='python train_model.py --epochs 100 --batch-size 32',
    batch_task_id='training-task',
    # With auto-scaling enabled, leave target_dedicated_nodes and
    # target_low_priority_nodes unset: the formula sizes the pool.
    enable_auto_scale=True,
    auto_scale_formula='''
        startingNumberOfVMs = 1;
        maxNumberofVMs = 10;
        pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
        pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
        $TargetDedicatedNodes = min(maxNumberofVMs, pendingTaskSamples);
    ''',
    timeout=180,  # minutes (3 hours) for ML training
    should_delete_job=True,
    should_delete_pool=True,
    dag=dag,
)
```
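The formula above is easier to reason about when its arithmetic is restated outside Batch. The function below mirrors that arithmetic so you can try inputs. Its two main parameters are hypothetical stand-ins: `pending_task_samples` for `$PendingTasks.GetSample(180 * TimeInterval_Second)` and `sample_percent` for `$PendingTasks.GetSamplePercent(...)`. The Batch service's own evaluator, which runs periodically, remains the authority.

```python
from __future__ import annotations

from statistics import mean


def autoscale_target(
    pending_task_samples: list[int],
    sample_percent: float,
    starting_number_of_vms: int = 1,
    max_number_of_vms: int = 10,
) -> float:
    """Restate the example formula's arithmetic for given metric inputs."""
    if sample_percent < 70:
        # Too few metric samples in the 180 s window: use the starting size.
        pending = starting_number_of_vms
    else:
        # Enough samples: use the average number of pending tasks.
        pending = mean(pending_task_samples)
    # Never exceed the maximum pool size.
    return min(max_number_of_vms, pending)
```

For example, `autoscale_target([4, 6, 8], 90)` averages to 6 pending tasks and stays under the cap of 10, while `autoscale_target([20, 30], 100)` is capped at 10.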
### Batch Job with Container Settings

```python
from azure.batch import models as batch_models

# Batch job that runs its task inside a Docker container.
# Batch only accepts container settings on a pool that was created with a
# container configuration, so the pool's image must support containers.
container_batch = AzureBatchOperator(
    task_id='containerized_batch_job',
    batch_pool_id='container-pool',
    batch_pool_vm_size='Standard_D4_v3',
    batch_job_id='container-processing-job',
    batch_task_command_line='python /app/analyze.py',
    batch_task_id='analysis-container-task',
    batch_task_container_settings=batch_models.TaskContainerSettings(
        image_name='myregistry.azurecr.io/data-processor:latest',
        container_run_options='--rm -v /mnt/data:/app/data',
        registry=batch_models.ContainerRegistry(
            registry_server='myregistry.azurecr.io',
            user_name='registry_user',
            # Load the real password from a secret store instead of hard-coding it
            password='registry_password',
        ),
    ),
    batch_task_resource_files=[
        # Downloaded to the task's working directory before the task starts
        batch_models.ResourceFile(
            http_url='https://mystorageaccount.blob.core.windows.net/data/input.json',
            file_path='input.json',
        )
    ],
    batch_task_output_files=[
        batch_models.OutputFile(
            file_pattern='output/*.json',
            destination=batch_models.OutputFileDestination(
                container=batch_models.OutputFileBlobContainerDestination(
                    # The URL needs a SAS token with write permission
                    # (or a managed identity reference)
                    container_url='https://mystorageaccount.blob.core.windows.net/results',
                    path='processed/',
                )
            ),
            upload_options=batch_models.OutputFileUploadOptions(
                upload_condition=batch_models.OutputFileUploadCondition.task_completion
            ),
        )
    ],
    target_dedicated_nodes=3,
    dag=dag,
)
```
## Authentication and Connection

Azure Batch supports multiple authentication methods:

- **Account Key**: Batch account name and access key
- **Service Principal**: Azure AD (Microsoft Entra ID) application credentials
- **Managed Identity**: For Airflow instances hosted in Azure
- **DefaultAzureCredential**: The Azure SDK's default credential chain

Every connection needs the Batch account URL (of the form `https://<account>.<region>.batch.azure.com`) plus credentials for one of the methods above.
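One way to supply an account-key connection is an `AIRFLOW_CONN_<CONN_ID>` environment variable in Airflow's JSON connection format (Airflow 2.3+). The sketch assumes the connection type `azure_batch`, the account name in `login`, the access key in `password`, and the account URL in an `account_url` extra field; check the field names against your provider version. `batch_connection_json` is a hypothetical helper and the account values are placeholders.

```python
import json
import os


def batch_connection_json(account_name: str, account_key: str, account_url: str) -> str:
    """Serialize an account-key Batch connection as Airflow connection JSON."""
    return json.dumps(
        {
            "conn_type": "azure_batch",
            "login": account_name,  # Batch account name
            "password": account_key,  # Batch account access key
            # Assumed extra field that holds the Batch account URL
            "extra": {"account_url": account_url},
        }
    )


# Airflow resolves the connection ID "azure_batch_default" from this variable.
os.environ["AIRFLOW_CONN_AZURE_BATCH_DEFAULT"] = batch_connection_json(
    account_name="mybatchaccount",
    account_key="<batch-account-key>",
    account_url="https://mybatchaccount.westeurope.batch.azure.com",
)
```

In a real deployment the variable belongs in the scheduler and worker environment, not in a DAG file, and the key should come from a secrets backend.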
## VM and Image Configuration

Azure Batch supports various VM configurations:

- **VM Sizes**: General-purpose, memory-optimized, and compute-optimized instances (for example `Standard_D2_v2` or `Standard_F4s_v2`)
- **Images**: Windows and Linux images from the Azure Marketplace, or custom images
- **Node Agent**: The Batch node agent runs on every node and mediates between the Batch service and the operating system; its SKU must match the chosen image
- **Auto-scaling**: Dynamic pool sizing based on workload demands
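Because the image and the node agent SKU must agree, it can help to keep them together. The Airflow provider's operator is assumed to take these as `vm_publisher`, `vm_offer`, `vm_sku`, and `vm_node_agent_sku_id` keyword arguments. These names come from the provider source, do not appear in the signature listed earlier, and should be checked against your installed version. `VmImage` and `UBUNTU_2204` are hypothetical names, and the Ubuntu 22.04 values are assumptions to confirm with `az batch pool supported-images list`.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class VmImage:
    """A Marketplace image reference paired with its Batch node agent SKU."""

    vm_publisher: str
    vm_offer: str
    vm_sku: str
    # Must be a node agent SKU that Batch lists as compatible with this image
    vm_node_agent_sku_id: str

    def as_operator_kwargs(self) -> dict[str, str]:
        """Keyword arguments in the form the operator is assumed to accept."""
        return asdict(self)


# Assumed Ubuntu 22.04 values; verify them for your region and subscription.
UBUNTU_2204 = VmImage(
    vm_publisher="canonical",
    vm_offer="0001-com-ubuntu-server-jammy",
    vm_sku="22_04-lts",
    vm_node_agent_sku_id="batch.node.ubuntu 22.04",
)
```

Usage would then look like `AzureBatchOperator(..., **UBUNTU_2204.as_operator_kwargs())`, so every pool built from the same image uses the same, matching node agent SKU.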
## Types

```python { .api }
# Core Batch model types (simplified views of classes in azure.batch.models)
class PoolAddParameter:
    """Configuration for creating a new Batch pool."""
    id: str
    vm_size: str
    target_dedicated_nodes: int | None = None
    target_low_priority_nodes: int | None = None
    enable_auto_scale: bool = False


class JobAddParameter:
    """Configuration for creating a new Batch job."""
    id: str
    pool_info: PoolInformation
    job_manager_task: JobManagerTask | None = None
    job_preparation_task: JobPreparationTask | None = None


class TaskAddParameter:
    """Configuration for creating a new Batch task."""
    id: str
    command_line: str
    container_settings: TaskContainerSettings | None = None
    resource_files: list[ResourceFile] | None = None
    output_files: list[OutputFile] | None = None


class CloudTask:
    """Represents a Batch task and its current state."""
    id: str
    state: str
    execution_info: TaskExecutionInformation
```
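Given `CloudTask` objects, such as those returned by `wait_for_job_tasks_to_complete`, a caller can pick out the failures. The sketch uses plain dataclasses as stand-ins for the SDK models (only `result` and `exit_code` are modeled on the execution info). It compares against the strings `"completed"` and `"failure"`, which are assumed to be the values of the SDK's `TaskState` and `TaskExecutionResult` enums. `failed_task_ids` is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskExecutionInformation:
    """Stand-in for the SDK model, reduced to the fields used here."""

    result: str | None  # "success" or "failure" once the task has completed
    exit_code: int | None = None


@dataclass
class CloudTask:
    """Stand-in for the SDK's CloudTask."""

    id: str
    state: str
    execution_info: TaskExecutionInformation | None


def failed_task_ids(tasks: list[CloudTask]) -> list[str]:
    """IDs of completed tasks whose execution result is a failure."""
    return [
        task.id
        for task in tasks
        if task.state == "completed"
        and task.execution_info is not None
        and task.execution_info.result == "failure"
    ]
```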