or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

azure-batch.md · azure-data-explorer.md · azure-file-share.md · blob-storage.md · container-services.md · cosmos-db.md · data-factory.md · data-lake-storage.md · data-transfers.md · index.md · microsoft-graph.md · powerbi.md · service-bus.md · synapse-analytics.md

docs/service-bus.md

0

# Azure Service Bus

1

2

The Azure Service Bus integration provides Airflow hooks and operators for reliable messaging: queue and topic management, subscription handling, sending and receiving messages, and related administrative functions.

3

4

## Capabilities

5

6

### Base Service Bus Hook

7

8

Foundation hook for Azure Service Bus operations providing common functionality and connection management.

9

10

```python { .api }

11

class BaseAzureServiceBusHook(BaseHook):

12

"""

13

Base hook for Azure Service Bus operations.

14

15

Provides common functionality and connection management for Service Bus

16

administrative and message operations.

17

"""

18

19

def get_conn(self) -> ServiceBusClient:

20

"""

21

Get authenticated Azure Service Bus client.

22

23

Returns:

24

ServiceBusClient: Service Bus client instance

25

"""

26

27

def test_connection(self) -> tuple[bool, str]:

28

"""

29

Test the Azure Service Bus connection.

30

31

Returns:

32

tuple[bool, str]: Success status and message

33

"""

34

```

35

36

### Administrative Operations Hook

37

38

Hook for Azure Service Bus administrative operations including queue and topic management.

39

40

