or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- azure-batch.md
- azure-data-explorer.md
- azure-file-share.md
- blob-storage.md
- container-services.md
- cosmos-db.md
- data-factory.md
- data-lake-storage.md
- data-transfers.md
- index.md
- microsoft-graph.md
- powerbi.md
- service-bus.md
- synapse-analytics.md

docs/microsoft-graph.md

# Microsoft Graph API

Access the Microsoft Graph API from Airflow to integrate with Microsoft 365 services. The integration covers Graph API endpoints and operations, including asynchronous (deferrable) processing.

## Capabilities

### Microsoft Graph Request Adapter Hook

The primary interface for Microsoft Graph API operations, built on the Kiota request adapter.

```python { .api }

class KiotaRequestAdapterHook(BaseHook):
    """
    Hook for Microsoft Graph API using the Kiota request adapter.

    Provides methods for accessing Microsoft Graph API endpoints with
    modern authentication and request handling capabilities.

    This listing documents signatures and contracts only; method bodies are
    omitted.
    NOTE(review): confirm each helper below (e.g. ``send_email``,
    ``get_user``) exists in the installed provider version before relying
    on it.
    """

    def get_conn(self) -> RequestAdapter:
        """
        Get an authenticated Graph API request adapter.

        Returns:
            RequestAdapter: Kiota request adapter for Graph API calls
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Microsoft Graph API connection.

        Returns:
            tuple[bool, str]: Success status and message
        """

    def request_information(
        self,
        url: str,
        method: str = "GET",
        headers: dict[str, str] | None = None,
        query_parameters: dict[str, Any] | None = None,
        content: bytes | None = None,
    ) -> RequestInformation:
        """
        Create request information for Graph API calls.

        Args:
            url (str): Graph API endpoint URL
            method (str): HTTP method (default: "GET")
            headers (dict[str, str] | None): Additional headers
            query_parameters (dict[str, Any] | None): Query parameters
            content (bytes | None): Request body content

        Returns:
            RequestInformation: Configured request information object
        """

    def get_api_version(self) -> str:
        """
        Get the Microsoft Graph API version being used.

        Returns:
            str: Graph API version (e.g., "v1.0", "beta")
        """

    def get_base_url(self) -> str:
        """
        Get the base URL for Microsoft Graph API.

        Returns:
            str: Base URL for Graph API endpoints
        """

    def send_request(
        self,
        request_info: RequestInformation,
        response_handler: ResponseHandler | None = None,
    ) -> Any:
        """
        Send a request to Microsoft Graph API.

        Args:
            request_info (RequestInformation): Request configuration
            response_handler (ResponseHandler | None): Custom response handler

        Returns:
            Any: Response from Graph API
        """

    def batch_request(
        self,
        requests: list[RequestInformation],
        max_batch_size: int = 20,
    ) -> list[Any]:
        """
        Send multiple requests in batches to Graph API.

        Args:
            requests (list[RequestInformation]): List of request configurations
            max_batch_size (int): Maximum requests per batch (default: 20).
                Microsoft Graph's JSON ``$batch`` endpoint accepts at most 20
                requests per call, so keep this at or below 20.

        Returns:
            list[Any]: List of responses from Graph API
        """

    def get_user(
        self,
        user_id: str,
        select_properties: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Get user information from Microsoft Graph.

        Args:
            user_id (str): User ID or user principal name
            select_properties (list[str] | None): Properties to select

        Returns:
            dict[str, Any]: User information
        """

    def list_users(
        self,
        filter_expression: str | None = None,
        select_properties: list[str] | None = None,
        top: int = 100,
    ) -> list[dict[str, Any]]:
        """
        List users from Microsoft Graph.

        Args:
            filter_expression (str | None): OData filter expression
            select_properties (list[str] | None): Properties to select
            top (int): Maximum number of results (default: 100)

        Returns:
            list[dict[str, Any]]: List of user information
        """

    def get_group(
        self,
        group_id: str,
        select_properties: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Get group information from Microsoft Graph.

        Args:
            group_id (str): Group ID
            select_properties (list[str] | None): Properties to select

        Returns:
            dict[str, Any]: Group information
        """

    def list_groups(
        self,
        filter_expression: str | None = None,
        select_properties: list[str] | None = None,
        top: int = 100,
    ) -> list[dict[str, Any]]:
        """
        List groups from Microsoft Graph.

        Args:
            filter_expression (str | None): OData filter expression
            select_properties (list[str] | None): Properties to select
            top (int): Maximum number of results (default: 100)

        Returns:
            list[dict[str, Any]]: List of group information
        """

    def send_email(
        self,
        user_id: str,
        subject: str,
        body: str,
        to_recipients: list[str],
        cc_recipients: list[str] | None = None,
        bcc_recipients: list[str] | None = None,
        attachments: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """
        Send email through Microsoft Graph.

        Args:
            user_id (str): Sender user ID
            subject (str): Email subject
            body (str): Email body content
            to_recipients (list[str]): "To" recipient email addresses
            cc_recipients (list[str] | None): CC recipient email addresses
            bcc_recipients (list[str] | None): BCC recipient email addresses
            attachments (list[dict[str, Any]] | None): Email attachments

        Returns:
            dict[str, Any]: Email send response
        """

    def create_calendar_event(
        self,
        user_id: str,
        subject: str,
        start_time: datetime,
        end_time: datetime,
        attendees: list[str] | None = None,
        body: str | None = None,
        location: str | None = None,
    ) -> dict[str, Any]:
        """
        Create a calendar event through Microsoft Graph.

        Args:
            user_id (str): User ID to create the event for
            subject (str): Event subject
            start_time (datetime): Event start time
            end_time (datetime): Event end time
            attendees (list[str] | None): Attendee email addresses
            body (str | None): Event description
            location (str | None): Event location

        Returns:
            dict[str, Any]: Created event information
        """

    def upload_file_to_onedrive(
        self,
        user_id: str,
        file_path: str,
        content: bytes,
        conflict_behavior: str = "rename",
    ) -> dict[str, Any]:
        """
        Upload a file to a user's OneDrive through Microsoft Graph.

        Args:
            user_id (str): User ID
            file_path (str): Path where the file is stored in OneDrive
            content (bytes): File content
            conflict_behavior (str): Conflict resolution behavior; Graph
                accepts "fail", "replace" or "rename" (default: "rename")

        Returns:
            dict[str, Any]: Upload response with file information
        """

    def get_sharepoint_site(
        self,
        site_id: str,
        select_properties: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Get SharePoint site information.

        Args:
            site_id (str): SharePoint site ID
            select_properties (list[str] | None): Properties to select

        Returns:
            dict[str, Any]: SharePoint site information
        """

    def list_sharepoint_lists(
        self,
        site_id: str,
        select_properties: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """
        List the SharePoint lists in a site.

        Args:
            site_id (str): SharePoint site ID
            select_properties (list[str] | None): Properties to select

        Returns:
            list[dict[str, Any]]: List of SharePoint lists
        """

    def create_sharepoint_list_item(
        self,
        site_id: str,
        list_id: str,
        item_data: dict[str, Any],
    ) -> dict[str, Any]:
        """
        Create an item in a SharePoint list.

        Args:
            site_id (str): SharePoint site ID
            list_id (str): SharePoint list ID
            item_data (dict[str, Any]): Item data to create

        Returns:
            dict[str, Any]: Created item information
        """

    def get_teams_channels(
        self,
        team_id: str,
        select_properties: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """
        Get the channels of a Microsoft Teams team.

        Args:
            team_id (str): Teams team ID
            select_properties (list[str] | None): Properties to select

        Returns:
            list[dict[str, Any]]: List of team channels
        """

    def send_teams_message(
        self,
        team_id: str,
        channel_id: str,
        message: str,
        message_type: str = "message",
    ) -> dict[str, Any]:
        """
        Send a message to a Microsoft Teams channel.

        Args:
            team_id (str): Teams team ID
            channel_id (str): Channel ID
            message (str): Message content
            message_type (str): Message type (default: "message")

        Returns:
            dict[str, Any]: Message send response
        """

```

