CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-servicebus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.

Overall
score

92%

Overview
Eval results
Files

docs/session-management.md

Session Management

Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.

Capabilities

Session-Enabled Receivers

Create receivers that work with sessions for stateful message processing.

# From ServiceBusClient
def get_queue_receiver(
    self,
    queue_name: str,
    *,
    session_id: Optional[Union[str, NextAvailableSessionType]] = None,
    **kwargs
) -> ServiceBusReceiver:
    """
    Create a session-enabled queue receiver.

    Parameters:
    - queue_name: Name of the session-enabled queue
    - session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session
      (keyword-only; defaults to None)
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    ServiceBusReceiver for the queue; the usage examples read the connected
    session from its `session` attribute
    """

def get_subscription_receiver(
    self,
    topic_name: str,
    subscription_name: str,
    *,
    session_id: Optional[Union[str, NextAvailableSessionType]] = None,
    **kwargs
) -> ServiceBusReceiver:
    """
    Create a session-enabled subscription receiver.

    Parameters:
    - topic_name: Name of the topic
    - subscription_name: Name of the session-enabled subscription
    - session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session
      (keyword-only; defaults to None)
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    ServiceBusReceiver for the subscription
    """

Usage Example

from azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION

# Use the client as a context manager so its connection is closed on exit
with ServiceBusClient.from_connection_string("your_connection_string") as client:

    # Connect to a specific session
    with client.get_queue_receiver("my-session-queue", session_id="session-123") as receiver:
        session = receiver.session
        print(f"Connected to session: {session.session_id}")

        messages = receiver.receive_messages(max_message_count=10)
        for message in messages:
            # str(message) decodes the payload to text; message.body is a raw
            # bytes iterable and would print as an object repr instead
            print(f"Session message: {str(message)}")
            receiver.complete_message(message)

    # Connect to next available session
    with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
        if receiver.session:
            print(f"Connected to available session: {receiver.session.session_id}")
            # Process session messages

ServiceBusSession Operations

Manage session state and properties for stateful messaging scenarios.