```python { .api }

41

class AdminClientHook(BaseAzureServiceBusHook):

42

"""

43

Hook for Azure Service Bus administrative operations.

44

45

Provides methods for managing queues, topics, subscriptions, and other

46

Service Bus administrative tasks.

47

"""

48

49

def get_conn(self) -> ServiceBusAdministrationClient:

50

"""

51

Get authenticated Service Bus administration client.

52

53

Returns:

54

ServiceBusAdministrationClient: Administration client instance

55

"""

56

57

def create_queue(

58

self,

59

queue_name: str,

60

max_delivery_count: int | None = None,

61

dead_lettering_on_message_expiration: bool | None = None,

62

**kwargs: Any

63

) -> None:

64

"""

65

Create a new Service Bus queue.

66

67

Args:

68

queue_name (str): Name of the queue to create

69

max_delivery_count (int | None): Maximum delivery attempts

70

dead_lettering_on_message_expiration (bool | None): Enable dead lettering

71

**kwargs: Additional queue properties

72

"""

73

74

def delete_queue(self, queue_name: str) -> None:

75

"""

76

Delete a Service Bus queue.

77

78

Args:

79

queue_name (str): Name of the queue to delete

80

"""

81

82

def get_queue(self, queue_name: str) -> QueueProperties:

83

"""

84

Get properties of a Service Bus queue.

85

86

Args:

87

queue_name (str): Name of the queue

88

89

Returns:

90

QueueProperties: Queue configuration and runtime information

91

"""

92

93

def list_queues(self) -> list[QueueProperties]:

94

"""

95

List all queues in the Service Bus namespace.

96

97

Returns:

98

list[QueueProperties]: List of queue properties

99

"""

100

101

def update_queue(

102

self,

103

queue_name: str,

104

queue_properties: QueueProperties,

105

**kwargs: Any

106

) -> QueueProperties:

107

"""

108

Update an existing Service Bus queue.

109

110

Args:

111

queue_name (str): Name of the queue to update

112

queue_properties (QueueProperties): New queue properties

113

**kwargs: Additional update parameters

114

115

Returns:

116

QueueProperties: Updated queue properties

117

"""

118

119

def create_topic(

120

self,

121

topic_name: str,

122

max_size_in_megabytes: int | None = None,

123

enable_partitioning: bool | None = None,

124

**kwargs: Any

125

) -> None:

126

"""

127

Create a new Service Bus topic.

128

129

Args:

130

topic_name (str): Name of the topic to create

131

max_size_in_megabytes (int | None): Maximum topic size

132

enable_partitioning (bool | None): Enable topic partitioning

133

**kwargs: Additional topic properties

134

"""

135

136

def delete_topic(self, topic_name: str) -> None:

137

"""

138

Delete a Service Bus topic.

139

140

Args:

141

topic_name (str): Name of the topic to delete

142

"""

143

144

def get_topic(self, topic_name: str) -> TopicProperties:

145

"""

146

Get properties of a Service Bus topic.

147

148

Args:

149

topic_name (str): Name of the topic

150

151

Returns:

152

TopicProperties: Topic configuration and runtime information

153

"""

154

155

def list_topics(self) -> list[TopicProperties]:

156

"""

157

List all topics in the Service Bus namespace.

158

159

Returns:

160

list[TopicProperties]: List of topic properties

161

"""

162

163

def create_subscription(

164

self,

165

topic_name: str,

166

subscription_name: str,

167

max_delivery_count: int | None = None,

168

dead_lettering_on_message_expiration: bool | None = None,

169

**kwargs: Any

170

) -> None:

171

"""

172

Create a subscription for a Service Bus topic.

173

174

Args:

175

topic_name (str): Name of the topic

176

subscription_name (str): Name of the subscription to create

177

max_delivery_count (int | None): Maximum delivery attempts

178

dead_lettering_on_message_expiration (bool | None): Enable dead lettering

179

**kwargs: Additional subscription properties

180

"""

181

182

def delete_subscription(

183

self,

184

topic_name: str,

185

subscription_name: str

186

) -> None:

187

"""

188

Delete a subscription from a Service Bus topic.

189

190

Args:

191

topic_name (str): Name of the topic

192

subscription_name (str): Name of the subscription to delete

193

"""

194

195

def get_subscription(

196

self,

197

topic_name: str,

198

subscription_name: str

199

) -> SubscriptionProperties:

200

"""

201

Get properties of a Service Bus subscription.

202

203

Args:

204

topic_name (str): Name of the topic

205

subscription_name (str): Name of the subscription

206

207

Returns:

208

SubscriptionProperties: Subscription configuration and runtime information

209

"""

210

211

def list_subscriptions(self, topic_name: str) -> list[SubscriptionProperties]:

212

"""

213

List all subscriptions for a Service Bus topic.

214

215

Args:

216

topic_name (str): Name of the topic

217

218

Returns:

219

list[SubscriptionProperties]: List of subscription properties

220

"""

221

222

def update_subscription(

223

self,

224

topic_name: str,

225

subscription_name: str,

226

subscription_properties: SubscriptionProperties,

227

**kwargs: Any

228

) -> SubscriptionProperties:

229

"""

230

Update an existing Service Bus subscription.

231

232

Args:

233

topic_name (str): Name of the topic

234

subscription_name (str): Name of the subscription

235

subscription_properties (SubscriptionProperties): New subscription properties

236

**kwargs: Additional update parameters

237

238

Returns:

239

SubscriptionProperties: Updated subscription properties

240

"""

241

```

242

243

### Message Operations Hook

244

245

Hook for Azure Service Bus message operations including sending and receiving messages.

246

247