### Response Handler

A supporting class that handles Microsoft Graph API responses, including serialization and error handling.

```python { .api }

class DefaultResponseHandler(ResponseHandler):
    """
    Default response handler for Microsoft Graph API.

    Provides standard response handling, error processing,
    and data serialization for Graph API calls.
    """

    async def handle_response_async(
        self,
        response: Any,
        error_map: dict[str, type] | None = None,
    ) -> Any:
        """
        Handle the response from a Graph API call.

        This is a coroutine: Kiota's ``ResponseHandler`` protocol declares
        ``handle_response_async`` as ``async``, so callers must ``await`` it.

        Args:
            response (Any): HTTP response from Graph API
            error_map (dict[str, type] | None): Error mapping configuration

        Returns:
            Any: Processed response data
        """

    def handle_error_response(
        self,
        response: Any,
    ) -> Exception:
        """
        Build the exception corresponding to a Graph API error response.

        Args:
            response (Any): Error response from Graph API

        Returns:
            Exception: Appropriate exception for the error (returned, not raised)
        """

    def serialize_response(
        self,
        response_data: Any,
    ) -> dict[str, Any]:
        """
        Serialize response data from Graph API.

        Args:
            response_data (Any): Raw response data

        Returns:
            dict[str, Any]: Serialized response data
        """

```

## Microsoft Graph Operators

Run Microsoft Graph API operations as Airflow tasks.

### Asynchronous Graph API Operator

