# WinRM Operator

Airflow operator for executing Windows commands and PowerShell scripts on remote systems over WinRM. It runs as a regular task in an Airflow DAG and supports Jinja templating of the command, return code validation, and error reporting through `AirflowException`.

## Types

```python { .api }
from collections.abc import Sequence
from airflow.utils.context import Context
from airflow.providers.microsoft.winrm.hooks.winrm import WinRMHook
from airflow.providers.microsoft.winrm.version_compat import BaseOperator
```

## Capabilities

### Operator Initialization

Create WinRM operator instances for Airflow task definitions with flexible configuration and templating support.

```python { .api }
class WinRMOperator(BaseOperator):
    template_fields: Sequence[str] = ("command",)
    template_fields_renderers = {"command": "powershell"}

    def __init__(
        self,
        *,
        winrm_hook: WinRMHook | None = None,
        ssh_conn_id: str | None = None,
        remote_host: str | None = None,
        command: str | None = None,
        ps_path: str | None = None,
        output_encoding: str = "utf-8",
        timeout: int = 10,
        expected_return_code: int | list[int] | range = 0,
        **kwargs,
    ) -> None:
        """
        Initialize WinRM operator for Airflow task execution.

        Parameters:
        - winrm_hook: Pre-configured WinRMHook instance
        - ssh_conn_id: Airflow connection ID for WinRM settings
        - remote_host: Remote Windows host (overrides connection setting)
        - command: Command or script to execute (supports Airflow templating)
        - ps_path: PowerShell executable path ('powershell', 'pwsh', or full path)
        - output_encoding: Output encoding for stdout/stderr decoding
        - timeout: Command execution timeout in seconds
        - expected_return_code: Expected return code(s) for success validation
        - **kwargs: Additional BaseOperator parameters (task_id, dag, etc.)
        """
```

**Template Field Support:**

The `command` parameter is rendered as a Jinja2 template before execution. The operator defines:
- `template_fields: Sequence[str] = ("command",)`
- `template_fields_renderers = {"command": "powershell"}` (enables PowerShell syntax highlighting in the UI)

```python
# Using templating with the execution date
WinRMOperator(
    task_id='daily_backup',
    command='robocopy C:\\Data D:\\Backup\\{{ ds }} /E /R:3 /W:10',
    expected_return_code=[0, 1, 2, 4],  # robocopy reports success with non-zero codes
    ssh_conn_id='winrm_default'
)

# Using templating with custom variables.
# ps_path already runs the command through PowerShell, so no "powershell -Command" wrapper is needed.
WinRMOperator(
    task_id='process_files',
    command='Get-ChildItem {{ params.source_dir }} | Where-Object LastWriteTime -gt (Get-Date).AddDays(-{{ params.days }})',
    params={'source_dir': 'C:\\ProcessingQueue', 'days': 1},
    ps_path='powershell',
    ssh_conn_id='winrm_default'
)

# Using XCom values in templates
WinRMOperator(
    task_id='process_xcom_data',
    command='echo "Processing: {{ ti.xcom_pull(task_ids="extract_data") }}" | Out-File C:\\temp\\result.txt',
    ps_path='powershell',
    ssh_conn_id='winrm_default'
)
```

### Task Execution

Execute WinRM commands within the Airflow task context, with return code validation and optional output capture for XCom.

```python { .api }
def execute(self, context: Context) -> list | str:
    """
    Execute WinRM command within Airflow task context.

    Handles hook creation, command execution, return code validation,
    and XCom result processing based on Airflow configuration.

    Parameters:
    - context: Airflow task execution context

    Returns:
        Command output for XCom (if do_xcom_push=True):
        - list[bytes]: Raw stdout buffer (if enable_xcom_pickling=True)
        - str: Base64-encoded stdout (if enable_xcom_pickling=False)

    Raises:
        AirflowException: Hook creation failures, missing command, execution errors,
            unexpected return codes
    """
```

## Usage Patterns

### Basic Command Execution

Simple Windows commands for system administration and file operations.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.microsoft.winrm.operators.winrm import WinRMOperator

dag = DAG(
    'windows_admin_tasks',
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(hours=1),  # `schedule_interval` on Airflow versions older than 2.4
    catchup=False
)

# Check disk space
check_disk = WinRMOperator(
    task_id='check_disk_space',
    command='wmic logicaldisk get size,freespace,caption',
    ssh_conn_id='winrm_default',
    dag=dag
)

# List running services (`state= active` selects started services)
list_services = WinRMOperator(
    task_id='list_services',
    command='sc query state= active',
    expected_return_code=0,
    ssh_conn_id='winrm_default',
    dag=dag
)
```

### PowerShell Script Execution

Multi-line PowerShell scripts for system management and data processing. Setting `ps_path` runs the command as a PowerShell script.

```python
# System information gathering
system_info = WinRMOperator(
    task_id='gather_system_info',
    command='''
        $info = @{
            'ComputerName' = $env:COMPUTERNAME
            'OS' = (Get-WmiObject Win32_OperatingSystem).Caption
            'Memory' = [math]::Round((Get-WmiObject Win32_ComputerSystem).TotalPhysicalMemory / 1GB, 2)
            'Uptime' = (Get-WmiObject Win32_OperatingSystem).LastBootUpTime
        }
        $info | ConvertTo-Json
    ''',
    ps_path='powershell',
    ssh_conn_id='winrm_default',
    dag=dag
)

# File processing with error handling
process_files = WinRMOperator(
    task_id='process_csv_files',
    command='''
        try {
            $files = Get-ChildItem "C:\\Data\\*.csv" | Where-Object LastWriteTime -gt (Get-Date).AddHours(-1)
            foreach ($file in $files) {
                Write-Host "Processing: $($file.FullName)"
                # Add your processing logic here
                Move-Item $file.FullName "C:\\Processed\\"
            }
            Write-Host "Processed $($files.Count) files successfully"
        } catch {
            Write-Error "Error processing files: $_"
            exit 1
        }
    ''',
    ps_path='powershell',
    expected_return_code=0,
    ssh_conn_id='winrm_default',
    dag=dag
)
```
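
Multi-line scripts like the ones above have to survive shell quoting on their way to the remote host. One common way to do that is PowerShell's `-EncodedCommand` flag, which accepts the script as Base64 over its UTF-16LE bytes. Whether the hook uses exactly this mechanism when `ps_path` is set is an assumption here, not something this page states, and `build_encoded_command` is a hypothetical helper, not part of the provider. A minimal sketch:

```python
import base64


def build_encoded_command(script: str, ps_path: str = "powershell") -> str:
    """Wrap a PowerShell script for the -EncodedCommand flag (hypothetical helper)."""
    # PowerShell expects Base64 of the script's UTF-16LE bytes.
    encoded = base64.b64encode(script.encode("utf-16-le")).decode("ascii")
    return f"{ps_path} -EncodedCommand {encoded}"


script = "Write-Output 'hello'"
command_line = build_encoded_command(script)

# Round trip: decoding the payload gives back the original script text.
payload = command_line.split(" ")[-1]
decoded = base64.b64decode(payload).decode("utf-16-le")
print(decoded == script)
```

Because the payload is plain ASCII, quotes, newlines, and `$` characters in the script need no further escaping on the command line.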

### Advanced Configuration

Scenarios with a custom hook, multiple accepted return codes, and a host override.

```python
import os

from airflow.providers.microsoft.winrm.hooks.winrm import WinRMHook

# Custom hook with specific authentication.
# Hook arguments are not Jinja-templated, so the password is read in Python;
# WINRM_PASSWORD is a placeholder environment variable name.
custom_hook = WinRMHook(
    remote_host='192.168.1.100',
    username='service_account',
    password=os.environ['WINRM_PASSWORD'],
    transport='ntlm',
    operation_timeout_sec=300,
    read_timeout_sec=330  # pywinrm requires this to exceed operation_timeout_sec
)

# Database backup with custom hook
backup_task = WinRMOperator(
    task_id='backup_database',
    winrm_hook=custom_hook,
    command='sqlcmd -S localhost -E -Q "BACKUP DATABASE MyDB TO DISK = \'C:\\Backups\\MyDB_{{ ds }}.bak\'"',
    expected_return_code=[0, 1],  # Allow return codes 0 or 1
    timeout=600,  # 10 minute timeout
    dag=dag
)

# Batch file execution with multiple valid return codes
batch_execution = WinRMOperator(
    task_id='run_batch_process',
    command='C:\\Scripts\\data_processing.bat "{{ ds }}"',
    expected_return_code=range(0, 3),  # Allow return codes 0, 1, 2
    remote_host='processing-server.local',  # Override connection host
    ssh_conn_id='winrm_default',
    dag=dag
)
```

### Task Dependencies and XCom

Chaining WinRM tasks with data passing through XCom.

```python
# Task that generates data for XCom
generate_data = WinRMOperator(
    task_id='generate_report_data',
    command='''
        $data = Get-Process | Sort-Object CPU -Descending | Select-Object -First 5 Name,CPU
        $data | ConvertTo-Json -Compress
    ''',
    ps_path='powershell',
    do_xcom_push=True,
    ssh_conn_id='winrm_default',
    dag=dag
)

# Task that uses XCom data (in templated command).
# Note: with XCom pickling disabled, the pulled value is the Base64-encoded stdout,
# so the file receives the encoded text unless it is decoded first.
process_data = WinRMOperator(
    task_id='process_report_data',
    command='echo "{{ ti.xcom_pull(task_ids="generate_report_data") }}" > C:\\Reports\\process_report_{{ ds }}.json',
    ssh_conn_id='winrm_default',
    dag=dag
)

generate_data >> process_data
```

## Return Code Handling

The operator treats a command as successful only when its return code matches `expected_return_code`, which can be a single integer, a list, or a range:

### Single Return Code
```python
WinRMOperator(
    task_id='standard_command',
    command='dir C:\\',
    expected_return_code=0,  # Only 0 is success
    ssh_conn_id='winrm_default'
)
```

### Multiple Return Codes
```python
WinRMOperator(
    task_id='robocopy_command',
    command='robocopy C:\\Source D:\\Dest /E',
    expected_return_code=[0, 1, 2, 4],  # Robocopy success codes
    ssh_conn_id='winrm_default'
)
```

### Return Code Ranges
```python
WinRMOperator(
    task_id='flexible_command',
    command='custom_app.exe --process-data',
    expected_return_code=range(0, 10),  # 0-9 are acceptable
    ssh_conn_id='winrm_default'
)
```
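
The three accepted forms of `expected_return_code` reduce to one check: equality for an integer, membership for a list or range. The sketch below illustrates that logic; `is_expected_return_code` is a hypothetical name, and the operator's internal implementation may differ.

```python
from __future__ import annotations


def is_expected_return_code(return_code: int, expected: int | list[int] | range) -> bool:
    """Return True when return_code counts as success (illustrative helper)."""
    if isinstance(expected, int):
        return return_code == expected
    # Lists and ranges both support membership tests.
    return return_code in expected


print(is_expected_return_code(0, 0))              # single code
print(is_expected_return_code(1, [0, 1, 2, 4]))   # list of codes
print(is_expected_return_code(10, range(0, 10)))  # range end is exclusive
```

Note that `range(0, 10)` excludes 10. For robocopy, whose exit code is a bit mask where values of 8 and above signal a failure, `range(0, 8)` is a compact alternative to listing individual codes.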

## Output and XCom Handling

The operator returns command output according to the Airflow XCom configuration. It also sets the `urllib3.connectionpool` logger level to `ERROR` to suppress WinRM connection warnings.

### XCom Push Enabled (`do_xcom_push=True`)

**With Pickling Enabled** (`enable_xcom_pickling=True`):
- Returns: `list[bytes]` - Raw stdout byte chunks from the hook's `run()` method
- Use case: Binary output, custom deserialization, large datasets
- Configuration: Set in `airflow.cfg` under the `[core]` section

**With Pickling Disabled** (`enable_xcom_pickling=False`):
- Returns: `str` - Base64-encoded stdout using `base64.b64encode()`
- Use case: Text output, JSON data, standard processing
- Encoding: Uses UTF-8 encoding before base64 conversion
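
A downstream task therefore receives either a list of byte chunks or a Base64 string, depending on the pickling setting. The sketch below normalizes both forms back to text; `stdout_from_xcom` is a hypothetical helper, and the `encoding` argument is assumed to match the operator's `output_encoding`.

```python
from __future__ import annotations

import base64


def stdout_from_xcom(value: list[bytes] | str, encoding: str = "utf-8") -> str:
    """Turn a WinRM XCom value back into text (illustrative helper)."""
    if isinstance(value, str):
        # Pickling disabled: the value is Base64-encoded stdout.
        raw = base64.b64decode(value)
    else:
        # Pickling enabled: the value is a list of raw stdout chunks.
        raw = b"".join(value)
    return raw.decode(encoding)


encoded_value = base64.b64encode(b'{"ok": true}').decode("ascii")
chunked_value = [b'{"ok"', b': true}']

print(stdout_from_xcom(encoded_value))
print(stdout_from_xcom(chunked_value))
```

Both calls yield the same text, so downstream code such as a `json.loads` step does not need to know which XCom mode is active.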

### XCom Push Disabled (`do_xcom_push=False`)

- Returns: None
- Use case: Large outputs, commands with side effects only
- Performance: More efficient for commands that don't need output capture

### Logging Configuration

The operator automatically raises the `urllib3.connectionpool` logger level to hide WinRM connection header parsing warnings:

```python
import logging

# Automatically applied in execute() method
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
```
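
Logger levels are global to the Python process, so this setting also silences `urllib3.connectionpool` warnings from any other code running in the same worker process. The standard-library sketch below shows the effect and how a caller could restore the previous level; it does not depend on Airflow.

```python
import logging

pool_logger = logging.getLogger("urllib3.connectionpool")
previous_level = pool_logger.level  # remember the level so it can be restored

pool_logger.setLevel(logging.ERROR)
warnings_enabled = pool_logger.isEnabledFor(logging.WARNING)
errors_enabled = pool_logger.isEnabledFor(logging.ERROR)
print(warnings_enabled, errors_enabled)

# Restore the earlier level once the warnings are wanted again.
pool_logger.setLevel(previous_level)
```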

## Error Conditions

The WinRMOperator raises `AirflowException` for:

- **Missing Hook**: Neither `winrm_hook` nor `ssh_conn_id` provided
- **Missing Command**: No command specified for execution
- **Connection Failures**: Unable to establish WinRM connection
- **Authentication Failures**: Invalid credentials or authentication method
- **Command Execution Failures**: Command execution errors, timeouts
- **Return Code Validation**: Command returned unexpected return code

**Error Message Example:**
```
Error running cmd: Get-Process invalid_process, return code: 1,
error: Get-Process : Cannot find a process with the name "invalid_process"
```
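
The message above carries three parts: the command, the return code, and the stderr text. The sketch below assembles a message of that shape, assuming stderr arrives as a list of byte chunks in the same way stdout does; `format_failure` is a hypothetical helper and not the operator's actual code.

```python
def format_failure(command: str, return_code: int, stderr_chunks: list, encoding: str = "utf-8") -> str:
    """Build an error message in the documented shape (illustrative helper)."""
    # Join the chunks first so multi-byte characters split across chunks decode correctly.
    stderr_text = b"".join(stderr_chunks).decode(encoding, errors="replace")
    return f"Error running cmd: {command}, return code: {return_code}, error: {stderr_text}"


message = format_failure(
    "Get-Process invalid_process",
    1,
    [b"Get-Process : Cannot find ", b"a process"],
)
print(message)
```

When parsing such messages in alerting or log tooling, keep in mind that the command text itself may contain commas.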

## Best Practices

### Security
- Use encrypted transports (SSL, Kerberos, NTLM) for production
- Store credentials in Airflow connections, not in code
- Use service accounts with minimal required permissions
- Enable server certificate validation for SSL connections

### Performance
- Set appropriate timeouts for long-running operations
- Use `do_xcom_push=False` for commands with large output
- Prefer a single PowerShell script over several separate commands for complex operations

### Reliability
- Specify expected return codes for all command types
- Implement proper error handling in PowerShell scripts
- Use templating for dynamic command generation
- Test commands interactively before DAG deployment