```python { .api }

248

class MessageHook(BaseAzureServiceBusHook):

249

"""

250

Hook for Azure Service Bus message operations.

251

252

Provides methods for sending and receiving messages from queues and topics,

253

with support for message batching and transaction handling.

254

"""

255

256

def get_conn(self) -> ServiceBusClient:

257

"""

258

Get authenticated Service Bus client for message operations.

259

260

Returns:

261

ServiceBusClient: Service Bus client instance

262

"""

263

264

def send_message(

265

self,

266

queue_name: str,

267

message: str | ServiceBusMessage,

268

batch: bool = False,

269

**kwargs: Any

270

) -> None:

271

"""

272

Send a message to a Service Bus queue.

273

274

Args:

275

queue_name (str): Name of the target queue

276

message (str | ServiceBusMessage): Message content or ServiceBusMessage object

277

batch (bool): Whether to send as part of a batch (default: False)

278

**kwargs: Additional message properties

279

"""

280

281

def send_list_of_messages(

282

self,

283

queue_name: str,

284

messages: list[ServiceBusMessage],

285

**kwargs: Any

286

) -> None:

287

"""

288

Send multiple messages to a Service Bus queue.

289

290

Args:

291

queue_name (str): Name of the target queue

292

messages (list[ServiceBusMessage]): List of messages to send

293

**kwargs: Additional send parameters

294

"""

295

296

def receive_message(

297

self,

298

queue_name: str,

299

max_message_count: int = 1,

300

max_wait_time: int = 5,

301

**kwargs: Any

302

) -> list[ServiceBusReceivedMessage]:

303

"""

304

Receive messages from a Service Bus queue.

305

306

Args:

307

queue_name (str): Name of the source queue

308

max_message_count (int): Maximum number of messages to receive (default: 1)

309

max_wait_time (int): Maximum wait time in seconds (default: 5)

310

**kwargs: Additional receive parameters

311

312

Returns:

313

list[ServiceBusReceivedMessage]: List of received messages

314

"""

315

316

def peek_messages(

317

self,

318

queue_name: str,

319

message_count: int = 1,

320

sequence_number: int | None = None

321

) -> list[ServiceBusReceivedMessage]:

322

"""

323

Peek at messages in a Service Bus queue without removing them.

324

325

Args:

326

queue_name (str): Name of the queue to peek

327

message_count (int): Number of messages to peek (default: 1)

328

sequence_number (int | None): Starting sequence number

329

330

Returns:

331

list[ServiceBusReceivedMessage]: List of peeked messages

332

"""

333

334

def send_topic_message(

335

self,

336

topic_name: str,

337

message: str | ServiceBusMessage,

338

batch: bool = False,

339

**kwargs: Any

340

) -> None:

341

"""

342

Send a message to a Service Bus topic.

343

344

Args:

345

topic_name (str): Name of the target topic

346

message (str | ServiceBusMessage): Message content or ServiceBusMessage object

347

batch (bool): Whether to send as part of a batch (default: False)

348

**kwargs: Additional message properties

349

"""

350

351

def receive_subscription_message(

352

self,

353

topic_name: str,

354

subscription_name: str,

355

max_message_count: int = 1,

356

max_wait_time: int = 5,

357

**kwargs: Any

358

) -> list[ServiceBusReceivedMessage]:

359

"""

360

Receive messages from a Service Bus subscription.

361

362

Args:

363

topic_name (str): Name of the topic

364

subscription_name (str): Name of the subscription

365

max_message_count (int): Maximum number of messages to receive (default: 1)

366

max_wait_time (int): Maximum wait time in seconds (default: 5)

367

**kwargs: Additional receive parameters

368

369

Returns:

370

list[ServiceBusReceivedMessage]: List of received messages

371

"""

372

```

373

374

## Service Bus Operators

375

376

Execute Azure Service Bus operations as Airflow tasks with comprehensive queue and topic management capabilities.

377

378

### Queue Management Operators

379

380

```python { .api }

381

class AzureServiceBusCreateQueueOperator(BaseOperator):

382

"""

383

Creates Azure Service Bus queues.

384

385

Supports creating queues with custom properties and configuration

386

options for messaging requirements.

387

"""

388

389

def __init__(

390

self,

391

*,

392

queue_name: str,

393

azure_service_bus_conn_id: str = "azure_service_bus_default",

394

max_delivery_count: int | None = None,

395

dead_lettering_on_message_expiration: bool | None = None,

396

**kwargs

397

):

398

"""

399

Initialize Service Bus create queue operator.

400

401

Args:

402

queue_name (str): Name of the queue to create

403

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

404

max_delivery_count (int | None): Maximum delivery attempts

405

dead_lettering_on_message_expiration (bool | None): Enable dead lettering

406

"""

407

408

class AzureServiceBusDeleteQueueOperator(BaseOperator):

409

"""

410

Deletes Azure Service Bus queues.

411

412

Removes queues from the Service Bus namespace with

413

error handling for non-existent queues.

414

"""

415

416

def __init__(

417

self,

418

*,

419

queue_name: str,

420

azure_service_bus_conn_id: str = "azure_service_bus_default",

421

**kwargs

422

):

423

"""

424

Initialize Service Bus delete queue operator.

425

426

Args:

427

queue_name (str): Name of the queue to delete

428

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

429

"""

430

```

431

432

### Message Operations Operators

433

434