```python { .api }

class MSGraphAsyncOperator(BaseOperator):
    """
    Executes Microsoft Graph API operations asynchronously.

    Supports various Graph API operations with deferrable execution
    for long-running Microsoft 365 operations.
    """

    def __init__(
        self,
        *,
        conn_id: str = "msgraph_default",
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        data: dict[str, Any] | str | None = None,
        response_filter: str | None = None,
        response_type: type | None = None,
        **kwargs: Any,
    ):
        """
        Initialize the Microsoft Graph async operator.

        Args:
            conn_id (str): Airflow connection ID for Microsoft Graph
                (default: "msgraph_default")
            url (str): Graph API endpoint URL (the examples pass a relative
                path such as "users")
            method (str): HTTP method (default: "GET")
            query_parameters (dict[str, Any] | None): Query parameters
            headers (dict[str, str] | None): Additional headers
            data (dict[str, Any] | str | None): Request body data
            response_filter (str | None): Response filtering expression
            response_type (type | None): Expected response type
            **kwargs: Remaining operator arguments such as ``task_id`` and
                ``dag`` (presumably forwarded to ``BaseOperator``)
        """

    def execute(self, context: Context) -> Any:
        """
        Execute the Microsoft Graph API operation.

        Args:
            context (Context): Airflow task context

        Returns:
            Any: Response from the Graph API operation
        """

    def execute_defer(self, context: Context) -> None:
        """
        Execute the operation in deferrable mode for long-running operations.

        NOTE(review): Airflow resumes a deferred task through the callback
        named by ``self.defer(..., method_name=...)`` (conventionally
        ``execute_complete``); confirm this method's name against the
        installed provider.

        Args:
            context (Context): Airflow task context
        """

```

## Microsoft Graph Sensors

Wait for Microsoft 365 resources to reach a given condition by polling the Microsoft Graph API.

### Graph API Sensor

```python { .api }

class MSGraphSensor(BaseSensorOperator):
    """
    Monitors Microsoft Graph API resources.

    Provides sensor capabilities for waiting on Microsoft 365 resource
    states and conditions through Graph API polling.
    """

    def __init__(
        self,
        *,
        conn_id: str = "msgraph_default",
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        response_filter: str | None = None,
        # Typed with Callable: the builtin ``callable`` is a function, and
        # ``callable | None`` raises TypeError when the signature is evaluated.
        success_condition: Callable[[Any], bool] | None = None,
        **kwargs: Any,
    ):
        """
        Initialize the Microsoft Graph sensor.

        Args:
            conn_id (str): Airflow connection ID for Microsoft Graph
                (default: "msgraph_default")
            url (str): Graph API endpoint URL to monitor
            method (str): HTTP method (default: "GET")
            query_parameters (dict[str, Any] | None): Query parameters
            headers (dict[str, str] | None): Additional headers
            response_filter (str | None): Response filtering expression
            success_condition (Callable[[Any], bool] | None): Function that
                receives the response data and returns True when the sensor
                should succeed
            **kwargs: Remaining sensor arguments such as ``task_id``,
                ``timeout`` and ``poke_interval`` (presumably forwarded to
                ``BaseSensorOperator``)
        """

    def poke(self, context: Context) -> bool:
        """
        Poke the Microsoft Graph API resource for the condition.

        Args:
            context (Context): Airflow task context

        Returns:
            bool: True if the condition is met, False otherwise
        """

```

## Microsoft Graph Triggers

Triggers enable asynchronous (deferrable) monitoring and event-driven operations for Microsoft 365 services.

### Graph API Trigger

```python { .api }

class MSGraphTrigger(BaseTrigger):
    """
    Async trigger for Microsoft Graph API operations.

    Provides asynchronous monitoring and event handling for
    Microsoft 365 resources through Graph API.
    """

    def __init__(
        self,
        conn_id: str,
        url: str,
        method: str = "GET",
        query_parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        response_filter: str | None = None,
        timeout: int = 3600,
        check_interval: int = 60,
    ):
        """
        Initialize the Microsoft Graph trigger.

        Args:
            conn_id (str): Airflow connection ID for Microsoft Graph
            url (str): Graph API endpoint URL
            method (str): HTTP method (default: "GET")
            query_parameters (dict[str, Any] | None): Query parameters
            headers (dict[str, str] | None): Additional headers
            response_filter (str | None): Response filtering expression
            timeout (int): Timeout in seconds (default: 3600)
            check_interval (int): Check interval in seconds (default: 60)
        """

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Run asynchronous monitoring of the Graph API resource.

        Must be an ``async def`` generator: the triggerer iterates it with
        ``async for``, so a plain ``def`` cannot satisfy the
        ``AsyncIterator`` return type.

        Yields:
            TriggerEvent: Events when conditions are met or timeout occurs
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize the trigger configuration for persistence.

        Returns:
            tuple[str, dict[str, Any]]: The trigger's import path and the
            keyword arguments needed to re-create it
        """

```

### Response Serializer

A supporting class that serializes Microsoft Graph API responses for trigger operations.

```python { .api }

class ResponseSerializer:
    """
    Serializer for Graph API responses.

    Provides serialization capabilities for Graph API response data
    in trigger and asynchronous operations.
    """

    @staticmethod
    def serialize_response(response: Any) -> dict[str, Any]:
        """
        Serialize a Graph API response for storage or transmission.

        Args:
            response (Any): Graph API response object

        Returns:
            dict[str, Any]: Serialized response data
        """

    @staticmethod
    def deserialize_response(data: dict[str, Any]) -> Any:
        """
        Deserialize a Graph API response from stored data.

        Args:
            data (dict[str, Any]): Serialized response data

        Returns:
            Any: Deserialized response object
        """

```

