CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure

Microsoft Azure Client Libraries for Python meta-package providing comprehensive access to Azure cloud services and management capabilities

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/messaging-integration.md

Messaging and Integration

Azure messaging and integration services provide reliable message queuing, event distribution, and enterprise integration capabilities. This includes Service Bus for enterprise messaging and Event Grid for event-driven architectures.

Capabilities

Azure Service Bus

Provides reliable cloud messaging as a service and simple hybrid integration. Supports queues for point-to-point communication and topics/subscriptions for publish-subscribe patterns.

class ServiceBusService:
    """
    Entry point for Azure Service Bus queue, topic, subscription and rule calls.

    Parameters (all optional):
    - service_namespace: str, name of the Service Bus namespace
    - account_key: str, access key for Service Bus
    - issuer: str, Service Bus issuer
    - host_base: str, base URL of the Service Bus host
    - timeout: int, request timeout in seconds
    """

    def __init__(
        self,
        service_namespace=None,
        account_key=None,
        issuer=None,
        host_base=None,
        timeout=60,
        **kwargs,
    ): ...

    # --- Queues ---
    def create_queue(self, queue_name: str, queue=None, fail_on_exist=False): ...
    def delete_queue(self, queue_name: str, fail_not_exist=False): ...
    def get_queue(self, queue_name: str): ...
    def list_queues(self): ...
    def send_queue_message(self, queue_name: str, message): ...
    def peek_lock_queue_message(self, queue_name: str, timeout=60): ...
    def receive_queue_message(self, queue_name: str, peek_lock=True, timeout=60): ...
    def delete_queue_message(self, queue_name: str, message): ...
    def unlock_queue_message(self, queue_name: str, message): ...
    def renew_lock_queue_message(self, queue_name: str, message): ...

    # --- Topics ---
    def create_topic(self, topic_name: str, topic=None, fail_on_exist=False): ...
    def delete_topic(self, topic_name: str, fail_not_exist=False): ...
    def get_topic(self, topic_name: str): ...
    def list_topics(self): ...
    def send_topic_message(self, topic_name: str, message): ...

    # --- Subscriptions (always addressed through their topic) ---
    def create_subscription(
        self,
        topic_name: str,
        subscription_name: str,
        subscription=None,
        fail_on_exist=False,
    ): ...
    def delete_subscription(
        self,
        topic_name: str,
        subscription_name: str,
        fail_not_exist=False,
    ): ...
    def get_subscription(self, topic_name: str, subscription_name: str): ...
    def list_subscriptions(self, topic_name: str): ...
    def receive_subscription_message(
        self,
        topic_name: str,
        subscription_name: str,
        peek_lock=True,
        timeout=60,
    ): ...
    def delete_subscription_message(self, topic_name: str, subscription_name: str, message): ...
    def unlock_subscription_message(self, topic_name: str, subscription_name: str, message): ...
    def renew_lock_subscription_message(self, topic_name: str, subscription_name: str, message): ...

    # --- Rules (addressed through topic and subscription) ---
    def create_rule(
        self,
        topic_name: str,
        subscription_name: str,
        rule_name: str,
        rule=None,
        fail_on_exist=False,
    ): ...
    def delete_rule(
        self,
        topic_name: str,
        subscription_name: str,
        rule_name: str,
        fail_not_exist=False,
    ): ...
    def get_rule(self, topic_name: str, subscription_name: str, rule_name: str): ...
    def list_rules(self, topic_name: str, subscription_name: str): ...

Service Bus Models

Data models for Service Bus entities and messages.

class Queue:
    """Model describing a Service Bus queue."""

    lock_duration: str  # Duration of the lock held on a message
    max_size_in_megabytes: int  # Upper bound on the queue size
    requires_duplicate_detection: bool  # Whether duplicate detection is on
    requires_session: bool  # Whether session support is on
    default_message_time_to_live: str  # TTL applied by default
    dead_lettering_on_message_expiration: bool  # Dead-letter messages that expire
    duplicate_detection_history_time_window: str  # Window used for duplicate detection
    max_delivery_count: int  # Maximum number of delivery attempts
    enable_batched_operations: bool  # Whether batched operations are on
    size_in_bytes: int  # Size right now, in bytes
    message_count: int  # Number of messages right now

    def __init__(self): ...

