0
# Microsoft Graph API
1
2
Integrate Airflow with Microsoft 365 services through the Microsoft Graph API. This page covers the hook, operator, sensor, and trigger used to call Graph API endpoints, including asynchronous (deferrable) execution.
3
4
## Capabilities
5
6
### Microsoft Graph Request Adapter Hook
7
8
Primary interface for Microsoft Graph API operations using Kiota request adapter for modern Graph API access patterns.
9
10
```python { .api }
11
class KiotaRequestAdapterHook(BaseHook):
    """
    Microsoft Graph API hook built on the Kiota request adapter.

    Exposes Graph API endpoints through methods that take care of
    authentication and request handling.
    """

    def get_conn(self) -> RequestAdapter:
        """
        Return the authenticated request adapter.

        Returns:
            The Kiota request adapter used for Graph API calls.
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Check whether the Microsoft Graph API connection works.

        Returns:
            A ``(success, message)`` pair.
        """

    def request_information(
        self,
        url: str,
        method: str = "GET",
        headers: dict[str, str] | None = None,
        query_parameters: dict[str, Any] | None = None,
        content: bytes | None = None,
    ) -> RequestInformation:
        """
        Build the request information object for a Graph API call.

        Args:
            url: Graph API endpoint URL.
            method: HTTP method; defaults to "GET".
            headers: Extra headers to send, if any.
            query_parameters: Query parameters, if any.
            content: Request body content, if any.

        Returns:
            The configured request information object.
        """

    def get_api_version(self) -> str:
        """
        Return the Microsoft Graph API version in use.

        Returns:
            The version string, e.g. "v1.0" or "beta".
        """

    def get_base_url(self) -> str:
        """
        Return the base URL of the Microsoft Graph API.

        Returns:
            The base URL that Graph API endpoints are resolved against.
        """

    def send_request(
        self,
        request_info: RequestInformation,
        response_handler: ResponseHandler | None = None,
    ) -> Any:
        """
        Send one request to Microsoft Graph API.

        Args:
            request_info: The request configuration.
            response_handler: Optional custom response handler.

        Returns:
            The Graph API response.
        """

    def batch_request(
        self,
        requests: list[RequestInformation],
        max_batch_size: int = 20,
    ) -> list[Any]:
        """
        Send several requests to Graph API in batches.

        Args:
            requests: The request configurations to send.
            max_batch_size: Upper bound on requests per batch; defaults to 20.

        Returns:
            The Graph API responses as a list.
        """

    def get_user(
        self,
        user_id: str,
        select_properties: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Fetch a single user from Microsoft Graph.

        Args:
            user_id: User ID or principal name.
            select_properties: Properties to select, if restricted.

        Returns:
            The user information.
        """

    def list_users(
        self,
        filter_expression: str | None = None,
        select_properties: list[str] | None = None,
        top: int = 100,
    ) -> list[dict[str, Any]]:
        """
        List users from Microsoft Graph.

        Args:
            filter_expression: OData filter expression, if any.
            select_properties: Properties to select, if restricted.
            top: Maximum number of results; defaults to 100.

        Returns:
            One dictionary of user information per user.
        """

    def get_group(
        self,
        group_id: str,
        select_properties: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Fetch a single group from Microsoft Graph.

        Args:
            group_id: Group ID.
            select_properties: Properties to select, if restricted.

        Returns:
            The group information.
        """

    def list_groups(
        self,
        filter_expression: str | None = None,
        select_properties: list[str] | None = None,
        top: int = 100,
    ) -> list[dict[str, Any]]:
        """
        List groups from Microsoft Graph.

        Args:
            filter_expression: OData filter expression, if any.
            select_properties: Properties to select, if restricted.
            top: Maximum number of results; defaults to 100.

        Returns:
            One dictionary of group information per group.
        """

    def send_email(
        self,
        user_id: str,
        subject: str,
        body: str,
        to_recipients: list[str],
        cc_recipients: list[str] | None = None,
        bcc_recipients: list[str] | None = None,
        attachments: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        Send an email through Microsoft Graph.

        Args:
            user_id: Sender user ID.
            subject: Email subject.
            body: Email body content.
            to_recipients: Email addresses of the "To" recipients.
            cc_recipients: Email addresses of the CC recipients, if any.
            bcc_recipients: Email addresses of the BCC recipients, if any.
            attachments: Email attachments, if any.

        Returns:
            The email send response.
        """

    def create_calendar_event(
        self,
        user_id: str,
        subject: str,
        start_time: datetime,
        end_time: datetime,
        attendees: list[str] | None = None,
        body: str | None = None,
        location: str | None = None,
    ) -> dict[str, Any]:
        """
        Create a calendar event through Microsoft Graph.

        Args:
            user_id: ID of the user the event is created for.
            subject: Event subject.
            start_time: Event start time.
            end_time: Event end time.
            attendees: Attendee email addresses, if any.
            body: Event description, if any.
            location: Event location, if any.

        Returns:
            Information about the created event.
        """

    def upload_file_to_onedrive(
        self,
        user_id: str,
        file_path: str,
        content: bytes,
        conflict_behavior: str = "rename",
    ) -> dict[str, Any]:
        """
        Upload a file to a user's OneDrive through Microsoft Graph.

        Args:
            user_id: User ID.
            file_path: Destination path of the file in OneDrive.
            content: File content.
            conflict_behavior: Conflict resolution behavior; defaults to "rename".

        Returns:
            The upload response, including file information.
        """

    def get_sharepoint_site(
        self,
        site_id: str,
        select_properties: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Fetch information about a SharePoint site.

        Args:
            site_id: SharePoint site ID.
            select_properties: Properties to select, if restricted.

        Returns:
            The SharePoint site information.
        """

    def list_sharepoint_lists(
        self,
        site_id: str,
        select_properties: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """
        List the SharePoint lists of a site.

        Args:
            site_id: SharePoint site ID.
            select_properties: Properties to select, if restricted.

        Returns:
            One dictionary per SharePoint list.
        """

    def create_sharepoint_list_item(
        self,
        site_id: str,
        list_id: str,
        item_data: dict[str, Any],
    ) -> dict[str, Any]:
        """
        Create an item in a SharePoint list.

        Args:
            site_id: SharePoint site ID.
            list_id: SharePoint list ID.
            item_data: Data of the item to create.

        Returns:
            Information about the created item.
        """

    def get_teams_channels(
        self,
        team_id: str,
        select_properties: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """
        List the channels of a Microsoft Teams team.

        Args:
            team_id: Teams team ID.
            select_properties: Properties to select, if restricted.

        Returns:
            One dictionary per team channel.
        """

    def send_teams_message(
        self,
        team_id: str,
        channel_id: str,
        message: str,
        message_type: str = "message",
    ) -> dict[str, Any]:
        """
        Post a message to a Microsoft Teams channel.

        Args:
            team_id: Teams team ID.
            channel_id: Channel ID.
            message: Message content.
            message_type: Message type; defaults to "message".

        Returns:
            The message send response.
        """
"""
330
```
331
332
### Response Handler
333
334
Supporting class for handling Microsoft Graph API responses with proper serialization and error handling.
335
336
```python { .api }
337
class DefaultResponseHandler(ResponseHandler):
    """
    Standard response handler for Microsoft Graph API calls.

    Covers response handling, error processing, and serialization of
    the returned data.
    """

    def handle_response_async(
        self,
        response: Any,
        error_map: dict[str, type] | None = None,
    ) -> Any:
        """
        Process an asynchronous response from Graph API.

        Args:
            response: HTTP response received from Graph API.
            error_map: Error mapping configuration, if any.

        Returns:
            The processed response data.
        """

    def handle_error_response(
        self,
        response: Any,
    ) -> Exception:
        """
        Turn a Graph API error response into an exception.

        Args:
            response: Error response received from Graph API.

        Returns:
            The exception that fits the error.
        """

    def serialize_response(
        self,
        response_data: Any,
    ) -> dict[str, Any]:
        """
        Serialize data returned by Graph API.

        Args:
            response_data: Raw response data.

        Returns:
            The serialized response data.
        """
388
```
389
390
## Microsoft Graph Operators
391
392
Execute Microsoft Graph API operations as Airflow tasks with comprehensive Microsoft 365 service integration.
393
394
### Asynchronous Graph API Operator
395
396
```python { .api }
397
class MSGraphAsyncOperator(BaseOperator):
    """
    Run a Microsoft Graph API operation asynchronously.

    Handles various Graph API operations and supports deferrable
    execution for long-running Microsoft 365 operations.
    """

    def __init__(
        self,
        *,
        conn_id: str = "msgraph_default",
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        data: dict[str, Any] | str | None = None,
        response_filter: str | None = None,
        response_type: type | None = None,
        **kwargs,
    ):
        """
        Set up the Microsoft Graph async operator.

        Args:
            conn_id: Airflow connection ID for Microsoft Graph.
            url: Graph API endpoint URL.
            method: HTTP method; defaults to "GET".
            query_parameters: Query parameters, if any.
            headers: Extra headers to send, if any.
            data: Request body data, if any.
            response_filter: Response filtering expression, if any.
            response_type: Expected response type, if any.
        """

    def execute(self, context: Context) -> Any:
        """
        Run the Microsoft Graph API operation.

        Args:
            context: Airflow task context.

        Returns:
            The response of the Graph API operation.
        """

    def execute_defer(self, context: Context) -> None:
        """
        Run the operation in deferrable mode, for long-running operations.

        Args:
            context: Airflow task context.
        """
450
```
451
452
## Microsoft Graph Sensors
453
454
Monitor Microsoft 365 resources and wait for specific conditions using Microsoft Graph API.
455
456
### Graph API Sensor
457
458
```python { .api }
459
class MSGraphSensor(BaseSensorOperator):
    """
    Monitors Microsoft Graph API resources.

    Provides sensor capabilities for waiting on Microsoft 365 resource
    states and conditions through Graph API polling.

    Note:
        ``success_condition`` is annotated with ``Callable`` (from
        ``collections.abc`` or ``typing``). The builtin ``callable`` is a
        function, not a type, so ``callable | None`` raises ``TypeError``
        when the annotation is evaluated.
    """

    def __init__(
        self,
        *,
        conn_id: str = "msgraph_default",
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        response_filter: str | None = None,
        success_condition: Callable[[Any], bool] | None = None,
        **kwargs,
    ):
        """
        Initialize Microsoft Graph sensor.

        Args:
            conn_id: Airflow connection ID for Microsoft Graph.
            url: Graph API endpoint URL to monitor.
            method: HTTP method (default: "GET").
            query_parameters: Query parameters.
            headers: Additional headers.
            response_filter: Response filtering expression.
            success_condition: Function to evaluate the success condition.
                Presumably called with the Graph API response and expected
                to return a truthy value once the condition is met (this
                is how the sensor usage example on this page uses it).
        """

    def poke(self, context: Context) -> bool:
        """
        Poke Microsoft Graph API resource for condition.

        Args:
            context: Airflow task context.

        Returns:
            True if condition is met, False otherwise.
        """
502
```
503
504
## Microsoft Graph Triggers
505
506
Enable asynchronous monitoring and event-driven operations for Microsoft 365 services.
507
508
### Graph API Trigger
509
510
```python { .api }
511
class MSGraphTrigger(BaseTrigger):
    """
    Asynchronous trigger for Microsoft Graph API operations.

    Monitors Microsoft 365 resources through Graph API and handles
    the resulting events asynchronously.
    """

    def __init__(
        self,
        conn_id: str,
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        response_filter: str | None = None,
        timeout: int = 3600,
        check_interval: int = 60,
    ):
        """
        Set up the Microsoft Graph trigger.

        Args:
            conn_id: Airflow connection ID for Microsoft Graph.
            url: Graph API endpoint URL.
            method: HTTP method; defaults to "GET".
            query_parameters: Query parameters, if any.
            headers: Extra headers to send, if any.
            response_filter: Response filtering expression, if any.
            timeout: Timeout in seconds; defaults to 3600.
            check_interval: Check interval in seconds; defaults to 60.
        """

    def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Monitor the Graph API resource asynchronously.

        Yields:
            A trigger event when conditions are met or the timeout occurs.
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize the trigger configuration so it can be persisted.

        Returns:
            The serialized trigger data.
        """
559
```
560
561
### Response Serializer
562
563
Supporting class for serializing Microsoft Graph API responses in trigger operations.
564
565
```python { .api }
566
class ResponseSerializer:
    """
    Serializer for Graph API responses.

    Used to serialize Graph API response data in trigger and
    asynchronous operations.
    """

    @staticmethod
    def serialize_response(response: Any) -> dict[str, Any]:
        """
        Serialize a Graph API response for storage or transmission.

        Args:
            response: Graph API response object.

        Returns:
            The serialized response data.
        """

    @staticmethod
    def deserialize_response(data: dict[str, Any]) -> Any:
        """
        Rebuild a Graph API response from stored data.

        Args:
            data: Serialized response data.

        Returns:
            The deserialized response object.
        """
597
```
598
599
## Usage Examples
600
601
### Basic Microsoft Graph Operations
602
603
```python
604
from airflow import DAG
605
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
606
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
607
from airflow.operators.python import PythonOperator
608
from datetime import datetime, timedelta
609
610
def process_user_data(**context):
    """
    Process user data retrieved from Microsoft Graph.

    Pulls the XCom pushed by the ``get_users`` task, prints one line per
    user, and returns the number of users.

    Args:
        **context: Airflow task context; ``context['task_instance']`` must
            provide ``xcom_pull``.

    Returns:
        int: Number of users found under the ``value`` key (0 when the
        upstream task pushed nothing).
    """
    # xcom_pull returns None when the upstream task pushed no XCom; fall back
    # to an empty payload instead of failing on ``None.get``.
    users_data = context['task_instance'].xcom_pull(task_ids='get_users') or {}
    users = users_data.get('value') or []

    print(f"Retrieved {len(users)} users")

    for user in users:
        print(f"User: {user.get('displayName')} ({user.get('userPrincipalName')})")

    return len(users)
620
621
def send_notification_email():
    """
    Send the workflow notification email via Microsoft Graph.

    Returns:
        Whatever ``KiotaRequestAdapterHook.send_email`` returns.
    """
    graph_hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # Recipients are fixed for this example.
    primary_recipients = ['team@company.com', 'manager@company.com']
    copied_recipients = ['notifications@company.com']

    send_response = graph_hook.send_email(
        user_id='admin@company.com',
        subject='Airflow Workflow Notification',
        body='Your daily data processing workflow has completed successfully.',
        to_recipients=primary_recipients,
        cc_recipients=copied_recipients,
    )

    print(f"Email sent successfully: {send_response}")
    return send_response
636
637
dag = DAG(
    'msgraph_basic_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3),
    },
    description='Basic Microsoft Graph API workflow',
    # `schedule` supersedes `schedule_interval`, which is deprecated since
    # Airflow 2.4 and no longer accepted by Airflow 3.
    schedule=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Get list of users
get_users = MSGraphAsyncOperator(
    task_id='get_users',
    conn_id='msgraph_conn',
    url='users',
    method='GET',
    query_parameters={
        '$select': 'id,displayName,userPrincipalName,mail',
        '$filter': "accountEnabled eq true",
        '$top': 50,
    },
    dag=dag,
)

# Process user data (reads the XCom pushed by `get_users`)
process_users = PythonOperator(
    task_id='process_users',
    python_callable=process_user_data,
    dag=dag,
)

# Send notification
send_notification = PythonOperator(
    task_id='send_notification',
    python_callable=send_notification_email,
    dag=dag,
)

get_users >> process_users >> send_notification
679
```
680
681
### Advanced Microsoft 365 Integration
682
683
```python
684
from airflow import DAG
685
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator
686
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
687
from airflow.operators.python import PythonOperator
688
from datetime import datetime, timedelta
689
import json
690
691
def manage_groups_and_users():
    """
    Collect membership details for selected Microsoft 365 groups.

    Returns:
        dict: Maps each matching group's display name to its ``id``,
        ``member_count`` and ``members``.
    """
    graph_hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # Fetch groups that are Unified or security-enabled
    all_groups = graph_hook.list_groups(
        filter_expression="groupTypes/any(c:c eq 'Unified') or securityEnabled eq true",
        select_properties=['id', 'displayName', 'groupTypes', 'securityEnabled'],
        top=100,
    )

    print(f"Found {len(all_groups)} groups")

    # Only these groups get a membership lookup
    watched_names = ['Data-Scientists', 'Security-Admins', 'Project-Managers']

    summary = {}
    for entry in all_groups:
        group_name = entry['displayName']
        if group_name not in watched_names:
            continue

        # Get group members
        members_request = graph_hook.request_information(
            url=f"groups/{entry['id']}/members",
            query_parameters={'$select': 'id,displayName,userPrincipalName'},
        )
        members = graph_hook.send_request(members_request).get('value', [])

        summary[group_name] = {
            'id': entry['id'],
            'member_count': len(members),
            'members': members,
        }

    return summary
727
728
def manage_calendar_events():
    """
    Manage calendar events for team coordination.

    Creates a one-hour team meeting (10:00-11:00 today) and a 30-minute
    "workflow completed" event starting now.

    Returns:
        dict: ``team_meeting`` and ``notification_event``, each holding the
        value returned by ``create_calendar_event``.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # Take one timestamp and derive every time from it. Calling
    # datetime.now() separately for start and end could straddle midnight
    # and put the two on different days.
    now = datetime.now()
    meeting_start = now.replace(hour=10, minute=0, second=0, microsecond=0)
    meeting_end = meeting_start + timedelta(hours=1)

    # Create the team meeting (a single event; no recurrence is passed)
    team_meeting = hook.create_calendar_event(
        user_id='teamlead@company.com',
        subject='Weekly Data Pipeline Review',
        start_time=meeting_start,
        end_time=meeting_end,
        attendees=[
            'dataengineer1@company.com',
            'dataengineer2@company.com',
            'analyst@company.com',
        ],
        body='Weekly review of data pipeline status, issues, and improvements.',
        location='Conference Room A / Teams',
    )

    print(f"Team meeting created: {team_meeting}")

    # Create workflow completion notification event
    notification_event = hook.create_calendar_event(
        user_id='admin@company.com',
        subject='Data Processing Workflow Completed',
        start_time=now,
        end_time=now + timedelta(minutes=30),
        body='Daily data processing workflow has completed. Check results in dashboard.',
    )

    return {
        'team_meeting': team_meeting,
        'notification_event': notification_event,
    }
762
763
def sync_sharepoint_data():
    """
    Record a pipeline status entry in the SharePoint tracking list.

    Returns:
        The created list item, or None when the
        'Data Processing Status' list is not found.
    """
    graph_hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    site_id = 'company.sharepoint.com,site-id,web-id'

    # Get SharePoint lists
    site_lists = graph_hook.list_sharepoint_lists(
        site_id=site_id,
        select_properties=['id', 'displayName', 'list'],
    )

    # Find the data tracking list (first match wins)
    tracking_list = next(
        (entry for entry in site_lists if entry['displayName'] == 'Data Processing Status'),
        None,
    )

    if not tracking_list:
        print("Data Processing Status list not found")
        return None

    # Create status entry
    created_item = graph_hook.create_sharepoint_list_item(
        site_id=site_id,
        list_id=tracking_list['id'],
        item_data={
            'Title': f'Pipeline Run {datetime.now().strftime("%Y-%m-%d %H:%M")}',
            'Status': 'Completed',
            'ProcessedRecords': 150000,
            'StartTime': datetime.now().isoformat(),
            'EndTime': (datetime.now() + timedelta(hours=2)).isoformat(),
        },
    )

    print(f"Status item created: {created_item}")
    return created_item
801
802
def send_teams_notifications():
    """
    Send notifications to Microsoft Teams channels.

    Looks up the team's channel named "general" (case-insensitive) and
    posts the pipeline completion message to it.

    Returns:
        The value returned by ``send_teams_message``, or None when no
        "general" channel exists.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    team_id = 'data-engineering-team-id'

    # Get team channels
    channels = hook.get_teams_channels(
        team_id=team_id,
        select_properties=['id', 'displayName'],
    )

    # Find general channel
    general_channel = next(
        (channel for channel in channels if channel['displayName'].lower() == 'general'),
        None,
    )

    if not general_channel:
        return None

    # The two emoji were mis-decoded into Greek letters in the original
    # text and are restored here. The message is assembled line by line so
    # that source indentation cannot leak into what is sent.
    message_lines = [
        "",
        "🎉 **Daily Data Pipeline Completed Successfully**",
        "",
        f"**Execution Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "**Status**: ✅ Success",
        "**Records Processed**: 150,000",
        "**Duration**: 2 hours 15 minutes",
        "",
        "Check the dashboard for detailed results: [Dashboard Link](https://company-dashboard.com)",
        "",
    ]

    # Send completion notification
    message_result = hook.send_teams_message(
        team_id=team_id,
        channel_id=general_channel['id'],
        message="\n".join(message_lines),
    )

    print(f"Teams message sent: {message_result}")
    return message_result
842
843
dag = DAG(
    'msgraph_advanced_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced Microsoft 365 integration workflow',
    # `schedule` supersedes `schedule_interval`, which is deprecated since
    # Airflow 2.4 and no longer accepted by Airflow 3.
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Manage groups and users
manage_groups = PythonOperator(
    task_id='manage_groups',
    python_callable=manage_groups_and_users,
    dag=dag,
)

# Manage calendar events
manage_calendar = PythonOperator(
    task_id='manage_calendar',
    python_callable=manage_calendar_events,
    dag=dag,
)

# Sync SharePoint data
sync_sharepoint = PythonOperator(
    task_id='sync_sharepoint',
    python_callable=sync_sharepoint_data,
    dag=dag,
)

# Send Teams notifications
notify_teams = PythonOperator(
    task_id='notify_teams',
    python_callable=send_teams_notifications,
    dag=dag,
)

# Get Office 365 usage reports
get_reports = MSGraphAsyncOperator(
    task_id='get_usage_reports',
    conn_id='msgraph_conn',
    # Double quotes avoid escaping the quotes around the period argument.
    url="reports/getOffice365ActiveUserDetail(period='D7')",
    method='GET',
    dag=dag,
)

# Calendar and SharePoint tasks run in parallel after the group task.
manage_groups >> [manage_calendar, sync_sharepoint] >> notify_teams >> get_reports
894
```
895
896
### Microsoft Graph Sensor Example
897
898
```python
899
from airflow import DAG
900
from airflow.providers.microsoft.azure.sensors.msgraph import MSGraphSensor
901
from airflow.operators.python import PythonOperator
902
from datetime import datetime, timedelta
903
904
def check_file_processing_condition(response_data):
    """
    Check if the file processing condition is met.

    The condition holds when at least one entry under ``value`` has a
    ``lastModifiedDateTime`` less than one hour old.

    Args:
        response_data (dict): Response payload; entries are read from its
            ``value`` key.

    Returns:
        bool: True when at least one recently modified file was found.
    """
    # Function-scope import: the surrounding example only imports
    # `datetime` and `timedelta`.
    from datetime import timezone

    # The parsed timestamps are timezone-aware ("Z" becomes "+00:00"), so
    # the reference time must be aware too. Subtracting an aware datetime
    # from a naive datetime.now() raises TypeError.
    now_utc = datetime.now(timezone.utc)

    recent_files = []
    for item in response_data.get('value', []):
        raw_timestamp = item.get('lastModifiedDateTime')
        if not raw_timestamp:
            # fromisoformat('') raises ValueError; skip entries with no timestamp.
            continue

        try:
            modified = datetime.fromisoformat(raw_timestamp.replace('Z', '+00:00'))
        except ValueError:
            print(f"Skipping file with unparseable timestamp: {raw_timestamp!r}")
            continue

        if modified.tzinfo is None:
            # Assumes offset-less timestamps are UTC. TODO confirm against the API.
            modified = modified.replace(tzinfo=timezone.utc)

        if (now_utc - modified).total_seconds() < 3600:  # 1 hour
            recent_files.append(item)

    print(f"Found {len(recent_files)} recent files")
    return bool(recent_files)
919
920
def process_detected_files(**context):
    """
    Handle the files reported by the ``wait_for_new_files`` sensor.

    Returns:
        str: A fixed confirmation message.
    """
    task_instance = context['task_instance']
    sensor_result = task_instance.xcom_pull(task_ids='wait_for_new_files')

    print(f"Sensor detected new files: {sensor_result}")

    # Placeholder: the actual file processing logic would go here.

    return "Files processed successfully"
930
931
dag = DAG(
    'msgraph_sensor_workflow',
    default_args={
        'owner': 'monitoring-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    description='Microsoft Graph sensor workflow',
    # `schedule` supersedes `schedule_interval`, which is deprecated since
    # Airflow 2.4 and no longer accepted by Airflow 3.
    schedule=timedelta(minutes=30),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Wait for new files in OneDrive
wait_for_files = MSGraphSensor(
    task_id='wait_for_new_files',
    conn_id='msgraph_conn',
    url='me/drive/root/children',
    method='GET',
    query_parameters={
        '$select': 'id,name,lastModifiedDateTime,size',
        '$filter': "folder eq null",  # Only files, not folders
    },
    success_condition=check_file_processing_condition,
    timeout=1800,  # 30 minutes timeout
    poke_interval=60,  # Check every minute
    dag=dag,
)

# Process detected files
process_files = PythonOperator(
    task_id='process_files',
    python_callable=process_detected_files,
    dag=dag,
)

wait_for_files >> process_files
968
```
969
970
## Connection Configuration
971
972
### Microsoft Graph Connection (`msgraph`)
973
974
Configure Microsoft Graph API connections in Airflow:
975
976
```python
977
# Connection configuration for Microsoft Graph
978
{
979
"conn_id": "msgraph_default",
980
"conn_type": "msgraph",
981
"host": "graph.microsoft.com", # Graph API endpoint
982
"extra": {
983
"tenant_id": "your-tenant-id",
984
"client_id": "your-client-id",
985
"client_secret": "your-client-secret",
986
"api_version": "v1.0" # or "beta" for preview features
987
}
988
}
989
```
990
991
### Authentication Methods
992
993
Microsoft Graph API supports multiple authentication methods:
994
995
1. **Application (Client Credentials) Authentication**:
996
```python
997
extra = {
998
"tenant_id": "your-tenant-id",
999
"client_id": "your-client-id",
1000
"client_secret": "your-client-secret",
1001
"auth_type": "client_credentials"
1002
}
1003
```
1004
1005
2. **Delegated Authentication (Authorization Code)**:
1006
```python
1007
extra = {
1008
"tenant_id": "your-tenant-id",
1009
"client_id": "your-client-id",
1010
"client_secret": "your-client-secret",
1011
"auth_type": "authorization_code",
1012
"scopes": ["https://graph.microsoft.com/.default"]
1013
}
1014
```
1015
1016
3. **Certificate-based Authentication**:
1017
```python
1018
extra = {
1019
"tenant_id": "your-tenant-id",
1020
"client_id": "your-client-id",
1021
"certificate_path": "/path/to/certificate.pem",
1022
"certificate_thumbprint": "cert-thumbprint",
1023
"auth_type": "certificate"
1024
}
1025
```
1026
1027
4. **Managed Identity Authentication**:
1028
```python
1029
extra = {
1030
"managed_identity_client_id": "your-managed-identity-client-id",
1031
"auth_type": "managed_identity"
1032
}
1033
```
1034
1035
## Error Handling
1036
1037
### Common Exception Patterns
1038
1039
```python
1040
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
1041
from kiota_abstractions.api_error import APIError
1042
1043
def robust_graph_operations():
    """
    Demonstrate error handling patterns for Graph API operations.

    First looks up a single user and reacts to the HTTP status carried by
    ``APIError``; then sends a batch request and reports each result.

    Returns:
        None when the user is not found (404) or after the batch section
        has run; the retried ``get_user`` result after a 429.

    Raises:
        PermissionError: When the user lookup is rejected with 403.
        APIError: For any other API error during the user lookup.
    """
    import time

    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')
    user_id = 'user@company.com'

    try:
        # Attempt to get user information
        user_info = hook.get_user(user_id)
        print(f"User found: {user_info}")

    except APIError as api_error:
        status = api_error.response_status_code
        if status == 404:
            print("User not found")
            return None
        if status == 403:
            print("Insufficient permissions to access user information")
            # Chain the original error so the Graph response stays in the traceback.
            raise PermissionError("Insufficient Graph API permissions") from api_error
        if status == 429:
            print("Rate limit exceeded, implementing retry logic")
            # Single retry after a fixed delay. An error from the retry
            # propagates to the caller.
            time.sleep(60)  # Wait 1 minute before retry
            return hook.get_user(user_id)
        print(f"API error: {api_error}")
        raise

    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

    try:
        # Batch request with error handling
        requests = [
            hook.request_information(url='users/user1@company.com'),
            hook.request_information(url='users/user2@company.com'),
            hook.request_information(url='users/nonexistent@company.com'),  # This will fail
        ]

        responses = hook.batch_request(requests)

        for i, response in enumerate(responses):
            if isinstance(response, dict) and 'error' in response:
                print(f"Request {i} failed: {response['error']}")
            elif isinstance(response, dict):
                print(f"Request {i} succeeded: {response.get('displayName', 'Unknown')}")
            else:
                # batch_request returns list[Any]; don't call .get on a non-dict.
                print(f"Request {i} succeeded: {response}")

    except Exception as e:
        # Best-effort: a batch failure is reported but not re-raised.
        print(f"Batch request error: {e}")
1092
1093
def implement_retry_logic():
    """
    Implement retry logic for Graph API operations.

    Wraps ``hook.get_user`` in a retry helper that backs off exponentially
    and honours the ``Retry-After`` header on rate-limit (429) responses.

    Returns:
        The value returned by ``hook.get_user('user@company.com')``.
    """
    import random
    import time

    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    def retry_with_backoff(operation, max_retries=3, base_delay=1):
        """
        Retry operation with exponential backoff.

        Args:
            operation: Zero-argument callable to invoke.
            max_retries: Maximum number of attempts.
            base_delay: Base delay in seconds, doubled on each attempt.

        Returns:
            The result of the first successful ``operation()`` call.

        Raises:
            APIError: Immediately for non-429 API errors, or once the
                attempts are exhausted.
            Exception: The last error once the attempts are exhausted.
        """
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                return operation()
            except APIError as e:
                # Only rate limiting is retried. Give up before sleeping on
                # the last attempt, since waiting and then raising wastes
                # the whole delay.
                if e.response_status_code != 429 or is_last_attempt:
                    raise

                fallback_delay = base_delay * (2 ** attempt)
                headers = e.response_headers or {}
                try:
                    retry_after = int(headers.get('Retry-After', fallback_delay))
                except (TypeError, ValueError):
                    # Retry-After may also be an HTTP date; fall back to backoff.
                    retry_after = fallback_delay

                jitter = random.uniform(0.1, 0.3) * retry_after
                sleep_time = retry_after + jitter

                print(f"Rate limited, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
                time.sleep(sleep_time)
            except Exception:
                if is_last_attempt:
                    raise

                sleep_time = base_delay * (2 ** attempt) + random.uniform(0.1, 0.5)
                print(f"Operation failed, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
                time.sleep(sleep_time)

    # Use retry logic for operations
    user_data = retry_with_backoff(lambda: hook.get_user('user@company.com'))
    return user_data
1129
```
1130
1131
### Connection Testing
1132
1133
```python
1134
def test_graph_connection():
    """Test Microsoft Graph API connection and permissions.

    Runs the hook's own connection test first, then calls one probe per
    permission name and prints a line for each result.

    Returns:
        bool: ``True`` only when the connection test succeeds and every
        probe completes without raising; ``False`` otherwise, including
        when any unexpected error occurs while setting up the hook.
    """
    try:
        hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

        # Test basic connection
        success, message = hook.test_connection()
        if not success:
            print(f"Connection test failed: {message}")
            return False

        print("Basic connection successful")

        # Test specific permissions
        permissions_tests = {
            'User.Read.All': lambda: hook.list_users(top=1),
            'Group.Read.All': lambda: hook.list_groups(top=1),
            # NOTE(review): the two probes below only call get_api_version(),
            # which presumably does not exercise the named permission -
            # confirm, or replace with permission-specific calls.
            'Mail.Send': lambda: hook.get_api_version(),
            'Sites.Read.All': lambda: hook.get_api_version(),
        }

        results = {}
        for permission, test_func in permissions_tests.items():
            try:
                test_func()
            except APIError as e:
                results[permission] = False
                if e.response_status_code == 403:
                    print(f"✗ {permission}: Insufficient permissions")
                else:
                    print(f"✗ {permission}: Error - {e}")
            except Exception as e:
                # A failing probe must not abort the remaining checks.
                results[permission] = False
                print(f"✗ {permission}: Unexpected error - {e}")
            else:
                results[permission] = True
                print(f"✓ {permission}: Available")

        return all(results.values())

    except Exception as e:
        # Diagnostic boundary: report the failure instead of propagating it.
        print(f"Connection test failed with error: {e}")
        return False
```