## Usage Examples

### Basic Microsoft Graph Operations

```python

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator


def process_user_data(**context):
    """Process user data retrieved from Microsoft Graph.

    Pulls the XCom pushed by the ``get_users`` task (a Graph collection
    response) and prints one line per user in its ``value`` list.

    Returns:
        int: Number of users found (0 when the XCom is missing or empty).
    """
    users_data = context['task_instance'].xcom_pull(task_ids='get_users')
    # xcom_pull returns None when the upstream task pushed nothing; extract
    # the user list once so the count and the loop agree.
    users = (users_data or {}).get('value', [])

    print(f"Retrieved {len(users)} users")

    for user in users:
        print(f"User: {user.get('displayName')} ({user.get('userPrincipalName')})")

    return len(users)


def send_notification_email():
    """Email a workflow-completion notice through Microsoft Graph.

    Returns:
        The value returned by ``KiotaRequestAdapterHook.send_email``
        (also printed).
    """
    graph_hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # Recipient lists kept as named variables so they are easy to adjust.
    primary_recipients = ['team@company.com', 'manager@company.com']
    copied_recipients = ['notifications@company.com']

    send_result = graph_hook.send_email(
        user_id='admin@company.com',
        subject='Airflow Workflow Notification',
        body='Your daily data processing workflow has completed successfully.',
        to_recipients=primary_recipients,
        cc_recipients=copied_recipients,
    )

    print(f"Email sent successfully: {send_result}")
    return send_result


dag = DAG(
    'msgraph_basic_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3),
    },
    description='Basic Microsoft Graph API workflow',
    # `schedule` supersedes `schedule_interval` (deprecated since Airflow 2.4,
    # removed in Airflow 3).
    schedule=timedelta(hours=12),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Get enabled users (id, name, UPN, mail), 50 per request
get_users = MSGraphAsyncOperator(
    task_id='get_users',
    conn_id='msgraph_conn',
    url='users',
    method='GET',
    query_parameters={
        '$select': 'id,displayName,userPrincipalName,mail',
        '$filter': "accountEnabled eq true",
        '$top': 50,
    },
    dag=dag,
)

# Process user data
process_users = PythonOperator(
    task_id='process_users',
    python_callable=process_user_data,
    dag=dag,
)

# Send notification
send_notification = PythonOperator(
    task_id='send_notification',
    python_callable=send_notification_email,
    dag=dag,
)

get_users >> process_users >> send_notification

```

### Advanced Microsoft 365 Integration

