Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Access Microsoft Graph API for Microsoft 365 services integration with comprehensive support for Graph API endpoints, operations, and asynchronous processing capabilities for Microsoft cloud services.
Primary interface for Microsoft Graph API operations using Kiota request adapter for modern Graph API access patterns.
class KiotaRequestAdapterHook(BaseHook):
"""
Hook for Microsoft Graph API using Kiota request adapter.
Provides methods for accessing Microsoft Graph API endpoints with
modern authentication and request handling capabilities.
"""
def get_conn(self) -> RequestAdapter:
"""
Get authenticated Graph API request adapter.
Returns:
RequestAdapter: Kiota request adapter for Graph API calls
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test the Microsoft Graph API connection.
Returns:
tuple[bool, str]: Success status and message
"""
def request_information(
self,
url: str,
method: str = "GET",
headers: dict[str, str] | None = None,
query_parameters: dict[str, Any] | None = None,
content: bytes | None = None
) -> RequestInformation:
"""
Create request information for Graph API calls.
Args:
url (str): Graph API endpoint URL
method (str): HTTP method (default: "GET")
headers (dict[str, str] | None): Additional headers
query_parameters (dict[str, Any] | None): Query parameters
content (bytes | None): Request body content
Returns:
RequestInformation: Configured request information object
"""
def get_api_version(self) -> str:
"""
Get the Microsoft Graph API version being used.
Returns:
str: Graph API version (e.g., "v1.0", "beta")
"""
def get_base_url(self) -> str:
"""
Get the base URL for Microsoft Graph API.
Returns:
str: Base URL for Graph API endpoints
"""
def send_request(
self,
request_info: RequestInformation,
response_handler: ResponseHandler | None = None
) -> Any:
"""
Send a request to Microsoft Graph API.
Args:
request_info (RequestInformation): Request configuration
response_handler (ResponseHandler | None): Custom response handler
Returns:
Any: Response from Graph API
"""
def batch_request(
self,
requests: list[RequestInformation],
max_batch_size: int = 20
) -> list[Any]:
"""
Send multiple requests in batches to Graph API.
Args:
requests (list[RequestInformation]): List of request configurations
max_batch_size (int): Maximum requests per batch (default: 20)
Returns:
list[Any]: List of responses from Graph API
"""
def get_user(
self,
user_id: str,
select_properties: list[str] | None = None
) -> dict[str, Any]:
"""
Get user information from Microsoft Graph.
Args:
user_id (str): User ID or principal name
select_properties (list[str] | None): Properties to select
Returns:
dict[str, Any]: User information
"""
def list_users(
self,
filter_expression: str | None = None,
select_properties: list[str] | None = None,
top: int = 100
) -> list[dict[str, Any]]:
"""
List users from Microsoft Graph.
Args:
filter_expression (str | None): OData filter expression
select_properties (list[str] | None): Properties to select
top (int): Maximum number of results (default: 100)
Returns:
list[dict[str, Any]]: List of user information
"""
def get_group(
self,
group_id: str,
select_properties: list[str] | None = None
) -> dict[str, Any]:
"""
Get group information from Microsoft Graph.
Args:
group_id (str): Group ID
select_properties (list[str] | None): Properties to select
Returns:
dict[str, Any]: Group information
"""
def list_groups(
self,
filter_expression: str | None = None,
select_properties: list[str] | None = None,
top: int = 100
) -> list[dict[str, Any]]:
"""
List groups from Microsoft Graph.
Args:
filter_expression (str | None): OData filter expression
select_properties (list[str] | None): Properties to select
top (int): Maximum number of results (default: 100)
Returns:
list[dict[str, Any]]: List of group information
"""
def send_email(
self,
user_id: str,
subject: str,
body: str,
to_recipients: list[str],
cc_recipients: list[str] | None = None,
bcc_recipients: list[str] | None = None,
attachments: list[dict[str, Any]] | None = None
) -> dict[str, Any]:
"""
Send email through Microsoft Graph.
Args:
user_id (str): Sender user ID
subject (str): Email subject
body (str): Email body content
to_recipients (list[str]): To recipients email addresses
cc_recipients (list[str] | None): CC recipients email addresses
bcc_recipients (list[str] | None): BCC recipients email addresses
attachments (list[dict[str, Any]] | None): Email attachments
Returns:
dict[str, Any]: Email send response
"""
def create_calendar_event(
self,
user_id: str,
subject: str,
start_time: datetime,
end_time: datetime,
attendees: list[str] | None = None,
body: str | None = None,
location: str | None = None
) -> dict[str, Any]:
"""
Create calendar event through Microsoft Graph.
Args:
user_id (str): User ID to create event for
subject (str): Event subject
start_time (datetime): Event start time
end_time (datetime): Event end time
attendees (list[str] | None): Attendee email addresses
body (str | None): Event description
location (str | None): Event location
Returns:
dict[str, Any]: Created event information
"""
def upload_file_to_onedrive(
self,
user_id: str,
file_path: str,
content: bytes,
conflict_behavior: str = "rename"
) -> dict[str, Any]:
"""
Upload file to user's OneDrive through Microsoft Graph.
Args:
user_id (str): User ID
file_path (str): Path where to store the file in OneDrive
content (bytes): File content
conflict_behavior (str): Conflict resolution behavior (default: "rename")
Returns:
dict[str, Any]: Upload response with file information
"""
def get_sharepoint_site(
self,
site_id: str,
select_properties: list[str] | None = None
) -> dict[str, Any]:
"""
Get SharePoint site information.
Args:
site_id (str): SharePoint site ID
select_properties (list[str] | None): Properties to select
Returns:
dict[str, Any]: SharePoint site information
"""
def list_sharepoint_lists(
self,
site_id: str,
select_properties: list[str] | None = None
) -> list[dict[str, Any]]:
"""
List SharePoint lists in a site.
Args:
site_id (str): SharePoint site ID
select_properties (list[str] | None): Properties to select
Returns:
list[dict[str, Any]]: List of SharePoint lists
"""
def create_sharepoint_list_item(
self,
site_id: str,
list_id: str,
item_data: dict[str, Any]
) -> dict[str, Any]:
"""
Create item in SharePoint list.
Args:
site_id (str): SharePoint site ID
list_id (str): SharePoint list ID
item_data (dict[str, Any]): Item data to create
Returns:
dict[str, Any]: Created item information
"""
def get_teams_channels(
self,
team_id: str,
select_properties: list[str] | None = None
) -> list[dict[str, Any]]:
"""
Get channels in a Microsoft Teams team.
Args:
team_id (str): Teams team ID
select_properties (list[str] | None): Properties to select
Returns:
list[dict[str, Any]]: List of team channels
"""
def send_teams_message(
self,
team_id: str,
channel_id: str,
message: str,
message_type: str = "message"
) -> dict[str, Any]:
"""
Send message to Microsoft Teams channel.
Args:
team_id (str): Teams team ID
channel_id (str): Channel ID
message (str): Message content
message_type (str): Message type (default: "message")
Returns:
dict[str, Any]: Message send response
"""Supporting class for handling Microsoft Graph API responses with proper serialization and error handling.
class DefaultResponseHandler(ResponseHandler):
"""
Default response handler for Microsoft Graph API.
Provides standard response handling, error processing,
and data serialization for Graph API calls.
"""
def handle_response_async(
self,
response: Any,
error_map: dict[str, type] | None = None
) -> Any:
"""
Handle asynchronous response from Graph API.
Args:
response (Any): HTTP response from Graph API
error_map (dict[str, type] | None): Error mapping configuration
Returns:
Any: Processed response data
"""
def handle_error_response(
self,
response: Any
) -> Exception:
"""
Handle error responses from Graph API.
Args:
response (Any): Error response from Graph API
Returns:
Exception: Appropriate exception for the error
"""
def serialize_response(
self,
response_data: Any
) -> dict[str, Any]:
"""
Serialize response data from Graph API.
Args:
response_data (Any): Raw response data
Returns:
dict[str, Any]: Serialized response data
"""Execute Microsoft Graph API operations as Airflow tasks with comprehensive Microsoft 365 service integration.
class MSGraphAsyncOperator(BaseOperator):
"""
Executes Microsoft Graph API operations asynchronously.
Supports various Graph API operations with deferrable execution
for long-running Microsoft 365 operations.
"""
def __init__(
self,
*,
conn_id: str = "msgraph_default",
url: str,
method: str = "GET",
query_parameters: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
data: dict[str, Any] | str | None = None,
response_filter: str | None = None,
response_type: type | None = None,
**kwargs
):
"""
Initialize Microsoft Graph async operator.
Args:
conn_id (str): Airflow connection ID for Microsoft Graph
url (str): Graph API endpoint URL
method (str): HTTP method (default: "GET")
query_parameters (dict[str, Any] | None): Query parameters
headers (dict[str, str] | None): Additional headers
data (dict[str, Any] | str | None): Request body data
response_filter (str | None): Response filtering expression
response_type (type | None): Expected response type
"""
def execute(self, context: Context) -> Any:
"""
Execute Microsoft Graph API operation.
Args:
context (Context): Airflow task context
Returns:
Any: Response from Graph API operation
"""
def execute_defer(self, context: Context) -> None:
"""
Execute operation in deferrable mode for long-running operations.
Args:
context (Context): Airflow task context
"""Monitor Microsoft 365 resources and wait for specific conditions using Microsoft Graph API.
class MSGraphSensor(BaseSensorOperator):
"""
Monitors Microsoft Graph API resources.
Provides sensor capabilities for waiting on Microsoft 365 resource
states and conditions through Graph API polling.
"""
def __init__(
self,
*,
conn_id: str = "msgraph_default",
url: str,
method: str = "GET",
query_parameters: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
response_filter: str | None = None,
success_condition: callable | None = None,
**kwargs
):
"""
Initialize Microsoft Graph sensor.
Args:
conn_id (str): Airflow connection ID for Microsoft Graph
url (str): Graph API endpoint URL to monitor
method (str): HTTP method (default: "GET")
query_parameters (dict[str, Any] | None): Query parameters
headers (dict[str, str] | None): Additional headers
response_filter (str | None): Response filtering expression
success_condition (callable | None): Function to evaluate success condition
"""
def poke(self, context: Context) -> bool:
"""
Poke Microsoft Graph API resource for condition.
Args:
context (Context): Airflow task context
Returns:
bool: True if condition is met, False otherwise
"""Enable asynchronous monitoring and event-driven operations for Microsoft 365 services.
class MSGraphTrigger(BaseTrigger):
"""
Async trigger for Microsoft Graph API operations.
Provides asynchronous monitoring and event handling for
Microsoft 365 resources through Graph API.
"""
def __init__(
self,
conn_id: str,
url: str,
method: str = "GET",
query_parameters: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
response_filter: str | None = None,
timeout: int = 3600,
check_interval: int = 60
):
"""
Initialize Microsoft Graph trigger.
Args:
conn_id (str): Airflow connection ID for Microsoft Graph
url (str): Graph API endpoint URL
method (str): HTTP method (default: "GET")
query_parameters (dict[str, Any] | None): Query parameters
headers (dict[str, str] | None): Additional headers
response_filter (str | None): Response filtering expression
timeout (int): Timeout in seconds (default: 3600)
check_interval (int): Check interval in seconds (default: 60)
"""
def run(self) -> AsyncIterator[TriggerEvent]:
"""
Run asynchronous monitoring of Graph API resource.
Yields:
TriggerEvent: Events when conditions are met or timeout occurs
"""
def serialize(self) -> tuple[str, dict[str, Any]]:
"""
Serialize trigger configuration for persistence.
Returns:
tuple[str, dict[str, Any]]: Serialized trigger data
"""Supporting class for serializing Microsoft Graph API responses in trigger operations.
class ResponseSerializer:
"""
Serializer for Graph API responses.
Provides serialization capabilities for Graph API response data
in trigger and asynchronous operations.
"""
@staticmethod
def serialize_response(response: Any) -> dict[str, Any]:
"""
Serialize Graph API response for storage or transmission.
Args:
response (Any): Graph API response object
Returns:
dict[str, Any]: Serialized response data
"""
@staticmethod
def deserialize_response(data: dict[str, Any]) -> Any:
"""
Deserialize Graph API response from stored data.
Args:
data (dict[str, Any]): Serialized response data
Returns:
Any: Deserialized response object
"""from airflow import DAG
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_user_data(**context):
    """Log users pulled from the upstream 'get_users' task and return the count."""
    payload = context['task_instance'].xcom_pull(task_ids='get_users')
    users = payload.get('value', [])
    print(f"Retrieved {len(users)} users")
    for entry in users:
        print(f"User: {entry.get('displayName')} ({entry.get('userPrincipalName')})")
    return len(users)
