tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Microsoft Graph API

Access the Microsoft Graph API to integrate Airflow with Microsoft 365 services. The package provides a hook, an operator, a sensor, and a trigger for calling Graph API endpoints, including asynchronous (deferrable) execution.

Capabilities

Microsoft Graph Request Adapter Hook

Primary interface for Microsoft Graph API operations, built on the Kiota request adapter.

class KiotaRequestAdapterHook(BaseHook):
    """
    Hook for Microsoft Graph API using Kiota request adapter.
    
    Provides methods for accessing Microsoft Graph API endpoints with
    modern authentication and request handling capabilities.
    """
    
    def get_conn(self) -> RequestAdapter:
        """
        Get authenticated Graph API request adapter.
        
        Returns:
            RequestAdapter: Kiota request adapter for Graph API calls
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Microsoft Graph API connection.
        
        Returns:
            tuple[bool, str]: Success status and message
        """
    
    def request_information(
        self,
        url: str,
        method: str = "GET",
        headers: dict[str, str] | None = None,
        query_parameters: dict[str, Any] | None = None,
        content: bytes | None = None
    ) -> RequestInformation:
        """
        Create request information for Graph API calls.
        
        Args:
            url (str): Graph API endpoint URL
            method (str): HTTP method (default: "GET")
            headers (dict[str, str] | None): Additional headers
            query_parameters (dict[str, Any] | None): Query parameters
            content (bytes | None): Request body content
            
        Returns:
            RequestInformation: Configured request information object
        """
    
    def get_api_version(self) -> str:
        """
        Get the Microsoft Graph API version being used.
        
        Returns:
            str: Graph API version (e.g., "v1.0", "beta")
        """
    
    def get_base_url(self) -> str:
        """
        Get the base URL for Microsoft Graph API.
        
        Returns:
            str: Base URL for Graph API endpoints
        """
    
    def send_request(
        self,
        request_info: RequestInformation,
        response_handler: ResponseHandler | None = None
    ) -> Any:
        """
        Send a request to Microsoft Graph API.
        
        Args:
            request_info (RequestInformation): Request configuration
            response_handler (ResponseHandler | None): Custom response handler
            
        Returns:
            Any: Response from Graph API
        """
    
    def batch_request(
        self,
        requests: list[RequestInformation],
        max_batch_size: int = 20
    ) -> list[Any]:
        """
        Send multiple requests in batches to Graph API.
        
        Args:
            requests (list[RequestInformation]): List of request configurations
            max_batch_size (int): Maximum requests per batch (default: 20)
            
        Returns:
            list[Any]: List of responses from Graph API
        """
    
    def get_user(
        self,
        user_id: str,
        select_properties: list[str] | None = None
    ) -> dict[str, Any]:
        """
        Get user information from Microsoft Graph.
        
        Args:
            user_id (str): User ID or principal name
            select_properties (list[str] | None): Properties to select
            
        Returns:
            dict[str, Any]: User information
        """
    
    def list_users(
        self,
        filter_expression: str | None = None,
        select_properties: list[str] | None = None,
        top: int = 100
    ) -> list[dict[str, Any]]:
        """
        List users from Microsoft Graph.
        
        Args:
            filter_expression (str | None): OData filter expression
            select_properties (list[str] | None): Properties to select
            top (int): Maximum number of results (default: 100)
            
        Returns:
            list[dict[str, Any]]: List of user information
        """
    
    def get_group(
        self,
        group_id: str,
        select_properties: list[str] | None = None
    ) -> dict[str, Any]:
        """
        Get group information from Microsoft Graph.
        
        Args:
            group_id (str): Group ID
            select_properties (list[str] | None): Properties to select
            
        Returns:
            dict[str, Any]: Group information
        """
    
    def list_groups(
        self,
        filter_expression: str | None = None,
        select_properties: list[str] | None = None,
        top: int = 100
    ) -> list[dict[str, Any]]:
        """
        List groups from Microsoft Graph.
        
        Args:
            filter_expression (str | None): OData filter expression
            select_properties (list[str] | None): Properties to select
            top (int): Maximum number of results (default: 100)
            
        Returns:
            list[dict[str, Any]]: List of group information
        """
    
    def send_email(
        self,
        user_id: str,
        subject: str,
        body: str,
        to_recipients: list[str],
        cc_recipients: list[str] | None = None,
        bcc_recipients: list[str] | None = None,
        attachments: list[dict[str, Any]] | None = None
    ) -> dict[str, Any]:
        """
        Send email through Microsoft Graph.
        
        Args:
            user_id (str): Sender user ID
            subject (str): Email subject
            body (str): Email body content
            to_recipients (list[str]): To recipients email addresses
            cc_recipients (list[str] | None): CC recipients email addresses
            bcc_recipients (list[str] | None): BCC recipients email addresses
            attachments (list[dict[str, Any]] | None): Email attachments
            
        Returns:
            dict[str, Any]: Email send response
        """
    
    def create_calendar_event(
        self,
        user_id: str,
        subject: str,
        start_time: datetime,
        end_time: datetime,
        attendees: list[str] | None = None,
        body: str | None = None,
        location: str | None = None
    ) -> dict[str, Any]:
        """
        Create calendar event through Microsoft Graph.
        
        Args:
            user_id (str): User ID to create event for
            subject (str): Event subject
            start_time (datetime): Event start time
            end_time (datetime): Event end time
            attendees (list[str] | None): Attendee email addresses
            body (str | None): Event description
            location (str | None): Event location
            
        Returns:
            dict[str, Any]: Created event information
        """
    
    def upload_file_to_onedrive(
        self,
        user_id: str,
        file_path: str,
        content: bytes,
        conflict_behavior: str = "rename"
    ) -> dict[str, Any]:
        """
        Upload file to user's OneDrive through Microsoft Graph.
        
        Args:
            user_id (str): User ID
            file_path (str): Path where to store the file in OneDrive
            content (bytes): File content
            conflict_behavior (str): Conflict resolution behavior (default: "rename")
            
        Returns:
            dict[str, Any]: Upload response with file information
        """
    
    def get_sharepoint_site(
        self,
        site_id: str,
        select_properties: list[str] | None = None
    ) -> dict[str, Any]:
        """
        Get SharePoint site information.
        
        Args:
            site_id (str): SharePoint site ID
            select_properties (list[str] | None): Properties to select
            
        Returns:
            dict[str, Any]: SharePoint site information
        """
    
    def list_sharepoint_lists(
        self,
        site_id: str,
        select_properties: list[str] | None = None
    ) -> list[dict[str, Any]]:
        """
        List SharePoint lists in a site.
        
        Args:
            site_id (str): SharePoint site ID
            select_properties (list[str] | None): Properties to select
            
        Returns:
            list[dict[str, Any]]: List of SharePoint lists
        """
    
    def create_sharepoint_list_item(
        self,
        site_id: str,
        list_id: str,
        item_data: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Create item in SharePoint list.
        
        Args:
            site_id (str): SharePoint site ID
            list_id (str): SharePoint list ID
            item_data (dict[str, Any]): Item data to create
            
        Returns:
            dict[str, Any]: Created item information
        """
    
    def get_teams_channels(
        self,
        team_id: str,
        select_properties: list[str] | None = None
    ) -> list[dict[str, Any]]:
        """
        Get channels in a Microsoft Teams team.
        
        Args:
            team_id (str): Teams team ID
            select_properties (list[str] | None): Properties to select
            
        Returns:
            list[dict[str, Any]]: List of team channels
        """
    
    def send_teams_message(
        self,
        team_id: str,
        channel_id: str,
        message: str,
        message_type: str = "message"
    ) -> dict[str, Any]:
        """
        Send message to Microsoft Teams channel.
        
        Args:
            team_id (str): Teams team ID
            channel_id (str): Channel ID
            message (str): Message content
            message_type (str): Message type (default: "message")
            
        Returns:
            dict[str, Any]: Message send response
        """
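
The `max_batch_size` default of 20 in `batch_request` matches the limit Microsoft Graph JSON batching places on a single `$batch` call. A minimal sketch of the chunking step follows, assuming plain strings stand in for `RequestInformation` objects. `chunk_requests` is a hypothetical helper for illustration, not part of the hook.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def chunk_requests(requests: Sequence[T], max_batch_size: int = 20) -> Iterator[list[T]]:
    """Yield consecutive slices of at most max_batch_size items (hypothetical helper)."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    for start in range(0, len(requests), max_batch_size):
        yield list(requests[start:start + max_batch_size])


# 45 placeholder requests are split into batches of 20, 20, and 5
batches = list(chunk_requests([f"users/{i}" for i in range(45)]))
print([len(batch) for batch in batches])
```

Each yielded batch could then be sent as one call, with the responses concatenated in order.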

Response Handler

Supporting class for handling Microsoft Graph API responses with proper serialization and error handling.

class DefaultResponseHandler(ResponseHandler):
    """
    Default response handler for Microsoft Graph API.
    
    Provides standard response handling, error processing,
    and data serialization for Graph API calls.
    """
    
    async def handle_response_async(
        self,
        response: Any,
        error_map: dict[str, type] | None = None
    ) -> Any:
        """
        Handle asynchronous response from Graph API.
        
        Args:
            response (Any): HTTP response from Graph API
            error_map (dict[str, type] | None): Error mapping configuration
            
        Returns:
            Any: Processed response data
        """
    
    def handle_error_response(
        self,
        response: Any
    ) -> Exception:
        """
        Handle error responses from Graph API.
        
        Args:
            response (Any): Error response from Graph API
            
        Returns:
            Exception: Appropriate exception for the error
        """
    
    def serialize_response(
        self,
        response_data: Any
    ) -> dict[str, Any]:
        """
        Serialize response data from Graph API.
        
        Args:
            response_data (Any): Raw response data
            
        Returns:
            dict[str, Any]: Serialized response data
        """
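
The `error_map` argument associates HTTP status codes with error types. Kiota-style clients commonly key such maps by an exact code (`"404"`) or a range key (`"4XX"`, `"5XX"`). The sketch below shows one plausible lookup order under that assumption. `resolve_error_type` and the three exception classes are hypothetical names, not provider API.

```python
from __future__ import annotations


class NotFoundError(Exception):
    """Hypothetical exception for 404 responses."""


class ClientError(Exception):
    """Hypothetical exception for other 4xx responses."""


class ServerError(Exception):
    """Hypothetical exception for 5xx responses."""


def resolve_error_type(
    status_code: int, error_map: dict[str, type[Exception]] | None
) -> type[Exception] | None:
    """Return the mapped exception type, or None when no entry applies."""
    if not error_map or status_code < 400:
        return None
    exact_key = str(status_code)           # e.g. "404"
    range_key = f"{status_code // 100}XX"  # e.g. "4XX"
    # An exact match wins over the range entry
    return error_map.get(exact_key) or error_map.get(range_key)


error_map = {"404": NotFoundError, "4XX": ClientError, "5XX": ServerError}
for code in (404, 429, 503, 200):
    print(code, resolve_error_type(code, error_map))
```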

Microsoft Graph Operators

Execute Microsoft Graph API operations as Airflow tasks with comprehensive Microsoft 365 service integration.

Asynchronous Graph API Operator

class MSGraphAsyncOperator(BaseOperator):
    """
    Executes Microsoft Graph API operations asynchronously.
    
    Supports various Graph API operations with deferrable execution
    for long-running Microsoft 365 operations.
    """
    
    def __init__(
        self,
        *,
        conn_id: str = "msgraph_default",
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        data: dict[str, Any] | str | None = None,
        response_filter: str | None = None,
        response_type: type | None = None,
        **kwargs
    ):
        """
        Initialize Microsoft Graph async operator.
        
        Args:
            conn_id (str): Airflow connection ID for Microsoft Graph
            url (str): Graph API endpoint URL
            method (str): HTTP method (default: "GET")
            query_parameters (dict[str, Any] | None): Query parameters
            headers (dict[str, str] | None): Additional headers
            data (dict[str, Any] | str | None): Request body data
            response_filter (str | None): Response filtering expression
            response_type (type | None): Expected response type
        """
    
    def execute(self, context: Context) -> Any:
        """
        Execute Microsoft Graph API operation.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            Any: Response from Graph API operation
        """
    
    def execute_defer(self, context: Context) -> None:
        """
        Execute operation in deferrable mode for long-running operations.
        
        Args:
            context (Context): Airflow task context
        """
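
Graph list endpoints return results in pages, and each page that has a successor carries the next URL in `@odata.nextLink`. The sketch below shows how a caller might gather all pages. `collect_pages` and the `fetch` callable are hypothetical stand-ins for whatever issues the request; they are not part of the operator.

```python
from typing import Any, Callable, Optional


def collect_pages(
    fetch: Callable[[str], dict[str, Any]],
    url: str,
    max_pages: int = 100,
) -> list[dict[str, Any]]:
    """Follow @odata.nextLink until it is absent or max_pages is reached."""
    items: list[dict[str, Any]] = []
    next_url: Optional[str] = url
    pages = 0
    while next_url and pages < max_pages:
        page = fetch(next_url)
        items.extend(page.get("value", []))
        next_url = page.get("@odata.nextLink")
        pages += 1
    return items


# Fake two-page response keyed by URL, used instead of a real Graph call
FAKE_PAGES = {
    "users": {"value": [{"id": "1"}, {"id": "2"}], "@odata.nextLink": "users?$skiptoken=abc"},
    "users?$skiptoken=abc": {"value": [{"id": "3"}]},
}

print(collect_pages(FAKE_PAGES.__getitem__, "users"))
```

The `max_pages` cap is a design choice in this sketch to keep a misbehaving endpoint from looping forever.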

Microsoft Graph Sensors

Monitor Microsoft 365 resources and wait for specific conditions using Microsoft Graph API.

Graph API Sensor

class MSGraphSensor(BaseSensorOperator):
    """
    Monitors Microsoft Graph API resources.
    
    Provides sensor capabilities for waiting on Microsoft 365 resource
    states and conditions through Graph API polling.
    """
    
    def __init__(
        self,
        *,
        conn_id: str = "msgraph_default",
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        response_filter: str | None = None,
        success_condition: Callable[[Any], bool] | None = None,
        **kwargs
    ):
        """
        Initialize Microsoft Graph sensor.
        
        Args:
            conn_id (str): Airflow connection ID for Microsoft Graph
            url (str): Graph API endpoint URL to monitor
            method (str): HTTP method (default: "GET")
            query_parameters (dict[str, Any] | None): Query parameters
            headers (dict[str, str] | None): Additional headers
            response_filter (str | None): Response filtering expression
            success_condition (Callable[[Any], bool] | None): Function to evaluate success condition
        """
    
    def poke(self, context: Context) -> bool:
        """
        Poke Microsoft Graph API resource for condition.
        
        Args:
            context (Context): Airflow task context
            
        Returns:
            bool: True if condition is met, False otherwise
        """
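
A sensor repeatedly calls `poke` until it returns `True` or the timeout elapses. The sketch below models that loop in plain Python with an injectable clock, so it runs instantly. `wait_until` and `FakeClock` are hypothetical names, and this version returns `False` on timeout, whereas Airflow raises a timeout exception and fails the task.

```python
import time
from typing import Callable


def wait_until(
    poke: Callable[[], bool],
    timeout: float,
    poke_interval: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call poke() every poke_interval seconds until it succeeds or timeout elapses."""
    deadline = clock() + timeout
    while True:
        if poke():
            return True
        if clock() + poke_interval > deadline:
            return False  # The next poke would land after the deadline
        sleep(poke_interval)


class FakeClock:
    """Simulated clock: sleeping only advances a counter."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
results = iter([False, False, True])  # The condition is met on the third poke
met = wait_until(lambda: next(results), timeout=1800, poke_interval=60,
                 clock=clock.time, sleep=clock.sleep)
print(met, clock.now)
```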

Microsoft Graph Triggers

Enable asynchronous monitoring and event-driven operations for Microsoft 365 services.

Graph API Trigger

class MSGraphTrigger(BaseTrigger):
    """
    Async trigger for Microsoft Graph API operations.
    
    Provides asynchronous monitoring and event handling for
    Microsoft 365 resources through Graph API.
    """
    
    def __init__(
        self,
        conn_id: str,
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        response_filter: str | None = None,
        timeout: int = 3600,
        check_interval: int = 60
    ):
        """
        Initialize Microsoft Graph trigger.
        
        Args:
            conn_id (str): Airflow connection ID for Microsoft Graph
            url (str): Graph API endpoint URL
            method (str): HTTP method (default: "GET")
            query_parameters (dict[str, Any] | None): Query parameters
            headers (dict[str, str] | None): Additional headers
            response_filter (str | None): Response filtering expression
            timeout (int): Timeout in seconds (default: 3600)
            check_interval (int): Check interval in seconds (default: 60)
        """
    
    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Run asynchronous monitoring of Graph API resource.
        
        Yields:
            TriggerEvent: Events when conditions are met or timeout occurs
        """
    
    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize trigger configuration for persistence.
        
        Returns:
            tuple[str, dict[str, Any]]: Serialized trigger data
        """
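
`serialize` returns a classpath and a dictionary of keyword arguments, so that the trigger can be re-created later from those two values. The sketch below shows the round trip with a stand-in class. `DemoTrigger` is hypothetical and only mirrors a few of the constructor arguments listed above; it is not the provider's `MSGraphTrigger`.

```python
import json
from typing import Any


class DemoTrigger:
    """Stand-in used to illustrate the serialize contract (hypothetical)."""

    def __init__(self, conn_id: str, url: str, method: str = "GET",
                 timeout: int = 3600, check_interval: int = 60) -> None:
        self.conn_id = conn_id
        self.url = url
        self.method = method
        self.timeout = timeout
        self.check_interval = check_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # The classpath identifies the class; the kwargs must match __init__
        classpath = f"{type(self).__module__}.{type(self).__qualname__}"
        kwargs = {
            "conn_id": self.conn_id,
            "url": self.url,
            "method": self.method,
            "timeout": self.timeout,
            "check_interval": self.check_interval,
        }
        return classpath, kwargs


original = DemoTrigger(conn_id="msgraph_default", url="users")
classpath, kwargs = original.serialize()
json.dumps(kwargs)  # Raises TypeError if any argument is not JSON-serializable
rebuilt = DemoTrigger(**kwargs)
print(rebuilt.serialize() == original.serialize())
```

Keeping every argument JSON-friendly is the reason callables and live client objects are usually left out of the serialized state.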

Response Serializer

Supporting class for serializing Microsoft Graph API responses in trigger operations.

class ResponseSerializer:
    """
    Serializer for Graph API responses.
    
    Provides serialization capabilities for Graph API response data
    in trigger and asynchronous operations.
    """
    
    @staticmethod
    def serialize_response(response: Any) -> dict[str, Any]:
        """
        Serialize Graph API response for storage or transmission.
        
        Args:
            response (Any): Graph API response object
            
        Returns:
            dict[str, Any]: Serialized response data
        """
    
    @staticmethod
    def deserialize_response(data: dict[str, Any]) -> Any:
        """
        Deserialize Graph API response from stored data.
        
        Args:
            data (dict[str, Any]): Serialized response data
            
        Returns:
            Any: Deserialized response object
        """
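
Graph responses can contain values that JSON cannot encode directly, such as datetimes, UUIDs, and raw bytes. The sketch below shows one way to make such payloads storable using the `default` hook of `json.dumps`. The function names are hypothetical, and unlike the signatures above this version produces a JSON string rather than a dictionary.

```python
import base64
import json
from datetime import datetime, timezone
from typing import Any
from uuid import UUID


def to_jsonable(value: Any) -> Any:
    """Convert types the json module cannot encode natively."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(value).decode("ascii")
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def serialize_payload(response: Any) -> str:
    """Encode a response payload as a JSON string."""
    return json.dumps(response, default=to_jsonable)


def deserialize_payload(data: str) -> Any:
    """Decode the JSON string; converted values come back as plain strings."""
    return json.loads(data)


payload = {
    "id": UUID("12345678-1234-5678-1234-567812345678"),
    "createdDateTime": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "content": b"hi",
}
print(deserialize_payload(serialize_payload(payload)))
```

The round trip is lossy by design: the caller has to know which string fields to parse back into richer types.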

Usage Examples

Basic Microsoft Graph Operations

from airflow import DAG
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_user_data(**context):
    """Process user data retrieved from Microsoft Graph."""
    # xcom_pull returns None when the upstream task pushed nothing
    users_data = context['task_instance'].xcom_pull(task_ids='get_users') or {}
    users = users_data.get('value', [])
    
    print(f"Retrieved {len(users)} users")
    
    for user in users:
        print(f"User: {user.get('displayName')} ({user.get('userPrincipalName')})")
    
    return len(users)

def send_notification_email():
    """Send notification email using Microsoft Graph."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    # Send email notification
    result = hook.send_email(
        user_id='admin@company.com',
        subject='Airflow Workflow Notification',
        body='Your daily data processing workflow has completed successfully.',
        to_recipients=['team@company.com', 'manager@company.com'],
        cc_recipients=['notifications@company.com']
    )
    
    print(f"Email sent successfully: {result}")
    return result

dag = DAG(
    'msgraph_basic_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Basic Microsoft Graph API workflow',
    schedule=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Get list of users
get_users = MSGraphAsyncOperator(
    task_id='get_users',
    conn_id='msgraph_conn',
    url='users',
    method='GET',
    query_parameters={
        '$select': 'id,displayName,userPrincipalName,mail',
        '$filter': "accountEnabled eq true",
        '$top': 50
    },
    dag=dag
)

# Process user data
process_users = PythonOperator(
    task_id='process_users',
    python_callable=process_user_data,
    dag=dag
)

# Send notification
send_notification = PythonOperator(
    task_id='send_notification',
    python_callable=send_notification_email,
    dag=dag
)

get_users >> process_users >> send_notification
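
The `query_parameters` in the example above are OData system query options. The hook methods documented earlier accept `select_properties`, `filter_expression`, and `top` instead. The sketch below shows one plausible mapping between the two forms. `build_odata_query` is a hypothetical helper, and the mapping is an assumption rather than the hook's confirmed behavior.

```python
from __future__ import annotations

from typing import Any


def build_odata_query(
    select_properties: list[str] | None = None,
    filter_expression: str | None = None,
    top: int | None = None,
) -> dict[str, Any]:
    """Translate keyword-style arguments into OData query options."""
    query: dict[str, Any] = {}
    if select_properties:
        query["$select"] = ",".join(select_properties)
    if filter_expression:
        query["$filter"] = filter_expression
    if top is not None:
        if top < 1:
            raise ValueError("top must be a positive integer")
        query["$top"] = top
    return query


print(build_odata_query(
    select_properties=["id", "displayName", "userPrincipalName", "mail"],
    filter_expression="accountEnabled eq true",
    top=50,
))
```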

Advanced Microsoft 365 Integration

from airflow import DAG
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json

def manage_groups_and_users():
    """Manage Microsoft 365 groups and user memberships."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    # Get Microsoft 365 (Unified) groups and security-enabled groups
    groups = hook.list_groups(
        filter_expression="groupTypes/any(c:c eq 'Unified') or securityEnabled eq true",
        select_properties=['id', 'displayName', 'groupTypes', 'securityEnabled'],
        top=100
    )
    
    print(f"Found {len(groups)} groups")
    
    # Check membership for specific groups
    critical_groups = ['Data-Scientists', 'Security-Admins', 'Project-Managers']
    
    group_analysis = {}
    
    for group in groups:
        if group['displayName'] in critical_groups:
            # Get group members
            members_url = f"groups/{group['id']}/members"
            members_response = hook.send_request(
                hook.request_information(
                    url=members_url,
                    query_parameters={'$select': 'id,displayName,userPrincipalName'}
                )
            )
            
            group_analysis[group['displayName']] = {
                'id': group['id'],
                'member_count': len(members_response.get('value', [])),
                'members': members_response.get('value', [])
            }
    
    return group_analysis

def manage_calendar_events():
    """Manage calendar events for team coordination."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    # Create team meeting (single occurrence; no recurrence is configured)
    team_meeting = hook.create_calendar_event(
        user_id='teamlead@company.com',
        subject='Weekly Data Pipeline Review',
        start_time=datetime.now().replace(hour=10, minute=0, second=0, microsecond=0),
        end_time=datetime.now().replace(hour=11, minute=0, second=0, microsecond=0),
        attendees=[
            'dataengineer1@company.com',
            'dataengineer2@company.com',
            'analyst@company.com'
        ],
        body='Weekly review of data pipeline status, issues, and improvements.',
        location='Conference Room A / Teams'
    )
    
    print(f"Team meeting created: {team_meeting}")
    
    # Create workflow completion notification event
    notification_event = hook.create_calendar_event(
        user_id='admin@company.com',
        subject='Data Processing Workflow Completed',
        start_time=datetime.now(),
        end_time=datetime.now() + timedelta(minutes=30),
        body='Daily data processing workflow has completed. Check results in dashboard.'
    )
    
    return {
        'team_meeting': team_meeting,
        'notification_event': notification_event
    }

def sync_sharepoint_data():
    """Sync data with SharePoint lists."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    site_id = 'company.sharepoint.com,site-id,web-id'
    
    # Get SharePoint lists
    lists = hook.list_sharepoint_lists(
        site_id=site_id,
        select_properties=['id', 'displayName', 'list']
    )
    
    # Find the data tracking list
    data_list = None
    for sp_list in lists:
        if sp_list['displayName'] == 'Data Processing Status':
            data_list = sp_list
            break
    
    if data_list:
        # Create status entry
        status_item = hook.create_sharepoint_list_item(
            site_id=site_id,
            list_id=data_list['id'],
            item_data={
                'Title': f'Pipeline Run {datetime.now().strftime("%Y-%m-%d %H:%M")}',
                'Status': 'Completed',
                'ProcessedRecords': 150000,
                'StartTime': datetime.now().isoformat(),
                'EndTime': (datetime.now() + timedelta(hours=2)).isoformat()
            }
        )
        
        print(f"Status item created: {status_item}")
        return status_item
    else:
        print("Data Processing Status list not found")
        return None

def send_teams_notifications():
    """Send notifications to Microsoft Teams channels."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    team_id = 'data-engineering-team-id'
    
    # Get team channels
    channels = hook.get_teams_channels(
        team_id=team_id,
        select_properties=['id', 'displayName']
    )
    
    # Find general channel
    general_channel = None
    for channel in channels:
        if channel['displayName'].lower() == 'general':
            general_channel = channel
            break
    
    if general_channel:
        # Send completion notification
        message_result = hook.send_teams_message(
            team_id=team_id,
            channel_id=general_channel['id'],
            message=f"""
            🎉 **Daily Data Pipeline Completed Successfully** 
            
            **Execution Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
            **Status**: ✅ Success
            **Records Processed**: 150,000
            **Duration**: 2 hours 15 minutes
            
            Check the dashboard for detailed results: [Dashboard Link](https://company-dashboard.com)
            """
        )
        
        print(f"Teams message sent: {message_result}")
        return message_result
    
    return None

dag = DAG(
    'msgraph_advanced_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Advanced Microsoft 365 integration workflow',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

# Manage groups and users
manage_groups = PythonOperator(
    task_id='manage_groups',
    python_callable=manage_groups_and_users,
    dag=dag
)

# Manage calendar events
manage_calendar = PythonOperator(
    task_id='manage_calendar',
    python_callable=manage_calendar_events,
    dag=dag
)

# Sync SharePoint data
sync_sharepoint = PythonOperator(
    task_id='sync_sharepoint',
    python_callable=sync_sharepoint_data,
    dag=dag
)

# Send Teams notifications
notify_teams = PythonOperator(
    task_id='notify_teams',
    python_callable=send_teams_notifications,
    dag=dag
)

# Get Office 365 usage reports
get_reports = MSGraphAsyncOperator(
    task_id='get_usage_reports',
    conn_id='msgraph_conn',
    url='reports/getOffice365ActiveUserDetail(period=\'D7\')',
    method='GET',
    dag=dag
)

manage_groups >> [manage_calendar, sync_sharepoint] >> notify_teams >> get_reports

Microsoft Graph Sensor Example

from airflow import DAG
from airflow.providers.microsoft.azure.sensors.msgraph import MSGraphSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta, timezone

def check_file_processing_condition(response_data):
    """Check if file processing condition is met."""
    files = response_data.get('value', [])
    
    # Check if any files have been modified in the last hour
    recent_files = []
    # Use an aware UTC timestamp; subtracting an aware datetime from a naive one raises TypeError
    current_time = datetime.now(timezone.utc)
    
    for file in files:
        modified = file.get('lastModifiedDateTime')
        if not modified:
            continue  # Skip entries without a timestamp
        # Graph timestamps end in 'Z'; normalize the suffix for fromisoformat
        file_modified = datetime.fromisoformat(modified.replace('Z', '+00:00'))
        if (current_time - file_modified).total_seconds() < 3600:  # 1 hour
            recent_files.append(file)
    
    print(f"Found {len(recent_files)} recent files")
    return len(recent_files) > 0

def process_detected_files(**context):
    """Process files that were detected by the sensor."""
    sensor_result = context['task_instance'].xcom_pull(task_ids='wait_for_new_files')
    
    print(f"Sensor detected new files: {sensor_result}")
    
    # Process the detected files
    # Implementation would include file processing logic
    
    return "Files processed successfully"

dag = DAG(
    'msgraph_sensor_workflow',
    default_args={
        'owner': 'monitoring-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2)
    },
    description='Microsoft Graph sensor workflow',
    schedule=timedelta(minutes=30),
    start_date=datetime(2024, 1, 1),
    catchup=False
)

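# Wait for new files in OneDrive. The `me` segment requires delegated (signed-in user)
# authentication; with app-only credentials, address the drive via users/{user-id}/drive instead.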
wait_for_files = MSGraphSensor(
    task_id='wait_for_new_files',
    conn_id='msgraph_conn',
    url='me/drive/root/children',
    method='GET',
    query_parameters={
        '$select': 'id,name,lastModifiedDateTime,size',
        '$filter': "folder eq null"  # Only files, not folders
    },
    success_condition=check_file_processing_condition,
    timeout=1800,  # 30 minutes timeout
    poke_interval=60,  # Check every minute
    dag=dag
)

# Process detected files
process_files = PythonOperator(
    task_id='process_files',
    python_callable=process_detected_files,
    dag=dag
)

wait_for_files >> process_files

Connection Configuration

Microsoft Graph Connection (msgraph)

Configure Microsoft Graph API connections in Airflow:

# Connection configuration for Microsoft Graph
{
    "conn_id": "msgraph_default",
    "conn_type": "msgraph",
    "host": "graph.microsoft.com",  # Graph API endpoint
    "extra": {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret",
        "api_version": "v1.0"  # or "beta" for preview features
    }
}
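
Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_` followed by the upper-cased connection ID, with the value given as JSON. The sketch below builds that variable from the fields shown above. `connection_env_var` is a hypothetical helper, and the placeholder values are kept as-is; in practice the secret would come from a secrets backend rather than source code.

```python
import json
from typing import Any


def connection_env_var(conn_id: str, connection: dict[str, Any]) -> tuple[str, str]:
    """Return the (name, value) pair for a JSON-format Airflow connection variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    return name, json.dumps(connection)


name, value = connection_env_var(
    "msgraph_default",
    {
        "conn_type": "msgraph",
        "host": "graph.microsoft.com",
        "extra": {
            "tenant_id": "your-tenant-id",
            "client_id": "your-client-id",
            "client_secret": "your-client-secret",
            "api_version": "v1.0",
        },
    },
)
print(f"export {name}='{value}'")
```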

Authentication Methods

Microsoft Graph API supports multiple authentication methods:

  1. Application (Client Credentials) Authentication:

    extra = {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret",
        "auth_type": "client_credentials"
    }
  2. Delegated Authentication (Authorization Code):

    extra = {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret",
        "auth_type": "authorization_code",
        "scopes": ["https://graph.microsoft.com/.default"]
    }
  3. Certificate-based Authentication:

    extra = {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "certificate_path": "/path/to/certificate.pem",
        "certificate_thumbprint": "cert-thumbprint",
        "auth_type": "certificate"
    }
  4. Managed Identity Authentication:

    extra = {
        "managed_identity_client_id": "your-managed-identity-client-id",
        "auth_type": "managed_identity"
    }
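
Each `auth_type` above expects a different set of keys in `extra`. The sketch below checks a configuration against those sets before it is stored. The key names are taken from the four examples; `missing_keys` is a hypothetical helper, and whether the provider validates connections this way is an assumption.

```python
from typing import Any

# Required keys per auth_type, as listed in the examples above
REQUIRED_KEYS: dict[str, set[str]] = {
    "client_credentials": {"tenant_id", "client_id", "client_secret"},
    "authorization_code": {"tenant_id", "client_id", "client_secret", "scopes"},
    "certificate": {"tenant_id", "client_id", "certificate_path", "certificate_thumbprint"},
    "managed_identity": {"managed_identity_client_id"},
}


def missing_keys(extra: dict[str, Any]) -> list[str]:
    """Return the sorted names of required keys absent from extra."""
    auth_type = extra.get("auth_type")
    if auth_type not in REQUIRED_KEYS:
        raise ValueError(f"Unknown auth_type: {auth_type!r}")
    return sorted(REQUIRED_KEYS[auth_type] - set(extra))


print(missing_keys({"auth_type": "certificate", "tenant_id": "t", "client_id": "c"}))
```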

Error Handling

Common Exception Patterns

from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from kiota_abstractions.api_error import APIError

def robust_graph_operations():
    """Demonstrate error handling patterns for Graph API operations."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    try:
        # Attempt to get user information
        user_info = hook.get_user('user@company.com')
        print(f"User found: {user_info}")
        
    except APIError as api_error:
        if api_error.response_status_code == 404:
            print("User not found")
            return None
        elif api_error.response_status_code == 403:
            print("Insufficient permissions to access user information")
            raise PermissionError("Insufficient Graph API permissions")
        elif api_error.response_status_code == 429:
            print("Rate limit exceeded, retrying once after a fixed delay")
            # Single fixed-delay retry; production code should honor Retry-After
            # and back off exponentially
            import time
            time.sleep(60)  # Wait 1 minute before retry
            return hook.get_user('user@company.com')
        else:
            print(f"API error: {api_error}")
            raise
            
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
    
    try:
        # Batch request with error handling
        requests = [
            hook.request_information(url='users/user1@company.com'),
            hook.request_information(url='users/user2@company.com'),
            hook.request_information(url='users/nonexistent@company.com')  # This will fail
        ]
        
        responses = hook.batch_request(requests)
        
        for i, response in enumerate(responses):
            if isinstance(response, dict) and 'error' in response:
                print(f"Request {i} failed: {response['error']}")
            else:
                print(f"Request {i} succeeded: {response.get('displayName', 'Unknown')}")
                
    except Exception as e:
        print(f"Batch request error: {e}")
        # Handle batch errors appropriately

def implement_retry_logic():
    """Implement retry logic for Graph API operations."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    import time
    import random
    
    def retry_with_backoff(operation, max_retries=3, base_delay=1):
        """Retry operation with exponential backoff."""
        for attempt in range(max_retries):
            try:
                return operation()
            except APIError as e:
                # Only rate limiting (429) is retried; other API errors propagate.
                # On the final attempt, re-raise immediately instead of sleeping first.
                if e.response_status_code != 429 or attempt == max_retries - 1:
                    raise
                
                # Prefer the server's Retry-After hint (in seconds) when present
                headers = e.response_headers or {}
                retry_after = int(headers.get('Retry-After', base_delay * (2 ** attempt)))
                jitter = random.uniform(0.1, 0.3) * retry_after
                sleep_time = retry_after + jitter
                
                print(f"Rate limited, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
                time.sleep(sleep_time)
            except Exception:
                if attempt == max_retries - 1:
                    raise
                
                sleep_time = base_delay * (2 ** attempt) + random.uniform(0.1, 0.5)
                print(f"Operation failed, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
                time.sleep(sleep_time)
    
    # Use retry logic for operations
    user_data = retry_with_backoff(lambda: hook.get_user('user@company.com'))
    return user_data
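
The retry helper above converts `Retry-After` with `int(...)`, which works when the header carries a number of seconds. The HTTP specification also allows an HTTP-date in that header, and the header may be absent altogether. The following is a minimal sketch of a more tolerant parser. The function name `retry_after_seconds` and its `default` and `now` parameters are hypothetical, and whether Microsoft Graph ever sends the date form is not something this document confirms.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(
    value: Optional[str],
    default: float,
    now: Optional[datetime] = None,
) -> float:
    """Convert a Retry-After header value to a non-negative delay in seconds.

    Falls back to `default` when the header is missing or unparseable.
    """
    if value is None:
        return default
    value = value.strip()

    # Form 1: delay expressed as an integer number of seconds
    try:
        return max(0.0, float(int(value)))
    except ValueError:
        pass

    # Form 2: absolute HTTP-date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        retry_at = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if retry_at.tzinfo is None:
        # Treat a date without zone information as UTC
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())
```

Inside `retry_with_backoff`, the computed exponential delay could be passed as `default`, so the server hint is used when it is valid and the backoff schedule applies otherwise.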

Connection Testing

def test_graph_connection():
    """Test Microsoft Graph API connection and permissions."""
    try:
        hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
        
        # Test basic connection
        success, message = hook.test_connection()
        if not success:
            print(f"Connection test failed: {message}")
            return False
        
        print("Basic connection successful")
        
        # Test specific permissions
        permissions_tests = {
            'User.Read.All': lambda: hook.list_users(top=1),
            'Group.Read.All': lambda: hook.list_groups(top=1),
            'Mail.Send': lambda: hook.get_api_version(),  # Placeholder: does not exercise Mail.Send itself
            'Sites.Read.All': lambda: hook.get_api_version()  # Placeholder: does not exercise Sites.Read.All itself
        }
        
        results = {}
        for permission, test_func in permissions_tests.items():
            try:
                test_func()
                results[permission] = True
                print(f"✓ {permission}: Available")
            except APIError as e:
                if e.response_status_code == 403:
                    results[permission] = False
                    print(f"✗ {permission}: Insufficient permissions")
                else:
                    results[permission] = False
                    print(f"✗ {permission}: Error - {e}")
            except Exception as e:
                results[permission] = False
                print(f"✗ {permission}: Unexpected error - {e}")
        
        return all(results.values())
        
    except Exception as e:
        print(f"Connection test failed with error: {e}")
        return False

Performance Considerations

Optimizing Graph API Operations

def optimize_graph_operations():
    """Demonstrate Graph API optimization techniques."""
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    
    # Use $select to only retrieve needed properties
    efficient_user_query = hook.list_users(
        select_properties=['id', 'displayName', 'userPrincipalName'],  # Only get what you need
        top=100  # Limit results appropriately
    )
    
    # Use $filter to reduce data transfer
    filtered_users = hook.list_users(
        filter_expression="department eq 'Engineering'",
        select_properties=['id', 'displayName', 'mail'],
        top=50
    )
    
    # Use batch requests for multiple operations
    batch_requests = []
    user_ids = ['user1@company.com', 'user2@company.com', 'user3@company.com']
    
    for user_id in user_ids:
        batch_requests.append(
            hook.request_information(
                url=f'users/{user_id}',
                query_parameters={'$select': 'id,displayName,mail'}
            )
        )
    
    # Execute all requests in a single batch
    batch_results = hook.batch_request(batch_requests, max_batch_size=20)
    
    return {
        'efficient_query_count': len(efficient_user_query),
        'filtered_users_count': len(filtered_users),
        'batch_results_count': len(batch_results)
    }

def implement_caching_strategy():
    """Implement caching for frequently accessed Graph data."""
    import time
    
    class CachedGraphHook:
        def __init__(self, conn_id):
            self.hook = KiotaRequestAdapterHook(conn_id=conn_id)
            # Plain dict so the TTL check runs on every lookup
            self._cache = {}  # cache_key -> (stored_at, value)
            self._cache_ttl = 3600  # 1 hour TTL
        
        def _is_cache_valid(self, key):
            """Check if cached data is still valid."""
            if key not in self._cache:
                return False
            stored_at, _ = self._cache[key]
            return (time.time() - stored_at) < self._cache_ttl
        
        def get_user_cached(self, user_id):
            """Get user with caching."""
            cache_key = f"user_{user_id}"
            if self._is_cache_valid(cache_key):
                # Serve the cached value while it is within the TTL
                return self._cache[cache_key][1]
            
            # Missing or expired: fetch again and record when it was stored
            user = self.hook.get_user(user_id, select_properties=['id', 'displayName', 'mail'])
            self._cache[cache_key] = (time.time(), user)
            return user
        
        def invalidate_user_cache(self, user_id):
            """Invalidate the cache entry for a single user."""
            self._cache.pop(f"user_{user_id}", None)
    
    return CachedGraphHook
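
The batch step in `optimize_graph_operations` passes `max_batch_size=20`, which matches the Microsoft Graph JSON batching limit of 20 requests per `$batch` call. As a rough sketch of what that chunking involves at the REST level, the helpers below build raw `$batch` request bodies and sort a raw `$batch` response by request `id`. Both function names are hypothetical. They operate on plain dictionaries and make no claim about how the hook's `batch_request` works internally.

```python
from urllib.parse import quote, urlencode

GRAPH_BATCH_LIMIT = 20  # Graph JSON batching accepts at most 20 requests per call


def build_batch_payloads(user_ids, select, batch_size=GRAPH_BATCH_LIMIT):
    """Split user lookups into $batch request bodies of at most batch_size items."""
    if not 1 <= batch_size <= GRAPH_BATCH_LIMIT:
        raise ValueError(f"batch_size must be between 1 and {GRAPH_BATCH_LIMIT}")

    # Keep "$" and "," readable in the query string, e.g. $select=id,mail
    query = urlencode({"$select": ",".join(select)}, safe="$,")

    payloads = []
    for start in range(0, len(user_ids), batch_size):
        chunk = user_ids[start:start + batch_size]
        payloads.append({
            "requests": [
                {
                    "id": str(index),  # ids only need to be unique within one batch
                    "method": "GET",
                    "url": f"/users/{quote(uid, safe='@')}?{query}",
                }
                for index, uid in enumerate(chunk, start=1)
            ]
        })
    return payloads


def split_batch_responses(batch_response):
    """Separate successful and failed items of one $batch response by request id."""
    succeeded, failed = {}, {}
    # Responses may arrive in a different order than the requests,
    # so they are keyed by id rather than by position.
    for item in batch_response.get("responses", []):
        target = succeeded if 200 <= item["status"] < 300 else failed
        target[item["id"]] = item.get("body")
    return succeeded, failed
```

Keying results by `id` lets a caller retry only the failed items of a batch instead of resubmitting the whole batch.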

This document covers the Microsoft Graph API capabilities of the Apache Airflow Microsoft Azure provider for Microsoft 365 integration: authentication methods, API operations, sensor monitoring, trigger-based operations, and performance optimization techniques.

