tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
Subscription management operations in the admin client enable creating, configuring, updating, and monitoring topic subscriptions for publish-subscribe messaging patterns.
func (ac *Client) CreateSubscription(ctx context.Context, topicName, subscriptionName string, options *CreateSubscriptionOptions) (CreateSubscriptionResponse, error)

Creates a subscription to a topic with configurable properties.
Example:
resp, err := adminClient.CreateSubscription(
context.Background(),
"events",
"processor-subscription",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
LockDuration: to.Ptr("PT1M"),
RequiresSession: to.Ptr(false),
DefaultMessageTimeToLive: to.Ptr("P7D"),
DeadLetteringOnMessageExpiration: to.Ptr(true),
EnableDeadLetteringOnFilterEvaluationExceptions: to.Ptr(true),
MaxDeliveryCount: to.Ptr(int32(5)),
EnableBatchedOperations: to.Ptr(true),
},
},
)
if err != nil {
panic(err)
}
fmt.Printf("Created subscription: %s on topic: %s\n", resp.SubscriptionName, resp.TopicName)

func (ac *Client) GetSubscription(ctx context.Context, topicName, subscriptionName string, options *GetSubscriptionOptions) (*GetSubscriptionResponse, error)

Gets subscription configuration properties. Returns nil response and nil error if the subscription does not exist.
Example:
resp, err := adminClient.GetSubscription(context.Background(), "events", "processor-subscription", nil)
if err != nil {
panic(err)
}
if resp == nil {
fmt.Println("Subscription does not exist")
return
}
fmt.Printf("Subscription: %s\n", resp.SubscriptionName)
fmt.Printf("Topic: %s\n", resp.TopicName)
if resp.MaxDeliveryCount != nil {
fmt.Printf("Max Delivery Count: %d\n", *resp.MaxDeliveryCount)
}

func (ac *Client) GetSubscriptionRuntimeProperties(ctx context.Context, topicName, subscriptionName string, options *GetSubscriptionRuntimePropertiesOptions) (*GetSubscriptionRuntimePropertiesResponse, error)

Gets runtime statistics for a subscription including message counts. Returns nil response and nil error if the subscription does not exist.
Example:
resp, err := adminClient.GetSubscriptionRuntimeProperties(
context.Background(),
"events",
"processor-subscription",
nil,
)
if err != nil {
panic(err)
}
if resp != nil {
fmt.Printf("Subscription: %s\n", resp.SubscriptionName)
fmt.Printf("Topic: %s\n", resp.TopicName)
fmt.Printf("Active Messages: %d\n", resp.ActiveMessageCount)
fmt.Printf("Dead Letter Messages: %d\n", resp.DeadLetterMessageCount)
fmt.Printf("Total Messages: %d\n", resp.TotalMessageCount)
fmt.Printf("Created At: %s\n", resp.CreatedAt)
}

func (ac *Client) UpdateSubscription(ctx context.Context, topicName, subscriptionName string, properties SubscriptionProperties, options *UpdateSubscriptionOptions) (UpdateSubscriptionResponse, error)

Updates an existing subscription's configuration properties.
Example:
// Get current properties
subResp, err := adminClient.GetSubscription(context.Background(), "events", "processor-subscription", nil)
if err != nil {
	panic(err)
}
// GetSubscription returns a nil response together with a nil error when the
// subscription does not exist, so guard before reading its properties.
if subResp == nil {
	fmt.Println("Subscription does not exist")
	return
}
// Modify properties (copy of the embedded struct; only the two fields below change)
props := subResp.SubscriptionProperties
props.MaxDeliveryCount = to.Ptr(int32(10))
props.LockDuration = to.Ptr("PT2M")
// Update subscription
updateResp, err := adminClient.UpdateSubscription(
	context.Background(),
	"events",
	"processor-subscription",
	props,
	nil,
)
if err != nil {
	panic(err)
}
fmt.Printf("Updated subscription: %s\n", updateResp.SubscriptionName)

func (ac *Client) DeleteSubscription(ctx context.Context, topicName, subscriptionName string, options *DeleteSubscriptionOptions) (DeleteSubscriptionResponse, error)

Deletes a subscription.
Example:
_, err := adminClient.DeleteSubscription(context.Background(), "events", "processor-subscription", nil)
if err != nil {
panic(err)
}
fmt.Println("Subscription deleted successfully")

func (ac *Client) NewListSubscriptionsPager(topicName string, options *ListSubscriptionsOptions) *runtime.Pager[ListSubscriptionsResponse]