```python

import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.providers.microsoft.azure.operators.msgraph import MSGraphAsyncOperator


def manage_groups_and_users():
    """Collect member lists for a fixed set of "critical" groups.

    Lists up to 100 groups that are either Microsoft 365 (``Unified``) groups
    or security-enabled groups, then fetches the members of each group whose
    display name is in ``critical_groups``.

    Returns:
        dict: Maps each matching group's display name to its ``id``,
        ``member_count`` and ``members`` (the ``value`` list of the members
        response).
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # Microsoft 365 (Unified) groups OR security-enabled groups -- the filter
    # is broader than security groups alone.
    groups = hook.list_groups(
        filter_expression="groupTypes/any(c:c eq 'Unified') or securityEnabled eq true",
        select_properties=['id', 'displayName', 'groupTypes', 'securityEnabled'],
        top=100,
    )

    print(f"Found {len(groups)} groups")

    # Set for O(1) membership tests in the loop below.
    critical_groups = {'Data-Scientists', 'Security-Admins', 'Project-Managers'}

    group_analysis = {}

    for group in groups:
        if group['displayName'] not in critical_groups:
            continue

        members_response = hook.send_request(
            hook.request_information(
                url=f"groups/{group['id']}/members",
                query_parameters={'$select': 'id,displayName,userPrincipalName'},
            )
        )
        # NOTE(review): Graph pages member collections; only the first page
        # is counted here unless send_request follows @odata.nextLink.
        members = members_response.get('value', [])

        group_analysis[group['displayName']] = {
            'id': group['id'],
            'member_count': len(members),
            'members': members,
        }

    return group_analysis


def manage_calendar_events():
    """Create a team-meeting event and a workflow-completion event.

    Returns:
        dict: ``{'team_meeting': ..., 'notification_event': ...}`` holding the
        values returned by ``create_calendar_event``.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # Capture "now" once, as an aware local time, so every derived timestamp
    # is consistent and carries an explicit UTC offset.
    now = datetime.now().astimezone()

    # Next 10:00-11:00 local slot that has not started yet. This creates a
    # single event; a true weekly series would need a recurrence pattern.
    meeting_start = now.replace(hour=10, minute=0, second=0, microsecond=0)
    if meeting_start <= now:
        meeting_start += timedelta(days=1)

    team_meeting = hook.create_calendar_event(
        user_id='teamlead@company.com',
        subject='Weekly Data Pipeline Review',
        start_time=meeting_start,
        end_time=meeting_start + timedelta(hours=1),
        attendees=[
            'dataengineer1@company.com',
            'dataengineer2@company.com',
            'analyst@company.com',
        ],
        body='Weekly review of data pipeline status, issues, and improvements.',
        location='Conference Room A / Teams',
    )

    print(f"Team meeting created: {team_meeting}")

    # Create workflow completion notification event (30 minutes from now)
    notification_event = hook.create_calendar_event(
        user_id='admin@company.com',
        subject='Data Processing Workflow Completed',
        start_time=now,
        end_time=now + timedelta(minutes=30),
        body='Daily data processing workflow has completed. Check results in dashboard.',
    )

    return {
        'team_meeting': team_meeting,
        'notification_event': notification_event,
    }


def sync_sharepoint_data():
    """Record a pipeline-run status item in a SharePoint list.

    Looks up the list named 'Data Processing Status' in ``site_id`` and, when
    it exists, creates one item describing the run.

    Returns:
        The created item (as returned by ``create_sharepoint_list_item``), or
        None when the list is not found.
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    site_id = 'company.sharepoint.com,site-id,web-id'

    lists = hook.list_sharepoint_lists(
        site_id=site_id,
        select_properties=['id', 'displayName', 'list'],
    )

    # Find the data tracking list
    data_list = next(
        (sp_list for sp_list in lists if sp_list['displayName'] == 'Data Processing Status'),
        None,
    )

    if data_list is None:
        print("Data Processing Status list not found")
        return None

    # One timestamp for the whole item. The run ends "now" and (illustrative
    # placeholder) started two hours earlier, so a 'Completed' run never has
    # an EndTime in the future.
    run_end = datetime.now()
    run_start = run_end - timedelta(hours=2)

    status_item = hook.create_sharepoint_list_item(
        site_id=site_id,
        list_id=data_list['id'],
        item_data={
            'Title': f'Pipeline Run {run_end.strftime("%Y-%m-%d %H:%M")}',
            'Status': 'Completed',
            'ProcessedRecords': 150000,  # placeholder value
            'StartTime': run_start.isoformat(),
            'EndTime': run_end.isoformat(),
        },
    )

    print(f"Status item created: {status_item}")
    return status_item


def send_teams_notifications():
    """Post a pipeline-completion message to a team's "General" channel.

    Returns:
        The result of ``send_teams_message``, or None when the team has no
        channel named "general" (case-insensitive).
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    team_id = 'data-engineering-team-id'

    channels = hook.get_teams_channels(
        team_id=team_id,
        select_properties=['id', 'displayName'],
    )

    # Find the general channel; tolerate channels without a displayName.
    general_channel = next(
        (
            channel for channel in channels
            if (channel.get('displayName') or '').lower() == 'general'
        ),
        None,
    )

    if general_channel is None:
        return None

    # Send completion notification (emoji were previously mis-encoded text)
    message_result = hook.send_teams_message(
        team_id=team_id,
        channel_id=general_channel['id'],
        message=f"""
🎉 **Daily Data Pipeline Completed Successfully**

**Execution Time**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Status**: ✅ Success
**Records Processed**: 150,000
**Duration**: 2 hours 15 minutes

Check the dashboard for detailed results: [Dashboard Link](https://company-dashboard.com)
""",
    )

    print(f"Teams message sent: {message_result}")
    return message_result


dag = DAG(
    'msgraph_advanced_workflow',
    default_args={
        'owner': 'integration-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Advanced Microsoft 365 integration workflow',
    # `schedule` supersedes `schedule_interval` (deprecated since Airflow 2.4,
    # removed in Airflow 3).
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Manage groups and users
manage_groups = PythonOperator(
    task_id='manage_groups',
    python_callable=manage_groups_and_users,
    dag=dag,
)

# Manage calendar events
manage_calendar = PythonOperator(
    task_id='manage_calendar',
    python_callable=manage_calendar_events,
    dag=dag,
)

# Sync SharePoint data
sync_sharepoint = PythonOperator(
    task_id='sync_sharepoint',
    python_callable=sync_sharepoint_data,
    dag=dag,
)

# Send Teams notifications
notify_teams = PythonOperator(
    task_id='notify_teams',
    python_callable=send_teams_notifications,
    dag=dag,
)

# Get Office 365 active-user details for the last 7 days
get_reports = MSGraphAsyncOperator(
    task_id='get_usage_reports',
    conn_id='msgraph_conn',
    url="reports/getOffice365ActiveUserDetail(period='D7')",
    method='GET',
    dag=dag,
)

# Calendar and SharePoint tasks both follow manage_groups and must finish
# before the Teams notification, which precedes the usage report.
manage_groups >> [manage_calendar, sync_sharepoint] >> notify_teams >> get_reports

```

### Microsoft Graph Sensor Example