class Topic:
    """Model describing a Service Bus topic."""

    default_message_time_to_live: str  # TTL applied by default
    max_size_in_megabytes: int  # Upper bound on the topic size
    requires_duplicate_detection: bool  # Whether duplicate detection is on
    duplicate_detection_history_time_window: str  # Window used for duplicate detection
    enable_batched_operations: bool  # Whether batched operations are on
    size_in_bytes: int  # Size right now, in bytes
    filtering_messages_before_publishing: bool  # Filter messages before publishing

    def __init__(self): ...

class Subscription:
    """Model describing a Service Bus subscription."""

    lock_duration: str  # Duration of the lock held on a message
    requires_session: bool  # Whether session support is on
    default_message_time_to_live: str  # TTL applied by default
    dead_lettering_on_message_expiration: bool  # Dead-letter messages that expire
    dead_lettering_on_filter_evaluation_exceptions: bool  # Dead-letter on filter errors
    enable_batched_operations: bool  # Whether batched operations are on
    max_delivery_count: int  # Maximum number of delivery attempts
    message_count: int  # Number of messages right now

    def __init__(self): ...

class Message:
    """Model describing a Service Bus message."""

    body: str  # Body of the message
    content_type: str  # Content type of the body
    correlation_id: str  # Identifier used for correlation
    delivery_count: int  # Number of delivery attempts
    enqueued_time_utc: str  # Timestamp of enqueueing
    expires_at_utc: str  # Timestamp of expiration
    label: str  # Label attached to the message
    message_id: str  # Identifier of the message
    reply_to: str  # Address to reply to
    reply_to_session_id: str  # Session ID to reply to
    scheduled_enqueue_time_utc: str  # Time the enqueue is scheduled for
    session_id: str  # Identifier of the session
    time_to_live: int  # TTL, expressed in seconds
    to: str  # Address of the destination
    broker_properties: dict  # Properties set for the broker
    custom_properties: dict  # Properties defined by the application

    def __init__(self, body=None, **kwargs): ...

class Rule:
    """Model describing a rule attached to a Service Bus subscription."""

    action: object  # Action of the rule
    filter: object  # Filter of the rule
    name: str  # Name of the rule

    def __init__(self): ...

Service Bus Exceptions

Exception types for Service Bus operations.

class AzureServiceBusPeekLockError(Exception):
    """Signals an error in a peek-lock operation."""

class AzureServiceBusResourceNotFound(Exception):
    """Signals that a requested Service Bus resource does not exist."""

Azure Event Grid

Provides event routing and delivery service for building event-driven applications. Supports custom topics and system event subscriptions.

class EventGridClient:
    """
    Client used to talk to Azure Event Grid.

    Parameters:
    - credentials: Authentication credentials
    """

    def __init__(self, credentials, **kwargs): ...

    def publish_events(self, topic_hostname: str, events: list, **kwargs):
        """
        Send a list of events to an Event Grid topic.

        Parameters:
        - topic_hostname: str, hostname of the Event Grid topic
        - events: list, the events to publish
        """

Event Grid Models

class EventGridEvent:
    """Model describing a single Event Grid event."""

    id: str  # Identifier of the event
    subject: str  # Subject of the event
    data: object  # Payload of the event
    event_type: str  # Type of the event
    event_time: str  # Timestamp of the event
    data_version: str  # Schema version of the data
    metadata_version: str  # Schema version of the metadata
    topic: str  # Topic of the event

    def __init__(
        self,
        subject: str,
        event_type: str,
        data: object,
        data_version: str,
        **kwargs,
    ): ...

class CloudEvent:
    """Model describing an event in the CloudEvents specification format."""

    id: str  # Identifier of the event
    source: str  # Source of the event
    spec_version: str  # Version of the CloudEvents specification
    type: str  # Type of the event
    data: object  # Payload of the event
    data_content_type: str  # Content type of the data
    subject: str  # Subject of the event
    time: str  # Timestamp of the event

    def __init__(self, source: str, type: str, **kwargs): ...

Constants and Configuration

Service Bus Constants

# Subscription rule defaults
DEFAULT_RULE_NAME: str  # Default subscription rule name

# Authentication constants (names of environment variables)
AZURE_SERVICEBUS_NAMESPACE: str  # Environment variable for namespace
AZURE_SERVICEBUS_ACCESS_KEY: str  # Environment variable for access key
AZURE_SERVICEBUS_ISSUER: str  # Environment variable for issuer

# Service endpoints
SERVICE_BUS_HOST_BASE: str  # Service Bus host base URL

# Configuration
DEFAULT_HTTP_TIMEOUT: int  # Default HTTP request timeout

Usage Examples

Working with Service Bus Queues

from azure.servicebus import ServiceBusService
from azure.servicebus.models import Message