Creates a pager for listing all subscriptions for a topic with their configuration properties.
Example:
pager := adminClient.NewListSubscriptionsPager("events", &admin.ListSubscriptionsOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, sub := range page.Subscriptions {
fmt.Printf("Subscription: %s\n", sub.SubscriptionName)
if sub.RequiresSession != nil && *sub.RequiresSession {
fmt.Println(" - Session-enabled")
}
if sub.ForwardTo != nil {
fmt.Printf(" - Forwards to: %s\n", *sub.ForwardTo)
}
}
}

func (ac *Client) NewListSubscriptionsRuntimePropertiesPager(topicName string, options *ListSubscriptionsRuntimePropertiesOptions) *runtime.Pager[ListSubscriptionsRuntimePropertiesResponse]

Creates a pager for listing runtime properties of all subscriptions for a topic.
Example:
pager := adminClient.NewListSubscriptionsRuntimePropertiesPager("events", &admin.ListSubscriptionsRuntimePropertiesOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, subProps := range page.SubscriptionRuntimeProperties {
fmt.Printf("%s: %d active, %d dead-letter\n",
subProps.SubscriptionName,
subProps.ActiveMessageCount,
subProps.DeadLetterMessageCount,
)
}
}

// SubscriptionProperties holds the configuration properties for a subscription.
// Every field is a pointer; the examples in this document populate only the
// fields they need and leave the others nil.
type SubscriptionProperties struct {
	// LockDuration is a duration string; examples here use "PT1M", "PT2M" and
	// "PT5M" (presumably ISO 8601 — confirm against the SDK reference).
	LockDuration *string
	// RequiresSession is reported as "Session-enabled" by the listing example.
	RequiresSession *bool
	// DefaultMessageTimeToLive is a duration string; examples use "P7D" and "P1D".
	DefaultMessageTimeToLive                        *string
	DeadLetteringOnMessageExpiration                *bool
	EnableDeadLetteringOnFilterEvaluationExceptions *bool
	MaxDeliveryCount                                *int32
	Status                                          *EntityStatus
	AutoDeleteOnIdle                                *string
	// ForwardTo names the entity messages are forwarded to; the forwarding
	// example sets it to a queue name.
	ForwardTo                     *string
	ForwardDeadLetteredMessagesTo *string
	EnableBatchedOperations       *bool
	UserMetadata                  *string
	// DefaultRule is set by the filter examples to a rule named "$Default"
	// carrying a SQLFilter.
	DefaultRule *RuleProperties
}