```python

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.sensors.msgraph import MSGraphSensor


def check_file_processing_condition(response_data):
    """Return True if any item in a Graph listing was modified in the last hour.

    Args:
        response_data (dict | None): Graph collection response. Items are read
            from its ``value`` list; each should carry an ISO-8601
            ``lastModifiedDateTime`` (Graph marks UTC with a trailing ``Z``).

    Returns:
        bool: True when at least one item was modified less than an hour ago.
    """
    from datetime import timezone

    files = (response_data or {}).get('value', [])

    # Parsed Graph timestamps are timezone-aware, so "now" must be aware too:
    # subtracting an aware datetime from a naive one raises TypeError.
    current_time = datetime.now(timezone.utc)

    recent_files = []
    for file in files:
        modified_raw = file.get('lastModifiedDateTime')
        if not modified_raw:
            # No timestamp, so recency cannot be judged; skip instead of crashing.
            continue
        file_modified = datetime.fromisoformat(modified_raw.replace('Z', '+00:00'))
        if file_modified.tzinfo is None:
            # Treat an offset-less timestamp as UTC.
            file_modified = file_modified.replace(tzinfo=timezone.utc)
        if (current_time - file_modified).total_seconds() < 3600:  # 1 hour
            recent_files.append(file)

    print(f"Found {len(recent_files)} recent files")
    return len(recent_files) > 0


def process_detected_files(**context):
    """Handle the files reported by the ``wait_for_new_files`` sensor.

    Returns:
        str: A fixed success message once processing has run.
    """
    ti = context['task_instance']
    detected = ti.xcom_pull(task_ids='wait_for_new_files')
    print(f"Sensor detected new files: {detected}")

    # Placeholder: real file-processing logic would go here.
    return "Files processed successfully"


dag = DAG(
    'msgraph_sensor_workflow',
    default_args={
        'owner': 'monitoring-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    description='Microsoft Graph sensor workflow',
    # `schedule` supersedes `schedule_interval` (deprecated since Airflow 2.4,
    # removed in Airflow 3).
    schedule=timedelta(minutes=30),
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

# Wait for new files in OneDrive.
# NOTE: `me/...` only resolves with delegated (signed-in user) credentials.
# With application (client-credentials) auth, target a specific user instead,
# e.g. 'users/{user-id}/drive/root/children'.
wait_for_files = MSGraphSensor(
    task_id='wait_for_new_files',
    conn_id='msgraph_conn',
    url='me/drive/root/children',
    method='GET',
    query_parameters={
        '$select': 'id,name,lastModifiedDateTime,size',
        '$filter': "folder eq null",  # Only files, not folders
    },
    success_condition=check_file_processing_condition,
    timeout=1800,  # 30 minutes timeout
    poke_interval=60,  # Check every minute
    dag=dag,
)

# Process detected files
process_files = PythonOperator(
    task_id='process_files',
    python_callable=process_detected_files,
    dag=dag,
)

wait_for_files >> process_files

```

## Connection Configuration

### Microsoft Graph Connection (`msgraph`)

Configure a Microsoft Graph API connection in Airflow as follows:

```python

# Connection configuration for Microsoft Graph.
# NOTE(review): some provider versions read the client ID and secret from the
# connection's login/password fields rather than from `extra`; check the
# provider documentation for your version.
{
    "conn_id": "msgraph_default",
    "conn_type": "msgraph",
    "host": "graph.microsoft.com",  # Graph API endpoint
    "extra": {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret",  # keep secrets out of source control
        "api_version": "v1.0",  # or "beta" for preview features
    },
}

```

### Authentication Methods

The Microsoft Graph connection supports several authentication methods:

1. **Application (Client Credentials) Authentication**:

```python

# App-only (client-credentials) authentication.
extra = {
    "tenant_id": "your-tenant-id",
    "client_id": "your-client-id",
    "client_secret": "your-client-secret",
    "auth_type": "client_credentials",
}

```

2. **Delegated Authentication (Authorization Code)**:

```python

# Delegated (authorization-code) authentication.
# NOTE(review): the authorization-code flow needs an interactive user sign-in,
# which unattended Airflow tasks cannot perform; confirm your provider version
# supports this mode before using it.
extra = {
    "tenant_id": "your-tenant-id",
    "client_id": "your-client-id",
    "client_secret": "your-client-secret",
    "auth_type": "authorization_code",
    "scopes": ["https://graph.microsoft.com/.default"],
}

```

3. **Certificate-based Authentication**:

```python

# Certificate-based authentication (the certificate file must be readable
# by the Airflow workers).
extra = {
    "tenant_id": "your-tenant-id",
    "client_id": "your-client-id",
    "certificate_path": "/path/to/certificate.pem",
    "certificate_thumbprint": "cert-thumbprint",
    "auth_type": "certificate",
}

```

4. **Managed Identity Authentication**:

```python

# Managed-identity authentication (no secret stored in the connection).
extra = {
    "managed_identity_client_id": "your-managed-identity-client-id",
    "auth_type": "managed_identity",
}

```

## Error Handling

### Common Exception Patterns

