0
# Session Management
1
2
Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.
3
4
## Capabilities
5
6
### Session-Enabled Receivers
7
8
Create receivers that work with sessions for stateful message processing.
9
10
```python { .api }
11
# From ServiceBusClient
12
def get_queue_receiver(
13
self,
14
queue_name: str,
15
*,
16
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
17
**kwargs
18
) -> ServiceBusReceiver:
19
"""
20
Create a session-enabled queue receiver.
21
22
Parameters:
23
- queue_name: Name of the session-enabled queue
24
- session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session
25
"""
26
27
def get_subscription_receiver(
28
self,
29
topic_name: str,
30
subscription_name: str,
31
*,
32
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
33
**kwargs
34
) -> ServiceBusReceiver:
35
"""
36
Create a session-enabled subscription receiver.
37
38
Parameters:
39
- topic_name: Name of the topic
40
- subscription_name: Name of the session-enabled subscription
41
- session_id: Specific session ID or NEXT_AVAILABLE_SESSION for any available session
42
"""
43
```
44
45
#### Usage Example
46
47
```python
48
from azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION
49
50
client = ServiceBusClient.from_connection_string("your_connection_string")
51
52
# Connect to a specific session
53
with client.get_queue_receiver("my-session-queue", session_id="session-123") as receiver:
54
session = receiver.session
55
print(f"Connected to session: {session.session_id}")
56
57
messages = receiver.receive_messages(max_message_count=10)
58
for message in messages:
59
print(f"Session message: {message.body}")
60
receiver.complete_message(message)
61
62
# Connect to next available session
63
with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
64
if receiver.session:
65
print(f"Connected to available session: {receiver.session.session_id}")
66
# Process session messages
67
```
68
69
### ServiceBusSession Operations
70
71
Manage session state and properties for stateful messaging scenarios.
72
73
```python { .api }
74
class ServiceBusSession:
75
@property
76
def session_id(self) -> str:
77
"""
78
The unique identifier of the session.
79
80
Returns:
81
Session ID string
82
"""
83
84
@property
85
def locked_until_utc(self) -> Optional[datetime]:
86
"""
87
The time when the session lock expires.
88
89
Returns:
90
UTC datetime when session lock expires, or None if lock has expired
91
"""
92
93
@property
94
def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]:
95
"""
96
Error information if auto-renewal of session lock failed.
97
98
Returns:
99
Exception instance if auto-renewal failed, otherwise None
100
"""
101
102
def get_state(
103
self,
104
*,
105
timeout: Optional[float] = None,
106
**kwargs
107
) -> bytes:
108
"""
109
Get the session state data.
110
111
Parameters:
112
- timeout: Operation timeout in seconds
113
114
Returns:
115
Session state as bytes
116
117
Raises:
118
- SessionLockLostError: If session lock has expired
119
- ServiceBusError: For other Service Bus related errors
120
"""
121
122
def set_state(
123
self,
124
state: Optional[Union[str, bytes, bytearray]],
125
*,
126
timeout: Optional[float] = None,
127
**kwargs
128
) -> None:
129
"""
130
Set the session state data.
131
132
Parameters:
133
- state: Session state data (str, bytes, bytearray, or None to clear)
134
- timeout: Operation timeout in seconds
135
136
Raises:
137
- SessionLockLostError: If session lock has expired
138
- ServiceBusError: For other Service Bus related errors
139
"""
140
141
def renew_lock(
142
self,
143
*,
144
timeout: Optional[float] = None,
145
**kwargs
146
) -> datetime:
147
"""
148
Renew the session lock.
149
150
Parameters:
151
- timeout: Operation timeout in seconds
152
153
Returns:
154
New session lock expiration time
155
156
Raises:
157
- SessionLockLostError: If session lock has expired
158
- ServiceBusError: For other Service Bus related errors
159
"""
160
```
161
162
#### Usage Example
163
164
```python
165
from azure.servicebus import ServiceBusClient, ServiceBusMessage
166
import json
167
168
client = ServiceBusClient.from_connection_string("your_connection_string")
169
170
# Sending session messages
171
with client.get_queue_sender("my-session-queue") as sender:
172
# All messages with the same session_id will be delivered in order
173
for i in range(5):
174
message = ServiceBusMessage(
175
f"Order step {i}",
176
session_id="order-12345"
177
)
178
sender.send_messages(message)
179
180
# Processing session messages with state management
181
with client.get_queue_receiver("my-session-queue", session_id="order-12345") as receiver:
182
session = receiver.session
183
184
# Get current session state
185
try:
186
state_data = session.get_state()
187
if state_data:
188
order_state = json.loads(state_data.decode())
189
print(f"Current order state: {order_state}")
190
else:
191
order_state = {"processed_steps": [], "status": "pending"}
192
except Exception:
193
order_state = {"processed_steps": [], "status": "pending"}
194
195
# Process messages in order
196
messages = receiver.receive_messages(max_message_count=10, max_wait_time=30)
197
for message in messages:
198
print(f"Processing: {message.body}")
199
200
# Update state
201
order_state["processed_steps"].append(message.body)
202
if len(order_state["processed_steps"]) >= 5:
203
order_state["status"] = "completed"
204
205
# Save updated state
206
session.set_state(json.dumps(order_state).encode())
207
208
# Complete the message
209
receiver.complete_message(message)
210
211
print(f"Final order state: {order_state}")
212
```
213
214
### Automatic Lock Renewal
215
216
Use AutoLockRenewer to automatically maintain session locks during long-running processing.
217
218
```python { .api }
219
class AutoLockRenewer:
220
def __init__(
221
self,
222
max_lock_renewal_duration: float = 300,
223
on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,
224
executor: Optional[ThreadPoolExecutor] = None,
225
max_workers: Optional[int] = None
226
):
227
"""
228
Initialize AutoLockRenewer for automatic lock renewal.
229
230
Parameters:
231
- max_lock_renewal_duration: Maximum time to renew locks (seconds)
232
- on_lock_renew_failure: Callback function for renewal failures
233
- executor: Custom thread pool executor
234
- max_workers: Maximum number of worker threads
235
"""
236
237
def register(
238
self,
239
renewable: Union[ServiceBusSession, ServiceBusReceivedMessage],
240
timeout: float = 300
241
) -> None:
242
"""
243
Register a session or message for automatic lock renewal.
244
245
Parameters:
246
- renewable: ServiceBusSession or ServiceBusReceivedMessage to auto-renew
247
- timeout: Maximum renewal duration in seconds
248
249
Raises:
250
- ValueError: If renewable is already registered or invalid
251
"""
252
253
def close(self) -> None:
254
"""
255
Stop all auto-renewal and cleanup resources.
256
"""
257
```
258
259
#### Usage Example
260
261
```python
262
from azure.servicebus import ServiceBusClient, AutoLockRenewer
263
import time
264
265
def on_lock_renew_failure(renewable, error):
266
print(f"Lock renewal failed for {renewable}: {error}")
267
268
client = ServiceBusClient.from_connection_string("your_connection_string")
269
auto_renewer = AutoLockRenewer(
270
max_lock_renewal_duration=600, # 10 minutes
271
on_lock_renew_failure=on_lock_renew_failure
272
)
273
274
with client.get_queue_receiver("my-session-queue", session_id="long-session") as receiver:
275
session = receiver.session
276
277
# Register session for auto-renewal
278
auto_renewer.register(session, timeout=600)
279
280
try:
281
# Long-running processing
282
messages = receiver.receive_messages(max_message_count=100, max_wait_time=60)
283
284
for message in messages:
285
# Register message for auto-renewal during processing
286
auto_renewer.register(message, timeout=300)
287
288
try:
289
# Simulate long processing time
290
print(f"Processing message: {message.body}")
291
time.sleep(30) # Long processing
292
293
receiver.complete_message(message)
294
print("Message completed successfully")
295
296
except Exception as e:
297
print(f"Error processing message: {e}")
298
receiver.abandon_message(message)
299
300
finally:
301
auto_renewer.close()
302
```
303
304
### Session Filtering
305
306
Constants and enums for session management.
307
308
```python { .api }
309
class ServiceBusSessionFilter(Enum):
310
"""Filter for selecting sessions."""
311
NEXT_AVAILABLE = 0
312
313
# Constant for convenience
314
NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter = ServiceBusSessionFilter.NEXT_AVAILABLE
315
```
316
317
### Session Error Handling
318
319
Specific exceptions related to session operations.
320
321
```python { .api }
322
class SessionLockLostError(ServiceBusError):
323
"""
324
The session lock has expired.
325
326
All unsettled messages that have been received can no longer be settled.
327
The session must be re-acquired to continue processing.
328
"""
329
330
class SessionCannotBeLockedError(ServiceBusError):
331
"""
332
The requested session cannot be locked.
333
334
This typically occurs when:
335
- The session is already locked by another client
336
- The session does not exist
337
- The session ID is invalid
338
"""
339
```
340
341
#### Usage Example
342
343
```python
344
from azure.servicebus import (
345
ServiceBusClient,
346
SessionLockLostError,
347
SessionCannotBeLockedError,
348
NEXT_AVAILABLE_SESSION
349
)
350
351
client = ServiceBusClient.from_connection_string("your_connection_string")
352
353
try:
354
with client.get_queue_receiver("my-session-queue", session_id="busy-session") as receiver:
355
session = receiver.session
356
messages = receiver.receive_messages()
357
358
for message in messages:
359
try:
360
# Process message
361
process_message_with_session_state(message, session)
362
receiver.complete_message(message)
363
364
except SessionLockLostError:
365
print("Session lock lost - session must be re-acquired")
366
break
367
368
except SessionCannotBeLockedError:
369
print("Session is locked by another client, trying next available session")
370
371
# Try to get any available session instead
372
with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
373
if receiver.session:
374
print(f"Got available session: {receiver.session.session_id}")
375
# Process messages from this session
376
```
377
378
## Asynchronous Session Management
379
380
For asynchronous operations, use the async versions from the `azure.servicebus.aio` module.
381
382
```python { .api }
383
from azure.servicebus.aio import ServiceBusClient, ServiceBusReceiver, ServiceBusSession
384
385
class ServiceBusSession:
386
# Same properties as sync version
387
@property
388
def session_id(self) -> str: ...
389
@property
390
def locked_until_utc(self) -> Optional[datetime]: ...
391
@property
392
def auto_renew_error(self) -> Optional[Union[AutoLockRenewFailed, AutoLockRenewTimeout]]: ...
393
394
# Async methods
395
async def get_state(self, *, timeout: Optional[float] = None, **kwargs) -> bytes: ...
396
async def set_state(self, state, *, timeout: Optional[float] = None, **kwargs) -> None: ...
397
async def renew_lock(self, *, timeout: Optional[float] = None, **kwargs) -> datetime: ...
398
```
399
400
#### Usage Example
401
402
```python
403
import asyncio
404
import json
405
from azure.servicebus.aio import ServiceBusClient
406
from azure.servicebus import ServiceBusMessage, NEXT_AVAILABLE_SESSION
407
408
async def process_session_messages():
409
async with ServiceBusClient.from_connection_string("your_connection_string") as client:
410
async with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
411
session = receiver.session
412
if not session:
413
print("No available sessions")
414
return
415
416
print(f"Processing session: {session.session_id}")
417
418
# Get session state
419
state_data = await session.get_state()
420
session_state = json.loads(state_data.decode()) if state_data else {"count": 0}
421
422
# Process messages
423
async for message in receiver:
424
print(f"Received: {message.body}")
425
426
# Update session state
427
session_state["count"] += 1
428
await session.set_state(json.dumps(session_state).encode())
429
430
# Complete message
431
await receiver.complete_message(message)
432
433
# Break after 10 messages
434
if session_state["count"] >= 10:
435
break
436
437
asyncio.run(process_session_messages())
438
```