tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
Azure Service Bus Go SDK provides structured error handling with specific error codes and comprehensive logging support for diagnosing issues.
The SDK uses a custom Error type that includes a Code field for programmatic error handling.
type Error struct {
Code Code
// ... unexported fields
}The Error type represents a Service Bus-specific error. The Code field is part of the published API and can be used to handle different error scenarios programmatically. The error message from Error() and underlying wrapped errors are subject to change.
Example:
messages, err := receiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) {
switch sbErr.Code {
case azservicebus.CodeLockLost:
fmt.Println("Message lock expired")
case azservicebus.CodeConnectionLost:
fmt.Println("Connection lost, retries exhausted")
case azservicebus.CodeUnauthorizedAccess:
fmt.Println("Invalid or expired credentials")
default:
fmt.Printf("Service Bus error: %v\n", err)
}
} else {
fmt.Printf("Non-Service Bus error: %v\n", err)
}
}type Code stringError codes enable programmatic handling of different failure scenarios.
const CodeUnauthorizedAccess CodeThe credentials provided are not valid for use with a particular entity, or have expired. This typically indicates:
Example:
sender, err := client.NewSender("restricted-queue", nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeUnauthorizedAccess {
fmt.Println("Access denied - check credentials and permissions")
}
}const CodeConnectionLost CodeThe connection was lost and all retry attempts failed. This reflects an extended outage or connection disruption and may require manual intervention. Common causes:
Example:
err := sender.SendMessage(context.Background(), msg, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeConnectionLost {
fmt.Println("Connection lost - check network and Service Bus availability")
// Implement reconnection logic or alerting
}
}const CodeLockLost CodeThe lock token for a message has expired. The message will be available again after the lock period expires, or may go to the dead letter queue if delivery attempts have been exceeded. This occurs when:
Example:
err := receiver.CompleteMessage(context.Background(), msg, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
fmt.Println("Message lock expired - message will be redelivered")
// Don't retry settlement - message is already unlocked
}
}const CodeTimeout CodeThe service timed out during an operation. Common scenarios:
AcceptNextSessionForQueue() or AcceptNextSessionForSubscription() when no sessions are availableReceiveMessages() when context deadline is exceededExample:
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
sessionReceiver, err := client.AcceptNextSessionForQueue(ctx, "myqueue", nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
fmt.Println("No available sessions within timeout period")
// This is expected when no sessions are ready
}
}const CodeNotFound CodeThe entity (queue, topic, or subscription) does not exist. This occurs when:
Example:
receiver, err := client.NewReceiverForQueue("nonexistent-queue", nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeNotFound {
fmt.Println("Queue does not exist - check entity name")
}
}const CodeClosed CodeThe link or connection for this sender/receiver has been closed. When using ReceiveModeReceiveAndDelete, this code indicates the receiver's internal cache has been drained after closing.
Example:
receiver, _ := client.NewReceiverForQueue("myqueue", &azservicebus.ReceiverOptions{
ReceiveMode: azservicebus.ReceiveModeReceiveAndDelete,
})
// Close receiver
receiver.Close(context.Background())
// Can still drain cache in ReceiveAndDelete mode
for {
messages, err := receiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeClosed {
fmt.Println("Receiver closed and cache drained")
break
}
}
// Process cached messages
}var ErrMessageTooLarge = errors.New("the message is too large")ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.AddMessage() or if the message is being sent on its own and exceeds the maximum link size.
Example:
batch, _ := sender.NewMessageBatch(context.Background(), nil)
for _, msg := range messages {
err := batch.AddMessage(msg, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
// Batch is full, send it
if batch.NumMessages() > 0 {
sender.SendMessageBatch(context.Background(), batch, nil)
}
// Create new batch and retry
batch, _ = sender.NewMessageBatch(context.Background(), nil)
err = batch.AddMessage(msg, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
// Single message is too large
fmt.Printf("Message exceeds maximum size: %d bytes\n", len(msg.Body))
continue
}
}
}
// Send final batch
if batch.NumMessages() > 0 {
sender.SendMessageBatch(context.Background(), batch, nil)
}type RetryOptions struct {
MaxRetries int
RetryDelay time.Duration
MaxRetryDelay time.Duration
}Configure automatic retry behavior for operations with exponential backoff.
Example:
client, err := azservicebus.NewClient(
"myservicebus.servicebus.windows.net",
credential,
&azservicebus.ClientOptions{
RetryOptions: azservicebus.RetryOptions{
MaxRetries: 5,
RetryDelay: time.Second * 2,
MaxRetryDelay: time.Second * 60,
},
},
)The SDK automatically retries transient failures (network issues, throttling, etc.) up to MaxRetries times with exponential backoff starting at RetryDelay and capped at MaxRetryDelay. After all retries are exhausted, operations return the last error encountered.
The SDK uses the Azure SDK for Go logging framework. Enable logging to diagnose connection issues, message flow, and errors.
Logging events are classified by operation type for filtering.
const EventConnEventConn is used whenever the SDK creates a connection or any links (receivers, senders). Use this to debug connection establishment and link creation.
const EventAuthEventAuth is used when the SDK performs authentication or claims negotiation. Use this to debug credential and permission issues.
const EventReceiverEventReceiver represents operations that happen on Receivers. Use this to track message receiving, settlement, and receiver lifecycle.
const EventSenderEventSender represents operations that happen on Senders. Use this to track message sending, batching, and sender lifecycle.
const EventAdminEventAdmin is used for operations in the azservicebus/admin.Client. Use this to track entity management operations.
import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"log"
)
// Enable logging for all Service Bus events
log.SetEvents(
azservicebus.EventConn,
azservicebus.EventAuth,
azservicebus.EventReceiver,
azservicebus.EventSender,
)
// Set custom listener
log.SetListener(func(event log.Event, message string) {
log.Printf("[%s] %s", event, message)
})// Only log connection and authentication events
log.SetEvents(
azservicebus.EventConn,
azservicebus.EventAuth,
)
// Log to different outputs based on event type
log.SetListener(func(event log.Event, message string) {
switch event {
case azservicebus.EventAuth:
securityLogger.Printf("[AUTH] %s", message)
case azservicebus.EventSender, azservicebus.EventReceiver:
messageLogger.Printf("[MSG] %s", message)
default:
generalLogger.Printf("[%s] %s", event, message)
}
})func processMessages(receiver *azservicebus.Receiver) error {
messages, err := receiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) {
switch sbErr.Code {
case azservicebus.CodeConnectionLost:
// Wait and retry
time.Sleep(time.Second * 5)
return processMessages(receiver)
case azservicebus.CodeUnauthorizedAccess:
// Fatal error - credential issue
return fmt.Errorf("authentication failed: %w", err)
}
}
return err
}
for _, msg := range messages {
err := processMessage(msg)
if err != nil {
// Processing failed
receiver.AbandonMessage(context.Background(), msg, nil)
continue
}
// Success
err = receiver.CompleteMessage(context.Background(), msg, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeLockLost {
// Lock expired during processing
fmt.Println("Message lock lost - will be redelivered")
continue
}
return err
}
}
return nil
}func acceptNextSession(client *azservicebus.Client, queueName string) (*azservicebus.SessionReceiver, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
receiver, err := client.AcceptNextSessionForQueue(ctx, queueName, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) && sbErr.Code == azservicebus.CodeTimeout {
// No sessions available - this is expected
return nil, nil
}
return nil, err
}
return receiver, nil
}func sendWithFallback(sender *azservicebus.Sender, msg *azservicebus.Message) error {
err := sender.SendMessage(context.Background(), msg, nil)
if err != nil {
var sbErr *azservicebus.Error
if errors.As(err, &sbErr) {
switch sbErr.Code {
case azservicebus.CodeConnectionLost:
// Store message locally for retry
return storeForRetry(msg)
case azservicebus.CodeUnauthorizedAccess:
// Try to refresh credentials
return refreshCredentialsAndRetry(sender, msg)
}
}
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
// Compress or split message
return sendCompressed(sender, msg)
}
return err
}
return nil
}