class ServiceBusSession:
    """
    Handle for the session a session-enabled receiver is connected to.

    Exposes the session's identity and lock expiry as read-only properties and
    provides methods to read and write the session state and to renew the
    session lock. The usage examples obtain it from `receiver.session`.
    """

    @property
    def session_id(self) -> str:
        """
        The unique identifier of the session.

        Returns:
        Session ID string
        """

    @property
    def locked_until_utc(self) -> Optional[datetime]:
        """
        The time when the session lock expires.

        Returns:
        UTC datetime when session lock expires, or None if lock has expired
        """

    @property
    def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]:
        """
        Error information if auto-renewal of session lock failed.

        Returns:
        Exception instance if auto-renewal failed, otherwise None
        """

    def get_state(
        self,
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> bytes:
        """
        Get the session state data.

        Parameters:
        - timeout: Operation timeout in seconds (keyword-only; defaults to None)
        - **kwargs: Additional keyword arguments (not described in this reference)

        Returns:
        Session state as bytes

        Raises:
        - SessionLockLostError: If session lock has expired
        - ServiceBusError: For other Service Bus related errors
        """

    def set_state(
        self,
        state: Optional[Union[str, bytes, bytearray]],
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> None:
        """
        Set the session state data.

        Parameters:
        - state: Session state data (str, bytes, bytearray, or None to clear)
        - timeout: Operation timeout in seconds (keyword-only; defaults to None)
        - **kwargs: Additional keyword arguments (not described in this reference)

        Raises:
        - SessionLockLostError: If session lock has expired
        - ServiceBusError: For other Service Bus related errors
        """

    def renew_lock(
        self,
        *,
        timeout: Optional[float] = None,
        **kwargs
    ) -> datetime:
        """
        Renew the session lock.

        Parameters:
        - timeout: Operation timeout in seconds (keyword-only; defaults to None)
        - **kwargs: Additional keyword arguments (not described in this reference)

        Returns:
        New session lock expiration time

        Raises:
        - SessionLockLostError: If session lock has expired
        - ServiceBusError: For other Service Bus related errors
        """

Usage Example

import json

from azure.servicebus import ServiceBusClient, ServiceBusMessage

# Number of messages sent for the order; the order is complete once all are processed
EXPECTED_STEPS = 5

# Use the client as a context manager so its connection is closed on exit
with ServiceBusClient.from_connection_string("your_connection_string") as client:

    # Sending session messages
    with client.get_queue_sender("my-session-queue") as sender:
        # All messages with the same session_id will be delivered in order
        for i in range(EXPECTED_STEPS):
            message = ServiceBusMessage(
                f"Order step {i}",
                session_id="order-12345"
            )
            sender.send_messages(message)

    # Processing session messages with state management
    with client.get_queue_receiver("my-session-queue", session_id="order-12345") as receiver:
        session = receiver.session

        # Get current session state. Service errors from get_state() are allowed
        # to propagate: swallowing them would reset the state and overwrite
        # whatever is stored for the session on the next set_state() call.
        order_state = {"processed_steps": [], "status": "pending"}
        state_data = session.get_state()
        if state_data:
            try:
                order_state = json.loads(state_data.decode())
            except ValueError:
                # Covers both invalid UTF-8 and invalid JSON in the stored state
                print("Stored session state is not valid JSON; starting fresh")
            else:
                print(f"Current order state: {order_state}")

        # Process messages in order
        messages = receiver.receive_messages(max_message_count=10, max_wait_time=30)
        for message in messages:
            # str(message) decodes the payload to text; message.body is a raw
            # bytes iterable, which json.dumps() cannot serialize
            body_text = str(message)
            print(f"Processing: {body_text}")

            # Update state
            order_state["processed_steps"].append(body_text)
            if len(order_state["processed_steps"]) >= EXPECTED_STEPS:
                order_state["status"] = "completed"

            # Save updated state
            session.set_state(json.dumps(order_state).encode())

            # Complete the message
            receiver.complete_message(message)

        print(f"Final order state: {order_state}")

Automatic Lock Renewal

Use AutoLockRenewer to automatically maintain session locks during long-running processing.

class AutoLockRenewer:
    """
    Background helper that keeps message and session locks renewed while
    long-running processing is in progress.
    """

    def __init__(
        self,
        max_lock_renewal_duration: float = 300,
        on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,
        executor: Optional[ThreadPoolExecutor] = None,
        max_workers: Optional[int] = None
    ):
        """
        Initialize AutoLockRenewer for automatic lock renewal.

        Parameters:
        - max_lock_renewal_duration: Maximum time to renew locks (seconds)
        - on_lock_renew_failure: Callback function for renewal failures
        - executor: Custom thread pool executor
        - max_workers: Maximum number of worker threads
        """

    def register(
        self,
        receiver: ServiceBusReceiver,
        renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],
        max_lock_renewal_duration: Optional[float] = None,
        on_lock_renew_failure: Optional[LockRenewFailureCallback] = None
    ) -> None:
        """
        Register a session or message for automatic lock renewal.

        Parameters:
        - receiver: The ServiceBusReceiver that the message or session was
          obtained from; renewal is performed through this receiver
        - renewable: ServiceBusSession or ServiceBusReceivedMessage to auto-renew
        - max_lock_renewal_duration: Maximum renewal duration in seconds for this
          renewable; when None, the value given to the constructor is used
        - on_lock_renew_failure: Callback for renewal failures of this renewable;
          when None, the callback given to the constructor is used

        Raises:
        - TypeError: If renewable is not a ServiceBusSession or ServiceBusReceivedMessage
        - ValueError: If renewable holds no lock (e.g. message not received in PEEK_LOCK mode)
        - ServiceBusError: If this AutoLockRenewer has already been closed
        """

    def close(self, wait: bool = True) -> None:
        """
        Stop all auto-renewal and cleanup resources.

        Parameters:
        - wait: Whether to block until the worker threads have shut down
        """

Usage Example

import time

from azure.servicebus import ServiceBusClient, AutoLockRenewer


def on_lock_renew_failure(renewable, error):
    """Report a lock that could not be renewed (called by AutoLockRenewer)."""
    print(f"Lock renewal failed for {renewable}: {error}")


auto_renewer = AutoLockRenewer(
    max_lock_renewal_duration=600,  # 10 minutes
    on_lock_renew_failure=on_lock_renew_failure
)

# Use the client as a context manager so its connection is closed on exit
with ServiceBusClient.from_connection_string("your_connection_string") as client:
    with client.get_queue_receiver("my-session-queue", session_id="long-session") as receiver:
        session = receiver.session

        try:
            # Register session for auto-renewal. register() takes the receiver
            # first, then the renewable; it lives inside the try so the renewer
            # is closed even if registration fails.
            auto_renewer.register(receiver, session, max_lock_renewal_duration=600)

            # Long-running processing
            messages = receiver.receive_messages(max_message_count=100, max_wait_time=60)

            for message in messages:
                # Register message for auto-renewal during processing
                auto_renewer.register(receiver, message, max_lock_renewal_duration=300)

                try:
                    # Simulate long processing time
                    print(f"Processing message: {str(message)}")
                    time.sleep(30)  # Long processing

                    receiver.complete_message(message)
                    print("Message completed successfully")

                except Exception as e:
                    # Example-level boundary: report and release the message for redelivery
                    print(f"Error processing message: {e}")
                    receiver.abandon_message(message)

        finally:
            auto_renewer.close()

Session Filtering

Constants and enums for session management.

class ServiceBusSessionFilter(Enum):
    """
    Selectors that can be supplied in place of an explicit session ID.
    """
    NEXT_AVAILABLE = 0

# Module-level shorthand, so callers can write session_id=NEXT_AVAILABLE_SESSION
NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter = ServiceBusSessionFilter.NEXT_AVAILABLE

Session Error Handling

Specific exceptions related to session operations.

class SessionLockLostError(ServiceBusError):
    """
    The session lock has expired.

    All unsettled messages that have been received can no longer be settled.
    The session must be re-acquired to continue processing.

    Listed under Raises for ServiceBusSession.get_state, set_state, and
    renew_lock.
    """

class SessionCannotBeLockedError(ServiceBusError):
    """
    The requested session cannot be locked.

    This typically occurs when:
    - The session is already locked by another client
    - The session does not exist
    - The session ID is invalid

    The usage example for this error recovers by requesting
    NEXT_AVAILABLE_SESSION instead of the specific session ID.
    """

Usage Example

from azure.servicebus import NEXT_AVAILABLE_SESSION, ServiceBusClient
# Exception types are exported from azure.servicebus.exceptions, not the package root
from azure.servicebus.exceptions import (
    SessionCannotBeLockedError,
    SessionLockLostError,
)

# Use the client as a context manager so its connection is closed on exit
with ServiceBusClient.from_connection_string("your_connection_string") as client:
    try:
        with client.get_queue_receiver("my-session-queue", session_id="busy-session") as receiver:
            session = receiver.session
            messages = receiver.receive_messages()

            for message in messages:
                try:
                    # Process message; process_message_with_session_state is an
                    # application-defined handler, not part of azure-servicebus
                    process_message_with_session_state(message, session)
                    receiver.complete_message(message)

                except SessionLockLostError:
                    print("Session lock lost - session must be re-acquired")
                    break

    except SessionCannotBeLockedError:
        print("Session is locked by another client, trying next available session")

        # Try to get any available session instead
        with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
            if receiver.session:
                print(f"Got available session: {receiver.session.session_id}")
                # Process messages from this session

Asynchronous Session Management

For asynchronous operations, use the async versions from the azure.servicebus.aio module.

from azure.servicebus.aio import ServiceBusClient, ServiceBusReceiver, ServiceBusSession

class ServiceBusSession:
    """
    Async version of ServiceBusSession from the azure.servicebus.aio module.

    Properties are plain attributes as in the sync version; the state and
    lock methods are coroutines and must be awaited.
    """
    # Same properties as sync version
    @property
    def session_id(self) -> str: ...
    @property
    def locked_until_utc(self) -> Optional[datetime]: ...
    @property
    def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]: ...

    # Async methods (same parameters and return types as the sync signatures)
    async def get_state(self, *, timeout: Optional[float] = None, **kwargs) -> bytes: ...
    async def set_state(self, state, *, timeout: Optional[float] = None, **kwargs) -> None: ...
    async def renew_lock(self, *, timeout: Optional[float] = None, **kwargs) -> datetime: ...

