Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus@v1.10.0

tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus

tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.

Session Receivers

The SessionReceiver type handles session-aware message processing with guaranteed FIFO ordering, session state management, and session lock renewal. Sessions enable related messages to be processed sequentially with exclusive access.

Capabilities

Session Information

Get Session ID

func (sr *SessionReceiver) SessionID() string

Returns the session ID for this SessionReceiver. The session ID identifies the session; the session lock held by this receiver ensures that only one receiver processes the session's messages at a time.

Example:

sessionID := sessionReceiver.SessionID()
fmt.Printf("Processing session: %s\n", sessionID)

Get Lock Expiration

func (sr *SessionReceiver) LockedUntil() time.Time

Returns the time when the session lock expires. The lock can be renewed using RenewSessionLock if more processing time is needed.

Example:

lockedUntil := sessionReceiver.LockedUntil()
fmt.Printf("Session locked until: %s\n", lockedUntil)

if time.Until(lockedUntil) < time.Minute {
    fmt.Println("Warning: Session lock expiring soon")
}

Session State Management

Get Session State

func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSessionStateOptions) ([]byte, error)

Retrieves the state data associated with the session. Session state is arbitrary byte data that can be used to track processing progress or store session-specific information. Returns an *Error if the operation fails.

Example:

state, err := sessionReceiver.GetSessionState(context.Background(), nil)
if err != nil {
    panic(err)
}

if state != nil {
    fmt.Printf("Session state: %s\n", string(state))
}

Set Session State

func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error

Sets the state data associated with the session. Pass nil for the state parameter to clear the stored session state. Use this to track processing progress or store session-specific information. Returns an *Error if the operation fails.

Example:

// Set session state
progress := map[string]any{
    "processedCount": 10,
    "lastProcessedID": "msg-123",
    "startedAt": time.Now(),
}
stateBytes, err := json.Marshal(progress)
if err != nil {
    panic(err)
}

err = sessionReceiver.SetSessionState(context.Background(), stateBytes, nil)
if err != nil {
    panic(err)
}

// Clear session state when done
err = sessionReceiver.SetSessionState(context.Background(), nil, nil)
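Because session state is an opaque byte slice, the application decides on its encoding. As a sketch of one possible approach, the code below round-trips a typed struct through JSON and treats empty state as "no progress yet". The sessionProgress type and the encodeProgress and decodeProgress helpers are hypothetical names chosen for illustration; only the byte slices would be passed to SetSessionState and returned by GetSessionState.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sessionProgress is a hypothetical, application-defined layout for the
// bytes stored as session state.
type sessionProgress struct {
	ProcessedCount  int    `json:"processedCount"`
	LastProcessedID string `json:"lastProcessedID"`
}

// encodeProgress serializes progress into bytes suitable for storing as
// session state.
func encodeProgress(p sessionProgress) ([]byte, error) {
	return json.Marshal(p)
}

// decodeProgress parses bytes previously produced by encodeProgress.
// Nil or empty state yields the zero value, so a session without stored
// state starts from scratch instead of failing.
func decodeProgress(state []byte) (sessionProgress, error) {
	var p sessionProgress
	if len(state) == 0 {
		return p, nil
	}
	if err := json.Unmarshal(state, &p); err != nil {
		return p, fmt.Errorf("decode session state: %w", err)
	}
	return p, nil
}

func main() {
	raw, err := encodeProgress(sessionProgress{ProcessedCount: 10, LastProcessedID: "msg-123"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	restored, err := decodeProgress(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", restored)
}
```

A typed struct makes decoding errors visible at one place, which a map[string]any would defer to every field access.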

Session Lock Management

Renew Session Lock

func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) error

Renews the session lock, extending the exclusive access period. The new expiration time is available via LockedUntil(). Use this when session processing takes longer than the initial lock duration. Returns an *Error if the operation fails.

Example:

// Processing a session with many messages
for {
    messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
    if err != nil {
        fmt.Printf("Receive failed: %v\n", err)
        break
    }

    // Check if session lock needs renewal
    if time.Until(sessionReceiver.LockedUntil()) < time.Second*30 {
        err = sessionReceiver.RenewSessionLock(context.Background(), nil)
        if err != nil {
            var sbErr *azservicebus.Error
            if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
                fmt.Println("Session lock lost, cannot continue")
                return
            }
            panic(err)
        }
        fmt.Printf("Session lock renewed until: %s\n", sessionReceiver.LockedUntil())
    }

    // Process messages
    for _, msg := range messages {
        processMessage(msg)
        sessionReceiver.CompleteMessage(context.Background(), msg, nil)
    }
}

Message Operations

SessionReceiver provides the following message operations, which behave as they do on the standard Receiver:

Receive Messages

func (r *SessionReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)

Receives up to maxMessages messages from the session. This function blocks until at least one message is received or the context is cancelled. Messages within a session are delivered in FIFO order. Returns an *Error if the operation fails.

When using ReceiveModeReceiveAndDelete, you can continue calling ReceiveMessages after closing the receiver to drain its internal cache. Once the cache is fully drained, ReceiveMessages returns an *Error with Code == CodeClosed.

Example:

messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
    panic(err)
}

for _, msg := range messages {
    fmt.Printf("Session %s - Received: %s\n", *msg.SessionID, string(msg.Body))
}

Complete Message

func (r *SessionReceiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error

Completes a message, deleting it from the subscription or queue. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.

Abandon Message

func (r *SessionReceiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error

Abandons a message, making it available for redelivery. Increments the delivery count. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.

Defer Message

func (r *SessionReceiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error

Defers a message for later processing. Retrieve deferred messages using ReceiveDeferredMessages. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.

Dead Letter Message

func (r *SessionReceiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error

Moves a message to the dead letter queue. Can only be used with ReceiveModePeekLock. Returns an *Error if the operation fails.
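Choosing between completing, abandoning, and dead-lettering a message is an application decision. The sketch below shows one possible policy as a pure function, so it can be tested without a receiver. Everything in it is an assumption made for illustration: the settlement type, the errPermanent and errTransient markers, and the maxAttempts cap are hypothetical, and deliveryCount is assumed to be read from the received message.

```go
package main

import (
	"errors"
	"fmt"
)

// settlement names the operation to apply to a message after processing.
type settlement int

const (
	settleComplete settlement = iota
	settleAbandon
	settleDeadLetter
)

func (s settlement) String() string {
	switch s {
	case settleComplete:
		return "complete"
	case settleAbandon:
		return "abandon"
	default:
		return "dead-letter"
	}
}

// Hypothetical error markers used by the application to classify
// processing failures.
var (
	errPermanent = errors.New("permanent failure")
	errTransient = errors.New("transient failure")
)

// chooseSettlement maps a processing result to a settlement operation:
// success completes the message, a permanent failure or an exhausted
// attempt budget dead-letters it, and anything else abandons it so that
// it is redelivered.
func chooseSettlement(procErr error, deliveryCount, maxAttempts uint32) settlement {
	switch {
	case procErr == nil:
		return settleComplete
	case errors.Is(procErr, errPermanent):
		return settleDeadLetter
	case deliveryCount >= maxAttempts:
		return settleDeadLetter
	default:
		return settleAbandon
	}
}

func main() {
	fmt.Println(chooseSettlement(nil, 1, 5))          // complete
	fmt.Println(chooseSettlement(errTransient, 1, 5)) // abandon
	fmt.Println(chooseSettlement(errTransient, 5, 5)) // dead-letter
	fmt.Println(chooseSettlement(errPermanent, 1, 5)) // dead-letter
}
```

The result would then select between CompleteMessage, AbandonMessage, and DeadLetterMessage. All three require ReceiveModePeekLock, as stated above.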

Peek Messages

func (r *SessionReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)

Peeks messages without locking or deleting them. The SessionReceiver stores the last peeked sequence number internally, so successive calls continue from where the previous call stopped; set PeekMessagesOptions.FromSequenceNumber to start from a different position. Peeked messages cannot be settled. Returns an *Error if the operation fails.

For more information: https://aka.ms/azsdk/servicebus/message-browsing

Receive Deferred Messages

func (r *SessionReceiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error)

Receives messages that were deferred using DeferMessage. Returns an *Error if the operation fails.
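ReceiveDeferredMessages takes a list of sequence numbers, so the application has to remember the sequence number of each message it defers. One option, sketched below under stated assumptions, is to keep that list in the session state. The deferredState layout and the decodeDeferred and recordDeferred helpers are hypothetical, and the sequence number is assumed to be read from the received message before DeferMessage is called.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deferredState is a hypothetical session-state layout that records the
// sequence numbers of deferred messages.
type deferredState struct {
	Deferred []int64 `json:"deferred"`
}

// decodeDeferred parses session state bytes. Nil or empty state means
// that nothing has been deferred yet.
func decodeDeferred(state []byte) (deferredState, error) {
	var s deferredState
	if len(state) == 0 {
		return s, nil
	}
	if err := json.Unmarshal(state, &s); err != nil {
		return s, fmt.Errorf("decode session state: %w", err)
	}
	return s, nil
}

// recordDeferred returns new state bytes that include seq. Recording a
// sequence number that is already present leaves the state unchanged.
func recordDeferred(state []byte, seq int64) ([]byte, error) {
	s, err := decodeDeferred(state)
	if err != nil {
		return nil, err
	}
	for _, existing := range s.Deferred {
		if existing == seq {
			return state, nil
		}
	}
	s.Deferred = append(s.Deferred, seq)
	return json.Marshal(s)
}

func main() {
	var state []byte
	for _, seq := range []int64{42, 57, 42} {
		next, err := recordDeferred(state, seq)
		if err != nil {
			panic(err)
		}
		state = next
	}
	fmt.Println(string(state)) // {"deferred":[42,57]}
}
```

The updated bytes would be written with SetSessionState, and the decoded Deferred slice passed to ReceiveDeferredMessages when the application is ready to process those messages.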

Close Session Receiver

func (r *SessionReceiver) Close(ctx context.Context) error

Permanently closes the session receiver and releases the session lock. Always call Close when done processing the session.

Example:

sessionReceiver, err := client.AcceptSessionForQueue(ctx, "myqueue", "session-123", nil)
if err != nil {
    panic(err)
}
defer sessionReceiver.Close(context.Background())

Usage Patterns

Processing a Specific Session

// Accept a specific session
sessionReceiver, err := client.AcceptSessionForQueue(
    context.Background(),
    "myqueue",
    "session-123",
    nil,
)
if err != nil {
    panic(err)
}
defer sessionReceiver.Close(context.Background())

// Get session state
state, err := sessionReceiver.GetSessionState(context.Background(), nil)
if err != nil {
    panic(err)
}
fmt.Printf("Session state: %s\n", string(state))

// Process messages in FIFO order
for {
    messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
    if err != nil {
        break
    }

    for _, msg := range messages {
        err = processMessage(msg)
        if err == nil {
            sessionReceiver.CompleteMessage(context.Background(), msg, nil)
        }
    }
}

// Clear session state when done
sessionReceiver.SetSessionState(context.Background(), nil, nil)

Processing Next Available Session

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

// Accept next available session
sessionReceiver, err := client.AcceptNextSessionForQueue(ctx, "myqueue", nil)
if err != nil {
    var sbErr *azservicebus.Error
    if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
        fmt.Println("No sessions available")
        return
    }
    panic(err)
}
defer sessionReceiver.Close(context.Background())

fmt.Printf("Processing session: %s\n", sessionReceiver.SessionID())

// Process session messages
messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
    panic(err)
}
for _, msg := range messages {
    processMessage(msg)
    sessionReceiver.CompleteMessage(context.Background(), msg, nil)
}

Long-Running Session Processing

sessionReceiver, err := client.AcceptSessionForQueue(ctx, "myqueue", "session-123", nil)
if err != nil {
    panic(err)
}
defer sessionReceiver.Close(context.Background())

// Load initial state; a session without stored state starts at zero
state, err := sessionReceiver.GetSessionState(context.Background(), nil)
if err != nil {
    panic(err)
}
processedCount := 0
if len(state) > 0 {
    if err := json.Unmarshal(state, &processedCount); err != nil {
        panic(err)
    }
}

for {
    // Check and renew lock if needed
    if time.Until(sessionReceiver.LockedUntil()) < time.Minute {
        err := sessionReceiver.RenewSessionLock(context.Background(), nil)
        if err != nil {
            fmt.Println("Failed to renew session lock")
            break
        }
    }

    // Receive and process messages
    messages, err := sessionReceiver.ReceiveMessages(context.Background(), 10, nil)
    if err != nil {
        break
    }

    for _, msg := range messages {
        processMessage(msg)
        sessionReceiver.CompleteMessage(context.Background(), msg, nil)
        processedCount++

        // Update session state periodically
        if processedCount%10 == 0 {
            stateBytes, _ := json.Marshal(processedCount)
            sessionReceiver.SetSessionState(context.Background(), stateBytes, nil)
        }
    }
}

Types

SessionReceiver

type SessionReceiver struct {
    // ... unexported fields
}

SessionReceiver is a Receiver that handles sessions with state management and FIFO ordering.

SessionReceiverOptions

type SessionReceiverOptions struct {
    ReceiveMode ReceiveMode
}

Options for creating a SessionReceiver:

  • ReceiveMode: Controls when messages are deleted (ReceiveModePeekLock by default, or ReceiveModeReceiveAndDelete)

More information about receive modes: https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations

GetSessionStateOptions

type GetSessionStateOptions struct {
    // Currently empty, reserved for future expansion
}

Options for getting session state.

SetSessionStateOptions

type SetSessionStateOptions struct {
    // Currently empty, reserved for future expansion
}

Options for setting session state.

RenewSessionLockOptions

type RenewSessionLockOptions struct {
    // Currently empty, reserved for future expansion
}

Options for renewing the session lock.