def send_notification_email():
    """Send a workflow-completion email via Microsoft Graph.

    Uses the 'msgraph_conn' Airflow connection.
    NOTE(review): presumably requires the Mail.Send Graph permission on the
    sender mailbox — confirm connection/app registration setup.

    Returns:
        The Graph API send-mail response.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    # Send email notification
    result = hook.send_email(
        user_id='admin@company.com',
        subject='Airflow Workflow Notification',
        body='Your daily data processing workflow has completed successfully.',
        to_recipients=['team@company.com', 'manager@company.com'],
        cc_recipients=['notifications@company.com']
    )
    print(f"Email sent successfully: {result}")
    return result
# DAG definition: runs twice a day, no backfill of past intervals.
dag = DAG(
    'msgraph_basic_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3)
    },
    description='Basic Microsoft Graph API workflow',
    schedule_interval=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False
)
# Get list of users (enabled accounts only, selected fields, capped at 50)
get_users = MSGraphAsyncOperator(
    task_id='get_users',
    conn_id='msgraph_conn',
    url='users',
    method='GET',
    query_parameters={
        '$select': 'id,displayName,userPrincipalName,mail',
        '$filter': "accountEnabled eq true",
        '$top': 50
    },
    dag=dag
)
# Process user data (pulls the 'get_users' result from XCom)
process_users = PythonOperator(
    task_id='process_users',
    python_callable=process_user_data,
    dag=dag
)
# Send notification
send_notification = PythonOperator(
    task_id='send_notification',
    python_callable=send_notification_email,
    dag=dag
)
get_users >> process_users >> send_notification

from airflow import DAG
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
def manage_groups_and_users():
    """Inventory critical Microsoft 365 groups and their memberships.

    Returns:
        dict: Per-group analysis keyed by display name; each entry holds the
        group id, member count, and raw member records.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    # Get all security groups (unified/M365 groups OR security-enabled groups)
    groups = hook.list_groups(
        filter_expression="groupTypes/any(c:c eq 'Unified') or securityEnabled eq true",
        select_properties=['id', 'displayName', 'groupTypes', 'securityEnabled'],
        top=100
    )
    print(f"Found {len(groups)} groups")
    # Check membership for specific groups
    critical_groups = ['Data-Scientists', 'Security-Admins', 'Project-Managers']
    group_analysis = {}
    for group in groups:
        if group['displayName'] in critical_groups:
            # Get group members (direct members only; transitive membership
            # would need the /transitiveMembers endpoint)
            members_url = f"groups/{group['id']}/members"
            members_response = hook.send_request(
                hook.request_information(
                    url=members_url,
                    query_parameters={'$select': 'id,displayName,userPrincipalName'}
                )
            )
            group_analysis[group['displayName']] = {
                'id': group['id'],
                'member_count': len(members_response.get('value', [])),
                'members': members_response.get('value', [])
            }
    return group_analysis
