0
# Azure Service Bus
1
2
Complete Azure Service Bus integration providing comprehensive messaging capabilities including queue and topic management, message operations, subscription handling, and administrative functions for reliable messaging scenarios.
3
4
## Capabilities
5
6
### Base Service Bus Hook
7
8
Foundation hook for Azure Service Bus operations providing common functionality and connection management.
9
10
```python { .api }
11
class BaseAzureServiceBusHook(BaseHook):
12
"""
13
Base hook for Azure Service Bus operations.
14
15
Provides common functionality and connection management for Service Bus
16
administrative and message operations.
17
"""
18
19
def get_conn(self) -> ServiceBusClient:
20
"""
21
Get authenticated Azure Service Bus client.
22
23
Returns:
24
ServiceBusClient: Service Bus client instance
25
"""
26
27
def test_connection(self) -> tuple[bool, str]:
28
"""
29
Test the Azure Service Bus connection.
30
31
Returns:
32
tuple[bool, str]: Success status and message
33
"""
34
```
35
36
### Administrative Operations Hook
37
38
Hook for Azure Service Bus administrative operations including queue and topic management.
39
40
```python { .api }
41
class AdminClientHook(BaseAzureServiceBusHook):
42
"""
43
Hook for Azure Service Bus administrative operations.
44
45
Provides methods for managing queues, topics, subscriptions, and other
46
Service Bus administrative tasks.
47
"""
48
49
def get_conn(self) -> ServiceBusAdministrationClient:
50
"""
51
Get authenticated Service Bus administration client.
52
53
Returns:
54
ServiceBusAdministrationClient: Administration client instance
55
"""
56
57
def create_queue(
58
self,
59
queue_name: str,
60
max_delivery_count: int | None = None,
61
dead_lettering_on_message_expiration: bool | None = None,
62
**kwargs: Any
63
) -> None:
64
"""
65
Create a new Service Bus queue.
66
67
Args:
68
queue_name (str): Name of the queue to create
69
max_delivery_count (int | None): Maximum delivery attempts
70
dead_lettering_on_message_expiration (bool | None): Enable dead lettering
71
**kwargs: Additional queue properties
72
"""
73
74
def delete_queue(self, queue_name: str) -> None:
75
"""
76
Delete a Service Bus queue.
77
78
Args:
79
queue_name (str): Name of the queue to delete
80
"""
81
82
def get_queue(self, queue_name: str) -> QueueProperties:
83
"""
84
Get properties of a Service Bus queue.
85
86
Args:
87
queue_name (str): Name of the queue
88
89
Returns:
90
QueueProperties: Queue configuration and runtime information
91
"""
92
93
def list_queues(self) -> list[QueueProperties]:
94
"""
95
List all queues in the Service Bus namespace.
96
97
Returns:
98
list[QueueProperties]: List of queue properties
99
"""
100
101
def update_queue(
102
self,
103
queue_name: str,
104
queue_properties: QueueProperties,
105
**kwargs: Any
106
) -> QueueProperties:
107
"""
108
Update an existing Service Bus queue.
109
110
Args:
111
queue_name (str): Name of the queue to update
112
queue_properties (QueueProperties): New queue properties
113
**kwargs: Additional update parameters
114
115
Returns:
116
QueueProperties: Updated queue properties
117
"""
118
119
def create_topic(
120
self,
121
topic_name: str,
122
max_size_in_megabytes: int | None = None,
123
enable_partitioning: bool | None = None,
124
**kwargs: Any
125
) -> None:
126
"""
127
Create a new Service Bus topic.
128
129
Args:
130
topic_name (str): Name of the topic to create
131
max_size_in_megabytes (int | None): Maximum topic size
132
enable_partitioning (bool | None): Enable topic partitioning
133
**kwargs: Additional topic properties
134
"""
135
136
def delete_topic(self, topic_name: str) -> None:
137
"""
138
Delete a Service Bus topic.
139
140
Args:
141
topic_name (str): Name of the topic to delete
142
"""
143
144
def get_topic(self, topic_name: str) -> TopicProperties:
145
"""
146
Get properties of a Service Bus topic.
147
148
Args:
149
topic_name (str): Name of the topic
150
151
Returns:
152
TopicProperties: Topic configuration and runtime information
153
"""
154
155
def list_topics(self) -> list[TopicProperties]:
156
"""
157
List all topics in the Service Bus namespace.
158
159
Returns:
160
list[TopicProperties]: List of topic properties
161
"""
162
163
def create_subscription(
164
self,
165
topic_name: str,
166
subscription_name: str,
167
max_delivery_count: int | None = None,
168
dead_lettering_on_message_expiration: bool | None = None,
169
**kwargs: Any
170
) -> None:
171
"""
172
Create a subscription for a Service Bus topic.
173
174
Args:
175
topic_name (str): Name of the topic
176
subscription_name (str): Name of the subscription to create
177
max_delivery_count (int | None): Maximum delivery attempts
178
dead_lettering_on_message_expiration (bool | None): Enable dead lettering
179
**kwargs: Additional subscription properties
180
"""
181
182
def delete_subscription(
183
self,
184
topic_name: str,
185
subscription_name: str
186
) -> None:
187
"""
188
Delete a subscription from a Service Bus topic.
189
190
Args:
191
topic_name (str): Name of the topic
192
subscription_name (str): Name of the subscription to delete
193
"""
194
195
def get_subscription(
196
self,
197
topic_name: str,
198
subscription_name: str
199
) -> SubscriptionProperties:
200
"""
201
Get properties of a Service Bus subscription.
202
203
Args:
204
topic_name (str): Name of the topic
205
subscription_name (str): Name of the subscription
206
207
Returns:
208
SubscriptionProperties: Subscription configuration and runtime information
209
"""
210
211
def list_subscriptions(self, topic_name: str) -> list[SubscriptionProperties]:
212
"""
213
List all subscriptions for a Service Bus topic.
214
215
Args:
216
topic_name (str): Name of the topic
217
218
Returns:
219
list[SubscriptionProperties]: List of subscription properties
220
"""
221
222
def update_subscription(
223
self,
224
topic_name: str,
225
subscription_name: str,
226
subscription_properties: SubscriptionProperties,
227
**kwargs: Any
228
) -> SubscriptionProperties:
229
"""
230
Update an existing Service Bus subscription.
231
232
Args:
233
topic_name (str): Name of the topic
234
subscription_name (str): Name of the subscription
235
subscription_properties (SubscriptionProperties): New subscription properties
236
**kwargs: Additional update parameters
237
238
Returns:
239
SubscriptionProperties: Updated subscription properties
240
"""
241
```
242
243
### Message Operations Hook
244
245
Hook for Azure Service Bus message operations including sending and receiving messages.
246
247
```python { .api }
248
class MessageHook(BaseAzureServiceBusHook):
249
"""
250
Hook for Azure Service Bus message operations.
251
252
Provides methods for sending and receiving messages from queues and topics,
253
with support for message batching and transaction handling.
254
"""
255
256
def get_conn(self) -> ServiceBusClient:
257
"""
258
Get authenticated Service Bus client for message operations.
259
260
Returns:
261
ServiceBusClient: Service Bus client instance
262
"""
263
264
def send_message(
265
self,
266
queue_name: str,
267
message: str | ServiceBusMessage,
268
batch: bool = False,
269
**kwargs: Any
270
) -> None:
271
"""
272
Send a message to a Service Bus queue.
273
274
Args:
275
queue_name (str): Name of the target queue
276
message (str | ServiceBusMessage): Message content or ServiceBusMessage object
277
batch (bool): Whether to send as part of a batch (default: False)
278
**kwargs: Additional message properties
279
"""
280
281
def send_list_of_messages(
282
self,
283
queue_name: str,
284
messages: list[ServiceBusMessage],
285
**kwargs: Any
286
) -> None:
287
"""
288
Send multiple messages to a Service Bus queue.
289
290
Args:
291
queue_name (str): Name of the target queue
292
messages (list[ServiceBusMessage]): List of messages to send
293
**kwargs: Additional send parameters
294
"""
295
296
def receive_message(
297
self,
298
queue_name: str,
299
max_message_count: int = 1,
300
max_wait_time: int = 5,
301
**kwargs: Any
302
) -> list[ServiceBusReceivedMessage]:
303
"""
304
Receive messages from a Service Bus queue.
305
306
Args:
307
queue_name (str): Name of the source queue
308
max_message_count (int): Maximum number of messages to receive (default: 1)
309
max_wait_time (int): Maximum wait time in seconds (default: 5)
310
**kwargs: Additional receive parameters
311
312
Returns:
313
list[ServiceBusReceivedMessage]: List of received messages
314
"""
315
316
def peek_messages(
317
self,
318
queue_name: str,
319
message_count: int = 1,
320
sequence_number: int | None = None
321
) -> list[ServiceBusReceivedMessage]:
322
"""
323
Peek at messages in a Service Bus queue without removing them.
324
325
Args:
326
queue_name (str): Name of the queue to peek
327
message_count (int): Number of messages to peek (default: 1)
328
sequence_number (int | None): Starting sequence number
329
330
Returns:
331
list[ServiceBusReceivedMessage]: List of peeked messages
332
"""
333
334
def send_topic_message(
335
self,
336
topic_name: str,
337
message: str | ServiceBusMessage,
338
batch: bool = False,
339
**kwargs: Any
340
) -> None:
341
"""
342
Send a message to a Service Bus topic.
343
344
Args:
345
topic_name (str): Name of the target topic
346
message (str | ServiceBusMessage): Message content or ServiceBusMessage object
347
batch (bool): Whether to send as part of a batch (default: False)
348
**kwargs: Additional message properties
349
"""
350
351
def receive_subscription_message(
352
self,
353
topic_name: str,
354
subscription_name: str,
355
max_message_count: int = 1,
356
max_wait_time: int = 5,
357
**kwargs: Any
358
) -> list[ServiceBusReceivedMessage]:
359
"""
360
Receive messages from a Service Bus subscription.
361
362
Args:
363
topic_name (str): Name of the topic
364
subscription_name (str): Name of the subscription
365
max_message_count (int): Maximum number of messages to receive (default: 1)
366
max_wait_time (int): Maximum wait time in seconds (default: 5)
367
**kwargs: Additional receive parameters
368
369
Returns:
370
list[ServiceBusReceivedMessage]: List of received messages
371
"""
372
```
373
374
## Service Bus Operators
375
376
Execute Azure Service Bus operations as Airflow tasks with comprehensive queue and topic management capabilities.
377
378
### Queue Management Operators
379
380
```python { .api }
381
class AzureServiceBusCreateQueueOperator(BaseOperator):
382
"""
383
Creates Azure Service Bus queues.
384
385
Supports creating queues with custom properties and configuration
386
options for messaging requirements.
387
"""
388
389
def __init__(
390
self,
391
*,
392
queue_name: str,
393
azure_service_bus_conn_id: str = "azure_service_bus_default",
394
max_delivery_count: int | None = None,
395
dead_lettering_on_message_expiration: bool | None = None,
396
**kwargs
397
):
398
"""
399
Initialize Service Bus create queue operator.
400
401
Args:
402
queue_name (str): Name of the queue to create
403
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
404
max_delivery_count (int | None): Maximum delivery attempts
405
dead_lettering_on_message_expiration (bool | None): Enable dead lettering
406
"""
407
408
class AzureServiceBusDeleteQueueOperator(BaseOperator):
409
"""
410
Deletes Azure Service Bus queues.
411
412
Removes queues from the Service Bus namespace with
413
error handling for non-existent queues.
414
"""
415
416
def __init__(
417
self,
418
*,
419
queue_name: str,
420
azure_service_bus_conn_id: str = "azure_service_bus_default",
421
**kwargs
422
):
423
"""
424
Initialize Service Bus delete queue operator.
425
426
Args:
427
queue_name (str): Name of the queue to delete
428
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
429
"""
430
```
431
432
### Message Operations Operators
433
434
```python { .api }
435
class AzureServiceBusSendMessageOperator(BaseOperator):
436
"""
437
Sends messages to Azure Service Bus queues.
438
439
Supports sending single messages or batches with custom
440
message properties and routing information.
441
"""
442
443
def __init__(
444
self,
445
*,
446
queue_name: str,
447
message: str,
448
azure_service_bus_conn_id: str = "azure_service_bus_default",
449
batch: bool = False,
450
**kwargs
451
):
452
"""
453
Initialize Service Bus send message operator.
454
455
Args:
456
queue_name (str): Target queue name
457
message (str): Message content to send
458
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
459
batch (bool): Whether to send as batch message
460
"""
461
462
class AzureServiceBusReceiveMessageOperator(BaseOperator):
463
"""
464
Receives messages from Azure Service Bus queues.
465
466
Retrieves messages with configurable receive options
467
and message handling parameters.
468
"""
469
470
def __init__(
471
self,
472
*,
473
queue_name: str,
474
azure_service_bus_conn_id: str = "azure_service_bus_default",
475
max_message_count: int = 1,
476
max_wait_time: int = 5,
477
**kwargs
478
):
479
"""
480
Initialize Service Bus receive message operator.
481
482
Args:
483
queue_name (str): Source queue name
484
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
485
max_message_count (int): Maximum messages to receive
486
max_wait_time (int): Maximum wait time in seconds
487
"""
488
```
489
490
### Topic and Subscription Operators
491
492
```python { .api }
493
class AzureServiceBusTopicCreateOperator(BaseOperator):
494
"""
495
Creates Azure Service Bus topics.
496
497
Supports creating topics with custom configuration
498
and partitioning options for publish-subscribe scenarios.
499
"""
500
501
def __init__(
502
self,
503
*,
504
topic_name: str,
505
azure_service_bus_conn_id: str = "azure_service_bus_default",
506
max_size_in_megabytes: int | None = None,
507
enable_partitioning: bool | None = None,
508
**kwargs
509
):
510
"""
511
Initialize Service Bus create topic operator.
512
513
Args:
514
topic_name (str): Name of the topic to create
515
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
516
max_size_in_megabytes (int | None): Maximum topic size
517
enable_partitioning (bool | None): Enable topic partitioning
518
"""
519
520
class AzureServiceBusTopicDeleteOperator(BaseOperator):
521
"""
522
Deletes Azure Service Bus topics.
523
524
Removes topics and all associated subscriptions
525
from the Service Bus namespace.
526
"""
527
528
def __init__(
529
self,
530
*,
531
topic_name: str,
532
azure_service_bus_conn_id: str = "azure_service_bus_default",
533
**kwargs
534
):
535
"""
536
Initialize Service Bus delete topic operator.
537
538
Args:
539
topic_name (str): Name of the topic to delete
540
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
541
"""
542
543
class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
544
"""
545
Creates Azure Service Bus subscriptions.
546
547
Creates subscriptions for topics with filtering rules
548
and message handling configuration.
549
"""
550
551
def __init__(
552
self,
553
*,
554
topic_name: str,
555
subscription_name: str,
556
azure_service_bus_conn_id: str = "azure_service_bus_default",
557
max_delivery_count: int | None = None,
558
dead_lettering_on_message_expiration: bool | None = None,
559
**kwargs
560
):
561
"""
562
Initialize Service Bus create subscription operator.
563
564
Args:
565
topic_name (str): Name of the topic
566
subscription_name (str): Name of the subscription to create
567
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
568
max_delivery_count (int | None): Maximum delivery attempts
569
dead_lettering_on_message_expiration (bool | None): Enable dead lettering
570
"""
571
572
class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
573
"""
574
Deletes Azure Service Bus subscriptions.
575
576
Removes subscriptions from topics with proper
577
cleanup and error handling.
578
"""
579
580
def __init__(
581
self,
582
*,
583
topic_name: str,
584
subscription_name: str,
585
azure_service_bus_conn_id: str = "azure_service_bus_default",
586
**kwargs
587
):
588
"""
589
Initialize Service Bus delete subscription operator.
590
591
Args:
592
topic_name (str): Name of the topic
593
subscription_name (str): Name of the subscription to delete
594
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
595
"""
596
597
class AzureServiceBusUpdateSubscriptionOperator(BaseOperator):
598
"""
599
Updates Azure Service Bus subscriptions.
600
601
Modifies subscription properties and configuration
602
for existing subscriptions.
603
"""
604
605
def __init__(
606
self,
607
*,
608
topic_name: str,
609
subscription_name: str,
610
azure_service_bus_conn_id: str = "azure_service_bus_default",
611
**kwargs
612
):
613
"""
614
Initialize Service Bus update subscription operator.
615
616
Args:
617
topic_name (str): Name of the topic
618
subscription_name (str): Name of the subscription to update
619
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
620
"""
621
622
class ASBReceiveSubscriptionMessageOperator(BaseOperator):
623
"""
624
Receives messages from Azure Service Bus subscriptions.
625
626
Retrieves messages from topic subscriptions with
627
configurable receive parameters and filtering.
628
"""
629
630
def __init__(
631
self,
632
*,
633
topic_name: str,
634
subscription_name: str,
635
azure_service_bus_conn_id: str = "azure_service_bus_default",
636
max_message_count: int = 1,
637
max_wait_time: int = 5,
638
**kwargs
639
):
640
"""
641
Initialize Service Bus receive subscription message operator.
642
643
Args:
644
topic_name (str): Name of the topic
645
subscription_name (str): Name of the subscription
646
azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
647
max_message_count (int): Maximum messages to receive
648
max_wait_time (int): Maximum wait time in seconds
649
"""
650
```
651
652
## Usage Examples
653
654
### Basic Queue Operations
655
656
```python
657
from airflow import DAG
658
from airflow.providers.microsoft.azure.operators.asb import (
659
AzureServiceBusCreateQueueOperator,
660
AzureServiceBusSendMessageOperator,
661
AzureServiceBusReceiveMessageOperator,
662
AzureServiceBusDeleteQueueOperator
663
)
664
from airflow.operators.python import PythonOperator
665
from datetime import datetime, timedelta
666
667
def process_received_messages(**context):
668
"""Process messages received from Service Bus."""
669
# Get messages from previous task
670
messages = context['task_instance'].xcom_pull(task_ids='receive_messages')
671
672
for message in messages:
673
print(f"Processing message: {message.body}")
674
# Message processing logic here
675
676
return len(messages)
677
678
dag = DAG(
679
'service_bus_queue_workflow',
680
default_args={
681
'owner': 'messaging-team',
682
'retries': 2,
683
'retry_delay': timedelta(minutes=3)
684
},
685
description='Service Bus queue messaging workflow',
686
schedule_interval=timedelta(minutes=15),
687
start_date=datetime(2024, 1, 1),
688
catchup=False
689
)
690
691
# Create queue
692
create_queue = AzureServiceBusCreateQueueOperator(
693
task_id='create_processing_queue',
694
queue_name='data-processing-queue',
695
azure_service_bus_conn_id='service_bus_conn',
696
max_delivery_count=5,
697
dead_lettering_on_message_expiration=True,
698
dag=dag
699
)
700
701
# Send messages
702
send_message = AzureServiceBusSendMessageOperator(
703
task_id='send_data_message',
704
queue_name='data-processing-queue',
705
message='{"data": "sample_data", "timestamp": "2024-01-01T10:00:00Z"}',
706
azure_service_bus_conn_id='service_bus_conn',
707
dag=dag
708
)
709
710
# Receive and process messages
711
receive_messages = AzureServiceBusReceiveMessageOperator(
712
task_id='receive_messages',
713
queue_name='data-processing-queue',
714
azure_service_bus_conn_id='service_bus_conn',
715
max_message_count=10,
716
max_wait_time=30,
717
dag=dag
718
)
719
720
process_messages = PythonOperator(
721
task_id='process_messages',
722
python_callable=process_received_messages,
723
dag=dag
724
)
725
726
# Cleanup queue (optional)
727
cleanup_queue = AzureServiceBusDeleteQueueOperator(
728
task_id='cleanup_queue',
729
queue_name='data-processing-queue',
730
azure_service_bus_conn_id='service_bus_conn',
731
dag=dag
732
)
733
734
create_queue >> send_message >> receive_messages >> process_messages >> cleanup_queue
735
```
736
737
### Topic and Subscription Pattern
738
739
```python
740
from airflow import DAG
741
from airflow.providers.microsoft.azure.operators.asb import (
742
AzureServiceBusTopicCreateOperator,
743
AzureServiceBusSubscriptionCreateOperator,
744
AzureServiceBusSendMessageOperator,
745
ASBReceiveSubscriptionMessageOperator
746
)
747
from airflow.providers.microsoft.azure.hooks.asb import MessageHook
748
from airflow.operators.python import PythonOperator
749
from datetime import datetime, timedelta
750
751
def send_notification_messages():
752
"""Send notification messages to topic."""
753
hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')
754
755
notifications = [
756
{"type": "order", "id": "12345", "status": "completed"},
757
{"type": "payment", "id": "67890", "status": "processed"},
758
{"type": "shipping", "id": "11111", "status": "dispatched"}
759
]
760
761
for notification in notifications:
762
hook.send_topic_message(
763
topic_name='notifications',
764
message=str(notification)
765
)
766
767
print(f"Sent {len(notifications)} notifications")
768
769
def process_order_notifications(**context):
770
"""Process order-specific notifications."""
771
messages = context['task_instance'].xcom_pull(task_ids='receive_order_messages')
772
773
for message in messages:
774
print(f"Processing order notification: {message.body}")
775
# Order processing logic here
776
777
def process_payment_notifications(**context):
778
"""Process payment-specific notifications."""
779
messages = context['task_instance'].xcom_pull(task_ids='receive_payment_messages')
780
781
for message in messages:
782
print(f"Processing payment notification: {message.body}")
783
# Payment processing logic here
784
785
dag = DAG(
786
'service_bus_topic_workflow',
787
default_args={
788
'owner': 'notification-team',
789
'retries': 1,
790
'retry_delay': timedelta(minutes=2)
791
},
792
description='Service Bus topic notification workflow',
793
schedule_interval=timedelta(minutes=30),
794
start_date=datetime(2024, 1, 1),
795
catchup=False
796
)
797
798
# Create topic
799
create_topic = AzureServiceBusTopicCreateOperator(
800
task_id='create_notifications_topic',
801
topic_name='notifications',
802
azure_service_bus_conn_id='service_bus_conn',
803
max_size_in_megabytes=1024,
804
enable_partitioning=True,
805
dag=dag
806
)
807
808
# Create subscriptions for different notification types
809
create_order_subscription = AzureServiceBusSubscriptionCreateOperator(
810
task_id='create_order_subscription',
811
topic_name='notifications',
812
subscription_name='order-processor',
813
azure_service_bus_conn_id='service_bus_conn',
814
max_delivery_count=3,
815
dag=dag
816
)
817
818
create_payment_subscription = AzureServiceBusSubscriptionCreateOperator(
819
task_id='create_payment_subscription',
820
topic_name='notifications',
821
subscription_name='payment-processor',
822
azure_service_bus_conn_id='service_bus_conn',
823
max_delivery_count=3,
824
dag=dag
825
)
826
827
# Send notifications
828
send_notifications = PythonOperator(
829
task_id='send_notifications',
830
python_callable=send_notification_messages,
831
dag=dag
832
)
833
834
# Receive from subscriptions
835
receive_order_messages = ASBReceiveSubscriptionMessageOperator(
836
task_id='receive_order_messages',
837
topic_name='notifications',
838
subscription_name='order-processor',
839
azure_service_bus_conn_id='service_bus_conn',
840
max_message_count=50,
841
dag=dag
842
)
843
844
receive_payment_messages = ASBReceiveSubscriptionMessageOperator(
845
task_id='receive_payment_messages',
846
topic_name='notifications',
847
subscription_name='payment-processor',
848
azure_service_bus_conn_id='service_bus_conn',
849
max_message_count=50,
850
dag=dag
851
)
852
853
# Process notifications
854
process_orders = PythonOperator(
855
task_id='process_order_notifications',
856
python_callable=process_order_notifications,
857
dag=dag
858
)
859
860
process_payments = PythonOperator(
861
task_id='process_payment_notifications',
862
python_callable=process_payment_notifications,
863
dag=dag
864
)
865
866
# Set up dependencies
867
create_topic >> [create_order_subscription, create_payment_subscription]
868
[create_order_subscription, create_payment_subscription] >> send_notifications
869
send_notifications >> [receive_order_messages, receive_payment_messages]
870
receive_order_messages >> process_orders
871
receive_payment_messages >> process_payments
872
```
873
874
### Advanced Message Handling
875
876
```python
877
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook
878
from azure.servicebus import ServiceBusMessage
879
import json
880
881
def advanced_message_operations():
882
"""Demonstrate advanced Service Bus operations."""
883
admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
884
message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')
885
886
# Create queue with advanced configuration
887
admin_hook.create_queue(
888
queue_name='advanced-queue',
889
max_delivery_count=5,
890
dead_lettering_on_message_expiration=True,
891
default_message_time_to_live=timedelta(hours=24),
892
duplicate_detection_history_time_window=timedelta(minutes=10)
893
)
894
895
# Send message with properties
896
message_data = {
897
"id": "msg-001",
898
"data": "Important business data",
899
"timestamp": datetime.now().isoformat()
900
}
901
902
# Create message with custom properties
903
message = ServiceBusMessage(
904
body=json.dumps(message_data),
905
content_type="application/json",
906
message_id="msg-001",
907
session_id="session-001"
908
)
909
910
# Set custom properties
911
message.application_properties = {
912
"priority": "high",
913
"department": "finance",
914
"requires_processing": True
915
}
916
917
message_hook.send_message('advanced-queue', message)
918
919
# Receive and process messages
920
received_messages = message_hook.receive_message(
921
queue_name='advanced-queue',
922
max_message_count=10,
923
max_wait_time=60
924
)
925
926
for msg in received_messages:
927
print(f"Message ID: {msg.message_id}")
928
print(f"Content Type: {msg.content_type}")
929
print(f"Properties: {msg.application_properties}")
930
print(f"Body: {msg.body}")
931
932
# Complete the message to remove it from queue
933
message_hook.get_conn().get_queue_receiver('advanced-queue').complete_message(msg)
934
935
def monitor_queue_metrics():
936
"""Monitor Service Bus queue metrics."""
937
admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
938
939
# Get queue properties and metrics
940
queue_properties = admin_hook.get_queue('data-processing-queue')
941
942
print(f"Active Message Count: {queue_properties.active_message_count}")
943
print(f"Dead Letter Message Count: {queue_properties.dead_letter_message_count}")
944
print(f"Scheduled Message Count: {queue_properties.scheduled_message_count}")
945
print(f"Size in Bytes: {queue_properties.size_in_bytes}")
946
947
# Alert if queue has too many messages
948
if queue_properties.active_message_count > 1000:
949
print("WARNING: Queue has high message count!")
950
951
return {
952
'active_messages': queue_properties.active_message_count,
953
'dead_letter_messages': queue_properties.dead_letter_message_count,
954
'queue_size_bytes': queue_properties.size_in_bytes
955
}
956
```
957
958
## Connection Configuration
959
960
### Service Bus Connection (`azure_service_bus`)
961
962
Configure Azure Service Bus connections in Airflow:
963
964
```python
965
# Connection configuration for Service Bus
966
{
967
"conn_id": "azure_service_bus_default",
968
"conn_type": "azure_service_bus",
969
"host": "myservicebus.servicebus.windows.net",
970
"extra": {
971
"tenant_id": "your-tenant-id",
972
"client_id": "your-client-id",
973
"client_secret": "your-client-secret"
974
}
975
}
976
```
977
978
### Authentication Methods
979
980
Azure Service Bus supports multiple authentication methods:
981
982
1. **Service Principal Authentication**:
983
```python
984
extra = {
985
"tenant_id": "your-tenant-id",
986
"client_id": "your-client-id",
987
"client_secret": "your-client-secret"
988
}
989
```
990
991
2. **Connection String Authentication**:
992
```python
993
extra = {
994
"connection_string": "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key"
995
}
996
```
997
998
3. **Managed Identity Authentication**:
999
```python
1000
extra = {
1001
"managed_identity_client_id": "your-managed-identity-client-id"
1002
}
1003
```
1004
1005
4. **Shared Access Key Authentication**:
1006
```python
1007
extra = {
1008
"shared_access_key_name": "your-key-name",
1009
"shared_access_key_value": "your-key-value"
1010
}
1011
```
1012
1013
## Error Handling
1014
1015
### Common Exception Patterns
1016
1017
```python
1018
from azure.servicebus.exceptions import ServiceBusError, MessageAlreadySettled
1019
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook
1020
1021
def robust_message_handling():
1022
"""Demonstrate error handling patterns."""
1023
admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
1024
message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')
1025
1026
try:
1027
# Attempt to create queue
1028
admin_hook.create_queue('test-queue')
1029
except ServiceBusError as e:
1030
if "already exists" in str(e).lower():
1031
print("Queue already exists, continuing...")
1032
else:
1033
print(f"Service Bus error: {e}")
1034
raise
1035
1036
try:
1037
# Send message with error handling
1038
message_hook.send_message('test-queue', 'test message')
1039
except ServiceBusError as e:
1040
print(f"Failed to send message: {e}")
1041
# Implement retry logic or alternative handling
1042
1043
try:
1044
# Receive messages with proper completion
1045
messages = message_hook.receive_message('test-queue')
1046
for message in messages:
1047
try:
1048
# Process message
1049
print(f"Processing: {message.body}")
1050
# Complete message to remove from queue
1051
message_hook.get_conn().get_queue_receiver('test-queue').complete_message(message)
1052
except MessageAlreadySettled:
1053
print("Message was already settled")
1054
except Exception as e:
1055
print(f"Failed to process message: {e}")
1056
# Abandon message to return to queue
1057
message_hook.get_conn().get_queue_receiver('test-queue').abandon_message(message)
1058
1059
except Exception as e:
1060
print(f"Unexpected error: {e}")
1061
raise
1062
```
1063
1064
### Connection Testing
1065
1066
```python
1067
def test_service_bus_connection():
1068
"""Test Service Bus connection and capabilities."""
1069
try:
1070
admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
1071
message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')
1072
1073
# Test admin operations
1074
queues = admin_hook.list_queues()
1075
print(f"Found {len(queues)} queues")
1076
1077
# Test message operations
1078
success, message = admin_hook.test_connection()
1079
if success:
1080
print("Service Bus connection successful")
1081
else:
1082
print(f"Service Bus connection failed: {message}")
1083
1084
except Exception as e:
1085
print(f"Service Bus connection test failed: {e}")
1086
```
1087
1088
## Performance Considerations
1089
1090
### Optimizing Message Throughput
1091
1092
```python
1093
def optimized_batch_operations():
1094
"""Optimize Service Bus operations for high throughput."""
1095
message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')
1096
1097
# Use message batching for better performance
1098
messages = []
1099
for i in range(100):
1100
message = ServiceBusMessage(
1101
body=f"Batch message {i}",
1102
message_id=f"batch-msg-{i}"
1103
)
1104
messages.append(message)
1105
1106
# Send batch of messages
1107
message_hook.send_list_of_messages('high-throughput-queue', messages)
1108
1109
# Receive messages in batches
1110
batch_messages = message_hook.receive_message(
1111
queue_name='high-throughput-queue',
1112
max_message_count=32, # Optimal batch size
1113
max_wait_time=10
1114
)
1115
1116
# Process messages in parallel
1117
for message in batch_messages:
1118
# Process message logic here
1119
pass
1120
1121
def configure_high_performance_queue():
1122
"""Configure queue for high performance scenarios."""
1123
admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
1124
1125
admin_hook.create_queue(
1126
queue_name='high-perf-queue',
1127
enable_partitioning=True, # Enable partitioning for higher throughput
1128
max_size_in_megabytes=5120, # Larger queue size
1129
duplicate_detection_history_time_window=timedelta(minutes=1), # Shorter deduplication window
1130
enable_batched_operations=True # Enable batched operations
1131
)
1132
```
1133
1134
This comprehensive documentation covers all Azure Service Bus capabilities in the Apache Airflow Microsoft Azure Provider, including administrative operations, message handling, topic/subscription patterns, and performance optimization techniques.