Configuration properties for a subscription:
// SubscriptionRuntimeProperties holds runtime statistics for a subscription:
// message counters plus access/creation/update timestamps.
type SubscriptionRuntimeProperties struct {
	TotalMessageCount              int64
	ActiveMessageCount             int32
	DeadLetterMessageCount         int32
	TransferMessageCount           int32
	TransferDeadLetterMessageCount int32
	// AccessedAt is what the examples in this document print as "last access".
	AccessedAt time.Time
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

Runtime statistics for a subscription:
// CreateSubscriptionOptions carries the optional settings passed to
// CreateSubscription.
type CreateSubscriptionOptions struct {
	// Properties is the configuration to create the subscription with.
	Properties *SubscriptionProperties
}

Options for creating a subscription:
// CreateSubscriptionResponse is the result of CreateSubscription. It names the
// subscription and its topic and embeds SubscriptionProperties, so the
// property fields are accessible directly on the response.
type CreateSubscriptionResponse struct {
	SubscriptionName string
	TopicName        string
	SubscriptionProperties
}

Response from creating a subscription, includes the subscription name, topic name, and all properties.
// GetSubscriptionOptions carries the optional settings passed to
// GetSubscription; the examples in this document pass nil.
type GetSubscriptionOptions struct {
	// Currently empty, reserved for future expansion
}

Options for getting a subscription.
// GetSubscriptionResponse is the result of GetSubscription. It names the
// subscription and its topic and embeds SubscriptionProperties, so fields such
// as MaxDeliveryCount are read directly from the response.
type GetSubscriptionResponse struct {
	SubscriptionName string
	TopicName        string
	SubscriptionProperties
}

Response from getting a subscription, includes the subscription name, topic name, and all properties.
// GetSubscriptionRuntimePropertiesOptions carries the optional settings passed
// to GetSubscriptionRuntimeProperties; the examples in this document pass nil.
type GetSubscriptionRuntimePropertiesOptions struct {
	// Currently empty, reserved for future expansion
}

Options for getting subscription runtime properties.
// GetSubscriptionRuntimePropertiesResponse is the result of
// GetSubscriptionRuntimeProperties. It names the topic and subscription and
// embeds SubscriptionRuntimeProperties, so the counters and timestamps are
// read directly from the response.
type GetSubscriptionRuntimePropertiesResponse struct {
	TopicName        string
	SubscriptionName string
	SubscriptionRuntimeProperties
}

Response from getting subscription runtime properties, includes the subscription name, topic name, and all runtime statistics.
// UpdateSubscriptionOptions carries the optional settings passed to
// UpdateSubscription; the examples in this document pass nil.
type UpdateSubscriptionOptions struct {
	// Currently empty, reserved for future expansion
}

Options for updating a subscription.
// UpdateSubscriptionResponse is the result of UpdateSubscription. It names the
// topic and subscription and embeds the SubscriptionProperties.
type UpdateSubscriptionResponse struct {
	TopicName        string
	SubscriptionName string
	SubscriptionProperties
}

Response from updating a subscription, includes the subscription name, topic name, and updated properties.
// DeleteSubscriptionOptions carries the optional settings passed to
// DeleteSubscription; the examples in this document pass nil.
type DeleteSubscriptionOptions struct {
	// Currently empty, reserved for future expansion
}

Options for deleting a subscription.
// DeleteSubscriptionResponse is the result of DeleteSubscription. It has no
// fields; the examples in this document discard it and check only the error.
type DeleteSubscriptionResponse struct {
	// Empty response
}

Response from deleting a subscription.
// ListSubscriptionsOptions carries the optional settings passed to
// NewListSubscriptionsPager.
type ListSubscriptionsOptions struct {
	// MaxPageSize presumably caps the number of subscriptions returned per
	// page (the listing example sets it to 20) — confirm against the SDK reference.
	MaxPageSize int32
}

Options for listing subscriptions:
// ListSubscriptionsResponse is one page produced by the pager returned from
// NewListSubscriptionsPager.
type ListSubscriptionsResponse struct {
	// Subscriptions holds the subscription entries of this page.
	Subscriptions []SubscriptionPropertiesItem
}

Response from listing subscriptions.
// SubscriptionPropertiesItem is a single entry of
// ListSubscriptionsResponse.Subscriptions. It names the topic and subscription
// and embeds the subscription's SubscriptionProperties.
type SubscriptionPropertiesItem struct {
	TopicName        string
	SubscriptionName string
	SubscriptionProperties
}

A single subscription item in the list response.
// ListSubscriptionsRuntimePropertiesOptions carries the optional settings
// passed to NewListSubscriptionsRuntimePropertiesPager.
type ListSubscriptionsRuntimePropertiesOptions struct {
	// MaxPageSize presumably caps the number of items returned per page (the
	// listing example sets it to 20) — confirm against the SDK reference.
	MaxPageSize int32
}

Options for listing subscription runtime properties:
// ListSubscriptionsRuntimePropertiesResponse is one page produced by the pager
// returned from NewListSubscriptionsRuntimePropertiesPager.
type ListSubscriptionsRuntimePropertiesResponse struct {
	// SubscriptionRuntimeProperties holds the per-subscription entries of this page.
	SubscriptionRuntimeProperties []SubscriptionRuntimePropertiesItem
}

Response from listing subscription runtime properties.
// SubscriptionRuntimePropertiesItem is a single entry of
// ListSubscriptionsRuntimePropertiesResponse.SubscriptionRuntimeProperties. It
// names the topic and subscription and embeds the subscription's
// SubscriptionRuntimeProperties.
type SubscriptionRuntimePropertiesItem struct {
	TopicName        string
	SubscriptionName string
	SubscriptionRuntimeProperties
}

A single subscription runtime properties item in the list response.
resp, err := adminClient.CreateSubscription(
context.Background(),
"orders",
"order-processor",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
RequiresSession: to.Ptr(true),
LockDuration: to.Ptr("PT5M"),
MaxDeliveryCount: to.Ptr(int32(3)),
DefaultMessageTimeToLive: to.Ptr("P1D"),
},
},
)

// Create subscription that forwards to a queue
resp, err := adminClient.CreateSubscription(
context.Background(),
"events",
"archive-subscription",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
ForwardTo: to.Ptr("archive-queue"),
ForwardDeadLetteredMessagesTo: to.Ptr("failed-archive-queue"),
DeadLetteringOnMessageExpiration: to.Ptr(true),
},
},
)

