or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
golangpkg:golang/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus@v1.10.0

docs

admin

index.md queues.md rules.md subscriptions.md topics.md
client.md errors.md index.md messaging.md receiver.md sender.md sessions.md
tile.json

tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus

tessl install tessl/golang-github-com--azure--azure-sdk-for-go--sdk--messaging--azservicebus@1.10.1

Client module for Azure Service Bus, a highly reliable cloud messaging service providing real-time and fault-tolerant communication between distributed senders and receivers.

docs/admin/topics.md

Topic Management

Topic management operations in the admin client enable creating, configuring, updating, and monitoring Service Bus topics for publish-subscribe messaging patterns.

Capabilities

Create Topic

func (ac *Client) CreateTopic(ctx context.Context, topicName string, options *CreateTopicOptions) (CreateTopicResponse, error)

Creates a topic with configurable properties.

Example:

resp, err := adminClient.CreateTopic(context.Background(), "events", &admin.CreateTopicOptions{
    Properties: &admin.TopicProperties{
        MaxSizeInMegabytes:                  to.Ptr(int32(2048)),
        RequiresDuplicateDetection:          to.Ptr(true),
        DefaultMessageTimeToLive:            to.Ptr("P7D"),  // 7 days
        DuplicateDetectionHistoryTimeWindow: to.Ptr("PT10M"),
        EnableBatchedOperations:             to.Ptr(true),
        EnablePartitioning:                  to.Ptr(true),
        SupportOrdering:                     to.Ptr(true),
    },
})
if err != nil {
    panic(err)
}

fmt.Printf("Created topic: %s\n", resp.TopicName)

Get Topic

func (ac *Client) GetTopic(ctx context.Context, topicName string, options *GetTopicOptions) (*GetTopicResponse, error)

Gets topic configuration properties by name. Returns a nil response and a nil error if the topic does not exist.

Example:

resp, err := adminClient.GetTopic(context.Background(), "events", nil)
if err != nil {
    panic(err)
}

if resp == nil {
    fmt.Println("Topic does not exist")
    return
}

fmt.Printf("Topic: %s\n", resp.TopicName)
if resp.SupportOrdering != nil && *resp.SupportOrdering {
    fmt.Println("Ordering is supported")
}
if resp.EnablePartitioning != nil && *resp.EnablePartitioning {
    fmt.Println("Partitioning is enabled")
}

Get Topic Runtime Properties

func (ac *Client) GetTopicRuntimeProperties(ctx context.Context, topicName string, options *GetTopicRuntimePropertiesOptions) (*GetTopicRuntimePropertiesResponse, error)

Gets runtime statistics for a topic, including subscription count and size. Returns a nil response and a nil error if the topic does not exist.

Example:

resp, err := adminClient.GetTopicRuntimeProperties(context.Background(), "events", nil)
if err != nil {
    panic(err)
}

if resp != nil {
    fmt.Printf("Topic: %s\n", resp.TopicName)
    fmt.Printf("Subscription Count: %d\n", resp.SubscriptionCount)
    fmt.Printf("Scheduled Messages: %d\n", resp.ScheduledMessageCount)
    fmt.Printf("Size: %d bytes\n", resp.SizeInBytes)
    fmt.Printf("Created At: %s\n", resp.CreatedAt)
    fmt.Printf("Updated At: %s\n", resp.UpdatedAt)
}

Update Topic

func (ac *Client) UpdateTopic(ctx context.Context, topicName string, properties TopicProperties, options *UpdateTopicOptions) (UpdateTopicResponse, error)

Updates an existing topic's configuration properties.

Example:

// Get current properties
topicResp, err := adminClient.GetTopic(context.Background(), "events", nil)
if err != nil {
    panic(err)
}

// GetTopic returns a nil response (with a nil error) when the topic does not
// exist, so stop here instead of dereferencing a nil pointer below.
if topicResp == nil {
    fmt.Println("Topic does not exist")
    return
}

// Modify properties. Starting from the fetched properties keeps every
// setting that is not explicitly changed here.
props := topicResp.TopicProperties
props.DefaultMessageTimeToLive = to.Ptr("P14D")  // 14 days
props.SupportOrdering = to.Ptr(true)

// Update topic
updateResp, err := adminClient.UpdateTopic(context.Background(), "events", props, nil)
if err != nil {
    panic(err)
}

