CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/service-bus.md

Azure Service Bus

Complete Azure Service Bus integration providing comprehensive messaging capabilities including queue and topic management, message operations, subscription handling, and administrative functions for reliable messaging scenarios.

Capabilities

Base Service Bus Hook

Foundation hook for Azure Service Bus operations providing common functionality and connection management.

class BaseAzureServiceBusHook(BaseHook):
    """
    Shared base for the Azure Service Bus hooks.

    Supplies the connection management and common functionality that the
    administrative and message hooks build on.
    """

    def get_conn(self) -> ServiceBusClient:
        """
        Return an authenticated Azure Service Bus client.

        Returns:
            The Service Bus client instance.
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Check whether the Azure Service Bus connection works.

        Returns:
            A ``(success, message)`` pair: the success status and an
            accompanying message.
        """

Administrative Operations Hook

Hook for Azure Service Bus administrative operations including queue and topic management.

class AdminClientHook(BaseAzureServiceBusHook):
    """
    Administrative hook for Azure Service Bus.

    Covers management of queues, topics and subscriptions, plus other
    Service Bus administrative tasks.
    """

    def get_conn(self) -> ServiceBusAdministrationClient:
        """
        Return an authenticated Service Bus administration client.

        Returns:
            The administration client instance.
        """

    def create_queue(
        self,
        queue_name: str,
        max_delivery_count: int | None = None,
        dead_lettering_on_message_expiration: bool | None = None,
        **kwargs: Any
    ) -> None:
        """
        Create a Service Bus queue.

        Args:
            queue_name: Name of the queue to create.
            max_delivery_count: Maximum number of delivery attempts.
            dead_lettering_on_message_expiration: Whether to enable dead
                lettering.
            **kwargs: Additional queue properties.
        """

    def delete_queue(self, queue_name: str) -> None:
        """
        Delete a Service Bus queue.

        Args:
            queue_name: Name of the queue to delete.
        """

    def get_queue(self, queue_name: str) -> QueueProperties:
        """
        Fetch the properties of a Service Bus queue.

        Args:
            queue_name: Name of the queue.

        Returns:
            The queue's configuration and runtime information.
        """

    def list_queues(self) -> list[QueueProperties]:
        """
        List every queue in the Service Bus namespace.

        Returns:
            One properties object per queue.
        """

    def update_queue(
        self,
        queue_name: str,
        queue_properties: QueueProperties,
        **kwargs: Any
    ) -> QueueProperties:
        """
        Update an existing Service Bus queue.

        Args:
            queue_name: Name of the queue to update.
            queue_properties: New queue properties.
            **kwargs: Additional update parameters.

        Returns:
            The updated queue properties.
        """

    def create_topic(
        self,
        topic_name: str,
        max_size_in_megabytes: int | None = None,
        enable_partitioning: bool | None = None,
        **kwargs: Any
    ) -> None:
        """
        Create a Service Bus topic.

        Args:
            topic_name: Name of the topic to create.
            max_size_in_megabytes: Maximum topic size.
            enable_partitioning: Whether to enable topic partitioning.
            **kwargs: Additional topic properties.
        """

    def delete_topic(self, topic_name: str) -> None:
        """
        Delete a Service Bus topic.

        Args:
            topic_name: Name of the topic to delete.
        """

    def get_topic(self, topic_name: str) -> TopicProperties:
        """
        Fetch the properties of a Service Bus topic.

        Args:
            topic_name: Name of the topic.

        Returns:
            The topic's configuration and runtime information.
        """

    def list_topics(self) -> list[TopicProperties]:
        """
        List every topic in the Service Bus namespace.

        Returns:
            One properties object per topic.
        """

    def create_subscription(
        self,
        topic_name: str,
        subscription_name: str,
        max_delivery_count: int | None = None,
        dead_lettering_on_message_expiration: bool | None = None,
        **kwargs: Any
    ) -> None:
        """
        Create a subscription on a Service Bus topic.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription to create.
            max_delivery_count: Maximum number of delivery attempts.
            dead_lettering_on_message_expiration: Whether to enable dead
                lettering.
            **kwargs: Additional subscription properties.
        """

    def delete_subscription(
        self,
        topic_name: str,
        subscription_name: str
    ) -> None:
        """
        Delete a subscription from a Service Bus topic.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription to delete.
        """

    def get_subscription(
        self,
        topic_name: str,
        subscription_name: str
    ) -> SubscriptionProperties:
        """
        Fetch the properties of a Service Bus subscription.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription.

        Returns:
            The subscription's configuration and runtime information.
        """

    def list_subscriptions(self, topic_name: str) -> list[SubscriptionProperties]:
        """
        List every subscription of a Service Bus topic.

        Args:
            topic_name: Name of the topic.

        Returns:
            One properties object per subscription.
        """

    def update_subscription(
        self,
        topic_name: str,
        subscription_name: str,
        subscription_properties: SubscriptionProperties,
        **kwargs: Any
    ) -> SubscriptionProperties:
        """
        Update an existing Service Bus subscription.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription.
            subscription_properties: New subscription properties.
            **kwargs: Additional update parameters.

        Returns:
            The updated subscription properties.
        """

