Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete Azure Service Bus integration providing comprehensive messaging capabilities including queue and topic management, message operations, subscription handling, and administrative functions for reliable messaging scenarios.
Foundation hook for Azure Service Bus operations providing common functionality and connection management.
class BaseAzureServiceBusHook(BaseHook):
"""
Base hook for Azure Service Bus operations.
Provides common functionality and connection management for Service Bus
administrative and message operations.
"""
def get_conn(self) -> ServiceBusClient:
"""
Get authenticated Azure Service Bus client.
Returns:
ServiceBusClient: Service Bus client instance
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test the Azure Service Bus connection.
Returns:
tuple[bool, str]: Success status and message
"""
Hook for Azure Service Bus administrative operations including queue and topic management.
class AdminClientHook(BaseAzureServiceBusHook):
"""
Hook for Azure Service Bus administrative operations.
Provides methods for managing queues, topics, subscriptions, and other
Service Bus administrative tasks.
"""
def get_conn(self) -> ServiceBusAdministrationClient:
"""
Get authenticated Service Bus administration client.
Returns:
ServiceBusAdministrationClient: Administration client instance
"""
def create_queue(
self,
queue_name: str,
max_delivery_count: int | None = None,
dead_lettering_on_message_expiration: bool | None = None,
**kwargs: Any
) -> None:
"""
Create a new Service Bus queue.
Args:
queue_name (str): Name of the queue to create
max_delivery_count (int | None): Maximum delivery attempts
dead_lettering_on_message_expiration (bool | None): Enable dead lettering
**kwargs: Additional queue properties
"""
def delete_queue(self, queue_name: str) -> None:
"""
Delete a Service Bus queue.
Args:
queue_name (str): Name of the queue to delete
"""
def get_queue(self, queue_name: str) -> QueueProperties:
"""
Get properties of a Service Bus queue.
Args:
queue_name (str): Name of the queue
Returns:
QueueProperties: Queue configuration and runtime information
"""
def list_queues(self) -> list[QueueProperties]:
"""
List all queues in the Service Bus namespace.
Returns:
list[QueueProperties]: List of queue properties
"""
def update_queue(
self,
queue_name: str,
queue_properties: QueueProperties,
**kwargs: Any
) -> QueueProperties:
"""
Update an existing Service Bus queue.
Args:
queue_name (str): Name of the queue to update
queue_properties (QueueProperties): New queue properties
**kwargs: Additional update parameters
Returns:
QueueProperties: Updated queue properties
"""
def create_topic(
self,
topic_name: str,
max_size_in_megabytes: int | None = None,
enable_partitioning: bool | None = None,
**kwargs: Any
) -> None:
"""
Create a new Service Bus topic.
Args:
topic_name (str): Name of the topic to create
max_size_in_megabytes (int | None): Maximum topic size
enable_partitioning (bool | None): Enable topic partitioning
**kwargs: Additional topic properties
"""
def delete_topic(self, topic_name: str) -> None:
"""
Delete a Service Bus topic.
Args:
topic_name (str): Name of the topic to delete
"""
def get_topic(self, topic_name: str) -> TopicProperties:
"""
Get properties of a Service Bus topic.
Args:
topic_name (str): Name of the topic
Returns:
TopicProperties: Topic configuration and runtime information
"""
def list_topics(self) -> list[TopicProperties]:
"""
List all topics in the Service Bus namespace.
Returns:
list[TopicProperties]: List of topic properties
"""
def create_subscription(
self,
topic_name: str,
subscription_name: str,
max_delivery_count: int | None = None,
dead_lettering_on_message_expiration: bool | None = None,
**kwargs: Any
) -> None:
"""
Create a subscription for a Service Bus topic.
Args:
topic_name (str): Name of the topic
subscription_name (str): Name of the subscription to create
max_delivery_count (int | None): Maximum delivery attempts
dead_lettering_on_message_expiration (bool | None): Enable dead lettering
**kwargs: Additional subscription properties
"""
def delete_subscription(
self,
topic_name: str,
subscription_name: str
) -> None:
"""
Delete a subscription from a Service Bus topic.
Args:
topic_name (str): Name of the topic
subscription_name (str): Name of the subscription to delete
"""
def get_subscription(
self,
topic_name: str,
subscription_name: str
) -> SubscriptionProperties:
"""
Get properties of a Service Bus subscription.
Args:
topic_name (str): Name of the topic
subscription_name (str): Name of the subscription
Returns:
SubscriptionProperties: Subscription configuration and runtime information
"""
def list_subscriptions(self, topic_name: str) -> list[SubscriptionProperties]:
"""
List all subscriptions for a Service Bus topic.
Args:
topic_name (str): Name of the topic
Returns:
list[SubscriptionProperties]: List of subscription properties
"""
def update_subscription(
self,
topic_name: str,
subscription_name: str,
subscription_properties: SubscriptionProperties,
**kwargs: Any
) -> SubscriptionProperties:
"""
Update an existing Service Bus subscription.
Args:
topic_name (str): Name of the topic
subscription_name (str): Name of the subscription
subscription_properties (SubscriptionProperties): New subscription properties
**kwargs: Additional update parameters
Returns:
SubscriptionProperties: Updated subscription properties
"""
Hook for Azure Service Bus message operations including sending and receiving messages.
class MessageHook(BaseAzureServiceBusHook):
"""
Hook for Azure Service Bus message operations.
Provides methods for sending and receiving messages from queues and topics,
with support for message batching and transaction handling.
"""
def get_conn(self) -> ServiceBusClient:
"""
Get authenticated Service Bus client for message operations.
Returns:
ServiceBusClient: Service Bus client instance
"""
def send_message(
self,
queue_name: str,
message: str | ServiceBusMessage,
batch: bool = False,
**kwargs: Any
) -> None:
"""
Send a message to a Service Bus queue.
Args:
queue_name (str): Name of the target queue
message (str | ServiceBusMessage): Message content or ServiceBusMessage object
batch (bool): Whether to send as part of a batch (default: False)
**kwargs: Additional message properties
"""
def send_list_of_messages(
self,
queue_name: str,
messages: list[ServiceBusMessage],
**kwargs: Any
) -> None:
"""
Send multiple messages to a Service Bus queue.
Args:
queue_name (str): Name of the target queue
messages (list[ServiceBusMessage]): List of messages to send
**kwargs: Additional send parameters
"""
def receive_message(
self,
queue_name: str,
max_message_count: int = 1,
max_wait_time: int = 5,
**kwargs: Any
) -> list[ServiceBusReceivedMessage]:
"""
Receive messages from a Service Bus queue.
Args:
queue_name (str): Name of the source queue
max_message_count (int): Maximum number of messages to receive (default: 1)
max_wait_time (int): Maximum wait time in seconds (default: 5)
**kwargs: Additional receive parameters
Returns:
list[ServiceBusReceivedMessage]: List of received messages
"""
def peek_messages(
self,
queue_name: str,
message_count: int = 1,
sequence_number: int | None = None
) -> list[ServiceBusReceivedMessage]:
"""
Peek at messages in a Service Bus queue without removing them.
Args:
queue_name (str): Name of the queue to peek
message_count (int): Number of messages to peek (default: 1)
sequence_number (int | None): Starting sequence number
Returns:
list[ServiceBusReceivedMessage]: List of peeked messages
"""
def send_topic_message(
self,
topic_name: str,
message: str | ServiceBusMessage,
batch: bool = False,
**kwargs: Any
) -> None:
"""
Send a message to a Service Bus topic.
Args:
topic_name (str): Name of the target topic
message (str | ServiceBusMessage): Message content or ServiceBusMessage object
batch (bool): Whether to send as part of a batch (default: False)
**kwargs: Additional message properties
"""
def receive_subscription_message(
self,
topic_name: str,
subscription_name: str,
max_message_count: int = 1,
max_wait_time: int = 5,
**kwargs: Any
) -> list[ServiceBusReceivedMessage]:
"""
Receive messages from a Service Bus subscription.
Args:
topic_name (str): Name of the topic
subscription_name (str): Name of the subscription
max_message_count (int): Maximum number of messages to receive (default: 1)
max_wait_time (int): Maximum wait time in seconds (default: 5)
**kwargs: Additional receive parameters
Returns:
list[ServiceBusReceivedMessage]: List of received messages
"""
Execute Azure Service Bus operations as Airflow tasks with comprehensive queue and topic management capabilities.
class AzureServiceBusCreateQueueOperator(BaseOperator):
    """
    Operator that creates an Azure Service Bus queue.

    The queue can be created with custom properties and configuration
    options to match the messaging requirements.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_delivery_count: int | None = None,
        dead_lettering_on_message_expiration: bool | None = None,
        **kwargs
    ):
        """
        Set up the create-queue operator.

        Args:
            queue_name (str): Name of the queue to create
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            max_delivery_count (int | None): Maximum delivery attempts
            dead_lettering_on_message_expiration (bool | None): Enable dead lettering
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """
class AzureServiceBusDeleteQueueOperator(BaseOperator):
    """
    Operator that deletes an Azure Service Bus queue.

    Removes the queue from the Service Bus namespace, with error
    handling for queues that do not exist.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the delete-queue operator.

        Args:
            queue_name (str): Name of the queue to delete
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """


class AzureServiceBusSendMessageOperator(BaseOperator):
    """
    Operator that sends messages to an Azure Service Bus queue.

    Single messages and batches are supported, along with custom
    message properties and routing information.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        message: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        batch: bool = False,
        **kwargs
    ):
        """
        Set up the send-message operator.

        Args:
            queue_name (str): Target queue name
            message (str): Message content to send
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            batch (bool): Whether to send as batch message
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """
class AzureServiceBusReceiveMessageOperator(BaseOperator):
    """
    Operator that receives messages from an Azure Service Bus queue.

    Messages are retrieved using configurable receive options and
    message handling parameters.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_message_count: int = 1,
        max_wait_time: int = 5,
        **kwargs
    ):
        """
        Set up the receive-message operator.

        Args:
            queue_name (str): Source queue name
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            max_message_count (int): Maximum messages to receive
            max_wait_time (int): Maximum wait time in seconds
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """


class AzureServiceBusTopicCreateOperator(BaseOperator):
    """
    Operator that creates an Azure Service Bus topic.

    Topics can be created with custom configuration and partitioning
    options for publish-subscribe scenarios.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_size_in_megabytes: int | None = None,
        enable_partitioning: bool | None = None,
        **kwargs
    ):
        """
        Set up the create-topic operator.

        Args:
            topic_name (str): Name of the topic to create
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            max_size_in_megabytes (int | None): Maximum topic size
            enable_partitioning (bool | None): Enable topic partitioning
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """
class AzureServiceBusTopicDeleteOperator(BaseOperator):
    """
    Operator that deletes an Azure Service Bus topic.

    The topic and all of its associated subscriptions are removed
    from the Service Bus namespace.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the delete-topic operator.

        Args:
            topic_name (str): Name of the topic to delete
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """
class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
    """
    Operator that creates an Azure Service Bus subscription.

    Subscriptions are created on a topic with filtering rules and
    message handling configuration.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_delivery_count: int | None = None,
        dead_lettering_on_message_expiration: bool | None = None,
        **kwargs
    ):
        """
        Set up the create-subscription operator.

        Args:
            topic_name (str): Name of the topic
            subscription_name (str): Name of the subscription to create
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            max_delivery_count (int | None): Maximum delivery attempts
            dead_lettering_on_message_expiration (bool | None): Enable dead lettering
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """
class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
    """
    Operator that deletes an Azure Service Bus subscription.

    The subscription is removed from its topic with proper cleanup
    and error handling.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the delete-subscription operator.

        Args:
            topic_name (str): Name of the topic
            subscription_name (str): Name of the subscription to delete
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """
class AzureServiceBusUpdateSubscriptionOperator(BaseOperator):
    """
    Operator that updates an Azure Service Bus subscription.

    Properties and configuration of an existing subscription are
    modified.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        **kwargs
    ):
        """
        Set up the update-subscription operator.

        Args:
            topic_name (str): Name of the topic
            subscription_name (str): Name of the subscription to update
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """
class ASBReceiveSubscriptionMessageOperator(BaseOperator):
    """
    Operator that receives messages from an Azure Service Bus subscription.

    Messages are retrieved from a topic subscription with configurable
    receive parameters and filtering.
    """

    def __init__(
        self,
        *,
        topic_name: str,
        subscription_name: str,
        azure_service_bus_conn_id: str = "azure_service_bus_default",
        max_message_count: int = 1,
        max_wait_time: int = 5,
        **kwargs
    ):
        """
        Set up the receive-subscription-message operator.

        Args:
            topic_name (str): Name of the topic
            subscription_name (str): Name of the subscription
            azure_service_bus_conn_id (str): Airflow connection ID for Service Bus
            max_message_count (int): Maximum messages to receive
            max_wait_time (int): Maximum wait time in seconds
            **kwargs: Additional keyword arguments (handling not shown in this listing)
        """