fmt.Printf("Updated topic: %s\n", updateResp.TopicName)

Delete Topic

func (ac *Client) DeleteTopic(ctx context.Context, topicName string, options *DeleteTopicOptions) (DeleteTopicResponse, error)

Deletes a topic and all its subscriptions.

Example:

resp, err := adminClient.DeleteTopic(context.Background(), "events", nil)
if err != nil {
    panic(err)
}

fmt.Println("Topic and all subscriptions deleted successfully")

List Topics

func (ac *Client) NewListTopicsPager(options *ListTopicsOptions) *runtime.Pager[ListTopicsResponse]

Creates a pager for listing all topics with their configuration properties.

Example:

pager := adminClient.NewListTopicsPager(&admin.ListTopicsOptions{
    MaxPageSize: 20,
})

for pager.More() {
    page, err := pager.NextPage(context.Background())
    if err != nil {
        panic(err)
    }

    for _, topic := range page.Topics {
        fmt.Printf("Topic: %s\n", topic.TopicName)
        if topic.EnablePartitioning != nil && *topic.EnablePartitioning {
            fmt.Println("  - Partitioned")
        }
        if topic.SupportOrdering != nil && *topic.SupportOrdering {
            fmt.Println("  - Supports ordering")
        }
    }
}

List Topic Runtime Properties

func (ac *Client) NewListTopicsRuntimePropertiesPager(options *ListTopicsRuntimePropertiesOptions) *runtime.Pager[ListTopicsRuntimePropertiesResponse]

Creates a pager for listing runtime properties of all topics.

Example:

pager := adminClient.NewListTopicsRuntimePropertiesPager(&admin.ListTopicsRuntimePropertiesOptions{
    MaxPageSize: 20,
})

for pager.More() {
    page, err := pager.NextPage(context.Background())
    if err != nil {
        panic(err)
    }

    for _, topicProps := range page.TopicRuntimeProperties {
        fmt.Printf("%s: %d subscriptions, %d scheduled messages\n",
            topicProps.TopicName,
            topicProps.SubscriptionCount,
            topicProps.ScheduledMessageCount,
        )
    }
}

Types

TopicProperties

// TopicProperties holds the configuration properties for a topic.
type TopicProperties struct {
    // MaxSizeInMegabytes is the maximum topic size in MB. Default: 1024 MB.
    MaxSizeInMegabytes *int32
    // RequiresDuplicateDetection enables duplicate message detection based on MessageID.
    RequiresDuplicateDetection *bool
    // DefaultMessageTimeToLive is the default message expiration time as an
    // ISO 8601 duration, e.g. "P7D" for 7 days.
    DefaultMessageTimeToLive *string
    // DuplicateDetectionHistoryTimeWindow is the duplicate detection history
    // window as an ISO 8601 duration. Default: 10 minutes.
    DuplicateDetectionHistoryTimeWindow *string
    // EnableBatchedOperations enables server-side batching for improved throughput.
    EnableBatchedOperations *bool
    // Status is the topic status (Active, Disabled, SendDisabled, ReceiveDisabled).
    Status *EntityStatus
    // AutoDeleteOnIdle auto-deletes the topic after an idle period (ISO 8601 duration).
    AutoDeleteOnIdle *string
    // EnablePartitioning partitions the topic across multiple message brokers
    // for higher throughput.
    EnablePartitioning *bool
    // SupportOrdering maintains message order when forwarding to subscriptions.
    SupportOrdering *bool
    // UserMetadata is a custom metadata string.
    UserMetadata *string
    // AuthorizationRules are the security rules with keys and access rights.
    AuthorizationRules []AuthorizationRule
    // MaxMessageSizeInKilobytes is the maximum message size in KB (Premium tier only).
    MaxMessageSizeInKilobytes *int64
}