Message Operations Hook

Hook for Azure Service Bus message operations including sending and receiving messages.

class MessageHook(BaseAzureServiceBusHook):
    """
    Message hook for Azure Service Bus.

    Sends and receives messages on queues and topics, with support for
    message batching and transaction handling.
    """

    def get_conn(self) -> ServiceBusClient:
        """
        Return an authenticated Service Bus client for message operations.

        Returns:
            The Service Bus client instance.
        """

    def send_message(
        self,
        queue_name: str,
        message: str | ServiceBusMessage,
        batch: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Send one message to a Service Bus queue.

        Args:
            queue_name: Name of the target queue.
            message: Message content, or a ServiceBusMessage object.
            batch: Whether to send as part of a batch (default: False).
            **kwargs: Additional message properties.
        """

    def send_list_of_messages(
        self,
        queue_name: str,
        messages: list[ServiceBusMessage],
        **kwargs: Any
    ) -> None:
        """
        Send several messages to a Service Bus queue.

        Args:
            queue_name: Name of the target queue.
            messages: Messages to send.
            **kwargs: Additional send parameters.
        """

    def receive_message(
        self,
        queue_name: str,
        max_message_count: int = 1,
        max_wait_time: int = 5,
        **kwargs: Any
    ) -> list[ServiceBusReceivedMessage]:
        """
        Receive messages from a Service Bus queue.

        Args:
            queue_name: Name of the source queue.
            max_message_count: Maximum number of messages to receive
                (default: 1).
            max_wait_time: Maximum wait time in seconds (default: 5).
            **kwargs: Additional receive parameters.

        Returns:
            The received messages.
        """

    def peek_messages(
        self,
        queue_name: str,
        message_count: int = 1,
        sequence_number: int | None = None
    ) -> list[ServiceBusReceivedMessage]:
        """
        Peek at queue messages without removing them.

        Args:
            queue_name: Name of the queue to peek.
            message_count: Number of messages to peek (default: 1).
            sequence_number: Starting sequence number.

        Returns:
            The peeked messages.
        """

    def send_topic_message(
        self,
        topic_name: str,
        message: str | ServiceBusMessage,
        batch: bool = False,
        **kwargs: Any
    ) -> None:
        """
        Send one message to a Service Bus topic.

        Args:
            topic_name: Name of the target topic.
            message: Message content, or a ServiceBusMessage object.
            batch: Whether to send as part of a batch (default: False).
            **kwargs: Additional message properties.
        """

    def receive_subscription_message(
        self,
        topic_name: str,
        subscription_name: str,
        max_message_count: int = 1,
        max_wait_time: int = 5,
        **kwargs: Any
    ) -> list[ServiceBusReceivedMessage]:
        """
        Receive messages from a Service Bus subscription.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription.
            max_message_count: Maximum number of messages to receive
                (default: 1).
            max_wait_time: Maximum wait time in seconds (default: 5).
            **kwargs: Additional receive parameters.

        Returns:
            The received messages.
        """

Service Bus Operators

Execute Azure Service Bus operations as Airflow tasks with comprehensive queue and topic management capabilities.

Queue Management Operators

class AzureServiceBusCreateQueueOperator(BaseOperator):
    """
    Operator that creates an Azure Service Bus queue.

    The queue can be created with custom properties and configuration
    options to suit the messaging requirements.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_delivery_count: int | None = None,
        dead_lettering_on_message_expiration: bool | None = None,
        **kwargs
    ):
        """
        Set up the create-queue operator.

        Args:
            queue_name: Name of the queue to create.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
            max_delivery_count: Maximum number of delivery attempts.
            dead_lettering_on_message_expiration: Whether to enable dead
                lettering.
        """