def manage_calendar_events():
    """Create team-coordination calendar events via Microsoft Graph.

    Returns:
        dict: The created 'team_meeting' and 'notification_event' payloads.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    # Create recurring team meeting
    # NOTE(review): datetime.now() is naive local time; if this runs after
    # 10:00 the meeting start is already in the past — confirm intent.
    team_meeting = hook.create_calendar_event(
        user_id='teamlead@company.com',
        subject='Weekly Data Pipeline Review',
        start_time=datetime.now().replace(hour=10, minute=0, second=0, microsecond=0),
        end_time=datetime.now().replace(hour=11, minute=0, second=0, microsecond=0),
        attendees=[
            'dataengineer1@company.com',
            'dataengineer2@company.com',
            'analyst@company.com'
        ],
        body='Weekly review of data pipeline status, issues, and improvements.',
        location='Conference Room A / Teams'
    )
    print(f"Team meeting created: {team_meeting}")
    # Create workflow completion notification event (30-minute block from now)
    notification_event = hook.create_calendar_event(
        user_id='admin@company.com',
        subject='Data Processing Workflow Completed',
        start_time=datetime.now(),
        end_time=datetime.now() + timedelta(minutes=30),
        body='Daily data processing workflow has completed. Check results in dashboard.'
    )
    return {
        'team_meeting': team_meeting,
        'notification_event': notification_event
    }
def sync_sharepoint_data():
    """Record a pipeline-status entry in a SharePoint tracking list.

    Returns:
        dict | None: The created list item, or None when the
        'Data Processing Status' list is not found in the site.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    # Composite SharePoint site id: hostname,site-collection-id,web-id
    site_id = 'company.sharepoint.com,site-id,web-id'
    # Get SharePoint lists
    lists = hook.list_sharepoint_lists(
        site_id=site_id,
        select_properties=['id', 'displayName', 'list']
    )
    # Find the data tracking list
    data_list = None
    for sp_list in lists:
        if sp_list['displayName'] == 'Data Processing Status':
            data_list = sp_list
            break
    if data_list:
        # Create status entry
        status_item = hook.create_sharepoint_list_item(
            site_id=site_id,
            list_id=data_list['id'],
            item_data={
                'Title': f'Pipeline Run {datetime.now().strftime("%Y-%m-%d %H:%M")}',
                'Status': 'Completed',
                'ProcessedRecords': 150000,
                'StartTime': datetime.now().isoformat(),
                'EndTime': (datetime.now() + timedelta(hours=2)).isoformat()
            }
        )
        print(f"Status item created: {status_item}")
        return status_item
    else:
        print("Data Processing Status list not found")
        return None