Configuration properties for a topic:

  • MaxSizeInMegabytes: Maximum topic size in MB. Default: 1024 MB.
  • RequiresDuplicateDetection: Enable duplicate message detection based on MessageID.
  • DefaultMessageTimeToLive: Default message expiration time (ISO 8601 duration, e.g., "P7D" for 7 days).
  • DuplicateDetectionHistoryTimeWindow: Duplicate detection history window (ISO 8601 duration). Default: 10 minutes.
  • EnableBatchedOperations: Enable server-side batching for improved throughput.
  • Status: Topic status (Active, Disabled, SendDisabled, ReceiveDisabled).
  • AutoDeleteOnIdle: Auto-delete topic after idle period (ISO 8601 duration).
  • EnablePartitioning: Partition topic across multiple message brokers for higher throughput.
  • SupportOrdering: Maintain message order when forwarding to subscriptions.
  • UserMetadata: Custom metadata string.
  • AuthorizationRules: Security rules with keys and access rights.
  • MaxMessageSizeInKilobytes: Maximum message size in KB (Premium tier only).

TopicRuntimeProperties

// TopicRuntimeProperties holds the runtime statistics for a topic.
type TopicRuntimeProperties struct {
    // SizeInBytes is the current topic size in bytes.
    SizeInBytes int64
    // CreatedAt is the topic creation timestamp.
    CreatedAt time.Time
    // UpdatedAt is the last update timestamp.
    UpdatedAt time.Time
    // AccessedAt is the last access timestamp.
    AccessedAt time.Time
    // SubscriptionCount is the number of subscriptions to the topic.
    SubscriptionCount int32
    // ScheduledMessageCount is the number of scheduled messages.
    ScheduledMessageCount int32
}

Runtime statistics for a topic:

  • SizeInBytes: Current topic size in bytes
  • CreatedAt: Topic creation timestamp
  • UpdatedAt: Last update timestamp
  • AccessedAt: Last access timestamp
  • SubscriptionCount: Number of subscriptions to the topic
  • ScheduledMessageCount: Number of scheduled messages

CreateTopicOptions

// CreateTopicOptions holds the options for creating a topic.
type CreateTopicOptions struct {
    // Properties are the topic configuration properties.
    Properties *TopicProperties
}

Options for creating a topic:

  • Properties: Topic configuration properties

CreateTopicResponse

// CreateTopicResponse is the response from creating a topic.
type CreateTopicResponse struct {
    // TopicName is the name of the topic.
    TopicName string
    // TopicProperties is embedded, so all topic properties are available
    // directly on the response.
    TopicProperties
}

Response from creating a topic, includes the topic name and all properties.

GetTopicOptions

// GetTopicOptions holds the options for getting a topic.
type GetTopicOptions struct {
    // Currently empty, reserved for future expansion
}

Options for getting a topic.

GetTopicResponse

// GetTopicResponse is the response from getting a topic.
type GetTopicResponse struct {
    // TopicName is the name of the topic.
    TopicName string
    // TopicProperties is embedded, so all topic properties are available
    // directly on the response.
    TopicProperties
}

Response from getting a topic, includes the topic name and all properties.

GetTopicRuntimePropertiesOptions

// GetTopicRuntimePropertiesOptions holds the options for getting topic
// runtime properties.
type GetTopicRuntimePropertiesOptions struct {
    // Currently empty, reserved for future expansion
}

Options for getting topic runtime properties.

GetTopicRuntimePropertiesResponse

// GetTopicRuntimePropertiesResponse is the response from getting topic
// runtime properties.
type GetTopicRuntimePropertiesResponse struct {
    // TopicName is the name of the topic.
    TopicName string
    // TopicRuntimeProperties is embedded, so all runtime statistics are
    // available directly on the response.
    TopicRuntimeProperties
}

Response from getting topic runtime properties, includes the topic name and all runtime statistics.

UpdateTopicOptions

// UpdateTopicOptions holds the options for updating a topic.
type UpdateTopicOptions struct {
    // Currently empty, reserved for future expansion
}

Options for updating a topic.

UpdateTopicResponse

// UpdateTopicResponse is the response from updating a topic.
type UpdateTopicResponse struct {
    // TopicName is the name of the topic.
    TopicName string
    // TopicProperties is embedded and carries the updated properties.
    TopicProperties
}

Response from updating a topic, includes the topic name and updated properties.

DeleteTopicOptions

// DeleteTopicOptions holds the options for deleting a topic.
type DeleteTopicOptions struct {
    // Currently empty, reserved for future expansion
}

Options for deleting a topic.

DeleteTopicResponse

// DeleteTopicResponse is the response from deleting a topic.
type DeleteTopicResponse struct {
    // NOTE(review): this page does not say what Value holds after a delete
    // or whether it may be nil; confirm against the SDK reference before
    // relying on it.
    Value *TopicProperties
}