```python

from kiota_abstractions.api_error import APIError

from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook


def robust_graph_operations():
    """Demonstrate error-handling patterns for Graph API operations.

    Part 1 fetches one user and maps common HTTP failures: 404 returns None,
    403 raises PermissionError (chained to the original APIError), 429 waits
    for the server-suggested delay and retries once; any other error is
    re-raised. Part 2 sends a small batch and reports each request's outcome.
    """
    import time

    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    try:
        # Attempt to get user information
        user_info = hook.get_user('user@company.com')
        print(f"User found: {user_info}")

    except APIError as api_error:
        status = api_error.response_status_code
        if status == 404:
            print("User not found")
            return None
        if status == 403:
            print("Insufficient permissions to access user information")
            raise PermissionError("Insufficient Graph API permissions") from api_error
        if status == 429:
            # Throttled: Graph returns Retry-After (in seconds) with 429
            # responses. Honour it, falling back to 60 s when it is missing
            # or not an integer.
            headers = api_error.response_headers or {}
            retry_after = headers.get('Retry-After') or headers.get('retry-after')
            try:
                delay = int(retry_after)
            except (TypeError, ValueError):
                delay = 60
            print(f"Rate limit exceeded, retrying once in {delay} seconds")
            time.sleep(delay)
            return hook.get_user('user@company.com')
        print(f"API error: {api_error}")
        raise

    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

    try:
        # Batch request with per-request error reporting
        batch = [
            hook.request_information(url='users/user1@company.com'),
            hook.request_information(url='users/user2@company.com'),
            hook.request_information(url='users/nonexistent@company.com'),  # This will fail
        ]

        responses = hook.batch_request(batch)

        for i, response in enumerate(responses):
            if not isinstance(response, dict):
                # Only dict responses carry an 'error' key or displayName.
                print(f"Request {i} returned an unexpected response: {response!r}")
            elif 'error' in response:
                print(f"Request {i} failed: {response['error']}")
            else:
                print(f"Request {i} succeeded: {response.get('displayName', 'Unknown')}")

    except Exception as e:
        # Deliberately best-effort in this example: report and continue.
        # In production, retry, alert, or re-raise here.
        print(f"Batch request error: {e}")


def implement_retry_logic():
    """Fetch a user from the Graph API, retrying with exponential backoff and jitter.

    Returns:
        The result of ``hook.get_user('user@company.com')`` once a call succeeds.

    Raises:
        APIError: Immediately for non-429 API errors, or for a 429 on the final attempt.
        Exception: Any other error once all attempts are exhausted.
    """
    import random
    import time

    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    def retry_after_seconds(error, default):
        """Return the Retry-After delay in seconds carried by ``error``, or ``default``."""
        # response_headers may be None, and header-name casing is not guaranteed.
        for name, value in (error.response_headers or {}).items():
            if name.lower() == 'retry-after':
                try:
                    return max(0.0, float(value))
                except (TypeError, ValueError):
                    break  # e.g. an HTTP-date value: fall back to the computed backoff
        return default

    def retry_with_backoff(operation, max_retries=3, base_delay=1):
        """Run ``operation()``, retrying failures with exponential backoff.

        Args:
            operation: Zero-argument callable that performs the Graph request.
            max_retries: Total number of attempts (values below 1 are treated as 1).
            base_delay: Base delay in seconds; attempt ``n`` waits about
                ``base_delay * 2**n`` unless the server sends Retry-After.
        """
        attempts = max(1, max_retries)
        for attempt in range(attempts):
            is_last = attempt == attempts - 1
            try:
                return operation()
            except APIError as e:
                # Only throttling is retried; on the last attempt fail fast instead of
                # sleeping first and then raising anyway.
                if e.response_status_code != 429 or is_last:
                    raise
                retry_after = retry_after_seconds(e, base_delay * (2 ** attempt))
                sleep_time = retry_after + random.uniform(0.1, 0.3) * retry_after
                print(f"Rate limited, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
            except Exception:
                if is_last:
                    raise
                sleep_time = base_delay * (2 ** attempt) + random.uniform(0.1, 0.5)
                print(f"Operation failed, retrying in {sleep_time:.2f} seconds (attempt {attempt + 1})")
            time.sleep(sleep_time)

    # Use retry logic for operations
    user_data = retry_with_backoff(lambda: hook.get_user('user@company.com'))
    return user_data

1129

```

1130

1131

### Connection Testing

1132

1133

```python

1134

def test_graph_connection():
    """Test the Microsoft Graph API connection and probe selected permissions.

    First runs ``hook.test_connection()``. It then calls one cheap read per
    permission that such a read can actually exercise.

    ``Mail.Send`` and ``Sites.Read.All`` cannot be verified this way. They are
    listed as "not verified" and left out of the result, rather than being
    reported as available on the basis of an unrelated call.

    Returns:
        bool: True only if the connection test and every performed probe succeed.
    """
    try:
        hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

        # Test basic connection
        success, message = hook.test_connection()
        if not success:
            print(f"Connection test failed: {message}")
            return False

        print("Basic connection successful")

        # Each probe is a minimal read that should fail with 403 if the permission is missing.
        permissions_tests = {
            'User.Read.All': lambda: hook.list_users(top=1),
            'Group.Read.All': lambda: hook.list_groups(top=1),
        }
        # A generic call such as get_api_version() exercises neither sending mail nor
        # reading SharePoint sites, so these are listed but not checked.
        unverified_permissions = ['Mail.Send', 'Sites.Read.All']

        results = {}
        for permission, test_func in permissions_tests.items():
            try:
                test_func()
            except APIError as e:
                results[permission] = False
                if e.response_status_code == 403:
                    print(f"✗ {permission}: Insufficient permissions")
                else:
                    print(f"✗ {permission}: Error - {e}")
            except Exception as e:
                results[permission] = False
                print(f"✗ {permission}: Unexpected error - {e}")
            else:
                results[permission] = True
                print(f"✓ {permission}: Available")

        for permission in unverified_permissions:
            print(f"? {permission}: Not verified (no read-only probe)")

        return all(results.values())

    except Exception as e:
        print(f"Connection test failed with error: {e}")
        return False

1177

```