```python { .api }

435

class AzureServiceBusSendMessageOperator(BaseOperator):

436

"""

437

Sends messages to Azure Service Bus queues.

438

439

Supports sending single messages or batches with custom

440

message properties and routing information.

441

"""

442

443

def __init__(

444

self,

445

*,

446

queue_name: str,

447

message: str,

448

azure_service_bus_conn_id: str = "azure_service_bus_default",

449

batch: bool = False,

450

**kwargs

451

):

452

"""

453

Initialize Service Bus send message operator.

454

455

Args:

456

queue_name (str): Target queue name

457

message (str): Message content to send

458

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

459

batch (bool): Whether to send as batch message

460

"""

461

462

class AzureServiceBusReceiveMessageOperator(BaseOperator):

463

"""

464

Receives messages from Azure Service Bus queues.

465

466

Retrieves messages with configurable receive options

467

and message handling parameters.

468

"""

469

470

def __init__(

471

self,

472

*,

473

queue_name: str,

474

azure_service_bus_conn_id: str = "azure_service_bus_default",

475

max_message_count: int = 1,

476

max_wait_time: int = 5,

477

**kwargs

478

):

479

"""

480

Initialize Service Bus receive message operator.

481

482

Args:

483

queue_name (str): Source queue name

484

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

485

max_message_count (int): Maximum messages to receive

486

max_wait_time (int): Maximum wait time in seconds

487

"""

488

```

489

490

### Topic and Subscription Operators

491

492

```python { .api }

493

class AzureServiceBusTopicCreateOperator(BaseOperator):

494

"""

495

Creates Azure Service Bus topics.

496

497

Supports creating topics with custom configuration

498

and partitioning options for publish-subscribe scenarios.

499

"""

500

501

def __init__(

502

self,

503

*,

504

topic_name: str,

505

azure_service_bus_conn_id: str = "azure_service_bus_default",

506

max_size_in_megabytes: int | None = None,

507

enable_partitioning: bool | None = None,

508

**kwargs

509

):

510

"""

511

Initialize Service Bus create topic operator.

512

513

Args:

514

topic_name (str): Name of the topic to create

515

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

516

max_size_in_megabytes (int | None): Maximum topic size

517

enable_partitioning (bool | None): Enable topic partitioning

518

"""

519

520

class AzureServiceBusTopicDeleteOperator(BaseOperator):

521

"""

522

Deletes Azure Service Bus topics.

523

524

Removes topics and all associated subscriptions

525

from the Service Bus namespace.

526

"""

527

528

def __init__(

529

self,

530

*,

531

topic_name: str,

532

azure_service_bus_conn_id: str = "azure_service_bus_default",

533

**kwargs

534

):

535

"""

536

Initialize Service Bus delete topic operator.

537

538

Args:

539

topic_name (str): Name of the topic to delete

540

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

541

"""

542

543

class AzureServiceBusSubscriptionCreateOperator(BaseOperator):

544

"""

545

Creates Azure Service Bus subscriptions.

546

547

Creates subscriptions for topics with filtering rules

548

and message handling configuration.

549

"""

550

551

def __init__(

552

self,

553

*,

554

topic_name: str,

555

subscription_name: str,

556

azure_service_bus_conn_id: str = "azure_service_bus_default",

557

max_delivery_count: int | None = None,

558

dead_lettering_on_message_expiration: bool | None = None,

559

**kwargs

560

):

561

"""

562

Initialize Service Bus create subscription operator.

563

564

Args:

565

topic_name (str): Name of the topic

566

subscription_name (str): Name of the subscription to create

567

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

568

max_delivery_count (int | None): Maximum delivery attempts

569

dead_lettering_on_message_expiration (bool | None): Enable dead lettering

570

"""

571

572

class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):

573

"""

574

Deletes Azure Service Bus subscriptions.

575

576

Removes subscriptions from topics with proper

577

cleanup and error handling.

578

"""

579

580

def __init__(

581

self,

582

*,

583

topic_name: str,

584

subscription_name: str,

585

azure_service_bus_conn_id: str = "azure_service_bus_default",

586

**kwargs

587

):

588

"""

589

Initialize Service Bus delete subscription operator.

590

591

Args:

592

topic_name (str): Name of the topic

593

subscription_name (str): Name of the subscription to delete

594

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

595

"""

596

597

class AzureServiceBusUpdateSubscriptionOperator(BaseOperator):

598

"""

599

Updates Azure Service Bus subscriptions.

600

601

Modifies subscription properties and configuration

602

for existing subscriptions.

603

"""

604

605

def __init__(

606

self,

607

*,

608

topic_name: str,

609

subscription_name: str,

610

azure_service_bus_conn_id: str = "azure_service_bus_default",

611

**kwargs

612

):

613

"""

614

Initialize Service Bus update subscription operator.

615

616

Args:

617

topic_name (str): Name of the topic

618

subscription_name (str): Name of the subscription to update

619

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

620

"""

621

622

class ASBReceiveSubscriptionMessageOperator(BaseOperator):

623

"""

624

Receives messages from Azure Service Bus subscriptions.

625

626

Retrieves messages from topic subscriptions with

627

configurable receive parameters and filtering.

628

"""

629

630

def __init__(

631

self,

632

*,

633

topic_name: str,

634

subscription_name: str,

635

azure_service_bus_conn_id: str = "azure_service_bus_default",

636

max_message_count: int = 1,

637

max_wait_time: int = 5,

638

**kwargs

639

):

640

"""

641

Initialize Service Bus receive subscription message operator.

642

643

Args:

644

topic_name (str): Name of the topic

645

subscription_name (str): Name of the subscription

646

azure_service_bus_conn_id (str): Airflow connection ID for Service Bus

647

max_message_count (int): Maximum messages to receive

648

max_wait_time (int): Maximum wait time in seconds

649

"""

650

```

