CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-servicebus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.

Overall
score

92%

Overview
Eval results
Files

message-operations.mddocs/

Message Operations

Comprehensive message sending and receiving operations including batch processing, message scheduling, and various settlement patterns for reliable message processing.

Capabilities

Message Sending

Send messages to queues and topics with support for single messages, message lists, and batches.

class ServiceBusSender:
    @property
    def fully_qualified_namespace(self) -> str:
        """The fully qualified namespace URL of the Service Bus."""
    
    @property
    def entity_name(self) -> str:
        """The name of the entity (queue or topic) that the sender sends to."""

    def send_messages(
        self,
        message: Union[ServiceBusMessage, ServiceBusMessageBatch, List[ServiceBusMessage]],
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> None:
        """
        Send messages to the Service Bus entity.

        Parameters:
        - message: Single message, list of messages, or message batch to send
        - timeout: Operation timeout in seconds

        Raises:
        - MessageSizeExceededError: If message exceeds size limits
        - ServiceBusError: For other Service Bus related errors
        """

Usage Example

from azure.servicebus import ServiceBusClient, ServiceBusMessage

client = ServiceBusClient.from_connection_string("your_connection_string")

with client.get_queue_sender("my-queue") as sender:
    # Send a single message
    message = ServiceBusMessage("Hello World")
    sender.send_messages(message)
    
    # Send multiple messages
    messages = [
        ServiceBusMessage("Message 1"),
        ServiceBusMessage("Message 2"),
        ServiceBusMessage("Message 3")
    ]
    sender.send_messages(messages)
    
    # Send a message batch (more efficient for large volumes)
    batch = sender.create_message_batch()
    for i in range(10):
        batch.add_message(ServiceBusMessage(f"Batch message {i}"))
    sender.send_messages(batch)

Message Scheduling

Schedule messages for future delivery with precise timing control.

def schedule_messages(
    self,
    messages: Union[ServiceBusMessage, List[ServiceBusMessage]],
    schedule_time_utc: datetime,
    *,
    timeout: Optional[float] = None,
    **kwargs
) -> List[int]:
    """
    Schedule messages for future delivery.

    Parameters:
    - messages: Single message or list of messages to schedule
    - schedule_time_utc: UTC datetime when messages should be delivered
    - timeout: Operation timeout in seconds

    Returns:
    List of sequence numbers for the scheduled messages

    Raises:
    - ServiceBusError: For Service Bus related errors
    """

def cancel_scheduled_messages(
    self,
    sequence_numbers: Union[int, List[int]],
    *,
    timeout: Optional[float] = None,
    **kwargs
) -> None:
    """
    Cancel previously scheduled messages.

    Parameters:
    - sequence_numbers: Single sequence number or list of sequence numbers to cancel
    - timeout: Operation timeout in seconds

    Raises:
    - MessageNotFoundError: If scheduled message is not found
    - ServiceBusError: For other Service Bus related errors
    """

Usage Example

from datetime import datetime, timedelta
from azure.servicebus import ServiceBusClient, ServiceBusMessage

client = ServiceBusClient.from_connection_string("your_connection_string")

with client.get_queue_sender("my-queue") as sender:
    # Schedule a message for 1 hour from now
    future_time = datetime.utcnow() + timedelta(hours=1)
    message = ServiceBusMessage("This message will be delivered in 1 hour")
    
    sequence_numbers = sender.schedule_messages(message, future_time)
    print(f"Scheduled message with sequence number: {sequence_numbers[0]}")
    
    # Cancel the scheduled message if needed
    sender.cancel_scheduled_messages(sequence_numbers[0])

Message Batching

Create and manage message batches for efficient bulk sending.

def create_message_batch(
    self,
    max_size_in_bytes: Optional[int] = None
) -> ServiceBusMessageBatch:
    """
    Create an empty message batch.

    Parameters:
    - max_size_in_bytes: Maximum size of the batch in bytes (uses service limit if None)

    Returns:
    Empty ServiceBusMessageBatch instance

    Raises:
    - ValueError: If max_size_in_bytes is invalid
    """

Message Receiving

Receive and process messages from queues and subscriptions with various modes and options.

class ServiceBusReceiver:
    @property
    def fully_qualified_namespace(self) -> str:
        """The fully qualified namespace URL of the Service Bus."""
    
    @property
    def entity_path(self) -> str:
        """The full path of the entity (queue or subscription) that the receiver receives from."""
    
    @property
    def session(self) -> Optional[ServiceBusSession]:
        """The session object if this is a session-enabled receiver, None otherwise."""

    def receive_messages(
        self,
        max_message_count: int = 1,
        max_wait_time: Optional[float] = None
    ) -> List[ServiceBusReceivedMessage]:
        """
        Receive messages from the Service Bus entity.

        Parameters:
        - max_message_count: Maximum number of messages to receive (1-256)
        - max_wait_time: Maximum time to wait for messages in seconds

        Returns:
        List of ServiceBusReceivedMessage objects

        Raises:
        - ServiceBusError: For Service Bus related errors
        """

Usage Example

from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode

client = ServiceBusClient.from_connection_string("your_connection_string")

with client.get_queue_receiver("my-queue") as receiver:
    # Receive up to 10 messages, wait up to 5 seconds
    messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
    
    for message in messages:
        print(f"Received: {message.body}")
        # Process the message
        try:
            # Your message processing logic here
            process_message(message)
            
            # Complete the message to remove it from the queue
            receiver.complete_message(message)
        except Exception as e:
            print(f"Error processing message: {e}")
            # Abandon the message to make it available for redelivery
            receiver.abandon_message(message)

Message Peeking

Peek at messages without receiving them (messages remain in the queue/subscription).

def peek_messages(
    self,
    max_message_count: int = 1,
    *,
    sequence_number: int = 0,
    timeout: Optional[float] = None,
    **kwargs
) -> List[ServiceBusReceivedMessage]:
    """
    Peek at messages without receiving them.

    Parameters:
    - max_message_count: Maximum number of messages to peek (1-32)
    - sequence_number: Starting sequence number for peeking
    - timeout: Operation timeout in seconds

    Returns:
    List of ServiceBusReceivedMessage objects (without locks)

    Raises:
    - ServiceBusError: For Service Bus related errors
    """

Message Settlement

Various ways to settle (complete processing of) received messages.

def complete_message(self, message: ServiceBusReceivedMessage) -> None:
    """
    Complete a message, removing it from the queue/subscription.

    Parameters:
    - message: The message to complete

    Raises:
    - MessageAlreadySettled: If message was already settled
    - MessageLockLostError: If message lock has expired
    - ServiceBusError: For other Service Bus related errors
    """

def abandon_message(self, message: ServiceBusReceivedMessage) -> None:
    """
    Abandon a message, making it available for redelivery.

    Parameters:
    - message: The message to abandon

    Raises:
    - MessageAlreadySettled: If message was already settled
    - MessageLockLostError: If message lock has expired
    - ServiceBusError: For other Service Bus related errors
    """

def defer_message(self, message: ServiceBusReceivedMessage) -> None:
    """
    Defer a message for later processing.

    Parameters:
    - message: The message to defer

    Raises:
    - MessageAlreadySettled: If message was already settled
    - MessageLockLostError: If message lock has expired
    - ServiceBusError: For other Service Bus related errors
    """

def dead_letter_message(
    self,
    message: ServiceBusReceivedMessage,
    reason: Optional[str] = None,
    error_description: Optional[str] = None
) -> None:
    """
    Move a message to the dead letter sub-queue.

    Parameters:
    - message: The message to dead letter
    - reason: Reason for dead lettering
    - error_description: Description of the error

    Raises:
    - MessageAlreadySettled: If message was already settled
    - MessageLockLostError: If message lock has expired
    - ServiceBusError: For other Service Bus related errors
    """

Message Lock Management

Manage message locks to prevent timeout during processing.

def renew_message_lock(
    self,
    message: ServiceBusReceivedMessage,
    *,
    timeout: Optional[float] = None,
    **kwargs
) -> datetime:
    """
    Renew the lock on a message.

    Parameters:
    - message: The message whose lock should be renewed
    - timeout: Operation timeout in seconds

    Returns:
    New lock expiration time

    Raises:
    - MessageAlreadySettled: If message was already settled
    - MessageLockLostError: If message lock has expired
    - ServiceBusError: For other Service Bus related errors
    """

Iterator Support

ServiceBusReceiver supports iteration for continuous message processing.

def __iter__(self) -> Iterator[ServiceBusReceivedMessage]:
    """
    Iterate over messages from the receiver.
    
    Returns:
    Iterator that yields ServiceBusReceivedMessage objects
    """

def __next__(self) -> ServiceBusReceivedMessage:
    """
    Get the next message from the receiver.
    
    Returns:
    Next ServiceBusReceivedMessage
    
    Raises:
    StopIteration: When no more messages are available
    """

Usage Example

with client.get_queue_receiver("my-queue") as receiver:
    # Iterate over messages as they arrive
    for message in receiver:
        print(f"Received: {message.body}")
        try:
            process_message(message)
            receiver.complete_message(message)
        except Exception as e:
            print(f"Error: {e}")
            receiver.abandon_message(message)
        
        # Break after processing 10 messages
        if message.delivery_count and message.delivery_count >= 10:
            break

Context Manager Support

All senders and receivers support context manager protocol for automatic resource cleanup.

# ServiceBusSender
def __enter__(self) -> ServiceBusSender: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

# ServiceBusReceiver  
def __enter__(self) -> ServiceBusReceiver: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

def close(self) -> None:
    """Close the sender/receiver and release resources."""

Asynchronous Message Operations

For asynchronous operations, use the async versions of ServiceBusSender and ServiceBusReceiver from the azure.servicebus.aio module.

from azure.servicebus.aio import ServiceBusSender, ServiceBusReceiver

# All methods have async equivalents
class ServiceBusSender:
    async def send_messages(self, message, **kwargs) -> None: ...
    async def schedule_messages(self, messages, schedule_time_utc, **kwargs) -> List[int]: ...
    async def cancel_scheduled_messages(self, sequence_numbers, **kwargs) -> None: ...
    async def close(self) -> None: ...
    async def __aenter__(self) -> ServiceBusSender: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...

class ServiceBusReceiver:
    async def receive_messages(self, max_message_count=1, max_wait_time=None) -> List[ServiceBusReceivedMessage]: ...
    async def peek_messages(self, max_message_count=1, **kwargs) -> List[ServiceBusReceivedMessage]: ...
    async def complete_message(self, message: ServiceBusReceivedMessage) -> None: ...
    async def abandon_message(self, message: ServiceBusReceivedMessage) -> None: ...
    async def defer_message(self, message: ServiceBusReceivedMessage) -> None: ...
    async def dead_letter_message(self, message: ServiceBusReceivedMessage, reason=None, error_description=None) -> None: ...
    async def renew_message_lock(self, message: ServiceBusReceivedMessage, **kwargs) -> datetime: ...
    async def close(self) -> None: ...
    async def __aenter__(self) -> ServiceBusReceiver: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
    def __aiter__(self) -> AsyncIterator[ServiceBusReceivedMessage]: ...
    async def __anext__(self) -> ServiceBusReceivedMessage: ...

Usage Example

import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage

async def send_and_receive():
    async with ServiceBusClient.from_connection_string("your_connection_string") as client:
        # Send messages asynchronously
        async with client.get_queue_sender("my-queue") as sender:
            messages = [ServiceBusMessage(f"Message {i}") for i in range(5)]
            await sender.send_messages(messages)
        
        # Receive messages asynchronously
        async with client.get_queue_receiver("my-queue") as receiver:
            async for message in receiver:
                print(f"Received: {message.body}")
                await receiver.complete_message(message)

asyncio.run(send_and_receive())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-servicebus

docs

administrative-operations.md

client-management.md

constants-enums.md

exception-handling.md

index.md

message-operations.md

message-types.md

session-management.md

tile.json