// Create subscription with a SQL filter
resp, err := adminClient.CreateSubscription(
context.Background(),
"events",
"high-priority-events",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
DefaultRule: &admin.RuleProperties{
Name: "$Default",
Filter: &admin.SQLFilter{
Expression: "priority > 5",
},
},
EnableDeadLetteringOnFilterEvaluationExceptions: to.Ptr(true),
},
},
)func monitorSubscription(adminClient *admin.Client, topicName, subscriptionName string) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
resp, err := adminClient.GetSubscriptionRuntimeProperties(
context.Background(),
topicName,
subscriptionName,
nil,
)
if err != nil {
log.Printf("Error getting subscription properties: %v", err)
continue
}
// Alert on dead letter messages
if resp.DeadLetterMessageCount > 100 {
log.Printf("WARNING: %d messages in dead letter queue", resp.DeadLetterMessageCount)
}
// Alert on message backlog
if resp.ActiveMessageCount > 10000 {
log.Printf("High message count: %d active messages", resp.ActiveMessageCount)
}
// Check for processing
if resp.ActiveMessageCount > 0 && time.Since(resp.AccessedAt) > time.Minute*5 {
log.Printf("WARNING: Messages not being processed (last access: %s)", resp.AccessedAt)
}
}
}func createPubSubSystem(adminClient *admin.Client) error {
// Create topic
_, err := adminClient.CreateTopic(context.Background(), "events", &admin.CreateTopicOptions{
Properties: &admin.TopicProperties{
EnablePartitioning: to.Ptr(true),
SupportOrdering: to.Ptr(true),
},
})
if err != nil {
return err
}
// Create high-priority subscription
_, err = adminClient.CreateSubscription(
context.Background(),
"events",
"high-priority",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
MaxDeliveryCount: to.Ptr(int32(3)),
DeadLetteringOnMessageExpiration: to.Ptr(true),
DefaultRule: &admin.RuleProperties{
Name: "$Default",
Filter: &admin.SQLFilter{
Expression: "priority = 'high'",
},
},
},
},
)
if err != nil {
return err
}
// Create standard subscription
_, err = adminClient.CreateSubscription(
context.Background(),
"events",
"standard",
&admin.CreateSubscriptionOptions{
Properties: &admin.SubscriptionProperties{
MaxDeliveryCount: to.Ptr(int32(10)),
DefaultRule: &admin.RuleProperties{
Name: "$Default",
Filter: &admin.SQLFilter{
Expression: "priority = 'standard'",
},
},
},
},
)
return err
}func scaleSubscriptionDelivery(adminClient *admin.Client, topicName, subscriptionName string, maxDelivery int32) error {
// Get current properties
resp, err := adminClient.GetSubscription(context.Background(), topicName, subscriptionName, nil)
if err != nil {
return err
}
if resp == nil {
return fmt.Errorf("subscription not found")
}
// Update max delivery count
props := resp.SubscriptionProperties
props.MaxDeliveryCount = to.Ptr(maxDelivery)
_, err = adminClient.UpdateSubscription(context.Background(), topicName, subscriptionName, props, nil)
return err
}func listSubscriptionsWithMetrics(adminClient *admin.Client, topicName string) {
pager := adminClient.NewListSubscriptionsRuntimePropertiesPager(topicName, nil)
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, sub := range page.SubscriptionRuntimeProperties {
fmt.Printf("Subscription: %s\n", sub.SubscriptionName)
fmt.Printf(" Active: %d\n", sub.ActiveMessageCount)
fmt.Printf(" Dead Letter: %d\n", sub.DeadLetterMessageCount)
fmt.Printf(" Total: %d\n", sub.TotalMessageCount)
fmt.Printf(" Last Access: %s\n", sub.AccessedAt)
fmt.Printf(" Created: %s\n", sub.CreatedAt)
fmt.Println()
}
}
}func cleanupIdleSubscriptions(adminClient *admin.Client, topicName string, idleThreshold time.Duration) error {
pager := adminClient.NewListSubscriptionsRuntimePropertiesPager(topicName, nil)
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
return err
}
for _, sub := range page.SubscriptionRuntimeProperties {
// Check if subscription is idle
if time.Since(sub.AccessedAt) > idleThreshold && sub.ActiveMessageCount == 0 {
log.Printf("Deleting idle subscription: %s (last access: %s)",
sub.SubscriptionName, sub.AccessedAt)
_, err := adminClient.DeleteSubscription(
context.Background(),
topicName,
sub.SubscriptionName,
nil,
)
if err != nil {
log.Printf("Error deleting subscription %s: %v", sub.SubscriptionName, err)
}
}
}
}
return nil
}