651

652

## Usage Examples

653

654

### Basic Queue Operations

655

656

```python

657

from airflow import DAG

658

from airflow.providers.microsoft.azure.operators.asb import (

659

AzureServiceBusCreateQueueOperator,

660

AzureServiceBusSendMessageOperator,

661

AzureServiceBusReceiveMessageOperator,

662

AzureServiceBusDeleteQueueOperator

663

)

664

from airflow.operators.python import PythonOperator

665

from datetime import datetime, timedelta

666

667

def process_received_messages(**context):

668

"""Process messages received from Service Bus."""

669

# Get messages from previous task

670

messages = context['task_instance'].xcom_pull(task_ids='receive_messages')

671

672

for message in messages:

673

print(f"Processing message: {message.body}")

674

# Message processing logic here

675

676

return len(messages)

677

678

dag = DAG(

679

'service_bus_queue_workflow',

680

default_args={

681

'owner': 'messaging-team',

682

'retries': 2,

683

'retry_delay': timedelta(minutes=3)

684

},

685

description='Service Bus queue messaging workflow',

686

schedule_interval=timedelta(minutes=15),

687

start_date=datetime(2024, 1, 1),

688

catchup=False

689

)

690

691

# Create queue

692

create_queue = AzureServiceBusCreateQueueOperator(

693

task_id='create_processing_queue',

694

queue_name='data-processing-queue',

695

azure_service_bus_conn_id='service_bus_conn',

696

max_delivery_count=5,

697

dead_lettering_on_message_expiration=True,

698

dag=dag

699

)

700

701

# Send messages

702

send_message = AzureServiceBusSendMessageOperator(

703

task_id='send_data_message',

704

queue_name='data-processing-queue',

705

message='{"data": "sample_data", "timestamp": "2024-01-01T10:00:00Z"}',

706

azure_service_bus_conn_id='service_bus_conn',

707

dag=dag

708

)

709

710

# Receive and process messages

711

receive_messages = AzureServiceBusReceiveMessageOperator(

712

task_id='receive_messages',

713

queue_name='data-processing-queue',

714

azure_service_bus_conn_id='service_bus_conn',

715

max_message_count=10,

716

max_wait_time=30,

717

dag=dag

718

)

719

720

process_messages = PythonOperator(

721

task_id='process_messages',

722

python_callable=process_received_messages,

723

dag=dag

724

)

725

726

# Cleanup queue (optional)

727

cleanup_queue = AzureServiceBusDeleteQueueOperator(

728

task_id='cleanup_queue',

729

queue_name='data-processing-queue',

730

azure_service_bus_conn_id='service_bus_conn',

731

dag=dag

732

)

733

734

create_queue >> send_message >> receive_messages >> process_messages >> cleanup_queue

735

```

