tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
Azure Service Bus is a highly reliable cloud messaging service for providing real-time and fault-tolerant communication between distributed senders and receivers. This library enables Go applications to send and receive messages through queues and topics/subscriptions with advanced features like sessions, batching, dead letter queues, message scheduling, and comprehensive error handling.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus

import (
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"context"
)

For entity management (creating queues, topics, subscriptions):
import "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"

package main
import (
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
// main is the quick-start walkthrough: it builds a client from a default Azure
// credential, sends one message to the queue "myqueue", then receives up to 10
// messages from that same queue, printing and completing each one.
// Every error path calls panic(err), so the program stops at the first failure.
func main() {
// Create client using Azure identity
credential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
// The first argument is the fully qualified namespace host name; the final nil
// means no ClientOptions are supplied.
client, err := azservicebus.NewClient(
"myservicebus.servicebus.windows.net",
credential,
nil,
)
if err != nil {
panic(err)
}
// Deferred so the client is closed when main returns; the error returned by
// Close is not inspected here.
defer client.Close(context.Background())
// Send a message to a queue
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
// Deferred calls run in reverse order, so the sender is closed before the client.
defer sender.Close(context.Background())
// Only Body is set on the message; all other Message fields keep their zero values.
err = sender.SendMessage(context.Background(), &azservicebus.Message{
Body: []byte("Hello, Service Bus!"),
}, nil)
if err != nil {
panic(err)
}
// Receive messages from a queue
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.Background())
// Ask for at most 10 messages in a single call. context.Background() carries no
// deadline or cancellation, so this call is not bounded by a timeout.
messages, err := receiver.ReceiveMessages(context.Background(), 10, nil)
if err != nil {
panic(err)
}
for _, msg := range messages {
// Body is a byte slice; it is converted to a string purely for printing.
fmt.Printf("Received: %s\n", string(msg.Body))
// Complete the message to remove it from the queue
err = receiver.CompleteMessage(context.Background(), msg, nil)
if err != nil {
panic(err)
}
}
}The SDK is organized into two main components:
- Main package (`azservicebus`): provides messaging capabilities for sending and receiving messages.
- Admin package (`azservicebus/admin`): provides entity management for creating and configuring Service Bus resources.
Create and configure Service Bus clients with Azure identity or connection strings, supporting custom endpoints, WebSocket connections, TLS configuration, and retry policies.
func NewClient(fullyQualifiedNamespace string, credential azcore.TokenCredential, options *ClientOptions) (*Client, error)
func NewClientFromConnectionString(connectionString string, options *ClientOptions) (*Client, error)

Construct messages with metadata, create batches for efficient transmission, and work with advanced AMQP message types for interoperability.
// Message is the outgoing message type accepted by Sender.SendMessage,
// Sender.ScheduleMessages and MessageBatch.AddMessage in the signatures listed
// in this document. This is an abbreviated listing: only some fields are shown.
type Message struct {
// Body is the message payload as raw bytes (the quick start sets it from a string).
Body []byte
// ApplicationProperties holds string-keyed values of arbitrary type.
// NOTE(review): presumably user-defined metadata carried with the message; confirm against the SDK reference.
ApplicationProperties map[string]any
// The fields below are pointer-typed, so each can be left nil.
// NOTE(review): nil presumably means "not set"; confirm against the SDK reference.
ContentType *string
CorrelationID *string
MessageID *string
PartitionKey *string
SessionID *string
Subject *string
TimeToLive *time.Duration
// ... additional fields
}
type MessageBatch struct { /* ... */ }
func (mb *MessageBatch) AddMessage(m *Message, options *AddMessageOptions) error
func (mb *MessageBatch) NumMessages() int32
func (mb *MessageBatch) NumBytes() uint64

Send individual messages, batches, or schedule messages for future delivery with support for standard Message and advanced AMQPAnnotatedMessage types.
type Sender struct { /* ... */ }
func (s *Sender) SendMessage(ctx context.Context, message *Message, options *SendMessageOptions) error
func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch, options *SendMessageBatchOptions) error
func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error)
func (s *Sender) ScheduleMessages(ctx context.Context, messages []*Message, scheduledEnqueueTime time.Time, options *ScheduleMessagesOptions) ([]int64, error)
func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumbers []int64, options *CancelScheduledMessagesOptions) error

Receive messages using pull-based patterns with peek-lock or receive-and-delete modes, supporting message settlement (complete, abandon, defer, dead-letter), message lock renewal, peeking, and deferred message retrieval.
type Receiver struct { /* ... */ }
func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)
func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error
func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error
func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error
func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error)
func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)

Handle session-aware receivers for ordered message processing with session state management, session lock renewal, and FIFO delivery guarantees.
type SessionReceiver struct { /* ... */ }
func (sr *SessionReceiver) SessionID() string
func (sr *SessionReceiver) LockedUntil() time.Time
func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSessionStateOptions) ([]byte, error)
func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error
func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) error

Handle Service Bus errors with specific error codes, configure retry policies with exponential backoff, and manage connection failures, lock expirations, and timeouts.
// Error is the Service Bus error type. Its only exported field in this listing
// is Code; the remaining fields are unexported.
type Error struct {
// Code identifies the kind of failure, using the Code type declared in this listing.
Code Code
// ... unexported fields
}
type Code string
const (
CodeUnauthorizedAccess Code
CodeConnectionLost Code
CodeLockLost Code
CodeTimeout Code
CodeNotFound Code
CodeClosed Code
)
type RetryOptions struct {
MaxRetries int32
RetryDelay time.Duration
MaxRetryDelay time.Duration
}

Create, update, delete, and query Service Bus entities including queues, topics, subscriptions, and rules with configuration properties and runtime statistics.
// admin.Client methods
func (ac *Client) CreateQueue(ctx context.Context, queueName string, options *CreateQueueOptions) (CreateQueueResponse, error)
func (ac *Client) CreateTopic(ctx context.Context, topicName string, options *CreateTopicOptions) (CreateTopicResponse, error)
func (ac *Client) CreateSubscription(ctx context.Context, topicName, subscriptionName string, options *CreateSubscriptionOptions) (CreateSubscriptionResponse, error)
func (ac *Client) CreateRule(ctx context.Context, topicName, subscriptionName string, options *CreateRuleOptions) (CreateRuleResponse, error)