tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.
Topic management operations in the admin client enable creating, configuring, updating, and monitoring Service Bus topics for publish-subscribe messaging patterns.
func (ac *Client) CreateTopic(ctx context.Context, topicName string, options *CreateTopicOptions) (CreateTopicResponse, error)

Creates a topic with configurable properties.
Example:
resp, err := adminClient.CreateTopic(context.Background(), "events", &admin.CreateTopicOptions{
Properties: &admin.TopicProperties{
MaxSizeInMegabytes: to.Ptr(int32(2048)),
RequiresDuplicateDetection: to.Ptr(true),
DefaultMessageTimeToLive: to.Ptr("P7D"), // 7 days
DuplicateDetectionHistoryTimeWindow: to.Ptr("PT10M"),
EnableBatchedOperations: to.Ptr(true),
EnablePartitioning: to.Ptr(true),
SupportOrdering: to.Ptr(true),
},
})
if err != nil {
panic(err)
}
fmt.Printf("Created topic: %s\n", resp.TopicName)

func (ac *Client) GetTopic(ctx context.Context, topicName string, options *GetTopicOptions) (*GetTopicResponse, error)

Gets topic configuration properties by name. Returns a nil response and a nil error if the topic does not exist.
Example:
resp, err := adminClient.GetTopic(context.Background(), "events", nil)
if err != nil {
panic(err)
}
if resp == nil {
fmt.Println("Topic does not exist")
return
}
fmt.Printf("Topic: %s\n", resp.TopicName)
if resp.SupportOrdering != nil && *resp.SupportOrdering {
fmt.Println("Ordering is supported")
}
if resp.EnablePartitioning != nil && *resp.EnablePartitioning {
fmt.Println("Partitioning is enabled")
}

func (ac *Client) GetTopicRuntimeProperties(ctx context.Context, topicName string, options *GetTopicRuntimePropertiesOptions) (*GetTopicRuntimePropertiesResponse, error)

Gets runtime statistics for a topic, including subscription count and size. Returns a nil response and a nil error if the topic does not exist.
Example:
resp, err := adminClient.GetTopicRuntimeProperties(context.Background(), "events", nil)
if err != nil {
panic(err)
}
if resp != nil {
fmt.Printf("Topic: %s\n", resp.TopicName)
fmt.Printf("Subscription Count: %d\n", resp.SubscriptionCount)
fmt.Printf("Scheduled Messages: %d\n", resp.ScheduledMessageCount)
fmt.Printf("Size: %d bytes\n", resp.SizeInBytes)
fmt.Printf("Created At: %s\n", resp.CreatedAt)
fmt.Printf("Updated At: %s\n", resp.UpdatedAt)
}

func (ac *Client) UpdateTopic(ctx context.Context, topicName string, properties TopicProperties, options *UpdateTopicOptions) (UpdateTopicResponse, error)

Updates an existing topic's configuration properties.
Example:
// Get current properties
topicResp, err := adminClient.GetTopic(context.Background(), "events", nil)
if err != nil {
	panic(err)
}
// GetTopic returns a nil response together with a nil error when the topic
// does not exist, so guard before reading its properties.
if topicResp == nil {
	fmt.Println("Topic does not exist")
	return
}
// Modify properties
props := topicResp.TopicProperties
props.DefaultMessageTimeToLive = to.Ptr("P14D") // 14 days
props.SupportOrdering = to.Ptr(true)
// Update topic
updateResp, err := adminClient.UpdateTopic(context.Background(), "events", props, nil)
if err != nil {
	panic(err)
}
fmt.Printf("Updated topic: %s\n", updateResp.TopicName)

func (ac *Client) DeleteTopic(ctx context.Context, topicName string, options *DeleteTopicOptions) (DeleteTopicResponse, error)

Deletes a topic and all its subscriptions.
Example:
resp, err := adminClient.DeleteTopic(context.Background(), "events", nil)
if err != nil {
panic(err)
}
fmt.Println("Topic and all subscriptions deleted successfully")

func (ac *Client) NewListTopicsPager(options *ListTopicsOptions) *runtime.Pager[ListTopicsResponse]

Creates a pager for listing all topics with their configuration properties.
Example:
pager := adminClient.NewListTopicsPager(&admin.ListTopicsOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, topic := range page.Topics {
fmt.Printf("Topic: %s\n", topic.TopicName)
if topic.EnablePartitioning != nil && *topic.EnablePartitioning {
fmt.Println(" - Partitioned")
}
if topic.SupportOrdering != nil && *topic.SupportOrdering {
fmt.Println(" - Supports ordering")
}
}
}