Usage Example

import asyncio
import json

from azure.servicebus import ServiceBusMessage, NEXT_AVAILABLE_SESSION
from azure.servicebus.aio import ServiceBusClient

# Stop after this many messages have been counted in the session state
MAX_MESSAGES = 10


async def process_session_messages():
    """Process messages from the next available session, tracking a count in session state."""
    async with ServiceBusClient.from_connection_string("your_connection_string") as client:
        async with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
            session = receiver.session
            if not session:
                print("No available sessions")
                return

            print(f"Processing session: {session.session_id}")

            # Get session state
            state_data = await session.get_state()
            session_state = json.loads(state_data.decode()) if state_data else {"count": 0}

            # Process messages
            async for message in receiver:
                # str(message) decodes the payload to text; message.body is a raw
                # bytes iterable and would print as an object repr instead
                print(f"Received: {str(message)}")

                # Update session state
                session_state["count"] += 1
                await session.set_state(json.dumps(session_state).encode())

                # Complete message
                await receiver.complete_message(message)

                # Break after MAX_MESSAGES messages
                if session_state["count"] >= MAX_MESSAGES:
                    break

asyncio.run(process_session_messages())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-servicebus

docs

administrative-operations.md

client-management.md

constants-enums.md

exception-handling.md

index.md

message-operations.md

message-types.md

session-management.md

tile.json