736

737

### Topic and Subscription Pattern

738

739

```python

740

from airflow import DAG

741

from airflow.providers.microsoft.azure.operators.asb import (

742

AzureServiceBusTopicCreateOperator,

743

AzureServiceBusSubscriptionCreateOperator,

744

AzureServiceBusSendMessageOperator,

745

ASBReceiveSubscriptionMessageOperator

746

)

747

from airflow.providers.microsoft.azure.hooks.asb import MessageHook

748

from airflow.operators.python import PythonOperator

749

from datetime import datetime, timedelta

750

751

def send_notification_messages():

752

"""Send notification messages to topic."""

753

hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

754

755

notifications = [

756

{"type": "order", "id": "12345", "status": "completed"},

757

{"type": "payment", "id": "67890", "status": "processed"},

758

{"type": "shipping", "id": "11111", "status": "dispatched"}

759

]

760

761

for notification in notifications:

762

hook.send_topic_message(

763

topic_name='notifications',

764

message=str(notification)

765

)

766

767

print(f"Sent {len(notifications)} notifications")

768

769

def process_order_notifications(**context):

770

"""Process order-specific notifications."""

771

messages = context['task_instance'].xcom_pull(task_ids='receive_order_messages')

772

773

for message in messages:

774

print(f"Processing order notification: {message.body}")

775

# Order processing logic here

776

777

def process_payment_notifications(**context):

778

"""Process payment-specific notifications."""

779

messages = context['task_instance'].xcom_pull(task_ids='receive_payment_messages')

780

781

for message in messages:

782

print(f"Processing payment notification: {message.body}")

783

# Payment processing logic here

784

785

dag = DAG(

786

'service_bus_topic_workflow',

787

default_args={

788

'owner': 'notification-team',

789

'retries': 1,

790

'retry_delay': timedelta(minutes=2)

791

},

792

description='Service Bus topic notification workflow',

793

schedule_interval=timedelta(minutes=30),

794

start_date=datetime(2024, 1, 1),

795

catchup=False

796

)

797

798

# Create topic

799

create_topic = AzureServiceBusTopicCreateOperator(

800

task_id='create_notifications_topic',

801

topic_name='notifications',

802

azure_service_bus_conn_id='service_bus_conn',

803

max_size_in_megabytes=1024,

804

enable_partitioning=True,

805

dag=dag

806

)

807

808

# Create subscriptions for different notification types

809

create_order_subscription = AzureServiceBusSubscriptionCreateOperator(

810

task_id='create_order_subscription',

811

topic_name='notifications',

812

subscription_name='order-processor',

813

azure_service_bus_conn_id='service_bus_conn',

814

max_delivery_count=3,

815

dag=dag

816

)

817

818

create_payment_subscription = AzureServiceBusSubscriptionCreateOperator(

819

task_id='create_payment_subscription',

820

topic_name='notifications',

821

subscription_name='payment-processor',

822

azure_service_bus_conn_id='service_bus_conn',

823

max_delivery_count=3,

824

dag=dag

825

)

826

827

# Send notifications

828

send_notifications = PythonOperator(

829

task_id='send_notifications',

830

python_callable=send_notification_messages,

831

dag=dag

832

)

833

834

# Receive from subscriptions

835

receive_order_messages = ASBReceiveSubscriptionMessageOperator(

836

task_id='receive_order_messages',

837

topic_name='notifications',

838

subscription_name='order-processor',

839

azure_service_bus_conn_id='service_bus_conn',

840

max_message_count=50,

841

dag=dag

842

)

843

844

receive_payment_messages = ASBReceiveSubscriptionMessageOperator(

845

task_id='receive_payment_messages',

846

topic_name='notifications',

847

subscription_name='payment-processor',

848

azure_service_bus_conn_id='service_bus_conn',

849

max_message_count=50,

850

dag=dag

851

)

852

853

# Process notifications

854

process_orders = PythonOperator(

855

task_id='process_order_notifications',

856

python_callable=process_order_notifications,

857

dag=dag

858

)

859

860

process_payments = PythonOperator(

861

task_id='process_payment_notifications',

862

python_callable=process_payment_notifications,

863

dag=dag

864

)

865

866

# Set up dependencies

867

create_topic >> [create_order_subscription, create_payment_subscription]

868

[create_order_subscription, create_payment_subscription] >> send_notifications

869

send_notifications >> [receive_order_messages, receive_payment_messages]

870

receive_order_messages >> process_orders

871

receive_payment_messages >> process_payments

872

```