func (ac *Client) NewListTopicsRuntimePropertiesPager(options *ListTopicsRuntimePropertiesOptions) *runtime.Pager[ListTopicsRuntimePropertiesResponse]

Creates a pager for listing runtime properties of all topics.
Example:
pager := adminClient.NewListTopicsRuntimePropertiesPager(&admin.ListTopicsRuntimePropertiesOptions{
MaxPageSize: 20,
})
for pager.More() {
page, err := pager.NextPage(context.Background())
if err != nil {
panic(err)
}
for _, topicProps := range page.TopicRuntimeProperties {
fmt.Printf("%s: %d subscriptions, %d scheduled messages\n",
topicProps.TopicName,
topicProps.SubscriptionCount,
topicProps.ScheduledMessageCount,
)
}
}

type TopicProperties struct {
MaxSizeInMegabytes *int32
RequiresDuplicateDetection *bool
DefaultMessageTimeToLive *string
DuplicateDetectionHistoryTimeWindow *string
EnableBatchedOperations *bool
Status *EntityStatus
AutoDeleteOnIdle *string
EnablePartitioning *bool
SupportOrdering *bool
UserMetadata *string
AuthorizationRules []AuthorizationRule
MaxMessageSizeInKilobytes *int64
}

Configuration properties for a topic.

type TopicRuntimeProperties struct {
SizeInBytes int64
CreatedAt time.Time
UpdatedAt time.Time
AccessedAt time.Time
SubscriptionCount int32
ScheduledMessageCount int32
}

Runtime statistics for a topic.

type CreateTopicOptions struct {
Properties *TopicProperties
}

Options for creating a topic.

type CreateTopicResponse struct {
TopicName string
TopicProperties
}

Response from creating a topic; includes the topic name and all properties.

type GetTopicOptions struct {
// Currently empty, reserved for future expansion
}

Options for getting a topic.

type GetTopicResponse struct {
TopicName string
TopicProperties
}

Response from getting a topic; includes the topic name and all properties.

type GetTopicRuntimePropertiesOptions struct {
// Currently empty, reserved for future expansion
}

Options for getting topic runtime properties.

type GetTopicRuntimePropertiesResponse struct {
TopicName string
TopicRuntimeProperties
}

Response from getting topic runtime properties; includes the topic name and all runtime statistics.

type UpdateTopicOptions struct {
// Currently empty, reserved for future expansion
}

Options for updating a topic.

type UpdateTopicResponse struct {
TopicName string
TopicProperties
}

Response from updating a topic; includes the topic name and updated properties.

type DeleteTopicOptions struct {
// Currently empty, reserved for future expansion
}

Options for deleting a topic.

type DeleteTopicResponse struct {
Value *TopicProperties
}

Response from deleting a topic.

type ListTopicsOptions struct {
MaxPageSize int32
}

Options for listing topics.

type ListTopicsResponse struct {
Topics []TopicItem
}

Response from listing topics.

type TopicItem struct {
TopicName string
TopicProperties
}

A single topic item in the list response.

type ListTopicsRuntimePropertiesOptions struct {
MaxPageSize int32
}

Options for listing topic runtime properties.

type ListTopicsRuntimePropertiesResponse struct {
TopicRuntimeProperties []TopicRuntimePropertiesItem
}

Response from listing topic runtime properties.

type TopicRuntimePropertiesItem struct {
TopicName string
TopicRuntimeProperties
}

A single topic runtime properties item in the list response.

resp, err := adminClient.CreateTopic(context.Background(), "ordered-events", &admin.CreateTopicOptions{
Properties: &admin.TopicProperties{
SupportOrdering: to.Ptr(true),
DefaultMessageTimeToLive: to.Ptr("P1D"),
},
})

resp, err := adminClient.CreateTopic(context.Background(), "high-volume-events", &admin.CreateTopicOptions{
Properties: &admin.TopicProperties{
EnablePartitioning: to.Ptr(true),
MaxSizeInMegabytes: to.Ptr(int32(5120)), // Larger size for partitioned topics
},
})

resp, err := adminClient.CreateTopic(context.Background(), "idempotent-events", &admin.CreateTopicOptions{
Properties: &admin.TopicProperties{
RequiresDuplicateDetection: to.Ptr(true),
DuplicateDetectionHistoryTimeWindow: to.Ptr("PT1H"), // 1 hour window
},
})

