
tessl/pypi-apache-airflow-providers-microsoft-winrm

Apache Airflow provider package for Windows Remote Management (WinRM) protocol integration enabling remote command execution on Windows systems


WinRM Operator

Airflow operator for executing Windows commands and PowerShell scripts on remote systems through WinRM connections. It plugs into Airflow DAGs with a templated command field, return code validation, and failures reported as AirflowException.

Types

from collections.abc import Sequence
from airflow.utils.context import Context
from airflow.providers.microsoft.winrm.hooks.winrm import WinRMHook
from airflow.providers.microsoft.winrm.version_compat import BaseOperator

Capabilities

Operator Initialization

Create WinRM operator instances for Airflow task definitions with flexible configuration and templating support.

class WinRMOperator(BaseOperator):
    template_fields: Sequence[str] = ("command",)
    template_fields_renderers = {"command": "powershell"}
    
    def __init__(
        self,
        *,
        winrm_hook: WinRMHook | None = None,
        ssh_conn_id: str | None = None,
        remote_host: str | None = None,
        command: str | None = None,
        ps_path: str | None = None,
        output_encoding: str = "utf-8",
        timeout: int = 10,
        expected_return_code: int | list[int] | range = 0,
        **kwargs,
    ) -> None:
        """
        Initialize WinRM operator for Airflow task execution.

        Parameters:
        - winrm_hook: Pre-configured WinRMHook instance
        - ssh_conn_id: Airflow connection ID for WinRM settings
        - remote_host: Remote Windows host (overrides connection setting)
        - command: Command or script to execute (supports Airflow templating)
        - ps_path: PowerShell executable path ('powershell', 'pwsh', or full path)
        - output_encoding: Output encoding for stdout/stderr decoding
        - timeout: Command execution timeout in seconds
        - expected_return_code: Expected return code(s) for success validation
        - **kwargs: Additional BaseOperator parameters (task_id, dag, etc.)
        """

Template Field Support:

The command parameter supports Airflow templating with Jinja2. The operator defines:

  • template_fields: Sequence[str] = ("command",)
  • template_fields_renderers = {"command": "powershell"} (enables PowerShell syntax highlighting in UI)
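
# Using templating with execution date
WinRMOperator(
    task_id='daily_backup',
    command='robocopy C:\\Data D:\\Backup\\{{ ds }} /E /R:3 /W:10',
    expected_return_code=range(0, 8),  # robocopy returns 1 when files were copied; codes below 8 mean no copy failure
    ssh_conn_id='winrm_default'
)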

# Using templating with custom variables
# (ps_path already runs the command through PowerShell, so no extra "powershell -Command" wrapper is needed)
WinRMOperator(
    task_id='process_files',
    command='Get-ChildItem "{{ params.source_dir }}" | Where-Object LastWriteTime -gt (Get-Date).AddDays(-{{ params.days }})',
    params={'source_dir': 'C:\\ProcessingQueue', 'days': 1},
    ps_path='powershell',
    ssh_conn_id='winrm_default'
)

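# Using XCom values in templates
# Note: if 'extract_data' is a WinRMOperator task and XCom pickling is disabled,
# the pulled value is the Base64-encoded stdout, not the plain text.
WinRMOperator(
    task_id='process_xcom_data',
    command='echo "Processing: {{ ti.xcom_pull(task_ids="extract_data") }}" | Out-File C:\\temp\\result.txt',
    ps_path='powershell',
    ssh_conn_id='winrm_default'
)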

Task Execution

Execute WinRM commands within Airflow task context with comprehensive error handling and output management.

def execute(self, context: Context) -> list | str:
    """
    Execute WinRM command within Airflow task context.
    
    Handles hook creation, command execution, return code validation,
    and XCom result processing based on Airflow configuration.

    Parameters:
    - context: Airflow task execution context

    Returns:
        Command output for XCom (if do_xcom_push=True):
        - list[bytes]: Raw stdout buffer (if enable_xcom_pickling=True)  
        - str: Base64-encoded stdout (if enable_xcom_pickling=False)

    Raises:
        AirflowException: Hook creation failures, missing command, execution errors, 
                         unexpected return codes
    """

Usage Patterns

Basic Command Execution

Simple Windows commands for system administration and file operations.

from airflow import DAG
from airflow.providers.microsoft.winrm.operators.winrm import WinRMOperator
from datetime import datetime, timedelta

dag = DAG(
    'windows_admin_tasks',
    start_date=datetime(2024, 1, 1),
    schedule=timedelta(hours=1),  # 'schedule' replaces the deprecated 'schedule_interval'
    catchup=False
)

# Check disk space
check_disk = WinRMOperator(
    task_id='check_disk_space',
    command='wmic logicaldisk get size,freespace,caption',
    ssh_conn_id='winrm_default',
    dag=dag
)

# List running services
list_services = WinRMOperator(
    task_id='list_services',
    command='sc query state= active',  # sc accepts active, inactive, or all for state=
    expected_return_code=0,
    ssh_conn_id='winrm_default',
    dag=dag
)

PowerShell Script Execution

Complex PowerShell operations for system management and data processing.

# System information gathering
system_info = WinRMOperator(
    task_id='gather_system_info',
    command='''
    $info = @{
        'ComputerName' = $env:COMPUTERNAME
        'OS' = (Get-WmiObject Win32_OperatingSystem).Caption
        'Memory' = [math]::Round((Get-WmiObject Win32_ComputerSystem).TotalPhysicalMemory / 1GB, 2)
        'Uptime' = (Get-WmiObject Win32_OperatingSystem).LastBootUpTime
    }
    $info | ConvertTo-Json
    ''',
    ps_path='powershell',
    ssh_conn_id='winrm_default',
    dag=dag
)

# File processing with error handling
process_files = WinRMOperator(
    task_id='process_csv_files',
    command='''
    try {
        $files = Get-ChildItem "C:\\Data\\*.csv" | Where-Object LastWriteTime -gt (Get-Date).AddHours(-1)
        foreach ($file in $files) {
            Write-Host "Processing: $($file.FullName)"
            # Add your processing logic here
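            # -ErrorAction Stop makes a failed move a terminating error so the catch block runs
            Move-Item $file.FullName "C:\\Processed\\" -ErrorAction Stop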
        }
        Write-Host "Processed $($files.Count) files successfully"
    } catch {
        Write-Error "Error processing files: $_"
        exit 1
    }
    ''',
    ps_path='powershell',
    expected_return_code=0,
    ssh_conn_id='winrm_default',
    dag=dag
)
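
Multi-line scripts such as the ones above have to reach the remote shell without their quotes and newlines being mangled. One common way to achieve that is PowerShell's -EncodedCommand flag, which accepts the script as Base64-encoded UTF-16LE text. The sketch below illustrates that technique only; whether the hook packages commands exactly this way is an assumption, and the helper name build_encoded_command is hypothetical.

```python
import base64


def build_encoded_command(script: str, ps_path: str = "powershell") -> str:
    """Wrap a PowerShell script in an -EncodedCommand invocation (illustrative helper)."""
    # PowerShell expects the payload as Base64 over UTF-16LE bytes.
    payload = base64.b64encode(script.encode("utf_16_le")).decode("ascii")
    return f"{ps_path} -EncodedCommand {payload}"


script = 'Write-Host "Processing: $($file.FullName)"'
command_line = build_encoded_command(script)

# Decoding the payload again recovers the original script unchanged,
# including the embedded double quotes and dollar signs.
encoded = command_line.split(" ")[-1]
assert base64.b64decode(encoded).decode("utf_16_le") == script
```

Because the payload is plain Base64, the script body needs no extra escaping beyond what the Python string literal itself requires.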

Advanced Configuration

Complex scenarios with custom hooks, multiple return codes, and error handling.

from airflow.providers.microsoft.winrm.hooks.winrm import WinRMHook

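# Custom hook with specific authentication
# Hook arguments are not Jinja-templated, so credentials are loaded from an
# Airflow connection instead ('winrm_service_account' is a placeholder ID).
custom_hook = WinRMHook(
    ssh_conn_id='winrm_service_account',
    remote_host='192.168.1.100',
    transport='ntlm',
    read_timeout_sec=310,  # pywinrm requires this to exceed operation_timeout_sec
    operation_timeout_sec=300
)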

# Database backup with custom hook
backup_task = WinRMOperator(
    task_id='backup_database',
    winrm_hook=custom_hook,
    command='sqlcmd -S localhost -E -Q "BACKUP DATABASE MyDB TO DISK = \'C:\\Backups\\MyDB_{{ ds }}.bak\'"',
    expected_return_code=[0, 1],  # Allow return codes 0 or 1
    timeout=600,  # 10 minute timeout
    dag=dag
)

# Batch file execution with multiple valid return codes
batch_execution = WinRMOperator(
    task_id='run_batch_process',
    command='C:\\Scripts\\data_processing.bat "{{ ds }}"',
    expected_return_code=range(0, 3),  # Allow return codes 0, 1, 2
    remote_host='processing-server.local',  # Override connection host
    ssh_conn_id='winrm_default',
    dag=dag
)

Task Dependencies and XCom

Chaining WinRM tasks with data passing through XCom.

# Task that generates data for XCom
generate_data = WinRMOperator(
    task_id='generate_report_data',
    command='''
    $data = Get-Process | Sort-Object CPU -Descending | Select-Object -First 5 Name,CPU
    $data | ConvertTo-Json -Compress
    ''',
    ps_path='powershell',
    do_xcom_push=True,
    ssh_conn_id='winrm_default',
    dag=dag
)

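# Task that uses XCom data (in templated command)
# With XCom pickling disabled the pulled value is Base64-encoded stdout,
# so it is decoded in PowerShell before being written to disk.
process_data = WinRMOperator(
    task_id='process_report_data',
    command='[Text.Encoding]::UTF8.GetString([Convert]::FromBase64String("{{ ti.xcom_pull(task_ids="generate_report_data") }}")) | Out-File C:\\Reports\\process_report_{{ ds }}.json',
    ps_path='powershell',
    ssh_conn_id='winrm_default',
    dag=dag
)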

generate_data >> process_data

Return Code Handling

The operator validates command execution success based on return codes:

Single Return Code

WinRMOperator(
    task_id='standard_command',
    command='dir C:\\',
    expected_return_code=0,  # Only 0 is success
    ssh_conn_id='winrm_default'
)

Multiple Return Codes

WinRMOperator(
    task_id='robocopy_command',
    command='robocopy C:\\Source D:\\Dest /E',
    expected_return_code=[0, 1, 2, 3, 4, 5, 6, 7],  # Robocopy codes below 8 mean no copy failure
    ssh_conn_id='winrm_default'
)
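
Robocopy is a useful illustration of why a single expected code is often not enough: its exit code is a set of bit flags, so a successful copy can legitimately return several different values. The sketch below decodes those flags using the meanings from Microsoft's robocopy documentation. The helper names are hypothetical and are not part of the provider.

```python
# Robocopy exit codes are bit flags that can be combined (for example 3 = 1 + 2).
ROBOCOPY_FLAGS = {
    1: "files copied",
    2: "extra files or directories detected",
    4: "mismatched files or directories detected",
    8: "some files or directories could not be copied",
    16: "fatal error, nothing was copied",
}


def describe_robocopy_code(return_code: int) -> list[str]:
    """Return the meaning of every flag set in a robocopy exit code."""
    if return_code == 0:
        return ["no files copied, source and destination already in sync"]
    return [text for bit, text in ROBOCOPY_FLAGS.items() if return_code & bit]


def robocopy_failed(return_code: int) -> bool:
    """Codes of 8 and above include a copy failure or a fatal error."""
    return return_code >= 8


print(describe_robocopy_code(3))
print(robocopy_failed(3))
```

A list or range covering the non-failure codes can then be passed as expected_return_code.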

Return Code Ranges

WinRMOperator(
    task_id='flexible_command',
    command='custom_app.exe --process-data',
    expected_return_code=range(0, 10),  # 0-9 are acceptable
    ssh_conn_id='winrm_default'
)
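
The three forms above (single int, list, range) can be thought of as one membership check. The following is a minimal sketch of that check, assuming an int is compared for equality and a list or range is tested with `in`; the function name return_code_matches is hypothetical and this is not the operator's actual source.

```python
from typing import Union


def return_code_matches(return_code: int, expected: Union[int, list, range]) -> bool:
    """Decide whether a return code counts as success (illustrative helper)."""
    if isinstance(expected, int):
        # Single code: only an exact match is a success.
        return return_code == expected
    # List or range: any member is a success.
    return return_code in expected


print(return_code_matches(0, 0))
print(return_code_matches(2, [0, 1, 2, 4]))
print(return_code_matches(10, range(0, 10)))
```

Note that range(0, 10) excludes 10, so the upper bound must be one past the highest acceptable code.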

Output and XCom Handling

What the operator returns depends on do_xcom_push and on the enable_xcom_pickling setting. It also sets the urllib3 connection pool logger to ERROR to suppress header-parsing warnings raised during WinRM connections.

XCom Push Enabled (do_xcom_push=True)

With Pickling Enabled (enable_xcom_pickling=True):

  • Returns: list[bytes] - Raw stdout byte chunks from hook's run() method
  • Use case: Binary output, custom deserialization, large datasets
  • Configuration: Set in airflow.cfg under [core] section

With Pickling Disabled (enable_xcom_pickling=False):

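  • Returns: str - stdout chunks joined and Base64-encoded using base64.b64encode()
  • Use case: Text output, JSON data, standard processing
  • Encoding: The Base64 result is converted to str using output_encoding (UTF-8 by default); consumers must Base64-decode the value to recover the original output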
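
A downstream Python task therefore has to undo whichever representation it receives. The sketch below shows one way to handle both cases described above: a list of byte chunks when pickling is enabled and a Base64 string when it is disabled. The helper name decode_winrm_xcom and the sample JSON payload are hypothetical.

```python
import base64
import json
from typing import Union


def decode_winrm_xcom(value: Union[str, list], encoding: str = "utf-8") -> str:
    """Turn a WinRM XCom value back into text (illustrative helper)."""
    if isinstance(value, str):
        # Pickling disabled: the value is Base64-encoded stdout.
        raw = base64.b64decode(value)
    else:
        # Pickling enabled: the value is a list of raw stdout byte chunks.
        raw = b"".join(value)
    return raw.decode(encoding)


# Simulated XCom values for a task whose stdout was a JSON document.
stdout_chunks = [b'[{"Name":"sqlservr",', b'"CPU":12.5}]']
as_base64 = base64.b64encode(b"".join(stdout_chunks)).decode("utf-8")

rows = json.loads(decode_winrm_xcom(as_base64))
assert rows == json.loads(decode_winrm_xcom(stdout_chunks))
```

The encoding argument should match the output_encoding used by the task that produced the value.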

XCom Push Disabled (do_xcom_push=False)

  • Result: No command output is pushed to XCom
  • Use case: Large outputs, commands with side effects only
  • Performance: More efficient for commands that don't need output capture

Logging Configuration

The operator automatically configures logging to prevent WinRM connection header parsing warnings:

# Automatically applied in execute() method  
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
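
The effect of that call can be checked with the standard logging module alone. This small sketch, which makes no assumptions about the operator beyond the line shown above, confirms that WARNING records on that logger are dropped while ERROR records still pass.

```python
import logging

pool_logger = logging.getLogger("urllib3.connectionpool")
pool_logger.setLevel(logging.ERROR)

# WARNING-level records (such as header parsing warnings) are now filtered out,
# while ERROR-level records are still emitted.
print(pool_logger.isEnabledFor(logging.WARNING))
print(pool_logger.isEnabledFor(logging.ERROR))
```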

Error Conditions

The WinRMOperator raises AirflowException for:

  • Missing Hook: Neither winrm_hook nor ssh_conn_id provided
  • Missing Command: No command specified for execution
  • Connection Failures: Unable to establish WinRM connection
  • Authentication Failures: Invalid credentials or authentication method
  • Command Execution Failures: Command execution errors, timeouts
  • Return Code Validation: Command returned unexpected return code

Error Message Example:

Error running cmd: Get-Process invalid_process, return code: 1, 
error: Get-Process : Cannot find a process with the name "invalid_process"
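
A message of this shape can be assembled from the command, the return code, and the decoded stderr output. The sketch below assumes stderr arrives as a list of byte chunks, mirroring the stdout buffer described earlier; the function name format_winrm_error is hypothetical.

```python
def format_winrm_error(
    command: str,
    return_code: int,
    stderr_chunks: list,
    encoding: str = "utf-8",
) -> str:
    """Build an error message in the format shown above (illustrative helper)."""
    # Join the raw stderr chunks and decode them once, so multi-byte
    # characters split across chunk boundaries are handled correctly.
    stderr_text = b"".join(stderr_chunks).decode(encoding)
    return f"Error running cmd: {command}, return code: {return_code}, error: {stderr_text}"


message = format_winrm_error(
    "Get-Process invalid_process",
    1,
    [b"Get-Process : Cannot find a process ", b'with the name "invalid_process"'],
)
print(message)
```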

Best Practices

Security

  • Use encrypted transports (SSL, Kerberos, NTLM) for production
  • Store credentials in Airflow connections, not in code
  • Use service accounts with minimal required permissions
  • Enable server certificate validation for SSL connections

Performance

  • Set appropriate timeouts for long-running operations
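  • Set do_xcom_push=False on the operator for commands with large output (return_output=False is the equivalent argument when calling the hook's run() method directly)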
  • Consider PowerShell for complex operations vs. multiple commands

Reliability

  • Specify expected return codes for all command types
  • Implement proper error handling in PowerShell scripts
  • Use templating for dynamic command generation
  • Test commands interactively before DAG deployment

