Apache Airflow provider package for Windows Remote Management (WinRM) protocol integration enabling remote command execution on Windows systems
Airflow operator for executing Windows commands and PowerShell scripts on remote systems over WinRM connections. It fits into Airflow DAGs with Jinja templating of the command, validation of the command's return code, and an AirflowException on failure.
from collections.abc import Sequence
from airflow.utils.context import Context
from airflow.providers.microsoft.winrm.hooks.winrm import WinRMHook
from airflow.providers.microsoft.winrm.version_compat import BaseOperator

Create WinRM operator instances for Airflow task definitions with flexible configuration and templating support.
class WinRMOperator(BaseOperator):
    template_fields: Sequence[str] = ("command",)
    template_fields_renderers = {"command": "powershell"}

    def __init__(
        self,
        *,
        winrm_hook: WinRMHook | None = None,
        ssh_conn_id: str | None = None,
        remote_host: str | None = None,
        command: str | None = None,
        ps_path: str | None = None,
        output_encoding: str = "utf-8",
        timeout: int = 10,
        expected_return_code: int | list[int] | range = 0,
        **kwargs,
    ) -> None:
        """
        Initialize WinRM operator for Airflow task execution.

        Parameters:
        - winrm_hook: Pre-configured WinRMHook instance
        - ssh_conn_id: Airflow connection ID for WinRM settings
        - remote_host: Remote Windows host (overrides connection setting)
        - command: Command or script to execute (supports Airflow templating)
        - ps_path: PowerShell executable path ('powershell', 'pwsh', or full path); if None, the command runs in cmd.exe
        - output_encoding: Output encoding for stdout/stderr decoding
        - timeout: Command execution timeout in seconds
        - expected_return_code: Expected return code(s) for success validation (an int, a list of ints, or a range)
        - **kwargs: Additional BaseOperator parameters (task_id, dag, etc.)
        """

Template Field Support:
The command parameter supports Airflow templating with Jinja2. The operator defines:
template_fields: Sequence[str] = ("command",)
template_fields_renderers = {"command": "powershell"}  # enables PowerShell syntax highlighting in the UI

# Using templating with execution date
WinRMOperator(
task_id='daily_backup',
command='robocopy C:\\Data D:\\Backup\\{{ ds }} /E /R:3 /W:10',
expected_return_code=range(0, 8),  # robocopy exit codes below 8 mean success
ssh_conn_id='winrm_default'
)
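robocopy reports its result as bit flags in the exit code, so 1 ("files copied") is a normal outcome and only values of 8 and above signal failures. The sketch below decodes an exit code; the flag table follows Microsoft's robocopy documentation, and `describe_robocopy_exit` / `robocopy_succeeded` are hypothetical helpers, not part of the provider.

```python
# Exit-code bits documented for robocopy. Any value below 8 means no copy
# failure occurred; bits 8 and 16 signal errors.
ROBOCOPY_FLAGS = {
    1: "files copied",
    2: "extra files or directories detected",
    4: "mismatched files or directories detected",
    8: "some files or directories could not be copied",
    16: "serious error, no files copied",
}


def describe_robocopy_exit(code: int) -> list[str]:
    """List the flag descriptions set in a robocopy exit code (hypothetical helper)."""
    if code == 0:
        return ["no files copied, no failures"]
    return [text for bit, text in ROBOCOPY_FLAGS.items() if code & bit]


def robocopy_succeeded(code: int) -> bool:
    """Success test equivalent to expected_return_code=range(0, 8)."""
    return code in range(0, 8)


for exit_code in (0, 1, 3, 9):
    print(exit_code, robocopy_succeeded(exit_code), describe_robocopy_exit(exit_code))
```

This is why a list such as `[0, 1, 2, 4]` is narrower than intended: combinations like 3 (files copied plus extra files) are also successful runs.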
# Using templating with custom variables
WinRMOperator(
task_id='process_files',
command='Get-ChildItem {{ params.source_dir }} | Where-Object LastWriteTime -gt (Get-Date).AddDays(-{{ params.days }})',  # ps_path already runs this in PowerShell
params={'source_dir': 'C:\\ProcessingQueue', 'days': 1},
ps_path='powershell',
ssh_conn_id='winrm_default'
)
# Using XCom values in templates
WinRMOperator(
task_id='process_xcom_data',
command='echo "Processing: {{ ti.xcom_pull(task_ids="extract_data") }}" | Out-File C:\\temp\\result.txt',
ps_path='powershell',
ssh_conn_id='winrm_default'
)

Execute WinRM commands within Airflow task context with comprehensive error handling and output management.

def execute(self, context: Context) -> list | str:
    """
    Execute WinRM command within Airflow task context.

    Handles hook creation, command execution, return code validation,
    and XCom result processing based on Airflow configuration.

    Parameters:
    - context: Airflow task execution context

    Returns:
    Command output for XCom (if do_xcom_push=True):
    - list[bytes]: Raw stdout buffer (if enable_xcom_pickling=True)
    - str: Base64-encoded stdout (if enable_xcom_pickling=False)

    Raises:
    AirflowException: Hook creation failures, missing command, execution errors,
    unexpected return codes
    """

Simple Windows commands for system administration and file operations.
from airflow import DAG
from airflow.providers.microsoft.winrm.operators.winrm import WinRMOperator
from datetime import datetime, timedelta
dag = DAG(
'windows_admin_tasks',
start_date=datetime(2024, 1, 1),
schedule=timedelta(hours=1),  # 'schedule' replaces the deprecated 'schedule_interval'
catchup=False
)
# Check disk space
check_disk = WinRMOperator(
task_id='check_disk_space',
command='wmic logicaldisk get size,freespace,caption',
ssh_conn_id='winrm_default',
dag=dag
)
# List running services
list_services = WinRMOperator(
task_id='list_services',
command='sc query state= running',
expected_return_code=0,
ssh_conn_id='winrm_default',
dag=dag
)

Complex PowerShell operations for system management and data processing.
# System information gathering
system_info = WinRMOperator(
task_id='gather_system_info',
command='''
$info = @{
'ComputerName' = $env:COMPUTERNAME
'OS' = (Get-WmiObject Win32_OperatingSystem).Caption
'Memory' = [math]::Round((Get-WmiObject Win32_ComputerSystem).TotalPhysicalMemory / 1GB, 2)
'LastBootUpTime' = (Get-WmiObject Win32_OperatingSystem).LastBootUpTime
}
$info | ConvertTo-Json
''',
ps_path='powershell',
ssh_conn_id='winrm_default',
dag=dag
)
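The script above spans several lines and contains quotes and `$` variables. A common way to hand such a script to PowerShell without any shell escaping is the `-EncodedCommand` switch, which takes base64 of the script's UTF-16LE bytes. The sketch below assumes the hook does something along these lines when `ps_path` is set; `encode_ps_script` and `decode_ps_command` are hypothetical helpers, not provider API.

```python
import base64


def encode_ps_script(script: str, ps_path: str = "powershell") -> str:
    """Wrap a PowerShell script for the -EncodedCommand switch (hypothetical helper).

    PowerShell decodes the argument as base64 of UTF-16LE text, so quotes,
    $variables, and newlines in the script need no escaping.
    """
    encoded = base64.b64encode(script.encode("utf-16-le")).decode("ascii")
    return f"{ps_path} -EncodedCommand {encoded}"


def decode_ps_command(command_line: str) -> str:
    """Recover the script from a command line built by encode_ps_script."""
    encoded = command_line.rsplit(" ", 1)[-1]  # base64 never contains spaces
    return base64.b64decode(encoded).decode("utf-16-le")


script = "$info = @{ 'ComputerName' = $env:COMPUTERNAME }\n$info | ConvertTo-Json"
command_line = encode_ps_script(script, ps_path="powershell")
print(command_line)
assert decode_ps_command(command_line) == script
```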
# File processing with error handling
process_files = WinRMOperator(
task_id='process_csv_files',
command='''
$ErrorActionPreference = 'Stop'  # make cmdlet errors terminating so catch can handle them
try {
$files = Get-ChildItem "C:\\Data\\*.csv" | Where-Object LastWriteTime -gt (Get-Date).AddHours(-1)
foreach ($file in $files) {
Write-Host "Processing: $($file.FullName)"
# Add your processing logic here
Move-Item $file.FullName "C:\\Processed\\"
}
Write-Host "Processed $($files.Count) files successfully"
} catch {
Write-Error "Error processing files: $_"
exit 1
}
''',
ps_path='powershell',
expected_return_code=0,
ssh_conn_id='winrm_default',
dag=dag
)

Complex scenarios with custom hooks, multiple return codes, and error handling.
from airflow.providers.microsoft.winrm.hooks.winrm import WinRMHook
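# Custom hook with specific authentication.
# Hook arguments are not Jinja-templated, so a '{{ var.value.winrm_password }}' string
# would be sent literally as the password. The password is read from an Airflow
# connection instead ('winrm_service_account' is an example connection ID).
custom_hook = WinRMHook(
ssh_conn_id='winrm_service_account',
remote_host='192.168.1.100',
username='service_account',
transport='ntlm',
operation_timeout_sec=300,
read_timeout_sec=330,  # pywinrm requires read_timeout_sec > operation_timeout_sec
)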
# Database backup with custom hook
backup_task = WinRMOperator(
task_id='backup_database',
winrm_hook=custom_hook,
command='sqlcmd -S localhost -E -b -Q "BACKUP DATABASE MyDB TO DISK = \'C:\\Backups\\MyDB_{{ ds }}.bak\'"',
expected_return_code=0,  # with -b, sqlcmd exits with 1 when the backup fails
timeout=600, # 10 minute timeout
dag=dag
)
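`expected_return_code` accepts an int, a list of ints, or a range. A minimal sketch of how such a check and the resulting error message could look, assuming it behaves as the `execute()` docstring and the error example describe; `return_code_ok` and `check_result` are hypothetical names, and `RuntimeError` stands in for `AirflowException` so the snippet runs without Airflow.

```python
from typing import Union

# The three forms of expected_return_code listed in the parameter docs.
ExpectedCodes = Union[int, list[int], range]


def return_code_ok(return_code: int, expected: ExpectedCodes) -> bool:
    """Compare against a single code, or test membership in a list/range."""
    if isinstance(expected, int):
        return return_code == expected
    if isinstance(expected, (list, range)):
        return return_code in expected
    raise TypeError(f"unsupported expected_return_code: {expected!r}")


def check_result(command: str, return_code: int, stderr_chunks: list[bytes],
                 expected: ExpectedCodes = 0, encoding: str = "utf-8") -> None:
    """Raise with an operator-style message when the return code is not expected."""
    if not return_code_ok(return_code, expected):
        stderr_text = b"".join(stderr_chunks).decode(encoding)
        raise RuntimeError(
            f"Error running cmd: {command}, return code: {return_code}, error: {stderr_text}"
        )


# A batch file returning 2 passes when range(0, 3) is allowed.
check_result("C:\\Scripts\\data_processing.bat", 2, [], expected=range(0, 3))
try:
    check_result("Get-Process invalid_process", 1, [b"Cannot find a process"])
except RuntimeError as exc:
    print(exc)
```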
# Batch file execution with multiple valid return codes
batch_execution = WinRMOperator(
task_id='run_batch_process',
command='C:\\Scripts\\data_processing.bat "{{ ds }}"',
expected_return_code=range(0, 3), # Allow return codes 0, 1, 2
remote_host='processing-server.local', # Override connection host
ssh_conn_id='winrm_default',
dag=dag
)

Chaining WinRM tasks with data passing through XCom.
# Task that generates data for XCom
generate_data = WinRMOperator(
task_id='generate_report_data',
command='''
$data = Get-Process | Sort-Object CPU -Descending | Select-Object -First 5 Name,CPU
$data | ConvertTo-Json -Compress
''',
ps_path='powershell',
do_xcom_push=True,
ssh_conn_id='winrm_default',
dag=dag
)
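With `enable_xcom_pickling=False` (the Airflow default), the value `generate_report_data` pushes is the base64-encoded stdout, not the JSON text itself. A minimal Python sketch of that round trip, for example in a downstream Python task; the helper names and the sample stdout are made up for illustration.

```python
import base64
import json


def encode_like_operator(stdout_chunks: list[bytes], encoding: str = "utf-8") -> str:
    """Build the non-pickled XCom value: base64 of the joined stdout bytes."""
    return base64.b64encode(b"".join(stdout_chunks)).decode(encoding)


def decode_winrm_xcom(value: str, encoding: str = "utf-8") -> str:
    """Turn that XCom value back into the command's stdout text (hypothetical helper)."""
    return base64.b64decode(value).decode(encoding)


# Made-up stdout from the Get-Process | ConvertTo-Json -Compress task,
# split into two chunks the way a buffered stream might arrive.
chunks = [b'[{"Name":"sqlservr",', b'"CPU":812.5}]']
xcom_value = encode_like_operator(chunks)
top_processes = json.loads(decode_winrm_xcom(xcom_value))
print(top_processes[0]["Name"])  # sqlservr
```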
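# Task that uses XCom data (in templated command). With enable_xcom_pickling=False
# (the default), the pulled value is base64-encoded stdout, so decode it first.
process_data = WinRMOperator(
task_id='process_report_data',
command='''
$b64 = '{{ ti.xcom_pull(task_ids="generate_report_data") }}'
$json = [System.Text.Encoding]::UTF8.GetString([System.Convert]::FromBase64String($b64))
$json | Out-File -Encoding utf8 "C:\\Reports\\process_report_{{ ds }}.json"
''',
ps_path='powershell',
ssh_conn_id='winrm_default',
dag=dag
)
generate_data >> process_data

The operator validates command execution success based on return codes: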
WinRMOperator(
task_id='standard_command',
command='dir C:\\',
expected_return_code=0, # Only 0 is success
ssh_conn_id='winrm_default'
)

WinRMOperator(
task_id='robocopy_command',
command='robocopy C:\\Source D:\\Dest /E',
expected_return_code=range(0, 8), # robocopy exit codes below 8 indicate success
ssh_conn_id='winrm_default'
)

WinRMOperator(
task_id='flexible_command',
command='custom_app.exe --process-data',
expected_return_code=range(0, 10), # 0-9 are acceptable
ssh_conn_id='winrm_default'
)

The operator's return value depends on Airflow configuration. The operator also suppresses WinRM connection warnings by setting the urllib3 connection-pool logger level to ERROR.
XCom output (do_xcom_push=True):

With pickling enabled (enable_xcom_pickling=True, set in airflow.cfg under the [core] section):
list[bytes] - Raw stdout byte chunks from the hook's run() method

With pickling disabled (enable_xcom_pickling=False):
str - Base64-encoded stdout produced with base64.b64encode()

No XCom (do_xcom_push=False): the command output is not pushed to XCom.

The operator automatically configures logging to prevent WinRM connection header parsing warnings:
import logging

# Applied automatically by the operator; shown here for reference
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)

The WinRMOperator raises AirflowException for:
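- Neither winrm_hook nor ssh_conn_id provided
- No command specified
- A return code not matched by expected_return_code (the message includes the command, return code, and stderr)

Error Message Example:
Error running cmd: Get-Process invalid_process, return code: 1,
error: Get-Process : Cannot find a process with the name "invalid_process"

Use return_output=False (a parameter of WinRMHook.run()) for commands with large output, so stdout is not buffered in memory.

Install with Tessl CLI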
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-winrm