873

874

### Advanced Message Handling

875

876

```python

877

from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook

878

from azure.servicebus import ServiceBusMessage

879

import json

880

881

def advanced_message_operations():

882

"""Demonstrate advanced Service Bus operations."""

883

admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')

884

message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

885

886

# Create queue with advanced configuration

887

admin_hook.create_queue(

888

queue_name='advanced-queue',

889

max_delivery_count=5,

890

dead_lettering_on_message_expiration=True,

891

default_message_time_to_live=timedelta(hours=24),

892

duplicate_detection_history_time_window=timedelta(minutes=10)

893

)

894

895

# Send message with properties

896

message_data = {

897

"id": "msg-001",

898

"data": "Important business data",

899

"timestamp": datetime.now().isoformat()

900

}

901

902

# Create message with custom properties

903

message = ServiceBusMessage(

904

body=json.dumps(message_data),

905

content_type="application/json",

906

message_id="msg-001",

907

session_id="session-001"

908

)

909

910

# Set custom properties

911

message.application_properties = {

912

"priority": "high",

913

"department": "finance",

914

"requires_processing": True

915

}

916

917

message_hook.send_message('advanced-queue', message)

918

919

# Receive and process messages

920

received_messages = message_hook.receive_message(

921

queue_name='advanced-queue',

922

max_message_count=10,

923

max_wait_time=60

924

)

925

926

for msg in received_messages:

927

print(f"Message ID: {msg.message_id}")

928

print(f"Content Type: {msg.content_type}")

929

print(f"Properties: {msg.application_properties}")

930

print(f"Body: {msg.body}")

931

932

# Complete the message to remove it from queue

933

message_hook.get_conn().get_queue_receiver('advanced-queue').complete_message(msg)

934

935

def monitor_queue_metrics():

936

"""Monitor Service Bus queue metrics."""

937

admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')

938

939

# Get queue properties and metrics

940

queue_properties = admin_hook.get_queue('data-processing-queue')

941

942

print(f"Active Message Count: {queue_properties.active_message_count}")

943

print(f"Dead Letter Message Count: {queue_properties.dead_letter_message_count}")

944

print(f"Scheduled Message Count: {queue_properties.scheduled_message_count}")

945

print(f"Size in Bytes: {queue_properties.size_in_bytes}")

946

947

# Alert if queue has too many messages

948

if queue_properties.active_message_count > 1000:

949

print("WARNING: Queue has high message count!")

950

951

return {

952

'active_messages': queue_properties.active_message_count,

953

'dead_letter_messages': queue_properties.dead_letter_message_count,

954

'queue_size_bytes': queue_properties.size_in_bytes

955

}

956

```

957

958

## Connection Configuration

959

960

### Service Bus Connection (`azure_service_bus`)

961

962

Configure Azure Service Bus connections in Airflow:

963

964

```python

965

# Connection configuration for Service Bus

966

{

967

"conn_id": "azure_service_bus_default",

968

"conn_type": "azure_service_bus",

969

"host": "myservicebus.servicebus.windows.net",

970

"extra": {

971

"tenant_id": "your-tenant-id",

972

"client_id": "your-client-id",

973

"client_secret": "your-client-secret"

974

}

975

}

976

```

977

978

### Authentication Methods

979

980

Azure Service Bus supports multiple authentication methods:

981

982

1. **Service Principal Authentication**:

983

```python

984

extra = {

985

"tenant_id": "your-tenant-id",

986

"client_id": "your-client-id",

987

"client_secret": "your-client-secret"

988

}

989

```

990

991

2. **Connection String Authentication**:

992

```python

993

extra = {

994

"connection_string": "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key"

995

}

996

```

997

998

3. **Managed Identity Authentication**:

999

```python

1000

extra = {

1001

"managed_identity_client_id": "your-managed-identity-client-id"

1002

}

1003

```

