tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
Queue management operations in the admin client enable creating, configuring, updating, and monitoring Service Bus queues.
func (ac *Client) CreateQueue(ctx context.Context, queueName string, options *CreateQueueOptions) (CreateQueueResponse, error)Creates a queue with configurable properties.
Example:
resp, err := adminClient.CreateQueue(context.Background(), "myqueue", &admin.CreateQueueOptions{
Properties: &admin.QueueProperties{
LockDuration: to.Ptr("PT1M"), // 1 minute
MaxSizeInMegabytes: to.Ptr(int32(2048)),
RequiresDuplicateDetection: to.Ptr(true),
RequiresSession: to.Ptr(false),
DefaultMessageTimeToLive: to.Ptr("P14D"), // 14 days
DeadLetteringOnMessageExpiration: to.Ptr(true),
DuplicateDetectionHistoryTimeWindow: to.Ptr("PT10M"), // 10 minutes
MaxDeliveryCount: to.Ptr(int32(10)),
EnableBatchedOperations: to.Ptr(true),
EnablePartitioning: to.Ptr(true),
},
})
if err != nil {
panic(err)
}
fmt.Printf("Created queue: %s\n", resp.QueueName)func (ac *Client) GetQueue(ctx context.Context, queueName string, options *GetQueueOptions) (*GetQueueResponse, error)Gets queue configuration properties by name. Returns nil response and nil error if the queue does not exist.
Example:
resp, err := adminClient.GetQueue(context.Background(), "myqueue", nil)
if err != nil {
panic(err)
}
if resp == nil {
fmt.Println("Queue does not exist")
return
}
fmt.Printf("Queue: %s\n", resp.QueueName)
if resp.MaxDeliveryCount != nil {
fmt.Printf("Max Delivery Count: %d\n", *resp.MaxDeliveryCount)
}
if resp.LockDuration != nil {
fmt.Printf("Lock Duration: %s\n", *resp.LockDuration)
}func (ac *Client) GetQueueRuntimeProperties(ctx context.Context, queueName string, options *GetQueueRuntimePropertiesOptions) (*GetQueueRuntimePropertiesResponse, error)Gets runtime statistics for a queue including message counts and size. Returns nil response and nil error if the queue does not exist.
Example:
resp, err := adminClient.GetQueueRuntimeProperties(context.Background(), "myqueue", nil)
if err != nil {
panic(err)
}
if resp != nil {
fmt.Printf("Queue: %s\n", resp.QueueName)
fmt.Printf("Active Messages: %d\n", resp.ActiveMessageCount)
fmt.Printf("Dead Letter Messages: %d\n", resp.DeadLetterMessageCount)
fmt.Printf("Scheduled Messages: %d\n", resp.ScheduledMessageCount)
fmt.Printf("Total Messages: %d\n", resp.TotalMessageCount)
fmt.Printf("Size: %d bytes\n", resp.SizeInBytes)
fmt.Printf("Created At: %s\n", resp.CreatedAt)
fmt.Printf("Updated At: %s\n", resp.UpdatedAt)
}func (ac *Client) UpdateQueue(ctx context.Context, queueName string, properties QueueProperties, options *UpdateQueueOptions) (UpdateQueueResponse, error)Updates an existing queue's configuration properties.
Example:
// Get current properties
queueResp, err := adminClient.GetQueue(context.Background(), "myqueue", nil)
if err != nil {
panic(err)
}
// Modify specific properties
props := queueResp.QueueProperties
props.MaxDeliveryCount = to.Ptr(int32(5))
props.DefaultMessageTimeToLive = to.Ptr("P7D") // 7 days
// Update queue
updateResp, err := adminClient.UpdateQueue(context.Background(), "myqueue", props, nil)
if err != nil {
panic(err)
}
fmt.Printf("Updated queue: %s\n", updateResp.QueueName)func (ac *Client) DeleteQueue(ctx context.Context, queueName string, options *DeleteQueueOptions) (DeleteQueueResponse, error)Deletes a queue.
Example:
_, err := adminClient.DeleteQueue(context.Background(), "myqueue", nil)
if err != nil {
panic(err)
}
fmt.Println("Queue deleted successfully")func (ac *Client) NewListQueuesPager(options *ListQueuesOptions) *runtime.Pager[ListQueuesResponse]Creates a pager for listing all queues with their configuration properties.
Example:
pager := adminClient.NewListQueuesPager(&admin.ListQueuesOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, queue := range page.Queues {
fmt.Printf("Queue: %s\n", queue.QueueName)
if queue.RequiresSession != nil && *queue.RequiresSession {
fmt.Println(" - Session-enabled")
}
if queue.EnablePartitioning != nil && *queue.EnablePartitioning {
fmt.Println(" - Partitioned")
}
}
}func (ac *Client) NewListQueuesRuntimePropertiesPager(options *ListQueuesRuntimePropertiesOptions) *runtime.Pager[ListQueuesRuntimePropertiesResponse]Creates a pager for listing runtime properties of all queues.
Example:
pager := adminClient.NewListQueuesRuntimePropertiesPager(&admin.ListQueuesRuntimePropertiesOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, queueProps := range page.QueueRuntimeProperties {
fmt.Printf("%s: %d active, %d dead-letter, %d scheduled\n",
queueProps.QueueName,
queueProps.ActiveMessageCount,
queueProps.DeadLetterMessageCount,
queueProps.ScheduledMessageCount,
)
}
}type QueueProperties struct {
LockDuration *string
MaxSizeInMegabytes *int32
RequiresDuplicateDetection *bool
RequiresSession *bool
DefaultMessageTimeToLive *string
DeadLetteringOnMessageExpiration *bool
DuplicateDetectionHistoryTimeWindow *string
MaxDeliveryCount *int32
EnableBatchedOperations *bool
Status *EntityStatus
AutoDeleteOnIdle *string
EnablePartitioning *bool
ForwardTo *string
ForwardDeadLetteredMessagesTo *string
UserMetadata *string
AuthorizationRules []AuthorizationRule
MaxMessageSizeInKilobytes *int64
}Configuration properties for a queue:
type QueueRuntimeProperties struct {
SizeInBytes int64
CreatedAt time.Time
UpdatedAt time.Time
AccessedAt time.Time
TotalMessageCount int64
ActiveMessageCount int32
DeadLetterMessageCount int32
ScheduledMessageCount int32
TransferDeadLetterMessageCount int32
TransferMessageCount int32
}Runtime statistics for a queue:
type CreateQueueOptions struct {
Properties *QueueProperties
}Options for creating a queue:
type CreateQueueResponse struct {
QueueName string
QueueProperties
}Response from creating a queue, includes the queue name and all properties.
type GetQueueOptions struct {
// Currently empty, reserved for future expansion
}Options for getting a queue.
type GetQueueResponse struct {
QueueName string
QueueProperties
}Response from getting a queue, includes the queue name and all properties.
type GetQueueRuntimePropertiesOptions struct {
// Currently empty, reserved for future expansion
}Options for getting queue runtime properties.
type GetQueueRuntimePropertiesResponse struct {
QueueName string
QueueRuntimeProperties
}Response from getting queue runtime properties, includes the queue name and all runtime statistics.
type UpdateQueueOptions struct {
// Currently empty, reserved for future expansion
}Options for updating a queue.
type UpdateQueueResponse struct {
QueueName string
QueueProperties
}Response from updating a queue, includes the queue name and updated properties.
type DeleteQueueOptions struct {
// Currently empty, reserved for future expansion
}Options for deleting a queue.
type DeleteQueueResponse struct {
// Empty response
}Response from deleting a queue.
type ListQueuesOptions struct {
MaxPageSize int32
}Options for listing queues:
type ListQueuesResponse struct {
Queues []QueueItem
}Response from listing queues.
type QueueItem struct {
QueueName string
QueueProperties
}A single queue item in the list response.
type ListQueuesRuntimePropertiesOptions struct {
MaxPageSize int32
}Options for listing queue runtime properties:
type ListQueuesRuntimePropertiesResponse struct {
QueueRuntimeProperties []QueueRuntimePropertiesItem
}Response from listing queue runtime properties.
type QueueRuntimePropertiesItem struct {
QueueName string
QueueRuntimeProperties
}A single queue runtime properties item in the list response.
resp, err := adminClient.CreateQueue(context.Background(), "orders-queue", &admin.CreateQueueOptions{
Properties: &admin.QueueProperties{
RequiresSession: to.Ptr(true),
LockDuration: to.Ptr("PT5M"),
MaxDeliveryCount: to.Ptr(int32(3)),
DefaultMessageTimeToLive: to.Ptr("P1D"),
},
})// Create dead letter destination
adminClient.CreateQueue(context.Background(), "failed-messages", nil)
// Create main queue with dead letter forwarding
resp, err := adminClient.CreateQueue(context.Background(), "processing-queue", &admin.CreateQueueOptions{
Properties: &admin.QueueProperties{
MaxDeliveryCount: to.Ptr(int32(5)),
ForwardDeadLetteredMessagesTo: to.Ptr("failed-messages"),
},
})resp, err := adminClient.CreateQueue(context.Background(), "high-throughput-queue", &admin.CreateQueueOptions{
Properties: &admin.QueueProperties{
EnablePartitioning: to.Ptr(true),
MaxSizeInMegabytes: to.Ptr(int32(5120)), // Larger size for partitioned queues
},
})resp, err := adminClient.CreateQueue(context.Background(), "idempotent-queue", &admin.CreateQueueOptions{
Properties: &admin.QueueProperties{
RequiresDuplicateDetection: to.Ptr(true),
DuplicateDetectionHistoryTimeWindow: to.Ptr("PT1H"), // 1 hour window
},
})func monitorQueueHealth(adminClient *admin.Client, queueName string) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
resp, err := adminClient.GetQueueRuntimeProperties(context.Background(), queueName, nil)
if err != nil {
log.Printf("Error getting queue properties: %v", err)
continue
}
// Alert on dead letter messages
if resp.DeadLetterMessageCount > 10 {
log.Printf("WARNING: %d messages in dead letter queue", resp.DeadLetterMessageCount)
}
// Alert on queue size
if resp.SizeInBytes > 1024*1024*1024 { // 1 GB
log.Printf("WARNING: Queue size is %d bytes", resp.SizeInBytes)
}
// Monitor message backlog
if resp.ActiveMessageCount > 1000 {
log.Printf("High message count: %d active messages", resp.ActiveMessageCount)
}
}
}func scaleQueueDeliveryAttempts(adminClient *admin.Client, queueName string, maxDelivery int32) error {
// Get current properties
resp, err := adminClient.GetQueue(context.Background(), queueName, nil)
if err != nil {
return err
}
if resp == nil {
return fmt.Errorf("queue not found")
}
// Update max delivery count
props := resp.QueueProperties
props.MaxDeliveryCount = to.Ptr(maxDelivery)
_, err = adminClient.UpdateQueue(context.Background(), queueName, props, nil)
return err
}