def send_teams_notifications():
    """Post a pipeline-completion message to the team's 'general' channel.

    Returns:
        dict | None: The Teams message response, or None when no channel
        named 'general' exists in the team.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    team_id = 'data-engineering-team-id'
    # Get team channels
    channels = hook.get_teams_channels(
        team_id=team_id,
        select_properties=['id', 'displayName']
    )
    # Find general channel (case-insensitive match on display name)
    general_channel = None
    for channel in channels:
        if channel['displayName'].lower() == 'general':
            general_channel = channel
            break
    if general_channel:
        # Send completion notification (markdown-formatted message body)
        message_result = hook.send_teams_message(
            team_id=team_id,
            channel_id=general_channel['id'],
            message=f"""
🎉 **Daily Data Pipeline Completed Successfully**
**Execution Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Status**: ✅ Success
**Records Processed**: 150,000
**Duration**: 2 hours 15 minutes
Check the dashboard for detailed results: [Dashboard Link](https://company-dashboard.com)
"""
        )
        print(f"Teams message sent: {message_result}")
        return message_result
    return None
# DAG definition: one run per day, no backfill of past intervals.
dag = DAG(
    'msgraph_advanced_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Advanced Microsoft 365 integration workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False
)
# Manage groups and users
manage_groups = PythonOperator(
    task_id='manage_groups',
    python_callable=manage_groups_and_users,
    dag=dag
)
# Manage calendar events
manage_calendar = PythonOperator(
    task_id='manage_calendar',
    python_callable=manage_calendar_events,
    dag=dag
)
# Sync SharePoint data
sync_sharepoint = PythonOperator(
    task_id='sync_sharepoint',
    python_callable=sync_sharepoint_data,
    dag=dag
)
# Send Teams notifications
notify_teams = PythonOperator(
    task_id='notify_teams',
    python_callable=send_teams_notifications,
    dag=dag
)
# Get Office 365 usage reports (7-day active-user detail report)
get_reports = MSGraphAsyncOperator(
    task_id='get_usage_reports',
    conn_id='msgraph_conn',
    url='reports/getOffice365ActiveUserDetail(period=\'D7\')',
    method='GET',
    dag=dag
)
manage_groups >> [manage_calendar, sync_sharepoint] >> notify_teams >> get_reports

from airflow import DAG
from airflow.providers.microsoft.azure.sensors.msgraph import MSGraphSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def check_file_processing_condition(response_data):
    """Return True when at least one file was modified within the last hour.

    Args:
        response_data: Graph API drive-children response; files are listed
            under the ``value`` key, each with an ISO-8601
            ``lastModifiedDateTime`` (UTC, ``Z`` suffix).

    Returns:
        bool: True if any file's modification time is less than an hour old.
    """
    from datetime import timezone  # local import: module header only brings in datetime/timedelta
    files = response_data.get('value', [])
    # Compare in UTC: Graph timestamps parse to timezone-aware datetimes, so
    # a naive datetime.now() would raise TypeError on subtraction.
    current_time = datetime.now(timezone.utc)
    # Check if any new files have been uploaded in the last hour
    recent_files = []
    for file in files:
        raw_modified = file.get('lastModifiedDateTime')
        if not raw_modified:
            # Skip entries without a timestamp instead of crashing on
            # fromisoformat('').
            continue
        file_modified = datetime.fromisoformat(raw_modified.replace('Z', '+00:00'))
        if (current_time - file_modified).total_seconds() < 3600:  # 1 hour
            recent_files.append(file)
    print(f"Found {len(recent_files)} recent files")
    return len(recent_files) > 0
def process_detected_files(**context):
    """Handle the files the 'wait_for_new_files' sensor reported via XCom."""
    task_instance = context['task_instance']
    sensor_result = task_instance.xcom_pull(task_ids='wait_for_new_files')
    print(f"Sensor detected new files: {sensor_result}")
    # Placeholder: real file-processing logic would go here.
    return "Files processed successfully"
# DAG definition: polls for new OneDrive files every 30 minutes.
dag = DAG(
    'msgraph_sensor_workflow',
    default_args={
        'owner': 'monitoring-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2)
    },
    description='Microsoft Graph sensor workflow',
    schedule_interval=timedelta(minutes=30),
    start_date=datetime(2024, 1, 1),
    catchup=False
)
# Wait for new files in OneDrive (root folder children)
wait_for_files = MSGraphSensor(
    task_id='wait_for_new_files',
    conn_id='msgraph_conn',
    url='me/drive/root/children',
    method='GET',
    query_parameters={
        '$select': 'id,name,lastModifiedDateTime,size',
        '$filter': "folder eq null"  # Only files, not folders
    },
    success_condition=check_file_processing_condition,
    timeout=1800,  # 30 minutes timeout
    poke_interval=60,  # Check every minute
    dag=dag
)
# Process detected files
process_files = PythonOperator(
    task_id='process_files',
    python_callable=process_detected_files,
    dag=dag
)
wait_for_files >> process_files

Configure Microsoft Graph API connections in Airflow:
# Connection configuration for Microsoft Graph
{
"conn_id": "msgraph_default",
"conn_type": "msgraph",
"host": "graph.microsoft.com", # Graph API endpoint
"extra": {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"api_version": "v1.0" # or "beta" for preview features
}
}

Microsoft Graph API supports multiple authentication methods:
Application (Client Credentials) Authentication:
extra = {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"auth_type": "client_credentials"
}

Delegated Authentication (Authorization Code):
extra = {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret",
"auth_type": "authorization_code",
"scopes": ["https://graph.microsoft.com/.default"]
}

Certificate-based Authentication:
extra = {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"certificate_path": "/path/to/certificate.pem",
"certificate_thumbprint": "cert-thumbprint",
"auth_type": "certificate"
}

Managed Identity Authentication:
extra = {
"managed_identity_client_id": "your-managed-identity-client-id",
"auth_type": "managed_identity"
}

from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from kiota_abstractions.api_error import APIError
def robust_graph_operations():
    """Demonstrate error handling patterns for Graph API operations.

    Shows status-code-specific handling (404 / 403 / 429) for a single user
    lookup, then per-item error inspection for a batch request.

    Returns:
        dict | None: The user record, or None when the user does not exist.

    Raises:
        PermissionError: When the Graph API returns 403 for the lookup.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    try:
        # Attempt to get user information
        user_info = hook.get_user('user@company.com')
        print(f"User found: {user_info}")
    except APIError as api_error:
        if api_error.response_status_code == 404:
            print("User not found")
            return None
        elif api_error.response_status_code == 403:
            print("Insufficient permissions to access user information")
            raise PermissionError("Insufficient Graph API permissions")
        elif api_error.response_status_code == 429:
            print("Rate limit exceeded, implementing retry logic")
            # Implement exponential backoff retry
            # NOTE(review): fixed 60s sleep with a single retry; see
            # implement_retry_logic() for proper backoff with jitter.
            import time
            time.sleep(60)  # Wait 1 minute before retry
            return hook.get_user('user@company.com')
        else:
            print(f"API error: {api_error}")
            raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
    try:
        # Batch request with error handling
        requests = [
            hook.request_information(url='users/user1@company.com'),
            hook.request_information(url='users/user2@company.com'),
            hook.request_information(url='users/nonexistent@company.com')  # This will fail
        ]
        responses = hook.batch_request(requests)
        for i, response in enumerate(responses):
            # Batch sub-responses can fail individually; inspect each one.
            if isinstance(response, dict) and 'error' in response:
                print(f"Request {i} failed: {response['error']}")
            else:
                print(f"Request {i} succeeded: {response.get('displayName', 'Unknown')}")
    except Exception as e:
        print(f"Batch request error: {e}")
        # Handle batch errors appropriately
def implement_retry_logic():
    """Implement retry logic for Graph API operations.

    Uses exponential backoff with jitter, honoring the Retry-After header
    on HTTP 429 responses.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    import time
    import random
    def retry_with_backoff(operation, max_retries=3, base_delay=1):
        """Retry operation with exponential backoff."""
        for attempt in range(max_retries):
            try:
                return operation()
            except APIError as e:
                if e.response_status_code == 429:  # Rate limit
                    # Prefer the server-provided Retry-After; fall back to
                    # exponential backoff; add jitter to spread retries.
                    retry_after = int(e.response_headers.get('Retry-After', base_delay * (2 ** attempt)))
                    jitter = random.uniform(0.1, 0.3) * retry_after
                    sleep_time = retry_after + jitter
                    print(f"Rate limited, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
                    time.sleep(sleep_time)
                    # NOTE(review): on the last attempt this sleeps and then
                    # raises anyway — the final sleep is wasted; confirm.
                    if attempt == max_retries - 1:
                        raise
                else:
                    raise
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                sleep_time = base_delay * (2 ** attempt) + random.uniform(0.1, 0.5)
                print(f"Operation failed, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
                time.sleep(sleep_time)
    # Use retry logic for operations
    user_data = retry_with_backoff(lambda: hook.get_user('user@company.com'))
    return user_data

def test_graph_connection():
"""Test Microsoft Graph API connection and permissions."""
try:
hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
# Test basic connection
success, message = hook.test_connection()
if not success:
print(f"Connection test failed: {message}")
return False
print("Basic connection successful")
# Test specific permissions
permissions_tests = {
'User.Read.All': lambda: hook.list_users(top=1),
'Group.Read.All': lambda: hook.list_groups(top=1),
'Mail.Send': lambda: hook.get_api_version(), # Basic test for mail permissions
'Sites.Read.All': lambda: hook.get_api_version() # Basic test for SharePoint permissions
}
results = {}
for permission, test_func in permissions_tests.items():
try:
test_func()
results[permission] = True
print(f"✓ {permission}: Available")
except APIError as e:
if e.response_status_code == 403:
results[permission] = False
print(f"✗ {permission}: Insufficient permissions")
else:
results[permission] = False
print(f"✗ {permission}: Error - {e}")
except Exception as e:
results[permission] = False
print(f"✗ {permission}: Unexpected error - {e}")
return all(results.values())
except Exception as e:
print(f"Connection test failed with error: {e}")
        return False

def optimize_graph_operations():
"""Demonstrate Graph API optimization techniques."""
hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
# Use $select to only retrieve needed properties
efficient_user_query = hook.list_users(
select_properties=['id', 'displayName', 'userPrincipalName'], # Only get what you need
top=100 # Limit results appropriately
)
# Use $filter to reduce data transfer
filtered_users = hook.list_users(
filter_expression="department eq 'Engineering'",
select_properties=['id', 'displayName', 'mail'],
top=50
)
# Use batch requests for multiple operations
batch_requests = []
user_ids = ['user1@company.com', 'user2@company.com', 'user3@company.com']
for user_id in user_ids:
batch_requests.append(
hook.request_information(
url=f'users/{user_id}',
query_parameters={'$select': 'id,displayName,mail'}
)
)
# Execute all requests in a single batch
batch_results = hook.batch_request(batch_requests, max_batch_size=20)
return {
'efficient_query_count': len(efficient_user_query),
'filtered_users_count': len(filtered_users),
'batch_results_count': len(batch_results)
}
def implement_caching_strategy():
"""Implement caching for frequently accessed Graph data."""
from functools import lru_cache
import time
class CachedGraphHook:
def __init__(self, conn_id):
self.hook = KiotaRequestAdapterHook(conn_id=conn_id)
self._cache_timestamp = {}
self._cache_ttl = 3600 # 1 hour TTL
def _is_cache_valid(self, key):
"""Check if cached data is still valid."""
if key not in self._cache_timestamp:
return False
return (time.time() - self._cache_timestamp[key]) < self._cache_ttl
@lru_cache(maxsize=100)
def get_user_cached(self, user_id):
"""Get user with caching."""
cache_key = f"user_{user_id}"
if self._is_cache_valid(cache_key):
# Return from LRU cache
pass
else:
# Update cache timestamp
self._cache_timestamp[cache_key] = time.time()
return self.hook.get_user(user_id, select_properties=['id', 'displayName', 'mail'])
def invalidate_user_cache(self, user_id):
"""Invalidate user cache."""
cache_key = f"user_{user_id}"
if cache_key in self._cache_timestamp:
del self._cache_timestamp[cache_key]
# Clear from LRU cache
self.get_user_cached.cache_clear()
return CachedGraphHookThis comprehensive documentation covers all Microsoft Graph API capabilities in the Apache Airflow Microsoft Azure Provider, including authentication methods, API operations, sensor monitoring, trigger-based operations, and performance optimization techniques for Microsoft 365 integration.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure