Microsoft Azure Client Libraries for Python meta-package providing comprehensive access to Azure cloud services and management capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Azure messaging and integration services provide reliable message queuing, event distribution, and enterprise integration capabilities. This includes Service Bus for enterprise messaging and Event Grid for event-driven architectures.
Azure Service Bus provides reliable cloud messaging as a service and simple hybrid integration. It supports queues for point-to-point communication and topics/subscriptions for publish-subscribe patterns.
class ServiceBusService:
    """
    Client for Azure Service Bus operations.

    This is an API signature stub: every method body is elided (``...``).

    Parameters:
    - service_namespace: str, Service Bus namespace name (optional)
    - account_key: str, Service Bus access key (optional)
    - issuer: str, Service Bus issuer (optional)
    - host_base: str, Service Bus host base URL (optional)
    - timeout: int, Request timeout in seconds (optional)
    """

    def __init__(self, service_namespace=None, account_key=None, issuer=None,
                 host_base=None, timeout=60, **kwargs): ...

    # Queue operations
    def create_queue(self, queue_name: str, queue=None, fail_on_exist=False): ...
    def delete_queue(self, queue_name: str, fail_not_exist=False): ...
    def get_queue(self, queue_name: str): ...
    def list_queues(self): ...
    def send_queue_message(self, queue_name: str, message): ...
    def peek_lock_queue_message(self, queue_name: str, timeout=60): ...
    def receive_queue_message(self, queue_name: str, peek_lock=True, timeout=60): ...
    def delete_queue_message(self, queue_name: str, message): ...
    def unlock_queue_message(self, queue_name: str, message): ...
    def renew_lock_queue_message(self, queue_name: str, message): ...

    # Topic operations
    def create_topic(self, topic_name: str, topic=None, fail_on_exist=False): ...
    def delete_topic(self, topic_name: str, fail_not_exist=False): ...
    def get_topic(self, topic_name: str): ...
    def list_topics(self): ...
    def send_topic_message(self, topic_name: str, message): ...

    # Subscription operations
    def create_subscription(self, topic_name: str, subscription_name: str,
                            subscription=None, fail_on_exist=False): ...
    def delete_subscription(self, topic_name: str, subscription_name: str,
                            fail_not_exist=False): ...
    def get_subscription(self, topic_name: str, subscription_name: str): ...
    def list_subscriptions(self, topic_name: str): ...
    def receive_subscription_message(self, topic_name: str, subscription_name: str,
                                     peek_lock=True, timeout=60): ...
    def delete_subscription_message(self, topic_name: str, subscription_name: str, message): ...
    def unlock_subscription_message(self, topic_name: str, subscription_name: str, message): ...
    def renew_lock_subscription_message(self, topic_name: str, subscription_name: str, message): ...

    # Rule operations
    def create_rule(self, topic_name: str, subscription_name: str, rule_name: str,
                    rule=None, fail_on_exist=False): ...
    def delete_rule(self, topic_name: str, subscription_name: str, rule_name: str,
                    fail_not_exist=False): ...
    def get_rule(self, topic_name: str, subscription_name: str, rule_name: str): ...
    def list_rules(self, topic_name: str, subscription_name: str): ...


# Data models for Service Bus entities and messages.
class Queue:
    """Represents a Service Bus queue.

    Attributes are declared as annotations only; the stub assigns no values.
    """

    def __init__(self): ...

    lock_duration: str  # Message lock duration
    max_size_in_megabytes: int  # Maximum queue size
    requires_duplicate_detection: bool  # Duplicate detection enabled
    requires_session: bool  # Session support enabled
    default_message_time_to_live: str  # Default TTL
    dead_lettering_on_message_expiration: bool  # Dead letter on expiration
    duplicate_detection_history_time_window: str  # Duplicate detection window
    max_delivery_count: int  # Maximum delivery attempts
    enable_batched_operations: bool  # Batched operations enabled
    size_in_bytes: int  # Current size in bytes
    message_count: int  # Current message count
class Topic:
    """Represents a Service Bus topic.

    Attributes are declared as annotations only; the stub assigns no values.
    """

    def __init__(self): ...

    default_message_time_to_live: str  # Default TTL
    max_size_in_megabytes: int  # Maximum topic size
    requires_duplicate_detection: bool  # Duplicate detection enabled
    duplicate_detection_history_time_window: str  # Duplicate detection window
    enable_batched_operations: bool  # Batched operations enabled
    size_in_bytes: int  # Current size in bytes
    filtering_messages_before_publishing: bool  # Pre-publish filtering