# Build the Service Bus client
sbs = ServiceBusService(service_namespace='mynamespace', account_key='mykey')

# Create the queue used below
sbs.create_queue('myqueue')

# Enqueue one message carrying a custom property
outgoing = Message('Hello, World!')
outgoing.custom_properties = {'priority': 'high'}
sbs.send_queue_message('myqueue', outgoing)

# Pull a message from the queue under peek-lock
message = sbs.receive_queue_message('myqueue', peek_lock=True)
if message.body:
    print(f"Received: {message.body}")
    print(f"Priority: {message.custom_properties.get('priority')}")

    # Processing is done, so remove the message from the queue
    sbs.delete_queue_message('myqueue', message)

# Enumerate the queues
for queue in sbs.list_queues():
    print(f"Queue: {queue.name}, Messages: {queue.message_count}")

Working with Service Bus Topics and Subscriptions

from azure.servicebus import ServiceBusService
from azure.servicebus.models import Message, Topic, Subscription, Rule

# Create Service Bus client (placeholder namespace and key) so this example
# can be run on its own
sbs = ServiceBusService(
    service_namespace='mynamespace',
    account_key='mykey'
)

# Create topic
topic = Topic()
topic.max_size_in_megabytes = 5120
topic.default_message_time_to_live = 'PT1H'  # 1 hour
sbs.create_topic('mytopic', topic)

# Create subscription
subscription = Subscription()
subscription.lock_duration = 'PT1M'  # 1 minute
subscription.max_delivery_count = 10
sbs.create_subscription('mytopic', 'mysubscription', subscription)

# Send message to topic
message = Message('Topic message')
message.label = 'notification'
sbs.send_topic_message('mytopic', message)

# Receive message from subscription
message = sbs.receive_subscription_message('mytopic', 'mysubscription', peek_lock=True)
if message.body:
    print(f"Received from subscription: {message.body}")
    # Delete the message after processing
    sbs.delete_subscription_message('mytopic', 'mysubscription', message)

# Create a subscription rule
rule = Rule()
rule.filter = {'type': 'SqlFilter', 'sqlExpression': "label = 'notification'"}
sbs.create_rule('mytopic', 'mysubscription', 'notification-rule', rule)

Working with Event Grid

from datetime import datetime, timezone

from azure.eventgrid import EventGridClient
from azure.eventgrid.models import EventGridEvent


def utc_timestamp():
    """Return the current UTC time as an ISO 8601 string ending in 'Z'."""
    # datetime.utcnow() is deprecated since Python 3.12 and yields a naive
    # datetime; use an aware UTC datetime and keep the 'Z' suffix the
    # original example produced.
    return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')


# Create Event Grid client
# NOTE: `credentials` is not defined in this example; supply your own
# authentication credentials object before running it.
eg_client = EventGridClient(credentials)

# Create events
events = [
    EventGridEvent(
        id='event-1',
        subject='user/signup',
        data={'userId': '123', 'email': 'user@example.com'},
        event_type='UserSignup',
        event_time=utc_timestamp(),
        data_version='1.0'
    ),
    EventGridEvent(
        id='event-2',
        subject='order/created',
        data={'orderId': '456', 'amount': 99.99},
        event_type='OrderCreated',
        event_time=utc_timestamp(),
        data_version='1.0'
    )
]

# Publish events
topic_hostname = 'mytopic.westus2-1.eventgrid.azure.net'
eg_client.publish_events(topic_hostname, events)
print(f"Published {len(events)} events to {topic_hostname}")

Error Handling

from azure.servicebus.models import AzureServiceBusResourceNotFound, AzureServiceBusPeekLockError

# NOTE: `sbs` is assumed to be a ServiceBusService client created beforehand;
# it is not constructed in this snippet.
try:
    # Attempt to get a non-existent queue; the handler below shows the
    # exception this example expects for a missing queue
    queue = sbs.get_queue('nonexistent-queue')
except AzureServiceBusResourceNotFound:
    print("Queue not found")

try:
    # Attempt to receive from empty queue with short timeout (1 second)
    message = sbs.receive_queue_message('myqueue', timeout=1)
    # A falsy body is treated here as "nothing was received"
    if not message.body:
        print("No messages available")
except AzureServiceBusPeekLockError as e:
    # Report the peek-lock failure together with the exception text
    print(f"Peek-lock error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-azure

docs

compute-services.md

index.md

messaging-integration.md

monitoring-analytics.md

platform-services.md

resource-management.md

security-identity.md

tile.json