tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
The SessionReceiver type handles session-aware message processing with guaranteed FIFO ordering, session state management, and session lock renewal. Sessions enable related messages to be processed sequentially with exclusive access.
func (sr *SessionReceiver) SessionID() stringReturns the session ID for this SessionReceiver. The session ID identifies the session and ensures that only one receiver can process messages from this session at a time.
Example:
sessionID := sessionReceiver.SessionID()
fmt.Printf("Processing session: %s\n", sessionID)func (sr *SessionReceiver) LockedUntil() time.TimeReturns the time when the session lock expires. The lock can be renewed using RenewSessionLock if more processing time is needed.
Example:
lockedUntil := sessionReceiver.LockedUntil()
fmt.Printf("Session locked until: %s\n", lockedUntil)
if time.Until(lockedUntil) < time.Minute {
fmt.Println("Warning: Session lock expiring soon")
}func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSessionStateOptions) ([]byte, error)Retrieves the state data associated with the session. Session state is arbitrary byte data that can be used to track processing progress or store session-specific information. Returns an *Error if the operation fails.
Example:
state, err := sessionReceiver.GetSessionState(context.Background(), nil)
if err != nil {
panic(err)
}
if state != nil {
fmt.Printf("Session state: %s\n", string(state))
}func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) errorSets the state data associated with the session. Pass nil for the state parameter to clear the stored session state. Use this to track processing progress or store session-specific information. Returns an *Error if the operation fails.
Example:
// Set session state
progress := map[string]any{
"processedCount": 10,
"lastProcessedID": "msg-123",
"startedAt": time.Now(),
}
stateBytes, _ := json.Marshal(progress)
err := sessionReceiver.SetSessionState(context.Background(), stateBytes, nil)
if err != nil {
panic(err)
}
// Clear session state when done
err = sessionReceiver.SetSessionState(context.Background(), nil, nil)func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) errorRenews the session lock, extending the exclusive access period. The new expiration time is available via LockedUntil(). Use this when session processing takes longer than the initial lock duration. Returns an *Error if the operation fails.
Example:
// Processing a session with many messages
for {
messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
break
}
// Check if session lock needs renewal
if time.Until(sessionReceiver.LockedUntil()) < time.Second*30 {
err = sessionReceiver.RenewSessionLock(context.Background(), nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
fmt.Println("Session lock lost, cannot continue")
return
}
panic(err)
}
fmt.Printf("Session lock renewed until: %s\n", sessionReceiver.LockedUntil())
}
// Process messages
for _, msg := range messages {
processMessage(msg)
sessionReceiver.CompleteMessage(context.Background(), msg, nil)
}
}SessionReceiver supports all message operations from the standard Receiver:
func (r *SessionReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)Receives up to maxMessages messages from the session. This function blocks until at least one message is received or the context is cancelled. Messages within a session are delivered in FIFO order. Returns an *Error if the operation fails.
When using ReceiveModeReceiveAndDelete, you can continue calling ReceiveMessages even after closing the receiver to drain the internal cache. Returns *Error with Code == CodeClosed when fully drained.
Example:
messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
panic(err)
}
for _, msg := range messages {
fmt.Printf("Session %s - Received: %s\n", *msg.SessionID, string(msg.Body))
}func (r *SessionReceiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) errorCompletes a message, deleting it from the subscription or queue. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.
func (r *SessionReceiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) errorAbandons a message, making it available for redelivery. Increments the delivery count. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.
func (r *SessionReceiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) errorDefers a message for later processing. Retrieve deferred messages using ReceiveDeferredMessages. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.
func (r *SessionReceiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) errorMoves a message to the dead letter queue. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.
func (r *SessionReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)Peeks messages without locking or deleting them. The SessionReceiver stores the last peeked sequence number internally. Override with PeekMessagesOptions.FromSequenceNumber. Peeked messages cannot be settled. Returns an *Error if the operation fails.
For more information: https://aka.ms/azsdk/servicebus/message-browsing
func (r *SessionReceiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error)Receives messages that were deferred using DeferMessage. Returns an *Error if the operation fails.
func (r *SessionReceiver) Close(ctx context.Context) errorPermanently closes the session receiver and releases the session lock. Always call Close when done processing the session.
Example:
sessionReceiver, _ := client.AcceptSessionForQueue(ctx, "myqueue", "session-123", nil)
defer sessionReceiver.Close(context.Background())// Accept a specific session
sessionReceiver, err := client.AcceptSessionForQueue(
context.Background(),
"myqueue",
"session-123",
nil,
)
if err != nil {
panic(err)
}
defer sessionReceiver.Close(context.Background())
// Get session state
state, _ := sessionReceiver.GetSessionState(context.Background(), nil)
fmt.Printf("Session state: %s\n", string(state))
// Process messages in FIFO order
for {
messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
break
}
for _, msg := range messages {
err = processMessage(msg)
if err == nil {
sessionReceiver.CompleteMessage(context.Background(), msg, nil)
}
}
}
// Clear session state when done
sessionReceiver.SetSessionState(context.Background(), nil, nil)ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
// Accept next available session
sessionReceiver, err := client.AcceptNextSessionForQueue(ctx, "myqueue", nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
fmt.Println("No sessions available")
return
}
panic(err)
}
defer sessionReceiver.Close(context.Background())
fmt.Printf("Processing session: %s\n", sessionReceiver.SessionID())
// Process session messages
messages, _ := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
for _, msg := range messages {
processMessage(msg)
sessionReceiver.CompleteMessage(context.Background(), msg, nil)
}sessionReceiver, _ := client.AcceptSessionForQueue(ctx, "myqueue", "session-123", nil)
defer sessionReceiver.Close(context.Background())
// Load initial state
state, _ := sessionReceiver.GetSessionState(context.Background(), nil)
processedCount := 0
if state != nil {
json.Unmarshal(state, &processedCount)
}
for {
// Check and renew lock if needed
if time.Until(sessionReceiver.LockedUntil()) < time.Minute {
err := sessionReceiver.RenewSessionLock(context.Background(), nil)
if err != nil {
fmt.Println("Failed to renew session lock")
break
}
}
// Receive and process messages
messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
break
}
for _, msg := range messages {
processMessage(msg)
sessionReceiver.CompleteMessage(context.Background(), msg, nil)
processedCount++
// Update session state periodically
if processedCount%10 == 0 {
stateBytes, _ := json.Marshal(processedCount)
sessionReceiver.SetSessionState(context.Background(), stateBytes, nil)
}
}
}type SessionReceiver struct {
// ... unexported fields
}SessionReceiver is a Receiver that handles sessions with state management and FIFO ordering.
type SessionReceiverOptions struct {
ReceiveMode ReceiveMode
}Options for creating a SessionReceiver:
ReceiveModePeekLock by default, or ReceiveModeReceiveAndDelete)More information about receive modes: https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
type GetSessionStateOptions struct {
// Currently empty, reserved for future expansion
}Options for getting session state.
type SetSessionStateOptions struct {
// Currently empty, reserved for future expansion
}Options for setting session state.
type RenewSessionLockOptions struct {
// Currently empty, reserved for future expansion
}Options for renewing session lock.