
# Azure Batch

Comprehensive Azure Batch integration for creating and managing compute pools, jobs, and tasks in the Azure Batch service. Azure Batch enables running large-scale parallel and high-performance computing (HPC) applications efficiently in the cloud.

## Capabilities

### Azure Batch Hook

Core hook for connecting to and managing Azure Batch resources including pools, jobs, and tasks.

```python { .api }
class AzureBatchHook(BaseHook):
    """
    Hook for Azure Batch APIs.

    Provides methods for creating and managing Batch pools, jobs, and tasks
    with support for various VM configurations and scaling options.
    """

    def get_conn(self) -> BatchServiceClient: ...

    def configure_pool(
        self,
        pool_id: str,
        vm_size: str,
        display_name: str | None = None,
        target_dedicated_nodes: int | None = None,
        target_low_priority_nodes: int | None = None,
        enable_auto_scale: bool = False,
        auto_scale_formula: str | None = None,
        **kwargs: Any,
    ) -> PoolAddParameter: ...

    def create_pool(self, pool: PoolAddParameter) -> None: ...

    def wait_for_all_node_state(self, pool_id: str, node_state: set) -> list: ...

    def configure_job(
        self,
        job_id: str,
        pool_id: str,
        display_name: str | None = None,
        job_manager_task: JobManagerTask | None = None,
        job_preparation_task: JobPreparationTask | None = None,
        job_release_task: JobReleaseTask | None = None,
    ) -> JobAddParameter: ...

    def create_job(self, job: JobAddParameter) -> None: ...

    def configure_task(
        self,
        task_id: str,
        command_line: str,
        display_name: str | None = None,
        container_settings: TaskContainerSettings | None = None,
        resource_files: list[ResourceFile] | None = None,
        output_files: list[OutputFile] | None = None,
        user_identity: UserIdentity | None = None,
    ) -> TaskAddParameter: ...

    def add_single_task_to_job(self, job_id: str, task: TaskAddParameter) -> None: ...

    def wait_for_job_tasks_to_complete(self, job_id: str, timeout: int) -> list[CloudTask]: ...
```
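
The hook methods compose into a simple pool, job, and task lifecycle. The sketch below walks through that lifecycle directly against the hook, assuming a configured `azure_batch_default` connection; the pool, job, and task IDs and the command line are illustrative placeholders rather than values taken from this specification.

```python
from airflow.providers.microsoft.azure.hooks.batch import AzureBatchHook

# Sketch: drive the pool -> job -> task lifecycle directly through the hook.
# The connection ID and all resource names are illustrative assumptions.
hook = AzureBatchHook(azure_batch_conn_id="azure_batch_default")

# 1. Configure and create a small dedicated pool.
pool = hook.configure_pool(
    pool_id="example-pool",
    vm_size="Standard_D2_v2",
    target_dedicated_nodes=1,
)
hook.create_pool(pool)

# 2. Configure and create a job bound to that pool.
job = hook.configure_job(job_id="example-job", pool_id="example-pool")
hook.create_job(job)

# 3. Add a single task and wait (up to 25 minutes) for it to finish.
task = hook.configure_task(
    task_id="example-task",
    command_line="/bin/bash -c 'echo hello from Azure Batch'",
)
hook.add_single_task_to_job(job_id="example-job", task=task)
completed_tasks = hook.wait_for_job_tasks_to_complete(job_id="example-job", timeout=25)
```

The `AzureBatchOperator` described next wraps essentially this sequence for use inside a DAG, so calling the hook directly is mainly useful in custom operators or maintenance scripts.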

### Batch Job Execution Operations

Operators for executing jobs on the Azure Batch service with comprehensive pool and task configuration options.

```python { .api }
class AzureBatchOperator(BaseOperator):
    """
    Executes a job on Azure Batch Service.

    Parameters:
    - batch_pool_id: Pool identifier within the Account
    - batch_pool_vm_size: Size of virtual machines in the Pool
    - batch_job_id: Job identifier within the Account
    - batch_task_command_line: Command line for the Task
    - batch_task_id: Task identifier within the Job
    - target_dedicated_nodes: Desired number of dedicated compute nodes
    - target_low_priority_nodes: Desired number of low-priority compute nodes
    - enable_auto_scale: Whether Pool should auto-adjust size
    - auto_scale_formula: Formula for desired compute nodes count
    - timeout: Time to wait for job completion in minutes
    - should_delete_job: Whether to delete job after execution
    - should_delete_pool: Whether to delete pool after execution
    """

    def __init__(
        self,
        *,
        batch_pool_id: str,
        batch_pool_vm_size: str,
        batch_job_id: str,
        batch_task_command_line: str,
        batch_task_id: str,
        azure_batch_conn_id: str = "azure_batch_default",
        target_dedicated_nodes: int = 1,
        target_low_priority_nodes: int = 0,
        enable_auto_scale: bool = False,
        auto_scale_formula: str | None = None,
        timeout: int = 25,
        should_delete_job: bool = False,
        should_delete_pool: bool = False,
        **kwargs,
    ): ...

    def execute(self, context: Context) -> None: ...
```

## Usage Examples

### Basic Batch Job Execution

```python
from airflow import DAG
from airflow.providers.microsoft.azure.operators.batch import AzureBatchOperator
from datetime import datetime, timedelta

dag = DAG(
    'azure_batch_example',
    default_args={'owner': 'data-team'},
    description='Execute batch job on Azure Batch',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Execute a simple batch job
batch_task = AzureBatchOperator(
    task_id='run_batch_computation',
    batch_pool_id='compute-pool-001',
    batch_pool_vm_size='Standard_D2_v2',
    batch_job_id='data-processing-job',
    batch_task_command_line='python process_data.py --input /mnt/data/input.csv --output /mnt/data/output.csv',
    batch_task_id='process-task-001',
    target_dedicated_nodes=2,
    azure_batch_conn_id='azure_batch_connection',
    timeout=60,  # timeout is in minutes, so wait up to 1 hour
    should_delete_job=True,
    dag=dag,
)
```

### Auto-scaling Batch Pool

```python
# Batch job with auto-scaling pool configuration
autoscale_batch = AzureBatchOperator(
    task_id='autoscale_batch_job',
    batch_pool_id='autoscale-pool',
    batch_pool_vm_size='Standard_F4s_v2',
    batch_job_id='ml-training-job',
    batch_task_command_line='python train_model.py --epochs 100 --batch-size 32',
    batch_task_id='training-task',
    enable_auto_scale=True,
    auto_scale_formula='''
        startingNumberOfVMs = 1;
        maxNumberofVMs = 10;
        pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
        pendingTaskSamples = pendingTaskSamplePercent < 70 ? startingNumberOfVMs : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
        $TargetDedicatedNodes = min(maxNumberofVMs, pendingTaskSamples);
    ''',
    timeout=180,  # 3 hour timeout for ML training
    should_delete_job=True,
    should_delete_pool=True,
    dag=dag,
)
```

### Batch Job with Container Settings

```python
# Batch job using Docker containers. The container, resource-file, and
# output-file settings are passed as azure.batch.models objects, matching
# the TaskContainerSettings, ResourceFile, and OutputFile types shown in
# the API sections above.
from azure.batch import models as batch_models

container_batch = AzureBatchOperator(
    task_id='containerized_batch_job',
    batch_pool_id='container-pool',
    batch_pool_vm_size='Standard_D4_v3',
    batch_job_id='container-processing-job',
    batch_task_command_line='python /app/analyze.py',
    batch_task_id='analysis-container-task',
    batch_task_container_settings=batch_models.TaskContainerSettings(
        image_name='myregistry.azurecr.io/data-processor:latest',
        container_run_options='--rm -v /mnt/data:/app/data',
        registry=batch_models.ContainerRegistry(
            registry_server='myregistry.azurecr.io',
            user_name='registry_user',
            password='registry_password',
        ),
    ),
    batch_task_resource_files=[
        batch_models.ResourceFile(
            http_url='https://mystorageaccount.blob.core.windows.net/data/input.json',
            file_path='input.json',
        ),
    ],
    batch_task_output_files=[
        batch_models.OutputFile(
            file_pattern='output/*.json',
            destination=batch_models.OutputFileDestination(
                container=batch_models.OutputFileBlobContainerDestination(
                    container_url='https://mystorageaccount.blob.core.windows.net/results',
                    path='processed/',
                ),
            ),
            upload_options=batch_models.OutputFileUploadOptions(
                upload_condition=batch_models.OutputFileUploadCondition.task_completion,
            ),
        ),
    ],
    target_dedicated_nodes=3,
    dag=dag,
)
```

## Authentication and Connection

Azure Batch supports multiple authentication methods:

- **Account Key**: Batch account name and access key
- **Service Principal**: Using Azure AD application credentials
- **Managed Identity**: For Azure-hosted Airflow instances
- **DefaultAzureCredential**: Azure SDK default credential chain

Connection configuration requires the Batch account URL and appropriate authentication credentials.
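
As a minimal sketch of account-key authentication, the snippet below builds a connection programmatically and exposes it through an environment variable. The `azure_batch` connection type and the `account_url` extra field name reflect recent provider versions and should be treated as assumptions to verify against the provider documentation for your installed version; the account name, key, and URL are placeholders.

```python
import json
import os

from airflow.models import Connection

# Sketch of an account-key connection. Field names (especially the
# "account_url" extra) are assumptions; verify them against the provider
# version you have installed.
conn = Connection(
    conn_id="azure_batch_default",
    conn_type="azure_batch",
    login="mybatchaccount",                 # Batch account name
    password="<batch-account-access-key>",  # Batch account key
    extra=json.dumps({"account_url": "https://mybatchaccount.eastus.batch.azure.com"}),
)

# One way to expose the connection without touching the metadata database:
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()
```

For service principal or managed identity authentication, the credential details live in the connection extras instead; consult the provider's connection documentation for the exact field names.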

## VM and Image Configuration

Azure Batch supports various VM configurations (see the sketch after this list):

- **VM Sizes**: Standard compute, memory-optimized, and compute-optimized instances
- **Images**: Windows, Linux, and custom images from the Azure Marketplace
- **Node Agent**: Compatibility layer between the Batch service and the node OS
- **Auto-scaling**: Dynamic pool sizing based on workload demands
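
As a sketch of how these pieces fit together, the pool definition below uses the azure-batch SDK models to pin a Marketplace Linux image and a matching node agent, then creates the pool through the hook. The publisher, offer, SKU, node agent ID, and VM size are illustrative assumptions; check the image and node agent combinations supported by your Batch account before relying on them.

```python
from azure.batch import models as batch_models

from airflow.providers.microsoft.azure.hooks.batch import AzureBatchHook

# Sketch: an Ubuntu pool built from azure-batch SDK models. The image
# reference and node agent SKU below are illustrative placeholders.
pool = batch_models.PoolAddParameter(
    id="ubuntu-pool",
    vm_size="Standard_D2s_v3",
    target_dedicated_nodes=2,
    virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
        image_reference=batch_models.ImageReference(
            publisher="canonical",
            offer="0001-com-ubuntu-server-focal",
            sku="20_04-lts",
            version="latest",
        ),
        node_agent_sku_id="batch.node.ubuntu 20.04",
    ),
)

AzureBatchHook(azure_batch_conn_id="azure_batch_default").create_pool(pool)
```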

## Types

```python { .api }
# Core Batch model types
class PoolAddParameter:
    """Configuration for creating a new Batch pool."""
    id: str
    vm_size: str
    target_dedicated_nodes: int | None = None
    target_low_priority_nodes: int | None = None
    enable_auto_scale: bool = False

class JobAddParameter:
    """Configuration for creating a new Batch job."""
    id: str
    pool_info: PoolInformation
    job_manager_task: JobManagerTask | None = None
    job_preparation_task: JobPreparationTask | None = None

class TaskAddParameter:
    """Configuration for creating a new Batch task."""
    id: str
    command_line: str
    container_settings: TaskContainerSettings | None = None
    resource_files: list[ResourceFile] | None = None
    output_files: list[OutputFile] | None = None

class CloudTask:
    """Represents a completed Batch task."""
    id: str
    state: str
    execution_info: TaskExecutionInformation
```