class AzureServiceBusDeleteQueueOperator(BaseOperator):
    """
    Operator that deletes an Azure Service Bus queue.

    Removes the queue from the Service Bus namespace, with error handling
    for non-existent queues.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the delete-queue operator.

        Args:
            queue_name: Name of the queue to delete.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
        """

Message Operations Operators

class AzureServiceBusSendMessageOperator(BaseOperator):
    """
    Operator that sends a message to an Azure Service Bus queue.

    Handles single messages as well as batches, with custom message
    properties and routing information.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        message: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        batch: bool = False,
        **kwargs
    ):
        """
        Set up the send-message operator.

        Args:
            queue_name: Target queue name.
            message: Message content to send.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
            batch: Whether to send as a batch message (default: False).
        """

class AzureServiceBusReceiveMessageOperator(BaseOperator):
    """
    Operator that receives messages from an Azure Service Bus queue.

    Receive options and message handling parameters are configurable.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_message_count: int = 1,
        max_wait_time: int = 5,
        **kwargs
    ):
        """
        Set up the receive-message operator.

        Args:
            queue_name: Source queue name.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
            max_message_count: Maximum number of messages to receive
                (default: 1).
            max_wait_time: Maximum wait time in seconds (default: 5).
        """

Topic and Subscription Operators

class AzureServiceBusTopicCreateOperator(BaseOperator):
    """
    Operator that creates an Azure Service Bus topic.

    The topic can be created with custom configuration and partitioning
    options for publish-subscribe scenarios.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_size_in_megabytes: int | None = None,
        enable_partitioning: bool | None = None,
        **kwargs
    ):
        """
        Set up the create-topic operator.

        Args:
            topic_name: Name of the topic to create.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
            max_size_in_megabytes: Maximum topic size.
            enable_partitioning: Whether to enable topic partitioning.
        """

class AzureServiceBusTopicDeleteOperator(BaseOperator):
    """
    Operator that deletes an Azure Service Bus topic.

    Removes the topic, together with all associated subscriptions, from
    the Service Bus namespace.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the delete-topic operator.

        Args:
            topic_name: Name of the topic to delete.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
        """

class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
    """
    Operator that creates an Azure Service Bus subscription.

    The subscription is created on a topic, with filtering rules and
    message handling configuration.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_delivery_count: int | None = None,
        dead_lettering_on_message_expiration: bool | None = None,
        **kwargs
    ):
        """
        Set up the create-subscription operator.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription to create.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
            max_delivery_count: Maximum number of delivery attempts.
            dead_lettering_on_message_expiration: Whether to enable dead
                lettering.
        """

class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
    """
    Operator that deletes an Azure Service Bus subscription.

    Removes the subscription from its topic, with proper cleanup and
    error handling.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the delete-subscription operator.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription to delete.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
        """

class AzureServiceBusUpdateSubscriptionOperator(BaseOperator):
    """
    Operator that updates an Azure Service Bus subscription.

    Changes the properties and configuration of an existing subscription.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the update-subscription operator.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription to update.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
        """