Response from deleting a topic.

ListTopicsOptions

// ListTopicsOptions holds the options for listing topics.
type ListTopicsOptions struct {
    // MaxPageSize is the maximum number of topics per page.
    MaxPageSize int32
}

Options for listing topics:

  • MaxPageSize: Maximum number of topics per page

ListTopicsResponse

// ListTopicsResponse is the response from listing topics.
type ListTopicsResponse struct {
    // Topics holds the topic items returned in this response.
    Topics []TopicItem
}

Response from listing topics.

TopicItem

// TopicItem is a single topic item in the list response.
type TopicItem struct {
    // TopicName is the name of the topic.
    TopicName string
    // TopicProperties is embedded, so the topic's properties are available
    // directly on the item.
    TopicProperties
}

A single topic item in the list response.

ListTopicsRuntimePropertiesOptions

// ListTopicsRuntimePropertiesOptions holds the options for listing topic
// runtime properties.
type ListTopicsRuntimePropertiesOptions struct {
    // MaxPageSize is the maximum number of topics per page.
    MaxPageSize int32
}

Options for listing topic runtime properties:

  • MaxPageSize: Maximum number of topics per page

ListTopicsRuntimePropertiesResponse

// ListTopicsRuntimePropertiesResponse is the response from listing topic
// runtime properties.
type ListTopicsRuntimePropertiesResponse struct {
    // TopicRuntimeProperties holds the runtime properties items returned in
    // this response.
    TopicRuntimeProperties []TopicRuntimePropertiesItem
}

Response from listing topic runtime properties.

TopicRuntimePropertiesItem

// TopicRuntimePropertiesItem is a single topic runtime properties item in
// the list response.
type TopicRuntimePropertiesItem struct {
    // TopicName is the name of the topic.
    TopicName string
    // TopicRuntimeProperties is embedded, so the runtime statistics are
    // available directly on the item.
    TopicRuntimeProperties
}

A single topic runtime properties item in the list response.

Usage Patterns

Create Topic with Ordering Support

resp, err := adminClient.CreateTopic(context.Background(), "ordered-events", &admin.CreateTopicOptions{
    Properties: &admin.TopicProperties{
        SupportOrdering:          to.Ptr(true),
        DefaultMessageTimeToLive: to.Ptr("P1D"),
    },
})

Create Partitioned Topic for High Throughput

resp, err := adminClient.CreateTopic(context.Background(), "high-volume-events", &admin.CreateTopicOptions{
    Properties: &admin.TopicProperties{
        EnablePartitioning: to.Ptr(true),
        MaxSizeInMegabytes: to.Ptr(int32(5120)),  // Larger size for partitioned topics
    },
})

Create Topic with Duplicate Detection

resp, err := adminClient.CreateTopic(context.Background(), "idempotent-events", &admin.CreateTopicOptions{
    Properties: &admin.TopicProperties{
        RequiresDuplicateDetection:          to.Ptr(true),
        DuplicateDetectionHistoryTimeWindow: to.Ptr("PT1H"),  // 1 hour window
    },
})

Monitor Topic and Subscriptions

// monitorTopic prints the runtime statistics of topicName, followed by the
// active and dead-letter message counts of each of its subscriptions.
//
// It panics if any admin call returns an error. If the topic does not exist
// it prints "Topic does not exist" and returns without listing subscriptions.
func monitorTopic(adminClient *admin.Client, topicName string) {
    ctx := context.Background()

    // Get topic runtime properties
    topicResp, err := adminClient.GetTopicRuntimeProperties(ctx, topicName, nil)
    if err != nil {
        panic(err)
    }

    // A missing topic is reported as a nil response with a nil error, so it
    // must be checked before any field is read.
    if topicResp == nil {
        fmt.Println("Topic does not exist")
        return
    }

    fmt.Printf("Topic: %s\n", topicResp.TopicName)
    fmt.Printf("Subscriptions: %d\n", topicResp.SubscriptionCount)
    fmt.Printf("Scheduled Messages: %d\n", topicResp.ScheduledMessageCount)
    fmt.Printf("Size: %d bytes\n", topicResp.SizeInBytes)

    // List all subscriptions
    subPager := adminClient.NewListSubscriptionsRuntimePropertiesPager(topicName, nil)
    for subPager.More() {
        page, err := subPager.NextPage(ctx)
        if err != nil {
            panic(err)
        }

        for _, sub := range page.SubscriptionRuntimeProperties {
            fmt.Printf("  Subscription: %s\n", sub.SubscriptionName)
            fmt.Printf("    Active: %d, DeadLetter: %d\n",
                sub.ActiveMessageCount,
                sub.DeadLetterMessageCount,
            )
        }
    }
}

