tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1
Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
The Receiver type provides methods for receiving messages from queues or subscriptions using a pull-based pattern, with support for message settlement, peeking, and deferred message retrieval.
func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)
Receives up to maxMessages messages from a queue or subscription. This function blocks until at least one message is received or the context is cancelled. Returns an *Error if the operation fails.
When using ReceiveModeReceiveAndDelete, you can continue calling ReceiveMessages even after the Receiver has been closed to drain the internal cache. When all cached messages are consumed, it returns an *Error with Code == CodeClosed.
Example:
messages, err := receiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
panic(err)
}
for _, msg := range messages {
fmt.Printf("Received: %s\n", string(msg.Body))
// Process message...
}
func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error
Completes a message, deleting it from the queue or subscription. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.
Example:
for _, msg := range messages {
// Process message
err := processMessage(msg)
if err == nil {
// Successfully processed, complete it
err = receiver.CompleteMessage(context.Background(), msg, nil)
if err != nil {
panic(err)
}
}
}
func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
Abandons a message, making it available again from the queue or subscription. This increments the delivery count and may cause the message to be dead-lettered if the maximum delivery count is exceeded. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.
Example:
err := processMessage(msg)
if err != nil {
// Processing failed, abandon for retry
err = receiver.AbandonMessage(context.Background(), msg, &azservicebus.AbandonMessageOptions{
PropertiesToModify: map[string]any{
"error": err.Error(),
},
})
if err != nil {
panic(err)
}
}
func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
Defers a message for later processing. Deferred messages can be retrieved using ReceiveDeferredMessages with their sequence numbers. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.
Example:
// Defer message for later processing
err := receiver.DeferMessage(context.Background(), msg, &azservicebus.DeferMessageOptions{
PropertiesToModify: map[string]any{
"deferredReason": "waiting for dependency",
},
})
if err != nil {
panic(err)
}
// Store sequence number for later retrieval
sequenceNumber := *msg.SequenceNumber
func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
Moves a message to the dead letter queue for the queue or subscription. Dead-lettered messages can be received by creating a receiver with ReceiverOptions.SubQueue set to SubQueueDeadLetter. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.
Example:
err := processMessage(msg)
if err != nil {
// Permanent failure, move to dead letter queue
err = receiver.DeadLetterMessage(context.Background(), msg, &azservicebus.DeadLetterOptions{
Reason: to.Ptr("ProcessingFailed"),
ErrorDescription: to.Ptr(err.Error()),
PropertiesToModify: map[string]any{
"failedAt": time.Now(),
},
})
if err != nil {
panic(err)
}
}
func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage, options *RenewMessageLockOptions) error
Renews the lock on a message, updating the LockedUntil field on the message. Use this when processing a message takes longer than the lock duration. This method can only be used when the Receiver was opened with ReceiveModePeekLock. Returns an *Error if the operation fails.
Example:
msg := messages[0]
// Start processing
for i := 0; i < 10; i++ {
// Check if lock is about to expire
if time.Until(*msg.LockedUntil) < time.Second*10 {
// Renew the lock
err := receiver.RenewMessageLock(context.Background(), msg, nil)
if err != nil {
panic(err)
}
}
// Continue processing
time.Sleep(time.Second * 5)
}
// Complete when done
receiver.CompleteMessage(context.Background(), msg, nil)
func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error)
Receives messages that were previously deferred using DeferMessage. Provide the sequence numbers of the deferred messages to retrieve them. Returns an *Error if the operation fails.
Example:
// Retrieve previously deferred messages
deferredMessages, err := receiver.ReceiveDeferredMessages(
context.Background(),
[]int64{12345, 12346, 12347},
nil,
)
if err != nil {
panic(err)
}
for _, msg := range deferredMessages {
// Process deferred message
err := processMessage(msg)
if err == nil {
receiver.CompleteMessage(context.Background(), msg, nil)
}
}
func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)
Peeks messages without locking or deleting them. This allows you to browse messages without affecting their state. Peeked messages cannot be settled with CompleteMessage, AbandonMessage, DeferMessage, or DeadLetterMessage. Returns an *Error if the operation fails.
The Receiver stores the last peeked sequence number internally and uses it as the start location for the next PeekMessages call. You can override this by setting PeekMessagesOptions.FromSequenceNumber.
Example:
// Peek at next 10 messages
messages, err := receiver.PeekMessages(context.Background(), 10, nil)
if err != nil {
panic(err)
}
for _, msg := range messages {
fmt.Printf("Peeked message: %s (SeqNum: %d)\n", string(msg.Body), *msg.SequenceNumber)
}
// Peek from specific sequence number
messages, err = receiver.PeekMessages(context.Background(), 10, &azservicebus.PeekMessagesOptions{
FromSequenceNumber: to.Ptr(int64(1000)),
})
For more information about message browsing, see https://aka.ms/azsdk/servicebus/message-browsing
func (r *Receiver) Close(ctx context.Context) error
Permanently closes the receiver and releases resources. Always call Close when done with the receiver.
Example:
receiver, _ := client.NewReceiverForQueue("myqueue", nil)
defer receiver.Close(context.Background())
The receive mode controls when messages are deleted from Service Bus.
const ReceiveModePeekLock ReceiveMode
Default mode. Messages are locked when received, preventing multiple receivers from processing the same message. You control the message lifecycle using settlement methods:
- CompleteMessage - Removes the message from Service Bus
- AbandonMessage - Releases the message for redelivery
- DeferMessage - Defers the message for later retrieval
- DeadLetterMessage - Moves the message to the dead letter queue
The lock has a duration (default 1 minute, configurable in queue/subscription properties). Use RenewMessageLock to extend the lock if processing takes longer.
More information: https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
const ReceiveModeReceiveAndDelete ReceiveMode
Messages are automatically deleted as soon as they are received. Settlement methods (CompleteMessage, AbandonMessage, etc.) cannot be used with this mode.
This mode allows you to continue calling ReceiveMessages even after the Receiver has been closed, to drain the internal cache. When all cached messages are consumed, ReceiveMessages returns an *Error with Code == CodeClosed.
Sub-queues provide access to special message queues associated with the main queue or subscription.
const SubQueueDeadLetter SubQueue = 1
Targets the dead letter queue for a queue or subscription. Dead-lettered messages accumulate here when:
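- The maximum delivery count for the message is exceeded
- A message is explicitly dead-lettered with DeadLetterMessage
Example: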
// Receive from dead letter queue
dlqReceiver, err := client.NewReceiverForQueue("myqueue", &azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
})
if err != nil {
panic(err)
}
defer dlqReceiver.Close(context.Background())
messages, _ := dlqReceiver.ReceiveMessages(context.Background(), 10, nil)
for _, msg := range messages {
fmt.Printf("Dead letter reason: %s\n", *msg.DeadLetterReason)
fmt.Printf("Dead letter description: %s\n", *msg.DeadLetterErrorDescription)
}
const SubQueueTransfer SubQueue = 2
Targets the transfer dead letter queue for a queue or subscription. Messages that fail to be forwarded (auto-forwarding) accumulate here.
type Receiver struct {
// ... unexported fields
}
Receiver receives messages using pull-based functions.
type ReceiverOptions struct {
ReceiveMode ReceiveMode
SubQueue SubQueue
}
Options for creating a Receiver:
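- ReceiveMode: controls when messages are deleted from Service Bus (ReceiveModePeekLock or ReceiveModeReceiveAndDelete)
- SubQueue: the sub-queue to target, SubQueueDeadLetter or SubQueueTransfer
type ReceiveMessagesOptions struct {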
TimeAfterFirstMessage time.Duration
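}
Options for receiving messages:
- TimeAfterFirstMessage: how long to keep accumulating messages after the first message has been received before the batch is returned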
type CompleteMessageOptions struct {
// Currently empty, reserved for future expansion
}
Options for completing a message.
type AbandonMessageOptions struct {
PropertiesToModify map[string]any
}
Options for abandoning a message:
- PropertiesToModify: properties to modify on the message when it is abandoned
type DeferMessageOptions struct {
PropertiesToModify map[string]any
}
Options for deferring a message:
- PropertiesToModify: properties to modify on the message when it is deferred
type DeadLetterOptions struct {
ErrorDescription *string
Reason *string
PropertiesToModify map[string]any
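}
Options for dead-lettering a message:
- ErrorDescription: description of the error that caused the message to be dead-lettered
- Reason: reason for dead-lettering the message
- PropertiesToModify: properties to modify on the message when it is dead-lettered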
type RenewMessageLockOptions struct {
// Currently empty, reserved for future expansion
}
Options for renewing a message lock.
type ReceiveDeferredMessagesOptions struct {
// Currently empty, reserved for future expansion
}
Options for receiving deferred messages.
type PeekMessagesOptions struct {
FromSequenceNumber *int64
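}
Options for peeking messages:
- FromSequenceNumber: the sequence number to start peeking from, overriding the last peeked position stored by the Receiver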
type ReceiveMode int
Represents the lock style for a receiver.
type SubQueue int
Allows you to target a sub-queue of a queue or subscription.