Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
Overall
score
92%
Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.
Create receivers that work with sessions for stateful message processing.
# From ServiceBusClient
def get_queue_receiver(
self,
queue_name: str,
*,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
**kwargs
) -> ServiceBusReceiver:
"""
Create a session-enabled queue receiver.
Parameters:
- queue_name: Name of the session-enabled queue
- session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session
"""
def get_subscription_receiver(
self,
topic_name: str,
subscription_name: str,
*,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
**kwargs
) -> ServiceBusReceiver:
"""
Create a session-enabled subscription receiver.
Parameters:
- topic_name: Name of the topic
- subscription_name: Name of the session-enabled subscription
- session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session
"""from azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION
client = ServiceBusClient.from_connection_string("your_connection_string")
# Connect to a specific session
with client.get_queue_receiver("my-session-queue", session_id="session-123") as receiver:
session = receiver.session
print(f"Connected to session: {session.session_id}")
messages = receiver.receive_messages(max_message_count=10)
for message in messages:
print(f"Session message: {message.body}")
receiver.complete_message(message)
# Connect to next available session
with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
if receiver.session:
print(f"Connected to available session: {receiver.session.session_id}")
# Process session messagesManage session state and properties for stateful messaging scenarios.
class ServiceBusSession:
@property
def session_id(self) -> str:
"""
The unique identifier of the session.
Returns:
Session ID string
"""
@property
def locked_until_utc(self) -> Optional[datetime]:
"""
The time when the session lock expires.
Returns:
UTC datetime when session lock expires, or None if lock has expired
"""
@property
def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]:
"""
Error information if auto-renewal of session lock failed.
Returns:
Exception instance if auto-renewal failed, otherwise None
"""
def get_state(
self,
*,
timeout: Optional[float] = None,
**kwargs
) -> bytes:
"""
Get the session state data.
Parameters:
- timeout: Operation timeout in seconds
Returns:
Session state as bytes
Raises:
- SessionLockLostError: If session lock has expired
- ServiceBusError: For other Service Bus related errors
"""
def set_state(
self,
state: Optional[Union[str, bytes, bytearray]],
*,
timeout: Optional[float] = None,
**kwargs
) -> None:
"""
Set the session state data.
Parameters:
- state: Session state data (str, bytes, bytearray, or None to clear)
- timeout: Operation timeout in seconds
Raises:
- SessionLockLostError: If session lock has expired
- ServiceBusError: For other Service Bus related errors
"""
def renew_lock(
self,
*,
timeout: Optional[float] = None,
**kwargs
) -> datetime:
"""
Renew the session lock.
Parameters:
- timeout: Operation timeout in seconds
Returns:
New session lock expiration time
Raises:
- SessionLockLostError: If session lock has expired
- ServiceBusError: For other Service Bus related errors
"""from azure.servicebus import ServiceBusClient, ServiceBusMessage
import json
client = ServiceBusClient.from_connection_string("your_connection_string")
# Sending session messages
with client.get_queue_sender("my-session-queue") as sender:
# All messages with the same session_id will be delivered in order
for i in range(5):
message = ServiceBusMessage(
f"Order step {i}",
session_id="order-12345"
)
sender.send_messages(message)
# Processing session messages with state management
with client.get_queue_receiver("my-session-queue", session_id="order-12345") as receiver:
session = receiver.session
# Get current session state
try:
state_data = session.get_state()
if state_data:
order_state = json.loads(state_data.decode())
print(f"Current order state: {order_state}")
else:
order_state = {"processed_steps": [], "status": "pending"}
except Exception:
order_state = {"processed_steps": [], "status": "pending"}
# Process messages in order
messages = receiver.receive_messages(max_message_count=10, max_wait_time=30)
for message in messages:
print(f"Processing: {message.body}")
# Update state
order_state["processed_steps"].append(message.body)
if len(order_state["processed_steps"]) >= 5:
order_state["status"] = "completed"
# Save updated state
session.set_state(json.dumps(order_state).encode())
# Complete the message
receiver.complete_message(message)
print(f"Final order state: {order_state}")Use AutoLockRenewer to automatically maintain session locks during long-running processing.
class AutoLockRenewer:
def __init__(
self,
max_lock_renewal_duration: float = 300,
on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,
executor: Optional[ThreadPoolExecutor] = None,
max_workers: Optional[int] = None
):
"""
Initialize AutoLockRenewer for automatic lock renewal.
Parameters:
- max_lock_renewal_duration: Maximum time to renew locks (seconds)
- on_lock_renew_failure: Callback function for renewal failures
- executor: Custom thread pool executor
- max_workers: Maximum number of worker threads
"""
def register(
self,
renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],
timeout: float = 300
) -> None:
"""
Register a session or message for automatic lock renewal.
Parameters:
- renewable: ServiceBusSession or ServiceBusReceivedMessage to auto-renew
- timeout: Maximum renewal duration in seconds
Raises:
- ValueError: If renewable is already registered or invalid
"""
def close(self) -> None:
"""
Stop all auto-renewal and cleanup resources.
"""from azure.servicebus import ServiceBusClient, AutoLockRenewer
import time
def on_lock_renew_failure(renewable, error):
print(f"Lock renewal failed for {renewable}: {error}")
client = ServiceBusClient.from_connection_string("your_connection_string")
auto_renewer = AutoLockRenewer(
max_lock_renewal_duration=600, # 10 minutes
on_lock_renew_failure=on_lock_renew_failure
)
with client.get_queue_receiver("my-session-queue", session_id="long-session") as receiver:
session = receiver.session
# Register session for auto-renewal
auto_renewer.register(session, timeout=600)
try:
# Long-running processing
messages = receiver.receive_messages(max_message_count=100, max_wait_time=60)
for message in messages:
# Register message for auto-renewal during processing
auto_renewer.register(message, timeout=300)
try:
# Simulate long processing time
print(f"Processing message: {message.body}")
time.sleep(30) # Long processing
receiver.complete_message(message)
print("Message completed successfully")
except Exception as e:
print(f"Error processing message: {e}")
receiver.abandon_message(message)
finally:
auto_renewer.close()Constants and enums for session management.
class ServiceBusSessionFilter(Enum):
"""Filter for selecting sessions."""
NEXT_AVAILABLE = 0
# Constant for convenience
NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter = ServiceBusSessionFilter.NEXT_AVAILABLESpecific exceptions related to session operations.
class SessionLockLostError(ServiceBusError):
"""
The session lock has expired.
All unsettled messages that have been received can no longer be settled.
The session must be re-acquired to continue processing.
"""
class SessionCannotBeLockedError(ServiceBusError):
"""
The requested session cannot be locked.
This typically occurs when:
- The session is already locked by another client
- The session does not exist
- The session ID is invalid
"""from azure.servicebus import (
ServiceBusClient,
SessionLockLostError,
SessionCannotBeLockedError,
NEXT_AVAILABLE_SESSION
)
client = ServiceBusClient.from_connection_string("your_connection_string")
try:
with client.get_queue_receiver("my-session-queue", session_id="busy-session") as receiver:
session = receiver.session
messages = receiver.receive_messages()
for message in messages:
try:
# Process message
process_message_with_session_state(message, session)
receiver.complete_message(message)
except SessionLockLostError:
print("Session lock lost - session must be re-acquired")
break
except SessionCannotBeLockedError:
print("Session is locked by another client, trying next available session")
# Try to get any available session instead
with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
if receiver.session:
print(f"Got available session: {receiver.session.session_id}")
# Process messages from this sessionFor asynchronous operations, use the async versions from the azure.servicebus.aio module.
from azure.servicebus.aio import ServiceBusClient, ServiceBusReceiver, ServiceBusSession
class ServiceBusSession:
# Same properties as sync version
@property
def session_id(self) -> str: ...
@property
def locked_until_utc(self) -> Optional[datetime]: ...
@property
def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]: ...
# Async methods
async def get_state(self, *, timeout: Optional[float] = None, **kwargs) -> bytes: ...
async def set_state(self, state, *, timeout: Optional[float] = None, **kwargs) -> None: ...
async def renew_lock(self, *, timeout: Optional[float] = None, **kwargs) -> datetime: ...import asyncio
import json
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage, NEXT_AVAILABLE_SESSION
async def process_session_messages():
async with ServiceBusClient.from_connection_string("your_connection_string") as client:
async with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
session = receiver.session
if not session:
print("No available sessions")
return
print(f"Processing session: {session.session_id}")
# Get session state
state_data = await session.get_state()
session_state = json.loads(state_data.decode()) if state_data else {"count": 0}
# Process messages
async for message in receiver:
print(f"Received: {message.body}")
# Update session state
session_state["count"] += 1
await session.set_state(json.dumps(session_state).encode())
# Complete message
await receiver.complete_message(message)
# Break after 10 messages
if session_state["count"] >= 10:
break
asyncio.run(process_session_messages())Install with Tessl CLI
npx tessl i tessl/pypi-azure-servicebusdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10