// monitorTopic prints the runtime statistics of topicName followed by the
// active and dead-letter message counts of each of its subscriptions.
//
// It panics if a service call returns an error. If the topic does not exist
// (GetTopicRuntimeProperties yields a nil response with a nil error) it
// reports that and returns instead of dereferencing the nil response.
func monitorTopic(adminClient *admin.Client, topicName string) {
	ctx := context.Background()

	// Get topic runtime properties
	topicResp, err := adminClient.GetTopicRuntimeProperties(ctx, topicName, nil)
	if err != nil {
		panic(err)
	}
	if topicResp == nil {
		fmt.Printf("Topic %s does not exist\n", topicName)
		return
	}
	fmt.Printf("Topic: %s\n", topicResp.TopicName)
	fmt.Printf("Subscriptions: %d\n", topicResp.SubscriptionCount)
	fmt.Printf("Scheduled Messages: %d\n", topicResp.ScheduledMessageCount)
	fmt.Printf("Size: %d bytes\n", topicResp.SizeInBytes)

	// List all subscriptions
	subPager := adminClient.NewListSubscriptionsRuntimePropertiesPager(topicName, nil)
	for subPager.More() {
		page, err := subPager.NextPage(ctx)
		if err != nil {
			panic(err)
		}
		for _, sub := range page.SubscriptionRuntimeProperties {
			fmt.Printf("  Subscription: %s\n", sub.SubscriptionName)
			fmt.Printf("    Active: %d, DeadLetter: %d\n",
				sub.ActiveMessageCount,
				sub.DeadLetterMessageCount,
			)
		}
	}
}

// createProductionTopic creates topicName with a fixed set of properties:
// a 7-day default TTL, duplicate detection over a 30-minute window, batched
// operations, partitioning, a 5120 MB maximum size, ordering support and a
// descriptive metadata string. The error from CreateTopic is returned as is.
func createProductionTopic(adminClient *admin.Client, topicName string) error {
	props := &admin.TopicProperties{
		// Message handling
		DefaultMessageTimeToLive:            to.Ptr("P7D"),
		RequiresDuplicateDetection:          to.Ptr(true),
		DuplicateDetectionHistoryTimeWindow: to.Ptr("PT30M"),
		// Performance
		EnableBatchedOperations: to.Ptr(true),
		EnablePartitioning:      to.Ptr(true),
		MaxSizeInMegabytes:      to.Ptr(int32(5120)),
		// Ordering
		SupportOrdering: to.Ptr(true),
		// Metadata
		UserMetadata: to.Ptr("Production event topic"),
	}
	opts := &admin.CreateTopicOptions{Properties: props}

	_, err := adminClient.CreateTopic(context.Background(), topicName, opts)
	return err
}

// updateTopicTTL sets the default message time-to-live of topicName to ttl.
// It reads the topic's current properties, replaces only
// DefaultMessageTimeToLive, and writes the full property set back.
//
// Errors from GetTopic and UpdateTopic are returned unchanged; a nil GetTopic
// response yields a "topic not found" error.
func updateTopicTTL(adminClient *admin.Client, topicName string, ttl string) error {
	ctx := context.Background()

	// Fetch the existing configuration so the other settings are carried over.
	current, err := adminClient.GetTopic(ctx, topicName, nil)
	if err != nil {
		return err
	}
	if current == nil {
		return fmt.Errorf("topic not found")
	}

	// Swap in the new TTL and push the whole property set back.
	updated := current.TopicProperties
	updated.DefaultMessageTimeToLive = to.Ptr(ttl)

	_, err = adminClient.UpdateTopic(ctx, topicName, updated, nil)
	return err
}

// listPartitionedTopics walks every page of the topic listing and returns the
// names of topics whose EnablePartitioning field is set and true. The result
// is nil when no topic matches. It panics if fetching a page fails.
func listPartitionedTopics(adminClient *admin.Client) []string {
	var names []string

	ctx := context.Background()
	for pager := adminClient.NewListTopicsPager(nil); pager.More(); {
		page, err := pager.NextPage(ctx)
		if err != nil {
			panic(err)
		}
		for _, t := range page.Topics {
			// Skip topics where the flag is unset or explicitly false.
			if t.EnablePartitioning == nil || !*t.EnablePartitioning {
				continue
			}
			names = append(names, t.TopicName)
		}
	}
	return names
}func checkTopicHealth(adminClient *admin.Client, topicName string) error {
resp, err := adminClient.GetTopicRuntimeProperties(context.Background(), topicName, nil)
if err != nil {
return fmt.Errorf("failed to get topic properties: %w", err)
}
if resp == nil {
return fmt.Errorf("topic does not exist")
}
// Check if topic has subscriptions
if resp.SubscriptionCount == 0 {
return fmt.Errorf("topic has no subscriptions")
}
// Check topic size
maxSizeBytes := int64(5 * 1024 * 1024 * 1024) // 5 GB
if resp.SizeInBytes > maxSizeBytes {
return fmt.Errorf("topic size (%d bytes) exceeds threshold", resp.SizeInBytes)
}
return nil
}