1178

1179

## Performance Considerations

1180

1181

### Optimizing Graph API Operations

1182

1183

```python

1184

def optimize_graph_operations():
    """Show three ways to cut Graph API traffic: $select, $filter and batching.

    Returns:
        dict: Result sizes keyed by ``efficient_query_count`` (select-only
        listing), ``filtered_users_count`` (filtered listing) and
        ``batch_results_count`` (batched per-user lookups).
    """
    hook = KiotaRequestAdapterHook(conn_id='msgraph_conn')

    # $select: ask only for the properties the caller needs, capped at 100 rows.
    efficient_user_query = hook.list_users(
        select_properties=['id', 'displayName', 'userPrincipalName'],
        top=100,
    )

    # $filter: have the server narrow the set before anything is transferred.
    filtered_users = hook.list_users(
        filter_expression="department eq 'Engineering'",
        select_properties=['id', 'displayName', 'mail'],
        top=50,
    )

    # Batching: one request object per user, all sent together below.
    principal_names = ('user1@company.com', 'user2@company.com', 'user3@company.com')
    batch_requests = [
        hook.request_information(
            url=f'users/{upn}',
            query_parameters={'$select': 'id,displayName,mail'},
        )
        for upn in principal_names
    ]

    batch_results = hook.batch_request(batch_requests, max_batch_size=20)

    return dict(
        efficient_query_count=len(efficient_user_query),
        filtered_users_count=len(filtered_users),
        batch_results_count=len(batch_results),
    )

1221

1222

def implement_caching_strategy():
    """Build a wrapper class that caches Graph user lookups with a TTL.

    Returns:
        type: The ``CachedGraphHook`` class (not an instance).
    """
    from collections import OrderedDict
    import time

    class CachedGraphHook:
        """Cache ``get_user`` results per instance, with a TTL and an LRU size bound.

        ``functools.lru_cache`` is deliberately not used. Applied to a method, it:

        - keys on ``self``, which keeps every instance alive;
        - serves hits without running the method body, so a TTL check written
          inside the body never fires;
        - offers only ``cache_clear()``, which drops every user for every
          instance.
        """

        def __init__(self, conn_id, ttl=3600, maxsize=100, hook=None):
            """Create the wrapper.

            Args:
                conn_id: Airflow connection id for the Graph hook.
                ttl: Seconds a cached user stays valid (default: 1 hour).
                maxsize: Maximum number of cached users; least recently used are evicted.
                hook: Optional pre-built hook; by default one is created from ``conn_id``.
            """
            self.hook = hook if hook is not None else KiotaRequestAdapterHook(conn_id=conn_id)
            self._cache_timestamp = {}  # cache key -> time.time() of the fetch
            self._cache_values = OrderedDict()  # cache key -> user, in LRU order
            self._cache_ttl = ttl
            self._maxsize = maxsize

        def _is_cache_valid(self, key):
            """Check if cached data is still valid."""
            if key not in self._cache_timestamp:
                return False
            return (time.time() - self._cache_timestamp[key]) < self._cache_ttl

        def get_user_cached(self, user_id):
            """Return the user, refetching when the entry is missing or older than the TTL."""
            cache_key = f"user_{user_id}"
            if self._is_cache_valid(cache_key) and cache_key in self._cache_values:
                self._cache_values.move_to_end(cache_key)  # mark as recently used
                return self._cache_values[cache_key]

            user = self.hook.get_user(user_id, select_properties=['id', 'displayName', 'mail'])
            # Record the timestamp only after a successful fetch.
            self._cache_values[cache_key] = user
            self._cache_values.move_to_end(cache_key)
            self._cache_timestamp[cache_key] = time.time()

            while len(self._cache_values) > self._maxsize:
                evicted_key, _ = self._cache_values.popitem(last=False)
                self._cache_timestamp.pop(evicted_key, None)
            return user

        def invalidate_user_cache(self, user_id):
            """Invalidate the cached entry for this user only."""
            cache_key = f"user_{user_id}"
            self._cache_timestamp.pop(cache_key, None)
            self._cache_values.pop(cache_key, None)

        def clear_cache(self):
            """Drop every cached user held by this instance."""
            self._cache_timestamp.clear()
            self._cache_values.clear()

    return CachedGraphHook

1262

```

1263

1264

This document describes the Microsoft Graph API integration in the Apache Airflow Microsoft Azure provider. It covers authentication methods, API operations, sensor monitoring, trigger-based operations, and performance optimization techniques for Microsoft 365 integration.