class ASBReceiveSubscriptionMessageOperator(BaseOperator):
    """
    Operator that receives messages from an Azure Service Bus subscription.

    Messages are read from a topic subscription, with configurable receive
    parameters and filtering.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_message_count: int = 1,
        max_wait_time: int = 5,
        **kwargs
    ):
        """
        Set up the receive-subscription-message operator.

        Args:
            topic_name: Name of the topic.
            subscription_name: Name of the subscription.
            azure_service_bus_conn_id: Airflow connection ID for Service Bus
                (default: "azure_service_bus_default").
            max_message_count: Maximum number of messages to receive
                (default: 1).
            max_wait_time: Maximum wait time in seconds (default: 5).
        """

Usage Examples

Basic Queue Operations

from airflow import DAG
from airflow.providers.microsoft.azure.operators.asb import (
    AzureServiceBusCreateQueueOperator,
    AzureServiceBusSendMessageOperator,
    AzureServiceBusReceiveMessageOperator,
    AzureServiceBusDeleteQueueOperator
)
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def process_received_messages(**context):
    """Process messages received from Service Bus.

    Pulls the XCom value of the ``receive_messages`` task and prints the
    body of each message.

    Args:
        **context: Airflow task context; ``context['task_instance']`` is
            used to pull the XCom value.

    Returns:
        int: Number of messages processed (0 when nothing was pulled).
    """
    # Get messages from previous task. xcom_pull yields None when the
    # upstream task pushed nothing; treat that as an empty batch instead of
    # failing on iteration / len().
    messages = context['task_instance'].xcom_pull(task_ids='receive_messages') or []

    for message in messages:
        print(f"Processing message: {message.body}")
        # Message processing logic here

    return len(messages)

# Queue workflow: create -> send -> receive -> process -> cleanup.
# Tasks declared inside the ``with`` block are attached to ``dag``.
with DAG(
    'service_bus_queue_workflow',
    default_args={
        'owner': 'messaging-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3),
    },
    description='Service Bus queue messaging workflow',
    schedule_interval=timedelta(minutes=15),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Create queue
    create_queue = AzureServiceBusCreateQueueOperator(
        task_id='create_processing_queue',
        queue_name='data-processing-queue',
        azure_service_bus_conn_id='service_bus_conn',
        max_delivery_count=5,
        dead_lettering_on_message_expiration=True,
    )

    # Send messages
    send_message = AzureServiceBusSendMessageOperator(
        task_id='send_data_message',
        queue_name='data-processing-queue',
        message='{"data": "sample_data", "timestamp": "2024-01-01T10:00:00Z"}',
        azure_service_bus_conn_id='service_bus_conn',
    )

    # Receive and process messages
    receive_messages = AzureServiceBusReceiveMessageOperator(
        task_id='receive_messages',
        queue_name='data-processing-queue',
        azure_service_bus_conn_id='service_bus_conn',
        max_message_count=10,
        max_wait_time=30,
    )

    process_messages = PythonOperator(
        task_id='process_messages',
        python_callable=process_received_messages,
    )

    # Cleanup queue (optional)
    cleanup_queue = AzureServiceBusDeleteQueueOperator(
        task_id='cleanup_queue',
        queue_name='data-processing-queue',
        azure_service_bus_conn_id='service_bus_conn',
    )

    create_queue >> send_message >> receive_messages >> process_messages >> cleanup_queue

Topic and Subscription Pattern

from airflow import DAG
from airflow.providers.microsoft.azure.operators.asb import (
    AzureServiceBusTopicCreateOperator,
    AzureServiceBusSubscriptionCreateOperator,
    AzureServiceBusSendMessageOperator,
    ASBReceiveSubscriptionMessageOperator
)
from airflow.providers.microsoft.azure.hooks.asb import MessageHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def send_notification_messages():
    """Send notification messages to topic.

    Publishes each sample notification to the ``notifications`` topic as a
    JSON document and prints how many were sent.
    """
    import json  # local import: this snippet's import block does not bring in json

    hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

    notifications = [
        {"type": "order", "id": "12345", "status": "completed"},
        {"type": "payment", "id": "67890", "status": "processed"},
        {"type": "shipping", "id": "11111", "status": "dispatched"}
    ]

    for notification in notifications:
        hook.send_topic_message(
            topic_name='notifications',
            # json.dumps produces valid JSON; str(dict) would send a Python
            # repr (single-quoted keys) that JSON consumers cannot parse.
            message=json.dumps(notification)
        )

    print(f"Sent {len(notifications)} notifications")

def process_order_notifications(**context):
    """Process order-specific notifications.

    Pulls the XCom value of the ``receive_order_messages`` task and prints
    the body of each message.

    Args:
        **context: Airflow task context; ``context['task_instance']`` is
            used to pull the XCom value.
    """
    # xcom_pull yields None when nothing was pushed upstream; fall back to
    # an empty batch so the loop does not raise TypeError.
    messages = context['task_instance'].xcom_pull(task_ids='receive_order_messages') or []

    for message in messages:
        print(f"Processing order notification: {message.body}")
        # Order processing logic here

def process_payment_notifications(**context):
    """Process payment-specific notifications.

    Pulls the XCom value of the ``receive_payment_messages`` task and
    prints the body of each message.

    Args:
        **context: Airflow task context; ``context['task_instance']`` is
            used to pull the XCom value.
    """
    # xcom_pull yields None when nothing was pushed upstream; fall back to
    # an empty batch so the loop does not raise TypeError.
    messages = context['task_instance'].xcom_pull(task_ids='receive_payment_messages') or []

    for message in messages:
        print(f"Processing payment notification: {message.body}")
        # Payment processing logic here

# Topic workflow: one topic fans out to an order and a payment subscription.
# Tasks declared inside the ``with`` block are attached to ``dag``.
with DAG(
    'service_bus_topic_workflow',
    default_args={
        'owner': 'notification-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    description='Service Bus topic notification workflow',
    schedule_interval=timedelta(minutes=30),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Create topic
    create_topic = AzureServiceBusTopicCreateOperator(
        task_id='create_notifications_topic',
        topic_name='notifications',
        azure_service_bus_conn_id='service_bus_conn',
        max_size_in_megabytes=1024,
        enable_partitioning=True,
    )

    # Create subscriptions for different notification types
    create_order_subscription = AzureServiceBusSubscriptionCreateOperator(
        task_id='create_order_subscription',
        topic_name='notifications',
        subscription_name='order-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_delivery_count=3,
    )

    create_payment_subscription = AzureServiceBusSubscriptionCreateOperator(
        task_id='create_payment_subscription',
        topic_name='notifications',
        subscription_name='payment-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_delivery_count=3,
    )

    # Send notifications
    send_notifications = PythonOperator(
        task_id='send_notifications',
        python_callable=send_notification_messages,
    )

    # Receive from subscriptions
    receive_order_messages = ASBReceiveSubscriptionMessageOperator(
        task_id='receive_order_messages',
        topic_name='notifications',
        subscription_name='order-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_message_count=50,
    )

    receive_payment_messages = ASBReceiveSubscriptionMessageOperator(
        task_id='receive_payment_messages',
        topic_name='notifications',
        subscription_name='payment-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_message_count=50,
    )

    # Process notifications
    process_orders = PythonOperator(
        task_id='process_order_notifications',
        python_callable=process_order_notifications,
    )

    process_payments = PythonOperator(
        task_id='process_payment_notifications',
        python_callable=process_payment_notifications,
    )

    # Set up dependencies
    create_topic >> [create_order_subscription, create_payment_subscription]
    [create_order_subscription, create_payment_subscription] >> send_notifications
    send_notifications >> [receive_order_messages, receive_payment_messages]
    receive_order_messages >> process_orders
    receive_payment_messages >> process_payments

Advanced Message Handling

import json
from datetime import datetime, timedelta

from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook
from azure.servicebus import ServiceBusMessage

def advanced_message_operations():
    """Demonstrate advanced Service Bus operations.

    Creates a queue with extra properties, sends one JSON message carrying
    custom application properties, then receives messages from the queue
    and completes each one.
    """
    admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
    message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

    # Create queue with advanced configuration
    admin_hook.create_queue(
        queue_name='advanced-queue',
        max_delivery_count=5,
        dead_lettering_on_message_expiration=True,
        default_message_time_to_live=timedelta(hours=24),
        duplicate_detection_history_time_window=timedelta(minutes=10)
    )

    # Send message with properties
    message_data = {
        "id": "msg-001",
        "data": "Important business data",
        "timestamp": datetime.now().isoformat()
    }

    # Create message with custom properties
    message = ServiceBusMessage(
        body=json.dumps(message_data),
        content_type="application/json",
        message_id="msg-001",
        session_id="session-001"
    )

    # Set custom properties
    message.application_properties = {
        "priority": "high",
        "department": "finance",
        "requires_processing": True
    }

    message_hook.send_message('advanced-queue', message)

    # Receive and settle on ONE receiver. Opening a new client and receiver
    # for every message leaves them unclosed, and a message should be
    # settled through the receiver that delivered it.
    with message_hook.get_conn() as client:
        with client.get_queue_receiver('advanced-queue') as receiver:
            received_messages = receiver.receive_messages(
                max_message_count=10,
                max_wait_time=60
            )

            for msg in received_messages:
                print(f"Message ID: {msg.message_id}")
                print(f"Content Type: {msg.content_type}")
                print(f"Properties: {msg.application_properties}")
                print(f"Body: {msg.body}")

                # Complete the message to remove it from queue
                receiver.complete_message(msg)

def monitor_queue_metrics():
    """Monitor Service Bus queue metrics.

    Prints the message counts and size of ``data-processing-queue`` and
    warns when the active message count exceeds 1000.

    Returns:
        dict: ``active_messages``, ``dead_letter_messages`` and
        ``queue_size_bytes`` for the queue.
    """
    admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')

    # Message counts and size are runtime data. In the Azure SDK they live
    # on QueueRuntimeProperties (get_queue_runtime_properties), not on the
    # QueueProperties configuration object.
    with admin_hook.get_conn() as admin_client:
        runtime_properties = admin_client.get_queue_runtime_properties('data-processing-queue')

    print(f"Active Message Count: {runtime_properties.active_message_count}")
    print(f"Dead Letter Message Count: {runtime_properties.dead_letter_message_count}")
    print(f"Scheduled Message Count: {runtime_properties.scheduled_message_count}")
    print(f"Size in Bytes: {runtime_properties.size_in_bytes}")

    # Alert if queue has too many messages
    if runtime_properties.active_message_count > 1000:
        print("WARNING: Queue has high message count!")

    return {
        'active_messages': runtime_properties.active_message_count,
        'dead_letter_messages': runtime_properties.dead_letter_message_count,
        'queue_size_bytes': runtime_properties.size_in_bytes
    }

Connection Configuration

Service Bus Connection (azure_service_bus)

Configure Azure Service Bus connections in Airflow:

# Connection configuration for Service Bus (illustrative placeholder values).
# "host" is the Service Bus namespace host name; "extra" carries the
# credential fields — here tenant ID, client ID and client secret.
{
    "conn_id": "azure_service_bus_default",
    "conn_type": "azure_service_bus", 
    "host": "myservicebus.servicebus.windows.net",
    "extra": {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret"
    }
}

Authentication Methods

Azure Service Bus supports multiple authentication methods:

  1. Service Principal Authentication:

    extra = {
        "tenant_id": "your-tenant-id",
        "client_id": "your-client-id",
        "client_secret": "your-client-secret"
    }
  2. Connection String Authentication:

    extra = {
        "connection_string": "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key"
    }
  3. Managed Identity Authentication:

    extra = {
        "managed_identity_client_id": "your-managed-identity-client-id"
    }
  4. Shared Access Key Authentication:

    extra = {
        "shared_access_key_name": "your-key-name",
        "shared_access_key_value": "your-key-value"
    }

Error Handling

Common Exception Patterns

from azure.servicebus.exceptions import ServiceBusError, MessageAlreadySettled
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook

def robust_message_handling():
    """Demonstrate error handling patterns.

    Creates ``test-queue`` (tolerating an "already exists" error), sends a
    test message, then receives messages and completes each one, abandoning
    a message when its processing fails.

    Raises:
        ServiceBusError: If queue creation fails for a reason other than
            the queue already existing.
        Exception: Any unexpected error from the receive step is printed
            and re-raised.
    """
    admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
    message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

    try:
        # Attempt to create queue
        admin_hook.create_queue('test-queue')
    except ServiceBusError as e:
        if "already exists" in str(e).lower():
            print("Queue already exists, continuing...")
        else:
            print(f"Service Bus error: {e}")
            raise

    try:
        # Send message with error handling
        message_hook.send_message('test-queue', 'test message')
    except ServiceBusError as e:
        print(f"Failed to send message: {e}")
        # Implement retry logic or alternative handling

    try:
        # Receive and settle on ONE receiver. Opening a new client and
        # receiver per message leaves them unclosed, and a message should
        # be settled through the receiver that delivered it.
        with message_hook.get_conn() as client:
            with client.get_queue_receiver('test-queue') as receiver:
                # Same limits as the hook's receive_message defaults.
                messages = receiver.receive_messages(max_message_count=1, max_wait_time=5)
                for message in messages:
                    try:
                        # Process message
                        print(f"Processing: {message.body}")
                        # Complete message to remove from queue
                        receiver.complete_message(message)
                    except MessageAlreadySettled:
                        print("Message was already settled")
                    except Exception as e:
                        print(f"Failed to process message: {e}")
                        # Abandon message to return to queue
                        receiver.abandon_message(message)

    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

Connection Testing

def test_service_bus_connection():
    """Test Service Bus connection and capabilities.

    Lists the queues through the admin hook, then runs the connection test
    on the message hook. Every failure is reported with ``print``; nothing
    is raised to the caller.
    """
    try:
        admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
        message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

        # Test admin operations
        queues = admin_hook.list_queues()
        print(f"Found {len(queues)} queues")

        # Test message operations. This runs on the message hook: the admin
        # path is already exercised by list_queues() above, and message_hook
        # was previously created but never used.
        success, detail = message_hook.test_connection()
        if success:
            print("Service Bus connection successful")
        else:
            print(f"Service Bus connection failed: {detail}")

    except Exception as e:
        # Top-level boundary of a diagnostic helper: report, don't propagate.
        print(f"Service Bus connection test failed: {e}")

Performance Considerations

Optimizing Message Throughput

def optimized_batch_operations():
    """Send and receive Service Bus messages in batches.

    Builds 100 messages, sends them to ``high-throughput-queue`` in one
    call, then receives up to 32 messages and iterates over them.
    """
    message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

    # Build the whole outgoing batch up front
    messages = [
        ServiceBusMessage(
            body=f"Batch message {i}",
            message_id=f"batch-msg-{i}"
        )
        for i in range(100)
    ]

    # One call for the whole list instead of one send per message
    message_hook.send_list_of_messages('high-throughput-queue', messages)

    # Pull messages back in batches as well
    batch_messages = message_hook.receive_message(
        queue_name='high-throughput-queue',
        max_message_count=32,  # batch size used by this example
        max_wait_time=10
    )

    # Handle each received message (placeholder; runs one at a time)
    for received in batch_messages:
        # Process message logic here
        pass

def configure_high_performance_queue():
    """Create ``high-perf-queue`` with throughput-oriented settings."""
    admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')

    # Extra queue properties, forwarded to create_queue as keyword arguments
    queue_options = {
        'enable_partitioning': True,  # partitioning for higher throughput
        'max_size_in_megabytes': 5120,  # larger queue size
        'duplicate_detection_history_time_window': timedelta(minutes=1),  # shorter deduplication window
        'enable_batched_operations': True,  # batched operations
    }

    admin_hook.create_queue(queue_name='high-perf-queue', **queue_options)

This comprehensive documentation covers all Azure Service Bus capabilities in the Apache Airflow Microsoft Azure Provider, including administrative operations, message handling, topic/subscription patterns, and performance optimization techniques.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure

docs

azure-batch.md

azure-data-explorer.md

azure-file-share.md

blob-storage.md

container-services.md

cosmos-db.md

data-factory.md

data-lake-storage.md

data-transfers.md

index.md

microsoft-graph.md

powerbi.md

service-bus.md

synapse-analytics.md

tile.json