1004

1005

4. **Shared Access Key Authentication**:

1006

```python

1007

extra = {

1008

"shared_access_key_name": "your-key-name",

1009

"shared_access_key_value": "your-key-value"

1010

}

1011

```

1012

1013

## Error Handling

1014

1015

### Common Exception Patterns

1016

1017

```python

1018

from azure.servicebus.exceptions import ServiceBusError, MessageAlreadySettled

1019

from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook

1020

1021

def robust_message_handling():

1022

"""Demonstrate error handling patterns."""

1023

admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')

1024

message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

1025

1026

try:

1027

# Attempt to create queue

1028

admin_hook.create_queue('test-queue')

1029

except ServiceBusError as e:

1030

if "already exists" in str(e).lower():

1031

print("Queue already exists, continuing...")

1032

else:

1033

print(f"Service Bus error: {e}")

1034

raise

1035

1036

try:

1037

# Send message with error handling

1038

message_hook.send_message('test-queue', 'test message')

1039

except ServiceBusError as e:

1040

print(f"Failed to send message: {e}")

1041

# Implement retry logic or alternative handling

1042

1043

try:

1044

# Receive messages with proper completion

1045

messages = message_hook.receive_message('test-queue')

1046

for message in messages:

1047

try:

1048

# Process message

1049

print(f"Processing: {message.body}")

1050

# Complete message to remove from queue

1051

message_hook.get_conn().get_queue_receiver('test-queue').complete_message(message)

1052

except MessageAlreadySettled:

1053

print("Message was already settled")

1054

except Exception as e:

1055

print(f"Failed to process message: {e}")

1056

# Abandon message to return to queue

1057

message_hook.get_conn().get_queue_receiver('test-queue').abandon_message(message)

1058

1059

except Exception as e:

1060

print(f"Unexpected error: {e}")

1061

raise

1062

```

1063

1064

### Connection Testing

1065

1066

```python

1067

def test_service_bus_connection():

1068

"""Test Service Bus connection and capabilities."""

1069

try:

1070

admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')

1071

message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

1072

1073

# Test admin operations

1074

queues = admin_hook.list_queues()

1075

print(f"Found {len(queues)} queues")

1076

1077

# Verify the connection using the hook's built-in test_connection() check

1078

success, message = admin_hook.test_connection()

1079

if success:

1080

print("Service Bus connection successful")

1081

else:

1082

print(f"Service Bus connection failed: {message}")

1083

1084

except Exception as e:

1085

print(f"Service Bus connection test failed: {e}")

1086

```

1087

1088

## Performance Considerations

1089

1090

### Optimizing Message Throughput

1091

1092

```python

1093

def optimized_batch_operations():

1094

"""Optimize Service Bus operations for high throughput."""

1095

message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

1096

1097

# Use message batching for better performance

1098

messages = []

1099

for i in range(100):

1100

message = ServiceBusMessage(

1101

body=f"Batch message {i}",

1102

message_id=f"batch-msg-{i}"

1103

)

1104

messages.append(message)

1105

1106

# Send batch of messages

1107

message_hook.send_list_of_messages('high-throughput-queue', messages)

1108

1109

# Receive messages in batches

1110

batch_messages = message_hook.receive_message(

1111

queue_name='high-throughput-queue',

1112

max_message_count=32, # Optimal batch size

1113

max_wait_time=10

1114

)

1115

1116

# Process each received message (sequentially in this example)

1117

for message in batch_messages:

1118

# Process message logic here

1119

pass

1120

1121

def configure_high_performance_queue():

1122

"""Configure queue for high performance scenarios."""

1123

admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')

1124

1125

admin_hook.create_queue(

1126

queue_name='high-perf-queue',

1127

enable_partitioning=True, # Enable partitioning for higher throughput

1128

max_size_in_megabytes=5120, # Larger queue size

1129

duplicate_detection_history_time_window=timedelta(minutes=1), # Shorter deduplication window

1130

enable_batched_operations=True # Enable batched operations

1131

)

1132

```

1133

1134

This page covers the Azure Service Bus support in the Apache Airflow Microsoft Azure provider: administrative operations, message handling, topic/subscription patterns, connection configuration, error handling, and performance tuning.