from airflow import DAG
from airflow.providers.microsoft.azure.operators.asb import (
AzureServiceBusCreateQueueOperator,
AzureServiceBusSendMessageOperator,
AzureServiceBusReceiveMessageOperator,
AzureServiceBusDeleteQueueOperator
)
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def process_received_messages(**context):
    """
    Process messages received from Service Bus.

    Args:
        **context: Airflow task context; only ``context['task_instance']``
            is read, to pull the XCom value of the 'receive_messages' task.

    Returns:
        int: Number of messages processed (0 when nothing was pulled).
    """
    # xcom_pull yields None when the upstream task pushed nothing; treat
    # that as "no messages" instead of failing on iteration / len().
    messages = context['task_instance'].xcom_pull(task_ids='receive_messages') or []
    for message in messages:
        print(f"Processing message: {message.body}")
        # Message processing logic here
    return len(messages)
# Queue workflow: create queue -> send -> receive -> process -> cleanup.
# Operators instantiated inside the ``with`` block are attached to ``dag``
# by the DAG context manager, so no explicit ``dag=dag`` is needed.
with DAG(
    'service_bus_queue_workflow',
    default_args={
        'owner': 'messaging-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=3),
    },
    description='Service Bus queue messaging workflow',
    schedule_interval=timedelta(minutes=15),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Create queue
    create_queue = AzureServiceBusCreateQueueOperator(
        task_id='create_processing_queue',
        queue_name='data-processing-queue',
        azure_service_bus_conn_id='service_bus_conn',
        max_delivery_count=5,
        dead_lettering_on_message_expiration=True,
    )

    # Send messages
    send_message = AzureServiceBusSendMessageOperator(
        task_id='send_data_message',
        queue_name='data-processing-queue',
        message='{"data": "sample_data", "timestamp": "2024-01-01T10:00:00Z"}',
        azure_service_bus_conn_id='service_bus_conn',
    )

    # Receive and process messages
    receive_messages = AzureServiceBusReceiveMessageOperator(
        task_id='receive_messages',
        queue_name='data-processing-queue',
        azure_service_bus_conn_id='service_bus_conn',
        max_message_count=10,
        max_wait_time=30,
    )
    process_messages = PythonOperator(
        task_id='process_messages',
        python_callable=process_received_messages,
    )

    # Cleanup queue (optional)
    cleanup_queue = AzureServiceBusDeleteQueueOperator(
        task_id='cleanup_queue',
        queue_name='data-processing-queue',
        azure_service_bus_conn_id='service_bus_conn',
    )

    # Linear pipeline: each task runs after the previous one
    create_queue >> send_message >> receive_messages >> process_messages >> cleanup_queue

from airflow import DAG
from airflow.providers.microsoft.azure.operators.asb import (
AzureServiceBusTopicCreateOperator,
AzureServiceBusSubscriptionCreateOperator,
AzureServiceBusSendMessageOperator,
ASBReceiveSubscriptionMessageOperator
)
from airflow.providers.microsoft.azure.hooks.asb import MessageHook
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def send_notification_messages():
    """
    Send notification messages to topic.

    Publishes three sample notifications (order, payment, shipping) to the
    'notifications' topic, one ``send_topic_message`` call per notification,
    then prints how many were sent.
    """
    # Function-scope import so this snippet does not depend on a
    # module-level ``import json`` being present.
    import json

    hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')
    notifications = [
        {"type": "order", "id": "12345", "status": "completed"},
        {"type": "payment", "id": "67890", "status": "processed"},
        {"type": "shipping", "id": "11111", "status": "dispatched"},
    ]
    for notification in notifications:
        hook.send_topic_message(
            topic_name='notifications',
            # Serialize as JSON: str(dict) produces a Python repr with
            # single quotes, which JSON consumers cannot parse.
            message=json.dumps(notification),
        )
    print(f"Sent {len(notifications)} notifications")
def process_order_notifications(**context):
    """
    Process order-specific notifications.

    Args:
        **context: Airflow task context; only ``context['task_instance']``
            is read, to pull the XCom value of 'receive_order_messages'.
    """
    # xcom_pull yields None when the upstream task pushed nothing; treat
    # that as "no messages" instead of failing on iteration.
    messages = context['task_instance'].xcom_pull(task_ids='receive_order_messages') or []
    for message in messages:
        print(f"Processing order notification: {message.body}")
        # Order processing logic here
def process_payment_notifications(**context):
    """
    Process payment-specific notifications.

    Args:
        **context: Airflow task context; only ``context['task_instance']``
            is read, to pull the XCom value of 'receive_payment_messages'.
    """
    # xcom_pull yields None when the upstream task pushed nothing; treat
    # that as "no messages" instead of failing on iteration.
    messages = context['task_instance'].xcom_pull(task_ids='receive_payment_messages') or []
    for message in messages:
        print(f"Processing payment notification: {message.body}")
        # Payment processing logic here
# Topic workflow: create topic -> create subscriptions -> publish ->
# receive per subscription -> process per subscription.
# Operators instantiated inside the ``with`` block are attached to ``dag``
# by the DAG context manager, so no explicit ``dag=dag`` is needed.
with DAG(
    'service_bus_topic_workflow',
    default_args={
        'owner': 'notification-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    description='Service Bus topic notification workflow',
    schedule_interval=timedelta(minutes=30),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Create topic
    create_topic = AzureServiceBusTopicCreateOperator(
        task_id='create_notifications_topic',
        topic_name='notifications',
        azure_service_bus_conn_id='service_bus_conn',
        max_size_in_megabytes=1024,
        enable_partitioning=True,
    )

    # Create subscriptions for different notification types
    create_order_subscription = AzureServiceBusSubscriptionCreateOperator(
        task_id='create_order_subscription',
        topic_name='notifications',
        subscription_name='order-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_delivery_count=3,
    )
    create_payment_subscription = AzureServiceBusSubscriptionCreateOperator(
        task_id='create_payment_subscription',
        topic_name='notifications',
        subscription_name='payment-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_delivery_count=3,
    )

    # Send notifications
    send_notifications = PythonOperator(
        task_id='send_notifications',
        python_callable=send_notification_messages,
    )

    # Receive from subscriptions
    receive_order_messages = ASBReceiveSubscriptionMessageOperator(
        task_id='receive_order_messages',
        topic_name='notifications',
        subscription_name='order-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_message_count=50,
    )
    receive_payment_messages = ASBReceiveSubscriptionMessageOperator(
        task_id='receive_payment_messages',
        topic_name='notifications',
        subscription_name='payment-processor',
        azure_service_bus_conn_id='service_bus_conn',
        max_message_count=50,
    )

    # Process notifications
    process_orders = PythonOperator(
        task_id='process_order_notifications',
        python_callable=process_order_notifications,
    )
    process_payments = PythonOperator(
        task_id='process_payment_notifications',
        python_callable=process_payment_notifications,
    )

    # Set up dependencies: fan out to both subscriptions, join on the
    # sender, fan out again to the receivers, then one processor each.
    subscriptions = [create_order_subscription, create_payment_subscription]
    receivers = [receive_order_messages, receive_payment_messages]
    create_topic >> subscriptions >> send_notifications >> receivers
    receive_order_messages >> process_orders
    receive_payment_messages >> process_payments

from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook
from azure.servicebus import ServiceBusMessage
import json
def advanced_message_operations():
    """
    Demonstrate advanced Service Bus operations.

    Creates 'advanced-queue' with extra queue properties, sends one JSON
    message carrying custom application properties, then receives up to 10
    messages (waiting at most 60 seconds), prints each one and completes it.
    """
    # Function-scope import: this snippet's own imports do not include
    # datetime/timedelta, so running it standalone raised NameError.
    from datetime import datetime, timedelta

    admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
    message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

    # Create queue with advanced configuration
    admin_hook.create_queue(
        queue_name='advanced-queue',
        max_delivery_count=5,
        dead_lettering_on_message_expiration=True,
        default_message_time_to_live=timedelta(hours=24),
        duplicate_detection_history_time_window=timedelta(minutes=10),
    )

    # Send message with properties
    message_data = {
        "id": "msg-001",
        "data": "Important business data",
        "timestamp": datetime.now().isoformat(),
    }

    # Create message with custom properties
    message = ServiceBusMessage(
        body=json.dumps(message_data),
        content_type="application/json",
        message_id="msg-001",
        session_id="session-001",
    )

    # Set custom properties
    message.application_properties = {
        "priority": "high",
        "department": "finance",
        "requires_processing": True,
    }
    message_hook.send_message('advanced-queue', message)

    # Receive and process messages
    received_messages = message_hook.receive_message(
        queue_name='advanced-queue',
        max_message_count=10,
        max_wait_time=60,
    )
    if not received_messages:
        return

    # Open ONE receiver for the whole batch and close it when done; the
    # previous code created a new, never-closed receiver per message.
    # NOTE(review): settling a message through a receiver other than the
    # one that delivered it may be rejected by the SDK -- confirm how
    # MessageHook.receive_message settles messages.
    with message_hook.get_conn().get_queue_receiver('advanced-queue') as receiver:
        for msg in received_messages:
            print(f"Message ID: {msg.message_id}")
            print(f"Content Type: {msg.content_type}")
            print(f"Properties: {msg.application_properties}")
            print(f"Body: {msg.body}")
            # Complete the message to remove it from queue
            receiver.complete_message(msg)
def monitor_queue_metrics():
"""Monitor Service Bus queue metrics."""
admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
# Get queue properties and metrics
queue_properties = admin_hook.get_queue('data-processing-queue')
print(f"Active Message Count: {queue_properties.active_message_count}")
print(f"Dead Letter Message Count: {queue_properties.dead_letter_message_count}")
print(f"Scheduled Message Count: {queue_properties.scheduled_message_count}")
print(f"Size in Bytes: {queue_properties.size_in_bytes}")
# Alert if queue has too many messages
if queue_properties.active_message_count > 1000:
print("WARNING: Queue has high message count!")
return {
'active_messages': queue_properties.active_message_count,
'dead_letter_messages': queue_properties.dead_letter_message_count,
'queue_size_bytes': queue_properties.size_in_bytes
}
Connection type (azure_service_bus)
Configure Azure Service Bus connections in Airflow:
# Connection configuration for Service Bus
{
"conn_id": "azure_service_bus_default",
"conn_type": "azure_service_bus",
"host": "myservicebus.servicebus.windows.net",
"extra": {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret"
}
}
Azure Service Bus supports multiple authentication methods:
Service Principal Authentication:
extra = {
"tenant_id": "your-tenant-id",
"client_id": "your-client-id",
"client_secret": "your-client-secret"
}
Connection String Authentication:
extra = {
"connection_string": "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-key"
}
Managed Identity Authentication:
extra = {
"managed_identity_client_id": "your-managed-identity-client-id"
}
Shared Access Key Authentication:
extra = {
"shared_access_key_name": "your-key-name",
"shared_access_key_value": "your-key-value"
}
from azure.servicebus.exceptions import ServiceBusError, MessageAlreadySettled
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook
def robust_message_handling():
    """
    Demonstrate error handling patterns.

    Creates 'test-queue' (tolerating an "already exists" error), sends one
    test message, then receives messages and completes each one, abandoning
    a message when its processing fails.

    Raises:
        ServiceBusError: When queue creation fails for a reason other than
            the queue already existing.
        Exception: Any unexpected error from the receive phase is printed
            and re-raised.
    """
    admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
    message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

    try:
        # Attempt to create queue
        admin_hook.create_queue('test-queue')
    except ServiceBusError as e:
        if "already exists" in str(e).lower():
            print("Queue already exists, continuing...")
        else:
            print(f"Service Bus error: {e}")
            raise

    try:
        # Send message with error handling
        message_hook.send_message('test-queue', 'test message')
    except ServiceBusError as e:
        # Deliberately best-effort: report and carry on to the receive step.
        print(f"Failed to send message: {e}")
        # Implement retry logic or alternative handling

    try:
        # Receive messages with proper completion; a None result is
        # treated as "nothing received".
        messages = message_hook.receive_message('test-queue') or []
        if messages:
            # One receiver for the whole batch, closed on exit; the previous
            # code opened a new, never-closed receiver for every settle call.
            # NOTE(review): settling through a receiver other than the one
            # that delivered the message may be rejected -- confirm against
            # MessageHook.receive_message.
            with message_hook.get_conn().get_queue_receiver('test-queue') as receiver:
                for message in messages:
                    try:
                        # Process message
                        print(f"Processing: {message.body}")
                        # Complete message to remove from queue
                        receiver.complete_message(message)
                    except MessageAlreadySettled:
                        print("Message was already settled")
                    except Exception as e:
                        print(f"Failed to process message: {e}")
                        # Abandon message to return to queue
                        receiver.abandon_message(message)
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise


def test_service_bus_connection():
    """
    Test Service Bus connection and capabilities.

    Lists queues through the admin hook, then runs ``test_connection`` on
    the message hook. Every failure is printed rather than raised.
    """
    try:
        admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
        message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

        # Test admin operations
        queues = admin_hook.list_queues()
        print(f"Found {len(queues)} queues")

        # Test message operations. This previously called
        # admin_hook.test_connection(), which left message_hook unused and
        # the message-side connection unchecked.
        success, message = message_hook.test_connection()
        if success:
            print("Service Bus connection successful")
        else:
            print(f"Service Bus connection failed: {message}")
    except Exception as e:
        # Diagnostic helper: report the failure instead of propagating it.
        print(f"Service Bus connection test failed: {e}")


def optimized_batch_operations():
    """
    Optimize Service Bus operations for high throughput.

    Builds 100 messages, sends them to 'high-throughput-queue' with a single
    ``send_list_of_messages`` call, then receives up to 32 messages in one
    call (waiting at most 10 seconds).
    """
    message_hook = MessageHook(azure_service_bus_conn_id='service_bus_conn')

    # Use message batching for better performance
    messages = [
        ServiceBusMessage(
            body=f"Batch message {i}",
            message_id=f"batch-msg-{i}",
        )
        for i in range(100)
    ]

    # Send batch of messages
    message_hook.send_list_of_messages('high-throughput-queue', messages)

    # Receive messages in batches
    batch_messages = message_hook.receive_message(
        queue_name='high-throughput-queue',
        max_message_count=32,  # Optimal batch size
        max_wait_time=10,
    )

    # Placeholder loop: messages are visited one at a time here; real
    # processing (parallel or otherwise) is left to the reader.
    for message in batch_messages:
        # Process message logic here
        pass
def configure_high_performance_queue():
"""Configure queue for high performance scenarios."""
admin_hook = AdminClientHook(azure_service_bus_conn_id='service_bus_conn')
admin_hook.create_queue(
queue_name='high-perf-queue',
enable_partitioning=True, # Enable partitioning for higher throughput
max_size_in_megabytes=5120, # Larger queue size
duplicate_detection_history_time_window=timedelta(minutes=1), # Shorter deduplication window
enable_batched_operations=True # Enable batched operations
)
This comprehensive documentation covers all Azure Service Bus capabilities in the Apache Airflow Microsoft Azure Provider, including administrative operations, message handling, topic/subscription patterns, and performance optimization techniques.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure