
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus@v1.10.0


tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus

tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.


Receiving Messages

The Receiver type provides methods for receiving messages from queues or subscriptions using a pull-based pattern, with support for message settlement, peeking, and deferred message retrieval.

Capabilities

Receive Messages

func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)

Receives up to maxMessages messages from a queue or subscription. This function blocks until at least one message is received or the context is cancelled. Returns an *Error if the operation fails.

When using ReceiveModeReceiveAndDelete, you can continue calling ReceiveMessages even after the Receiver has been closed to drain the internal cache. When all cached messages are consumed, it returns an *Error with Code == CodeClosed.

Example:

messages, err := receiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
    panic(err)
}

for _, msg := range messages {
    fmt.Printf("Received: %s\n", string(msg.Body))
    // Process message...
}

Complete Message

func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error

Completes a message, deleting it from the queue or subscription. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.

Example:

for _, msg := range messages {
    // Process message
    err := processMessage(msg)
    if err == nil {
        // Successfully processed, complete it
        err = receiver.CompleteMessage(context.Background(), msg, nil)
        if err != nil {
            panic(err)
        }
    }
}

Abandon Message

func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error

Abandons a message, releasing its lock so that it becomes available for delivery again from the queue or subscription. This increments the delivery count and may cause the message to be dead-lettered if the maximum delivery count is exceeded. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.

Example:

err := processMessage(msg)
if err != nil {
    // Processing failed, abandon for retry
    err = receiver.AbandonMessage(context.Background(), msg, &azservicebus.AbandonMessageOptions{
        PropertiesToModify: map[string]any{
            "error": err.Error(),
        },
    })
    if err != nil {
        panic(err)
    }
}

Defer Message

func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error

Defers a message for later processing. Deferred messages can be retrieved using ReceiveDeferredMessages with their sequence numbers. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.

Example:

// Defer message for later processing
err := receiver.DeferMessage(context.Background(), msg, &azservicebus.DeferMessageOptions{
    PropertiesToModify: map[string]any{
        "deferredReason": "waiting for dependency",
    },
})
if err != nil {
    panic(err)
}

// Store sequence number for later retrieval
sequenceNumber := *msg.SequenceNumber
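Because deferred messages can only be retrieved by sequence number, the application has to keep those numbers somewhere. The following is a minimal sketch of one way to do that, assuming an in-memory index keyed by an application-defined string. The deferredStore type and its remember and take methods are hypothetical names, not part of the SDK, and a real service would likely need durable storage so the numbers survive a restart.

```go
package main

import (
	"fmt"
	"sort"
)

// deferredStore is a hypothetical in-memory index of deferred messages.
// It maps an application-defined key (for example, the dependency a
// message is waiting on) to the sequence numbers deferred under that key.
type deferredStore struct {
	byKey map[string][]int64
}

func newDeferredStore() *deferredStore {
	return &deferredStore{byKey: make(map[string][]int64)}
}

// remember records a sequence number, typically right after DeferMessage
// has succeeded for that message.
func (s *deferredStore) remember(key string, sequenceNumber int64) {
	s.byKey[key] = append(s.byKey[key], sequenceNumber)
}

// take removes and returns the sequence numbers stored under key, sorted
// ascending, in the []int64 form that ReceiveDeferredMessages expects.
func (s *deferredStore) take(key string) []int64 {
	seqs := s.byKey[key]
	delete(s.byKey, key)
	sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
	return seqs
}

func main() {
	store := newDeferredStore()
	store.remember("inventory-service", 12346)
	store.remember("inventory-service", 12345)
	store.remember("billing-service", 20001)

	fmt.Println(store.take("inventory-service"))      // [12345 12346]
	fmt.Println(len(store.take("inventory-service"))) // 0
}
```

The slice returned by take can be passed as the sequenceNumbers argument once the dependency is available.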

Dead Letter Message

func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error

Moves a message to the dead letter queue for the queue or subscription. Dead-lettered messages can be received by creating a receiver with ReceiverOptions.SubQueue set to SubQueueDeadLetter. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.

Example:

err := processMessage(msg)
if err != nil {
    // Permanent failure, move to dead letter queue
    err = receiver.DeadLetterMessage(context.Background(), msg, &azservicebus.DeadLetterOptions{
        Reason:           to.Ptr("ProcessingFailed"),
        ErrorDescription: to.Ptr(err.Error()),
        PropertiesToModify: map[string]any{
            "failedAt": time.Now(),
        },
    })
    if err != nil {
        panic(err)
    }
}
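The examples so far abandon a message after a retryable failure and dead-letter it after a permanent one. A small sketch of how an application might centralize that decision is shown here. Everything in it is hypothetical application code: the action type, the errPermanent and errDownstreamTimeout sentinels, and the chooseAction function are illustrative names, and how a failure is classified as permanent is an assumption left to the application.

```go
package main

import (
	"errors"
	"fmt"
)

// action names the settlement call to make for a message.
type action string

const (
	actionComplete   action = "CompleteMessage"
	actionAbandon    action = "AbandonMessage"
	actionDeadLetter action = "DeadLetterMessage"
)

// errPermanent is a hypothetical sentinel that application code wraps
// around failures that retrying cannot fix (for example, a malformed body).
var errPermanent = errors.New("permanent failure")

// errDownstreamTimeout is a hypothetical example of a retryable failure.
var errDownstreamTimeout = errors.New("downstream timeout")

// chooseAction maps the result of processing to a settlement action:
// success completes, permanent failures dead-letter, anything else abandons.
func chooseAction(processErr error) action {
	switch {
	case processErr == nil:
		return actionComplete
	case errors.Is(processErr, errPermanent):
		return actionDeadLetter
	default:
		return actionAbandon
	}
}

func main() {
	malformed := fmt.Errorf("decode body: %w", errPermanent)

	fmt.Println(chooseAction(nil))                  // CompleteMessage
	fmt.Println(chooseAction(errDownstreamTimeout)) // AbandonMessage
	fmt.Println(chooseAction(malformed))            // DeadLetterMessage
}
```

Keeping the classification in one function makes it easier to see which failures are retried and which go straight to the dead letter queue.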

Renew Message Lock

func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage, options *RenewMessageLockOptions) error

Renews the lock on a message, updating the LockedUntil field on the message. Use this when processing a message takes longer than the lock duration. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.

Example:

msg := messages[0]

// Start processing
for i := 0; i < 10; i++ {
    // Check if lock is about to expire
    if time.Until(*msg.LockedUntil) < time.Second*10 {
        // Renew the lock
        err := receiver.RenewMessageLock(context.Background(), msg, nil)
        if err != nil {
            panic(err)
        }
    }

    // Continue processing
    time.Sleep(time.Second * 5)
}

// Complete when done, and surface any settlement error
if err := receiver.CompleteMessage(context.Background(), msg, nil); err != nil {
    panic(err)
}

Receive Deferred Messages

func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error)

Receives messages that were previously deferred using DeferMessage. Provide the sequence numbers of the deferred messages to retrieve them. Returns an *Error if the operation fails.

Example:

// Retrieve previously deferred messages
deferredMessages, err := receiver.ReceiveDeferredMessages(
    context.Background(),
    []int64{12345, 12346, 12347},
    nil,
)
if err != nil {
    panic(err)
}

for _, msg := range deferredMessages {
    // Process deferred message
    err := processMessage(msg)
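    if err == nil {
        // Settle the deferred message and surface any settlement error
        if err := receiver.CompleteMessage(context.Background(), msg, nil); err != nil {
            panic(err)
        }
    }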
}

Peek Messages

func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)

Peeks messages without locking or deleting them. This allows you to browse messages without affecting their state. Peeked messages cannot be settled with CompleteMessage, AbandonMessage, DeferMessage, or DeadLetterMessage. Returns an *Error if the operation fails.

The Receiver stores the last peeked sequence number internally and uses it as the start location for the next PeekMessages call. You can override this by setting PeekMessagesOptions.FromSequenceNumber.

Example:

// Peek at next 10 messages
messages, err := receiver.PeekMessages(context.Background(), 10, nil)
if err != nil {
    panic(err)
}

for _, msg := range messages {
    fmt.Printf("Peeked message: %s (SeqNum: %d)\n", string(msg.Body), *msg.SequenceNumber)
}

// Peek from specific sequence number
messages, err = receiver.PeekMessages(context.Background(), 10, &azservicebus.PeekMessagesOptions{
    FromSequenceNumber: to.Ptr(int64(1000)),
})
if err != nil {
    panic(err)
}

For more information about message browsing, see https://aka.ms/azsdk/servicebus/message-browsing
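The saved peek position described above can be pictured with a small stand-alone model. This is a simplified sketch, not the SDK's implementation: the peekCursor type and its peek method are hypothetical, messages are reduced to their sequence numbers, and the sketch assumes that an explicit starting sequence number does not move the saved position, which the text above does not state either way.

```go
package main

import "fmt"

// peekCursor is a simplified, hypothetical model of the peek position a
// Receiver keeps between PeekMessages calls. It is not the SDK's code.
type peekCursor struct {
	next int64 // lowest sequence number the next implicit peek may return
}

// peek returns up to limit sequence numbers from stored (sorted ascending),
// starting at *from when it is non-nil, otherwise at the saved position.
func (c *peekCursor) peek(stored []int64, limit int, from *int64) []int64 {
	start := c.next
	if from != nil {
		start = *from
	}
	var out []int64
	for _, seq := range stored {
		if seq < start {
			continue
		}
		if len(out) == limit {
			break
		}
		out = append(out, seq)
	}
	// Assumption of this sketch: only implicit peeks advance the position.
	if from == nil && len(out) > 0 {
		c.next = out[len(out)-1] + 1
	}
	return out
}

func main() {
	stored := []int64{1, 2, 3, 4, 5}
	cursor := &peekCursor{}

	fmt.Println(cursor.peek(stored, 2, nil)) // [1 2]
	fmt.Println(cursor.peek(stored, 2, nil)) // [3 4]

	from := int64(2)
	fmt.Println(cursor.peek(stored, 2, &from)) // [2 3]
	fmt.Println(cursor.peek(stored, 2, nil))   // [5]
}
```

Successive calls without an override walk forward through the entity, while an override jumps to an arbitrary position for that call.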

Close Receiver

func (r *Receiver) Close(ctx context.Context) error

Permanently closes the receiver and releases resources. Always call Close when done with the receiver.

Example:

// The error is ignored here for brevity; check it before using the receiver in real code.
receiver, _ := client.NewReceiverForQueue("myqueue", nil)
defer receiver.Close(context.Background())

Receive Modes

The receive mode controls when messages are deleted from Service Bus.

ReceiveModePeekLock

const ReceiveModePeekLock ReceiveMode

Default mode. Messages are locked when received, preventing multiple receivers from processing the same message. You control the message lifecycle using settlement methods:

  • CompleteMessage - Removes the message from Service Bus
  • AbandonMessage - Releases the message for redelivery
  • DeferMessage - Defers the message for later retrieval
  • DeadLetterMessage - Moves the message to the dead letter queue

The lock has a duration (default 1 minute, configurable in queue/subscription properties). Use RenewMessageLock to extend the lock if processing takes longer.
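To decide when to call RenewMessageLock, an application can compute how long it may wait before the lock gets too close to expiry. The helper below is a minimal sketch of that arithmetic. The renewAfter function and the margin parameter are hypothetical application-side choices, not SDK settings, and the remaining time would typically come from time.Until(*msg.LockedUntil).

```go
package main

import (
	"fmt"
	"time"
)

// renewAfter returns how long to wait before renewing a lock that has
// `remaining` time left, so that renewal happens `margin` before expiry.
// A result of zero means the lock should be renewed now.
// margin is a hypothetical safety buffer chosen by the application.
func renewAfter(remaining, margin time.Duration) time.Duration {
	if remaining <= margin {
		return 0
	}
	return remaining - margin
}

func main() {
	margin := 10 * time.Second

	// A fresh lock with the default one-minute duration.
	fmt.Println(renewAfter(time.Minute, margin)) // 50s

	// A lock with only five seconds left is already inside the margin.
	fmt.Println(renewAfter(5*time.Second, margin)) // 0s
}
```

A margin that is comfortably larger than the expected round trip of the renewal call leaves room for retries before the lock is lost.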

More information: https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations

ReceiveModeReceiveAndDelete

const ReceiveModeReceiveAndDelete ReceiveMode

Messages are automatically deleted as soon as they are received. Settlement methods (CompleteMessage, AbandonMessage, etc.) cannot be used with this mode.

This mode allows you to continue calling ReceiveMessages even after the Receiver has been closed, to drain the internal cache. When all cached messages are consumed, ReceiveMessages returns an *Error with Code == CodeClosed.
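The drain-after-close behaviour can be illustrated with a stand-alone model. This is a simplified sketch under stated assumptions, not the SDK's implementation: drainModel and its receive method are hypothetical, messages are plain strings, and errClosed stands in for an *Error whose Code is CodeClosed.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed stands in for the SDK's *Error with Code == CodeClosed.
var errClosed = errors.New("receiver closed")

// drainModel is a simplified, hypothetical model of a ReceiveAndDelete
// receiver that may still hold cached messages after being closed.
type drainModel struct {
	cache  []string
	closed bool
}

// receive returns up to limit cached messages. Once the receiver is closed
// and the cache is empty, it returns errClosed.
func (d *drainModel) receive(limit int) ([]string, error) {
	if len(d.cache) == 0 {
		if d.closed {
			return nil, errClosed
		}
		return nil, nil // a live receiver would block for new messages here
	}
	n := limit
	if n > len(d.cache) {
		n = len(d.cache)
	}
	batch := d.cache[:n]
	d.cache = d.cache[n:]
	return batch, nil
}

func main() {
	d := &drainModel{cache: []string{"a", "b", "c"}, closed: true}
	for {
		batch, err := d.receive(2)
		if errors.Is(err, errClosed) {
			fmt.Println("drained")
			return
		}
		fmt.Println(batch)
	}
}
```

Running the model prints the two remaining batches and then "drained", which mirrors the loop an application would run until it sees the closed error.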

Sub-Queues

Sub-queues provide access to special message queues associated with the main queue or subscription.

SubQueueDeadLetter

const SubQueueDeadLetter SubQueue = 1

Targets the dead letter queue for a queue or subscription. Dead-lettered messages accumulate here when:

  • Maximum delivery count is exceeded
  • Messages are explicitly dead-lettered using DeadLetterMessage
  • Messages expire with dead-lettering enabled

Example:

// Receive from dead letter queue
dlqReceiver, err := client.NewReceiverForQueue("myqueue", &azservicebus.ReceiverOptions{
    SubQueue: azservicebus.SubQueueDeadLetter,
})
if err != nil {
    panic(err)
}
defer dlqReceiver.Close(context.Background())

messages, err := dlqReceiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
    panic(err)
}
for _, msg := range messages {
    fmt.Printf("Dead letter reason: %s\n", *msg.DeadLetterReason)
    fmt.Printf("Dead letter description: %s\n", *msg.DeadLetterErrorDescription)
}
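DeadLetterReason and DeadLetterErrorDescription are pointers, so dereferencing them directly panics when a value is absent. The sketch below shows a nil-safe way to read and tally them. It uses a local deadLetterInfo struct as a stand-in for the two fields so that it runs without the SDK; that struct, orDefault, and countByReason are hypothetical names.

```go
package main

import "fmt"

// deadLetterInfo mirrors the two optional fields read in the example above.
// It is a local stand-in so the sketch runs without the SDK.
type deadLetterInfo struct {
	DeadLetterReason           *string
	DeadLetterErrorDescription *string
}

// orDefault dereferences s, or returns fallback when s is nil.
func orDefault(s *string, fallback string) string {
	if s == nil {
		return fallback
	}
	return *s
}

// countByReason tallies messages per dead-letter reason, grouping
// messages without a reason under "(none)".
func countByReason(msgs []deadLetterInfo) map[string]int {
	counts := make(map[string]int)
	for _, m := range msgs {
		counts[orDefault(m.DeadLetterReason, "(none)")]++
	}
	return counts
}

func main() {
	reason := "MaxDeliveryCountExceeded"
	msgs := []deadLetterInfo{
		{DeadLetterReason: &reason},
		{DeadLetterReason: &reason},
		{}, // no reason set
	}
	counts := countByReason(msgs)
	fmt.Println(counts["MaxDeliveryCountExceeded"], counts["(none)"]) // 2 1
}
```

A tally like this gives a quick picture of why messages are landing in the dead letter queue before deciding how to reprocess them.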

SubQueueTransfer

const SubQueueTransfer SubQueue = 2

Targets the transfer dead letter queue for a queue or subscription. Messages that fail to be forwarded (auto-forwarding) accumulate here.

Types

Receiver

type Receiver struct {
    // ... unexported fields
}

Receiver receives messages using pull-based functions.

ReceiverOptions

type ReceiverOptions struct {
    ReceiveMode ReceiveMode
    SubQueue SubQueue
}

Options for creating a Receiver:

  • ReceiveMode: Controls when messages are deleted (ReceiveModePeekLock or ReceiveModeReceiveAndDelete)
  • SubQueue: Target a sub-queue like SubQueueDeadLetter or SubQueueTransfer

ReceiveMessagesOptions

type ReceiveMessagesOptions struct {
    TimeAfterFirstMessage time.Duration
}

Options for receiving messages:

  • TimeAfterFirstMessage: Controls how long to wait after receiving the first message before returning the accumulated batch. Default depends on receive mode: 20ms for PeekLock, 1s for ReceiveAndDelete.
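The interaction between the blocking wait for the first message and the TimeAfterFirstMessage window can be sketched with channels. This is a simplified model, not the SDK's implementation: collect and demo are hypothetical functions, messages are plain strings, the source is a Go channel that is never closed, and returning the partial batch on cancellation during the window is a choice made for this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// collect is a simplified, hypothetical model of the batching behaviour
// described above. It blocks until one message arrives (or ctx ends), then
// keeps accumulating for at most timeAfterFirst or until maxMessages
// messages have been gathered.
func collect(ctx context.Context, in <-chan string, maxMessages int, timeAfterFirst time.Duration) ([]string, error) {
	var batch []string

	// Phase 1: wait, with no time limit of its own, for the first message.
	select {
	case msg := <-in:
		batch = append(batch, msg)
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Phase 2: accumulate more messages for a bounded window.
	timer := time.NewTimer(timeAfterFirst)
	defer timer.Stop()
	for len(batch) < maxMessages {
		select {
		case msg := <-in:
			batch = append(batch, msg)
		case <-timer.C:
			return batch, nil
		case <-ctx.Done():
			return batch, nil // sketch choice: keep what was gathered
		}
	}
	return batch, nil
}

// demo feeds five buffered messages into collect and returns the batch.
func demo(maxMessages int) []string {
	in := make(chan string, 5)
	for i := 1; i <= 5; i++ {
		in <- fmt.Sprintf("m%d", i)
	}
	batch, err := collect(context.Background(), in, maxMessages, 100*time.Millisecond)
	if err != nil {
		panic(err)
	}
	return batch
}

func main() {
	fmt.Println(demo(10)) // [m1 m2 m3 m4 m5]
	fmt.Println(demo(2))  // [m1 m2]
}
```

In the first call the window expires with five messages gathered, and in the second the batch is returned as soon as the maximum is reached. A longer window tends to produce fuller batches at the cost of added latency.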

CompleteMessageOptions

type CompleteMessageOptions struct {
    // Currently empty, reserved for future expansion
}

Options for completing a message.

AbandonMessageOptions

type AbandonMessageOptions struct {
    PropertiesToModify map[string]any
}

Options for abandoning a message:

  • PropertiesToModify: Properties to modify in the message when it is abandoned

DeferMessageOptions

type DeferMessageOptions struct {
    PropertiesToModify map[string]any
}

Options for deferring a message:

  • PropertiesToModify: Properties to modify in the message when it is deferred

DeadLetterOptions

type DeadLetterOptions struct {
    ErrorDescription *string
    Reason *string
    PropertiesToModify map[string]any
}

Options for dead-lettering a message:

  • ErrorDescription: Error description that caused the dead lettering
  • Reason: Reason for dead-lettering the message
  • PropertiesToModify: Properties to modify in the message when it is dead-lettered

RenewMessageLockOptions

type RenewMessageLockOptions struct {
    // Currently empty, reserved for future expansion
}

Options for renewing a message lock.

ReceiveDeferredMessagesOptions

type ReceiveDeferredMessagesOptions struct {
    // Currently empty, reserved for future expansion
}

Options for receiving deferred messages.

PeekMessagesOptions

type PeekMessagesOptions struct {
    FromSequenceNumber *int64
}

Options for peeking messages:

  • FromSequenceNumber: Override the starting sequence number for peeking (by default, continues from the last peeked position)

ReceiveMode

type ReceiveMode int

Represents the lock style for a receiver.

SubQueue

type SubQueue int

Allows you to target a sub-queue of a queue or subscription.