Create Topic with Complete Configuration

// createProductionTopic creates topicName with a fixed set of message
// handling, performance, ordering and metadata settings. It returns the
// error reported by CreateTopic, or nil on success.
func createProductionTopic(adminClient *admin.Client, topicName string) error {
    settings := admin.TopicProperties{
        // Message handling
        DefaultMessageTimeToLive:            to.Ptr("P7D"),
        RequiresDuplicateDetection:          to.Ptr(true),
        DuplicateDetectionHistoryTimeWindow: to.Ptr("PT30M"),

        // Performance
        EnableBatchedOperations: to.Ptr(true),
        EnablePartitioning:      to.Ptr(true),
        MaxSizeInMegabytes:      to.Ptr(int32(5120)),

        // Ordering
        SupportOrdering: to.Ptr(true),

        // Metadata
        UserMetadata: to.Ptr("Production event topic"),
    }
    createOpts := admin.CreateTopicOptions{Properties: &settings}

    // Only the error matters to callers; the response is discarded.
    if _, err := adminClient.CreateTopic(context.Background(), topicName, &createOpts); err != nil {
        return err
    }
    return nil
}

Update Topic Configuration

// updateTopicTTL sets the default message time-to-live of topicName to ttl,
// leaving the topic's other fetched properties as they are.
//
// It returns the error from GetTopic or UpdateTopic, or a "topic not found"
// error when GetTopic yields a nil response.
func updateTopicTTL(adminClient *admin.Client, topicName string, ttl string) error {
    ctx := context.Background()

    // Fetch the current configuration to use as the base for the update.
    current, err := adminClient.GetTopic(ctx, topicName, nil)
    switch {
    case err != nil:
        return err
    case current == nil:
        return fmt.Errorf("topic not found")
    }

    // Change only the TTL on a copy of the fetched properties.
    updated := current.TopicProperties
    updated.DefaultMessageTimeToLive = &ttl

    _, err = adminClient.UpdateTopic(ctx, topicName, updated, nil)
    return err
}

List Topics with Filtering

// listPartitionedTopics returns the names of all topics whose
// EnablePartitioning flag is set and true. The result is nil when no topic
// matches. It panics if fetching a page fails.
func listPartitionedTopics(adminClient *admin.Client) []string {
    var names []string

    for topicPager := adminClient.NewListTopicsPager(nil); topicPager.More(); {
        page, err := topicPager.NextPage(context.Background())
        if err != nil {
            panic(err)
        }

        for _, item := range page.Topics {
            // Skip topics where the flag is unset or false.
            if item.EnablePartitioning == nil || !*item.EnablePartitioning {
                continue
            }
            names = append(names, item.TopicName)
        }
    }

    return names
}

Topic Health Check

// checkTopicHealth fetches the runtime properties of topicName and returns
// an error describing the first problem found, or nil if none is found.
//
// Checks run in this order: the lookup failed, the topic does not exist, the
// topic has no subscriptions, the topic is larger than 5 GB.
func checkTopicHealth(adminClient *admin.Client, topicName string) error {
    // Size above which the topic is reported as unhealthy: 5 GB.
    const sizeLimitBytes int64 = 5 * 1024 * 1024 * 1024

    stats, err := adminClient.GetTopicRuntimeProperties(context.Background(), topicName, nil)

    // Case order matters: stats is only dereferenced after the nil case.
    switch {
    case err != nil:
        return fmt.Errorf("failed to get topic properties: %w", err)
    case stats == nil:
        return fmt.Errorf("topic does not exist")
    case stats.SubscriptionCount == 0:
        return fmt.Errorf("topic has no subscriptions")
    case stats.SizeInBytes > sizeLimitBytes:
        return fmt.Errorf("topic size (%d bytes) exceeds threshold", stats.SizeInBytes)
    }

    return nil
}