class Subscription:
    """Represents a Service Bus subscription.

    Attributes are declared as annotations only; the stub assigns no values.
    """

    def __init__(self): ...

    lock_duration: str  # Message lock duration
    requires_session: bool  # Session support enabled
    default_message_time_to_live: str  # Default TTL
    dead_lettering_on_message_expiration: bool  # Dead letter on expiration
    dead_lettering_on_filter_evaluation_exceptions: bool  # Dead letter on filter errors
    enable_batched_operations: bool  # Batched operations enabled
    max_delivery_count: int  # Maximum delivery attempts
    message_count: int  # Current message count
class Message:
    """Represents a Service Bus message.

    The constructor accepts an optional ``body`` plus arbitrary keyword
    arguments. Attributes are declared as annotations only; the stub assigns
    no values.
    """

    def __init__(self, body=None, **kwargs): ...

    body: str  # Message body
    content_type: str  # Content type
    correlation_id: str  # Correlation identifier
    delivery_count: int  # Delivery attempt count
    enqueued_time_utc: str  # Enqueue timestamp
    expires_at_utc: str  # Expiration timestamp
    label: str  # Message label
    message_id: str  # Message identifier
    reply_to: str  # Reply-to address
    reply_to_session_id: str  # Reply-to session ID
    scheduled_enqueue_time_utc: str  # Scheduled enqueue time
    session_id: str  # Session identifier
    time_to_live: int  # Time to live in seconds
    to: str  # Destination address
    broker_properties: dict  # Broker properties
    custom_properties: dict  # Custom properties
class Rule:
    """Represents a Service Bus subscription rule.

    Attributes are declared as annotations only; the stub assigns no values.
    """

    def __init__(self): ...

    action: object  # Rule action
    filter: object  # Rule filter
    name: str  # Rule name


# Exception types for Service Bus operations.
class AzureServiceBusPeekLockError(Exception):
    """Exception raised for peek-lock operation errors."""
    pass
class AzureServiceBusResourceNotFound(Exception):
    """Exception raised when a Service Bus resource is not found."""
    pass


# Provides event routing and delivery service for building event-driven
# applications. Supports custom topics and system event subscriptions.
class EventGridClient:
    """
    Client for Azure Event Grid operations.

    This is an API signature stub: method bodies are elided.

    Parameters:
    - credentials: Authentication credentials
    """

    def __init__(self, credentials, **kwargs): ...

    def publish_events(self, topic_hostname: str, events: list, **kwargs):
        """
        Publish events to an Event Grid topic.

        Parameters:
        - topic_hostname: str, Event Grid topic hostname
        - events: list, List of events to publish
        """


class EventGridEvent:
    """Represents an Event Grid event.

    Attributes are declared as annotations only; the stub assigns no values.
    """

    def __init__(self, subject: str, event_type: str, data: object, data_version: str, **kwargs): ...

    id: str  # Event identifier
    subject: str  # Event subject
    data: object  # Event data
    event_type: str  # Event type
    event_time: str  # Event timestamp
    data_version: str  # Data schema version
    metadata_version: str  # Metadata schema version
    topic: str  # Event topic
class CloudEvent:
    """Represents a CloudEvents specification event.

    Attributes are declared as annotations only; the stub assigns no values.
    """

    # NOTE: the `type` parameter shadows the builtin; it is kept because it is
    # part of the documented constructor signature.
    def __init__(self, source: str, type: str, **kwargs): ...

    id: str  # Event identifier
    source: str  # Event source
    spec_version: str  # CloudEvents specification version
    type: str  # Event type
    data: object  # Event data
    data_content_type: str  # Data content type
    subject: str  # Event subject
    time: str  # Event timestamp


# Authentication constants
DEFAULT_RULE_NAME: str  # Default subscription rule name
AZURE_SERVICEBUS_NAMESPACE: str  # Environment variable for namespace
AZURE_SERVICEBUS_ACCESS_KEY: str  # Environment variable for access key
AZURE_SERVICEBUS_ISSUER: str  # Environment variable for issuer

# Service endpoints
SERVICE_BUS_HOST_BASE: str  # Service Bus host base URL

# Configuration
DEFAULT_HTTP_TIMEOUT: int  # Default HTTP request timeout

# The import below opens the queue usage example; it had been fused into the
# comment on the previous line and was therefore never executed.
from azure.servicebus import ServiceBusService
from azure.servicebus.models import Message

# Create Service Bus client
sbs = ServiceBusService(
    service_namespace='mynamespace',
    account_key='mykey'
)

# Create a queue
sbs.create_queue('myqueue')

# Send a message
message = Message('Hello, World!')
message.custom_properties = {'priority': 'high'}
sbs.send_queue_message('myqueue', message)

# Receive a message
message = sbs.receive_queue_message('myqueue', peek_lock=True)
if message.body:
    print(f"Received: {message.body}")
    print(f"Priority: {message.custom_properties.get('priority')}")
    # Delete the message after processing
    sbs.delete_queue_message('myqueue', message)

# List queues
queues = sbs.list_queues()
for queue in queues:
    print(f"Queue: {queue.name}, Messages: {queue.message_count}")

# The import below opens the topic/subscription example; it had been fused
# onto the end of the print() call above.
from azure.servicebus import ServiceBusService
from azure.servicebus.models import Message, Topic, Subscription, Rule

# NOTE: `sbs` is not created in this snippet; it is assumed to be an existing
# ServiceBusService instance (e.g. the one built in the queue example).

# Create topic
topic = Topic()
topic.max_size_in_megabytes = 5120
topic.default_message_time_to_live = 'PT1H'  # 1 hour
sbs.create_topic('mytopic', topic)

# Create subscription
subscription = Subscription()
subscription.lock_duration = 'PT1M'  # 1 minute
subscription.max_delivery_count = 10
sbs.create_subscription('mytopic', 'mysubscription', subscription)

# Send message to topic
message = Message('Topic message')
message.label = 'notification'
sbs.send_topic_message('mytopic', message)

# Receive message from subscription
message = sbs.receive_subscription_message('mytopic', 'mysubscription', peek_lock=True)
if message.body:
    print(f"Received from subscription: {message.body}")
    sbs.delete_subscription_message('mytopic', 'mysubscription', message)

# Create a subscription rule
rule = Rule()
rule.filter = {'type': 'SqlFilter', 'sqlExpression': "label = 'notification'"}
sbs.create_rule('mytopic', 'mysubscription', 'notification-rule', rule)

# The import below opens the Event Grid example; it had been fused onto the
# end of the create_rule() call above.
from azure.eventgrid import EventGridClient
from azure.eventgrid.models import EventGridEvent
from datetime import datetime, timezone

# Create Event Grid client
# NOTE: `credentials` is not defined in this snippet; supply your own
# credentials object before running it.
eg_client = EventGridClient(credentials)

# Create events
# Timestamps use an aware UTC datetime (datetime.utcnow() is deprecated since
# Python 3.12); the "+00:00" offset is rewritten to "Z" so the resulting
# string has the same shape as before.
events = [
    EventGridEvent(
        id='event-1',
        subject='user/signup',
        data={'userId': '123', 'email': 'user@example.com'},
        event_type='UserSignup',
        event_time=datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        data_version='1.0'
    ),
    EventGridEvent(
        id='event-2',
        subject='order/created',
        data={'orderId': '456', 'amount': 99.99},
        event_type='OrderCreated',
        event_time=datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
        data_version='1.0'
    )
]

# Publish events
topic_hostname = 'mytopic.westus2-1.eventgrid.azure.net'
eg_client.publish_events(topic_hostname, events)
print(f"Published {len(events)} events to {topic_hostname}")

# The import below opens the error-handling example; it had been fused onto
# the end of the print() call above.
from azure.servicebus.models import AzureServiceBusResourceNotFound, AzureServiceBusPeekLockError
# NOTE: `sbs` is not created in this snippet; it is assumed to be an existing
# ServiceBusService instance.
try:
    # Attempt to get a non-existent queue
    queue = sbs.get_queue('nonexistent-queue')
except AzureServiceBusResourceNotFound:
    print("Queue not found")

try:
    # Attempt to receive from empty queue with short timeout
    message = sbs.receive_queue_message('myqueue', timeout=1)
    if not message.body:
        print("No messages available")
except AzureServiceBusPeekLockError as e:
    print(f"Peek-lock error: {e}")

# Install with Tessl CLI:
#     npx tessl i tessl/pypi-azure