## Performance Considerations

### Optimizing Graph API Operations

```python
def optimize_graph_operations():
    """Show three ways to cut Graph API traffic: $select, $filter and batching.

    Returns:
        dict: Result counts under the keys ``efficient_query_count``,
        ``filtered_users_count`` and ``batch_results_count``.
    """
    graph = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # $select: request only the properties that are actually needed, and
    # cap the number of results.
    slim_users = graph.list_users(
        select_properties=['id', 'displayName', 'userPrincipalName'],
        top=100,
    )

    # $filter: pass a filter expression with the query to reduce the data
    # transferred.
    engineering_users = graph.list_users(
        filter_expression="department eq 'Engineering'",
        select_properties=['id', 'displayName', 'mail'],
        top=50,
    )

    # Batching: describe one request per user up front ...
    principals = ('user1@company.com', 'user2@company.com', 'user3@company.com')
    pending_requests = [
        graph.request_information(
            url=f'users/{principal}',
            query_parameters={'$select': 'id,displayName,mail'},
        )
        for principal in principals
    ]

    # ... then hand them all to a single batch_request call.
    batch_responses = graph.batch_request(pending_requests, max_batch_size=20)

    summary = {
        'efficient_query_count': len(slim_users),
        'filtered_users_count': len(engineering_users),
        'batch_results_count': len(batch_responses),
    }
    return summary
1221
1222
def implement_caching_strategy():
    """Implement caching for frequently accessed Graph data.

    Returns:
        type: The ``CachedGraphHook`` class, which wraps
        ``KiotaRequestAdapterHook`` with a per-instance, time-limited and
        size-bounded cache of user lookups.
    """
    import time

    class CachedGraphHook:
        """Graph hook wrapper that caches ``get_user`` results per instance.

        The cache is a plain dict owned by the instance rather than
        ``functools.lru_cache`` on the method. ``lru_cache`` returns hits
        without running the method body, so a TTL check placed in the body
        never executes; it also keys on ``self`` (keeping instances alive)
        and can only be cleared as a whole.
        """

        def __init__(self, conn_id):
            self.hook = KiotaRequestAdapterHook(conn_id=conn_id)
            self._cache_data = {}        # cache key -> cached user payload
            self._cache_timestamp = {}   # cache key -> time.monotonic() at store time
            self._cache_ttl = 3600       # 1 hour TTL
            self._cache_max_size = 100   # maximum number of cached users

        def _is_cache_valid(self, key):
            """Check if cached data is still valid."""
            stored_at = self._cache_timestamp.get(key)
            if stored_at is None:
                return False
            # Monotonic clock: unaffected by wall-clock adjustments.
            return (time.monotonic() - stored_at) < self._cache_ttl

        def _store(self, key, value):
            """Insert or refresh an entry, evicting the oldest ones when full."""
            # Drop any previous entry first so a refresh moves the key to the
            # end of the insertion order used for eviction.
            self._cache_data.pop(key, None)
            self._cache_timestamp.pop(key, None)

            while self._cache_data and len(self._cache_data) >= self._cache_max_size:
                oldest_key = next(iter(self._cache_data))
                del self._cache_data[oldest_key]
                self._cache_timestamp.pop(oldest_key, None)

            self._cache_data[key] = value
            self._cache_timestamp[key] = time.monotonic()

        def get_user_cached(self, user_id):
            """Get user with caching.

            Returns the cached payload while it is younger than the TTL;
            otherwise fetches the user through the hook and caches it.
            """
            cache_key = f"user_{user_id}"
            if cache_key in self._cache_data and self._is_cache_valid(cache_key):
                return self._cache_data[cache_key]

            user = self.hook.get_user(user_id, select_properties=['id', 'displayName', 'mail'])
            self._store(cache_key, user)
            return user

        def invalidate_user_cache(self, user_id):
            """Invalidate the cached entry for one user only."""
            cache_key = f"user_{user_id}"
            self._cache_data.pop(cache_key, None)
            self._cache_timestamp.pop(cache_key, None)

    return CachedGraphHook
```

This comprehensive documentation covers all Microsoft Graph API capabilities in the Apache Airflow Microsoft Azure Provider, including authentication methods, API operations, sensor monitoring, trigger-based operations, and performance optimization techniques for Microsoft 365 integration.